clientSml.c 50.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

wmmhello's avatar
wmmhello 已提交
16 17 18 19 20
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

21 22 23 24 25 26 27
#include "clientSml.h"

// Multipliers used by smlGetTimeValue to turn a coarse-unit timestamp into milliseconds.
// Indexed by (precision - TSDB_TIME_PRECISION_HOURS); the values are the number of
// milliseconds in one hour, one minute and one second.
int64_t smlToMilli[3] = {3600000LL, 60000LL, 1000LL};
// NOTE(review): presumably factors that scale milli/micro/nano timestamps to nanoseconds;
// not referenced in this part of the file - confirm against the users.
int64_t smlFactorNS[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
// Powers of one thousand (1e3, 1e6, 1e9).
// NOTE(review): presumably factors that scale seconds to milli/micro/nano - confirm against the users.
int64_t smlFactorS[3] = {1000LL, 1000000LL, 1000000000LL};

/*
 * Find the value stored under `key` in a node list.
 *
 * Only nodes flagged as used are considered. When `fn` is NULL a node matches if its key has
 * length `len` and the same bytes as `key`; otherwise a node matches when fn(nodeKey, key)
 * returns 0.
 *
 * Returns the value of the first matching node, or NULL when nothing matches.
 */
void* nodeListGet(NodeList* list, const void *key, int32_t len, _equal_fn_sml fn){
  for (NodeList *node = list; node != NULL; node = node->next) {
    if (!node->data.used) {
      continue;
    }

    bool matched = (fn != NULL)
                       ? (fn(node->data.key, key) == 0)
                       : (node->data.keyLen == len && memcmp(node->data.key, key, len) == 0);
    if (matched) {
      return node->data.value;
    }
  }
  return NULL;
}

45
/*
 * Insert a key/value pair into a node list.
 *
 * The used nodes at the front of the list are scanned for a duplicate: a node is a duplicate
 * when its key length equals `len` and either the key bytes are identical (fn == NULL) or
 * fn(nodeKey, key) returns 0. The pair is stored in the first unused node if one is reached;
 * otherwise a new node is allocated and pushed at the head of the list.
 *
 * The key pointer is stored as-is (not copied).
 *
 * Returns 0 on success, -1 if the key already exists or the allocation fails.
 */
int nodeListSet(NodeList** list, const void *key, int32_t len, void* value, _equal_fn_sml fn){
  NodeList *slot = *list;
  for (; slot != NULL && slot->data.used; slot = slot->next) {
    if (slot->data.keyLen != len) {
      continue;
    }
    bool same = (fn == NULL) ? (memcmp(slot->data.key, key, len) == 0) : (fn(slot->data.key, key) == 0);
    if (same) {
      return -1;
    }
  }

  if (slot == NULL) {
    // No free node left: allocate one and make it the new head.
    slot = (NodeList *)taosMemoryCalloc(1, sizeof(NodeList));
    if (slot == NULL) {
      return -1;
    }
    slot->next = *list;
    *list = slot;
  }

  slot->data.key = key;
  slot->data.keyLen = len;
  slot->data.value = value;
  slot->data.used = true;
  return 0;
}

81
/*
 * Count the used nodes at the front of the list; counting stops at the first unused node
 * or at the end of the list.
 */
int nodeListSize(NodeList* list){
  int used = 0;
  for (NodeList *node = list; node != NULL && node->data.used; node = node->next) {
    ++used;
  }
  return used;
}
wmmhello's avatar
wmmhello 已提交
90

91
/*
 * Report whether `num` lies at or beyond the int64_t limits once those limits are converted
 * to double, i.e. whether a direct (int64_t) cast cannot be trusted.
 *
 * Defined without `inline`: in C99/C11 a plain `inline` definition does not emit an external
 * symbol, which can leave callers with an unresolved reference in unoptimised builds.
 */
bool smlDoubleToInt64OverFlow(double num) {
  return num >= (double)INT64_MAX || num <= (double)INT64_MIN;
}

96
/*
 * Write "<msg1>:<msg2>" (truncated to fit) into the message buffer and return
 * TSDB_CODE_SML_INVALID_DATA.
 *
 * pBuf  message buffer; nothing is written when pBuf->buf is NULL or pBuf->len is not positive
 * msg1  first part of the message, may be NULL
 * msg2  second part; appended after a ':' only when more than two bytes remain, may be NULL
 *
 * strncat always appends a terminating NUL after the copied characters, so every bound below
 * leaves one byte free for it.
 */
int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
  if (pBuf->buf && pBuf->len > 0) {
    memset(pBuf->buf, 0, pBuf->len);
    if (msg1) strncat(pBuf->buf, msg1, pBuf->len - 1);
    // Bytes still available, including the one needed for the terminator.
    int32_t left = pBuf->len - strlen(pBuf->buf);
    if (left > 2 && msg2) {
      strncat(pBuf->buf, ":", left - 1);
      strncat(pBuf->buf, msg2, left - 2);
    }
  }
  return TSDB_CODE_SML_INVALID_DATA;
}

109

110 111 112 113 114
int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, uint8_t toPrecision) {
  char   *endPtr = NULL;
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
  if (unlikely(value + len != endPtr)) {
    return -1;
115
  }
wmmhello's avatar
wmmhello 已提交
116

117 118 119 120 121 122 123
  if(unlikely(fromPrecision >= TSDB_TIME_PRECISION_HOURS)){
    int64_t unit = smlToMilli[fromPrecision - TSDB_TIME_PRECISION_HOURS];
    if(unit > INT64_MAX / tsInt64){
      return -1;
    }
    tsInt64 *= unit;
    fromPrecision = TSDB_TIME_PRECISION_MILLI;
wmmhello's avatar
wmmhello 已提交
124
  }
wmmhello's avatar
wmmhello 已提交
125

126
  return convertTimePrecision(tsInt64, fromPrecision, toPrecision);
wmmhello's avatar
wmmhello 已提交
127 128
}

129 130 131 132 133 134 135
/*
 * Map the digit count of a timestamp to a precision: seconds or milliseconds.
 * Returns -1 for any other length.
 */
int8_t smlGetTsTypeByLen(int32_t len) {
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) return TSDB_TIME_PRECISION_SECONDS;
  if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) return TSDB_TIME_PRECISION_MILLI;
  return -1;
}

139 140 141 142
SSmlTableInfo *smlBuildTableInfo(int numRows, const char* measure, int32_t measureLen) {
  SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
  if (!tag) {
    return NULL;
143 144
  }

145 146
  tag->sTableName = measure;
  tag->sTableNameLen = measureLen;
147

148 149 150 151
  tag->cols = taosArrayInit(numRows, POINTER_BYTES);
  if (tag->cols == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
152
  }
153 154 155 156 157 158 159 160 161 162 163

//  tag->tags = taosArrayInit(16, sizeof(SSmlKv));
//  if (tag->tags == NULL) {
//    uError("SML:smlBuildTableInfo failed to allocate memory");
//    goto cleanup;
//  }
  return tag;

  cleanup:
  taosMemoryFree(tag);
  return NULL;
164 165
}

166 167 168 169 170 171 172 173 174 175 176
/*
 * Copy the child table name out of the tag list, if the configured tag key is present.
 *
 * tags            array of SSmlKv tags
 * childTableName  output buffer of TSDB_TABLE_NAME_LEN bytes; it is only touched when a tag
 *                 whose key equals tsSmlChildTableName is found
 *
 * The copy is limited to TSDB_TABLE_NAME_LEN - 1 bytes so the buffer, which is zeroed first,
 * always stays NUL-terminated for the strlen() done by the caller.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t smlParseTableName(SArray *tags, char *childTableName) {
  size_t childTableNameLen = strlen(tsSmlChildTableName);
  if (childTableNameLen == 0) return TSDB_CODE_SUCCESS;

  int numOfTags = (int)taosArrayGetSize(tags);
  for (int i = 0; i < numOfTags; i++) {
    SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i);
    // handle child table name
    if (childTableNameLen == tag->keyLen && strncmp(tag->key, tsSmlChildTableName, tag->keyLen) == 0) {
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, tag->value,
              (tag->length < TSDB_TABLE_NAME_LEN ? tag->length : TSDB_TABLE_NAME_LEN - 1));
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

183 184
/*
 * Fill in the child table name and uid of `oneTable`.
 *
 * If the tags carry an explicit child table name (see smlParseTableName) the uid is taken from
 * the first eight bytes of that name. Otherwise buildChildTableName() derives both the name and
 * the uid from the super table name and a copy of the tags.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t smlSetCTableName(SSmlTableInfo *oneTable){
  smlParseTableName(oneTable->tags, oneTable->childTableName);

  if (strlen(oneTable->childTableName) == 0) {
    SArray* dst = taosArrayDup(oneTable->tags, NULL);
    RandTableName rName = {dst, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen,
                           oneTable->childTableName, 0};

    buildChildTableName(&rName);
    taosArrayDestroy(dst);
    oneTable->uid = rName.uid;
  } else {
    // Read the leading bytes through memcpy: dereferencing the char buffer as a uint64_t
    // pointer breaks the aliasing rules and may be misaligned.
    uint64_t uid = 0;
    memcpy(&uid, oneTable->childTableName, sizeof(uid));
    oneTable->uid = uid;
  }
  return TSDB_CODE_SUCCESS;
}
199

200 201 202 203
SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
  SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
  if (!meta) {
    return NULL;
204 205
  }

206 207 208 209 210 211
  if(unlikely(!isDataFormat)){
    meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->tagHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }
wmmhello's avatar
wmmhello 已提交
212

213 214 215 216 217
    meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->colHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }
wmmhello's avatar
wmmhello 已提交
218 219
  }

220 221 222 223
  meta->tags = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
224 225
  }

226 227 228 229 230 231
  meta->cols = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
  return meta;
232

233 234 235
  cleanup:
  taosMemoryFree(meta);
  return NULL;
wmmhello's avatar
wmmhello 已提交
236 237
}

wmmhello's avatar
wmmhello 已提交
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
//uint16_t smlCalTypeSum(char* endptr, int32_t left){
//  uint16_t sum = 0;
//  for(int i = 0; i < left; i++){
//    sum += endptr[i];
//  }
//  return sum;
//}

// Helper macros for smlParseNumber. Each one expands in place (no do/while wrapper) and relies
// on names from the enclosing function: kvVal (output SSmlKv), msg (error buffer), pVal (the
// original text), result (the value parsed as double) and, for the 64-bit variants, endptr.
// Because they expand to several statements they must be used inside a braced block.

// Store an "invalid data" message for pVal and return false from the enclosing function.
#define RETURN_FALSE \
smlBuildInvalidDataMsg(msg, "invalid data", pVal); \
return false;

// Store result as a double.
#define SET_DOUBLE kvVal->type = TSDB_DATA_TYPE_DOUBLE;\
                   kvVal->d = result;

// Store result as a float; returns false with a message when IS_VALID_FLOAT rejects it.
#define SET_FLOAT \
  if (!IS_VALID_FLOAT(result)) {\
    smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);\
    return false;\
  }\
  kvVal->type = TSDB_DATA_TYPE_FLOAT;\
  kvVal->f = (float)result;

// Store result as a signed 64-bit integer. When the double is at or beyond the int64 limits
// the text is re-parsed as an integer; that path returns from the enclosing function itself
// (false on ERANGE, true otherwise).
#define SET_BIGINT \
  if (smlDoubleToInt64OverFlow(result)) {\
    errno = 0;\
    int64_t tmp = taosStr2Int64(pVal, &endptr, 10);\
    if (errno == ERANGE) {\
      smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);\
      return false;\
    }\
    kvVal->type = TSDB_DATA_TYPE_BIGINT;\
    kvVal->i = tmp;\
    return true;\
  }\
  kvVal->type = TSDB_DATA_TYPE_BIGINT;\
  kvVal->i = (int64_t)result;

// Store result as a signed 32-bit integer; returns false with a message when out of range.
#define SET_INT \
  if (!IS_VALID_INT(result)) {\
    smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);\
    return false;\
  }\
  kvVal->type = TSDB_DATA_TYPE_INT;\
  kvVal->i = result;

// Store result as a signed 16-bit integer; returns false with a message when out of range.
#define SET_SMALL_INT \
  if (!IS_VALID_SMALLINT(result)) {\
    smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal);\
    return false;\
  }\
  kvVal->type = TSDB_DATA_TYPE_SMALLINT;\
  kvVal->i = result;

// Store result as an unsigned 64-bit integer. Negative values are rejected; values at or above
// (double)UINT64_MAX are re-parsed from the text, and that path returns from the enclosing
// function itself (false on ERANGE, true otherwise).
#define SET_UBIGINT \
  if (result >= (double)UINT64_MAX || result < 0) {\
    errno = 0;\
    uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);\
    if (errno == ERANGE || result < 0) {\
      smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);\
      return false;\
    }\
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;\
    kvVal->u = tmp;\
    return true;\
  }\
  kvVal->type = TSDB_DATA_TYPE_UBIGINT;\
  kvVal->u = result;

// Store result as an unsigned 32-bit integer; returns false with a message when out of range.
#define SET_UINT \
  if (!IS_VALID_UINT(result)) {\
    smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);\
    return false;\
  }\
  kvVal->type = TSDB_DATA_TYPE_UINT;\
  kvVal->u = result;

// Store result as an unsigned 16-bit integer; returns false with a message when out of range.
#define SET_USMALL_INT \
  if (!IS_VALID_USMALLINT(result)) {\
    smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal);\
    return false;\
  }\
  kvVal->type = TSDB_DATA_TYPE_USMALLINT;\
  kvVal->u = result;

// Store result as a signed 8-bit integer; returns false with a message when out of range.
#define SET_TINYINT \
  if (!IS_VALID_TINYINT(result)) { \
    smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);\
    return false;\
  }\
  kvVal->type = TSDB_DATA_TYPE_TINYINT;\
  kvVal->i = result;

// Store result as an unsigned 8-bit integer; returns false with a message when out of range.
#define SET_UTINYINT \
  if (!IS_VALID_UTINYINT(result)) {\
    smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);\
    return false;\
  }\
  kvVal->type = TSDB_DATA_TYPE_UTINYINT;\
  kvVal->u = result;

339
/*
 * Parse kvVal->value (kvVal->length characters) as a number with an optional type suffix and
 * store the typed result in kvVal.
 *
 * Accepted suffixes (first letter in either case): none -> double; f64/f32; i64/i32/i16/i8;
 * u64/u32/u16/u8; a lone i -> 64-bit signed; a lone u -> 64-bit unsigned.
 *
 * Returns true on success. On failure an explanatory message is written through `msg` and
 * false is returned.
 */
bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
  const char *pVal = kvVal->value;
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (endptr == pVal) {
    RETURN_FALSE
  }

  // Number of characters that follow the numeric part.
  int32_t left = len - (endptr - pVal);
  if (left == 0) {
    SET_DOUBLE
    return true;
  }
  if (left < 1 || left > 3) {
    RETURN_FALSE
  }

  const bool isFloat = (endptr[0] == 'f' || endptr[0] == 'F');
  const bool isSigned = (endptr[0] == 'i' || endptr[0] == 'I');
  const bool isUnsigned = (endptr[0] == 'u' || endptr[0] == 'U');

  switch (left) {
    case 1: {
      if (isSigned) {
        SET_BIGINT
      } else if (isUnsigned) {
        SET_UBIGINT
      } else {
        RETURN_FALSE
      }
      break;
    }
    case 2: {
      if (endptr[1] != '8') {
        RETURN_FALSE
      }
      if (isSigned) {
        SET_TINYINT
      } else if (isUnsigned) {
        SET_UTINYINT
      } else {
        RETURN_FALSE
      }
      break;
    }
    default: {  // left == 3
      const bool is64 = (endptr[1] == '6' && endptr[2] == '4');
      const bool is32 = (endptr[1] == '3' && endptr[2] == '2');
      const bool is16 = (endptr[1] == '1' && endptr[2] == '6');
      if (isFloat) {
        if (is64) {
          SET_DOUBLE
        } else if (is32) {
          SET_FLOAT
        } else {
          RETURN_FALSE
        }
      } else if (isSigned) {
        if (is64) {
          SET_BIGINT
        } else if (is32) {
          SET_INT
        } else if (is16) {
          SET_SMALL_INT
        } else {
          RETURN_FALSE
        }
      } else if (isUnsigned) {
        if (is64) {
          SET_UBIGINT
        } else if (is32) {
          SET_UINT
        } else if (is16) {
          SET_USMALL_INT
        } else {
          RETURN_FALSE
        }
      } else {
        RETURN_FALSE
      }
      break;
    }
  }
  return true;
}

/*
 * Earlier implementation of the number parser: parse kvVal->value (kvVal->length characters)
 * as a number with an optional type suffix and store the typed result in kvVal.
 *
 * Multi-character suffixes are compared case-insensitively; the single-character suffixes
 * 'i' and 'u' are accepted in lower case only.
 *
 * Returns true on success. On failure a message is written through `msg` and false is returned.
 */
bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg) {
  const char *pVal = kvVal->value;
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (endptr == pVal) {
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
    return false;
  }

  // Characters left after the numeric part; they form the type suffix.
  const int32_t left = len - (endptr - pVal);
  const bool    one = (left == 1);
  const bool    two = (left == 2);
  const bool    three = (left == 3);

  if (left == 0 || (three && strncasecmp(endptr, "f64", left) == 0)) {
    kvVal->type = TSDB_DATA_TYPE_DOUBLE;
    kvVal->d = result;
    return true;
  }

  if (three && strncasecmp(endptr, "f32", left) == 0) {
    if (!IS_VALID_FLOAT(result)) {
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_FLOAT;
    kvVal->f = (float)result;
    return true;
  }

  if ((one && *endptr == 'i') || (three && strncasecmp(endptr, "i64", left) == 0)) {
    kvVal->type = TSDB_DATA_TYPE_BIGINT;
    if (!smlDoubleToInt64OverFlow(result)) {
      kvVal->i = (int64_t)result;
      return true;
    }
    // The double cannot be cast safely: parse the text as an integer instead.
    errno = 0;
    int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
    if (errno == ERANGE) {
      smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
      return false;
    }
    kvVal->i = tmp;
    return true;
  }

  if ((one && *endptr == 'u') || (three && strncasecmp(endptr, "u64", left) == 0)) {
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;
    if (!(result >= (double)UINT64_MAX || result < 0)) {
      kvVal->u = result;
      return true;
    }
    errno = 0;
    uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
    if (errno == ERANGE || result < 0) {
      smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
      return false;
    }
    kvVal->u = tmp;
    return true;
  }

  if (three && strncasecmp(endptr, "i32", left) == 0) {
    if (!IS_VALID_INT(result)) {
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_INT;
    kvVal->i = result;
    return true;
  }

  if (three && strncasecmp(endptr, "u32", left) == 0) {
    if (!IS_VALID_UINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UINT;
    kvVal->u = result;
    return true;
  }

  if (three && strncasecmp(endptr, "i16", left) == 0) {
    if (!IS_VALID_SMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_SMALLINT;
    kvVal->i = result;
    return true;
  }

  if (three && strncasecmp(endptr, "u16", left) == 0) {
    if (!IS_VALID_USMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_USMALLINT;
    kvVal->u = result;
    return true;
  }

  if (two && strncasecmp(endptr, "i8", left) == 0) {
    if (!IS_VALID_TINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_TINYINT;
    kvVal->i = result;
    return true;
  }

  if (two && strncasecmp(endptr, "u8", left) == 0) {
    if (!IS_VALID_UTINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UTINYINT;
    kvVal->u = result;
    return true;
  }

  smlBuildInvalidDataMsg(msg, "invalid data", pVal);
  return false;
}

511 512
/*
 * Fetch the super table meta for `measure` from the catalog of the request's database.
 *
 * info        SML handle supplying the connection, request and catalog
 * measure     super table name (measureLen bytes, not necessarily NUL-terminated)
 * measureLen  length of `measure`; must be smaller than TSDB_TABLE_NAME_LEN
 *
 * Returns the pointer filled in by catalogGetSTableMeta, which stays NULL if the catalog does
 * not set it. Returns NULL without contacting the catalog when the name does not fit into
 * the table-name buffer.
 */
STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen){
  STableMeta *pTableMeta = NULL;

  // pName.tname is filled with a raw memcpy below, so the length has to be validated here.
  if (measureLen < 0 || measureLen >= TSDB_TABLE_NAME_LEN) {
    uError("SML:0x%" PRIx64 " smlGetMeta invalid table name length %d", info->id, measureLen);
    return NULL;
  }

  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));

  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
  memcpy(pName.tname, measure, measureLen);

  catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
  return pTableMeta;
}
wmmhello's avatar
wmmhello 已提交
528

529

530 531
/*
 * Produce the next handle id from a function-local counter that is incremented with
 * atomic_add_fetch_64. The value 0 is never returned; the counter is bumped again instead.
 */
static int64_t smlGenId() {
  static volatile int64_t linesSmlHandleId = 0;

  for (;;) {
    int64_t id = atomic_add_fetch_64(&linesSmlHandleId, 1);
    if (id != 0) {
      return id;
    }
  }
}

541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566
/*
 * Decide which schema change, if any, a parsed key/value requires.
 *
 * colField  existing schema fields, indexed by the uint16_t values stored in colHash
 * colHash   maps a field name to its index in colField; may be NULL, in which case every key
 *           is treated as new
 * kv        the parsed key/value
 * isTag     selects the tag or the column flavour of the action
 * action    output; written only when a change is needed (add, or widen a VARCHAR/NCHAR
 *           field that is too short for kv->length), otherwise left untouched
 * info      handle, used for logging
 *
 * Returns 0 on success, or TSDB_CODE_TSC_INVALID_VALUE when the key exists with another type.
 */
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
                                       ESchemaAction *action, SSmlHandle *info) {
  uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
  if (index) {
    if (colField[*index].type != kv->type) {
      // kv is the incoming point, colField the stored schema; the key is printed with an
      // explicit length because it is handled as (pointer, keyLen) everywhere else.
      uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %.*s. point type: %d, db type: %d", info->id,
             (int)kv->keyLen, kv->key, kv->type, colField[*index].type);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR &&
         (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
        (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
         ((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
      if (isTag) {
        *action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
      } else {
        *action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
      }
    }
  } else {
    if (isTag) {
      *action = SCHEMA_ACTION_ADD_TAG;
    } else {
      *action = SCHEMA_ACTION_ADD_COLUMN;
    }
  }
  return 0;
}

571 572 573 574
static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
  int32_t result = 1;
  while (result <= length) {
    result *= 2;
575
  }
576 577 578 579
  if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
    result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
    result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
580
  }
581 582 583 584 585

  if (type == TSDB_DATA_TYPE_NCHAR) {
    result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_BINARY) {
    result = result + VARSTR_HEADER_SIZE;
586
  }
587
  return result;
588 589
}

590 591 592 593 594 595 596 597 598 599
/*
 * Run smlGenerateSchemaAction over every entry of `cols` and stop at the first failure.
 *
 * For non-tag lists the entry at index 0 is skipped. `action` receives whatever the
 * individual calls write into it.
 *
 * Returns TSDB_CODE_SUCCESS, or the first non-success code reported for an entry.
 */
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                      ESchemaAction *action, bool isTag) {
  for (int j = isTag ? 0 : 1; j < taosArrayGetSize(cols); ++j) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
    int32_t code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

604 605 606 607 608
/*
 * Check that every key in `cols` names a field of `schema`.
 *
 * schema  array of `length` schema fields
 * cols    array of SSmlKv; for non-tag lists the entry at index 0 is not checked
 *
 * Returns 0 when all keys are present in the schema, -1 otherwise.
 */
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
  SHashObj *names = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  for (int32_t idx = 0; idx < length; idx++) {
    taosHashPut(names, schema[idx].name, strlen(schema[idx].name), &idx, SHORT_BYTES);
  }

  int32_t status = 0;
  for (int32_t pos = isTag ? 0 : 1; pos < taosArrayGetSize(cols); pos++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, pos);
    if (taosHashGet(names, kv->key, kv->keyLen) == NULL) {
      status = -1;
      break;
    }
  }

  taosHashCleanup(names);
  return status;
}
626

627 628 629 630 631 632
/*
 * Storage size for a value of the given type: BINARY and NCHAR are sized from `length` via
 * smlFindNearestPowerOf2, every other type uses the size recorded in tDataTypes.
 */
static int32_t getBytes(uint8_t type, int32_t length) {
  const bool isVarLen = (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR);
  if (!isVarLen) {
    return tDataTypes[type].bytes;
  }
  return smlFindNearestPowerOf2(length, type);
}

635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652
/*
 * Merge the parsed key/values in `cols` into the field list `results`.
 *
 * For each entry the required schema action is computed with smlGenerateSchemaAction:
 *   - add column/tag:   a new SField (type, size from getBytes, name) is appended to results;
 *   - change size:      the existing SField is looked up through schemaHash and its size is
 *                       replaced; for tags the stored index is reduced by numOfCols.
 *
 * Returns TSDB_CODE_SUCCESS, the error reported by smlGenerateSchemaAction,
 * TSDB_CODE_SML_INVALID_DATA when a key does not fit into SField.name or an expected field
 * cannot be located, or TSDB_CODE_OUT_OF_MEMORY when appending to `results` fails.
 */
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                  SArray *results, int32_t numOfCols, bool isTag) {
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
    SSmlKv       *kv = (SSmlKv *)taosArrayGet(cols, j);
    ESchemaAction action = SCHEMA_ACTION_NULL;
    int32_t       code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) {
      SField field = {0};
      // The name is copied without a terminator, so one byte of the zeroed buffer must remain.
      if ((size_t)kv->keyLen >= sizeof(field.name)) {
        uError("SML:0x%" PRIx64 " smlBuildFieldsList field name too long, length: %d", info->id, (int)kv->keyLen);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      field.type = kv->type;
      field.bytes = getBytes(kv->type, kv->length);
      memcpy(field.name, kv->key, kv->keyLen);
      if (taosArrayPush(results, &field) == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    } else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
      uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
      if (index == NULL) {
        return TSDB_CODE_SML_INVALID_DATA;
      }
      uint16_t newIndex = *index;
      if (isTag) newIndex -= numOfCols;
      SField *field = (SField *)taosArrayGet(results, newIndex);
      if (field == NULL) {
        return TSDB_CODE_SML_INVALID_DATA;
      }
      field->bytes = getBytes(kv->type, kv->length);
    }
  }
  return TSDB_CODE_SUCCESS;
}

658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674
// static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
//                               int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){
/*
 * Send a TDMT_MND_CREATE_STB request that creates a super table or alters its tags/columns.
 *
 * info        SML handle supplying the connection and catalog
 * pName       full name of the super table
 * pColumns    column fields; released through tFreeSMCreateStbReq before returning
 * pTags       tag fields; released the same way. If empty, one NCHAR tag named tsSmlTagName
 *             is appended
 * pTableMeta  current meta of the table; read for every action except
 *             SCHEMA_ACTION_CREATE_STABLE
 * action      what is being done; it selects the colVer/tagVer/suid/source of the request
 *
 * On success the cached table meta for pName is removed from the catalog.
 * Returns the result code of the request, or the error that prevented it from being sent.
 */
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray *pTags, STableMeta *pTableMeta,
                              ESchemaAction action) {
  SRequestObj   *pRequest = NULL;
  SCmdMsgInfo    pCmdMsg = {0};
  SQuery         pQuery;
  int32_t        code = TSDB_CODE_SUCCESS;
  SMCreateStbReq pReq = {0};

  // Attach the arrays first so that the common exit path releases them on every return.
  pReq.pColumns = pColumns;
  pReq.numOfColumns = taosArrayGetSize(pColumns);
  pReq.pTags = pTags;
  pReq.numOfTags = taosArrayGetSize(pTags);

  code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  const bool alterTag = (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE);
  const bool alterCol = (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE);
  if (action == SCHEMA_ACTION_CREATE_STABLE) {
    pReq.colVer = 1;
    pReq.tagVer = 1;
    pReq.suid = 0;
    pReq.source = TD_REQ_FROM_APP;
  } else if (alterTag || alterCol) {
    // Only the version of the part being altered is bumped.
    pReq.colVer = pTableMeta->sversion + (alterCol ? 1 : 0);
    pReq.tagVer = pTableMeta->tversion + (alterTag ? 1 : 0);
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  }

  if (pReq.numOfTags == 0) {
    SField field = {0};
    field.type = TSDB_DATA_TYPE_NCHAR;
    field.bytes = 1;
    strcpy(field.name, tsSmlTagName);
    pReq.numOfTags = 1;
    taosArrayPush(pReq.pTags, &field);
  }

  pReq.commentLen = -1;
  pReq.igExists = true;
  tNameExtractFullName(pName, pReq.name);

  pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  memset(&pQuery, 0, sizeof(pQuery));
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  code = pRequest->code;
  if (code == TSDB_CODE_SUCCESS) {
    catalogRemoveTableMeta(info->pCatalog, pName);
  }
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  return code;
}
744

745 746 747
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
  if(info->dataFormat && !info->needModifySchema){
    return TSDB_CODE_SUCCESS;
748
  }
749 750 751
  int32_t     code = 0;
  SHashObj   *hashTmp = NULL;
  STableMeta *pTableMeta = NULL;
752 753 754 755 756 757 758 759 760

  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));

  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
761

762 763 764 765
  NodeList *tmp = info->superTables;
  while (tmp) {
    SSmlSTableMeta *sTableData = (SSmlSTableMeta *)tmp->data.value;
    bool            needCheckMeta = false;  // for multi thread
766

767 768 769 770
    size_t superTableLen = (size_t)tmp->data.keyLen;
    const void  *superTable = tmp->data.key;
    memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
    memcpy(pName.tname, superTable, superTableLen);
771

772
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
773

774 775 776 777 778
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
      SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
      SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
      smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
      smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);
779

780 781 782 783
      code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
        goto end;
784
      }
785 786
      info->cost.numOfCreateSTables++;
      taosMemoryFreeClear(pTableMeta);
787

788 789 790 791
      code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
        goto end;
792
      }
793 794 795 796 797 798
    } else if (code == TSDB_CODE_SUCCESS) {
      hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
                             HASH_NO_LOCK);
      for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
           i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
799
      }
800

801 802 803 804
      ESchemaAction action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
805
      }
806 807 808 809 810
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
811

812 813 814 815 816 817 818 819 820
        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if (i < pTableMeta->tableInfo.numOfColumns) {
            taosArrayPush(pColumns, &field);
          } else {
            taosArrayPush(pTags, &field);
821 822
          }
        }
823 824
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
                           pTableMeta->tableInfo.numOfColumns, true);
825

826 827 828 829
        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
          goto end;
830
        }
831

832 833 834 835 836 837 838 839 840
        info->cost.numOfAlterTagSTables++;
        taosMemoryFreeClear(pTableMeta);
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
wmmhello's avatar
wmmhello 已提交
841
        }
842 843
      }

844 845 846
      taosHashClear(hashTmp);
      for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
847
      }
848 849 850 851
      action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
852
      }
853 854 855 856 857
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
858

859 860 861 862 863 864 865 866 867 868
        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if (i < pTableMeta->tableInfo.numOfColumns) {
            taosArrayPush(pColumns, &field);
          } else {
            taosArrayPush(pTags, &field);
          }
869 870
        }

871 872
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
                           pTableMeta->tableInfo.numOfColumns, false);
873

874 875 876 877
        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
          goto end;
878
        }
879

880 881 882 883 884 885 886 887 888 889
        info->cost.numOfAlterColSTables++;
        taosMemoryFreeClear(pTableMeta);
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
          goto end;
890 891
        }
      }
wmmhello's avatar
wmmhello 已提交
892

893 894 895
      needCheckMeta = true;
      taosHashCleanup(hashTmp);
      hashTmp = NULL;
X
Xiaoyu Wang 已提交
896
    } else {
897 898
      uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
      goto end;
wmmhello's avatar
wmmhello 已提交
899
    }
900

901 902 903 904 905 906 907 908 909 910 911
    if (needCheckMeta) {
      code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
                          sTableData->tags, true);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, pName.tname);
        goto end;
      }
      code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, pName.tname);
        goto end;
912 913
      }
    }
914 915 916 917

    sTableData->tableMeta = pTableMeta;

    tmp = tmp->next;
wmmhello's avatar
wmmhello 已提交
918
  }
919
  return 0;
920

921 922 923 924 925
  end:
  taosHashCleanup(hashTmp);
  taosMemoryFreeClear(pTableMeta);
//  catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
  return code;
wmmhello's avatar
wmmhello 已提交
926 927 928
}


929 930 931 932 933 934 935
/*
static int32_t smlCheckDupUnit(SHashObj *dumplicateKey, SArray *tags, SSmlMsgBuf *msg){
  for(int i = 0; i < taosArrayGetSize(tags); i++) {
    SSmlKv *tag = taosArrayGet(tags, i);
    if (smlCheckDuplicateKey(tag->key, tag->keyLen, dumplicateKey)) {
      smlBuildInvalidDataMsg(msg, "dumplicate key", tag->key);
      return TSDB_CODE_TSC_DUP_NAMES;
936
    }
wmmhello's avatar
wmmhello 已提交
937
  }
938 939 940
  return TSDB_CODE_SUCCESS;
}

941 942 943 944 945
static int32_t smlJudgeDupColName(SArray *cols, SArray *tags, SSmlMsgBuf *msg) {
  SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  int ret = smlCheckDupUnit(dumplicateKey, cols, msg);
  if(ret != TSDB_CODE_SUCCESS){
    goto end;
946
  }
947 948 949
  ret = smlCheckDupUnit(dumplicateKey, tags, msg);
  if(ret != TSDB_CODE_SUCCESS){
    goto end;
950
  }
951

952 953 954
  end:
  taosHashCleanup(dumplicateKey);
  return ret;
wmmhello's avatar
wmmhello 已提交
955
}
956
*/
wmmhello's avatar
wmmhello 已提交
957

958 959 960 961 962 963 964
/*
 * Register every key/value of `cols` in a super-table meta description.
 *
 * For each entry whose key is not yet present in metaHash, the entry is appended to
 * metaArray and metaHash maps the key to the position the entry gets in metaArray.
 * Entries whose key is already present (taosHashPut() returns non-zero) are skipped.
 *
 * The stored index is taken from metaArray's size rather than from the position in
 * `cols`, so it stays a valid metaArray index even when `cols` contains a repeated key;
 * this matches how smlUpdateMeta() computes the index for newly seen keys.
 */
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols) {
  size_t numOfCols = taosArrayGetSize(cols);
  for (size_t i = 0; i < numOfCols; ++i) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
    int16_t index = (int16_t)taosArrayGetSize(metaArray);
    if (taosHashPut(metaHash, kv->key, kv->keyLen, &index, SHORT_BYTES) == 0) {
      taosArrayPush(metaArray, kv);
    }
  }
}
wmmhello's avatar
wmmhello 已提交
967

968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986
/*
 * Release a super-table meta description: its tag/column arrays, the two lookup
 * hashes, the attached table meta and finally the structure itself.
 */
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
  // lookup hashes
  taosHashCleanup(meta->tagHash);
  taosHashCleanup(meta->colHash);

  // tag and column arrays
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);

  // table meta, then the container
  taosMemoryFree(meta->tableMeta);
  taosMemoryFree(meta);
}

/*
 * Merge the key/values of one line (`cols`) into an existing super-table meta description.
 *
 * Unknown keys are appended to metaArray and indexed in metaHash. For known keys:
 *   - tags (isTag == true) only have their recorded length enlarged when the new value is longer;
 *   - columns must keep the type recorded before, otherwise an error message is written
 *     to `msg` and TSDB_CODE_SML_NOT_SAME_TYPE is returned; variable-length columns have
 *     their recorded length enlarged when the new value is longer.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_SML_NOT_SAME_TYPE on a column type mismatch.
 */
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg) {
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv  *kv = (SSmlKv *)taosArrayGet(cols, i);
    int16_t *pos = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);

    if (pos == NULL) {
      // first time this key is seen: it gets the next free slot of metaArray
      size_t count = taosArrayGetSize(metaArray);
      ASSERT(count <= INT16_MAX);
      int16_t slot = count;
      if (taosHashPut(metaHash, kv->key, kv->keyLen, &slot, SHORT_BYTES) == 0) {
        taosArrayPush(metaArray, kv);
      }
      continue;
    }

    SSmlKv *known = (SSmlKv *)taosArrayGet(metaArray, *pos);

    if (isTag) {
      if (kv->length > known->length) {
        known->length = kv->length;
      }
      continue;
    }

    if (kv->type != known->type) {
      smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
      return TSDB_CODE_SML_NOT_SAME_TYPE;
    }

    // update string len, if bigger
    if (IS_VAR_DATA_TYPE(kv->type) && (kv->length > known->length)) {
      known->length = kv->length;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1011

1012 1013 1014 1015 1016
/*
 * Release a child-table description: every hash stored in tag->cols, the key buffer,
 * the cols/tags arrays and the structure itself.
 */
static void smlDestroyTableInfo(SSmlTableInfo *tag) {
  size_t idx = 0;
  while (idx < taosArrayGetSize(tag->cols)) {
    taosHashCleanup((SHashObj *)taosArrayGetP(tag->cols, idx));
    idx++;
  }

  taosMemoryFree(tag->key);
  taosArrayDestroy(tag->cols);
  taosArrayDestroy(tag->tags);
  taosMemoryFree(tag);
}
1023

wmmhello's avatar
wmmhello 已提交
1024
/*
 * Destroy a schemaless handle and everything it owns: the query, the child-table and
 * super-table lists (values are destroyed only for nodes marked used), the vgroup hash,
 * the pre-line key/value arrays, the per-line storage and the handle itself.
 *
 * Accepts NULL (no-op).
 */
void smlDestroyInfo(SSmlHandle *info) {
  if (!info) return;
  qDestroyQuery(info->pQuery);

  // destroy info->childTables
  NodeList *tmp = info->childTables;
  while (tmp) {
    if (tmp->data.used) {
      smlDestroyTableInfo((SSmlTableInfo *)tmp->data.value);
    }
    NodeList *t = tmp->next;
    taosMemoryFree(tmp);
    tmp = t;
  }

  // destroy info->superTables
  tmp = info->superTables;
  while (tmp) {
    if (tmp->data.used) {
      smlDestroySTableMeta((SSmlSTableMeta *)tmp->data.value);
    }
    NodeList *t = tmp->next;
    taosMemoryFree(tmp);
    tmp = t;
  }

  // destroy info->pVgHash
  taosHashCleanup(info->pVgHash);

  taosArrayDestroy(info->preLineTagKV);
  taosArrayDestroy(info->preLineColKV);

  // info->lines may be NULL even when dataFormat is false (e.g. the handle is torn down
  // before the line array was allocated), so never index it without checking.
  if (!info->dataFormat && info->lines != NULL) {
    for (int i = 0; i < info->lineNum; i++) {
      taosArrayDestroy(info->lines[i].colArray);
    }
    taosMemoryFree(info->lines);
  }

  taosMemoryFreeClear(info);
}
wmmhello's avatar
wmmhello 已提交
1065

wmmhello's avatar
wmmhello 已提交
1066
/*
 * Allocate and initialise a schemaless handle.
 *
 * When `taos` is not NULL the connection object is acquired and the catalog handle of
 * its cluster is looked up. The handle starts with dataFormat == true and owns a vgroup
 * hash, a query object and the two pre-line key/value arrays.
 *
 * @return the new handle, or NULL when the connection cannot be acquired, the catalog
 *         lookup fails, or any allocation fails (everything built so far is released
 *         through smlDestroyInfo()).
 */
SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
  if (NULL == info) {
    return NULL;
  }

  // Assign the id first so that the error logs below carry it.
  info->id = smlGenId();

  if (taos != NULL) {
    info->taos = acquireTscObj(*(int64_t *)taos);
    if (NULL == info->taos) {
      uError("SML:0x%" PRIx64 " acquire connection object failed", info->id);
      goto cleanup;
    }
    code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
      goto cleanup;
    }
  }

  info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  info->pQuery = smlInitHandle();
  info->dataFormat = true;

  info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
  info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv));

  if (NULL == info->pVgHash || NULL == info->pQuery || NULL == info->preLineTagKV || NULL == info->preLineColKV) {
    uError("create SSmlHandle failed");
    goto cleanup;
  }

  return info;

cleanup:
  smlDestroyInfo(info);
  return NULL;
}

/*
 * Build a hash that maps each column key in `cols` to a pointer to its SSmlKv entry,
 * and append that hash to colsArray.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the hash cannot be created
 *         or cannot be appended (in which case the hash is released again).
 */
static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
  SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (!kvHash) {
    uError("SML:smlDealCols failed to allocate memory");
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
    // the hash value is the pointer itself, not a copy of the SSmlKv
    taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
  }

  if (taosArrayPush(colsArray, &kvHash) == NULL) {
    // colsArray did not take the hash, so it must be released here
    taosHashCleanup(kvHash);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}
1115

1116 1117 1118
/*
 * Second parsing stage, used only when info->dataFormat is false: for every parsed
 * line, attach its columns to the child table it belongs to and merge its tags and
 * columns into the super-table meta description (creating that description the first
 * time a measurement is seen).
 *
 * @return TSDB_CODE_SUCCESS, or
 *         TSDB_CODE_SML_INVALID_DATA       when the child table of a line cannot be found,
 *         TSDB_CODE_PAR_INVALID_TAGS_NUM   when a table has more than TSDB_MAX_TAGS tags,
 *         TSDB_CODE_PAR_TOO_MANY_COLUMNS   when columns + tags exceed TSDB_MAX_COLUMNS,
 *         TSDB_CODE_OUT_OF_MEMORY          when a new meta description cannot be built,
 *         or the error returned by smlPushCols() / smlUpdateMeta().
 */
static int32_t smlParseLineBottom(SSmlHandle *info) {
  if (info->dataFormat) return TSDB_CODE_SUCCESS;

  for (int32_t i = 0; i < info->lineNum; i++) {
    SSmlLineInfo  *elements = info->lines + i;
    SSmlTableInfo *tinfo = NULL;
    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
      tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->measure, elements->measureTagsLen, NULL);
    } else {
      // every other protocol uses the same lookup: compare the line element itself
      tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet);
    }

    if (tinfo == NULL) {
      uError("SML:0x%" PRIx64 "get oneTable failed, line num:%d", info->id, i);
      smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure);
      return TSDB_CODE_SML_INVALID_DATA;
    }

    if (taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
      return TSDB_CODE_PAR_INVALID_TAGS_NUM;
    }

    if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
      return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
    }

    int ret = smlPushCols(tinfo->cols, elements->colArray);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    SSmlSTableMeta *tableMeta =
        (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
    if (tableMeta) {  // update meta
      ret = smlUpdateMeta(tableMeta->colHash, tableMeta->cols, elements->colArray, false, &info->msgBuf);
      if (ret == TSDB_CODE_SUCCESS) {
        ret = smlUpdateMeta(tableMeta->tagHash, tableMeta->tags, tinfo->tags, true, &info->msgBuf);
      }
      if (ret != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
        return ret;
      }
    } else {
      SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
      if (meta == NULL) {
        uError("SML:0x%" PRIx64 " smlBuildSTableMeta failed", info->id);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
      smlInsertMeta(meta->colHash, meta->cols, elements->colArray);
      nodeListSet(&info->superTables, elements->measure, elements->measureLen, meta, NULL);
    }
  }

  return TSDB_CODE_SUCCESS;
}

1178

X
Xiaoyu Wang 已提交
1179
/*
 * Bind the parsed data of every child table and send the insert request.
 *
 * For each child table: resolve its vgroup through the catalog, remember the vgroup in
 * info->pVgHash, look up the super-table meta and hand everything to smlBindData().
 * Afterwards the output is built with smlBuildOutput() and the query is launched.
 *
 * @return TSDB_CODE_SUCCESS on success; otherwise the error of the failing step, or
 *         TSDB_CODE_SML_INVALID_DATA when a child table has no super-table meta.
 */
static int32_t smlInsertData(SSmlHandle *info) {
  int32_t code = TSDB_CODE_SUCCESS;

  for (NodeList *tmp = info->childTables; tmp != NULL; tmp = tmp->next) {
    // Same rule as smlDestroyInfo()/smlClearForRerun(): only nodes marked used carry a live value.
    if (!tmp->data.used) {
      continue;
    }
    SSmlTableInfo *tableData = (SSmlTableInfo *)tmp->data.value;

    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
    tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
    // bounded copy: never write past pName.tname, always NUL-terminated
    tstrncpy(pName.tname, tableData->childTableName, sizeof(pName.tname));

    SRequestConnInfo conn = {0};
    conn.pTrans = info->taos->pAppInfo->pTransporter;
    conn.requestId = info->pRequest->requestId;
    conn.requestObjRefId = info->pRequest->self;
    conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

    SVgroupInfo vg;
    code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
      return code;
    }
    taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));

    SSmlSTableMeta *pMeta =
        (SSmlSTableMeta *)nodeListGet(info->superTables, tableData->sTableName, tableData->sTableNameLen, NULL);
    if (NULL == pMeta) {
      uError("SML:0x%" PRIx64 " super table meta not found. table name: %s", info->id, tableData->childTableName);
      return TSDB_CODE_SML_INVALID_DATA;
    }

    // use tablemeta of stable to save vgid and uid of child table
    pMeta->tableMeta->vgId = vg.vgId;
    pMeta->tableMeta->uid = tableData->uid;  // one table merge data block together according uid

    code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, pMeta->cols, tableData->cols,
                       pMeta->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
                       info->ttl, info->msgBuf.buf, info->msgBuf.len);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
      return code;
    }
  }

  code = smlBuildOutput(info->pQuery, info->pVgHash);
  if (code != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
    return code;
  }
  info->cost.insertRpcTime = taosGetTimestampUs();

  SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
  atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);

  launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
  return info->pRequest->code;
}

X
Xiaoyu Wang 已提交
1236 1237
/*
 * Log the counters and timing of one schemaless insert: result code, line/table
 * counts, number of created/altered super tables, and the time spent between the
 * timestamps recorded in info->cost.
 */
static void smlPrintStatisticInfo(SSmlHandle *info) {
  // differences between consecutive timestamps recorded in info->cost
  int64_t parseToSchema = info->cost.schemaTime - info->cost.parseTime;
  int64_t schemaToBind = info->cost.insertBindTime - info->cost.schemaTime;
  int64_t bindToRpc = info->cost.insertRpcTime - info->cost.insertBindTime;
  int64_t rpcToEnd = info->cost.endTime - info->cost.insertRpcTime;
  int64_t parseToEnd = info->cost.endTime - info->cost.parseTime;

  uError("SML:0x%" PRIx64
             " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
        parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
             "",
         info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
         info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables,
         parseToSchema, schemaToBind, bindToRpc, rpcToEnd, parseToEnd);
}

1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285
/*
 * Reset the handle so that all lines can be parsed again from the start.
 *
 * Clears info->reRun, destroys the values of all used child-table and super-table
 * nodes (the nodes themselves are kept and marked unused), allocates info->lines with
 * one entry per line, resets the pre-line state and replaces the statement's table
 * block hash with a fresh one.
 *
 * @return TSDB_CODE_SUCCESS,
 *         TSDB_CODE_SML_INVALID_DATA when info->lines is already allocated,
 *         TSDB_CODE_OUT_OF_MEMORY    when the line array or the block hash cannot be allocated.
 */
int32_t smlClearForRerun(SSmlHandle *info) {
  info->reRun = false;

  // clear info->childTables
  NodeList *pList = info->childTables;
  while (pList) {
    if (pList->data.used) {
      smlDestroyTableInfo((SSmlTableInfo *)pList->data.value);
      pList->data.used = false;
    }
    pList = pList->next;
  }

  // clear info->superTables
  pList = info->superTables;
  while (pList) {
    if (pList->data.used) {
      smlDestroySTableMeta((SSmlSTableMeta *)pList->data.value);
      pList->data.used = false;
    }
    pList = pList->next;
  }

  if (unlikely(info->lines != NULL)) {
    uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
    return TSDB_CODE_SML_INVALID_DATA;
  }
  info->lines = (SSmlLineInfo *)taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
  if (info->lines == NULL) {
    // the parser writes into info->lines[i] right after this call
    uError("SML:0x%" PRIx64 " failed to allocate info->lines", info->id);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memset(&info->preLine, 0, sizeof(SSmlLineInfo));
  info->currSTableMeta = NULL;
  info->currTableDataCtx = NULL;

  SVnodeModifOpStmt *stmt = (SVnodeModifOpStmt *)(info->pQuery->pRoot);
  stmt->freeHashFunc(stmt->pTableBlockHashObj);
  stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (stmt->pTableBlockHashObj == NULL) {
    uError("SML:0x%" PRIx64 " failed to allocate table block hash", info->id);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1286
/*
 * First parsing stage: feed the input to the protocol-specific parser.
 *
 * Input is either `lines` (array of NUL-terminated strings) or the raw buffer
 * [rawLine, rawLineEnd) in which lines are separated by '\n'; `lines` wins when both
 * are given.
 *   - JSON protocol: the first string (or the raw buffer) is parsed as a whole.
 *   - Line / telnet protocol: numLines lines are parsed one by one. In raw mode a line
 *     protocol line starting with '#' is a comment and is skipped without being counted.
 *     When the parser sets info->reRun, the state is reset with smlClearForRerun() and
 *     parsing restarts from the first line.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_SML_INVALID_DATA when no input is supplied for a
 *         line-based protocol, or the error of the failing parser / smlClearForRerun().
 */
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    if (lines) {
      code = smlParseJSON(info, *lines);
    } else if (rawLine) {
      code = smlParseJSON(info, rawLine);
    }
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines ? *lines : rawLine);
    }
    return code;
  }

  if (lines == NULL && rawLine == NULL) {
    uError("SML:0x%" PRIx64 " smlParseLine failed. no input lines", info->id);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  char   *oldRaw = rawLine;
  int32_t i = 0;
  while (i < numLines) {
    char *tmp = NULL;
    int   len = 0;
    if (lines) {
      tmp = lines[i];
      len = strlen(tmp);
    } else {
      tmp = rawLine;
      while (rawLine < rawLineEnd) {
        if (*(rawLine++) == '\n') {
          break;
        }
        len++;
      }
      // len > 0 keeps tmp[0] inside the buffer once the raw input is exhausted
      if (info->protocol == TSDB_SML_LINE_PROTOCOL && len > 0 && tmp[0] == '#') {  // this line is comment
        continue;
      }
    }

    uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len,
           (info->isRawLine ? "rawdata" : tmp));

    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
      if (info->dataFormat) {
        SSmlLineInfo element = {0};
        code = smlParseInfluxString(info, tmp, tmp + len, &element);
      } else {
        code = smlParseInfluxString(info, tmp, tmp + len, info->lines + i);
      }
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
      if (info->dataFormat) {
        SSmlLineInfo element = {0};
        code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, &element);
      } else {
        code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, info->lines + i);
      }
    } else {
      ASSERT(0);
    }
    if (code != TSDB_CODE_SUCCESS) {
      // a raw line is delimited by '\n' / rawLineEnd, not by NUL, so it must not go through %s
      uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, (lines ? tmp : "rawdata"));
      return code;
    }
    if (info->reRun) {
      i = 0;
      rawLine = oldRaw;
      code = smlClearForRerun(info);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      continue;
    }
    i++;
  }

  return code;
}

dengyihao's avatar
dengyihao 已提交
1361
/*
 * Run the whole schemaless pipeline for one request: parse the lines, finish the
 * per-line bookkeeping, adjust the database schemas (retried while it fails, bounded
 * by the number of super tables times MAX_RETRY_TIMES) and finally insert the data.
 * The timestamps and counters of each stage are recorded in info->cost.
 *
 * @return 0 on success, otherwise the error code of the stage that failed.
 */
static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  info->cost.parseTime = taosGetTimestampUs();

  int32_t code = smlParseLine(info, lines, rawLine, rawLineEnd, numLines);
  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(code));
    return code;
  }

  code = smlParseLineBottom(info);
  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlParseLineBottom error : %s", info->id, tstrerror(code));
    return code;
  }

  info->cost.lineNum = info->lineNum;
  info->cost.numOfSTables = nodeListSize(info->superTables);
  info->cost.numOfCTables = nodeListSize(info->childTables);

  info->cost.schemaTime = taosGetTimestampUs();

  // first attempt plus retries; the bound is re-evaluated after every failed attempt
  int32_t retryNum = 0;
  for (;;) {
    code = smlModifyDBSchemas(info);
    if (code == 0) {
      break;
    }
    if (!(retryNum++ < nodeListSize(info->superTables) * MAX_RETRY_TIMES)) {
      break;
    }
  }
  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code));
    return code;
  }

  info->cost.insertBindTime = taosGetTimestampUs();
  code = smlInsertData(info);
  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlInsertData error : %s", info->id, tstrerror(code));
  }
  return code;
}

wmmhello's avatar
wmmhello 已提交
1404 1405
/*
 * Common implementation behind the schemaless insert entry points.
 *
 * Creates the request and the schemaless handle, validates the arguments (database
 * selected, protocol range, precision range for the line protocol, line count for the
 * non-JSON protocols) and, when everything is valid, runs smlProcess(). For the JSON
 * protocol numLines is forced to 1.
 *
 * @return NULL when `taos` is NULL (terrno is set to TSDB_CODE_TSC_DISCONNECTED) or the
 *         request cannot be created; otherwise the request object, whose `code` field
 *         carries the result and whose message buffer carries validation messages.
 */
TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd,
                                       int numLines, int protocol, int precision, int32_t ttl, int64_t reqid) {
  if (taos == NULL) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
  if (NULL == request) {
    uError("SML:taos_schemaless_insert error request is null");
    return NULL;
  }

  SSmlHandle *info = smlBuildSmlInfo(taos);
  if (NULL == info) {
    request->code = TSDB_CODE_OUT_OF_MEMORY;
    uError("SML:taos_schemaless_insert error SSmlHandle is null");
    return (TAOS_RES *)request;
  }

  // copy the call parameters into the handle
  info->pRequest = request;
  info->isRawLine = rawLine != NULL;
  info->ttl = ttl;
  info->precision = precision;
  info->protocol = (TSDB_SML_PROTOCOL_TYPE)protocol;
  info->msgBuf.buf = info->pRequest->msgBuf;
  info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
  info->lineNum = numLines;

  SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};

  // validation order decides which error is reported first
  if (NULL == request->pDb) {
    request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
  } else if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
    request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
    smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
  } else if (protocol == TSDB_SML_LINE_PROTOCOL &&
             (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
    request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
    smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
  } else if (protocol != TSDB_SML_JSON_PROTOCOL && numLines <= 0) {
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
  } else {
    if (protocol == TSDB_SML_JSON_PROTOCOL) {
      numLines = 1;
    }

    int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, numLines);
    request->code = code;
    info->cost.endTime = taosGetTimestampUs();
    info->cost.code = code;
  }

  smlDestroyInfo(info);
  return (TAOS_RES *)request;
}
wmmhello's avatar
wmmhello 已提交
1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487

/**
 * taos_schemaless_insert() parses data points and inserts them into the database
 * according to the selected protocol.
 *
 * @param $lines input array; may contain multiple lines, each line representing one data point.
 *               If protocol = 2 is used, the input array should contain a single JSON
 *               string (e.g. char *lines[] = {"$JSON_string"}). To insert multiple
 *               data points in JSON format, include them in $JSON_string
 *               as a JSON array.
 * @param $numLines the number of data points in $lines.
 *                  If protocol = 2 is used, this parameter is ignored because $lines should
 *                  contain a single JSON string.
 * @param $protocol selects the protocol used for parsing:
 *                  0 - InfluxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
 * @return TAOS_RES
 */

1491 1492
/*
 * Line-array entry point taking both a table TTL and a request id.
 * Hands everything to taos_schemaless_insert_inner(), passing NULL for the two
 * raw-buffer arguments so that only `lines` carries the input.
 */
TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                                int32_t ttl, int64_t reqid) {
  TAOS_RES *res = taos_schemaless_insert_inner(taos, lines, NULL, NULL, numLines, protocol, precision, ttl, reqid);
  return res;
}

1496 1497 1498
/*
 * Shortest line-array form: forwards to taos_schemaless_insert_ttl_with_reqid()
 * with TSDB_DEFAULT_TABLE_TTL as the TTL and 0 as the request id.
 */
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  const int64_t reqid = 0;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
1499

1500 1501 1502
/*
 * Line-array form with a caller-supplied table TTL; forwards to
 * taos_schemaless_insert_ttl_with_reqid() with 0 as the request id.
 */
TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                     int32_t ttl) {
  const int64_t reqid = 0;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
1503

1504 1505
/*
 * Line-array form with a caller-supplied request id; forwards to
 * taos_schemaless_insert_ttl_with_reqid() with TSDB_DEFAULT_TABLE_TTL as the TTL.
 */
TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                            int64_t reqid) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}

1508
/**
 * Schemaless insert from one contiguous buffer rather than an array of lines.
 *
 * The buffer is scanned once to count rows. A row ends at every '\n' and at the
 * last byte of the buffer. Under TSDB_SML_LINE_PROTOCOL a row whose first byte
 * is '#' is treated as a comment and is not counted; for every other protocol
 * all rows are counted. The resulting count is both reported to the caller and
 * passed to taos_schemaless_insert_inner() as its line count.
 *
 * @param taos       connection handle, forwarded unchanged
 * @param lines      start of the buffer; the row count reads only the first `len` bytes
 * @param len        number of bytes in `lines`
 * @param totalRows  out: number of counted rows; may be NULL if the caller does not need it
 * @param protocol   protocol selector, also decides whether '#' rows are skipped
 * @param precision  forwarded unchanged
 * @param ttl        forwarded unchanged
 * @param reqid      forwarded unchanged
 * @return whatever taos_schemaless_insert_inner() returns
 */
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                                    int precision, int32_t ttl, int64_t reqid) {
  int32_t     rows = 0;
  const char *lineStart = lines;  // first byte of the row currently being scanned
  for (int i = 0; i < len; i++) {
    if (lines[i] == '\n' || i == len - 1) {
      if (lineStart[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL) {  // ignore comment
        rows++;
      }
      lineStart = lines + i + 1;
    }
  }

  // The out-parameter is optional: count locally and only store when a destination was given.
  if (totalRows != NULL) {
    *totalRows = rows;
  }

  return taos_schemaless_insert_inner(taos, NULL, lines, lines + len, rows, protocol, precision, ttl, reqid);
}

1525 1526 1527 1528 1529 1530
/*
 * Raw-buffer form with a caller-supplied request id; forwards to
 * taos_schemaless_insert_raw_ttl_with_reqid() with TSDB_DEFAULT_TABLE_TTL as the TTL.
 */
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                                int precision, int64_t reqid) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}

/*
 * Raw-buffer form with a caller-supplied table TTL; forwards to
 * taos_schemaless_insert_raw_ttl_with_reqid() with 0 as the request id.
 */
TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                         int precision, int32_t ttl) {
  const int64_t reqid = 0;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
1531

1532 1533
/*
 * Shortest raw-buffer form: forwards to taos_schemaless_insert_raw_ttl_with_reqid()
 * with TSDB_DEFAULT_TABLE_TTL as the TTL and 0 as the request id.
 */
TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                     int precision) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  const int64_t reqid = 0;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}