clientSml.c 50.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

wmmhello's avatar
wmmhello 已提交
16 17 18 19 20
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

21 22 23 24 25 26 27
#include "clientSml.h"

// Milliseconds per hour, per minute and per second. smlGetTimeValue indexes
// this table with (fromPrecision - TSDB_TIME_PRECISION_HOURS).
int64_t smlToMilli[3] = {3600000LL, 60000LL, 1000LL};
// Scale factors built from the nanosecond constants; presumably indexed by
// time precision (milli, micro, nano) -- TODO confirm against the users of this table.
int64_t smlFactorNS[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
// 10^3, 10^6, 10^9; presumably the number of milli/micro/nano units in one
// second -- TODO confirm against the users of this table.
int64_t smlFactorS[3] = {1000LL, 1000000LL, 1000000000LL};

// Looks up `key` in the linked list and returns the stored value, or NULL when
// no used node matches.
//   list - head of the list (may be NULL)
//   key  - key to search for
//   len  - key length in bytes; only consulted when `fn` is NULL
//   fn   - optional comparator returning 0 on equality; when NULL the keys are
//          compared by length and then bytewise with memcmp
void* nodeListGet(NodeList* list, const void *key, int32_t len, _equal_fn_sml fn){
  for (NodeList *node = list; node != NULL; node = node->next) {
    // Unused nodes never match.
    if (!node->data.used) {
      continue;
    }

    bool matched;
    if (fn != NULL) {
      matched = (fn(node->data.key, key) == 0);
    } else {
      matched = (node->data.keyLen == len) && (memcmp(node->data.key, key, len) == 0);
    }

    if (matched) {
      return node->data.value;
    }
  }
  return NULL;
}

45
// Inserts (key, value) into the list.
// Walks the leading run of used nodes looking for a duplicate key; the first
// unused node found is recycled, otherwise a new node is allocated and
// prepended to the list.
//   list  - in/out pointer to the list head
//   key   - key pointer; stored as-is (not copied)
//   len   - key length in bytes
//   value - value pointer; stored as-is
//   fn    - optional comparator returning 0 on equality; memcmp is used when NULL
// Returns 0 on success, -1 when the key already exists or allocation fails.
int nodeListSet(NodeList** list, const void *key, int32_t len, void* value, _equal_fn_sml fn){
  NodeList *slot = *list;
  for (; slot != NULL && slot->data.used; slot = slot->next) {
    // Keys of a different length can never be duplicates.
    if (slot->data.keyLen != len) {
      continue;
    }
    bool same = (fn == NULL) ? (memcmp(slot->data.key, key, len) == 0)
                             : (fn(slot->data.key, key) == 0);
    if (same) {
      return -1;
    }
  }

  if (slot == NULL) {
    // No reusable node: allocate a fresh one and make it the new head.
    slot = (NodeList *)taosMemoryCalloc(1, sizeof(NodeList));
    if (slot == NULL) {
      return -1;
    }
    slot->next = *list;
    *list = slot;
  }

  slot->data.key = key;
  slot->data.keyLen = len;
  slot->data.value = value;
  slot->data.used = true;
  return 0;
}

81
// Counts the used nodes at the front of the list; counting stops at the first
// unused node or at the end of the list.
int nodeListSize(NodeList* list){
  int used = 0;
  for (NodeList *node = list; node != NULL && node->data.used; node = node->next) {
    used++;
  }
  return used;
}
wmmhello's avatar
wmmhello 已提交
90

91
// Reports whether `num` lies at or beyond the int64_t limits once both limits
// are converted to double, i.e. whether a direct (int64_t) cast cannot be
// trusted. Callers in this file fall back to integer string parsing when this
// returns true.
//
// Deliberately not declared `inline`: in C99/C11 a file-scope definition that
// is only ever declared `inline` (without `static` or `extern`) provides no
// external definition, which can produce undefined-symbol link errors when the
// compiler chooses not to inline the call.
bool smlDoubleToInt64OverFlow(double num) {
  return num >= (double)INT64_MAX || num <= (double)INT64_MIN;
}

96
// Formats "<msg1>:<msg2>" into pBuf->buf (truncating to fit) and returns
// TSDB_CODE_SML_INVALID_DATA unconditionally.
//   pBuf - destination; nothing is written when pBuf->buf is NULL or
//          pBuf->len is not positive
//   msg1 - leading text, may be NULL
//   msg2 - detail text, may be NULL; appended only when more than two bytes
//          of space remain after msg1
int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
  if (pBuf->buf && pBuf->len > 0) {
    memset(pBuf->buf, 0, pBuf->len);
    // strncat writes up to n characters PLUS a terminating NUL, so the limit
    // must leave one byte for the terminator.
    if (msg1) strncat(pBuf->buf, msg1, pBuf->len - 1);
    int32_t left = pBuf->len - (int32_t)strlen(pBuf->buf);
    if (left > 2 && msg2) {
      strncat(pBuf->buf, ":", left - 1);
      // One byte went to ':' and one is reserved for the NUL.
      strncat(pBuf->buf, msg2, left - 2);
    }
  }
  return TSDB_CODE_SML_INVALID_DATA;
}

109

110 111 112 113 114
int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, uint8_t toPrecision) {
  char   *endPtr = NULL;
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
  if (unlikely(value + len != endPtr)) {
    return -1;
115
  }
wmmhello's avatar
wmmhello 已提交
116

117 118 119 120 121 122 123
  if(unlikely(fromPrecision >= TSDB_TIME_PRECISION_HOURS)){
    int64_t unit = smlToMilli[fromPrecision - TSDB_TIME_PRECISION_HOURS];
    if(unit > INT64_MAX / tsInt64){
      return -1;
    }
    tsInt64 *= unit;
    fromPrecision = TSDB_TIME_PRECISION_MILLI;
wmmhello's avatar
wmmhello 已提交
124
  }
wmmhello's avatar
wmmhello 已提交
125

126
  return convertTimePrecision(tsInt64, fromPrecision, toPrecision);
wmmhello's avatar
wmmhello 已提交
127 128
}

129 130 131 132 133 134 135
// Maps the digit count of a timestamp to a time precision: a length equal to
// TSDB_TIME_PRECISION_SEC_DIGITS yields seconds, TSDB_TIME_PRECISION_MILLI_DIGITS
// yields milliseconds, anything else yields -1.
int8_t smlGetTsTypeByLen(int32_t len) {
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
    return TSDB_TIME_PRECISION_SECONDS;
  }
  if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
    return TSDB_TIME_PRECISION_MILLI;
  }
  return -1;
}

139 140 141 142
SSmlTableInfo *smlBuildTableInfo(int numRows, const char* measure, int32_t measureLen) {
  SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
  if (!tag) {
    return NULL;
143 144
  }

145 146
  tag->sTableName = measure;
  tag->sTableNameLen = measureLen;
147

148 149 150 151
  tag->cols = taosArrayInit(numRows, POINTER_BYTES);
  if (tag->cols == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
152
  }
153 154 155 156 157 158 159 160 161 162 163

//  tag->tags = taosArrayInit(16, sizeof(SSmlKv));
//  if (tag->tags == NULL) {
//    uError("SML:smlBuildTableInfo failed to allocate memory");
//    goto cleanup;
//  }
  return tag;

  cleanup:
  taosMemoryFree(tag);
  return NULL;
164 165
}

166 167 168 169 170 171 172 173 174 175 176
// If the tag whose key equals the configured tsSmlChildTableName is present,
// copies its value into `childTableName`.
//   tags           - array of SSmlKv tags
//   childTableName - output buffer; it is cleared for TSDB_TABLE_NAME_LEN
//                    bytes, so it must be at least that large
// Always returns TSDB_CODE_SUCCESS; the buffer is left untouched when
// tsSmlChildTableName is empty or no tag matches.
static int32_t smlParseTableName(SArray *tags, char *childTableName) {
  size_t childTableNameLen = strlen(tsSmlChildTableName);
  if (childTableNameLen == 0) return TSDB_CODE_SUCCESS;

  int32_t numOfTags = (int32_t)taosArrayGetSize(tags);
  for (int32_t i = 0; i < numOfTags; i++) {
    SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i);
    // handle child table name
    if (childTableNameLen == tag->keyLen && strncmp(tag->key, tsSmlChildTableName, tag->keyLen) == 0) {
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      // Copy at most TSDB_TABLE_NAME_LEN - 1 bytes so the zeroed last byte
      // always terminates the string; copying the full buffer size would
      // leave an over-long value unterminated.
      strncpy(childTableName, tag->value,
              (tag->length < TSDB_TABLE_NAME_LEN ? tag->length : TSDB_TABLE_NAME_LEN - 1));
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

183 184
// Determines the child table name and uid for `oneTable`.
// First tries the tag named by tsSmlChildTableName (via smlParseTableName).
// If that leaves the name empty, a name and uid are generated from the tags
// and super table name with buildChildTableName; otherwise the uid is taken
// from the first 8 bytes of the explicit child table name.
// Always returns TSDB_CODE_SUCCESS.
int32_t smlSetCTableName(SSmlTableInfo *oneTable){
  smlParseTableName(oneTable->tags, oneTable->childTableName);

  if (strlen(oneTable->childTableName) == 0) {
    // buildChildTableName works on a private copy of the tag array.
    SArray* dst = taosArrayDup(oneTable->tags, NULL);
    RandTableName rName = {dst, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen,
                           oneTable->childTableName, 0};

    buildChildTableName(&rName);
    taosArrayDestroy(dst);
    oneTable->uid = rName.uid;
  } else {
    // Read the leading bytes through memcpy: dereferencing the char buffer as
    // uint64_t violates strict aliasing and may be misaligned.
    uint64_t uid = 0;
    memcpy(&uid, oneTable->childTableName, sizeof(uid));
    oneTable->uid = uid;
  }
  return TSDB_CODE_SUCCESS;
}
199

200 201 202 203
SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
  SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
  if (!meta) {
    return NULL;
204 205
  }

206 207 208 209 210 211
  if(unlikely(!isDataFormat)){
    meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->tagHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }
wmmhello's avatar
wmmhello 已提交
212

213 214 215 216 217
    meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->colHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }
wmmhello's avatar
wmmhello 已提交
218 219
  }

220 221 222 223
  meta->tags = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
224 225
  }

226 227 228 229 230 231
  meta->cols = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
  return meta;
232

233 234 235
  cleanup:
  taosMemoryFree(meta);
  return NULL;
wmmhello's avatar
wmmhello 已提交
236 237
}

wmmhello's avatar
wmmhello 已提交
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
//uint16_t smlCalTypeSum(char* endptr, int32_t left){
//  uint16_t sum = 0;
//  for(int i = 0; i < left; i++){
//    sum += endptr[i];
//  }
//  return sum;
//}

// Helper macros for smlParseNumber. They expand to bare statement sequences
// (not wrapped in do { } while (0)), so they must only be used inside a braced
// block. They rely on these locals of the expanding function: `kvVal` (output
// key/value), `msg` (error buffer), `pVal` (start of the value text), `result`
// (the parsed double) and, for the 64-bit variants, `endptr`. Several of them
// contain `return` statements that leave the expanding function.

// Records an "invalid data" message and returns false from the caller.
#define RETURN_FALSE \
smlBuildInvalidDataMsg(msg, "invalid data", pVal); \
return false;

// Stores `result` as a DOUBLE.
#define SET_DOUBLE kvVal->type = TSDB_DATA_TYPE_DOUBLE;\
                   kvVal->d = result;

// Stores `result` as a FLOAT; returns false when IS_VALID_FLOAT rejects it.
#define SET_FLOAT \
  if (!IS_VALID_FLOAT(result)) {\
    smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);\
    return false;\
  }\
  kvVal->type = TSDB_DATA_TYPE_FLOAT;\
  kvVal->f = (float)result;

// Stores the value as a BIGINT. When the double is at or beyond the int64
// limits the text is re-parsed with taosStr2Int64 and the macro returns from
// the caller directly (false on ERANGE, true otherwise).
#define SET_BIGINT \
  if (smlDoubleToInt64OverFlow(result)) {\
    errno = 0;\
    int64_t tmp = taosStr2Int64(pVal, &endptr, 10);\
    if (errno == ERANGE) {\
      smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);\
      return false;\
    }\
    kvVal->type = TSDB_DATA_TYPE_BIGINT;\
    kvVal->i = tmp;\
    return true;\
  }\
  kvVal->type = TSDB_DATA_TYPE_BIGINT;\
  kvVal->i = (int64_t)result;

// Stores `result` as an INT; returns false when IS_VALID_INT rejects it.
#define SET_INT \
  if (!IS_VALID_INT(result)) {\
    smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);\
    return false;\
  }\
  kvVal->type = TSDB_DATA_TYPE_INT;\
  kvVal->i = result;

// Stores `result` as a SMALLINT; returns false when IS_VALID_SMALLINT rejects it.
// NOTE(review): the message text contains the typo "our of range"; it is
// runtime output and therefore left unchanged here.
#define SET_SMALL_INT \
  if (!IS_VALID_SMALLINT(result)) {\
    smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal);\
    return false;\
  }\
  kvVal->type = TSDB_DATA_TYPE_SMALLINT;\
  kvVal->i = result;

// Stores the value as a UBIGINT. Negative values are rejected; values at or
// beyond (double)UINT64_MAX are re-parsed with taosStr2UInt64 and the macro
// returns from the caller directly (false on ERANGE, true otherwise).
#define SET_UBIGINT \
  if (result >= (double)UINT64_MAX || result < 0) {\
    errno = 0;\
    uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);\
    if (errno == ERANGE || result < 0) {\
      smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);\
      return false;\
    }\
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;\
    kvVal->u = tmp;\
    return true;\
  }\
  kvVal->type = TSDB_DATA_TYPE_UBIGINT;\
  kvVal->u = result;

// Stores `result` as a UINT; returns false when IS_VALID_UINT rejects it.
#define SET_UINT \
  if (!IS_VALID_UINT(result)) {\
    smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);\
    return false;\
  }\
  kvVal->type = TSDB_DATA_TYPE_UINT;\
  kvVal->u = result;

// Stores `result` as a USMALLINT; returns false when IS_VALID_USMALLINT rejects it.
// NOTE(review): the message text contains the typo "out of rang"; it is
// runtime output and therefore left unchanged here.
#define SET_USMALL_INT \
  if (!IS_VALID_USMALLINT(result)) {\
    smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal);\
    return false;\
  }\
  kvVal->type = TSDB_DATA_TYPE_USMALLINT;\
  kvVal->u = result;

// Stores `result` as a TINYINT; returns false when IS_VALID_TINYINT rejects it.
#define SET_TINYINT \
  if (!IS_VALID_TINYINT(result)) { \
    smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);\
    return false;\
  }\
  kvVal->type = TSDB_DATA_TYPE_TINYINT;\
  kvVal->i = result;

// Stores `result` as a UTINYINT; returns false when IS_VALID_UTINYINT rejects it.
#define SET_UTINYINT \
  if (!IS_VALID_UTINYINT(result)) {\
    smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);\
    return false;\
  }\
  kvVal->type = TSDB_DATA_TYPE_UTINYINT;\
  kvVal->u = result;

339
// Parses kvVal->value as a number with an optional type suffix and stores the
// typed result in kvVal.
// Accepted suffixes (first letter case-insensitive): none or f64 -> double,
// f32 -> float, i or i64 -> bigint, i32, i16, i8, u or u64 -> unsigned
// bigint, u32, u16, u8.
//   kvVal - in: value/length; out: type and the matching value member
//   msg   - receives an error description on failure
// Returns true on success, false for malformed or out-of-range input.
// The local names pVal, endptr, result, kvVal and msg are required by the
// SET_* / RETURN_FALSE macros expanded below.
bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
  const char *pVal = kvVal->value;
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (endptr == pVal) {
    RETURN_FALSE
  }

  // Number of bytes following the numeric part, i.e. the suffix length.
  int32_t left = len - (endptr - pVal);
  if (left == 0) {
    SET_DOUBLE
    return true;
  }
  if (left < 1 || left > 3) {
    RETURN_FALSE
  }

  // At least one suffix byte exists from here on.
  const char lead = endptr[0];
  const bool isFloat = (lead == 'f' || lead == 'F');
  const bool isSigned = (lead == 'i' || lead == 'I');
  const bool isUnsigned = (lead == 'u' || lead == 'U');

  switch (left) {
    case 1:
      if (isSigned) {
        SET_BIGINT
      } else if (isUnsigned) {
        SET_UBIGINT
      } else {
        RETURN_FALSE
      }
      break;

    case 2:
      // Only "i8" and "u8" are valid two-byte suffixes.
      if (endptr[1] != '8') {
        RETURN_FALSE
      }
      if (isSigned) {
        SET_TINYINT
      } else if (isUnsigned) {
        SET_UTINYINT
      } else {
        RETURN_FALSE
      }
      break;

    default: {  // left == 3
      const bool is64 = (endptr[1] == '6' && endptr[2] == '4');
      const bool is32 = (endptr[1] == '3' && endptr[2] == '2');
      const bool is16 = (endptr[1] == '1' && endptr[2] == '6');

      if (isFloat) {
        if (is64) {
          SET_DOUBLE
        } else if (is32) {
          SET_FLOAT
        } else {
          RETURN_FALSE
        }
      } else if (isSigned) {
        if (is64) {
          SET_BIGINT
        } else if (is32) {
          SET_INT
        } else if (is16) {
          SET_SMALL_INT
        } else {
          RETURN_FALSE
        }
      } else if (isUnsigned) {
        if (is64) {
          SET_UBIGINT
        } else if (is32) {
          SET_UINT
        } else if (is16) {
          SET_USMALL_INT
        } else {
          RETURN_FALSE
        }
      } else {
        RETURN_FALSE
      }
      break;
    }
  }
  return true;
}

// Earlier variant of smlParseNumber: parses kvVal->value as a number with an
// optional type suffix and stores the typed result in kvVal.
// Three- and two-byte suffixes are matched case-insensitively with
// strncasecmp; the one-byte suffixes accept only lower-case 'i' and 'u'.
//   kvVal - in: value/length; out: type and the matching value member
//   msg   - receives an error description on failure
// Returns true on success, false for malformed or out-of-range input.
bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg) {
  const char *pVal = kvVal->value;
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (endptr == pVal) {
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
    return false;
  }

  // Number of bytes following the numeric part, i.e. the suffix length.
  int32_t    left = len - (endptr - pVal);
  const bool one = (left == 1);
  const bool two = (left == 2);
  const bool three = (left == 3);

  // double: no suffix or "f64"
  if (left == 0 || (three && strncasecmp(endptr, "f64", left) == 0)) {
    kvVal->type = TSDB_DATA_TYPE_DOUBLE;
    kvVal->d = result;
    return true;
  }

  // float: "f32"
  if (three && strncasecmp(endptr, "f32", left) == 0) {
    if (!IS_VALID_FLOAT(result)) {
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_FLOAT;
    kvVal->f = (float)result;
    return true;
  }

  // bigint: "i" or "i64"
  if ((one && *endptr == 'i') || (three && strncasecmp(endptr, "i64", left) == 0)) {
    kvVal->type = TSDB_DATA_TYPE_BIGINT;
    if (!smlDoubleToInt64OverFlow(result)) {
      kvVal->i = (int64_t)result;
      return true;
    }
    // The double is at/beyond the int64 limits: re-parse the text exactly.
    errno = 0;
    int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
    if (errno == ERANGE) {
      smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
      return false;
    }
    kvVal->i = tmp;
    return true;
  }

  // unsigned bigint: "u" or "u64"
  if ((one && *endptr == 'u') || (three && strncasecmp(endptr, "u64", left) == 0)) {
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;
    if (!(result >= (double)UINT64_MAX || result < 0)) {
      kvVal->u = result;
      return true;
    }
    // Negative, or at/beyond the uint64 limit: re-parse the text exactly.
    errno = 0;
    uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
    if (errno == ERANGE || result < 0) {
      smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
      return false;
    }
    kvVal->u = tmp;
    return true;
  }

  // int: "i32"
  if (three && strncasecmp(endptr, "i32", left) == 0) {
    if (!IS_VALID_INT(result)) {
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_INT;
    kvVal->i = result;
    return true;
  }

  // unsigned int: "u32"
  if (three && strncasecmp(endptr, "u32", left) == 0) {
    if (!IS_VALID_UINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UINT;
    kvVal->u = result;
    return true;
  }

  // smallint: "i16"
  if (three && strncasecmp(endptr, "i16", left) == 0) {
    if (!IS_VALID_SMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_SMALLINT;
    kvVal->i = result;
    return true;
  }

  // unsigned smallint: "u16"
  if (three && strncasecmp(endptr, "u16", left) == 0) {
    if (!IS_VALID_USMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_USMALLINT;
    kvVal->u = result;
    return true;
  }

  // tinyint: "i8"
  if (two && strncasecmp(endptr, "i8", left) == 0) {
    if (!IS_VALID_TINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_TINYINT;
    kvVal->i = result;
    return true;
  }

  // unsigned tinyint: "u8"
  if (two && strncasecmp(endptr, "u8", left) == 0) {
    if (!IS_VALID_UTINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UTINYINT;
    kvVal->u = result;
    return true;
  }

  // Unknown suffix.
  smlBuildInvalidDataMsg(msg, "invalid data", pVal);
  return false;
}

511 512
// Fetches the super table meta for `measure` in the request's database via
// catalogGetSTableMeta.
//   info       - SML handle supplying the connection, request and catalog
//   measure    - table name bytes (not necessarily NUL-terminated)
//   measureLen - number of bytes in `measure`
// Returns the meta written by catalogGetSTableMeta, or NULL when the lookup
// leaves it unset or when the name does not fit into SName.tname.
STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen){
  STableMeta *pTableMeta = NULL;

  // pName.tname is cleared for TSDB_TABLE_NAME_LEN bytes below; a longer (or
  // equally long) name would overflow it or lose its NUL terminator.
  if (measure == NULL || measureLen < 0 || measureLen >= TSDB_TABLE_NAME_LEN) {
    return NULL;
  }

  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));

  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
  memcpy(pName.tname, measure, measureLen);

  catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
  return pTableMeta;
}
wmmhello's avatar
wmmhello 已提交
528

529

530 531
// Returns the next value of a function-local counter that is incremented with
// atomic_add_fetch_64; the value 0 is never returned.
static int64_t smlGenId() {
  static volatile int64_t linesSmlHandleId = 0;

  for (;;) {
    int64_t id = atomic_add_fetch_64(&linesSmlHandleId, 1);
    // Skip 0 should the counter ever produce it.
    if (id != 0) {
      return id;
    }
  }
}

541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566
// Decides which schema change, if any, is needed so that `kv` fits the
// existing schema.
//   colField - existing schema fields, indexed by the values stored in colHash
//   colHash  - maps field name -> uint16_t index into colField; may be NULL,
//              in which case every key is treated as new
//   kv       - the incoming key/value
//   isTag    - selects the tag or the column flavour of the action
//   action   - out: set to ADD_* for an unknown key, CHANGE_*_SIZE when a
//              VARCHAR/NCHAR field is too short; left untouched otherwise
//   info     - SML handle, used for the log prefix
// Returns 0 on success, TSDB_CODE_TSC_INVALID_VALUE when the key exists with a
// different data type.
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
                                       ESchemaAction *action, SSmlHandle *info) {
  uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
  if (index == NULL) {
    // Unknown key: it has to be added.
    *action = isTag ? SCHEMA_ACTION_ADD_TAG : SCHEMA_ACTION_ADD_COLUMN;
    return 0;
  }

  if (colField[*index].type != kv->type) {
    // kv is the incoming point, colField the stored schema. The key is
    // length-delimited, so print it with an explicit precision.
    uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %.*s. point type: %d, db type: %d", info->id,
           (int)kv->keyLen, kv->key, kv->type, colField[*index].type);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  // Variable-length fields may need to grow to hold the new value.
  if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR &&
       (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
      (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
       ((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
    *action = isTag ? SCHEMA_ACTION_CHANGE_TAG_SIZE : SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
  }
  return 0;
}

571 572 573 574
static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
  int32_t result = 1;
  while (result <= length) {
    result *= 2;
575
  }
576 577 578 579
  if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
    result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
    result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
580
  }
581 582 583 584 585

  if (type == TSDB_DATA_TYPE_NCHAR) {
    result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_BINARY) {
    result = result + VARSTR_HEADER_SIZE;
586
  }
587
  return result;
588 589
}

590 591 592 593 594 595 596 597 598 599
// Runs smlGenerateSchemaAction over every entry of `cols`, accumulating the
// required schema change in `*action`. For columns (isTag == false) the entry
// at index 0 is skipped.
// Returns TSDB_CODE_SUCCESS, or the first error reported by
// smlGenerateSchemaAction.
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                      ESchemaAction *action, bool isTag) {
  // Index 0 is only examined for tags.
  const int first = isTag ? 0 : 1;
  for (int idx = first; idx < taosArrayGetSize(cols); ++idx) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, idx);
    int32_t rc = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
    if (rc != TSDB_CODE_SUCCESS) {
      return rc;
    }
  }
  return TSDB_CODE_SUCCESS;
}

604 605 606 607 608
// Verifies that every key in `cols` exists as a field name in `schema`.
//   schema - schema fields to check against
//   length - number of entries in `schema`
//   cols   - array of SSmlKv; for columns (isTag == false) index 0 is skipped
//   isTag  - whether `cols` holds tags
// Returns 0 when all keys are present, -1 when one is missing, and
// TSDB_CODE_OUT_OF_MEMORY when the temporary hash table cannot be created.
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
  SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  if (hashTmp == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < length; i++) {
    // Store a real 16-bit value: copying SHORT_BYTES out of an int32_t would
    // pick the wrong half on big-endian hosts.
    uint16_t pos = (uint16_t)i;
    taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &pos, SHORT_BYTES);
  }

  int32_t code = 0;
  int32_t numOfCols = (int32_t)taosArrayGetSize(cols);
  for (int32_t i = isTag ? 0 : 1; i < numOfCols; i++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
    if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
      code = -1;
      break;
    }
  }

  taosHashCleanup(hashTmp);
  return code;
}
626

627 628 629 630 631 632
// Returns the storage size in bytes for a value of `type`: BINARY and NCHAR
// are sized from `length` via smlFindNearestPowerOf2, every other type uses
// its entry in tDataTypes.
static int32_t getBytes(uint8_t type, int32_t length) {
  const bool isVarLen = (type == TSDB_DATA_TYPE_BINARY) || (type == TSDB_DATA_TYPE_NCHAR);
  if (!isVarLen) {
    return tDataTypes[type].bytes;
  }
  return smlFindNearestPowerOf2(length, type);
}

635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652
// Updates the SField array `results` so that it can hold every entry of
// `cols`: unknown keys are appended as new fields, and existing
// variable-length fields that are too short get a larger `bytes`.
//   info        - SML handle (used for logging)
//   schemaField - existing schema fields, or NULL when the table is new
//   schemaHash  - name -> index map for schemaField, or NULL when the table is new
//   cols        - array of SSmlKv to process
//   results     - in/out array of SField
//   numOfCols   - subtracted from the schema index of a tag to obtain its
//                 position in `results`
//   isTag       - whether `cols` holds tags
// Returns TSDB_CODE_SUCCESS, the error from smlGenerateSchemaAction (e.g. a
// type mismatch), or TSDB_CODE_SML_INVALID_DATA when a key does not fit into
// SField.name or an expected schema entry cannot be located.
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                  SArray *results, int32_t numOfCols, bool isTag) {
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
    SSmlKv       *kv = (SSmlKv *)taosArrayGet(cols, j);
    ESchemaAction action = SCHEMA_ACTION_NULL;
    // Do not silently continue after a type mismatch.
    int32_t code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) {
      SField field = {0};
      // The name is copied without a terminator, so it must leave room for
      // the NUL already present in the zeroed field.
      if (kv->keyLen >= sizeof(field.name)) {
        uError("SML:0x%" PRIx64 " field name too long, length: %d", info->id, (int)kv->keyLen);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      field.type = kv->type;
      field.bytes = getBytes(kv->type, kv->length);
      memcpy(field.name, kv->key, kv->keyLen);
      taosArrayPush(results, &field);
    } else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
      uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
      if (index == NULL) {
        return TSDB_CODE_SML_INVALID_DATA;
      }
      uint16_t newIndex = *index;
      // Tags follow the columns in the schema; convert to an index into the
      // tag-only result array.
      if (isTag) newIndex -= numOfCols;
      SField *field = (SField *)taosArrayGet(results, newIndex);
      if (field == NULL) {
        return TSDB_CODE_SML_INVALID_DATA;
      }
      field->bytes = getBytes(kv->type, kv->length);
    }
  }
  return TSDB_CODE_SUCCESS;
}

658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674
// static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
//                               int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){
// Sends a TDMT_MND_CREATE_STB request that creates or alters the super table
// `pName` with the given columns and tags, and waits for the result.
//   info       - SML handle supplying the connection and catalog
//   pName      - full name of the super table
//   pColumns   - SField array of columns; handed to the request and released
//                through tFreeSMCreateStbReq before returning
//   pTags      - SField array of tags; released the same way. When empty, a
//                single NCHAR tag named tsSmlTagName is added
//   pTableMeta - current table meta; read for every action except
//                SCHEMA_ACTION_CREATE_STABLE
//   action     - selects how colVer/tagVer/suid/source are filled in
// Returns TSDB_CODE_SUCCESS or an error code. On success the cached table
// meta for `pName` is removed from the catalog.
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray *pTags, STableMeta *pTableMeta,
                              ESchemaAction action) {
  SRequestObj   *pRequest = NULL;
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SCmdMsgInfo    pCmdMsg = {0};

  // put front for free
  pReq.numOfColumns = taosArrayGetSize(pColumns);
  pReq.pColumns = pColumns;
  pReq.numOfTags = taosArrayGetSize(pTags);
  pReq.pTags = pTags;

  code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  if (action == SCHEMA_ACTION_CREATE_STABLE) {
    pReq.colVer = 1;
    pReq.tagVer = 1;
    pReq.suid = 0;
    pReq.source = TD_REQ_FROM_APP;
  } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
    // Tag change: only the tag version advances.
    pReq.colVer = pTableMeta->sversion;
    pReq.tagVer = pTableMeta->tversion + 1;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  } else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) {
    // Column change: only the column version advances.
    pReq.colVer = pTableMeta->sversion + 1;
    pReq.tagVer = pTableMeta->tversion;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  }

  if (pReq.numOfTags == 0) {
    // No tags supplied: add one placeholder tag.
    pReq.numOfTags = 1;
    SField field = {0};
    field.type = TSDB_DATA_TYPE_NCHAR;
    field.bytes = 1;
    // tsSmlTagName is a configurable global; bound the copy to the field.
    tstrncpy(field.name, tsSmlTagName, sizeof(field.name));
    taosArrayPush(pReq.pTags, &field);
  }

  pReq.commentLen = -1;
  pReq.igExists = true;
  tNameExtractFullName(pName, pReq.name);

  pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // First call computes the encoded size, second call serializes.
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery;
  memset(&pQuery, 0, sizeof(pQuery));
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // Drop the cached meta for this table after a successful change.
    catalogRemoveTableMeta(info->pCatalog, pName);
  }
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  return code;
}
744

745 746 747
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
  if(info->dataFormat && !info->needModifySchema){
    return TSDB_CODE_SUCCESS;
748
  }
749 750 751
  int32_t     code = 0;
  SHashObj   *hashTmp = NULL;
  STableMeta *pTableMeta = NULL;
752 753 754 755 756 757 758 759 760

  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));

  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
761

762 763 764 765
  NodeList *tmp = info->superTables;
  while (tmp) {
    SSmlSTableMeta *sTableData = (SSmlSTableMeta *)tmp->data.value;
    bool            needCheckMeta = false;  // for multi thread
766

767 768 769 770
    size_t superTableLen = (size_t)tmp->data.keyLen;
    const void  *superTable = tmp->data.key;
    memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
    memcpy(pName.tname, superTable, superTableLen);
771

772
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
773

774 775 776 777 778
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
      SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
      SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
      smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
      smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);
779

780 781 782 783
      code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
        goto end;
784
      }
785 786
      info->cost.numOfCreateSTables++;
      taosMemoryFreeClear(pTableMeta);
787

788 789 790 791
      code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
        goto end;
792
      }
793 794 795 796 797 798
    } else if (code == TSDB_CODE_SUCCESS) {
      hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
                             HASH_NO_LOCK);
      for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
           i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
799
      }
800

801 802 803 804
      ESchemaAction action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
805
      }
806 807 808 809 810
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
811

812 813 814 815 816 817 818 819 820
        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if (i < pTableMeta->tableInfo.numOfColumns) {
            taosArrayPush(pColumns, &field);
          } else {
            taosArrayPush(pTags, &field);
821 822
          }
        }
823 824
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
                           pTableMeta->tableInfo.numOfColumns, true);
825

826 827 828 829
        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
          goto end;
830
        }
831

832 833 834 835 836 837 838 839 840
        info->cost.numOfAlterTagSTables++;
        taosMemoryFreeClear(pTableMeta);
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
wmmhello's avatar
wmmhello 已提交
841
        }
842 843
      }

844 845 846
      taosHashClear(hashTmp);
      for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
847
      }
848 849 850 851
      action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
852
      }
853 854 855 856 857
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
858

859 860 861 862 863 864 865 866 867 868
        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if (i < pTableMeta->tableInfo.numOfColumns) {
            taosArrayPush(pColumns, &field);
          } else {
            taosArrayPush(pTags, &field);
          }
869 870
        }

871 872
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
                           pTableMeta->tableInfo.numOfColumns, false);
873

874 875 876 877
        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
          goto end;
878
        }
879

880 881 882 883 884 885 886 887 888 889
        info->cost.numOfAlterColSTables++;
        taosMemoryFreeClear(pTableMeta);
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
          goto end;
890 891
        }
      }
wmmhello's avatar
wmmhello 已提交
892

893 894 895
      needCheckMeta = true;
      taosHashCleanup(hashTmp);
      hashTmp = NULL;
X
Xiaoyu Wang 已提交
896
    } else {
897 898
      uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
      goto end;
wmmhello's avatar
wmmhello 已提交
899
    }
900

901 902 903 904 905 906 907 908 909 910 911
    if (needCheckMeta) {
      code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
                          sTableData->tags, true);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, pName.tname);
        goto end;
      }
      code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, pName.tname);
        goto end;
912 913
      }
    }
914 915 916 917

    sTableData->tableMeta = pTableMeta;

    tmp = tmp->next;
wmmhello's avatar
wmmhello 已提交
918
  }
919
  return 0;
920

921 922 923 924 925
  end:
  taosHashCleanup(hashTmp);
  taosMemoryFreeClear(pTableMeta);
//  catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
  return code;
wmmhello's avatar
wmmhello 已提交
926 927 928
}


929 930 931 932 933 934 935
/*
static int32_t smlCheckDupUnit(SHashObj *dumplicateKey, SArray *tags, SSmlMsgBuf *msg){
  for(int i = 0; i < taosArrayGetSize(tags); i++) {
    SSmlKv *tag = taosArrayGet(tags, i);
    if (smlCheckDuplicateKey(tag->key, tag->keyLen, dumplicateKey)) {
      smlBuildInvalidDataMsg(msg, "dumplicate key", tag->key);
      return TSDB_CODE_TSC_DUP_NAMES;
936
    }
wmmhello's avatar
wmmhello 已提交
937
  }
938 939 940
  return TSDB_CODE_SUCCESS;
}

941 942 943 944 945
static int32_t smlJudgeDupColName(SArray *cols, SArray *tags, SSmlMsgBuf *msg) {
  SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  int ret = smlCheckDupUnit(dumplicateKey, cols, msg);
  if(ret != TSDB_CODE_SUCCESS){
    goto end;
946
  }
947 948 949
  ret = smlCheckDupUnit(dumplicateKey, tags, msg);
  if(ret != TSDB_CODE_SUCCESS){
    goto end;
950
  }
951

952 953 954
  end:
  taosHashCleanup(dumplicateKey);
  return ret;
wmmhello's avatar
wmmhello 已提交
955
}
956
*/
wmmhello's avatar
wmmhello 已提交
957

958 959 960 961 962 963 964
/*
 * Seed a super table's meta from one parsed line: every key/value in cols is
 * registered in metaHash and appended to metaArray.
 *
 * The value stored in metaHash is the element's position in metaArray (not its
 * position in cols), because smlUpdateMeta() uses it to index metaArray. The
 * two positions differ as soon as taosHashPut() rejects an entry and the
 * element is therefore not appended.
 */
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols){
  size_t numOfKvs = taosArrayGetSize(cols);
  for (size_t i = 0; i < numOfKvs; ++i) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);

    size_t pos = taosArrayGetSize(metaArray);
    ASSERT(pos <= INT16_MAX);
    int16_t index = (int16_t)pos;

    int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &index, SHORT_BYTES);
    if(ret == 0){
      taosArrayPush(metaArray, kv);
    }
  }
}
wmmhello's avatar
wmmhello 已提交
967

968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986
/*
 * Release a super table meta entry together with its tag/column hashes, its
 * tag/column arrays and the table meta it holds. A NULL entry is ignored.
 */
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
  if (meta == NULL) {
    return;
  }
  taosHashCleanup(meta->tagHash);
  taosHashCleanup(meta->colHash);
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);
  taosMemoryFree(meta->tableMeta);
  taosMemoryFree(meta);
}

/*
 * Merge the key/values of one parsed line (cols) into the accumulated meta of
 * a super table (metaHash maps a key to its position in metaArray).
 *
 *   - unknown key: registered in metaHash and appended to metaArray;
 *   - known tag (isTag): only the recorded length is enlarged if needed;
 *   - known column: the type must match the recorded one, and for
 *     variable-length types the recorded length is enlarged if needed.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_SML_NOT_SAME_TYPE (with a message in
 * msg) when a column changes its type.
 */
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg) {
  for (int idx = 0; idx < taosArrayGetSize(cols); ++idx) {
    SSmlKv  *incoming = (SSmlKv *)taosArrayGet(cols, idx);
    int16_t *slot = (int16_t *)taosHashGet(metaHash, incoming->key, incoming->keyLen);

    if (slot == NULL) {
      // First time this key is seen: its index is the next free array slot.
      size_t count = taosArrayGetSize(metaArray);
      ASSERT(count <= INT16_MAX);
      int16_t newIndex = count;
      if (taosHashPut(metaHash, incoming->key, incoming->keyLen, &newIndex, SHORT_BYTES) == 0) {
        taosArrayPush(metaArray, incoming);
      }
      continue;
    }

    SSmlKv *known = (SSmlKv *)taosArrayGet(metaArray, *slot);

    if (!isTag && incoming->type != known->type) {
      smlBuildInvalidDataMsg(msg, "the type is not the same like before", incoming->key);
      return TSDB_CODE_SML_NOT_SAME_TYPE;
    }

    // Tags always track the largest length; columns only for var-length types.
    bool mayGrow = isTag || IS_VAR_DATA_TYPE(incoming->type);
    if (mayGrow && incoming->length > known->length) {
      known->length = incoming->length;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1011

1012 1013 1014 1015 1016
/*
 * Release a child table entry: the per-row hashes stored in tag->cols, the
 * key buffer, both arrays and finally the entry itself.
 */
static void smlDestroyTableInfo(SSmlTableInfo *tag) {
  size_t row = 0;
  while (row < taosArrayGetSize(tag->cols)) {
    // Each element of cols is a pointer to a hash built by smlPushCols().
    taosHashCleanup((SHashObj *)taosArrayGetP(tag->cols, row));
    row++;
  }

  taosMemoryFree(tag->key);
  taosArrayDestroy(tag->cols);
  taosArrayDestroy(tag->tags);
  taosMemoryFree(tag);
}
1023

1024 1025 1026
/*
 * Release a schemaless handle and everything it owns: the query, the child
 * and super table lists (nodes and, for nodes in use, their values), the
 * vgroup hash, the pre-line arrays, the per-line column arrays (only kept
 * when data-format mode is off), the JSON root and the handle itself.
 * A NULL handle is ignored.
 *
 * NOTE(review): info->taos is obtained with acquireTscObj() in
 * smlBuildSmlInfo(); no matching release is visible in this function -
 * confirm it happens elsewhere.
 */
static void smlDestroyInfo(SSmlHandle *info) {
  if (!info) return;
  qDestroyQuery(info->pQuery);

  // destroy info->childTables
  NodeList *tmp = info->childTables;
  while (tmp) {
    if (tmp->data.used) {
      smlDestroyTableInfo((SSmlTableInfo *)tmp->data.value);
    }
    NodeList *t = tmp->next;
    taosMemoryFree(tmp);
    tmp = t;
  }

  // destroy info->superTables
  tmp = info->superTables;
  while (tmp) {
    if (tmp->data.used) {
      smlDestroySTableMeta((SSmlSTableMeta *)tmp->data.value);
    }
    NodeList *t = tmp->next;
    taosMemoryFree(tmp);
    tmp = t;
  }

  // destroy info->pVgHash
  taosHashCleanup(info->pVgHash);

  taosArrayDestroy(info->preLineTagKV);
  taosArrayDestroy(info->preLineColKV);

  if (!info->dataFormat) {
    // lines may be NULL while lineNum is already set (e.g. the buffer was
    // never allocated), so guard the array before walking it.
    if (info->lines != NULL) {
      for (int i = 0; i < info->lineNum; i++) {
        taosArrayDestroy(info->lines[i].colArray);
      }
    }
    taosMemoryFree(info->lines);
  }

  cJSON_Delete(info->root);
  taosMemoryFreeClear(info);
}
wmmhello's avatar
wmmhello 已提交
1066

1067 1068 1069 1070 1071
/*
 * Allocate and initialise a schemaless handle.
 *
 * When taos is not NULL the connection object is acquired and the catalog
 * handle of its cluster is looked up. The handle starts in data-format mode
 * (dataFormat = true).
 *
 * Returns the new handle, or NULL when an allocation fails, the connection
 * cannot be acquired or the catalog lookup fails; everything allocated so far
 * is released through smlDestroyInfo() in that case.
 */
static SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
  if (NULL == info) {
    return NULL;
  }

  // Assign the id first so that the error logs below can identify the handle.
  info->id = smlGenId();

  if (taos != NULL) {
    info->taos = acquireTscObj(*(int64_t *)taos);
    if (NULL == info->taos) {
      uError("SML:0x%" PRIx64 " acquire connection object failed", info->id);
      goto cleanup;
    }
    code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
      goto cleanup;
    }
  }

  info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  info->pQuery = smlInitHandle();
  info->dataFormat = true;

  info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
  info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv));

  if (NULL == info->pVgHash || NULL == info->pQuery || NULL == info->preLineTagKV || NULL == info->preLineColKV) {
    uError("create SSmlHandle failed");
    goto cleanup;
  }

  return info;

cleanup:
  smlDestroyInfo(info);
  return NULL;
}

/*
 * Build a key -> SSmlKv* lookup for one row (cols) and append the new hash to
 * colsArray.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the hash cannot
 * be created.
 */
static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
  SHashObj *rowHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (rowHash == NULL) {
    uError("SML:smlDealCols failed to allocate memory");
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  size_t pos = 0;
  while (pos < taosArrayGetSize(cols)) {
    SSmlKv *entry = (SSmlKv *)taosArrayGet(cols, pos);
    // The hash stores the pointer to the element, not a copy of it.
    taosHashPut(rowHash, entry->key, entry->keyLen, &entry, POINTER_BYTES);
    pos++;
  }

  taosArrayPush(colsArray, &rowHash);
  return TSDB_CODE_SUCCESS;
}
1116

1117 1118 1119
/*
 * Second parsing stage, used only when data-format mode is off: attach every
 * parsed line to its child table and fold its tags/columns into the meta of
 * its super table (creating that meta on first sight).
 *
 * Returns TSDB_CODE_SUCCESS, or an error code when the child table of a line
 * cannot be found, the tag/column limits are exceeded, a column changes its
 * type, or memory runs out.
 */
static int32_t smlParseLineBottom(SSmlHandle *info) {
  if (info->dataFormat) return TSDB_CODE_SUCCESS;

  for (int32_t i = 0; i < info->lineNum; i++) {
    SSmlLineInfo  *elements = info->lines + i;
    SSmlTableInfo *tinfo = NULL;

    // The child table key (and its comparison function) depends on the protocol.
    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
      tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->measure, elements->measureTagsLen, NULL);
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
      tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet);
    } else {
      tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->tags, POINTER_BYTES, is_same_child_table_json);
    }

    if (tinfo == NULL) {
      uError("SML:0x%" PRIx64 "get oneTable failed, line num:%d", info->id, i);
      smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure);
      return TSDB_CODE_SML_INVALID_DATA;
    }

    if (taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
      return TSDB_CODE_PAR_INVALID_TAGS_NUM;
    }

    if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
      return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
    }

    int ret = smlPushCols(tinfo->cols, elements->colArray);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    SSmlSTableMeta *tableMeta =
        (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
    if (tableMeta) {  // update meta
      ret = smlUpdateMeta(tableMeta->colHash, tableMeta->cols, elements->colArray, false, &info->msgBuf);
      if (ret == TSDB_CODE_SUCCESS) {
        ret = smlUpdateMeta(tableMeta->tagHash, tableMeta->tags, tinfo->tags, true, &info->msgBuf);
      }
      if (ret != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
        return ret;
      }
    } else {
      // Duplicate-key detection between columns and tags (smlJudgeDupColName)
      // is currently disabled.
      SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
      if (meta == NULL) {
        uError("SML:0x%" PRIx64 " smlBuildSTableMeta failed, line num:%d", info->id, i);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
      smlInsertMeta(meta->colHash, meta->cols, elements->colArray);
      nodeListSet(&info->superTables, elements->measure, elements->measureLen, meta, NULL);
    }
  }

  return TSDB_CODE_SUCCESS;
}

1179

X
Xiaoyu Wang 已提交
1180
/*
 * Bind the parsed rows of every child table and submit the insert request.
 *
 * For each child table in use: resolve its vgroup through the catalog, record
 * the vgroup in info->pVgHash, look up the meta of its super table and bind
 * the rows with smlBindData(). Afterwards the output is built and the request
 * is launched.
 *
 * Returns the request's result code, or the first error hit before the
 * request is launched.
 */
static int32_t smlInsertData(SSmlHandle *info) {
  int32_t code = TSDB_CODE_SUCCESS;

  NodeList *tmp = info->childTables;
  while (tmp) {
    // Nodes that are not in use carry no valid value (see smlClearForRerun).
    if (!tmp->data.used) {
      tmp = tmp->next;
      continue;
    }
    SSmlTableInfo *tableData = (SSmlTableInfo *)tmp->data.value;

    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
    tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
    // Bounded copy: an over-long child table name must not overflow tname.
    tstrncpy(pName.tname, tableData->childTableName, TSDB_TABLE_NAME_LEN);

    SRequestConnInfo conn = {0};
    conn.pTrans = info->taos->pAppInfo->pTransporter;
    conn.requestId = info->pRequest->requestId;
    conn.requestObjRefId = info->pRequest->self;
    conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

    SVgroupInfo vg;
    code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
      return code;
    }
    taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));

    SSmlSTableMeta *pMeta =
        (SSmlSTableMeta *)nodeListGet(info->superTables, tableData->sTableName, tableData->sTableNameLen, NULL);
    if (NULL == pMeta || NULL == pMeta->tableMeta) {
      uError("SML:0x%" PRIx64 " super table meta not found. table name: %s", info->id, tableData->childTableName);
      return TSDB_CODE_SML_INVALID_DATA;
    }

    // use tablemeta of stable to save vgid and uid of child table
    pMeta->tableMeta->vgId = vg.vgId;
    pMeta->tableMeta->uid = tableData->uid;  // one table merge data block together according uid

    code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, pMeta->cols, tableData->cols,
                       pMeta->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
                       info->ttl, info->msgBuf.buf, info->msgBuf.len);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
      return code;
    }
    tmp = tmp->next;
  }

  code = smlBuildOutput(info->pQuery, info->pVgHash);
  if (code != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
    return code;
  }
  info->cost.insertRpcTime = taosGetTimestampUs();

  SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
  atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);

  launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
  return info->pRequest->code;
}

X
Xiaoyu Wang 已提交
1237 1238
/*
 * Log a one-line summary of a schemaless insert: result code, line and table
 * counters, and the time spent between the recorded timestamps.
 */
static void smlPrintStatisticInfo(SSmlHandle *info) {
  // Each cost is the difference between two consecutive timestamps.
  int64_t parseCost = info->cost.schemaTime - info->cost.parseTime;
  int64_t schemaCost = info->cost.insertBindTime - info->cost.schemaTime;
  int64_t bindCost = info->cost.insertRpcTime - info->cost.insertBindTime;
  int64_t rpcCost = info->cost.endTime - info->cost.insertRpcTime;
  int64_t totalCost = info->cost.endTime - info->cost.parseTime;

  uError("SML:0x%" PRIx64
         " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d "
         "        parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64
         ",total cost:%" PRId64,
         info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
         info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables,
         parseCost, schemaCost, bindCost, rpcCost, totalCost);
}

1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286
/*
 * Reset the handle so that the input can be parsed again from the first line.
 *
 * The values of all child and super table nodes in use are destroyed and the
 * nodes are marked unused (the nodes themselves are kept), a fresh line
 * buffer of lineNum entries is allocated, the pre-line state is cleared and
 * the table block hash of the statement is replaced by an empty one.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_SML_INVALID_DATA when a line buffer
 * already exists, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
 */
int32_t smlClearForRerun(SSmlHandle *info){
  info->reRun = false;

  // clear info->childTables
  NodeList *pList = info->childTables;
  while (pList) {
    if (pList->data.used) {
      smlDestroyTableInfo((SSmlTableInfo *)pList->data.value);
      pList->data.used = false;
    }
    pList = pList->next;
  }

  // clear info->superTables
  pList = info->superTables;
  while (pList) {
    if (pList->data.used) {
      smlDestroySTableMeta((SSmlSTableMeta *)pList->data.value);
      pList->data.used = false;
    }
    pList = pList->next;
  }

  if (unlikely(info->lines != NULL)) {
    uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
    return TSDB_CODE_SML_INVALID_DATA;
  }
  info->lines = (SSmlLineInfo *)taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
  // A zero-sized request may legitimately yield NULL, so only treat NULL as
  // an error when lines were actually requested.
  if (info->lines == NULL && info->lineNum > 0) {
    uError("SML:0x%" PRIx64 " failed to allocate line buffer", info->id);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memset(&info->preLine, 0, sizeof(SSmlLineInfo));
  info->currSTableMeta = NULL;
  info->currTableDataCtx = NULL;

  SVnodeModifOpStmt *stmt = (SVnodeModifOpStmt *)(info->pQuery->pRoot);
  stmt->freeHashFunc(stmt->pTableBlockHashObj);
  stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (stmt->pTableBlockHashObj == NULL) {
    uError("SML:0x%" PRIx64 " failed to allocate table block hash", info->id);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1287
/*
 * First parsing stage: feed the input to the protocol specific parser.
 *
 * The input is either an array of NUL-terminated strings (lines) or a single
 * raw buffer [rawLine, rawLineEnd) whose lines are separated by '\n'.
 *   - JSON protocol: the whole input (first element of lines, or rawLine) is
 *     handed to smlParseJSON() in one call.
 *   - line / telnet protocol: numLines lines are parsed one by one. In raw
 *     mode, line-protocol lines starting with '#' are skipped as comments and
 *     do not count towards numLines.
 * When the parser requests a re-run (info->reRun), the handle is reset with
 * smlClearForRerun() and parsing restarts from the first line.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code reported.
 */
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    if (lines) {
      code = smlParseJSON(info, *lines);
    } else if (rawLine) {
      code = smlParseJSON(info, rawLine);
    }
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines ? *lines : rawLine);
    }
    return code;
  }

  char   *oldRaw = rawLine;  // start of the raw buffer, needed to restart on re-run
  int32_t i = 0;
  while (i < numLines) {
    char *tmp = NULL;
    int   len = 0;
    if (lines) {
      tmp = lines[i];
      len = (int)strlen(tmp);
    } else if (rawLine) {
      tmp = rawLine;
      // Consume up to and including the next '\n'; len excludes the newline.
      while (rawLine < rawLineEnd) {
        if (*(rawLine++) == '\n') {
          break;
        }
        len++;
      }
      // len > 0 keeps tmp[0] inside the buffer when the input is exhausted.
      if (info->protocol == TSDB_SML_LINE_PROTOCOL && len > 0 && tmp[0] == '#') {  // this line is comment
        continue;
      }
    }

    uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len,
           (info->isRawLine ? "rawdata" : tmp));

    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
      if (info->dataFormat) {
        SSmlLineInfo element = {0};
        code = smlParseInfluxString(info, tmp, tmp + len, &element);
      } else {
        code = smlParseInfluxString(info, tmp, tmp + len, info->lines + i);
      }
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
      if (info->dataFormat) {
        SSmlLineInfo element = {0};
        code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, &element);
      } else {
        code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, info->lines + i);
      }
    } else {
      ASSERT(0);
    }
    if (code != TSDB_CODE_SUCCESS) {
      // Raw lines are not NUL-terminated individually, so bound the output by len.
      uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %.*s", info->id, i, len, tmp);
      return code;
    }
    if (info->reRun) {
      i = 0;
      rawLine = oldRaw;
      code = smlClearForRerun(info);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      continue;
    }
    i++;
  }

  return code;
}

dengyihao's avatar
dengyihao 已提交
1362
/*
 * Run the whole schemaless pipeline on one request: parse the input, fold the
 * lines into table metas, adjust the database schema (retried on failure) and
 * insert the data. The timestamp of each stage is recorded in info->cost.
 *
 * Returns 0 on success or the error code of the stage that failed.
 */
static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  info->cost.parseTime = taosGetTimestampUs();

  int32_t status = smlParseLine(info, lines, rawLine, rawLineEnd, numLines);
  if (status != 0) {
    uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(status));
    return status;
  }

  status = smlParseLineBottom(info);
  if (status != 0) {
    uError("SML:0x%" PRIx64 " smlParseLineBottom error : %s", info->id, tstrerror(status));
    return status;
  }

  info->cost.lineNum = info->lineNum;
  info->cost.numOfSTables = nodeListSize(info->superTables);
  info->cost.numOfCTables = nodeListSize(info->childTables);

  info->cost.schemaTime = taosGetTimestampUs();

  // Always try once; after a failure, retry while the attempt budget
  // (number of super tables * MAX_RETRY_TIMES) is not used up.
  int32_t attempts = 0;
  for (;;) {
    status = smlModifyDBSchemas(info);
    if (status == 0) {
      break;
    }
    if (!(attempts++ < nodeListSize(info->superTables) * MAX_RETRY_TIMES)) {
      break;
    }
  }
  if (status != 0) {
    uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(status));
    return status;
  }

  info->cost.insertBindTime = taosGetTimestampUs();
  status = smlInsertData(info);
  if (status != 0) {
    uError("SML:0x%" PRIx64 " smlInsertData error : %s", info->id, tstrerror(status));
  }
  return status;
}

wmmhello's avatar
wmmhello 已提交
1405 1406
/*
 * Common implementation of the schemaless insert entry points.
 *
 * The data is given either as an array of strings (lines) or as one raw
 * buffer [rawLine, rawLineEnd). The arguments are validated, then the request
 * is processed by smlProcess().
 *
 * Returns NULL when taos is NULL (terrno is set) or the request object cannot
 * be created; otherwise the request object, whose code field holds the
 * result. Validation failures also leave a message in the request's message
 * buffer.
 */
TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd,
                                       int numLines, int protocol, int precision, int32_t ttl, int64_t reqid) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
  if (request == NULL) {
    uError("SML:taos_schemaless_insert error request is null");
    return NULL;
  }

  SSmlHandle *info = smlBuildSmlInfo(taos);
  if (info == NULL) {
    request->code = TSDB_CODE_OUT_OF_MEMORY;
    uError("SML:taos_schemaless_insert error SSmlHandle is null");
    return (TAOS_RES *)request;
  }
  info->pRequest = request;
  info->isRawLine = rawLine != NULL;
  info->ttl = ttl;
  info->precision = precision;
  info->protocol = (TSDB_SML_PROTOCOL_TYPE)protocol;
  info->msgBuf.buf = info->pRequest->msgBuf;
  info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
  info->lineNum = numLines;

  SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
  if (request->pDb == NULL) {
    request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
    goto end;
  }

  if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
    request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
    smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
    goto end;
  }

  if (protocol == TSDB_SML_LINE_PROTOCOL &&
      (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
    request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
    smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
    goto end;
  }

  if (protocol == TSDB_SML_JSON_PROTOCOL) {
    // A JSON payload is one document, whatever line count the caller passed.
    numLines = 1;
  } else if (numLines <= 0) {
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
    goto end;
  }

  // Without any input the line parsers would be handed a NULL pointer.
  if (lines == NULL && rawLine == NULL) {
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "input data is null", NULL);
    goto end;
  }

  code = smlProcess(info, lines, rawLine, rawLineEnd, numLines);
  request->code = code;
  info->cost.endTime = taosGetTimestampUs();
  info->cost.code = code;
  //  smlPrintStatisticInfo(info);

end:
  smlDestroyInfo(info);
  return (TAOS_RES *)request;
}
wmmhello's avatar
wmmhello 已提交
1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488

/**
 * taos_schemaless_insert() parses data points and inserts them into the database
 * according to the selected protocol.
 *
 * @param $lines input array that may contain multiple lines; each line represents one data point.
 *               If protocol = 2 is used, the input array should contain a single JSON
 *               string (e.g. char *lines[] = {"$JSON_string"}). To insert multiple
 *               data points in JSON format, include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points are in $lines.
 *                  If protocol = 2 is used, this param is ignored because $lines should
 *                  contain a single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
 * @return TAOS_RES
 */

1492 1493
/**
 * Array-of-lines schemaless insert with an explicit table TTL and request id.
 *
 * Forwards every argument to taos_schemaless_insert_inner(). The two arguments
 * that follow `lines` in that call are passed as NULL here; the raw-buffer
 * entry points are the ones that fill them in instead of `lines`.
 *
 * @param taos      connection handle, forwarded unchanged
 * @param lines     array of input lines
 * @param numLines  number of entries in `lines`
 * @param protocol  protocol selector, forwarded unchanged
 * @param precision timestamp precision selector, forwarded unchanged
 * @param ttl       table TTL, forwarded unchanged
 * @param reqid     request id, forwarded unchanged
 * @return the TAOS_RES produced by taos_schemaless_insert_inner()
 */
TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                                int32_t ttl, int64_t reqid) {
  TAOS_RES *res =
      taos_schemaless_insert_inner(taos, lines, NULL, NULL, numLines, protocol, precision, ttl, reqid);
  return res;
}

1497 1498 1499
/**
 * Array-of-lines schemaless insert using the default table TTL and a request id of 0.
 *
 * Delegates to taos_schemaless_insert_ttl_with_reqid() with
 * ttl = TSDB_DEFAULT_TABLE_TTL and reqid = 0; all other arguments are
 * forwarded unchanged.
 *
 * @return the TAOS_RES produced by the delegate
 */
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
  TAOS_RES *res =
      taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, TSDB_DEFAULT_TABLE_TTL, 0);
  return res;
}
wmmhello's avatar
wmmhello 已提交
1500

1501 1502 1503
/**
 * Array-of-lines schemaless insert with a caller-supplied table TTL and a request id of 0.
 *
 * Delegates to taos_schemaless_insert_ttl_with_reqid() with reqid = 0; all
 * other arguments are forwarded unchanged.
 *
 * @return the TAOS_RES produced by the delegate
 */
TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int32_t ttl) {
  TAOS_RES *res = taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, 0);
  return res;
}
wmmhello's avatar
wmmhello 已提交
1504

1505 1506
/**
 * Array-of-lines schemaless insert with a caller-supplied request id and the default table TTL.
 *
 * Delegates to taos_schemaless_insert_ttl_with_reqid() with
 * ttl = TSDB_DEFAULT_TABLE_TTL; all other arguments are forwarded unchanged.
 *
 * @return the TAOS_RES produced by the delegate
 */
TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                            int64_t reqid) {
  TAOS_RES *res = taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision,
                                                        TSDB_DEFAULT_TABLE_TTL, reqid);
  return res;
}

1509
/**
 * Raw-buffer schemaless insert with an explicit table TTL and request id.
 *
 * Scans the `len` bytes at `lines`, treating '\n' (or the final byte of the
 * buffer) as the end of a line, and counts the lines to be inserted. When
 * `protocol` is TSDB_SML_LINE_PROTOCOL, lines whose first byte is '#' are
 * comments and are not counted. The count is then handed to
 * taos_schemaless_insert_inner() together with the buffer range
 * [lines, lines + len).
 *
 * @param taos      connection handle, forwarded unchanged
 * @param lines     start of the raw input buffer; must be readable for `len` bytes
 * @param len       number of bytes in `lines`
 * @param totalRows out-parameter receiving the number of counted lines; may be NULL
 *                  if the caller does not need the count
 * @param protocol  protocol selector, forwarded unchanged
 * @param precision timestamp precision selector, forwarded unchanged
 * @param ttl       table TTL, forwarded unchanged
 * @param reqid     request id, forwarded unchanged
 * @return the TAOS_RES produced by taos_schemaless_insert_inner()
 */
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                                    int precision, int32_t ttl, int64_t reqid) {
  int32_t rows = 0;
  char   *lineStart = lines;  // first byte of the line currently being scanned

  for (int i = 0; i < len; i++) {
    // A line ends at '\n', or at the last byte when the buffer has no trailing newline.
    if (lines[i] == '\n' || i == len - 1) {
      if (lineStart[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL) {  // ignore comment
        rows++;
      }
      lineStart = lines + i + 1;
    }
  }

  // Count locally and publish once, so a NULL out-pointer is not dereferenced.
  if (totalRows != NULL) {
    *totalRows = rows;
  }

  return taos_schemaless_insert_inner(taos, NULL, lines, lines + len, rows, protocol, precision, ttl, reqid);
}

1526 1527 1528 1529 1530 1531
/**
 * Raw-buffer schemaless insert with a caller-supplied request id and the default table TTL.
 *
 * Delegates to taos_schemaless_insert_raw_ttl_with_reqid() with
 * ttl = TSDB_DEFAULT_TABLE_TTL; all other arguments are forwarded unchanged.
 *
 * @return the TAOS_RES produced by the delegate
 */
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                                int precision, int64_t reqid) {
  TAOS_RES *res = taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision,
                                                            TSDB_DEFAULT_TABLE_TTL, reqid);
  return res;
}
/**
 * Raw-buffer schemaless insert with a caller-supplied table TTL and a request id of 0.
 *
 * Delegates to taos_schemaless_insert_raw_ttl_with_reqid() with reqid = 0; all
 * other arguments are forwarded unchanged.
 *
 * @return the TAOS_RES produced by the delegate
 */
TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                         int precision, int32_t ttl) {
  TAOS_RES *res =
      taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, 0);
  return res;
}
wmmhello's avatar
wmmhello 已提交
1532

1533 1534
/**
 * Raw-buffer schemaless insert using the default table TTL and a request id of 0.
 *
 * Delegates to taos_schemaless_insert_raw_ttl_with_reqid() with
 * ttl = TSDB_DEFAULT_TABLE_TTL and reqid = 0; all other arguments are
 * forwarded unchanged.
 *
 * @return the TAOS_RES produced by the delegate
 */
TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                     int precision) {
  TAOS_RES *res = taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision,
                                                            TSDB_DEFAULT_TABLE_TTL, 0);
  return res;
}