clientSml.c 100.8 KB
Newer Older
wmmhello's avatar
wmmhello 已提交
1 2 3 4 5
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

X
Xiaoyu Wang 已提交
6 7 8 9 10
#include "cJSON.h"
#include "catalog.h"
#include "clientInt.h"
#include "osSemaphore.h"
#include "osThread.h"
wmmhello's avatar
wmmhello 已提交
11 12
#include "query.h"
#include "taos.h"
wmmhello's avatar
wmmhello 已提交
13
#include "taoserror.h"
X
Xiaoyu Wang 已提交
14
#include "tcommon.h"
wmmhello's avatar
wmmhello 已提交
15
#include "tdef.h"
X
Xiaoyu Wang 已提交
16
#include "tglobal.h"
wmmhello's avatar
wmmhello 已提交
17 18
#include "tlog.h"
#include "tmsg.h"
X
Xiaoyu Wang 已提交
19
#include "tname.h"
wmmhello's avatar
wmmhello 已提交
20 21
#include "ttime.h"
#include "ttypes.h"
wmmhello's avatar
wmmhello 已提交
22

23 24 25 26 27 28 29 30 31 32 33 34 35
// Branch-prediction hints. `expect` maps to __builtin_expect on GCC (>= 3), the Intel
// compiler (>= 800) and clang; on every other compiler it is just the bare expression.
#if (defined(__GNUC__) && (__GNUC__ >= 3)) || (defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 800)) || defined(__clang__)
#  define expect(expr,value)    (__builtin_expect ((expr),(value)) )
#else
#  define expect(expr,value)    (expr)
#endif

// likely/unlikely normalise the condition to 0/1 via `!= 0` before hinting, and are only
// defined here when no earlier header has already provided them.
#ifndef likely
#define likely(expr)     expect((expr) != 0, 1)
#endif
#ifndef unlikely
#define unlikely(expr)   expect((expr) != 0, 0)
#endif

wmmhello's avatar
wmmhello 已提交
36
//=================================================================================================
wmmhello's avatar
wmmhello 已提交
37 38 39 40 41 42 43

// Characters with special meaning to the line tokenizer macros below.
#define SPACE ' '
#define COMMA ','
#define EQUAL '='
#define QUOTE '"'
#define SLASH '\\'

// Advance `sql` past consecutive space characters without going beyond `sqlEnd`.
// Expands to a bare `while` statement and evaluates its arguments repeatedly, so pass
// plain pointer lvalues only.
#define JUMP_SPACE(sql, sqlEnd) \
  while ((sql) < (sqlEnd)) {    \
    if (*(sql) == SPACE)        \
      (sql)++;                  \
    else                        \
      break;                    \
  }

// Separator predicates: true when *sql is the given character and the byte before it is
// not a backslash. They read sql[-1], so `sql` must not point at the first byte of its
// buffer.
#define IS_COMMA(sql) (*(sql) == COMMA && *((sql)-1) != SLASH)
#define IS_SPACE(sql) (*(sql) == SPACE && *((sql)-1) != SLASH)
#define IS_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) != SLASH)
#define IS_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) != SLASH)

// True when *sql is one of the escapable characters (comma, space, equal, quote,
// backslash) and the byte before it is a backslash. Also reads sql[-1].
#define IS_SLASH_LETTER(sql) \
  (*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == EQUAL || *(sql) == QUOTE || *(sql) == SLASH))

// Move the `len` bytes starting at `sql` one byte towards the start of the buffer,
// overwriting sql[-1]. memmove is used because source and destination overlap.
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), (len)))

// Remove escaping backslashes from `key` in place. For every index i >= 1 where
// IS_SLASH_LETTER holds, the tail starting at key[i] is shifted left by one (dropping the
// backslash), `keyLen` is decremented, and `i` is stepped back so that the loop's ++i
// lands on the same index again. `keyLen` must be a modifiable integer lvalue. The macro
// expands to a bare `for` statement that declares its own `i`.
#define PROCESS_SLASH(key, keyLen)               \
  for (int i = 1; i < (keyLen); ++i) {           \
    if (IS_SLASH_LETTER((key) + i)) {            \
      MOVE_FORWARD_ONE((key) + i, (keyLen) - i); \
      i--;                                       \
      (keyLen)--;                                \
    }                                            \
  }
wmmhello's avatar
wmmhello 已提交
80

81 82 83
// A name length is invalid when it is non-positive or does not leave room below the
// corresponding TSDB_*_NAME_LEN limit.
#define IS_INVALID_COL_LEN(len)   ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)

// NOTE(review): presumably the expected field counts of an OpenTSDB JSON point and of its
// nested objects - confirm against the JSON parser.
#define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM     4

// Fixed key names and their lengths (without the terminating NUL).
#define TS        "_ts"
#define TS_LEN    3
#define VALUE     "_value"
#define VALUE_LEN 6

#define JSON_METERS_NAME "__JM"

#define BINARY_ADD_LEN 2  // "binary"   2 means " "
#define NCHAR_ADD_LEN  3  // L"nchar"   3 means L" "

#define MAX_RETRY_TIMES 5
wmmhello's avatar
wmmhello 已提交
98 99 100 101
//=================================================================================================
// File-local alias for the schemaless protocol type, which is declared outside this file.
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;

// Schema change required for a super table so that an incoming key fits.
typedef enum {
  SCHEMA_ACTION_NULL,                // nothing to change
  SCHEMA_ACTION_CREATE_STABLE,       // the super table does not exist yet
  SCHEMA_ACTION_ADD_COLUMN,          // column key not present in the current schema
  SCHEMA_ACTION_ADD_TAG,             // tag key not present in the current schema
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,  // existing var-length column is too small for the value
  SCHEMA_ACTION_CHANGE_TAG_SIZE,     // existing var-length tag is too small for the value
} ESchemaAction;

wmmhello's avatar
wmmhello 已提交
110 111 112 113 114 115 116 117 118 119 120 121 122
/*********************** list start *********************************/
// One key/value slot of the singly linked list implemented below.
typedef struct {
  const void  *key;     // borrowed pointer: nodeListSet stores it without copying the bytes
  int32_t      keyLen;  // key length in bytes, used for the default memcmp comparison
  void        *value;   // caller-supplied payload, returned as-is by nodeListGet
  bool         used;    // false marks a slot that nodeListSet may fill instead of allocating
}Node;

// List element: a Node payload plus the link to the following element (NULL at the tail).
typedef struct NodeList{
  Node             data;
  struct NodeList* next;
}NodeList;

123 124 125
// Optional key comparator for nodeListGet, called as fn(storedKey, lookupKey);
// a return value of 0 is treated as "keys are equal".
typedef int32_t (*_equal_fn_sml)(const void *, const void *);

/**
 * Find the value stored under `key`.
 *
 * Only nodes whose `used` flag is set are considered. When `fn` is NULL a node matches if
 * its key has the same length as `len` and the same bytes; otherwise a node matches when
 * fn(nodeKey, key) returns 0 (`len` is then not consulted).
 *
 * @return the stored value of the first matching node, or NULL when none matches.
 */
static void *nodeListGet(NodeList *list, const void *key, int32_t len, _equal_fn_sml fn) {
  for (NodeList *node = list; node != NULL; node = node->next) {
    if (!node->data.used) {
      continue;
    }

    bool match;
    if (fn == NULL) {
      match = (node->data.keyLen == len) && (memcmp(node->data.key, key, len) == 0);
    } else {
      match = (fn(node->data.key, key) == 0);
    }

    if (match) {
      return node->data.value;
    }
  }
  return NULL;
}

/**
 * Insert `key` -> `value` into the list.
 *
 * The used nodes at the front of the list are scanned for a key with the same length and
 * bytes; if one exists nothing is changed. Otherwise the first unused node is filled, or,
 * when every node is in use, a new node is allocated and placed at the head of the list.
 * The key pointer is stored as given (the bytes are not copied).
 *
 * @return 0 on success, -1 when the key is already present or allocation fails.
 */
static int nodeListSet(NodeList** list, const void *key, int32_t len, void* value){
  NodeList *slot = *list;

  // walk the used prefix; stop on the first free slot or at the end of the list
  for (; slot != NULL && slot->data.used; slot = slot->next) {
    if (slot->data.keyLen == len && memcmp(slot->data.key, key, len) == 0) {
      return -1;
    }
  }

  if (slot == NULL) {
    slot = taosMemoryCalloc(1, sizeof(NodeList));
    if (slot == NULL) {
      return -1;
    }
    slot->next = *list;
    *list = slot;
  }

  slot->data.key = key;
  slot->data.keyLen = len;
  slot->data.value = value;
  slot->data.used = true;
  return 0;
}

/**
 * Count the used nodes at the front of the list.
 * Counting stops at the first node whose `used` flag is clear, or at the end of the list.
 */
static int nodeListSize(NodeList* list){
  int used = 0;
  for (NodeList *node = list; node != NULL && node->data.used; node = node->next) {
    ++used;
  }
  return used;
}
/*********************** list end *********************************/

wmmhello's avatar
wmmhello 已提交
183
typedef struct {
184 185 186 187
  char *measure;
  char *tags;
  char *cols;
  char *timestamp;
wmmhello's avatar
wmmhello 已提交
188 189 190 191 192 193

  int32_t measureLen;
  int32_t measureTagsLen;
  int32_t tagsLen;
  int32_t colsLen;
  int32_t timestampLen;
194 195

  SArray *colArray;
wmmhello's avatar
wmmhello 已提交
196 197 198
} SSmlLineInfo;

typedef struct {
X
Xiaoyu Wang 已提交
199 200 201 202
  const char *sTableName;  // super table name
  int32_t     sTableNameLen;
  char        childTableName[TSDB_TABLE_NAME_LEN];
  uint64_t    uid;
wmmhello's avatar
wmmhello 已提交
203

X
Xiaoyu Wang 已提交
204
  SArray *tags;
wmmhello's avatar
wmmhello 已提交
205

206
  // elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
X
Xiaoyu Wang 已提交
207
  SArray *cols;
208
  STableDataCxt *tableDataCtx;
wmmhello's avatar
wmmhello 已提交
209 210 211
} SSmlTableInfo;

typedef struct {
X
Xiaoyu Wang 已提交
212 213
  SArray   *tags;     // save the origin order to create table
  SHashObj *tagHash;  // elements are <key, index in tags>
214

X
Xiaoyu Wang 已提交
215 216
  SArray   *cols;
  SHashObj *colHash;
217

wmmhello's avatar
wmmhello 已提交
218 219 220 221
  STableMeta *tableMeta;
} SSmlSTableMeta;

// Error message buffer filled by smlBuildInvalidDataMsg.
typedef struct {
  int32_t len;  // capacity of buf in bytes
  char   *buf;  // may be NULL, in which case no message is written
} SSmlMsgBuf;

226 227 228 229 230 231 232
// Counters and timing marks collected while processing one schemaless request.
typedef struct {
  int32_t code;
  int32_t lineNum;

  int32_t numOfSTables;
  int32_t numOfCTables;
  int32_t numOfCreateSTables;    // incremented after each successful super table creation
  int32_t numOfAlterColSTables;  // incremented after each successful column alteration
  int32_t numOfAlterTagSTables;  // incremented after each successful tag alteration

  int64_t parseTime;
  int64_t schemaTime;
  int64_t insertBindTime;
  int64_t insertRpcTime;
  int64_t endTime;
} SSmlCostInfo;

wmmhello's avatar
wmmhello 已提交
243
typedef struct {
X
Xiaoyu Wang 已提交
244 245 246 247
  int64_t id;

  SMLProtocolType protocol;
  int8_t          precision;
248
  bool            reRun;
249 250 251
  bool            dataFormat;  // true means that the name and order of keys in each line are the same(only for influx protocol)
  bool            isRawLine;
  int32_t         ttl;
X
Xiaoyu Wang 已提交
252

wmmhello's avatar
wmmhello 已提交
253 254
  NodeList *childTables;
  NodeList *superTables;
X
Xiaoyu Wang 已提交
255 256 257 258 259 260 261 262
  SHashObj *pVgHash;

  STscObj     *taos;
  SCatalog    *pCatalog;
  SRequestObj *pRequest;
  SQuery      *pQuery;

  SSmlCostInfo cost;
wmmhello's avatar
wmmhello 已提交
263
  int32_t      lineNum;
X
Xiaoyu Wang 已提交
264
  SSmlMsgBuf   msgBuf;
wmmhello's avatar
wmmhello 已提交
265 266

  cJSON       *root;  // for parse json
wmmhello's avatar
wmmhello 已提交
267
  SSmlLineInfo      *lines; // element is SSmlLineInfo
268 269 270 271 272 273 274 275

  //
  SArray      *preLineTagKV;
  SArray      *preLineColKV;

  SSmlLineInfo preLine;
  STableMeta  *currSTableMeta;
  STableDataCxt *currTableDataCtx;
wmmhello's avatar
wmmhello 已提交
276
  bool         needModifySchema;
wmmhello's avatar
wmmhello 已提交
277
} SSmlHandle;
wmmhello's avatar
wmmhello 已提交
278 279
//=================================================================================================

wmmhello's avatar
wmmhello 已提交
280
//=================================================================================================
281
// Counter backing smlGenId; only modified through atomic_add_fetch_64.
static volatile int64_t linesSmlHandleId = 0;

/**
 * Produce the next handle id.
 * The counter is incremented atomically; a result of 0 is skipped, so the returned id is
 * never 0.
 */
static int64_t smlGenId(void) {
  int64_t id;

  do {
    id = atomic_add_fetch_64(&linesSmlHandleId, 1);
  } while (id == 0);

  return id;
}

292
/**
 * Report whether `num` is at or beyond the limits of int64_t once those limits are
 * converted to double.
 *
 * The bounds are deliberately inclusive: (double)INT64_MAX rounds up to 2^63, and decimal
 * inputs slightly below INT64_MIN round to exactly -2^63, so values equal to either bound
 * cannot be trusted and callers must fall back to an exact integer parse.
 */
static inline bool smlDoubleToInt64OverFlow(double num) {
  return num >= (double)INT64_MAX || num <= (double)INT64_MIN;
}

/**
 * Record `key` in `pHash` and tell whether it had been recorded before.
 *
 * @return true when the key is already present (the hash is left untouched); false when
 *         it was absent, in which case it is inserted with the key's first byte as value.
 */
static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashObj *pHash) {
  if (taosHashGet(pHash, key, keyLen) != NULL) {
    return true;
  }

  // only the key's presence matters; a single byte is stored as a placeholder value
  taosHashPut(pHash, key, keyLen, key, 1);
  return false;
}

X
Xiaoyu Wang 已提交
306
/**
 * Write "<msg1>:<msg2>" (truncated to fit) into pBuf->buf.
 *
 * Nothing is written when the buffer pointer is NULL or its length is not positive.
 * msg1 and msg2 may each be NULL; the ":" separator and msg2 are only appended when more
 * than two bytes remain after msg1.
 *
 * @return always TSDB_CODE_SML_INVALID_DATA, so callers can `return` the result directly.
 */
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
  if (pBuf->buf && pBuf->len > 0) {
    memset(pBuf->buf, 0, pBuf->len);

    // strncat's bound is the number of characters to append and excludes the terminating
    // NUL it always writes, so one byte of the buffer has to be reserved for it.
    if (msg1) strncat(pBuf->buf, msg1, pBuf->len - 1);

    int32_t left = pBuf->len - (int32_t)strlen(pBuf->buf);
    if (left > 2 && msg2) {
      strncat(pBuf->buf, ":", left - 1);
      strncat(pBuf->buf, msg2, left - 2);
    }
  }
  return TSDB_CODE_SML_INVALID_DATA;
}

X
Xiaoyu Wang 已提交
319
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
320
                                       ESchemaAction *action, SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
321
  uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
322 323
  if (index) {
    if (colField[*index].type != kv->type) {
X
Xiaoyu Wang 已提交
324 325
      uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id,
             kv->key, colField[*index].type, kv->type);
326 327 328
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

X
Xiaoyu Wang 已提交
329 330 331 332
    if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR &&
         (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
        (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
         ((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
333
      if (isTag) {
wmmhello's avatar
wmmhello 已提交
334
        *action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
335
      } else {
wmmhello's avatar
wmmhello 已提交
336
        *action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
337 338 339 340
      }
    }
  } else {
    if (isTag) {
wmmhello's avatar
wmmhello 已提交
341
      *action = SCHEMA_ACTION_ADD_TAG;
342
    } else {
wmmhello's avatar
wmmhello 已提交
343
      *action = SCHEMA_ACTION_ADD_COLUMN;
344 345
    }
  }
wmmhello's avatar
wmmhello 已提交
346 347 348
  return 0;
}

wmmhello's avatar
wmmhello 已提交
349
static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
wmmhello's avatar
wmmhello 已提交
350
  int32_t result = 1;
X
Xiaoyu Wang 已提交
351
  while (result <= length) {
wmmhello's avatar
wmmhello 已提交
352 353
    result *= 2;
  }
354
  if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
wmmhello's avatar
wmmhello 已提交
355
    result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
356
  } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
wmmhello's avatar
wmmhello 已提交
357 358
    result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  }
wmmhello's avatar
wmmhello 已提交
359

360
  if (type == TSDB_DATA_TYPE_NCHAR) {
361
    result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
362
  } else if (type == TSDB_DATA_TYPE_BINARY) {
363
    result = result + VARSTR_HEADER_SIZE;
wmmhello's avatar
wmmhello 已提交
364
  }
365
  return result;
wmmhello's avatar
wmmhello 已提交
366 367
}

X
Xiaoyu Wang 已提交
368
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
369
                                      ESchemaAction *action, bool isTag) {
370 371
  int32_t code = TSDB_CODE_SUCCESS;
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
372
    if (j == 0 && !isTag) continue;
373
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
374
    code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
X
Xiaoyu Wang 已提交
375
    if (code != TSDB_CODE_SUCCESS) {
376 377 378 379 380 381
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

382
/**
 * Verify that every key in `cols` exists by name in `schema`.
 *
 * @param schema  schema entries to check against
 * @param length  number of entries in `schema`
 * @param cols    array of SSmlKv; for columns (isTag == false) the first entry is skipped
 * @return 0 when all keys are found, -1 as soon as one is missing.
 */
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
  SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  int32_t   i = 0;
  for (; i < length; i++) {
    // the stored value is never read back; only the presence of the name matters
    taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
  }

  int32_t result = 0;
  for (i = isTag ? 0 : 1; i < taosArrayGetSize(cols); i++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
    if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
      result = -1;
      break;
    }
  }

  taosHashCleanup(hashTmp);
  return result;
}

405
static int32_t getBytes(uint8_t type, int32_t length) {
406 407 408 409 410 411 412
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
    return smlFindNearestPowerOf2(length, type);
  } else {
    return tDataTypes[type].bytes;
  }
}

413 414
/**
 * Apply the schema changes required by `cols` to the field list `results`.
 *
 * For each key/value: an unknown key is appended to `results` as a new SField; a known
 * var-length field that is too small has the `bytes` of its existing entry enlarged.
 * Entries for which smlGenerateSchemaAction selects no action (including a type
 * mismatch) are left alone.
 *
 * @param schemaField, schemaHash  existing schema and its name -> index map (both may be
 *                                 NULL, in which case every key is appended)
 * @param results    array of SField to update
 * @param numOfCols  number of columns preceding the tags in schemaField; subtracted from
 *                   the schema index to address `results` when isTag is true
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_SML_INVALID_DATA when a key does not fit into
 *         SField.name or an expected schema entry cannot be located;
 *         TSDB_CODE_OUT_OF_MEMORY when appending to `results` fails.
 */
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                  SArray *results, int32_t numOfCols, bool isTag) {
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
    SSmlKv       *kv = (SSmlKv *)taosArrayGet(cols, j);
    ESchemaAction action = SCHEMA_ACTION_NULL;
    smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);

    if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) {
      SField field = {0};
      // the name must fit together with its terminating NUL (field is zero-initialised)
      if (kv->keyLen <= 0 || (size_t)kv->keyLen >= sizeof(field.name)) {
        return TSDB_CODE_SML_INVALID_DATA;
      }
      field.type = kv->type;
      field.bytes = getBytes(kv->type, kv->length);
      memcpy(field.name, kv->key, kv->keyLen);
      if (taosArrayPush(results, &field) == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    } else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
      uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
      if (index == NULL) {
        return TSDB_CODE_SML_INVALID_DATA;
      }
      uint16_t newIndex = *index;
      if (isTag) newIndex -= numOfCols;  // tags follow the columns in the schema array
      SField *field = (SField *)taosArrayGet(results, newIndex);
      if (field == NULL) {
        return TSDB_CODE_SML_INVALID_DATA;
      }
      field->bytes = getBytes(kv->type, kv->length);
    }
  }
  return TSDB_CODE_SUCCESS;
}

436 437 438 439 440
/**
 * Send a create-super-table request describing the given columns and tags and wait for
 * the result.
 *
 * Ownership: `pColumns` and `pTags` are attached to the request structure up front and
 * released through tFreeSMCreateStbReq on every path, so the caller must not free them.
 *
 * @param pName       full name of the super table
 * @param pTableMeta  current meta of the table; read only for the ADD_* / CHANGE_* actions
 *                    (version numbers and uid), may be NULL for CREATE_STABLE
 * @param action      selects how the column/tag versions, suid and source are filled in
 * @return TSDB_CODE_SUCCESS or the error of the failing step / of the request itself.
 */
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray *pTags, STableMeta *pTableMeta,
                              ESchemaAction action) {
  SRequestObj   *pRequest = NULL;
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SCmdMsgInfo    pCmdMsg = {0};

  // put front for free
  pReq.numOfColumns = taosArrayGetSize(pColumns);
  pReq.pColumns = pColumns;
  pReq.numOfTags = taosArrayGetSize(pTags);
  pReq.pTags = pTags;

  code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  if (action == SCHEMA_ACTION_CREATE_STABLE) {
    pReq.colVer = 1;
    pReq.tagVer = 1;
    pReq.suid = 0;
    pReq.source = TD_REQ_FROM_APP;
  } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
    // a tag change bumps only the tag version
    pReq.colVer = pTableMeta->sversion;
    pReq.tagVer = pTableMeta->tversion + 1;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  } else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) {
    // a column change bumps only the column version
    pReq.colVer = pTableMeta->sversion + 1;
    pReq.tagVer = pTableMeta->tversion;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  }

  if (pReq.numOfTags == 0) {
    // no tags supplied: add a single default tag named by the tsSmlTagName setting
    pReq.numOfTags = 1;
    SField field = {0};
    field.type = TSDB_DATA_TYPE_NCHAR;
    field.bytes = 1;
    tstrncpy(field.name, tsSmlTagName, sizeof(field.name));
    taosArrayPush(pReq.pTags, &field);
  }

  pReq.commentLen = -1;
  pReq.igExists = true;
  tNameExtractFullName(pName, pReq.name);

  pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // first call computes the serialized size, second call fills the buffer
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery;
  memset(&pQuery, 0, sizeof(pQuery));
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // drop the cached meta so the next lookup sees the new schema
    catalogRemoveTableMeta(info->pCatalog, pName);
  }
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  return code;
}

X
Xiaoyu Wang 已提交
523
/**
 * Bring the database schema in line with the super tables collected in info->superTables.
 *
 * For every super table: when it does not exist it is created from the collected tags and
 * columns; when it exists, tags and then columns are compared with the stored schema and
 * an alter request is sent for each side that needs new or larger fields, followed by a
 * catalog refresh. For pre-existing tables the resulting meta is finally verified to
 * contain every collected tag and column. On success the loaded STableMeta is handed over
 * to the table's SSmlSTableMeta.
 *
 * Does nothing when info->dataFormat is set and info->needModifySchema is not.
 *
 * @return 0 on success, otherwise the error code of the failing step.
 */
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
  if (info->dataFormat && !info->needModifySchema) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t     code = 0;
  SHashObj   *hashTmp = NULL;
  STableMeta *pTableMeta = NULL;

  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));

  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

  NodeList *tmp = info->superTables;
  while (tmp) {
    SSmlSTableMeta *sTableData = tmp->data.value;
    bool            needCheckMeta = false;  // for multi thread

    size_t      superTableLen = (size_t)tmp->data.keyLen;
    const void *superTable = tmp->data.key;
    memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
    memcpy(pName.tname, superTable, superTableLen);

    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);

    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
      // the super table is missing: create it from the collected tags and columns
      SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
      SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
      smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
      smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);

      // pColumns / pTags are released inside smlSendMetaMsg
      code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
        goto end;
      }
      info->cost.numOfCreateSTables++;
      taosMemoryFreeClear(pTableMeta);

      code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
        goto end;
      }
    } else if (code == TSDB_CODE_SUCCESS) {
      // ---- tags: map tag name -> absolute index in the schema array ----
      hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
                             HASH_NO_LOCK);
      for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
           i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }

      ESchemaAction action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));

        // start from the current schema, split into columns and tags
        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if (i < pTableMeta->tableInfo.numOfColumns) {
            taosArrayPush(pColumns, &field);
          } else {
            taosArrayPush(pTags, &field);
          }
        }
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
                           pTableMeta->tableInfo.numOfColumns, true);

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
          goto end;
        }

        info->cost.numOfAlterTagSTables++;
        taosMemoryFreeClear(pTableMeta);
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
      }

      // ---- columns: reuse the hash for column name -> index ----
      taosHashClear(hashTmp);
      for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
      action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if (i < pTableMeta->tableInfo.numOfColumns) {
            taosArrayPush(pColumns, &field);
          } else {
            taosArrayPush(pTags, &field);
          }
        }

        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
                           pTableMeta->tableInfo.numOfColumns, false);

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
          goto end;
        }

        info->cost.numOfAlterColSTables++;
        taosMemoryFreeClear(pTableMeta);
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
          goto end;
        }
      }

      needCheckMeta = true;
      taosHashCleanup(hashTmp);
      hashTmp = NULL;
    } else {
      uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
      goto end;
    }

    if (needCheckMeta) {
      // the table already existed: make sure the final meta really has every key
      code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
                          sTableData->tags, true);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, pName.tname);
        goto end;
      }
      code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, pName.tname);
        goto end;
      }
    }

    // Ownership of the meta moves to the super table entry. The local pointer is cleared
    // so that a later taosMemoryFreeClear (error path below, or the create branch of the
    // next iteration) can never free an object that is already owned elsewhere.
    sTableData->tableMeta = pTableMeta;
    pTableMeta = NULL;

    tmp = tmp->next;
  }
  return 0;

end:
  taosHashCleanup(hashTmp);
  taosMemoryFreeClear(pTableMeta);
  return code;
}

706
/******************************* parse basic type function **********************/
X
Xiaoyu Wang 已提交
707
/**
 * Parse a numeric literal with an optional type suffix.
 *
 * The numeric prefix of kvVal->value is read with taosStr2Double(); whatever is
 * left inside kvVal->length selects the target type:
 *   (none) / f64 -> DOUBLE       f32 -> FLOAT
 *   i / i64      -> BIGINT       u / u64 -> UBIGINT
 *   i32 i16 i8   -> INT, SMALLINT, TINYINT
 *   u32 u16 u8   -> UINT, USMALLINT, UTINYINT
 * Multi-character suffixes are compared case-insensitively; the single-letter
 * forms 'i' and 'u' are matched exactly.
 *
 * @param kvVal  in: value/length; out: type plus one of d/f/i/u on success.
 * @param msg    receives a description of the failure.
 * @return true when the literal is recognised and within range of its type.
 */
static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
  const char *pVal = kvVal->value;
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (pVal == endptr) {
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
    return false;
  }

  // Bytes that follow the numeric prefix, i.e. the length of the type suffix.
  int32_t left = len - (endptr - pVal);
  if (left == 0 || (left == 3 && strncasecmp(endptr, "f64", left) == 0)) {
    kvVal->type = TSDB_DATA_TYPE_DOUBLE;
    kvVal->d = result;
  } else if ((left == 3 && strncasecmp(endptr, "f32", left) == 0)) {
    if (!IS_VALID_FLOAT(result)) {
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_FLOAT;
    kvVal->f = (float)result;
  } else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)) {
    if (smlDoubleToInt64OverFlow(result)) {
      // The double cannot represent the value as an int64, so re-read the digits
      // exactly. The integer parse must end where the floating-point parse
      // ended; if it stops earlier the literal carries a fraction or exponent
      // (e.g. "1e30i64") and only its leading digits would be stored.
      char *intEnd = NULL;
      errno = 0;
      int64_t tmp = taosStr2Int64(pVal, &intEnd, 10);
      if (errno == ERANGE || intEnd != endptr) {
        smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_BIGINT;
      kvVal->i = tmp;
      return true;
    }
    kvVal->type = TSDB_DATA_TYPE_BIGINT;
    kvVal->i = (int64_t)result;
  } else if ((left == 1 && *endptr == 'u') || (left == 3 && strncasecmp(endptr, "u64", left) == 0)) {
    if (result < 0) {
      smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
      return false;
    }
    if (result >= (double)UINT64_MAX) {
      // Same exact re-read as the signed case above.
      char *intEnd = NULL;
      errno = 0;
      uint64_t tmp = taosStr2UInt64(pVal, &intEnd, 10);
      if (errno == ERANGE || intEnd != endptr) {
        smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_UBIGINT;
      kvVal->u = tmp;
      return true;
    }
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;
    kvVal->u = result;
  } else if (left == 3 && strncasecmp(endptr, "i32", left) == 0) {
    if (!IS_VALID_INT(result)) {
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_INT;
    kvVal->i = result;
  } else if (left == 3 && strncasecmp(endptr, "u32", left) == 0) {
    if (!IS_VALID_UINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UINT;
    kvVal->u = result;
  } else if (left == 3 && strncasecmp(endptr, "i16", left) == 0) {
    if (!IS_VALID_SMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "small int out of range[-32768,32767]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_SMALLINT;
    kvVal->i = result;
  } else if (left == 3 && strncasecmp(endptr, "u16", left) == 0) {
    if (!IS_VALID_USMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned small int out of range[0,65535]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_USMALLINT;
    kvVal->u = result;
  } else if (left == 2 && strncasecmp(endptr, "i8", left) == 0) {
    if (!IS_VALID_TINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_TINYINT;
    kvVal->i = result;
  } else if (left == 2 && strncasecmp(endptr, "u8", left) == 0) {
    if (!IS_VALID_UTINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UTINYINT;
    kvVal->u = result;
  } else {
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
    return false;
  }
  return true;
}

wmmhello's avatar
wmmhello 已提交
805
/**
 * Recognise a boolean literal: "t"/"T", "f"/"F", or "true"/"false" in any
 * letter case. On a match kvVal->i is set to TSDB_TRUE or TSDB_FALSE.
 *
 * @return true when kvVal->value (kvVal->length bytes) is a boolean literal.
 */
static bool smlParseBool(SSmlKv *kvVal) {
  const char *text = kvVal->value;
  int32_t     n = kvVal->length;

  switch (n) {
    case 1:
      if (text[0] == 't' || text[0] == 'T') {
        kvVal->i = TSDB_TRUE;
        return true;
      }
      if (text[0] == 'f' || text[0] == 'F') {
        kvVal->i = TSDB_FALSE;
        return true;
      }
      return false;
    case 4:
      if (strncasecmp(text, "true", n) != 0) {
        return false;
      }
      kvVal->i = TSDB_TRUE;
      return true;
    case 5:
      if (strncasecmp(text, "false", n) != 0) {
        return false;
      }
      kvVal->i = TSDB_FALSE;
      return true;
    default:
      return false;
  }
}

wmmhello's avatar
wmmhello 已提交
829
/**
 * Tell whether a value is written as a binary string, i.e. "abc": at least two
 * bytes long and both the first and the last byte are a double quote.
 */
static bool smlIsBinary(const char *pVal, uint16_t len) {
  // The length test comes first so pVal is never read for inputs that are too short.
  return len >= 2 && pVal[0] == '"' && pVal[len - 1] == '"';
}

wmmhello's avatar
wmmhello 已提交
840
/**
 * Tell whether a value is written as an nchar string, i.e. L"abc": at least
 * three bytes long, starting with 'L' or 'l' followed by a double quote and
 * ending with a double quote.
 */
static bool smlIsNchar(const char *pVal, uint16_t len) {
  if (len < 3) {
    return false;
  }

  const bool hasPrefix = (pVal[0] == 'l' || pVal[0] == 'L');
  const bool isQuoted = (pVal[1] == '"' && pVal[len - 1] == '"');
  return isQuoted && hasPrefix;
}
850
/******************************* parse basic type function end **********************/
wmmhello's avatar
wmmhello 已提交
851

852
/******************************* time function **********************/
853
static uint8_t smlPrecisionConvert[7] = {TSDB_TIME_PRECISION_NANO, TSDB_TIME_PRECISION_HOURS, TSDB_TIME_PRECISION_MINUTES,
wmmhello's avatar
wmmhello 已提交
854 855
                                     TSDB_TIME_PRECISION_SECONDS, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_MICRO,
                                     TSDB_TIME_PRECISION_NANO};
856 857 858
static int64_t smlFactorNS[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
static int64_t smlFactorS[3] = {1000LL, 1000000LL, 1000000000LL};
static int64_t smlToMilli[3] = {3600LL, 60LL, 1LL};
wmmhello's avatar
wmmhello 已提交
859

860
static int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, uint8_t toPrecision) {
X
Xiaoyu Wang 已提交
861
  char   *endPtr = NULL;
wafwerar's avatar
wafwerar 已提交
862
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
863
  if (unlikely(value + len != endPtr)) {
864
    return -1;
wmmhello's avatar
wmmhello 已提交
865
  }
866 867 868 869 870 871 872

  if(unlikely(fromPrecision >= TSDB_TIME_PRECISION_HOURS)){
    int64_t unit = smlToMilli[fromPrecision - TSDB_TIME_PRECISION_HOURS];
    if(unit > INT64_MAX / tsInt64){
      return -1;
    }
    tsInt64 *= unit;
873
    fromPrecision = TSDB_TIME_PRECISION_MILLI;
wmmhello's avatar
wmmhello 已提交
874 875
  }

876
  return convertTimePrecision(tsInt64, fromPrecision, toPrecision);
877 878 879 880 881 882
}

static int8_t smlGetTsTypeByLen(int32_t len) {
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
    return TSDB_TIME_PRECISION_SECONDS;
  } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
883
    return TSDB_TIME_PRECISION_MILLI;
884 885
  } else {
    return -1;
wmmhello's avatar
wmmhello 已提交
886
  }
887 888
}

X
Xiaoyu Wang 已提交
889
/**
 * Parse the timestamp of a line-protocol record.
 *
 * The result is expressed in the precision of the current super table, or in
 * nanoseconds when no super table meta is set. An empty timestamp or the
 * single character "0" yields the current time.
 *
 * @return the timestamp, or -1 after recording "invalid timestamp" in info->msgBuf.
 */
static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) {
  uint8_t toPrecision = TSDB_TIME_PRECISION_NANO;
  if (info->currSTableMeta) {
    toPrecision = info->currSTableMeta->tableInfo.precision;
  }

  const bool useNow = (len == 0) || (len == 1 && data[0] == '0');
  if (unlikely(useNow)) {
    return taosGetTimestampNs() / smlFactorNS[toPrecision];
  }

  int64_t ts = smlGetTimeValue(data, len, smlPrecisionConvert[info->precision], toPrecision);
  if (likely(ts != -1)) {
    return ts;
  }

  smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
  return -1;
}

X
Xiaoyu Wang 已提交
906
/**
 * Parse the timestamp of a telnet-protocol record.
 *
 * The precision is derived from the digit count via smlGetTsTypeByLen(). The
 * single character "0" yields the current time. The result is expressed in the
 * precision of the current super table, or nanoseconds when none is set.
 *
 * @return the timestamp, or -1 after recording the reason in info->msgBuf.
 */
static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
  uint8_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;

  if (unlikely(!data)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
    return -1;
  }
  if (unlikely(len == 1 && data[0] == '0')) {
    return taosGetTimestampNs()/smlFactorNS[toPrecision];
  }

  // Must stay signed: smlGetTsTypeByLen() reports failure as -1, which an
  // unsigned byte would turn into 255 and the comparison below would never match.
  int8_t fromPrecision = smlGetTsTypeByLen(len);
  if (unlikely(fromPrecision == -1)) {
    smlBuildInvalidDataMsg(&info->msgBuf,
                           "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
    return -1;
  }

  int64_t ts = smlGetTimeValue(data, len, (uint8_t)fromPrecision, toPrecision);
  if (unlikely(ts == -1)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
  }
  return ts;
}

930
/**
 * Parse a record timestamp with the parser that matches info->protocol
 * (line protocol or telnet protocol) and log the result at debug level.
 * Any other protocol trips ASSERT(0).
 *
 * @return whatever the protocol-specific parser returned (-1 on failure).
 */
static int64_t smlParseTS(SSmlHandle *info, const char *data, int32_t len) {
  int64_t    parsed = 0;
  const bool isLine = (info->protocol == TSDB_SML_LINE_PROTOCOL);
  const bool isTelnet = !isLine && (info->protocol == TSDB_SML_TELNET_PROTOCOL);

  if (isLine) {
    parsed = smlParseInfluxTime(info, data, len);
  } else if (isTelnet) {
    parsed = smlParseOpenTsdbTime(info, data, len);
  } else {
    ASSERT(0);
  }

  uDebug("SML:0x%" PRIx64 " smlParseTS:%" PRId64, info->id, parsed);
  return parsed;
}
/******************************* time function end **********************/

/******************************* Sml struct related function **********************/
/**
 * Allocate a child-table descriptor.
 *
 * @param numRows     initial capacity of the cols array (pointer-sized elements).
 * @param measure     super table name; the pointer is stored, not copied.
 * @param measureLen  length of measure in bytes.
 * @return the new descriptor, or NULL when any allocation fails; nothing is
 *         leaked on failure.
 */
static SSmlTableInfo *smlBuildTableInfo(int numRows, const char* measure, int32_t measureLen) {
  SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
  if (!tag) {
    return NULL;
  }

  tag->sTableName = measure;
  tag->sTableNameLen = measureLen;

  tag->cols = taosArrayInit(numRows, POINTER_BYTES);
  if (tag->cols == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
  }

  tag->tags = taosArrayInit(16, sizeof(SSmlKv));
  if (tag->tags == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
  }
  return tag;

cleanup:
  // cols already exists when the tags allocation is the one that failed.
  if (tag->cols != NULL) {
    taosArrayDestroy(tag->cols);
  }
  taosMemoryFree(tag);
  return NULL;
}

/**
 * Run every key of tags through smlCheckDuplicateKey() against dumplicateKey.
 *
 * @return TSDB_CODE_TSC_DUP_NAMES (with the offending key recorded in msg) for
 *         the first key the check flags, otherwise TSDB_CODE_SUCCESS.
 */
static int32_t smlCheckDupUnit(SHashObj *dumplicateKey, SArray *tags, SSmlMsgBuf *msg){
  int idx = 0;
  while (idx < taosArrayGetSize(tags)) {
    SSmlKv *kv = taosArrayGet(tags, idx);
    idx++;
    if (!smlCheckDuplicateKey(kv->key, kv->keyLen, dumplicateKey)) {
      continue;
    }
    smlBuildInvalidDataMsg(msg, "dumplicate key", kv->key);
    return TSDB_CODE_TSC_DUP_NAMES;
  }
  return TSDB_CODE_SUCCESS;
}

/**
 * Check that no key is repeated within the columns and tags of one record.
 * Columns are checked first, then tags, against the same scratch hash.
 *
 * @return TSDB_CODE_SUCCESS, the error from smlCheckDupUnit(), or
 *         TSDB_CODE_OUT_OF_MEMORY when the scratch hash cannot be created.
 */
static int32_t smlJudgeDupColName(SArray *cols, SArray *tags, SSmlMsgBuf *msg) {
  SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (dumplicateKey == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t ret = smlCheckDupUnit(dumplicateKey, cols, msg);
  if (ret == TSDB_CODE_SUCCESS) {
    ret = smlCheckDupUnit(dumplicateKey, tags, msg);
  }

  taosHashCleanup(dumplicateKey);
  return ret;
}

/**
 * Take the child table name from the tag whose key equals the configured
 * tsSmlChildTableName, if that setting is non-empty and such a tag exists.
 *
 * @param tags            array of SSmlKv tags of the record.
 * @param childTableName  output buffer; TSDB_TABLE_NAME_LEN bytes are cleared
 *                        before the copy, so it must be at least that large.
 *                        The stored name is always NUL-terminated and is
 *                        truncated to TSDB_TABLE_NAME_LEN - 1 bytes.
 * @return always TSDB_CODE_SUCCESS.
 */
static int32_t smlParseTableName(SArray *tags, char *childTableName) {
  size_t      childTableNameLen = strlen(tsSmlChildTableName);
  if (childTableNameLen == 0) return TSDB_CODE_SUCCESS;

  for(int i = 0; i < taosArrayGetSize(tags); i++){
    SSmlKv *tag = taosArrayGet(tags, i);
    // handle child table name
    if (childTableNameLen == tag->keyLen && strncmp(tag->key, tsSmlChildTableName, tag->keyLen) == 0) {
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      // Leave the last cleared byte untouched so the name stays terminated.
      strncpy(childTableName, tag->value, (tag->length < TSDB_TABLE_NAME_LEN ? tag->length : TSDB_TABLE_NAME_LEN - 1));
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

1018 1019 1020 1021
/**
 * Fill in the child table name and uid of oneTable.
 *
 * The name comes from the tags when smlParseTableName() finds one. Otherwise
 * buildChildTableName() derives both name and uid from the super table name
 * and a copy of the tags.
 *
 * @return always TSDB_CODE_SUCCESS.
 */
static int32_t smlSetCTableName(SSmlTableInfo *oneTable){
  smlParseTableName(oneTable->tags, oneTable->childTableName);

  if (strlen(oneTable->childTableName) == 0) {
    SArray* dst = taosArrayDup(oneTable->tags, NULL);
    RandTableName rName = {dst, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen,
                           oneTable->childTableName, 0};

    buildChildTableName(&rName);
    taosArrayDestroy(dst);
    oneTable->uid = rName.uid;
  } else {
    // The uid is the first eight bytes of the name. They are copied out with
    // memcpy because the char buffer need not be aligned for a uint64_t load.
    uint64_t uid = 0;
    memcpy(&uid, oneTable->childTableName, sizeof(uid));
    oneTable->uid = uid;
  }
  return TSDB_CODE_SUCCESS;
}

/**
 * Allocate a super-table meta holder.
 *
 * The tag and column lookup hashes are created only when isDataFormat is
 * false; the tags and cols arrays are always created.
 *
 * @return the new holder, or NULL when any allocation fails; everything
 *         allocated up to that point is released.
 */
static SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
  SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
  if (!meta) {
    return NULL;
  }

  if(unlikely(!isDataFormat)){
    meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->tagHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }

    meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->colHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }
  }

  meta->tags = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->cols = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
  return meta;

cleanup:
  // The struct was zero-filled, so a member is non-NULL only if it was created
  // above. cols is the last allocation and is therefore always NULL here.
  if (meta->tagHash != NULL) {
    taosHashCleanup(meta->tagHash);
  }
  if (meta->colHash != NULL) {
    taosHashCleanup(meta->colHash);
  }
  if (meta->tags != NULL) {
    taosArrayDestroy(meta->tags);
  }
  taosMemoryFree(meta);
  return NULL;
}

/**
 * Append every key/value of cols to metaArray and, when metaHash is given,
 * record each key's position (a 16-bit index) in that hash.
 */
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols){
  int16_t pos = 0;
  while (pos < taosArrayGetSize(cols)) {
    SSmlKv *entry = (SSmlKv *)taosArrayGet(cols, pos);
    taosArrayPush(metaArray, entry);
    if(unlikely(metaHash != NULL)) {
      taosHashPut(metaHash, entry->key, entry->keyLen, &pos, SHORT_BYTES);
    }
    ++pos;
  }
}

/**
 * Ask the catalog for the meta of the super table named by measure in the
 * database of the current request.
 *
 * @param measure     table name bytes (not necessarily NUL-terminated).
 * @param measureLen  number of bytes in measure.
 * @return the pointer filled in by catalogGetSTableMeta(); it stays NULL when
 *         the call leaves it untouched. The call's return code is not examined.
 */
static STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen){
  // Fully qualified table name: account, database, then the measure as table name.
  SName tableName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(tableName.dbname, info->pRequest->pDb, sizeof(tableName.dbname));
  memset(tableName.tname, 0, TSDB_TABLE_NAME_LEN);
  memcpy(tableName.tname, measure, measureLen);

  // Connection details the catalog uses for the lookup.
  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

  STableMeta *meta = NULL;
  catalogGetSTableMeta(info->pCatalog, &conn, &tableName, &meta);
  return meta;
}

/**
 * Release a super-table meta holder together with its hashes, arrays and the
 * table meta it owns. Passing NULL is a no-op.
 */
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
  if (meta == NULL) {
    return;
  }
  taosHashCleanup(meta->tagHash);
  taosHashCleanup(meta->colHash);
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);
  taosMemoryFree(meta->tableMeta);
  taosMemoryFree(meta);
}
/******************************* Sml struct related function end **********************/

wmmhello's avatar
wmmhello 已提交
1111
/**
 * Determine the type of a field value and normalise it in place.
 *
 * Tried in order: binary ("..."), nchar (L"..."), boolean, number. For the two
 * string forms the surrounding quotes (and the L prefix) are removed from
 * value/length; for booleans and numbers length becomes the byte width of the
 * detected type.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN when a string
 *         is too long, or TSDB_CODE_TSC_INVALID_VALUE when nothing matches.
 */
static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
  // binary: "abc"
  if (smlIsBinary(pVal->value, pVal->length)) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
    pVal->length -= BINARY_ADD_LEN;
    if (pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
    pVal->value += (BINARY_ADD_LEN - 1);
    return TSDB_CODE_SUCCESS;
  }

  // nchar: L"abc"
  if (smlIsNchar(pVal->value, pVal->length)) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
    pVal->length -= NCHAR_ADD_LEN;
    if (pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
    pVal->value += (NCHAR_ADD_LEN - 1);
    return TSDB_CODE_SUCCESS;
  }

  // bool first, then number; both end with a fixed-width length
  const bool isBool = smlParseBool(pVal);
  if (isBool) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
  }
  if (isBool || smlParseNumber(pVal, msg)) {
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_TSC_INVALID_VALUE;
}

1148 1149 1150
/**
 * Comparison callback over two cJSON nodes.
 *
 * @return 0 when cJSON_Compare() (case-sensitive) reports them equal, 1 otherwise.
 */
int32_t is_same_child_table_json(const void *a, const void *b){
  const cJSON *lhs = (const cJSON *)a;
  const cJSON *rhs = (const cJSON *)b;
  if (cJSON_Compare(lhs, rhs, true)) {
    return 0;
  }
  return 1;
}
wmmhello's avatar
wmmhello 已提交
1151

1152 1153 1154 1155 1156
// Both macros expect `elements` (the line being parsed) and `info` (the parser
// handle holding the previous line in info->preLine) to be in scope.
//
// True when the first measureTagsLen bytes starting at measure are the same,
// in length and content, as in the previous line.
#define IS_SAME_CHILD_TABLE (elements->measureTagsLen == info->preLine.measureTagsLen \
&& memcmp(elements->measure, info->preLine.measure, elements->measureTagsLen) == 0)

// True when the measure alone (measureLen bytes) is the same as in the previous line.
#define IS_SAME_SUPER_TABLE (elements->measureLen == info->preLine.measureLen \
&& memcmp(elements->measure, info->preLine.measure, elements->measureLen) == 0)
1157

1158 1159 1160 1161 1162 1163 1164
// True when the key of the local `kv` has the same length and bytes as the key
// of `preKV`; both names must be in scope where the macro is used.
#define IS_SAME_KEY (preKV->keyLen == kv.keyLen && memcmp(preKV->key, kv.key, kv.keyLen) == 0)

/**
 * Parse the tag section ("k1=v1,k2=v2 ...") of one line and register the
 * child table it describes.
 *
 * Nothing is parsed when the line belongs to the same child table as the
 * previous one. Every tag is stored as an NCHAR key/value in
 * info->preLineTagKV. In data-format mode the tags are also compared, position
 * by position, with the previous line (same measure) or with the tags
 * remembered for the super table. Any mismatch in count or key switches
 * info->dataFormat off, sets info->reRun and returns success so the caller can
 * start over.
 *
 * @param info           parser handle.
 * @param sql            in/out cursor into the line; left on the byte that ended the tag section.
 * @param sqlEnd         end of the line.
 * @param currElement    measure/tag extents of the current line.
 * @param isSameMeasure  the measure equals the previous line's measure.
 * @param isSameCTable   measure and tags equal the previous line's.
 * @return TSDB_CODE_SUCCESS or an error code describing the malformed input or
 *         allocation failure.
 */
static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd,
                          SSmlLineInfo* currElement, bool isSameMeasure, bool isSameCTable){
  if(isSameCTable){
    return TSDB_CODE_SUCCESS;
  }

  int     cnt = 0;
  SArray *preLineKV = info->preLineTagKV;
  bool    isSuperKVInit = true;
  SArray *superKV = NULL;
  if(info->dataFormat){
    if(!isSameMeasure){
      SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);

      if(unlikely(sMeta == NULL)){
        sMeta = smlBuildSTableMeta(info->dataFormat);
        if (sMeta == NULL) {
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
        sMeta->tableMeta = pTableMeta;
        if(pTableMeta == NULL){
          // The holder was never registered in info->superTables; release it here.
          smlDestroySTableMeta(sMeta);
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta);
      }
      info->currSTableMeta = sMeta->tableMeta;
      superKV = sMeta->tags;

      // An empty tag list means this is the first line seen for the super table.
      if(unlikely(taosArrayGetSize(superKV) == 0)){
        isSuperKVInit = false;
      }
      taosArraySetSize(preLineKV, 0);
    }
  }else{
    taosArraySetSize(preLineKV, 0);
  }

  while (*sql < sqlEnd) {
    if (unlikely(IS_SPACE(*sql))) {
      break;
    }

    bool hasSlash = false;
    // parse key
    const char *key = *sql;
    int32_t     keyLen = 0;
    while (*sql < sqlEnd) {
      if (unlikely(IS_COMMA(*sql))) {
        smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if (unlikely(IS_EQUAL(*sql))) {
        keyLen = *sql - key;
        (*sql)++;
        break;
      }
      if(!hasSlash){
        hasSlash = (*(*sql) == SLASH);
      }
      (*sql)++;
    }
    if(unlikely(hasSlash)) {
      PROCESS_SLASH(key, keyLen)
    }

    if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

    // parse value
    const char *value = *sql;
    int32_t     valueLen = 0;
    hasSlash = false;
    while (*sql < sqlEnd) {
      if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
        break;
      }else if (unlikely(IS_EQUAL(*sql))) {
        smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }

      if(!hasSlash){
        hasSlash = (*(*sql) == SLASH);
      }

      (*sql)++;
    }
    valueLen = *sql - value;

    if (unlikely(valueLen == 0)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }

    if(unlikely(hasSlash)) {
      PROCESS_SLASH(value, valueLen)
    }

    if (unlikely(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }

    SSmlKv kv = {.key = key, .type = TSDB_DATA_TYPE_NCHAR, .keyLen = keyLen, .value = value, .length = valueLen};
    if(info->dataFormat){
      if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
        info->needModifySchema = true;
      }

      if(isSameMeasure){
        // Compare with the tag at the same position in the previous line.
        if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
        if(unlikely(kv.length > preKV->length)){
          preKV->length = kv.length;
          SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
          ASSERT(tableMeta != NULL);

          SSmlKv *oldKV = taosArrayGet(tableMeta->tags, cnt);
          oldKV->length = kv.length;
          info->needModifySchema = true;
        }
        if(unlikely(!IS_SAME_KEY)){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
      }else{
        if(isSuperKVInit){
          // Compare with the tag remembered for the super table at this position.
          if(unlikely(cnt >= taosArrayGetSize(superKV))) {
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
          SSmlKv *preKV = taosArrayGet(superKV, cnt);
          if(unlikely(kv.length > preKV->length)) {
            preKV->length = kv.length;
          }else{
            kv.length = preKV->length;
          }
          info->needModifySchema = true;

          if(unlikely(!IS_SAME_KEY)){
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
        }else{
          taosArrayPush(superKV, &kv);
        }
        taosArrayPush(preLineKV, &kv);
      }
    }else{
      taosArrayPush(preLineKV, &kv);
    }

    cnt++;
    if(IS_SPACE(*sql)){
      break;
    }
    (*sql)++;
  }

  void* oneTable = nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen, NULL);
  if ((oneTable != NULL)) {
    return TSDB_CODE_SUCCESS;
  }

  SSmlTableInfo *tinfo = smlBuildTableInfo(1, currElement->measure, currElement->measureLen);
  if (!tinfo) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  for(int i = 0; i < taosArrayGetSize(preLineKV); i++){
    taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
  }
  smlSetCTableName(tinfo);
  if(info->dataFormat) {
    info->currSTableMeta->uid = tinfo->uid;
    tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
    if(tinfo->tableDataCtx == NULL){
      smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
      // tinfo has not been registered in info->childTables yet, so release it here.
      taosArrayDestroy(tinfo->tags);
      taosArrayDestroy(tinfo->cols);
      taosMemoryFree(tinfo);
      return TSDB_CODE_SML_INVALID_DATA;
    }
  }

  nodeListSet(&info->childTables, currElement->measure, currElement->measureTagsLen, tinfo);

  return TSDB_CODE_SUCCESS;
}
1354

1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366
/**
 * Parse the field section ("k1=v1,k2=v2 ...") of one line.
 *
 * Each value is typed by smlParseValue(). In data-format mode the value is
 * bound straight into the child table's data context with smlBuildCol() and
 * then compared, position by position, with the previous line (same measure)
 * or with the columns remembered for the super table. A mismatch in count, key
 * or type switches info->dataFormat off, sets info->reRun and returns success
 * so the caller can start over. Outside data-format mode the values are
 * collected in currElement->colArray, whose first slot is reserved for the
 * timestamp.
 *
 * @param info           parser handle.
 * @param sql            in/out cursor into the line; left on the byte that ended the field section.
 * @param sqlEnd         end of the line.
 * @param currElement    measure/tag extents and column array of the current line.
 * @param isSameMeasure  the measure equals the previous line's measure.
 * @param isSameCTable   measure and tags equal the previous line's.
 * @return TSDB_CODE_SUCCESS or an error code describing the malformed input or
 *         allocation failure.
 */
static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd,
                          SSmlLineInfo* currElement, bool isSameMeasure, bool isSameCTable){
  int     cnt = 0;
  SArray *preLineKV = info->preLineColKV;
  bool    isSuperKVInit = true;
  SArray *superKV = NULL;
  if(info->dataFormat){
    if(unlikely(!isSameCTable)){
      SSmlTableInfo *oneTable = (SSmlTableInfo *)nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen, NULL);
      if (unlikely(oneTable == NULL)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      info->currTableDataCtx = oneTable->tableDataCtx;
    }

    if(unlikely(!isSameMeasure)){
      SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);

      if(unlikely(sMeta == NULL)){
        sMeta = smlBuildSTableMeta(info->dataFormat);
        if (sMeta == NULL) {
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
        sMeta->tableMeta = pTableMeta;
        if(pTableMeta == NULL){
          // The holder was never registered in info->superTables; release it here.
          smlDestroySTableMeta(sMeta);
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta);
      }
      info->currSTableMeta = sMeta->tableMeta;
      superKV = sMeta->cols;
      // An empty column list means this is the first line seen for the super table.
      if(unlikely(taosArrayGetSize(superKV) == 0)){
        isSuperKVInit = false;
      }
      taosArraySetSize(preLineKV, 0);
    }
  }

  while (*sql < sqlEnd) {
    if (unlikely(IS_SPACE(*sql))) {
      break;
    }

    bool hasSlash = false;
    // parse key
    const char *key = *sql;
    int32_t     keyLen = 0;
    while (*sql < sqlEnd) {
      if (unlikely(IS_COMMA(*sql))) {
        smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if (unlikely(IS_EQUAL(*sql))) {
        keyLen = *sql - key;
        (*sql)++;
        break;
      }
      if(!hasSlash){
        hasSlash = (*(*sql) == SLASH);
      }
      (*sql)++;
    }
    if(unlikely(hasSlash)) {
      PROCESS_SLASH(key, keyLen)
    }

    if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

    // parse value
    const char *value = *sql;
    int32_t     valueLen = 0;
    hasSlash              = false;
    bool        isInQuote = false;
    while (*sql < sqlEnd) {
      if (IS_QUOTE(*sql)) {
        isInQuote = !isInQuote;
        (*sql)++;
        continue;
      }
      // Separators only count outside a quoted string.
      if (!isInQuote){
        if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
          break;
        } else if (unlikely(IS_EQUAL(*sql))) {
          smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
          return TSDB_CODE_SML_INVALID_DATA;
        }
      }
      if(!hasSlash){
        hasSlash = (*(*sql) == SLASH);
      }

      (*sql)++;
    }
    valueLen = *sql - value;

    if (unlikely(isInQuote)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "only one quote", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if (unlikely(valueLen == 0)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if(unlikely(hasSlash)) {
      PROCESS_SLASH(value, valueLen)
    }

    SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen};
    int32_t ret = smlParseValue(&kv, &info->msgBuf);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    if(info->dataFormat){
      //cnt begin 0, add ts so + 2
      if(unlikely(cnt + 2 > info->currSTableMeta->tableInfo.numOfColumns)){
        info->needModifySchema = true;
      }
      // bind data
      ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, cnt + 1);
      if (unlikely(ret != TSDB_CODE_SUCCESS)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
        return ret;
      }

      if(isSameMeasure){
        // Compare with the column at the same position in the previous line.
        if(cnt >= taosArrayGetSize(preLineKV)) {
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
        if(kv.type != preKV->type){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }

        if(unlikely(IS_VAR_DATA_TYPE(kv.type) && kv.length > preKV->length)){
          preKV->length = kv.length;
          SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
          ASSERT(tableMeta != NULL);

          SSmlKv *oldKV = taosArrayGet(tableMeta->cols, cnt);
          oldKV->length = kv.length;
          info->needModifySchema = true;
        }
        if(unlikely(!IS_SAME_KEY)){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
      }else{
        if(isSuperKVInit){
          // Compare with the column remembered for the super table at this position.
          if(unlikely(cnt >= taosArrayGetSize(superKV))) {
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
          SSmlKv *preKV = taosArrayGet(superKV, cnt);
          if(unlikely(kv.type != preKV->type)){
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }

          if(IS_VAR_DATA_TYPE(kv.type)){
            if(kv.length > preKV->length) {
              preKV->length = kv.length;
            }else{
              kv.length = preKV->length;
            }
            info->needModifySchema = true;
          }
          if(unlikely(!IS_SAME_KEY)){
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
        }else{
          taosArrayPush(superKV, &kv);
        }
        taosArrayPush(preLineKV, &kv);
      }
    }else{
      if(currElement->colArray == NULL){
        currElement->colArray = taosArrayInit(16, sizeof(SSmlKv));
        if (currElement->colArray == NULL) {
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        taosArraySetSize(currElement->colArray, 1);   // reserve the first slot for the timestamp
      }
      taosArrayPush(currElement->colArray, &kv);
    }

    cnt++;
    if(IS_SPACE(*sql)){
      break;
    }
    (*sql)++;
  }

  return TSDB_CODE_SUCCESS;
}
1561

1562 1563 1564 1565 1566
/*
 * Parse one InfluxDB line-protocol record:
 *   <measure>[,<tag>=<val>...] <col>=<val>[,...] [<timestamp>]
 *
 * info     - schemaless handle; receives error messages and, on success, a copy
 *            of the parsed line in info->preLine.
 * sql      - start of the line (modified in place when escape characters are removed).
 * sqlEnd   - one past the last character of the line.
 * elements - output: pointers/lengths of measure, tags, cols and timestamp.
 *
 * Returns TSDB_CODE_SUCCESS on success, or when info->reRun has been set by the
 * tag/column parsers (the caller is expected to inspect info->reRun); an error
 * code otherwise.
 */
static int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements) {
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
  JUMP_SPACE(sql, sqlEnd)
  if (unlikely(*sql == COMMA)) return TSDB_CODE_SML_INVALID_DATA;
  elements->measure = sql;

  // parse measure: stops at the first unescaped comma or space
  while (sql < sqlEnd) {
    if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
      // drop the escape character; the line becomes one byte shorter
      MOVE_FORWARD_ONE(sql, sqlEnd - sql);
      sqlEnd--;
      continue;
    }
    if (IS_COMMA(sql) || IS_SPACE(sql)) {
      break;
    }
    sql++;
  }
  elements->measureLen = sql - elements->measure;
  if (unlikely(IS_INVALID_TABLE_LEN(elements->measureLen))) {
    smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  // length of "measure[,tags]" is needed before the tags are parsed,
  // because the IS_SAME_* comparisons below rely on it
  const char *tagEnd = sql;
  while (tagEnd < sqlEnd && !IS_SPACE(tagEnd)) {
    tagEnd++;
  }
  elements->measureTagsLen = tagEnd - elements->measure;

  bool isSameCTable = false;
  bool isSameMeasure = false;
  if (IS_SAME_CHILD_TABLE) {
    isSameCTable = true;
    isSameMeasure = true;
  } else if (info->dataFormat) {
    isSameMeasure = IS_SAME_SUPER_TABLE;
  }

  // parse tag
  if (*sql == SPACE) {
    elements->tagsLen = 0;
  } else {
    if (*sql == COMMA) sql++;
    elements->tags = sql;

    int tagRet = smlParseTagKv(info, &sql, sqlEnd, elements, isSameMeasure, isSameCTable);
    if (unlikely(tagRet != TSDB_CODE_SUCCESS)) {
      return tagRet;
    }
    if (unlikely(info->reRun)) {
      return TSDB_CODE_SUCCESS;
    }

    // continue right after the tag section regardless of where the tag parser stopped
    sql = elements->measure + elements->measureTagsLen;
    elements->tagsLen = sql - elements->tags;
  }

  // parse cols
  JUMP_SPACE(sql, sqlEnd)
  elements->cols = sql;

  int ret = smlParseColKv(info, &sql, sqlEnd, elements, isSameMeasure, isSameCTable);
  if (unlikely(ret != TSDB_CODE_SUCCESS)) {
    return ret;
  }
  if (unlikely(info->reRun)) {
    return TSDB_CODE_SUCCESS;
  }

  elements->colsLen = sql - elements->cols;
  if (unlikely(elements->colsLen == 0)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "cols is empty", NULL);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  // parse timestamp
  JUMP_SPACE(sql, sqlEnd)
  elements->timestamp = sql;
  // cast: passing a negative char to isspace() is undefined behavior
  while (sql < sqlEnd && !isspace((unsigned char)*sql)) {
    sql++;
  }
  elements->timestampLen = sql - elements->timestamp;

  int64_t ts = smlParseTS(info, elements->timestamp, elements->timestampLen);
  if (ts <= 0) {
    uError("SML:0x%" PRIx64 " smlParseTS error:%" PRId64, info->id, ts);
    return TSDB_CODE_INVALID_TIMESTAMP;
  }

  // the timestamp always occupies column slot 0
  SSmlKv kv = {.key = TS,
               .keyLen = TS_LEN,
               .i = ts,
               .type = TSDB_DATA_TYPE_TIMESTAMP,
               .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
  if (info->dataFormat) {
    // propagate build failures instead of silently dropping them
    ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
    if (ret == TSDB_CODE_SUCCESS) {
      ret = smlBuildRow(info->currTableDataCtx);
    }
    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
      return ret;
    }
  } else {
    taosArraySet(elements->colArray, 0, &kv);
  }
  info->preLine = *elements;

  return ret;
}

1677
static void smlParseTelnetElement(char **sql, char *sqlEnd, char **data, int32_t *len) {
wmmhello's avatar
wmmhello 已提交
1678
  while (*sql < sqlEnd) {
1679
    if (unlikely((**sql != SPACE && !(*data)))) {
1680
      *data = *sql;
1681
    } else if (unlikely(**sql == SPACE && *data)) {
1682 1683 1684 1685 1686 1687 1688
      *len = *sql - *data;
      break;
    }
    (*sql)++;
  }
}

1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726
/*
 * Parse the "<tagk>=<tagv> ..." section of an OpenTSDB telnet line and make sure
 * the child table identified by measure+tags is registered in info->childTables.
 *
 * info     - schemaless handle (schema caches, previous-line tag cache, flags).
 * data     - start of the tag section.
 * sqlEnd   - one past the last character of the line.
 * elements - line being parsed; measure/measureLen/measureTagsLen must be set.
 * msg      - buffer that receives a description of invalid input.
 *
 * Returns TSDB_CODE_SUCCESS on success or when info->reRun has been set (the
 * line must then be re-parsed with info->dataFormat == false); an error code
 * otherwise. All tag values are stored as NCHAR.
 */
static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
  if (IS_SAME_CHILD_TABLE) {
    return TSDB_CODE_SUCCESS;
  }

  bool isSameMeasure = IS_SAME_SUPER_TABLE;

  int     cnt = 0;
  SArray *preLineKV = info->preLineTagKV;
  bool    isSuperKVInit = true;
  SArray *superKV = NULL;
  if (info->dataFormat) {
    if (!isSameMeasure) {
      SSmlSTableMeta *sMeta =
          (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);

      if (unlikely(sMeta == NULL)) {
        // look the table up first so that nothing has been allocated yet when it is missing
        STableMeta *pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
        if (pTableMeta == NULL) {
          info->dataFormat = false;
          info->reRun = true;
          return TSDB_CODE_SUCCESS;
        }
        sMeta = smlBuildSTableMeta(info->dataFormat);
        if (sMeta == NULL) {
          taosMemoryFree(pTableMeta);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        sMeta->tableMeta = pTableMeta;
        nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta);
      }
      info->currSTableMeta = sMeta->tableMeta;
      superKV = sMeta->tags;

      if (unlikely(taosArrayGetSize(superKV) == 0)) {
        isSuperKVInit = false;
      }
      taosArraySetSize(preLineKV, 0);
    }
  } else {
    taosArraySetSize(preLineKV, 0);
  }

  const char *sql = data;
  while (sql < sqlEnd) {
    JUMP_SPACE(sql, sqlEnd)
    if (unlikely(*sql == '\0')) break;

    const char *key = sql;
    int32_t     keyLen = 0;

    // parse key: everything up to '='; a space inside a key is invalid
    while (sql < sqlEnd) {
      if (unlikely(*sql == SPACE)) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if (unlikely(*sql == EQUAL)) {
        keyLen = sql - key;
        sql++;
        break;
      }
      sql++;
    }

    if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

    // parse value: everything up to the next space; a second '=' is invalid
    const char *value = sql;
    int32_t     valueLen = 0;
    while (sql < sqlEnd) {
      if (unlikely(*sql == SPACE)) {
        break;
      }
      if (unlikely(*sql == EQUAL)) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
    }
    valueLen = sql - value;

    if (unlikely(valueLen == 0)) {
      smlBuildInvalidDataMsg(msg, "invalid value", value);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    if (unlikely(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }

    SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen, .type = TSDB_DATA_TYPE_NCHAR};

    if (info->dataFormat) {
      if (unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)) {
        info->needModifySchema = true;
      }

      if (isSameMeasure) {
        // same super table as the previous line: compare against the cached tags of that line
        if (unlikely(cnt >= taosArrayGetSize(preLineKV))) {
          info->dataFormat = false;
          info->reRun = true;
          return TSDB_CODE_SUCCESS;
        }
        SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
        if (unlikely(kv.length > preKV->length)) {
          preKV->length = kv.length;
          SSmlSTableMeta *tableMeta =
              (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
          ASSERT(tableMeta != NULL);

          SSmlKv *oldKV = taosArrayGet(tableMeta->tags, cnt);
          oldKV->length = kv.length;
          info->needModifySchema = true;
        }
        if (unlikely(!IS_SAME_KEY)) {
          info->dataFormat = false;
          info->reRun = true;
          return TSDB_CODE_SUCCESS;
        }
      } else {
        if (isSuperKVInit) {
          // super table already has recorded tags: the key order must match them
          if (unlikely(cnt >= taosArrayGetSize(superKV))) {
            info->dataFormat = false;
            info->reRun = true;
            return TSDB_CODE_SUCCESS;
          }
          SSmlKv *preKV = taosArrayGet(superKV, cnt);
          if (unlikely(kv.length > preKV->length)) {
            preKV->length = kv.length;
          } else {
            kv.length = preKV->length;
          }
          info->needModifySchema = true;

          if (unlikely(!IS_SAME_KEY)) {
            info->dataFormat = false;
            info->reRun = true;
            return TSDB_CODE_SUCCESS;
          }
        } else {
          taosArrayPush(superKV, &kv);
        }
        taosArrayPush(preLineKV, &kv);
      }
    } else {
      taosArrayPush(preLineKV, &kv);
    }
    cnt++;
  }

  SSmlTableInfo *tinfo =
      (SSmlTableInfo *)nodeListGet(info->childTables, elements->measure, elements->measureTagsLen, NULL);
  if (unlikely(tinfo == NULL)) {
    tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
    if (!tinfo) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    for (int i = 0; i < taosArrayGetSize(preLineKV); i++) {
      taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
    }
    smlSetCTableName(tinfo);
    if (info->dataFormat) {
      info->currSTableMeta->uid = tinfo->uid;
      tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
      if (tinfo->tableDataCtx == NULL) {
        // NOTE(review): tinfo is not yet owned by info->childTables here and is not released — confirm
        smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
        return TSDB_CODE_SML_INVALID_DATA;
      }
    }

    nodeListSet(&info->childTables, elements->measure, elements->measureTagsLen, tinfo);
  }

  // the caller builds columns through info->currTableDataCtx, so point it at this child table
  if (info->dataFormat) {
    info->currTableDataCtx = tinfo->tableDataCtx;
  }
  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1865

1866
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
1867
static int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements) {
X
Xiaoyu Wang 已提交
1868
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
1869 1870

  // parse metric
1871 1872
  smlParseTelnetElement(&sql, sqlEnd, &elements->measure, &elements->measureLen);
  if (unlikely((!(elements->measure) || IS_INVALID_TABLE_LEN(elements->measureLen)))) {
1873
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
wmmhello's avatar
wmmhello 已提交
1874
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
1875 1876 1877
  }

  // parse timestamp
1878 1879
  smlParseTelnetElement(&sql, sqlEnd, &elements->timestamp, &elements->timestampLen);
  if (unlikely(!elements->timestamp || elements->timestampLen == 0)) {
1880 1881 1882 1883
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }

1884 1885 1886 1887 1888 1889
  bool needConverTime = false;  // get TS before parse tag(get meta), so need conver time
  if(info->dataFormat && info->currSTableMeta == NULL){
    needConverTime = true;
  }
  int64_t ts = smlParseTS(info, elements->timestamp, elements->timestampLen);
  if (unlikely(ts < 0)) {
1890
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
1891
    return TSDB_CODE_INVALID_TIMESTAMP;
1892
  }
1893
  SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .i = ts, .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
1894 1895

  // parse value
1896 1897
  smlParseTelnetElement(&sql, sqlEnd, &elements->cols, &elements->colsLen);
  if (unlikely(!elements->cols || elements->colsLen == 0)) {
1898 1899 1900
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }
1901

1902 1903 1904 1905 1906
  SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = elements->colsLen};
  if (smlParseNumber(&kv, &info->msgBuf)) {
    kv.length = (int16_t)tDataTypes[kv.type].bytes;
  }else{
    return TSDB_CODE_TSC_INVALID_VALUE;
1907
  }
1908

1909
  // move measure before tags to combine keys to identify child table
1910
  memmove(sql - elements->measureLen, elements->measure, elements->measureLen);
1911 1912 1913 1914 1915 1916
  elements->measure = sql - elements->measureLen;
  elements->measureLen += sqlEnd - sql;


  int ret = smlParseTelnetTags(info, sql, sqlEnd, elements, &info->msgBuf);
  if (unlikely(ret != TSDB_CODE_SUCCESS)) {
1917
    return ret;
wmmhello's avatar
wmmhello 已提交
1918
  }
wmmhello's avatar
wmmhello 已提交
1919

1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939
  if(unlikely(info->reRun)){
    return TSDB_CODE_SUCCESS;
  }

  if(info->dataFormat){
    if(needConverTime) {
      kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
    }
    ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
    if(ret == TSDB_CODE_SUCCESS){
      ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
    }
    if(ret == TSDB_CODE_SUCCESS){
      ret = smlBuildRow(info->currTableDataCtx);
    }
    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
      return ret;
    }
  }else{
1940 1941 1942
    if(elements->colArray == NULL){
      elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
    }
1943 1944 1945 1946 1947
    taosArrayPush(elements->colArray, &kvTs);
    taosArrayPush(elements->colArray, &kv);
  }
  info->preLine = *elements;

wmmhello's avatar
wmmhello 已提交
1948 1949 1950
  return TSDB_CODE_SUCCESS;
}

1951
/*
 * Merge the key/value descriptors in `cols` into the accumulated schema.
 *
 * metaHash  - maps a key to its int16_t index inside metaArray.
 * metaArray - accumulated SSmlKv descriptors.
 * cols      - SSmlKv entries of the line being merged.
 * isTag     - when true only the length is widened and the type is not compared.
 * msg       - receives a message when a column type differs from the recorded one.
 *
 * Returns TSDB_CODE_SML_NOT_SAME_TYPE on a column type conflict, otherwise
 * TSDB_CODE_SUCCESS.
 */
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg) {
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv  *kv = (SSmlKv *)taosArrayGet(cols, i);
    int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);

    if (index == NULL) {
      // unseen key: append it and remember its position
      size_t tmp = taosArrayGetSize(metaArray);
      ASSERT(tmp <= INT16_MAX);
      int16_t size = tmp;
      taosArrayPush(metaArray, kv);
      taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
      continue;
    }

    SSmlKv *value = (SSmlKv *)taosArrayGet(metaArray, *index);
    if (isTag) {
      if (kv->length > value->length) {
        value->length = kv->length;
      }
      continue;
    }

    if (kv->type != value->type) {
      smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
      return TSDB_CODE_SML_NOT_SAME_TYPE;
    }

    // variable-length columns only ever grow
    if (IS_VAR_DATA_TYPE(kv->type) && (kv->length > value->length)) {
      value->length = kv->length;
    }
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1984
/*
 * Release a child-table descriptor: every hash table stored in tag->cols, the
 * cols and tags arrays themselves, and finally the descriptor.
 */
static void smlDestroyTableInfo(SSmlTableInfo *tag) {
  size_t idx = 0;
  while (idx < taosArrayGetSize(tag->cols)) {
    // each element of cols is a pointer to a per-line key/value hash
    taosHashCleanup((SHashObj *)taosArrayGetP(tag->cols, idx));
    idx++;
  }

  taosArrayDestroy(tag->cols);
  taosArrayDestroy(tag->tags);
  taosMemoryFree(tag);
}

1995
/*
 * Build a key -> SSmlKv* hash over the entries of `cols` and append that hash
 * (as a pointer) to `colsArray`.
 *
 * The stored values are pointers into `cols`, so `cols` must outlive the hash.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the hash cannot be created or appended,
 * otherwise TSDB_CODE_SUCCESS.
 */
static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
  SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (!kvHash) {
    uError("SML:smlDealCols failed to allocate memory");
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
    taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
  }

  // ownership of kvHash passes to colsArray only if the push succeeds
  if (taosArrayPush(colsArray, &kvHash) == NULL) {
    taosHashCleanup(kvHash);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
2010 2011
/*
 * Release a schemaless handle and what it holds: the query, both node lists
 * (child tables and super tables), the vgroup hash, the previous-line caches,
 * the per-line column arrays (only when dataFormat is false), the JSON root and
 * the handle itself. A NULL handle is ignored.
 */
void smlDestroyInfo(SSmlHandle *info) {
  if (info == NULL) {
    return;
  }

  qDestroyQuery(info->pQuery);

  // destroy info->childTables
  NodeList *node = info->childTables;
  while (node != NULL) {
    NodeList *following = node->next;
    if (node->data.used) {
      smlDestroyTableInfo(node->data.value);
    }
    taosMemoryFree(node);
    node = following;
  }

  // destroy info->superTables
  node = info->superTables;
  while (node != NULL) {
    NodeList *following = node->next;
    if (node->data.used) {
      smlDestroySTableMeta(node->data.value);
    }
    taosMemoryFree(node);
    node = following;
  }

  // destroy info->pVgHash
  taosHashCleanup(info->pVgHash);

  taosArrayDestroy(info->preLineTagKV);
  taosArrayDestroy(info->preLineColKV);

  if (!info->dataFormat) {
    int lineIdx = 0;
    while (lineIdx < info->lineNum) {
      taosArrayDestroy(info->lines[lineIdx].colArray);
      lineIdx++;
    }
    taosMemoryFree(info->lines);
  }

  cJSON_Delete(info->root);
  taosMemoryFreeClear(info);
}
2052

wmmhello's avatar
wmmhello 已提交
2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064
/*
 * Allocate and initialise a schemaless handle for the given connection.
 *
 * taos - connection handle; read as a pointer to an int64_t id that is passed
 *        to acquireTscObj().
 *
 * Returns the new handle (dataFormat enabled, empty caches), or NULL when the
 * connection cannot be acquired, the catalog handle cannot be obtained, or an
 * allocation fails. On failure everything allocated so far is released through
 * smlDestroyInfo().
 */
static SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
  if (NULL == taos) {
    return NULL;
  }

  int32_t     code = TSDB_CODE_SUCCESS;
  SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
  if (NULL == info) {
    return NULL;
  }

  // assign the id first so that every log line below carries it
  info->id = smlGenId();

  info->taos = acquireTscObj(*(int64_t *)taos);
  if (NULL == info->taos) {
    uError("SML:0x%" PRIx64 " acquire tsc object failed", info->id);
    goto cleanup;
  }

  code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
    goto cleanup;
  }

  info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  info->pQuery = smlInitHandle();
  info->dataFormat = true;

  info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
  info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv));

  if (NULL == info->pVgHash || NULL == info->pQuery || NULL == info->preLineTagKV || NULL == info->preLineColKV) {
    uError("create SSmlHandle failed");
    goto cleanup;
  }

  return info;

cleanup:
  smlDestroyInfo(info);
  return NULL;
}

/************* TSDB_SML_JSON_PROTOCOL function start **************/
2086
/*
 * Read the "metric" string of a JSON data point into elements->measure /
 * elements->measureLen. The stored pointer refers to the cJSON node's string.
 *
 * Returns TSDB_CODE_TSC_INVALID_JSON when "metric" is missing or not a string,
 * TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH when its length is rejected by
 * IS_INVALID_TABLE_LEN, otherwise TSDB_CODE_SUCCESS.
 */
static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
  cJSON *metric = cJSON_GetObjectItem(root, "metric");
  if (!cJSON_IsString(metric)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  char *name = metric->valuestring;

  // the length is recorded before validation; the pointer only after it
  elements->measureLen = strlen(name);
  if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
    uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  elements->measure = name;
  return TSDB_CODE_SUCCESS;
}

2102
/*
 * Parse a timestamp given as a JSON object {"value": <number>, "type": <unit>}.
 *
 * info        - schemaless handle; receives error messages.
 * root        - the timestamp object; must have exactly OTD_JSON_SUB_FIELDS_NUM members.
 * toPrecision - target precision, also used as index into smlFactorNS / smlFactorS.
 *
 * "type" is matched case-insensitively against "s", "ms", "us" and "ns".
 * A value of 0 yields the current time in the target precision; a negative
 * value is returned unchanged (truncated to int64_t).
 *
 * Returns the converted timestamp, or -1 on malformed input, unknown unit or
 * overflow of the seconds conversion.
 */
static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPrecision) {
  int32_t size = cJSON_GetArraySize(root);
  if (unlikely(size != OTD_JSON_SUB_FIELDS_NUM)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
    return -1;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (unlikely(!cJSON_IsNumber(value))) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
    return -1;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (unlikely(!cJSON_IsString(type))) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
    return -1;
  }

  double timeDouble = value->valuedouble;
  if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
    return -1;
  }

  if (timeDouble == 0) {
    return taosGetTimestampNs() / smlFactorNS[toPrecision];
  }

  if (timeDouble < 0) {
    return timeDouble;
  }

  int64_t tsInt64 = timeDouble;
  size_t  typeLen = strlen(type->valuestring);
  if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
    // seconds
    // a fractional value in (0, 1) truncates to 0; guard the division below
    if (tsInt64 == 0 || smlFactorS[toPrecision] < INT64_MAX / tsInt64) {
      return tsInt64 * smlFactorS[toPrecision];
    }
    return -1;
  } else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
    switch (type->valuestring[0]) {
      case 'm':
      case 'M':
        // milliseconds
        return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MILLI, toPrecision);
      case 'u':
      case 'U':
        // microseconds
        return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MICRO, toPrecision);
      case 'n':
      case 'N':
        // nanoseconds
        return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_NANO, toPrecision);
      default:
        return -1;
    }
  } else {
    return -1;
  }
}

/*
 * Count the decimal digits of `num` (sign excluded); 0 counts as one digit.
 */
static uint8_t smlGetTimestampLen(int64_t num) {
  uint8_t digits = 0;
  do {
    digits++;
    num /= 10;
  } while (num != 0);
  return digits;
}

2177
/*
 * Extract the "timestamp" member of a JSON data point.
 *
 * The target precision is that of info->currSTableMeta when it is set, otherwise
 * nanoseconds. A numeric timestamp of 0 yields the current time; for other
 * positive numbers the source precision is derived from the digit count via
 * smlGetTsTypeByLen(). An object timestamp is handed to smlParseTSFromJSONObj().
 *
 * Returns the converted timestamp; a negative input is returned unchanged
 * (truncated to int64_t); -1 is returned on any other invalid input.
 */
static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root) {
  // Timestamp must be the first KV to parse
  int32_t toPrecision = TSDB_TIME_PRECISION_NANO;
  if (info->currSTableMeta) {
    toPrecision = info->currSTableMeta->tableInfo.precision;
  }

  cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");

  if (!cJSON_IsNumber(timestamp)) {
    if (cJSON_IsObject(timestamp)) {
      return smlParseTSFromJSONObj(info, timestamp, toPrecision);
    }
    smlBuildInvalidDataMsg(&info->msgBuf,
                           "invalidate json", NULL);
    return -1;
  }

  // timestamp value 0 indicates current system time
  double timeDouble = timestamp->valuedouble;
  if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
    return -1;
  }

  if (unlikely(timeDouble < 0)) {
    smlBuildInvalidDataMsg(&info->msgBuf,
                           "timestamp is negative", NULL);
    return timeDouble;
  }
  if (unlikely(timeDouble == 0)) {
    return taosGetTimestampNs() / smlFactorNS[toPrecision];
  }

  // the number of digits decides which unit the value is expressed in
  uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
  int8_t  fromPrecision = smlGetTsTypeByLen(tsLen);
  if (unlikely(fromPrecision == -1)) {
    smlBuildInvalidDataMsg(&info->msgBuf,
                           "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", NULL);
    return -1;
  }

  return convertTimePrecision(timeDouble, fromPrecision, toPrecision);
}

X
Xiaoyu Wang 已提交
2215
/*
 * Store a JSON true/false value into pVal as a BOOL.
 *
 * typeStr must equal "bool" (case-insensitive); otherwise
 * TSDB_CODE_TSC_INVALID_JSON_TYPE is returned and pVal is left untouched.
 */
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
  if (strcasecmp(typeStr, "bool") == 0) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valueint;
    return TSDB_CODE_SUCCESS;
  }

  uError("OTD:invalid type(%s) for JSON Bool", typeStr);
  return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
wmmhello's avatar
wmmhello 已提交
2226

X
Xiaoyu Wang 已提交
2227 2228 2229
/*
 * Store a JSON number into pVal using the numeric type named by typeStr.
 *
 * Accepted names (case-insensitive): i8/tinyint, i16/smallint, i32/int,
 * i64/bigint, f32/float, f64/double. tinyint, smallint, int and float are
 * range-checked; bigint is clamped to [INT64_MIN, INT64_MAX].
 *
 * Returns TSDB_CODE_TSC_VALUE_OUT_OF_RANGE when a range check fails,
 * TSDB_CODE_TSC_INVALID_JSON_TYPE for an unknown type name (pVal is untouched
 * in both cases), otherwise TSDB_CODE_SUCCESS.
 */
static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
  const double num = value->valuedouble;

  if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) {
    // tinyint
    if (!IS_VALID_TINYINT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(tinyint)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_TINYINT;
    pVal->i = num;
  } else if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) {
    // smallint
    if (!IS_VALID_SMALLINT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(smallint)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_SMALLINT;
    pVal->i = num;
  } else if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) {
    // int
    if (!IS_VALID_INT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(int)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_INT;
    pVal->i = num;
  } else if (strcasecmp(typeStr, "i64") == 0 || strcasecmp(typeStr, "bigint") == 0) {
    // bigint: saturate instead of rejecting
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    if (num >= (double)INT64_MAX) {
      pVal->i = INT64_MAX;
    } else if (num <= (double)INT64_MIN) {
      pVal->i = INT64_MIN;
    } else {
      pVal->i = num;
    }
  } else if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) {
    // float
    if (!IS_VALID_FLOAT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(float)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->f = num;
  } else if (strcasecmp(typeStr, "f64") == 0 || strcasecmp(typeStr, "double") == 0) {
    // double
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->d = num;
  } else {
    // if reach here means type is unsupported
    uError("OTD:invalid type(%s) for JSON Number", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }

  // every accepted type is fixed-width; its size comes from the type table
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  return TSDB_CODE_SUCCESS;
}
2297

X
Xiaoyu Wang 已提交
2298
/*
 * Store a JSON string into pVal as BINARY or NCHAR, as named by typeStr
 * ("binary" / "nchar", case-insensitive). pVal->value points at the cJSON
 * node's string; nothing is copied.
 *
 * Returns TSDB_CODE_TSC_INVALID_JSON_TYPE for any other type name,
 * TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN when the string exceeds the limit of
 * the chosen type, otherwise TSDB_CODE_SUCCESS.
 */
static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
  if (strcasecmp(typeStr, "binary") == 0) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
  } else if (strcasecmp(typeStr, "nchar") == 0) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
  } else {
    uError("OTD:invalid type(%s) for JSON String", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }

  // validate the full length: narrowing it first would let over-long strings
  // wrap around and slip past the limit checks
  const size_t len = strlen(value->valuestring);
  pVal->length = len;

  if (pVal->type == TSDB_DATA_TYPE_BINARY && len > (size_t)(TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE)) {
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }
  if (pVal->type == TSDB_DATA_TYPE_NCHAR &&
      len > (size_t)((TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }

  pVal->value = value->valuestring;
  return TSDB_CODE_SUCCESS;
}

/**
 * Parse a typed value given as a JSON object with "value" and "type" members.
 *
 * The object must have exactly OTD_JSON_SUB_FIELDS_NUM members, and "type"
 * must be a JSON string. The JSON type of "value" selects the converter.
 *
 * @param root JSON object to parse
 * @param kv   destination key/value filled in by the selected converter
 * @return the converter's result, TSDB_CODE_TSC_INVALID_JSON for a malformed
 *         object, or TSDB_CODE_TSC_INVALID_JSON_TYPE for an unsupported value type.
 */
static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) {
  if (cJSON_GetArraySize(root) != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *valueItem = cJSON_GetObjectItem(root, "value");
  if (valueItem == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *typeItem = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(typeItem)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  char *typeName = typeItem->valuestring;

  // Dispatch on the JSON type of the value; each converter's code is returned as-is.
  if (valueItem->type == cJSON_True || valueItem->type == cJSON_False) {
    return smlConvertJSONBool(kv, typeName, valueItem);
  }
  if (valueItem->type == cJSON_Number) {
    return smlConvertJSONNumber(kv, typeName, valueItem);
  }
  if (valueItem->type == cJSON_String) {
    return smlConvertJSONString(kv, typeName, valueItem);
  }
  return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}

2369 2370 2371 2372 2373 2374 2375 2376
/**
 * Parse a JSON value (bare literal or {"value":..,"type":..} object) into kv.
 *
 * Bare booleans become BOOL, bare numbers become DOUBLE, and bare strings use
 * the default string type (currently hard-coded to "nchar"). Objects are
 * handed to smlParseValueFromJSONObj.
 *
 * @param root JSON item to parse
 * @param kv   destination key/value
 * @return TSDB_CODE_SUCCESS, the failing converter's error code, or
 *         TSDB_CODE_TSC_INVALID_JSON for an unsupported JSON type.
 */
static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
  switch (root->type) {
    case cJSON_True:
    case cJSON_False: {
      kv->type = TSDB_DATA_TYPE_BOOL;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->i = root->valueint;
      break;
    }
    case cJSON_Number: {
      kv->type = TSDB_DATA_TYPE_DOUBLE;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->d = root->valuedouble;
      break;
    }
    case cJSON_String: {
      /* set default JSON type to binary/nchar according to
       * user configured parameter tsDefaultJSONStrType
       */
      char *tsDefaultJSONStrType = "nchar";  // todo
      // Propagate the converter's result: a rejected (e.g. over-long) string
      // must not be reported as success, since kv->value is left unset then.
      int32_t ret = smlConvertJSONString(kv, tsDefaultJSONStrType, root);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON String");
        return ret;
      }
      break;
    }
    case cJSON_Object: {
      int32_t ret = smlParseValueFromJSONObj(root, kv);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON Obj");
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON;
  }

  return TSDB_CODE_SUCCESS;
}

2408
/**
 * Parse the "value" member of a JSON data point into kv.
 *
 * @param root JSON data point
 * @param kv   destination key/value
 * @return TSDB_CODE_TSC_INVALID_JSON when "value" is missing, otherwise the
 *         result of smlParseValueFromJSON.
 */
static int32_t smlParseColsFromJSON(cJSON *root, SSmlKv *kv) {
  cJSON *metricVal = cJSON_GetObjectItem(root, "value");
  return (metricVal != NULL) ? smlParseValueFromJSON(metricVal, kv) : TSDB_CODE_TSC_INVALID_JSON;
}

2422
/**
 * Parse the "tags" object of a JSON data point and register its child table.
 *
 * The measure name is added to the tags object under JSON_METERS_NAME so the
 * object identifies one child table. If the tags match the previous line's,
 * nothing further is done. In dataFormat mode the tag keys and lengths are
 * checked against the cached previous-line / super-table tags; a mismatch
 * clears info->dataFormat, sets info->reRun and returns success so the caller
 * can restart parsing.
 *
 * @param info     schemaless handle (caches, flags, current super-table meta)
 * @param root     JSON data point containing "tags"
 * @param elements line info for this data point (measure, tags)
 * @return TSDB_CODE_SUCCESS (also when a re-run was requested) or an error code.
 */
static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
  int32_t ret = TSDB_CODE_SUCCESS;

  cJSON *tags = cJSON_GetObjectItem(root, "tags");
  if (unlikely(tags == NULL || tags->type != cJSON_Object)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // add measure to tags to identify one child table
  cJSON *cMeasure = cJSON_AddStringToObject(tags, JSON_METERS_NAME, elements->measure);
  if (unlikely(cMeasure == NULL)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // NOTE(review): a result of 0 is treated as "same child table as the previous line" - confirm
  if (is_same_child_table_json(elements->tags, info->preLine.tags) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  bool isSameMeasure = IS_SAME_SUPER_TABLE;

  int     cnt = 0;
  SArray *preLineKV = info->preLineTagKV;
  bool    isSuperKVInit = true;
  SArray *superKV = NULL;
  if (info->dataFormat) {
    if (unlikely(!isSameMeasure)) {
      SSmlSTableMeta *sMeta =
          (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);

      if (unlikely(sMeta == NULL)) {
        // Fetch the table meta BEFORE allocating sMeta: the re-run path below
        // returns early, and allocating first leaked the new sMeta.
        STableMeta *pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
        if (pTableMeta == NULL) {
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        sMeta = smlBuildSTableMeta(info->dataFormat);
        if (unlikely(sMeta == NULL)) {
          // NOTE(review): assumes the meta returned by smlGetMeta is owned by
          // the caller until it is attached to sMeta - confirm
          taosMemoryFree(pTableMeta);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        sMeta->tableMeta = pTableMeta;
        nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta);
      }
      info->currSTableMeta = sMeta->tableMeta;
      superKV = sMeta->tags;

      if (unlikely(taosArrayGetSize(superKV) == 0)) {
        isSuperKVInit = false;
      }
      taosArraySetSize(preLineKV, 0);
    }
  } else {
    taosArraySetSize(preLineKV, 0);
  }

  int32_t tagNum = cJSON_GetArraySize(tags);
  for (int32_t i = 0; i < tagNum; ++i) {
    cJSON *tag = cJSON_GetArrayItem(tags, i);
    if (unlikely(tag == NULL)) {
      return TSDB_CODE_TSC_INVALID_JSON;
    }
    // skip the synthetic measure entry added above
    if (unlikely(tag == cMeasure)) continue;
    size_t keyLen = strlen(tag->string);
    if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
      uError("OTD:Tag key length is 0 or too large than 64");
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

    // add kv to SSmlKv
    SSmlKv kv = {.key = tag->string, .keyLen = keyLen};
    // value
    ret = smlParseValueFromJSON(tag, &kv);
    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
      return ret;
    }

    if (info->dataFormat) {
      if (unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)) {
        info->needModifySchema = true;
      }

      if (isSameMeasure) {
        // compare against the previous line's tags, position by position
        if (unlikely(cnt >= taosArrayGetSize(preLineKV))) {
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
        if (unlikely(kv.length > preKV->length)) {
          preKV->length = kv.length;
          SSmlSTableMeta *tableMeta =
              (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
          ASSERT(tableMeta != NULL);

          SSmlKv *oldKV = taosArrayGet(tableMeta->tags, cnt);
          oldKV->length = kv.length;
          info->needModifySchema = true;
        }
        if (unlikely(!IS_SAME_KEY)) {
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
      } else {
        if (isSuperKVInit) {
          // compare against the super table's cached tags
          if (unlikely(cnt >= taosArrayGetSize(superKV))) {
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
          SSmlKv *preKV = taosArrayGet(superKV, cnt);
          if (unlikely(kv.length > preKV->length)) {
            preKV->length = kv.length;
          } else {
            kv.length = preKV->length;
          }
          info->needModifySchema = true;

          if (unlikely(!IS_SAME_KEY)) {
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
        } else {
          taosArrayPush(superKV, &kv);
        }
        taosArrayPush(preLineKV, &kv);
      }
    } else {
      taosArrayPush(preLineKV, &kv);
    }
    cnt++;
  }

  void *oneTable = nodeListGet(info->childTables, elements->tags, POINTER_BYTES, is_same_child_table_json);
  if ((oneTable != NULL)) {
    return TSDB_CODE_SUCCESS;
  }

  SSmlTableInfo *tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
  if (unlikely(!tinfo)) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  for (int i = 0; i < taosArrayGetSize(preLineKV); i++) {
    taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
  }
  smlSetCTableName(tinfo);
  if (info->dataFormat) {
    info->currSTableMeta->uid = tinfo->uid;
    tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
    if (tinfo->tableDataCtx == NULL) {
      smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
      return TSDB_CODE_SML_INVALID_DATA;
    }
  }

  nodeListSet(&info->childTables, tags, POINTER_BYTES, tinfo);

  return ret;
}

2578
/**
 * Parse one JSON data point: metric, value, tags and timestamp, in that order.
 *
 * In dataFormat mode the timestamp and value are bound straight into the
 * current table data context and a row is built. Otherwise both are appended
 * to elements->colArray. On success *elements is recorded as info->preLine.
 *
 * @param info     schemaless handle
 * @param root     JSON object for a single data point
 * @param elements line info to fill for this data point
 * @return TSDB_CODE_SUCCESS (also when tag parsing requested a re-run via
 *         info->reRun) or an error code.
 */
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
  int32_t ret = TSDB_CODE_SUCCESS;

  int32_t size = cJSON_GetArraySize(root);
  // the outermost JSON object must have exactly OTD_JSON_FIELDS_NUM fields
  if (unlikely(size != OTD_JSON_FIELDS_NUM)) {
    uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // Parse metric
  ret = smlParseMetricFromJSON(info, root, elements);
  if (unlikely(ret != TSDB_CODE_SUCCESS)) {
    uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
    return ret;
  }
  uDebug("OTD:0x%" PRIx64 " Parse metric from JSON payload finished", info->id);

  // Parse metric value
  SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN};
  ret = smlParseColsFromJSON(root, &kv);
  if (unlikely(ret)) {
    uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
    return ret;
  }
  uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id);

  // Parse tags
  ret = smlParseTagsFromJSON(info, root, elements);
  if (unlikely(ret)) {
    uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
    return ret;
  }
  uDebug("OTD:0x%" PRIx64 " Parse tags from JSON payload finished", info->id);

  // tag parsing asked for a restart: stop here and let the caller re-run
  if (unlikely(info->reRun)) {
    return TSDB_CODE_SUCCESS;
  }

  // Parse timestamp
  // notice!!! put ts back to tag to ensure get meta->precision
  int64_t ts = smlParseTSFromJSON(info, root);
  if (unlikely(ts < 0)) {
    uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
    return TSDB_CODE_INVALID_TIMESTAMP;
  }
  uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);
  SSmlKv kvTs = {.key = TS,
                 .keyLen = TS_LEN,
                 .i = ts,
                 .type = TSDB_DATA_TYPE_TIMESTAMP,
                 .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};

  if (info->dataFormat) {
    // bind the timestamp as column 0 and the value as column 1, then build the row
    ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
    if (ret == TSDB_CODE_SUCCESS) {
      ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
    }
    if (ret == TSDB_CODE_SUCCESS) {
      ret = smlBuildRow(info->currTableDataCtx);
    }
    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
      return ret;
    }
  } else {
    if (elements->colArray == NULL) {
      elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
      // do not push into a NULL array when the allocation failed
      if (unlikely(elements->colArray == NULL)) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
    taosArrayPush(elements->colArray, &kvTs);
    taosArrayPush(elements->colArray, &kv);
  }
  info->preLine = *elements;

  return TSDB_CODE_SUCCESS;
}
2650
/************* TSDB_SML_JSON_PROTOCOL function end **************/
2651 2652 2653
/**
 * Second parsing pass for the non-dataFormat path: attach each parsed line's
 * columns to its child table and build or update the super-table meta.
 *
 * Returns immediately in dataFormat mode.
 *
 * @param info schemaless handle holding the parsed lines and table caches
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t smlParseLineBottom(SSmlHandle *info) {
  if (info->dataFormat) return TSDB_CODE_SUCCESS;

  for (int32_t i = 0; i < info->lineNum; i++) {
    SSmlLineInfo  *elements = info->lines + i;
    SSmlTableInfo *tinfo = NULL;
    // JSON lines are keyed by their tags object, the other protocols by measure+tags text
    if (info->protocol != TSDB_SML_JSON_PROTOCOL) {
      tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->measure, elements->measureTagsLen, NULL);
    } else {
      tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->tags, POINTER_BYTES, is_same_child_table_json);
    }

    if (tinfo == NULL) {
      uError("SML:0x%" PRIx64 " get oneTable failed, line num:%d", info->id, i);
      smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure);
      return TSDB_CODE_SML_INVALID_DATA;
    }

    if (taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
      return TSDB_CODE_PAR_INVALID_TAGS_NUM;
    }

    if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
      return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
    }

    int ret = smlPushCols(tinfo->cols, elements->colArray);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    SSmlSTableMeta *tableMeta =
        (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
    if (tableMeta) {  // update meta
      ret = smlUpdateMeta(tableMeta->colHash, tableMeta->cols, elements->colArray, false, &info->msgBuf);
      if (ret == TSDB_CODE_SUCCESS) {
        ret = smlUpdateMeta(tableMeta->tagHash, tableMeta->tags, tinfo->tags, true, &info->msgBuf);
      }
      if (ret != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
        return ret;
      }
    } else {
      // first line seen for this super table: validate names, then create its meta
      ret = smlJudgeDupColName(elements->colArray, tinfo->tags, &info->msgBuf);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlJudgeDupColName failed", info->id);
        return ret;
      }

      SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
      if (meta == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
      smlInsertMeta(meta->colHash, meta->cols, elements->colArray);
      nodeListSet(&info->superTables, elements->measure, elements->measureLen, meta);
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2711
/**
 * Parse a JSON payload holding one data point (object) or several (array).
 *
 * The parsed tree is stored in info->root. Each data point goes through
 * smlParseJSONString. When a data point sets info->reRun, the cached child
 * tables, super tables and block hash are reset, info->lines is allocated,
 * and parsing restarts from the first data point.
 *
 * @param info    schemaless handle
 * @param payload NUL-terminated JSON text
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
  int32_t payloadNum = 0;
  int32_t ret = TSDB_CODE_SUCCESS;

  if (unlikely(payload == NULL)) {
    uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  info->root = cJSON_Parse(payload);
  if (unlikely(info->root == NULL)) {
    uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
    return TSDB_CODE_TSC_INVALID_JSON;
  }
  // multiple data points must be sent in JSON array
  if (cJSON_IsArray(info->root)) {
    payloadNum = cJSON_GetArraySize(info->root);
  } else if (cJSON_IsObject(info->root)) {
    payloadNum = 1;
  } else {
    uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  int32_t i = 0;
  while (i < payloadNum) {
    cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : cJSON_GetArrayItem(info->root, i);
    if (info->dataFormat) {
      // dataFormat mode does not keep per-line info; parse into a scratch element
      SSmlLineInfo element = {0};
      ret = smlParseJSONString(info, dataPoint, &element);
    } else {
      ret = smlParseJSONString(info, dataPoint, info->lines + i);
    }
    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
      uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
      return ret;
    }

    if (unlikely(info->reRun)) {
      i = 0;
      info->reRun = false;
      // clear info->childTables
      NodeList *pList = info->childTables;
      while (pList) {
        if (pList->data.used) {
          smlDestroyTableInfo(pList->data.value);
          pList->data.used = false;
        }
        pList = pList->next;
      }

      // clear info->superTables
      pList = info->superTables;
      while (pList) {
        if (pList->data.used) {
          smlDestroySTableMeta(pList->data.value);
          pList->data.used = false;
        }
        pList = pList->next;
      }

      if (unlikely(info->lines != NULL)) {
        uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      info->lineNum = payloadNum;
      info->lines = taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
      if (unlikely(info->lines == NULL)) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      memset(&info->preLine, 0, sizeof(SSmlLineInfo));
      info->currSTableMeta = NULL;
      info->currTableDataCtx = NULL;

      SVnodeModifOpStmt *stmt = (SVnodeModifOpStmt *)(info->pQuery->pRoot);
      stmt->freeHashFunc(stmt->pTableBlockHashObj);
      stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
      continue;
    }

    // advance to the next data point; without this the loop never terminates
    i++;
  }

  return TSDB_CODE_SUCCESS;
}
2792

X
Xiaoyu Wang 已提交
2793
/**
 * Bind the data of every cached child table, build the request output and
 * launch the insert query.
 *
 * For each child table the target vgroup is resolved through the catalog and
 * recorded in info->pVgHash, then smlBindData is called with the table's
 * super-table meta.
 *
 * @param info schemaless handle
 * @return the first failing step's error code, otherwise info->pRequest->code
 *         after launchQueryImpl.
 */
static int32_t smlInsertData(SSmlHandle *info) {
  int32_t code = TSDB_CODE_SUCCESS;

  NodeList *tmp = info->childTables;
  while (tmp) {
    // Nodes released by a re-run reset have used == false and their value was
    // already destroyed; they must not be dereferenced.
    if (!tmp->data.used) {
      tmp = tmp->next;
      continue;
    }
    SSmlTableInfo *tableData = (SSmlTableInfo *)tmp->data.value;

    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
    tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
    // bounded copy, matching the dbname copy above, so an over-long child
    // table name cannot overflow pName.tname
    tstrncpy(pName.tname, tableData->childTableName, sizeof(pName.tname));

    SRequestConnInfo conn = {0};
    conn.pTrans = info->taos->pAppInfo->pTransporter;
    conn.requestId = info->pRequest->requestId;
    conn.requestObjRefId = info->pRequest->self;
    conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

    SVgroupInfo vg;
    code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
      return code;
    }
    taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));

    SSmlSTableMeta *pMeta =
        (SSmlSTableMeta *)nodeListGet(info->superTables, tableData->sTableName, tableData->sTableNameLen, NULL);
    ASSERT(NULL != pMeta);

    // use tablemeta of stable to save vgid and uid of child table
    pMeta->tableMeta->vgId = vg.vgId;
    pMeta->tableMeta->uid = tableData->uid;  // one table merge data block together according uid

    code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, pMeta->cols, tableData->cols,
                       pMeta->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
                       info->ttl, info->msgBuf.buf, info->msgBuf.len);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
      return code;
    }
    tmp = tmp->next;
  }

  code = smlBuildOutput(info->pQuery, info->pVgHash);
  if (code != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
    return code;
  }
  info->cost.insertRpcTime = taosGetTimestampUs();

  SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
  atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);

  launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
  return info->pRequest->code;
}

X
Xiaoyu Wang 已提交
2850 2851
/**
 * Log a one-line summary of a schemaless insert: result code, line and table
 * counters, and the time spent in each phase (differences between the
 * timestamps recorded in info->cost).
 *
 * @param info schemaless handle whose cost record is reported
 */
static void smlPrintStatisticInfo(SSmlHandle *info) {
  // per-phase durations, each the gap between two consecutive recorded timestamps
  int64_t parseCost = info->cost.schemaTime - info->cost.parseTime;
  int64_t schemaCost = info->cost.insertBindTime - info->cost.schemaTime;
  int64_t bindCost = info->cost.insertRpcTime - info->cost.insertBindTime;
  int64_t rpcCost = info->cost.endTime - info->cost.insertRpcTime;
  int64_t totalCost = info->cost.endTime - info->cost.parseTime;

  uError("SML:0x%" PRIx64
             " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
        parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
             "",
         info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
         info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables,
         parseCost, schemaCost, bindCost, rpcCost, totalCost);
}

dengyihao's avatar
dengyihao 已提交
2862
/**
 * Parse the input lines according to info->protocol.
 *
 * JSON input is handed whole to smlParseJSON. For the line and telnet
 * protocols each line is parsed in turn, taken either from lines[] or from
 * the newline-separated raw buffer [rawLine, rawLineEnd). When a line sets
 * info->reRun, the cached tables and block hash are reset, info->lines is
 * allocated, and parsing restarts from the first line.
 *
 * @param info       schemaless handle
 * @param lines      array of NUL-terminated lines, or NULL when rawLine is used
 * @param rawLine    start of the raw buffer, or NULL
 * @param rawLineEnd one past the end of the raw buffer
 * @param numLines   number of lines to parse
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    if (lines) {
      code = smlParseJSON(info, *lines);
    } else if (rawLine) {
      code = smlParseJSON(info, rawLine);
    }
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines ? *lines : rawLine);
    }
    return code;
  }

  // remembered so a re-run can rewind the raw cursor together with the index
  char   *rawLineStart = rawLine;
  int32_t i = 0;
  while (i < numLines) {
    char *tmp = NULL;
    int   len = 0;
    if (lines) {
      tmp = lines[i];
      len = strlen(tmp);
    } else if (rawLine) {
      tmp = rawLine;
      // consume one line, leaving rawLine just past its '\n'
      while (rawLine < rawLineEnd) {
        if (*(rawLine++) == '\n') {
          break;
        }
        len++;
      }
      // only inspect tmp[0] when it lies inside the buffer
      if (info->protocol == TSDB_SML_LINE_PROTOCOL && tmp < rawLineEnd && tmp[0] == '#') {  // this line is comment
        i++;
        continue;
      }
    }

    uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len,
           (info->isRawLine ? "rawdata" : tmp));

    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
      if (info->dataFormat) {
        SSmlLineInfo element = {0};
        code = smlParseInfluxString(info, tmp, tmp + len, &element);
      } else {
        code = smlParseInfluxString(info, tmp, tmp + len, info->lines + i);
      }
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
      if (info->dataFormat) {
        SSmlLineInfo element = {0};
        code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, &element);
      } else {
        code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, info->lines + i);
      }
    } else {
      ASSERT(0);
    }
    if (code != TSDB_CODE_SUCCESS) {
      // a raw line is delimited by '\n', not NUL, so it is not printed with %s
      uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, (info->isRawLine ? "rawdata" : tmp));
      return code;
    }
    if (info->reRun) {
      i = 0;
      // restart from the first raw line as well, not from the current cursor
      rawLine = rawLineStart;
      info->reRun = false;
      // clear info->childTables
      NodeList *pList = info->childTables;
      while (pList) {
        if (pList->data.used) {
          smlDestroyTableInfo(pList->data.value);
          pList->data.used = false;
        }
        pList = pList->next;
      }

      // clear info->superTables
      pList = info->superTables;
      while (pList) {
        if (pList->data.used) {
          smlDestroySTableMeta(pList->data.value);
          pList->data.used = false;
        }
        pList = pList->next;
      }

      if (info->lines != NULL) {
        uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      info->lines = taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
      if (info->lines == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      memset(&info->preLine, 0, sizeof(SSmlLineInfo));
      info->currSTableMeta = NULL;
      info->currTableDataCtx = NULL;

      SVnodeModifOpStmt *stmt = (SVnodeModifOpStmt *)(info->pQuery->pRoot);
      stmt->freeHashFunc(stmt->pTableBlockHashObj);
      stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
      continue;
    }
    i++;
  }

  return code;
}

dengyihao's avatar
dengyihao 已提交
2966
/**
 * Drive one schemaless request through its stages, in order:
 * smlParseLine -> smlParseLineBottom -> smlModifyDBSchemas (with retries)
 * -> smlInsertData. Timestamps and counters are stored in info->cost as the
 * stages are entered.
 *
 * @param info       handle holding the request state and the cost statistics
 * @param lines      forwarded unchanged to smlParseLine
 * @param rawLine    forwarded unchanged to smlParseLine
 * @param rawLineEnd forwarded unchanged to smlParseLine
 * @param numLines   forwarded to smlParseLine and recorded in info->cost.lineNum
 * @return 0 when every stage succeeds, otherwise the code of the stage that
 *         failed (later stages are not run)
 */
static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  int32_t attempts = 0;
  int32_t rc = TSDB_CODE_SUCCESS;

  // Stage 1: parse the input.
  info->cost.parseTime = taosGetTimestampUs();
  rc = smlParseLine(info, lines, rawLine, rawLineEnd, numLines);
  if (rc != 0) {
    uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(rc));
    return rc;
  }

  rc = smlParseLineBottom(info);
  if (rc != 0) {
    uError("SML:0x%" PRIx64 " smlParseLineBottom error : %s", info->id, tstrerror(rc));
    return rc;
  }

  // Record what was parsed before the schema stage starts its clock.
  info->cost.lineNum = numLines;
  info->cost.numOfSTables = nodeListSize(info->superTables);
  info->cost.numOfCTables = nodeListSize(info->childTables);

  // Stage 2: modify the database schemas. The first attempt always runs;
  // after a failure the retry budget is (number of super tables * MAX_RETRY_TIMES).
  info->cost.schemaTime = taosGetTimestampUs();
  for (;;) {
    rc = smlModifyDBSchemas(info);
    if (rc == 0) {
      break;
    }
    if (!(attempts++ < nodeListSize(info->superTables) * MAX_RETRY_TIMES)) {
      break;
    }
  }
  if (rc != 0) {
    uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(rc));
    return rc;
  }

  // Stage 3: insert the data.
  info->cost.insertBindTime = taosGetTimestampUs();
  rc = smlInsertData(info);
  if (rc != 0) {
    uError("SML:0x%" PRIx64 " smlInsertData error : %s", info->id, tstrerror(rc));
  }

  return rc;
}

wmmhello's avatar
wmmhello 已提交
3010 3011 3012 3013 3014
/**
 * Shared implementation behind the taos_schemaless_insert* entry points:
 * creates the request, builds the SML handle, validates the arguments and
 * runs smlProcess().
 *
 * @param taos       connection handle; NULL yields terrno = TSDB_CODE_TSC_DISCONNECTED
 * @param lines      line array, forwarded to smlProcess
 * @param rawLine    start of a raw buffer, or NULL (info->isRawLine is set from this)
 * @param rawLineEnd end of the raw buffer, forwarded to smlProcess
 * @param numLines   line count; must be > 0 unless protocol is TSDB_SML_JSON_PROTOCOL,
 *                   in which case 1 is passed to smlProcess instead
 * @param protocol   must lie in [TSDB_SML_LINE_PROTOCOL, TSDB_SML_JSON_PROTOCOL]
 * @param precision  range-checked only for TSDB_SML_LINE_PROTOCOL
 * @param ttl        stored in the handle
 * @param reqid      passed to createRequest
 * @return NULL if taos is NULL or the request cannot be created; otherwise the
 *         request object, whose `code` field carries the outcome.
 */
TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd,
                                       int numLines, int protocol, int precision, int32_t ttl, int64_t reqid) {
  if (taos == NULL) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
  if (NULL == request) {
    uError("SML:taos_schemaless_insert error request is null");
    return NULL;
  }

  SSmlHandle *info = smlBuildSmlInfo(taos);
  if (NULL == info) {
    request->code = TSDB_CODE_OUT_OF_MEMORY;
    uError("SML:taos_schemaless_insert error SSmlHandle is null");
    // NOTE(review): `end` calls smlDestroyInfo(info) with info == NULL here —
    // confirm smlDestroyInfo tolerates a NULL handle.
    goto end;
  }

  // Populate the handle. lineNum keeps the caller-supplied count, even for the
  // JSON protocol where smlProcess is later given 1.
  info->pRequest = request;
  info->protocol = protocol;
  info->precision = precision;
  info->ttl = ttl;
  info->lineNum = numLines;
  info->isRawLine = (rawLine != NULL);
  info->msgBuf.buf = request->msgBuf;
  info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;

  // Validation failures write their message into the request's message buffer.
  SSmlMsgBuf errMsg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};

  if (NULL == request->pDb) {
    request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    smlBuildInvalidDataMsg(&errMsg, "Database not specified", NULL);
    goto end;
  }

  if (!(protocol >= TSDB_SML_LINE_PROTOCOL && protocol <= TSDB_SML_JSON_PROTOCOL)) {
    request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
    smlBuildInvalidDataMsg(&errMsg, "protocol invalidate", NULL);
    goto end;
  }

  if (protocol == TSDB_SML_LINE_PROTOCOL) {
    if (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS) {
      request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
      smlBuildInvalidDataMsg(&errMsg, "precision invalidate for line protocol", NULL);
      goto end;
    }
  }

  if (protocol != TSDB_SML_JSON_PROTOCOL) {
    if (numLines <= 0) {
      request->code = TSDB_CODE_SML_INVALID_DATA;
      smlBuildInvalidDataMsg(&errMsg, "line num is invalid", NULL);
      goto end;
    }
  } else {
    // JSON input is processed as a single line regardless of the caller's count.
    numLines = 1;
  }

  int32_t result = smlProcess(info, lines, rawLine, rawLineEnd, numLines);
  request->code = result;
  info->cost.endTime = taosGetTimestampUs();
  info->cost.code = result;
  smlPrintStatisticInfo(info);

end:
  uDebug("resultend:%s", request->msgBuf);
  smlDestroyInfo(info);
  return (TAOS_RES *)request;
}
wmmhello's avatar
wmmhello 已提交
3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093

/**
 * taos_schemaless_insert() parses data points and inserts them into the database
 * according to the selected protocol.
 *
 * @param $lines input array that may contain multiple lines; each line is one data point.
 *               If protocol=2 is used, the input array should contain a single JSON
 *               string (e.g. char *lines[] = {"$JSON_string"}). To insert multiple
 *               data points in JSON format, include them in $JSON_string
 *               as a JSON array.
 * @param $numLines the number of data points in $lines.
 *                  If protocol=2 is used, this parameter is ignored because $lines should
 *                  contain a single JSON string.
 * @param $protocol selects the protocol used for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
 * @return TAOS_RES
 */

3097 3098
TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                                int32_t ttl, int64_t reqid) {
  // Line-array variant: no raw buffer is supplied, so both raw-buffer
  // arguments of the shared implementation are NULL.
  char *rawLine = NULL;
  char *rawLineEnd = NULL;
  return taos_schemaless_insert_inner(taos, lines, rawLine, rawLineEnd, numLines, protocol, precision, ttl, reqid);
}

3102 3103 3104
/**
 * Line-array schemaless insert using TSDB_DEFAULT_TABLE_TTL and request id 0.
 * All other arguments are forwarded to taos_schemaless_insert_ttl_with_reqid().
 */
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  const int64_t reqid = 0;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
3105

3106 3107 3108
/**
 * Line-array schemaless insert with a caller-supplied ttl and request id 0.
 * All other arguments are forwarded to taos_schemaless_insert_ttl_with_reqid().
 */
TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int32_t ttl) {
  const int64_t reqid = 0;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
3109

3110 3111
/**
 * Line-array schemaless insert with a caller-supplied request id and
 * TSDB_DEFAULT_TABLE_TTL. All other arguments are forwarded to
 * taos_schemaless_insert_ttl_with_reqid().
 */
TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int64_t reqid) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}

3114
/**
 * Schemaless insert from a single buffer of '\n'-separated lines.
 *
 * The buffer is scanned once to count rows: a row ends at every '\n' and at
 * the last byte of the buffer. For TSDB_SML_LINE_PROTOCOL, a row whose first
 * character is '#' is treated as a comment and is not counted.
 *
 * @param taos      connection handle, forwarded to taos_schemaless_insert_inner
 * @param lines     start of the input buffer
 * @param len       number of bytes in the buffer
 * @param totalRows out: receives the number of counted rows; may be NULL when
 *                  the caller does not need the count
 * @param protocol  forwarded; also decides whether '#' rows are skipped
 * @param precision forwarded
 * @param ttl       forwarded
 * @param reqid     forwarded
 * @return the result of taos_schemaless_insert_inner(), which is given the
 *         buffer bounds [lines, lines + len) and the counted rows as its line count
 */
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                                    int precision, int32_t ttl, int64_t reqid) {
  int32_t rows = 0;
  char   *lineStart = lines;  // first byte of the row currently being scanned

  for (int i = 0; i < len; i++) {
    if (lines[i] == '\n' || i == len - 1) {
      if (lineStart[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL) {  // ignore comment
        rows++;
      }
      lineStart = lines + i + 1;
    }
  }

  // Count into a local so that a NULL out-pointer is not dereferenced.
  if (totalRows != NULL) {
    *totalRows = rows;
  }

  return taos_schemaless_insert_inner(taos, NULL, lines, lines + len, rows, protocol, precision, ttl, reqid);
}

3131 3132 3133 3134 3135 3136
/**
 * Raw-buffer schemaless insert with a caller-supplied request id and
 * TSDB_DEFAULT_TABLE_TTL. All other arguments are forwarded to
 * taos_schemaless_insert_raw_ttl_with_reqid().
 */
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int64_t reqid) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}
/**
 * Raw-buffer schemaless insert with a caller-supplied ttl and request id 0.
 * All other arguments are forwarded to taos_schemaless_insert_raw_ttl_with_reqid().
 */
TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int32_t ttl) {
  const int64_t reqid = 0;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
3137

3138 3139
/**
 * Raw-buffer schemaless insert using TSDB_DEFAULT_TABLE_TTL and request id 0.
 * All other arguments are forwarded to taos_schemaless_insert_raw_ttl_with_reqid().
 */
TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  const int64_t reqid = 0;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}