clientSml.c 46.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

wmmhello's avatar
wmmhello 已提交
16 17 18 19 20
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

21 22 23 24 25 26 27
#include "clientSml.h"

int64_t smlToMilli[3] = {3600000LL, 60000LL, 1000LL};
int64_t smlFactorNS[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
int64_t smlFactorS[3] = {1000LL, 1000000LL, 1000000000LL};

void* nodeListGet(NodeList* list, const void *key, int32_t len, _equal_fn_sml fn){
wmmhello's avatar
wmmhello 已提交
28 29
  NodeList *tmp = list;
  while(tmp){
30 31 32 33 34 35 36 37
    if(fn == NULL){
      if(tmp->data.used && tmp->data.keyLen == len && memcmp(tmp->data.key, key, len) == 0) {
        return tmp->data.value;
      }
    }else{
      if(tmp->data.used && fn(tmp->data.key, key) == 0) {
        return tmp->data.value;
      }
wmmhello's avatar
wmmhello 已提交
38
    }
39

wmmhello's avatar
wmmhello 已提交
40 41 42 43 44
    tmp = tmp->next;
  }
  return NULL;
}

45
int nodeListSet(NodeList** list, const void *key, int32_t len, void* value, _equal_fn_sml fn){
wmmhello's avatar
wmmhello 已提交
46 47 48
  NodeList *tmp = *list;
  while (tmp){
    if(!tmp->data.used) break;
49 50 51 52 53 54 55 56
    if(fn == NULL){
      if(tmp->data.keyLen == len && memcmp(tmp->data.key, key, len) == 0) {
        return -1;
      }
    }else{
      if(tmp->data.keyLen == len && fn(tmp->data.key, key) == 0) {
        return -1;
      }
wmmhello's avatar
wmmhello 已提交
57
    }
58

wmmhello's avatar
wmmhello 已提交
59 60 61 62 63 64 65 66
    tmp = tmp->next;
  }
  if(tmp){
    tmp->data.key = key;
    tmp->data.keyLen = len;
    tmp->data.value = value;
    tmp->data.used = true;
  }else{
67
    NodeList *newNode = (NodeList *)taosMemoryCalloc(1, sizeof(NodeList));
wmmhello's avatar
wmmhello 已提交
68 69 70 71 72 73 74 75 76 77 78 79 80
    if(newNode == NULL){
      return -1;
    }
    newNode->data.key = key;
    newNode->data.keyLen = len;
    newNode->data.value = value;
    newNode->data.used = true;
    newNode->next = *list;
    *list = newNode;
  }
  return 0;
}

81
int nodeListSize(NodeList* list){
wmmhello's avatar
wmmhello 已提交
82 83 84 85 86 87 88 89
  int cnt = 0;
  while(list){
    if(list->data.used) cnt++;
    else break;
    list = list->next;
  }
  return cnt;
}
wmmhello's avatar
wmmhello 已提交
90

91
inline bool smlDoubleToInt64OverFlow(double num) {
X
Xiaoyu Wang 已提交
92
  if (num >= (double)INT64_MAX || num <= (double)INT64_MIN) return true;
93 94 95
  return false;
}

96
int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
97
  if (pBuf->buf) {
98 99 100 101 102 103 104
    memset(pBuf->buf, 0, pBuf->len);
    if (msg1) strncat(pBuf->buf, msg1, pBuf->len);
    int32_t left = pBuf->len - strlen(pBuf->buf);
    if (left > 2 && msg2) {
      strncat(pBuf->buf, ":", left - 1);
      strncat(pBuf->buf, msg2, left - 2);
    }
wmmhello's avatar
wmmhello 已提交
105
  }
wmmhello's avatar
wmmhello 已提交
106 107 108
  return TSDB_CODE_SML_INVALID_DATA;
}

109

110 111 112 113 114
int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, uint8_t toPrecision) {
  char   *endPtr = NULL;
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
  if (unlikely(value + len != endPtr)) {
    return -1;
115
  }
wmmhello's avatar
wmmhello 已提交
116

117 118 119 120 121 122 123
  if(unlikely(fromPrecision >= TSDB_TIME_PRECISION_HOURS)){
    int64_t unit = smlToMilli[fromPrecision - TSDB_TIME_PRECISION_HOURS];
    if(unit > INT64_MAX / tsInt64){
      return -1;
    }
    tsInt64 *= unit;
    fromPrecision = TSDB_TIME_PRECISION_MILLI;
wmmhello's avatar
wmmhello 已提交
124
  }
wmmhello's avatar
wmmhello 已提交
125

126
  return convertTimePrecision(tsInt64, fromPrecision, toPrecision);
wmmhello's avatar
wmmhello 已提交
127 128
}

129 130 131 132 133 134 135
int8_t smlGetTsTypeByLen(int32_t len) {
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
    return TSDB_TIME_PRECISION_SECONDS;
  } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
    return TSDB_TIME_PRECISION_MILLI;
  } else {
    return -1;
136 137 138
  }
}

139 140 141 142
SSmlTableInfo *smlBuildTableInfo(int numRows, const char* measure, int32_t measureLen) {
  SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
  if (!tag) {
    return NULL;
143 144
  }

145 146
  tag->sTableName = measure;
  tag->sTableNameLen = measureLen;
147

148 149 150 151
  tag->cols = taosArrayInit(numRows, POINTER_BYTES);
  if (tag->cols == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
152
  }
153 154 155 156 157 158 159 160 161 162 163

//  tag->tags = taosArrayInit(16, sizeof(SSmlKv));
//  if (tag->tags == NULL) {
//    uError("SML:smlBuildTableInfo failed to allocate memory");
//    goto cleanup;
//  }
  return tag;

  cleanup:
  taosMemoryFree(tag);
  return NULL;
164 165
}

166 167 168 169 170 171 172 173 174 175 176
static int32_t smlParseTableName(SArray *tags, char *childTableName) {
  size_t      childTableNameLen = strlen(tsSmlChildTableName);
  if (childTableNameLen <= 0) return TSDB_CODE_SUCCESS;

  for(int i = 0; i < taosArrayGetSize(tags); i++){
    SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i);
    // handle child table name
    if (childTableNameLen == tag->keyLen && strncmp(tag->key, tsSmlChildTableName, tag->keyLen) == 0) {
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, tag->value, (tag->length < TSDB_TABLE_NAME_LEN ? tag->length : TSDB_TABLE_NAME_LEN));
      break;
wmmhello's avatar
wmmhello 已提交
177 178
    }
  }
179

wmmhello's avatar
wmmhello 已提交
180 181 182
  return TSDB_CODE_SUCCESS;
}

183 184
int32_t smlSetCTableName(SSmlTableInfo *oneTable){
  smlParseTableName(oneTable->tags, oneTable->childTableName);
185

186 187 188 189
  if (strlen(oneTable->childTableName) == 0) {
    SArray* dst = taosArrayDup(oneTable->tags, NULL);
    RandTableName rName = {dst, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen,
                           oneTable->childTableName, 0};
wmmhello's avatar
wmmhello 已提交
190

191 192 193 194 195
    buildChildTableName(&rName);
    taosArrayDestroy(dst);
    oneTable->uid = rName.uid;
  } else {
    oneTable->uid = *(uint64_t *)(oneTable->childTableName);
196
  }
197 198
  return TSDB_CODE_SUCCESS;
}
199

200 201 202 203
SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
  SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
  if (!meta) {
    return NULL;
204 205
  }

206 207 208 209 210 211
  if(unlikely(!isDataFormat)){
    meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->tagHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }
wmmhello's avatar
wmmhello 已提交
212

213 214 215 216 217
    meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->colHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }
wmmhello's avatar
wmmhello 已提交
218 219
  }

220 221 222 223
  meta->tags = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
224 225
  }

226 227 228 229 230 231
  meta->cols = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
  return meta;
232

233 234 235
  cleanup:
  taosMemoryFree(meta);
  return NULL;
wmmhello's avatar
wmmhello 已提交
236 237
}

238
bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
239
  const char *pVal = kvVal->value;
X
Xiaoyu Wang 已提交
240 241 242 243
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (pVal == endptr) {
244
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
wmmhello's avatar
wmmhello 已提交
245 246 247
    return false;
  }

248
  int32_t left = len - (endptr - pVal);
X
Xiaoyu Wang 已提交
249
  if (left == 0 || (left == 3 && strncasecmp(endptr, "f64", left) == 0)) {
250 251
    kvVal->type = TSDB_DATA_TYPE_DOUBLE;
    kvVal->d = result;
X
Xiaoyu Wang 已提交
252 253
  } else if ((left == 3 && strncasecmp(endptr, "f32", left) == 0)) {
    if (!IS_VALID_FLOAT(result)) {
254 255
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
256
    }
257 258
    kvVal->type = TSDB_DATA_TYPE_FLOAT;
    kvVal->f = (float)result;
X
Xiaoyu Wang 已提交
259 260
  } else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)) {
    if (smlDoubleToInt64OverFlow(result)) {
wmmhello's avatar
wmmhello 已提交
261 262
      errno = 0;
      int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
X
Xiaoyu Wang 已提交
263
      if (errno == ERANGE) {
wmmhello's avatar
wmmhello 已提交
264 265 266 267 268 269
        smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_BIGINT;
      kvVal->i = tmp;
      return true;
wmmhello's avatar
wmmhello 已提交
270
    }
271
    kvVal->type = TSDB_DATA_TYPE_BIGINT;
wmmhello's avatar
wmmhello 已提交
272
    kvVal->i = (int64_t)result;
wmmhello's avatar
wmmhello 已提交
273
  } else if ((left == 1 && *endptr == 'u') || (left == 3 && strncasecmp(endptr, "u64", left) == 0)) {
X
Xiaoyu Wang 已提交
274
    if (result >= (double)UINT64_MAX || result < 0) {
wmmhello's avatar
wmmhello 已提交
275 276
      errno = 0;
      uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
X
Xiaoyu Wang 已提交
277
      if (errno == ERANGE || result < 0) {
wmmhello's avatar
wmmhello 已提交
278 279 280 281 282 283
        smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_UBIGINT;
      kvVal->u = tmp;
      return true;
284
    }
285
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;
wmmhello's avatar
wmmhello 已提交
286
    kvVal->u = result;
X
Xiaoyu Wang 已提交
287 288
  } else if (left == 3 && strncasecmp(endptr, "i32", left) == 0) {
    if (!IS_VALID_INT(result)) {
289 290
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
291
    }
292 293
    kvVal->type = TSDB_DATA_TYPE_INT;
    kvVal->i = result;
X
Xiaoyu Wang 已提交
294 295
  } else if (left == 3 && strncasecmp(endptr, "u32", left) == 0) {
    if (!IS_VALID_UINT(result)) {
296 297
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
298
    }
299 300
    kvVal->type = TSDB_DATA_TYPE_UINT;
    kvVal->u = result;
X
Xiaoyu Wang 已提交
301 302
  } else if (left == 3 && strncasecmp(endptr, "i16", left) == 0) {
    if (!IS_VALID_SMALLINT(result)) {
303 304
      smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
305
    }
306 307
    kvVal->type = TSDB_DATA_TYPE_SMALLINT;
    kvVal->i = result;
X
Xiaoyu Wang 已提交
308 309
  } else if (left == 3 && strncasecmp(endptr, "u16", left) == 0) {
    if (!IS_VALID_USMALLINT(result)) {
310 311
      smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
312
    }
313 314
    kvVal->type = TSDB_DATA_TYPE_USMALLINT;
    kvVal->u = result;
X
Xiaoyu Wang 已提交
315 316
  } else if (left == 2 && strncasecmp(endptr, "i8", left) == 0) {
    if (!IS_VALID_TINYINT(result)) {
317 318
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
319
    }
320 321
    kvVal->type = TSDB_DATA_TYPE_TINYINT;
    kvVal->i = result;
X
Xiaoyu Wang 已提交
322 323
  } else if (left == 2 && strncasecmp(endptr, "u8", left) == 0) {
    if (!IS_VALID_UTINYINT(result)) {
324 325
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
326
    }
327 328
    kvVal->type = TSDB_DATA_TYPE_UTINYINT;
    kvVal->u = result;
X
Xiaoyu Wang 已提交
329
  } else {
330
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
wmmhello's avatar
wmmhello 已提交
331 332
    return false;
  }
333
  return true;
wmmhello's avatar
wmmhello 已提交
334 335
}

336 337
STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen){
  STableMeta *pTableMeta = NULL;
wmmhello's avatar
wmmhello 已提交
338

339 340
  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
wmmhello's avatar
wmmhello 已提交
341

342 343 344 345 346 347 348
  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
  memcpy(pName.tname, measure, measureLen);
wmmhello's avatar
wmmhello 已提交
349

350 351
  catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
  return pTableMeta;
wmmhello's avatar
wmmhello 已提交
352
}
wmmhello's avatar
wmmhello 已提交
353

354

355 356
static int64_t smlGenId() {
  static volatile int64_t linesSmlHandleId = 0;
wmmhello's avatar
wmmhello 已提交
357

358 359 360 361
  int64_t id = 0;
  do {
    id = atomic_add_fetch_64(&linesSmlHandleId, 1);
  } while (id == 0);
362

363
  return id;
364 365
}

366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
                                       ESchemaAction *action, SSmlHandle *info) {
  uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
  if (index) {
    if (colField[*index].type != kv->type) {
      uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id,
             kv->key, colField[*index].type, kv->type);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR &&
         (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
        (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
         ((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
      if (isTag) {
        *action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
      } else {
        *action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
      }
    }
  } else {
    if (isTag) {
      *action = SCHEMA_ACTION_ADD_TAG;
    } else {
      *action = SCHEMA_ACTION_ADD_COLUMN;
    }
wmmhello's avatar
wmmhello 已提交
392
  }
393
  return 0;
394 395
}

396 397 398 399
static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
  int32_t result = 1;
  while (result <= length) {
    result *= 2;
400
  }
401 402 403 404
  if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
    result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
    result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
405
  }
406 407 408 409 410

  if (type == TSDB_DATA_TYPE_NCHAR) {
    result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_BINARY) {
    result = result + VARSTR_HEADER_SIZE;
411
  }
412
  return result;
413 414
}

415 416 417 418 419 420 421 422 423 424
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                      ESchemaAction *action, bool isTag) {
  int32_t code = TSDB_CODE_SUCCESS;
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
    if (j == 0 && !isTag) continue;
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
    code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
425
  }
426
  return TSDB_CODE_SUCCESS;
427 428
}

429 430 431 432 433
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
  SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  int32_t   i = 0;
  for (; i < length; i++) {
    taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
434 435
  }

436 437 438 439
  if (isTag) {
    i = 0;
  } else {
    i = 1;
440
  }
441 442 443 444 445 446
  for (; i < taosArrayGetSize(cols); i++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
    if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
      taosHashCleanup(hashTmp);
      return -1;
    }
447
  }
448 449 450
  taosHashCleanup(hashTmp);
  return 0;
}
451

452 453 454 455 456 457
static int32_t getBytes(uint8_t type, int32_t length) {
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
    return smlFindNearestPowerOf2(length, type);
  } else {
    return tDataTypes[type].bytes;
  }
458 459
}

460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                  SArray *results, int32_t numOfCols, bool isTag) {
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
    SSmlKv       *kv = (SSmlKv *)taosArrayGet(cols, j);
    ESchemaAction action = SCHEMA_ACTION_NULL;
    smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
    if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) {
      SField field = {0};
      field.type = kv->type;
      field.bytes = getBytes(kv->type, kv->length);
      memcpy(field.name, kv->key, kv->keyLen);
      taosArrayPush(results, &field);
    } else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
      uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
      uint16_t  newIndex = *index;
      if (isTag) newIndex -= numOfCols;
      SField *field = (SField *)taosArrayGet(results, newIndex);
      field->bytes = getBytes(kv->type, kv->length);
478 479 480 481 482
    }
  }
  return TSDB_CODE_SUCCESS;
}

483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
// static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
//                               int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray *pTags, STableMeta *pTableMeta,
                              ESchemaAction action) {
  SRequestObj   *pRequest = NULL;
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SCmdMsgInfo    pCmdMsg = {0};

  // put front for free
  pReq.numOfColumns = taosArrayGetSize(pColumns);
  pReq.pColumns = pColumns;
  pReq.numOfTags = taosArrayGetSize(pTags);
  pReq.pTags = pTags;

  code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest, 0);
  if (code != TSDB_CODE_SUCCESS) {
500 501
    goto end;
  }
502 503 504 505

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
506 507 508
    goto end;
  }

509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524
  if (action == SCHEMA_ACTION_CREATE_STABLE) {
    pReq.colVer = 1;
    pReq.tagVer = 1;
    pReq.suid = 0;
    pReq.source = TD_REQ_FROM_APP;
  } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
    pReq.colVer = pTableMeta->sversion;
    pReq.tagVer = pTableMeta->tversion + 1;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  } else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) {
    pReq.colVer = pTableMeta->sversion + 1;
    pReq.tagVer = pTableMeta->tversion;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  }
525

526 527 528 529 530 531 532
  if (pReq.numOfTags == 0) {
    pReq.numOfTags = 1;
    SField field = {0};
    field.type = TSDB_DATA_TYPE_NCHAR;
    field.bytes = 1;
    strcpy(field.name, tsSmlTagName);
    taosArrayPush(pReq.pTags, &field);
533
  }
wmmhello's avatar
wmmhello 已提交
534

535 536 537
  pReq.commentLen = -1;
  pReq.igExists = true;
  tNameExtractFullName(pName, pReq.name);
538

539 540 541 542 543 544 545 546 547
  pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);
548

549 550 551 552 553 554
  SQuery pQuery;
  memset(&pQuery, 0, sizeof(pQuery));
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;
555

556
  launchQueryImpl(pRequest, &pQuery, true, NULL);
557

558 559
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    catalogRemoveTableMeta(info->pCatalog, pName);
560
  }
561 562
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);
563

564 565 566 567 568
  end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  return code;
}
569

570 571 572
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
  if(info->dataFormat && !info->needModifySchema){
    return TSDB_CODE_SUCCESS;
573
  }
574 575 576
  int32_t     code = 0;
  SHashObj   *hashTmp = NULL;
  STableMeta *pTableMeta = NULL;
577 578 579 580 581 582 583 584 585

  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));

  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
586

587 588 589 590
  NodeList *tmp = info->superTables;
  while (tmp) {
    SSmlSTableMeta *sTableData = (SSmlSTableMeta *)tmp->data.value;
    bool            needCheckMeta = false;  // for multi thread
591

592 593 594 595
    size_t superTableLen = (size_t)tmp->data.keyLen;
    const void  *superTable = tmp->data.key;
    memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
    memcpy(pName.tname, superTable, superTableLen);
596

597
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
598

599 600 601 602 603
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
      SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
      SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
      smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
      smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);
604

605 606 607 608
      code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
        goto end;
609
      }
610 611
      info->cost.numOfCreateSTables++;
      taosMemoryFreeClear(pTableMeta);
612

613 614 615 616
      code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
        goto end;
617
      }
618 619 620 621 622 623
    } else if (code == TSDB_CODE_SUCCESS) {
      hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
                             HASH_NO_LOCK);
      for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
           i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
624
      }
625

626 627 628 629
      ESchemaAction action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
630
      }
631 632 633 634 635
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
636

637 638 639 640 641 642 643 644 645
        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if (i < pTableMeta->tableInfo.numOfColumns) {
            taosArrayPush(pColumns, &field);
          } else {
            taosArrayPush(pTags, &field);
646 647
          }
        }
648 649
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
                           pTableMeta->tableInfo.numOfColumns, true);
650

651 652 653 654
        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
          goto end;
655
        }
656

657 658 659 660 661 662 663 664 665
        info->cost.numOfAlterTagSTables++;
        taosMemoryFreeClear(pTableMeta);
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
wmmhello's avatar
wmmhello 已提交
666
        }
667 668
      }

669 670 671
      taosHashClear(hashTmp);
      for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
672
      }
673 674 675 676
      action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
677
      }
678 679 680 681 682
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
683

684 685 686 687 688 689 690 691 692 693
        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if (i < pTableMeta->tableInfo.numOfColumns) {
            taosArrayPush(pColumns, &field);
          } else {
            taosArrayPush(pTags, &field);
          }
694 695
        }

696 697
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
                           pTableMeta->tableInfo.numOfColumns, false);
698

699 700 701 702
        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
          goto end;
703
        }
704

705 706 707 708 709 710 711 712 713 714
        info->cost.numOfAlterColSTables++;
        taosMemoryFreeClear(pTableMeta);
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
          goto end;
715 716
        }
      }
wmmhello's avatar
wmmhello 已提交
717

718 719 720
      needCheckMeta = true;
      taosHashCleanup(hashTmp);
      hashTmp = NULL;
X
Xiaoyu Wang 已提交
721
    } else {
722 723
      uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
      goto end;
wmmhello's avatar
wmmhello 已提交
724
    }
725

726 727 728 729 730 731 732 733 734 735 736
    if (needCheckMeta) {
      code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
                          sTableData->tags, true);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, pName.tname);
        goto end;
      }
      code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, pName.tname);
        goto end;
737 738
      }
    }
739 740 741 742

    sTableData->tableMeta = pTableMeta;

    tmp = tmp->next;
wmmhello's avatar
wmmhello 已提交
743
  }
744
  return 0;
745

746 747 748 749 750
  end:
  taosHashCleanup(hashTmp);
  taosMemoryFreeClear(pTableMeta);
//  catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
  return code;
wmmhello's avatar
wmmhello 已提交
751 752 753
}


754 755 756 757 758 759 760
/*
static int32_t smlCheckDupUnit(SHashObj *dumplicateKey, SArray *tags, SSmlMsgBuf *msg){
  for(int i = 0; i < taosArrayGetSize(tags); i++) {
    SSmlKv *tag = taosArrayGet(tags, i);
    if (smlCheckDuplicateKey(tag->key, tag->keyLen, dumplicateKey)) {
      smlBuildInvalidDataMsg(msg, "dumplicate key", tag->key);
      return TSDB_CODE_TSC_DUP_NAMES;
761
    }
wmmhello's avatar
wmmhello 已提交
762
  }
763 764 765
  return TSDB_CODE_SUCCESS;
}

766 767 768 769 770
static int32_t smlJudgeDupColName(SArray *cols, SArray *tags, SSmlMsgBuf *msg) {
  SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  int ret = smlCheckDupUnit(dumplicateKey, cols, msg);
  if(ret != TSDB_CODE_SUCCESS){
    goto end;
771
  }
772 773 774
  ret = smlCheckDupUnit(dumplicateKey, tags, msg);
  if(ret != TSDB_CODE_SUCCESS){
    goto end;
775
  }
776

777 778 779
  end:
  taosHashCleanup(dumplicateKey);
  return ret;
wmmhello's avatar
wmmhello 已提交
780
}
781
*/
wmmhello's avatar
wmmhello 已提交
782

783 784 785 786 787 788 789
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols){
  for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
    int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
    if(ret == 0){
      taosArrayPush(metaArray, kv);
    }
790
  }
791
}
wmmhello's avatar
wmmhello 已提交
792

793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
  taosHashCleanup(meta->tagHash);
  taosHashCleanup(meta->colHash);
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);
  taosMemoryFree(meta->tableMeta);
  taosMemoryFree(meta);
}

static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg) {
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);

    int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);
    if (index) {
      SSmlKv *value = (SSmlKv *)taosArrayGet(metaArray, *index);
      if (isTag){
        if (kv->length > value->length) {
          value->length = kv->length;
812
        }
813 814 815 816 817
        continue;
      }
      if (kv->type != value->type) {
        smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
        return TSDB_CODE_SML_NOT_SAME_TYPE;
818 819
      }

820 821 822 823 824 825 826 827 828 829
      if (IS_VAR_DATA_TYPE(kv->type) && (kv->length > value->length)) {  // update string len, if bigger
        value->length = kv->length;
      }
    } else {
      size_t tmp = taosArrayGetSize(metaArray);
      ASSERT(tmp <= INT16_MAX);
      int16_t size = tmp;
      int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
      if(ret == 0){
        taosArrayPush(metaArray, kv);
830 831 832 833
      }
    }
  }

834 835
  return TSDB_CODE_SUCCESS;
}
836

837 838 839 840 841
static void smlDestroyTableInfo(SSmlTableInfo *tag) {
  for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
    SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
    taosHashCleanup(kvHash);
  }
842

843 844 845 846 847
  taosMemoryFree(tag->key);
  taosArrayDestroy(tag->cols);
  taosArrayDestroy(tag->tags);
  taosMemoryFree(tag);
}
848

849 850 851
static void smlDestroyInfo(SSmlHandle *info) {
  if (!info) return;
  qDestroyQuery(info->pQuery);
852

853 854 855 856 857
  // destroy info->childTables
  NodeList* tmp = info->childTables;
  while (tmp) {
    if(tmp->data.used) {
      smlDestroyTableInfo((SSmlTableInfo*)tmp->data.value);
858
    }
859 860 861
    NodeList* t = tmp->next;
    taosMemoryFree(tmp);
    tmp = t;
862 863
  }

864 865 866 867 868
  // destroy info->superTables
  tmp = info->superTables;
  while (tmp) {
    if(tmp->data.used) {
      smlDestroySTableMeta((SSmlSTableMeta*)tmp->data.value);
869
    }
870 871 872
    NodeList* t = tmp->next;
    taosMemoryFree(tmp);
    tmp = t;
873
  }
874

875 876
  // destroy info->pVgHash
  taosHashCleanup(info->pVgHash);
wmmhello's avatar
wmmhello 已提交
877

878 879
  taosArrayDestroy(info->preLineTagKV);
  taosArrayDestroy(info->preLineColKV);
wmmhello's avatar
wmmhello 已提交
880

881 882 883 884 885
  if(!info->dataFormat){
    for(int i = 0; i < info->lineNum; i++){
      taosArrayDestroy(info->lines[i].colArray);
    }
    taosMemoryFree(info->lines);
wmmhello's avatar
wmmhello 已提交
886
  }
887

888 889 890
  cJSON_Delete(info->root);
  taosMemoryFreeClear(info);
}
wmmhello's avatar
wmmhello 已提交
891

892 893 894 895 896
static SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
  if (NULL == info) {
    return NULL;
897
  }
898 899 900 901 902 903 904
  if(taos != NULL){
    info->taos = acquireTscObj(*(int64_t *)taos);
    code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
      goto cleanup;
    }
905
  }
wmmhello's avatar
wmmhello 已提交
906

907 908 909 910
  info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  info->id = smlGenId();
  info->pQuery = smlInitHandle();
  info->dataFormat = true;
911

912 913 914 915 916 917
  info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
  info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv));

  if (NULL == info->pVgHash) {
    uError("create SSmlHandle failed");
    goto cleanup;
918 919
  }

920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935
  return info;

cleanup:
  smlDestroyInfo(info);
  return NULL;
}

static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
  SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (!kvHash) {
    uError("SML:smlDealCols failed to allocate memory");
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
    taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
936 937
  }

938
  taosArrayPush(colsArray, &kvHash);
939
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
940
}
941

942 943 944
static int32_t smlParseLineBottom(SSmlHandle *info) {
  if(info->dataFormat) return TSDB_CODE_SUCCESS;

wmmhello's avatar
wmmhello 已提交
945 946
  for(int32_t i = 0; i < info->lineNum; i ++){
    SSmlLineInfo* elements = info->lines + i;
947
    SSmlTableInfo *tinfo = NULL;
948
    if(info->protocol == TSDB_SML_LINE_PROTOCOL){
949
      tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->measure, elements->measureTagsLen, NULL);
950 951
    }else if(info->protocol == TSDB_SML_TELNET_PROTOCOL){
      tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet);
952 953 954 955
    }else{
      tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->tags, POINTER_BYTES, is_same_child_table_json);
    }

wmmhello's avatar
wmmhello 已提交
956
    if(tinfo == NULL){
957 958 959
      uError("SML:0x%" PRIx64 "get oneTable failed, line num:%d", info->id, i);
      smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure);
      return TSDB_CODE_SML_INVALID_DATA;
960
    }
wmmhello's avatar
wmmhello 已提交
961

962 963 964 965 966
    if (taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
      return TSDB_CODE_PAR_INVALID_TAGS_NUM;
    }

967 968 969
    if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
      return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
wmmhello's avatar
wmmhello 已提交
970
    }
wmmhello's avatar
wmmhello 已提交
971

972
    int ret = smlPushCols(tinfo->cols, elements->colArray);
X
Xiaoyu Wang 已提交
973
    if (ret != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
974 975 976
      return ret;
    }

977
    SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
978
    if (tableMeta) {  // update meta
wmmhello's avatar
wmmhello 已提交
979
      ret = smlUpdateMeta(tableMeta->colHash, tableMeta->cols, elements->colArray, false, &info->msgBuf);
980
      if (ret == TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
981
        ret = smlUpdateMeta(tableMeta->tagHash, tableMeta->tags, tinfo->tags, true, &info->msgBuf);
982 983 984 985 986
      }
      if (ret != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
        return ret;
      }
X
Xiaoyu Wang 已提交
987
    } else {
988 989 990 991 992
//      ret = smlJudgeDupColName(elements->colArray, tinfo->tags, &info->msgBuf);
//      if (ret != TSDB_CODE_SUCCESS) {
//        uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
//        return ret;
//      }
wmmhello's avatar
wmmhello 已提交
993

994 995 996
      SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
      smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
      smlInsertMeta(meta->colHash, meta->cols, elements->colArray);
997
      nodeListSet(&info->superTables, elements->measure, elements->measureLen, meta, NULL);
wmmhello's avatar
wmmhello 已提交
998
    }
wmmhello's avatar
wmmhello 已提交
999
  }
1000

wmmhello's avatar
wmmhello 已提交
1001 1002 1003
  return TSDB_CODE_SUCCESS;
}

1004

X
Xiaoyu Wang 已提交
1005
static int32_t smlInsertData(SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
1006 1007
  int32_t code = TSDB_CODE_SUCCESS;

wmmhello's avatar
wmmhello 已提交
1008 1009 1010
  NodeList* tmp = info->childTables;
  while (tmp) {
    SSmlTableInfo *tableData = (SSmlTableInfo *)tmp->data.value;
wmmhello's avatar
wmmhello 已提交
1011 1012

    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
wmmhello's avatar
wmmhello 已提交
1013
    tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
wmmhello's avatar
wmmhello 已提交
1014
    memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
D
dapan1121 已提交
1015 1016 1017 1018 1019 1020

    SRequestConnInfo conn = {0};
    conn.pTrans = info->taos->pAppInfo->pTransporter;
    conn.requestId = info->pRequest->requestId;
    conn.requestObjRefId = info->pRequest->self;
    conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
X
Xiaoyu Wang 已提交
1021

wmmhello's avatar
wmmhello 已提交
1022
    SVgroupInfo vg;
D
dapan1121 已提交
1023
    code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
1024
    if (code != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
1025
      uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
wmmhello's avatar
wmmhello 已提交
1026 1027
      return code;
    }
X
Xiaoyu Wang 已提交
1028
    taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
wmmhello's avatar
wmmhello 已提交
1029

wmmhello's avatar
wmmhello 已提交
1030
    SSmlSTableMeta *pMeta =
1031
        (SSmlSTableMeta *)nodeListGet(info->superTables, tableData->sTableName, tableData->sTableNameLen, NULL);
wmmhello's avatar
wmmhello 已提交
1032
    ASSERT(NULL != pMeta);
wmmhello's avatar
wmmhello 已提交
1033

1034
    // use tablemeta of stable to save vgid and uid of child table
wmmhello's avatar
wmmhello 已提交
1035 1036
    pMeta->tableMeta->vgId = vg.vgId;
    pMeta->tableMeta->uid = tableData->uid;  // one table merge data block together according uid
wmmhello's avatar
wmmhello 已提交
1037

wmmhello's avatar
wmmhello 已提交
1038 1039
    code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, pMeta->cols, tableData->cols,
                       pMeta->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
1040
                       info->ttl, info->msgBuf.buf, info->msgBuf.len);
X
Xiaoyu Wang 已提交
1041 1042
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
wmmhello's avatar
wmmhello 已提交
1043 1044
      return code;
    }
wmmhello's avatar
wmmhello 已提交
1045
    tmp = tmp->next;
wmmhello's avatar
wmmhello 已提交
1046
  }
wmmhello's avatar
wmmhello 已提交
1047

wmmhello's avatar
wmmhello 已提交
1048
  code = smlBuildOutput(info->pQuery, info->pVgHash);
1049
  if (code != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
1050
    uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
1051 1052
    return code;
  }
1053 1054
  info->cost.insertRpcTime = taosGetTimestampUs();

1055 1056 1057
  SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
  atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);

wmmhello's avatar
wmmhello 已提交
1058 1059
  launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
  return info->pRequest->code;
wmmhello's avatar
wmmhello 已提交
1060 1061
}

X
Xiaoyu Wang 已提交
1062 1063
static void smlPrintStatisticInfo(SSmlHandle *info) {
  uError("SML:0x%" PRIx64
1064
             " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
X
Xiaoyu Wang 已提交
1065
        parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
1066
             "",
X
Xiaoyu Wang 已提交
1067
         info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
1068 1069
         info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables,
         info->cost.schemaTime - info->cost.parseTime,
X
Xiaoyu Wang 已提交
1070 1071
         info->cost.insertBindTime - info->cost.schemaTime, info->cost.insertRpcTime - info->cost.insertBindTime,
         info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime);
1072 1073
}

1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111
int32_t smlClearForRerun(SSmlHandle *info){
  info->reRun = false;
  // clear info->childTables
  NodeList* pList = info->childTables;
  while (pList) {
    if(pList->data.used) {
      smlDestroyTableInfo((SSmlTableInfo*)pList->data.value);
      pList->data.used = false;
    }
    pList = pList->next;
  }

  // clear info->superTables
  pList = info->superTables;
  while (pList) {
    if(pList->data.used) {
      smlDestroySTableMeta((SSmlSTableMeta*)pList->data.value);
      pList->data.used = false;
    }
    pList = pList->next;
  }

  if(unlikely(info->lines != NULL)){
    uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
    return TSDB_CODE_SML_INVALID_DATA;
  }
  info->lines = (SSmlLineInfo*)taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));

  memset(&info->preLine, 0, sizeof(SSmlLineInfo));
  info->currSTableMeta = NULL;
  info->currTableDataCtx = NULL;

  SVnodeModifOpStmt* stmt= (SVnodeModifOpStmt*)(info->pQuery->pRoot);
  stmt->freeHashFunc(stmt->pTableBlockHashObj);
  stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1112
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
wmmhello's avatar
wmmhello 已提交
1113
  int32_t code = TSDB_CODE_SUCCESS;
1114
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
dengyihao's avatar
dengyihao 已提交
1115
    if (lines) {
wmmhello's avatar
wmmhello 已提交
1116
      code = smlParseJSON(info, *lines);
dengyihao's avatar
dengyihao 已提交
1117
    } else if (rawLine) {
wmmhello's avatar
wmmhello 已提交
1118 1119
      code = smlParseJSON(info, rawLine);
    }
1120
    if (code != TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
1121
      uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines ? *lines : rawLine);
1122 1123
      return code;
    }
wmmhello's avatar
wmmhello 已提交
1124
    return code;
wmmhello's avatar
wmmhello 已提交
1125
  }
wmmhello's avatar
wmmhello 已提交
1126

1127
  char *oldRaw = rawLine;
1128 1129
  int32_t i = 0;
  while (i < numLines) {
wmmhello's avatar
wmmhello 已提交
1130
    char *tmp = NULL;
dengyihao's avatar
dengyihao 已提交
1131 1132
    int   len = 0;
    if (lines) {
wmmhello's avatar
wmmhello 已提交
1133 1134
      tmp = lines[i];
      len = strlen(tmp);
dengyihao's avatar
dengyihao 已提交
1135
    } else if (rawLine) {
wmmhello's avatar
wmmhello 已提交
1136
      tmp = rawLine;
dengyihao's avatar
dengyihao 已提交
1137 1138
      while (rawLine < rawLineEnd) {
        if (*(rawLine++) == '\n') {
wmmhello's avatar
wmmhello 已提交
1139 1140 1141 1142
          break;
        }
        len++;
      }
dengyihao's avatar
dengyihao 已提交
1143
      if (info->protocol == TSDB_SML_LINE_PROTOCOL && tmp[0] == '#') {  // this line is comment
wmmhello's avatar
wmmhello 已提交
1144 1145
        continue;
      }
wmmhello's avatar
wmmhello 已提交
1146 1147
    }

1148 1149
    uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? "rawdata" : tmp));

X
Xiaoyu Wang 已提交
1150
    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
wmmhello's avatar
wmmhello 已提交
1151 1152 1153 1154 1155 1156
      if(info->dataFormat){
        SSmlLineInfo element = {0};
        code = smlParseInfluxString(info, tmp, tmp + len, &element);
      }else{
        code = smlParseInfluxString(info, tmp, tmp + len, info->lines + i);
      }
X
Xiaoyu Wang 已提交
1157
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
1158 1159 1160 1161 1162 1163 1164
      if(info->dataFormat) {
        SSmlLineInfo element = {0};
        code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, &element);
      }else{
        code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, info->lines + i);
      }

X
Xiaoyu Wang 已提交
1165
    } else {
1166 1167
      ASSERT(0);
    }
wmmhello's avatar
wmmhello 已提交
1168
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
1169
      uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, tmp);
1170
      return code;
wmmhello's avatar
wmmhello 已提交
1171
    }
1172 1173
    if(info->reRun){
      i = 0;
1174
      rawLine = oldRaw;
1175 1176 1177
      code = smlClearForRerun(info);
      if(code != TSDB_CODE_SUCCESS){
        return code;
1178
      }
1179
      continue;
1180
    }
1181
    i++;
wmmhello's avatar
wmmhello 已提交
1182
  }
1183

1184 1185 1186
  return code;
}

dengyihao's avatar
dengyihao 已提交
1187
static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
1188
  int32_t code = TSDB_CODE_SUCCESS;
1189 1190
  int32_t retryNum = 0;

1191 1192
  info->cost.parseTime = taosGetTimestampUs();

wmmhello's avatar
wmmhello 已提交
1193
  code = smlParseLine(info, lines, rawLine, rawLineEnd, numLines);
1194
  if (code != 0) {
X
Xiaoyu Wang 已提交
1195
    uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
1196
    return code;
1197
  }
1198 1199 1200 1201 1202 1203
  code = smlParseLineBottom(info);
  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlParseLineBottom error : %s", info->id, tstrerror(code));
    return code;
  }

1204
  info->cost.lineNum = numLines;
wmmhello's avatar
wmmhello 已提交
1205 1206
  info->cost.numOfSTables = nodeListSize(info->superTables);
  info->cost.numOfCTables = nodeListSize(info->childTables);
1207 1208

  info->cost.schemaTime = taosGetTimestampUs();
1209

X
Xiaoyu Wang 已提交
1210
  do {
1211 1212
    code = smlModifyDBSchemas(info);
    if (code == 0) break;
wmmhello's avatar
wmmhello 已提交
1213
  } while (retryNum++ < nodeListSize(info->superTables) * MAX_RETRY_TIMES);
1214

wmmhello's avatar
wmmhello 已提交
1215
  if (code != 0) {
X
Xiaoyu Wang 已提交
1216
    uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
1217
    return code;
wmmhello's avatar
wmmhello 已提交
1218
  }
wmmhello's avatar
wmmhello 已提交
1219

1220
  info->cost.insertBindTime = taosGetTimestampUs();
wmmhello's avatar
wmmhello 已提交
1221 1222
  code = smlInsertData(info);
  if (code != 0) {
X
Xiaoyu Wang 已提交
1223
    uError("SML:0x%" PRIx64 " smlInsertData error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
1224
    return code;
wmmhello's avatar
wmmhello 已提交
1225 1226 1227 1228 1229
  }

  return code;
}

wmmhello's avatar
wmmhello 已提交
1230 1231
TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd,
                                       int numLines, int protocol, int precision, int32_t ttl, int64_t reqid) {
1232
  int32_t code = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1233 1234 1235
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
1236
  }
1237

wmmhello's avatar
wmmhello 已提交
1238 1239 1240 1241
  SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
  if (request == NULL) {
    uError("SML:taos_schemaless_insert error request is null");
    return NULL;
wmmhello's avatar
wmmhello 已提交
1242
  }
1243

wmmhello's avatar
wmmhello 已提交
1244 1245 1246 1247
  SSmlHandle *info = smlBuildSmlInfo(taos);
  if (info == NULL) {
    request->code = TSDB_CODE_OUT_OF_MEMORY;
    uError("SML:taos_schemaless_insert error SSmlHandle is null");
1248
    return (TAOS_RES *)request;
wmmhello's avatar
wmmhello 已提交
1249
  }
wmmhello's avatar
wmmhello 已提交
1250 1251 1252 1253
  info->pRequest = request;
  info->isRawLine = rawLine != NULL;
  info->ttl       = ttl;
  info->precision = precision;
1254
  info->protocol = (TSDB_SML_PROTOCOL_TYPE)protocol;
wmmhello's avatar
wmmhello 已提交
1255 1256 1257
  info->msgBuf.buf = info->pRequest->msgBuf;
  info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
  info->lineNum = numLines;
wmmhello's avatar
wmmhello 已提交
1258

wmmhello's avatar
wmmhello 已提交
1259
  SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
wmmhello's avatar
wmmhello 已提交
1260 1261 1262
  if (request->pDb == NULL) {
    request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
wmmhello's avatar
wmmhello 已提交
1263 1264 1265
    goto end;
  }

X
Xiaoyu Wang 已提交
1266
  if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
1267
    request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
wmmhello's avatar
wmmhello 已提交
1268
    smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
1269
    goto end;
wmmhello's avatar
wmmhello 已提交
1270 1271
  }

X
Xiaoyu Wang 已提交
1272 1273
  if (protocol == TSDB_SML_LINE_PROTOCOL &&
      (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
1274
    request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
wmmhello's avatar
wmmhello 已提交
1275
    smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
1276 1277 1278
    goto end;
  }

1279
  if (protocol == TSDB_SML_JSON_PROTOCOL) {
wmmhello's avatar
wmmhello 已提交
1280
    numLines = 1;
1281
  } else if (numLines <= 0) {
wmmhello's avatar
wmmhello 已提交
1282 1283 1284 1285 1286
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
    goto end;
  }

1287
  code = smlProcess(info, lines, rawLine, rawLineEnd, numLines);
wmmhello's avatar
wmmhello 已提交
1288 1289 1290 1291
  request->code = code;
  info->cost.endTime = taosGetTimestampUs();
  info->cost.code = code;
  smlPrintStatisticInfo(info);
1292

wmmhello's avatar
wmmhello 已提交
1293
end:
1294
  uDebug("resultend:%s", request->msgBuf);
wmmhello's avatar
wmmhello 已提交
1295
  smlDestroyInfo(info);
1296
  return (TAOS_RES *)request;
wmmhello's avatar
wmmhello 已提交
1297
}
wmmhello's avatar
wmmhello 已提交
1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314

/**
 * taos_schemaless_insert() parse and insert data points into database according to
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
dengyihao's avatar
dengyihao 已提交
1315
 * @return TAOS_RES
wmmhello's avatar
wmmhello 已提交
1316 1317
 */

1318 1319
TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                                int32_t ttl, int64_t reqid) {
wmmhello's avatar
wmmhello 已提交
1320
  return taos_schemaless_insert_inner(taos, lines, NULL, NULL, numLines, protocol, precision, ttl, reqid);
dengyihao's avatar
dengyihao 已提交
1321 1322
}

1323 1324 1325
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, TSDB_DEFAULT_TABLE_TTL, 0);
}
wmmhello's avatar
wmmhello 已提交
1326

1327 1328 1329
TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int32_t ttl) {
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, 0);
}
wmmhello's avatar
wmmhello 已提交
1330

1331 1332
TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int64_t reqid) {
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, TSDB_DEFAULT_TABLE_TTL, reqid);
wmmhello's avatar
wmmhello 已提交
1333 1334
}

1335
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
1336
                                                    int precision, int32_t ttl, int64_t reqid) {
dengyihao's avatar
dengyihao 已提交
1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348
  int numLines = 0;
  *totalRows = 0;
  char *tmp = lines;
  for (int i = 0; i < len; i++) {
    if (lines[i] == '\n' || i == len - 1) {
      numLines++;
      if (tmp[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL) {  // ignore comment
        (*totalRows)++;
      }
      tmp = lines + i + 1;
    }
  }
wmmhello's avatar
wmmhello 已提交
1349
  return taos_schemaless_insert_inner(taos, NULL, lines, lines + len, *totalRows, protocol, precision, ttl, reqid);
dengyihao's avatar
dengyihao 已提交
1350 1351
}

1352 1353 1354 1355 1356 1357
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int64_t reqid) {
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, TSDB_DEFAULT_TABLE_TTL, reqid);
}
TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int32_t ttl) {
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, 0);
}
wmmhello's avatar
wmmhello 已提交
1358

1359 1360
TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision) {
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, TSDB_DEFAULT_TABLE_TTL, 0);
wmmhello's avatar
wmmhello 已提交
1361
}