clientSml.c 100.6 KB
Newer Older
wmmhello's avatar
wmmhello 已提交
1 2 3 4 5
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

X
Xiaoyu Wang 已提交
6 7 8 9 10
#include "cJSON.h"
#include "catalog.h"
#include "clientInt.h"
#include "osSemaphore.h"
#include "osThread.h"
wmmhello's avatar
wmmhello 已提交
11 12
#include "query.h"
#include "taos.h"
wmmhello's avatar
wmmhello 已提交
13
#include "taoserror.h"
X
Xiaoyu Wang 已提交
14
#include "tcommon.h"
wmmhello's avatar
wmmhello 已提交
15
#include "tdef.h"
X
Xiaoyu Wang 已提交
16
#include "tglobal.h"
wmmhello's avatar
wmmhello 已提交
17 18
#include "tlog.h"
#include "tmsg.h"
X
Xiaoyu Wang 已提交
19
#include "tname.h"
wmmhello's avatar
wmmhello 已提交
20 21
#include "ttime.h"
#include "ttypes.h"
wmmhello's avatar
wmmhello 已提交
22

23 24 25 26 27 28 29 30 31 32 33 34 35
#if (defined(__GNUC__) && (__GNUC__ >= 3)) || (defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 800)) || defined(__clang__)
#  define expect(expr,value)    (__builtin_expect ((expr),(value)) )
#else
#  define expect(expr,value)    (expr)
#endif

#ifndef likely
#define likely(expr)     expect((expr) != 0, 1)
#endif
#ifndef unlikely
#define unlikely(expr)   expect((expr) != 0, 0)
#endif

wmmhello's avatar
wmmhello 已提交
36
//=================================================================================================
wmmhello's avatar
wmmhello 已提交
37 38 39 40 41 42 43

#define SPACE ' '
#define COMMA ','
#define EQUAL '='
#define QUOTE '"'
#define SLASH '\\'

dengyihao's avatar
dengyihao 已提交
44 45 46 47 48 49
#define JUMP_SPACE(sql, sqlEnd) \
  while (sql < sqlEnd) {        \
    if (*sql == SPACE)          \
      sql++;                    \
    else                        \
      break;                    \
X
Xiaoyu Wang 已提交
50
  }
wmmhello's avatar
wmmhello 已提交
51
// comma ,
wmmhello's avatar
wmmhello 已提交
52
//#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH)
X
Xiaoyu Wang 已提交
53
#define IS_COMMA(sql)       (*(sql) == COMMA && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
54
// space
wmmhello's avatar
wmmhello 已提交
55
//#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql)-1) == SLASH)
X
Xiaoyu Wang 已提交
56
#define IS_SPACE(sql)       (*(sql) == SPACE && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
57
// equal =
wmmhello's avatar
wmmhello 已提交
58
//#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) == SLASH)
X
Xiaoyu Wang 已提交
59
#define IS_EQUAL(sql)       (*(sql) == EQUAL && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
60
// quote "
wmmhello's avatar
wmmhello 已提交
61
//#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) == SLASH)
X
Xiaoyu Wang 已提交
62
#define IS_QUOTE(sql)       (*(sql) == QUOTE && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
63
// SLASH
wmmhello's avatar
wmmhello 已提交
64
//#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql)-1) == SLASH)
wmmhello's avatar
wmmhello 已提交
65

X
Xiaoyu Wang 已提交
66
#define IS_SLASH_LETTER(sql) \
wmmhello's avatar
wmmhello 已提交
67 68
  (*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == EQUAL || *(sql) == QUOTE || *(sql) == SLASH))                          \
//  (IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql))
wmmhello's avatar
wmmhello 已提交
69

X
Xiaoyu Wang 已提交
70
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len))
wmmhello's avatar
wmmhello 已提交
71

X
Xiaoyu Wang 已提交
72 73 74 75 76 77 78 79
#define PROCESS_SLASH(key, keyLen)           \
  for (int i = 1; i < keyLen; ++i) {         \
    if (IS_SLASH_LETTER(key + i)) {          \
      MOVE_FORWARD_ONE(key + i, keyLen - i); \
      i--;                                   \
      keyLen--;                              \
    }                                        \
  }
wmmhello's avatar
wmmhello 已提交
80

81 82 83
#define IS_INVALID_COL_LEN(len)   ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)

84 85 86
#define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM     4

X
Xiaoyu Wang 已提交
87 88 89 90
#define TS        "_ts"
#define TS_LEN    3
#define VALUE     "_value"
#define VALUE_LEN 6
91

92 93
#define JSON_METERS_NAME "__JM"

X
Xiaoyu Wang 已提交
94 95
#define BINARY_ADD_LEN 2  // "binary"   2 means " "
#define NCHAR_ADD_LEN  3  // L"nchar"   3 means L" "
wmmhello's avatar
wmmhello 已提交
96 97

#define MAX_RETRY_TIMES 5
wmmhello's avatar
wmmhello 已提交
98 99 100 101
//=================================================================================================
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;

typedef enum {
102
  SCHEMA_ACTION_NULL,
wmmhello's avatar
wmmhello 已提交
103 104 105 106 107
  SCHEMA_ACTION_CREATE_STABLE,
  SCHEMA_ACTION_ADD_COLUMN,
  SCHEMA_ACTION_ADD_TAG,
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,
  SCHEMA_ACTION_CHANGE_TAG_SIZE,
wmmhello's avatar
wmmhello 已提交
108 109
} ESchemaAction;

wmmhello's avatar
wmmhello 已提交
110 111 112 113 114 115 116 117 118 119 120 121 122
/*********************** list start *********************************/
typedef struct {
  const void  *key;
  int32_t      keyLen;
  void        *value;
  bool         used;
}Node;

typedef struct NodeList{
  Node             data;
  struct NodeList* next;
}NodeList;

123 124 125
typedef int32_t (*_equal_fn_sml)(const void *, const void *);

static void* nodeListGet(NodeList* list, const void *key, int32_t len, _equal_fn_sml fn){
wmmhello's avatar
wmmhello 已提交
126 127
  NodeList *tmp = list;
  while(tmp){
128 129 130 131 132 133 134 135
    if(fn == NULL){
      if(tmp->data.used && tmp->data.keyLen == len && memcmp(tmp->data.key, key, len) == 0) {
        return tmp->data.value;
      }
    }else{
      if(tmp->data.used && fn(tmp->data.key, key) == 0) {
        return tmp->data.value;
      }
wmmhello's avatar
wmmhello 已提交
136
    }
137

wmmhello's avatar
wmmhello 已提交
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
    tmp = tmp->next;
  }
  return NULL;
}

static int nodeListSet(NodeList** list, const void *key, int32_t len, void* value){
  NodeList *tmp = *list;
  while (tmp){
    if(!tmp->data.used) break;
    if(tmp->data.keyLen == len && memcmp(tmp->data.key, key, len) == 0) {
      return -1;
    }
    tmp = tmp->next;
  }
  if(tmp){
    tmp->data.key = key;
    tmp->data.keyLen = len;
    tmp->data.value = value;
    tmp->data.used = true;
  }else{
    NodeList *newNode = taosMemoryCalloc(1, sizeof(NodeList));
    if(newNode == NULL){
      return -1;
    }
    newNode->data.key = key;
    newNode->data.keyLen = len;
    newNode->data.value = value;
    newNode->data.used = true;
    newNode->next = *list;
    *list = newNode;
  }
  return 0;
}

static int nodeListSize(NodeList* list){
  int cnt = 0;
  while(list){
    if(list->data.used) cnt++;
    else break;
    list = list->next;
  }
  return cnt;
}
/*********************** list end *********************************/

wmmhello's avatar
wmmhello 已提交
183
typedef struct {
184 185 186 187
  char *measure;
  char *tags;
  char *cols;
  char *timestamp;
wmmhello's avatar
wmmhello 已提交
188 189 190 191 192 193

  int32_t measureLen;
  int32_t measureTagsLen;
  int32_t tagsLen;
  int32_t colsLen;
  int32_t timestampLen;
194 195

  SArray *colArray;
wmmhello's avatar
wmmhello 已提交
196 197 198
} SSmlLineInfo;

typedef struct {
X
Xiaoyu Wang 已提交
199 200 201 202
  const char *sTableName;  // super table name
  int32_t     sTableNameLen;
  char        childTableName[TSDB_TABLE_NAME_LEN];
  uint64_t    uid;
wmmhello's avatar
wmmhello 已提交
203

X
Xiaoyu Wang 已提交
204
  SArray *tags;
wmmhello's avatar
wmmhello 已提交
205

206
  // elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
X
Xiaoyu Wang 已提交
207
  SArray *cols;
208
  STableDataCxt *tableDataCtx;
wmmhello's avatar
wmmhello 已提交
209 210 211
} SSmlTableInfo;

typedef struct {
X
Xiaoyu Wang 已提交
212 213
  SArray   *tags;     // save the origin order to create table
  SHashObj *tagHash;  // elements are <key, index in tags>
214

X
Xiaoyu Wang 已提交
215 216
  SArray   *cols;
  SHashObj *colHash;
217

wmmhello's avatar
wmmhello 已提交
218 219 220 221
  STableMeta *tableMeta;
} SSmlSTableMeta;

typedef struct {
X
Xiaoyu Wang 已提交
222 223
  int32_t len;
  char   *buf;
wmmhello's avatar
wmmhello 已提交
224 225
} SSmlMsgBuf;

226 227 228 229 230 231 232
typedef struct {
  int32_t code;
  int32_t lineNum;

  int32_t numOfSTables;
  int32_t numOfCTables;
  int32_t numOfCreateSTables;
233 234
  int32_t numOfAlterColSTables;
  int32_t numOfAlterTagSTables;
235 236 237 238 239 240 241 242

  int64_t parseTime;
  int64_t schemaTime;
  int64_t insertBindTime;
  int64_t insertRpcTime;
  int64_t endTime;
} SSmlCostInfo;

wmmhello's avatar
wmmhello 已提交
243
typedef struct {
X
Xiaoyu Wang 已提交
244 245 246 247
  int64_t id;

  SMLProtocolType protocol;
  int8_t          precision;
248
  bool            reRun;
249 250 251
  bool            dataFormat;  // true means that the name and order of keys in each line are the same(only for influx protocol)
  bool            isRawLine;
  int32_t         ttl;
X
Xiaoyu Wang 已提交
252

wmmhello's avatar
wmmhello 已提交
253 254
  NodeList *childTables;
  NodeList *superTables;
X
Xiaoyu Wang 已提交
255 256 257 258 259 260 261 262
  SHashObj *pVgHash;

  STscObj     *taos;
  SCatalog    *pCatalog;
  SRequestObj *pRequest;
  SQuery      *pQuery;

  SSmlCostInfo cost;
wmmhello's avatar
wmmhello 已提交
263
  int32_t      lineNum;
X
Xiaoyu Wang 已提交
264
  SSmlMsgBuf   msgBuf;
wmmhello's avatar
wmmhello 已提交
265 266

  cJSON       *root;  // for parse json
wmmhello's avatar
wmmhello 已提交
267
  SSmlLineInfo      *lines; // element is SSmlLineInfo
268 269 270 271 272 273 274 275

  //
  SArray      *preLineTagKV;
  SArray      *preLineColKV;

  SSmlLineInfo preLine;
  STableMeta  *currSTableMeta;
  STableDataCxt *currTableDataCtx;
wmmhello's avatar
wmmhello 已提交
276
  bool         needModifySchema;
wmmhello's avatar
wmmhello 已提交
277
} SSmlHandle;
wmmhello's avatar
wmmhello 已提交
278 279
//=================================================================================================

wmmhello's avatar
wmmhello 已提交
280
//=================================================================================================
281
static volatile int64_t linesSmlHandleId = 0;
X
Xiaoyu Wang 已提交
282
static int64_t          smlGenId() {
283
  int64_t id;
wmmhello's avatar
wmmhello 已提交
284

285 286
  do {
    id = atomic_add_fetch_64(&linesSmlHandleId, 1);
wmmhello's avatar
wmmhello 已提交
287 288
  } while (id == 0);

289
  return id;
wmmhello's avatar
wmmhello 已提交
290 291
}

292
static inline bool smlDoubleToInt64OverFlow(double num) {
X
Xiaoyu Wang 已提交
293
  if (num >= (double)INT64_MAX || num <= (double)INT64_MIN) return true;
294 295 296 297 298 299 300 301 302 303 304 305
  return false;
}

static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashObj *pHash) {
  void *val = taosHashGet(pHash, key, keyLen);
  if (val) {
    return true;
  }
  taosHashPut(pHash, key, keyLen, key, 1);
  return false;
}

X
Xiaoyu Wang 已提交
306
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
307
  if (pBuf->buf) {
308 309 310 311 312 313 314
    memset(pBuf->buf, 0, pBuf->len);
    if (msg1) strncat(pBuf->buf, msg1, pBuf->len);
    int32_t left = pBuf->len - strlen(pBuf->buf);
    if (left > 2 && msg2) {
      strncat(pBuf->buf, ":", left - 1);
      strncat(pBuf->buf, msg2, left - 2);
    }
wmmhello's avatar
wmmhello 已提交
315
  }
wmmhello's avatar
wmmhello 已提交
316 317 318
  return TSDB_CODE_SML_INVALID_DATA;
}

X
Xiaoyu Wang 已提交
319
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
320
                                       ESchemaAction *action, SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
321
  uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
322 323
  if (index) {
    if (colField[*index].type != kv->type) {
X
Xiaoyu Wang 已提交
324 325
      uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id,
             kv->key, colField[*index].type, kv->type);
326 327 328
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

X
Xiaoyu Wang 已提交
329 330 331 332
    if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR &&
         (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
        (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
         ((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
333
      if (isTag) {
wmmhello's avatar
wmmhello 已提交
334
        *action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
335
      } else {
wmmhello's avatar
wmmhello 已提交
336
        *action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
337 338 339 340
      }
    }
  } else {
    if (isTag) {
wmmhello's avatar
wmmhello 已提交
341
      *action = SCHEMA_ACTION_ADD_TAG;
342
    } else {
wmmhello's avatar
wmmhello 已提交
343
      *action = SCHEMA_ACTION_ADD_COLUMN;
344 345
    }
  }
wmmhello's avatar
wmmhello 已提交
346 347 348
  return 0;
}

wmmhello's avatar
wmmhello 已提交
349
static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
wmmhello's avatar
wmmhello 已提交
350
  int32_t result = 1;
X
Xiaoyu Wang 已提交
351
  while (result <= length) {
wmmhello's avatar
wmmhello 已提交
352 353
    result *= 2;
  }
354
  if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
wmmhello's avatar
wmmhello 已提交
355
    result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
356
  } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
wmmhello's avatar
wmmhello 已提交
357 358
    result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  }
wmmhello's avatar
wmmhello 已提交
359

360
  if (type == TSDB_DATA_TYPE_NCHAR) {
361
    result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
362
  } else if (type == TSDB_DATA_TYPE_BINARY) {
363
    result = result + VARSTR_HEADER_SIZE;
wmmhello's avatar
wmmhello 已提交
364
  }
365
  return result;
wmmhello's avatar
wmmhello 已提交
366 367
}

X
Xiaoyu Wang 已提交
368
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
369
                                      ESchemaAction *action, bool isTag) {
370 371
  int32_t code = TSDB_CODE_SUCCESS;
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
372
    if (j == 0 && !isTag) continue;
373
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
374
    code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
X
Xiaoyu Wang 已提交
375
    if (code != TSDB_CODE_SUCCESS) {
376 377 378 379 380 381
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

382
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
383
  SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
384 385
  int32_t   i = 0;
  for (; i < length; i++) {
386 387 388
    taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
  }

389
  if (isTag) {
390 391 392 393 394
    i = 0;
  } else {
    i = 1;
  }
  for (; i < taosArrayGetSize(cols); i++) {
395
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
X
Xiaoyu Wang 已提交
396
    if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
wmmhello's avatar
wmmhello 已提交
397
      taosHashCleanup(hashTmp);
398 399 400
      return -1;
    }
  }
401
  taosHashCleanup(hashTmp);
402 403 404
  return 0;
}

405
static int32_t getBytes(uint8_t type, int32_t length) {
406 407 408 409 410 411 412
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
    return smlFindNearestPowerOf2(length, type);
  } else {
    return tDataTypes[type].bytes;
  }
}

413 414
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                  SArray *results, int32_t numOfCols, bool isTag) {
wmmhello's avatar
wmmhello 已提交
415
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
416
    SSmlKv       *kv = (SSmlKv *)taosArrayGet(cols, j);
wmmhello's avatar
wmmhello 已提交
417 418
    ESchemaAction action = SCHEMA_ACTION_NULL;
    smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
419
    if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) {
wmmhello's avatar
wmmhello 已提交
420 421 422 423 424
      SField field = {0};
      field.type = kv->type;
      field.bytes = getBytes(kv->type, kv->length);
      memcpy(field.name, kv->key, kv->keyLen);
      taosArrayPush(results, &field);
425
    } else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
wmmhello's avatar
wmmhello 已提交
426
      uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
427 428
      uint16_t  newIndex = *index;
      if (isTag) newIndex -= numOfCols;
wmmhello's avatar
wmmhello 已提交
429 430 431 432 433 434 435
      SField *field = (SField *)taosArrayGet(results, newIndex);
      field->bytes = getBytes(kv->type, kv->length);
    }
  }
  return TSDB_CODE_SUCCESS;
}

436 437 438 439 440
// static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
//                               int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray *pTags, STableMeta *pTableMeta,
                              ESchemaAction action) {
  SRequestObj   *pRequest = NULL;
441 442 443 444
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SCmdMsgInfo    pCmdMsg = {0};

wmmhello's avatar
wmmhello 已提交
445 446 447 448 449 450
  // put front for free
  pReq.numOfColumns = taosArrayGetSize(pColumns);
  pReq.pColumns = pColumns;
  pReq.numOfTags = taosArrayGetSize(pTags);
  pReq.pTags = pTags;

dengyihao's avatar
dengyihao 已提交
451
  code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest, 0);
452 453 454 455
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
456
  pRequest->syncQuery = true;
457 458 459 460 461
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

462
  if (action == SCHEMA_ACTION_CREATE_STABLE) {
wmmhello's avatar
wmmhello 已提交
463 464 465 466
    pReq.colVer = 1;
    pReq.tagVer = 1;
    pReq.suid = 0;
    pReq.source = TD_REQ_FROM_APP;
467
  } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
wmmhello's avatar
wmmhello 已提交
468 469 470 471
    pReq.colVer = pTableMeta->sversion;
    pReq.tagVer = pTableMeta->tversion + 1;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
472
  } else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) {
wmmhello's avatar
wmmhello 已提交
473 474 475 476 477 478
    pReq.colVer = pTableMeta->sversion + 1;
    pReq.tagVer = pTableMeta->tversion;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  }

479
  if (pReq.numOfTags == 0) {
wmmhello's avatar
wmmhello 已提交
480 481 482 483 484 485 486 487
    pReq.numOfTags = 1;
    SField field = {0};
    field.type = TSDB_DATA_TYPE_NCHAR;
    field.bytes = 1;
    strcpy(field.name, tsSmlTagName);
    taosArrayPush(pReq.pTags, &field);
  }

488 489 490 491 492 493 494 495 496 497 498 499 500 501
  pReq.commentLen = -1;
  pReq.igExists = true;
  tNameExtractFullName(pName, pReq.name);

  pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

D
dapan1121 已提交
502 503
  SQuery pQuery;
  memset(&pQuery, 0, sizeof(pQuery));
504 505 506 507 508 509 510
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

511
  if (pRequest->code == TSDB_CODE_SUCCESS) {
512 513 514 515 516
    catalogRemoveTableMeta(info->pCatalog, pName);
  }
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

517
  end:
518 519 520 521 522
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  return code;
}

X
Xiaoyu Wang 已提交
523
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
524 525 526
  if(info->dataFormat && !info->needModifySchema){
    return TSDB_CODE_SUCCESS;
  }
527 528 529
  int32_t     code = 0;
  SHashObj   *hashTmp = NULL;
  STableMeta *pTableMeta = NULL;
wmmhello's avatar
wmmhello 已提交
530

531
  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
wmmhello's avatar
wmmhello 已提交
532
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
wmmhello's avatar
wmmhello 已提交
533

D
dapan1121 已提交
534 535 536 537 538
  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
X
Xiaoyu Wang 已提交
539

wmmhello's avatar
wmmhello 已提交
540 541 542
  NodeList *tmp = info->superTables;
  while (tmp) {
    SSmlSTableMeta *sTableData = tmp->data.value;
X
Xiaoyu Wang 已提交
543
    bool            needCheckMeta = false;  // for multi thread
wmmhello's avatar
wmmhello 已提交
544

wmmhello's avatar
wmmhello 已提交
545 546
    size_t superTableLen = (size_t)tmp->data.keyLen;
    const void  *superTable = tmp->data.key;
547
    memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
wmmhello's avatar
wmmhello 已提交
548
    memcpy(pName.tname, superTable, superTableLen);
wmmhello's avatar
wmmhello 已提交
549

D
dapan1121 已提交
550
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
wmmhello's avatar
wmmhello 已提交
551

552
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
553 554
      SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
      SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
wmmhello's avatar
wmmhello 已提交
555 556 557 558
      smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
      smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);

      code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
559
      if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
560
        uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
561
        goto end;
wmmhello's avatar
wmmhello 已提交
562
      }
563
      info->cost.numOfCreateSTables++;
wmmhello's avatar
wmmhello 已提交
564 565 566 567 568 569 570
      taosMemoryFreeClear(pTableMeta);

      code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
        goto end;
      }
X
Xiaoyu Wang 已提交
571
    } else if (code == TSDB_CODE_SUCCESS) {
572 573
      hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
                             HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
574 575
      for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
           i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
576 577
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
wmmhello's avatar
wmmhello 已提交
578

579 580
      ESchemaAction action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
581
      if (code != TSDB_CODE_SUCCESS) {
582
        goto end;
583
      }
584 585 586 587 588
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
wmmhello's avatar
wmmhello 已提交
589 590 591 592 593 594

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
595
          if (i < pTableMeta->tableInfo.numOfColumns) {
wmmhello's avatar
wmmhello 已提交
596
            taosArrayPush(pColumns, &field);
597
          } else {
wmmhello's avatar
wmmhello 已提交
598 599 600
            taosArrayPush(pTags, &field);
          }
        }
601 602
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
                           pTableMeta->tableInfo.numOfColumns, true);
wmmhello's avatar
wmmhello 已提交
603 604

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
605
        if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
606
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
607 608 609
          goto end;
        }

610
        info->cost.numOfAlterTagSTables++;
wmmhello's avatar
wmmhello 已提交
611 612 613 614 615 616 617 618 619
        taosMemoryFreeClear(pTableMeta);
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
620
      }
621 622

      taosHashClear(hashTmp);
wmmhello's avatar
wmmhello 已提交
623
      for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
624 625
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
626 627
      action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
628
      if (code != TSDB_CODE_SUCCESS) {
629
        goto end;
wmmhello's avatar
wmmhello 已提交
630
      }
631 632 633 634 635
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
wmmhello's avatar
wmmhello 已提交
636 637 638 639 640 641

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
642
          if (i < pTableMeta->tableInfo.numOfColumns) {
wmmhello's avatar
wmmhello 已提交
643
            taosArrayPush(pColumns, &field);
644
          } else {
wmmhello's avatar
wmmhello 已提交
645 646 647 648
            taosArrayPush(pTags, &field);
          }
        }

649 650
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
                           pTableMeta->tableInfo.numOfColumns, false);
wmmhello's avatar
wmmhello 已提交
651 652

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
653
        if (code != TSDB_CODE_SUCCESS) {
654
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
655 656
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
657

658
        info->cost.numOfAlterColSTables++;
wmmhello's avatar
wmmhello 已提交
659
        taosMemoryFreeClear(pTableMeta);
wmmhello's avatar
wmmhello 已提交
660 661 662 663
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
664 665 666 667 668
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
669
      }
wmmhello's avatar
wmmhello 已提交
670

671
      needCheckMeta = true;
wmmhello's avatar
wmmhello 已提交
672 673
      taosHashCleanup(hashTmp);
      hashTmp = NULL;
wmmhello's avatar
wmmhello 已提交
674
    } else {
X
Xiaoyu Wang 已提交
675
      uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
676
      goto end;
wmmhello's avatar
wmmhello 已提交
677
    }
678

X
Xiaoyu Wang 已提交
679 680
    if (needCheckMeta) {
      code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
681
                          sTableData->tags, true);
682
      if (code != TSDB_CODE_SUCCESS) {
683
        uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, pName.tname);
684 685
        goto end;
      }
686
      code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
687
      if (code != TSDB_CODE_SUCCESS) {
688
        uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, pName.tname);
689 690 691 692
        goto end;
      }
    }

693
    sTableData->tableMeta = pTableMeta;
wmmhello's avatar
wmmhello 已提交
694

wmmhello's avatar
wmmhello 已提交
695
    tmp = tmp->next;
wmmhello's avatar
wmmhello 已提交
696 697
  }
  return 0;
698

699
  end:
wmmhello's avatar
wmmhello 已提交
700 701
  taosHashCleanup(hashTmp);
  taosMemoryFreeClear(pTableMeta);
wmmhello's avatar
wmmhello 已提交
702
//  catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
703
  return code;
wmmhello's avatar
wmmhello 已提交
704 705
}

706
/******************************* parse basic type function **********************/
X
Xiaoyu Wang 已提交
707
static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
708
  const char *pVal = kvVal->value;
X
Xiaoyu Wang 已提交
709 710 711 712
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (pVal == endptr) {
713
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
wmmhello's avatar
wmmhello 已提交
714 715 716
    return false;
  }

717
  int32_t left = len - (endptr - pVal);
X
Xiaoyu Wang 已提交
718
  if (left == 0 || (left == 3 && strncasecmp(endptr, "f64", left) == 0)) {
719 720
    kvVal->type = TSDB_DATA_TYPE_DOUBLE;
    kvVal->d = result;
X
Xiaoyu Wang 已提交
721 722
  } else if ((left == 3 && strncasecmp(endptr, "f32", left) == 0)) {
    if (!IS_VALID_FLOAT(result)) {
723 724
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
725
    }
726 727
    kvVal->type = TSDB_DATA_TYPE_FLOAT;
    kvVal->f = (float)result;
X
Xiaoyu Wang 已提交
728 729
  } else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)) {
    if (smlDoubleToInt64OverFlow(result)) {
wmmhello's avatar
wmmhello 已提交
730 731
      errno = 0;
      int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
X
Xiaoyu Wang 已提交
732
      if (errno == ERANGE) {
wmmhello's avatar
wmmhello 已提交
733 734 735 736 737 738
        smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_BIGINT;
      kvVal->i = tmp;
      return true;
wmmhello's avatar
wmmhello 已提交
739
    }
740
    kvVal->type = TSDB_DATA_TYPE_BIGINT;
wmmhello's avatar
wmmhello 已提交
741
    kvVal->i = (int64_t)result;
wmmhello's avatar
wmmhello 已提交
742
  } else if ((left == 1 && *endptr == 'u') || (left == 3 && strncasecmp(endptr, "u64", left) == 0)) {
X
Xiaoyu Wang 已提交
743
    if (result >= (double)UINT64_MAX || result < 0) {
wmmhello's avatar
wmmhello 已提交
744 745
      errno = 0;
      uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
X
Xiaoyu Wang 已提交
746
      if (errno == ERANGE || result < 0) {
wmmhello's avatar
wmmhello 已提交
747 748 749 750 751 752
        smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_UBIGINT;
      kvVal->u = tmp;
      return true;
753
    }
754
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;
wmmhello's avatar
wmmhello 已提交
755
    kvVal->u = result;
X
Xiaoyu Wang 已提交
756 757
  } else if (left == 3 && strncasecmp(endptr, "i32", left) == 0) {
    if (!IS_VALID_INT(result)) {
758 759
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
760
    }
761 762
    kvVal->type = TSDB_DATA_TYPE_INT;
    kvVal->i = result;
X
Xiaoyu Wang 已提交
763 764
  } else if (left == 3 && strncasecmp(endptr, "u32", left) == 0) {
    if (!IS_VALID_UINT(result)) {
765 766
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
767
    }
768 769
    kvVal->type = TSDB_DATA_TYPE_UINT;
    kvVal->u = result;
X
Xiaoyu Wang 已提交
770 771
  } else if (left == 3 && strncasecmp(endptr, "i16", left) == 0) {
    if (!IS_VALID_SMALLINT(result)) {
772 773
      smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
774
    }
775 776
    kvVal->type = TSDB_DATA_TYPE_SMALLINT;
    kvVal->i = result;
X
Xiaoyu Wang 已提交
777 778
  } else if (left == 3 && strncasecmp(endptr, "u16", left) == 0) {
    if (!IS_VALID_USMALLINT(result)) {
779 780
      smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
781
    }
782 783
    kvVal->type = TSDB_DATA_TYPE_USMALLINT;
    kvVal->u = result;
X
Xiaoyu Wang 已提交
784 785
  } else if (left == 2 && strncasecmp(endptr, "i8", left) == 0) {
    if (!IS_VALID_TINYINT(result)) {
786 787
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
788
    }
789 790
    kvVal->type = TSDB_DATA_TYPE_TINYINT;
    kvVal->i = result;
X
Xiaoyu Wang 已提交
791 792
  } else if (left == 2 && strncasecmp(endptr, "u8", left) == 0) {
    if (!IS_VALID_UTINYINT(result)) {
793 794
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
795
    }
796 797
    kvVal->type = TSDB_DATA_TYPE_UTINYINT;
    kvVal->u = result;
X
Xiaoyu Wang 已提交
798
  } else {
799
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
wmmhello's avatar
wmmhello 已提交
800 801
    return false;
  }
802
  return true;
wmmhello's avatar
wmmhello 已提交
803 804
}

wmmhello's avatar
wmmhello 已提交
805
static bool smlParseBool(SSmlKv *kvVal) {
wmmhello's avatar
wmmhello 已提交
806
  const char *pVal = kvVal->value;
X
Xiaoyu Wang 已提交
807
  int32_t     len = kvVal->length;
808
  if ((len == 1) && (pVal[0] == 't' || pVal[0] == 'T')) {
809
    kvVal->i = TSDB_TRUE;
wmmhello's avatar
wmmhello 已提交
810 811 812
    return true;
  }

813
  if ((len == 1) && (pVal[0] == 'f' || pVal[0] == 'F')) {
814
    kvVal->i = TSDB_FALSE;
wmmhello's avatar
wmmhello 已提交
815 816 817
    return true;
  }

X
Xiaoyu Wang 已提交
818
  if ((len == 4) && !strncasecmp(pVal, "true", len)) {
819
    kvVal->i = TSDB_TRUE;
wmmhello's avatar
wmmhello 已提交
820 821
    return true;
  }
X
Xiaoyu Wang 已提交
822
  if ((len == 5) && !strncasecmp(pVal, "false", len)) {
823
    kvVal->i = TSDB_FALSE;
wmmhello's avatar
wmmhello 已提交
824 825 826 827 828
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
829
static bool smlIsBinary(const char *pVal, uint16_t len) {
X
Xiaoyu Wang 已提交
830
  // binary: "abc"
wmmhello's avatar
wmmhello 已提交
831 832 833 834 835 836 837 838 839
  if (len < 2) {
    return false;
  }
  if (pVal[0] == '"' && pVal[len - 1] == '"') {
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
840
static bool smlIsNchar(const char *pVal, uint16_t len) {
X
Xiaoyu Wang 已提交
841
  // nchar: L"abc"
wmmhello's avatar
wmmhello 已提交
842 843 844
  if (len < 3) {
    return false;
  }
wmmhello's avatar
wmmhello 已提交
845
  if (pVal[1] == '"' && pVal[len - 1] == '"' && (pVal[0] == 'l' || pVal[0] == 'L')) {
wmmhello's avatar
wmmhello 已提交
846 847 848 849
    return true;
  }
  return false;
}
850
/******************************* parse basic type function end **********************/
wmmhello's avatar
wmmhello 已提交
851

852
/******************************* time function **********************/
853
static uint8_t smlPrecisionConvert[7] = {TSDB_TIME_PRECISION_NANO, TSDB_TIME_PRECISION_HOURS, TSDB_TIME_PRECISION_MINUTES,
wmmhello's avatar
wmmhello 已提交
854 855
                                     TSDB_TIME_PRECISION_SECONDS, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_MICRO,
                                     TSDB_TIME_PRECISION_NANO};
856 857 858
static int64_t smlFactorNS[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
static int64_t smlFactorS[3] = {1000LL, 1000000LL, 1000000000LL};
static int64_t smlToMilli[3] = {3600LL, 60LL, 1LL};
wmmhello's avatar
wmmhello 已提交
859

860
static int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, uint8_t toPrecision) {
X
Xiaoyu Wang 已提交
861
  char   *endPtr = NULL;
wafwerar's avatar
wafwerar 已提交
862
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
863
  if (unlikely(value + len != endPtr)) {
864
    return -1;
wmmhello's avatar
wmmhello 已提交
865
  }
866 867 868 869 870 871 872 873

  if(unlikely(fromPrecision >= TSDB_TIME_PRECISION_HOURS)){
    fromPrecision = TSDB_TIME_PRECISION_MILLI;
    int64_t unit = smlToMilli[fromPrecision - TSDB_TIME_PRECISION_HOURS];
    if(unit > INT64_MAX / tsInt64){
      return -1;
    }
    tsInt64 *= unit;
wmmhello's avatar
wmmhello 已提交
874 875
  }

876
  return convertTimePrecision(tsInt64, fromPrecision, toPrecision);
877 878 879 880 881 882
}

static int8_t smlGetTsTypeByLen(int32_t len) {
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
    return TSDB_TIME_PRECISION_SECONDS;
  } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
883
    return TSDB_TIME_PRECISION_MILLI;
884 885
  } else {
    return -1;
wmmhello's avatar
wmmhello 已提交
886
  }
887 888
}

X
Xiaoyu Wang 已提交
889
static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) {
890
  uint8_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;
wmmhello's avatar
wmmhello 已提交
891

892 893
  if(unlikely(len == 0 || (len == 1 && data[0] == '0'))){
    return taosGetTimestampNs()/smlFactorNS[toPrecision];
wmmhello's avatar
wmmhello 已提交
894
  }
895

896 897 898 899
  uint8_t fromPrecision = smlPrecisionConvert[info->precision];

  int64_t ts = smlGetTimeValue(data, len, fromPrecision, toPrecision);
  if (unlikely(ts == -1)) {
900 901
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
wmmhello's avatar
wmmhello 已提交
902
  }
903 904 905
  return ts;
}

X
Xiaoyu Wang 已提交
906
static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
907 908 909
  uint8_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;

  if (unlikely(!data)) {
910 911
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
    return -1;
wmmhello's avatar
wmmhello 已提交
912
  }
913 914
  if (unlikely(len == 1 && data[0] == '0')) {
    return taosGetTimestampNs()/smlFactorNS[toPrecision];
915
  }
916 917
  uint8_t fromPrecision = smlGetTsTypeByLen(len);
  if (unlikely(fromPrecision == -1)) {
X
Xiaoyu Wang 已提交
918 919
    smlBuildInvalidDataMsg(&info->msgBuf,
                           "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
920 921
    return -1;
  }
922 923
  int64_t ts = smlGetTimeValue(data, len, fromPrecision, toPrecision);
  if (unlikely(ts == -1)) {
924 925 926 927 928 929
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
  }
  return ts;
}

930
static int64_t smlParseTS(SSmlHandle *info, const char *data, int32_t len) {
931
  int64_t ts = 0;
X
Xiaoyu Wang 已提交
932
  if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
933
    //    uError("SML:data:%s,len:%d", data, len);
934
    ts = smlParseInfluxTime(info, data, len);
X
Xiaoyu Wang 已提交
935
  } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
936
    ts = smlParseOpenTsdbTime(info, data, len);
X
Xiaoyu Wang 已提交
937
  } else {
938
    ASSERT(0);
939
  }
wmmhello's avatar
wmmhello 已提交
940
  uDebug("SML:0x%" PRIx64 " smlParseTS:%" PRId64, info->id, ts);
941

942
  return ts;
943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013
}
/******************************* time function end **********************/

/******************************* Sml struct related function **********************/
static SSmlTableInfo *smlBuildTableInfo(int numRows, const char* measure, int32_t measureLen) {
  SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
  if (!tag) {
    return NULL;
  }

  tag->sTableName = measure;
  tag->sTableNameLen = measureLen;

  tag->cols = taosArrayInit(numRows, POINTER_BYTES);
  if (tag->cols == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
  }

  tag->tags = taosArrayInit(16, sizeof(SSmlKv));
  if (tag->tags == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
  }
  return tag;

  cleanup:
  taosMemoryFree(tag);
  return NULL;
}

static int32_t smlCheckDupUnit(SHashObj *dumplicateKey, SArray *tags, SSmlMsgBuf *msg){
  for(int i = 0; i < taosArrayGetSize(tags); i++) {
    SSmlKv *tag = taosArrayGet(tags, i);
    if (smlCheckDuplicateKey(tag->key, tag->keyLen, dumplicateKey)) {
      smlBuildInvalidDataMsg(msg, "dumplicate key", tag->key);
      return TSDB_CODE_TSC_DUP_NAMES;
    }
  }
  return TSDB_CODE_SUCCESS;
}

static int32_t smlJudgeDupColName(SArray *cols, SArray *tags, SSmlMsgBuf *msg) {
  SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  int ret = smlCheckDupUnit(dumplicateKey, cols, msg);
  if(ret != TSDB_CODE_SUCCESS){
    goto end;
  }
  ret = smlCheckDupUnit(dumplicateKey, tags, msg);
  if(ret != TSDB_CODE_SUCCESS){
    goto end;
  }

  end:
  taosHashCleanup(dumplicateKey);
  return ret;
}

static int32_t smlParseTableName(SArray *tags, char *childTableName) {
  size_t      childTableNameLen = strlen(tsSmlChildTableName);
  if (childTableNameLen <= 0) return TSDB_CODE_SUCCESS;

  for(int i = 0; i < taosArrayGetSize(tags); i++){
    SSmlKv *tag = taosArrayGet(tags, i);
    // handle child table name
    if (childTableNameLen == tag->keyLen && strncmp(tag->key, tsSmlChildTableName, tag->keyLen) == 0) {
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, tag->value, (tag->length < TSDB_TABLE_NAME_LEN ? tag->length : TSDB_TABLE_NAME_LEN));
      break;
    }
  }
wmmhello's avatar
wmmhello 已提交
1014

1015 1016 1017
  return TSDB_CODE_SUCCESS;
}

1018 1019 1020 1021
static int32_t smlSetCTableName(SSmlTableInfo *oneTable){
  smlParseTableName(oneTable->tags, oneTable->childTableName);

  if (strlen(oneTable->childTableName) == 0) {
1022 1023
    SArray* dst = taosArrayDup(oneTable->tags, NULL);
    RandTableName rName = {dst, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen,
1024 1025 1026
                           oneTable->childTableName, 0};

    buildChildTableName(&rName);
1027
    taosArrayDestroy(dst);
1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075
    oneTable->uid = rName.uid;
  } else {
    oneTable->uid = *(uint64_t *)(oneTable->childTableName);
  }
  return TSDB_CODE_SUCCESS;
}

static SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
  SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
  if (!meta) {
    return NULL;
  }

  if(unlikely(!isDataFormat)){
    meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->tagHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }

    meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->colHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }
  }

  meta->tags = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->cols = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
  return meta;

  cleanup:
  taosMemoryFree(meta);
  return NULL;
}

static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols){
  for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
1076
    taosArrayPush(metaArray, kv);
1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110
    if(unlikely(metaHash != NULL)) {
      taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
    }
  }
}

static STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen){
  STableMeta *pTableMeta = NULL;

  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));

  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
  memcpy(pName.tname, measure, measureLen);

  catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
  return pTableMeta;
}

static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
  taosHashCleanup(meta->tagHash);
  taosHashCleanup(meta->colHash);
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);
  taosMemoryFree(meta->tableMeta);
  taosMemoryFree(meta);
}
/******************************* Sml struct related function end **********************/

wmmhello's avatar
wmmhello 已提交
1111
static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
X
Xiaoyu Wang 已提交
1112
  // binary
wmmhello's avatar
wmmhello 已提交
1113
  if (smlIsBinary(pVal->value, pVal->length)) {
1114
    pVal->type = TSDB_DATA_TYPE_BINARY;
wmmhello's avatar
wmmhello 已提交
1115
    pVal->length -= BINARY_ADD_LEN;
1116
    if (pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
wmmhello's avatar
wmmhello 已提交
1117 1118
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
1119
    pVal->value += (BINARY_ADD_LEN - 1);
wmmhello's avatar
wmmhello 已提交
1120
    return TSDB_CODE_SUCCESS;
1121
  }
X
Xiaoyu Wang 已提交
1122
  // nchar
wmmhello's avatar
wmmhello 已提交
1123
  if (smlIsNchar(pVal->value, pVal->length)) {
1124
    pVal->type = TSDB_DATA_TYPE_NCHAR;
wmmhello's avatar
wmmhello 已提交
1125
    pVal->length -= NCHAR_ADD_LEN;
1126
    if (pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
wmmhello's avatar
wmmhello 已提交
1127 1128
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
1129
    pVal->value += (NCHAR_ADD_LEN - 1);
wmmhello's avatar
wmmhello 已提交
1130
    return TSDB_CODE_SUCCESS;
1131 1132
  }

X
Xiaoyu Wang 已提交
1133
  // bool
1134 1135 1136
  if (smlParseBool(pVal)) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
wmmhello's avatar
wmmhello 已提交
1137
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1138
  }
X
Xiaoyu Wang 已提交
1139
  // number
1140
  if (smlParseNumber(pVal, msg)) {
wmmhello's avatar
wmmhello 已提交
1141
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
wmmhello's avatar
wmmhello 已提交
1142
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1143 1144
  }

wmmhello's avatar
wmmhello 已提交
1145
  return TSDB_CODE_TSC_INVALID_VALUE;
wmmhello's avatar
wmmhello 已提交
1146 1147
}

1148 1149 1150
int32_t is_same_child_table_json(const void *a, const void *b){
  return (cJSON_Compare((const cJSON *)a, (const cJSON *)b, true)) ? 0 : 1;
}
wmmhello's avatar
wmmhello 已提交
1151

1152 1153 1154 1155 1156
#define IS_SAME_CHILD_TABLE (elements->measureTagsLen == info->preLine.measureTagsLen \
&& memcmp(elements->measure, info->preLine.measure, elements->measureTagsLen) == 0)

#define IS_SAME_SUPER_TABLE (elements->measureLen == info->preLine.measureLen \
&& memcmp(elements->measure, info->preLine.measure, elements->measureLen) == 0)
1157

1158 1159 1160 1161 1162 1163 1164
#define IS_SAME_KEY (preKV->keyLen == kv.keyLen && memcmp(preKV->key, kv.key, kv.keyLen) == 0)

static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd,
                          SSmlLineInfo* currElement, bool isSameMeasure, bool isSameCTable){
  if(isSameCTable){
    return TSDB_CODE_SUCCESS;
  }
wmmhello's avatar
wmmhello 已提交
1165

1166 1167 1168 1169 1170
  int     cnt = 0;
  SArray *preLineKV = info->preLineTagKV;
  bool    isSuperKVInit = true;
  SArray *superKV = NULL;
  if(info->dataFormat){
1171
    if(!isSameMeasure){
1172
      SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
wmmhello's avatar
wmmhello 已提交
1173

1174
      if(unlikely(sMeta == NULL)){
wmmhello's avatar
wmmhello 已提交
1175
        sMeta = smlBuildSTableMeta(info->dataFormat);
1176
        STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
wmmhello's avatar
wmmhello 已提交
1177
        sMeta->tableMeta = pTableMeta;
1178 1179 1180 1181 1182
        if(pTableMeta == NULL){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
wmmhello's avatar
wmmhello 已提交
1183
        nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta);
1184 1185
      }
      info->currSTableMeta = sMeta->tableMeta;
1186
      superKV = sMeta->tags;
1187 1188

      if(unlikely(taosArrayGetSize(superKV) == 0)){
1189
        isSuperKVInit = false;
1190
      }
1191 1192 1193 1194 1195 1196 1197 1198 1199 1200
      taosArraySetSize(preLineKV, 0);
    }
  }else{
    taosArraySetSize(preLineKV, 0);
  }


  while (*sql < sqlEnd) {
    if (unlikely(IS_SPACE(*sql))) {
      break;
1201 1202
    }

1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218
    bool hasSlash = false;
    // parse key
    const char *key = *sql;
    int32_t     keyLen = 0;
    while (*sql < sqlEnd) {
      if (unlikely(IS_COMMA(*sql))) {
        smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if (unlikely(IS_EQUAL(*sql))) {
        keyLen = *sql - key;
        (*sql)++;
        break;
      }
      if(!hasSlash){
        hasSlash = (*(*sql) == SLASH);
1219
      }
1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241
      (*sql)++;
    }
    if(unlikely(hasSlash)) {
      PROCESS_SLASH(key, keyLen)
    }

    if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

    // parse value
    const char *value = *sql;
    int32_t     valueLen = 0;
    hasSlash = false;
    while (*sql < sqlEnd) {
      // parse value
      if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
        break;
      }else if (unlikely(IS_EQUAL(*sql))) {
        smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
        return TSDB_CODE_SML_INVALID_DATA;
1242
      }
1243

1244 1245
      if(!hasSlash){
        hasSlash = (*(*sql) == SLASH);
1246
      }
1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268

      (*sql)++;
    }
    valueLen = *sql - value;

    if (unlikely(valueLen == 0)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }

    if(unlikely(hasSlash)) {
      PROCESS_SLASH(value, valueLen)
    }

    if (unlikely(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }

    SSmlKv kv = {.key = key, .type = TSDB_DATA_TYPE_NCHAR, .keyLen = keyLen, .value = value, .length = valueLen};
    if(info->dataFormat){
      if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
        info->needModifySchema = true;
1269
      }
1270

1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315
      if(isSameMeasure){
        if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
        if(unlikely(kv.length > preKV->length)){
          preKV->length = kv.length;
          SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
          ASSERT(tableMeta != NULL);

          SSmlKv *oldKV = taosArrayGet(tableMeta->tags, cnt);
          oldKV->length = kv.length;
          info->needModifySchema = true;
        }
        if(unlikely(!IS_SAME_KEY)){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
      }else{
        if(isSuperKVInit){
          if(unlikely(cnt >= taosArrayGetSize(superKV))) {
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
          SSmlKv *preKV = taosArrayGet(superKV, cnt);
          if(unlikely(kv.length > preKV->length)) {
            preKV->length = kv.length;
          }else{
            kv.length = preKV->length;
          }
          info->needModifySchema = true;

          if(unlikely(!IS_SAME_KEY)){
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
        }else{
          taosArrayPush(superKV, &kv);
        }
        taosArrayPush(preLineKV, &kv);
1316
      }
1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353
    }else{
      taosArrayPush(preLineKV, &kv);
    }

    cnt++;
    if(IS_SPACE(*sql)){
      break;
    }
    (*sql)++;
  }

  void* oneTable = nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen, NULL);
  if ((oneTable != NULL)) {
    return TSDB_CODE_SUCCESS;
  }

  SSmlTableInfo *tinfo = smlBuildTableInfo(1, currElement->measure, currElement->measureLen);
  if (!tinfo) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  for(int i = 0; i < taosArrayGetSize(preLineKV); i++){
    taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
  }
  smlSetCTableName(tinfo);
  if(info->dataFormat) {
    info->currSTableMeta->uid = tinfo->uid;
    tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
    if(tinfo->tableDataCtx == NULL){
      smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
      return TSDB_CODE_SML_INVALID_DATA;
    }
  }

  nodeListSet(&info->childTables, currElement->measure, currElement->measureTagsLen, tinfo);

  return TSDB_CODE_SUCCESS;
}
1354

1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366
static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd,
                          SSmlLineInfo* currElement, bool isSameMeasure, bool isSameCTable){
  int     cnt = 0;
  SArray *preLineKV = info->preLineColKV;
  bool    isSuperKVInit = true;
  SArray *superKV = NULL;
  if(info->dataFormat){
    if(unlikely(!isSameCTable)){
      SSmlTableInfo *oneTable = (SSmlTableInfo *)nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen, NULL);
      if (unlikely(oneTable == NULL)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure);
        return TSDB_CODE_SML_INVALID_DATA;
1367
      }
1368
      info->currTableDataCtx = oneTable->tableDataCtx;
1369 1370
    }

1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389
    if(unlikely(!isSameMeasure)){
      SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);

      if(unlikely(sMeta == NULL)){
        sMeta = smlBuildSTableMeta(info->dataFormat);
        STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
        sMeta->tableMeta = pTableMeta;
        if(pTableMeta == NULL){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta);
      }
      info->currSTableMeta = sMeta->tableMeta;
      superKV = sMeta->cols;
      if(unlikely(taosArrayGetSize(superKV) == 0)){
        isSuperKVInit = false;
      }
1390 1391 1392 1393 1394
      taosArraySetSize(preLineKV, 0);
    }
  }

  while (*sql < sqlEnd) {
1395
    if (unlikely(IS_SPACE(*sql))) {
1396 1397 1398
      break;
    }

1399
    bool hasSlash = false;
1400 1401 1402 1403
    // parse key
    const char *key = *sql;
    int32_t     keyLen = 0;
    while (*sql < sqlEnd) {
1404
      if (unlikely(IS_COMMA(*sql))) {
1405 1406 1407
        smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
1408
      if (unlikely(IS_EQUAL(*sql))) {
1409 1410 1411 1412
        keyLen = *sql - key;
        (*sql)++;
        break;
      }
1413 1414 1415
      if(!hasSlash){
        hasSlash = (*(*sql) == SLASH);
      }
1416 1417
      (*sql)++;
    }
1418 1419 1420
    if(unlikely(hasSlash)) {
      PROCESS_SLASH(key, keyLen)
    }
1421

1422
    if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
1423 1424 1425 1426 1427 1428 1429
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

    // parse value
    const char *value = *sql;
    int32_t     valueLen = 0;
1430
    hasSlash              = false;
1431 1432 1433
    bool        isInQuote = false;
    while (*sql < sqlEnd) {
      // parse value
1434
      if (IS_QUOTE(*sql)) {
1435 1436 1437 1438
        isInQuote = !isInQuote;
        (*sql)++;
        continue;
      }
wmmhello's avatar
wmmhello 已提交
1439
      if (!isInQuote){
1440
        if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
wmmhello's avatar
wmmhello 已提交
1441
          break;
1442
        } else if (unlikely(IS_EQUAL(*sql))) {
wmmhello's avatar
wmmhello 已提交
1443 1444 1445
          smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
          return TSDB_CODE_SML_INVALID_DATA;
        }
1446
      }
1447 1448 1449
      if(!hasSlash){
        hasSlash = (*(*sql) == SLASH);
      }
wmmhello's avatar
wmmhello 已提交
1450

1451 1452 1453 1454
      (*sql)++;
    }
    valueLen = *sql - value;

1455
    if (unlikely(isInQuote)) {
1456 1457 1458
      smlBuildInvalidDataMsg(&info->msgBuf, "only one quote", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
1459
    if (unlikely(valueLen == 0)) {
1460 1461 1462
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
1463 1464 1465
    if(unlikely(hasSlash)) {
      PROCESS_SLASH(value, valueLen)
    }
1466 1467

    SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen};
1468 1469 1470
    int32_t ret = smlParseValue(&kv, &info->msgBuf);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
1471 1472 1473
    }

    if(info->dataFormat){
1474 1475 1476
      //cnt begin 0, add ts so + 2
      if(unlikely(cnt + 2 > info->currSTableMeta->tableInfo.numOfColumns)){
        info->needModifySchema = true;
1477 1478
      }
      // bind data
1479 1480 1481 1482
      ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, cnt + 1);
      if (unlikely(ret != TSDB_CODE_SUCCESS)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
        return ret;
1483 1484
      }

1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514
      if(isSameMeasure){
        if(cnt >= taosArrayGetSize(preLineKV)) {
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
        if(kv.type != preKV->type){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }

        if(unlikely(IS_VAR_DATA_TYPE(kv.type) && kv.length > preKV->length)){
          preKV->length = kv.length;
          SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
          ASSERT(tableMeta != NULL);

          SSmlKv *oldKV = taosArrayGet(tableMeta->cols, cnt);
          oldKV->length = kv.length;
          info->needModifySchema = true;
        }
        if(unlikely(!IS_SAME_KEY)){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
      }else{
        if(isSuperKVInit){
          if(unlikely(cnt >= taosArrayGetSize(superKV))) {
1515 1516 1517 1518
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
1519 1520
          SSmlKv *preKV = taosArrayGet(superKV, cnt);
          if(unlikely(kv.type != preKV->type)){
1521 1522 1523 1524 1525
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }

1526 1527 1528
          if(IS_VAR_DATA_TYPE(kv.type)){
            if(kv.length > preKV->length) {
              preKV->length = kv.length;
1529
            }else{
1530
              kv.length = preKV->length;
1531
            }
wmmhello's avatar
wmmhello 已提交
1532
            info->needModifySchema = true;
1533
          }
1534 1535 1536 1537
          if(unlikely(!IS_SAME_KEY)){
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
1538
          }
1539 1540
        }else{
          taosArrayPush(superKV, &kv);
1541
        }
1542
        taosArrayPush(preLineKV, &kv);
1543
      }
1544 1545
    }else{
      taosArraySetSize(currElement->colArray, 1);
1546 1547
      taosArrayPush(currElement->colArray, &kv);   //reserve for timestamp
    }
1548

1549 1550 1551 1552 1553
    cnt++;
    if(IS_SPACE(*sql)){
      break;
    }
    (*sql)++;
1554 1555
  }

1556 1557
  return TSDB_CODE_SUCCESS;
}
1558

1559 1560 1561 1562 1563
static int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements) {
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
  JUMP_SPACE(sql, sqlEnd)
  if (unlikely(*sql == COMMA)) return TSDB_CODE_SML_INVALID_DATA;
  elements->measure = sql;
wmmhello's avatar
wmmhello 已提交
1564

wmmhello's avatar
wmmhello 已提交
1565
  // parse measure
wmmhello's avatar
wmmhello 已提交
1566
  while (sql < sqlEnd) {
X
Xiaoyu Wang 已提交
1567
    if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
wmmhello's avatar
wmmhello 已提交
1568 1569
      MOVE_FORWARD_ONE(sql, sqlEnd - sql);
      sqlEnd--;
wmmhello's avatar
wmmhello 已提交
1570 1571
      continue;
    }
X
Xiaoyu Wang 已提交
1572
    if (IS_COMMA(sql)) {
wmmhello's avatar
wmmhello 已提交
1573 1574 1575
      break;
    }

X
Xiaoyu Wang 已提交
1576
    if (IS_SPACE(sql)) {
wmmhello's avatar
wmmhello 已提交
1577 1578
      break;
    }
wmmhello's avatar
wmmhello 已提交
1579 1580
    sql++;
  }
wmmhello's avatar
wmmhello 已提交
1581
  elements->measureLen = sql - elements->measure;
1582
  if (unlikely(IS_INVALID_TABLE_LEN(elements->measureLen))) {
1583
    smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
wmmhello's avatar
wmmhello 已提交
1584
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
wmmhello's avatar
wmmhello 已提交
1585
  }
wmmhello's avatar
wmmhello 已提交
1586

1587 1588 1589 1590 1591 1592 1593 1594 1595 1596
  // to get measureTagsLen before
  const char* tmp = sql;
  while (tmp < sqlEnd){
    if (IS_SPACE(tmp)) {
      break;
    }
    tmp++;
  }
  elements->measureTagsLen = tmp - elements->measure;

1597 1598 1599 1600 1601 1602 1603 1604
  bool isSameCTable = false;
  bool isSameMeasure = false;
  if(IS_SAME_CHILD_TABLE){
    isSameCTable = true;
    isSameMeasure = true;
  }else if(info->dataFormat) {
    isSameMeasure = IS_SAME_SUPER_TABLE;
  }
wmmhello's avatar
wmmhello 已提交
1605
  // parse tag
X
Xiaoyu Wang 已提交
1606
  if (*sql == SPACE) {
wmmhello's avatar
wmmhello 已提交
1607
    elements->tagsLen = 0;
X
Xiaoyu Wang 已提交
1608 1609
  } else {
    if (*sql == COMMA) sql++;
wmmhello's avatar
wmmhello 已提交
1610
    elements->tags = sql;
1611 1612

    // tinfo != NULL means child table has never occur before
1613 1614
    int ret = smlParseTagKv(info, &sql, sqlEnd, elements, isSameMeasure, isSameCTable);
    if(unlikely(ret != TSDB_CODE_SUCCESS)){
1615
      return ret;
wmmhello's avatar
wmmhello 已提交
1616
    }
1617
    if(unlikely(info->reRun)){
1618 1619 1620
      return TSDB_CODE_SUCCESS;
    }

1621 1622
    sql = elements->measure + elements->measureTagsLen;

wmmhello's avatar
wmmhello 已提交
1623
    elements->tagsLen = sql - elements->tags;
1624
  }
wmmhello's avatar
wmmhello 已提交
1625

wmmhello's avatar
wmmhello 已提交
1626
  // parse cols
wmmhello's avatar
wmmhello 已提交
1627
  JUMP_SPACE(sql, sqlEnd)
wmmhello's avatar
wmmhello 已提交
1628
  elements->cols = sql;
1629

1630 1631
  int ret = smlParseColKv(info, &sql, sqlEnd, elements, isSameMeasure, isSameCTable);
  if(unlikely(ret != TSDB_CODE_SUCCESS)){
1632
    return ret;
wmmhello's avatar
wmmhello 已提交
1633
  }
1634

1635
  if(unlikely(info->reRun)){
1636
    return TSDB_CODE_SUCCESS;
1637
  }
1638

wmmhello's avatar
wmmhello 已提交
1639
  elements->colsLen = sql - elements->cols;
1640
  if (unlikely(elements->colsLen == 0)) {
1641
    smlBuildInvalidDataMsg(&info->msgBuf, "cols is empty", NULL);
wmmhello's avatar
wmmhello 已提交
1642 1643
    return TSDB_CODE_SML_INVALID_DATA;
  }
wmmhello's avatar
wmmhello 已提交
1644

wmmhello's avatar
wmmhello 已提交
1645
  // parse timestamp
wmmhello's avatar
wmmhello 已提交
1646
  JUMP_SPACE(sql, sqlEnd)
wmmhello's avatar
wmmhello 已提交
1647
  elements->timestamp = sql;
wmmhello's avatar
wmmhello 已提交
1648 1649
  while (sql < sqlEnd) {
    if (isspace(*sql)) {
wmmhello's avatar
wmmhello 已提交
1650 1651 1652 1653
      break;
    }
    sql++;
  }
wmmhello's avatar
wmmhello 已提交
1654
  elements->timestampLen = sql - elements->timestamp;
wmmhello's avatar
wmmhello 已提交
1655

1656 1657 1658 1659
  int64_t ts = smlParseTS(info, elements->timestamp, elements->timestampLen);
  if (ts <= 0) {
    uError("SML:0x%" PRIx64 " smlParseTS error:%" PRId64, info->id, ts);
    return TSDB_CODE_INVALID_TIMESTAMP;
1660
  }
1661 1662
  // add ts to
  SSmlKv kv = { .key = TS, .keyLen = TS_LEN, .i = ts, .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
1663
  if(info->dataFormat){
1664
    smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
1665
    smlBuildRow(info->currTableDataCtx);
1666 1667
  }else{
    taosArraySet(elements->colArray, 0, &kv);
1668
  }
1669
  info->preLine = *elements;
1670

1671
  return ret;
wmmhello's avatar
wmmhello 已提交
1672 1673
}

1674
static void smlParseTelnetElement(char **sql, char *sqlEnd, char **data, int32_t *len) {
wmmhello's avatar
wmmhello 已提交
1675
  while (*sql < sqlEnd) {
1676
    if (unlikely((**sql != SPACE && !(*data)))) {
1677
      *data = *sql;
1678
    } else if (unlikely(**sql == SPACE && *data)) {
1679 1680 1681 1682 1683 1684 1685
      *len = *sql - *data;
      break;
    }
    (*sql)++;
  }
}

1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723
static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
  if(IS_SAME_CHILD_TABLE){
    return TSDB_CODE_SUCCESS;
  }

  bool isSameMeasure = IS_SAME_SUPER_TABLE;

  int     cnt = 0;
  SArray *preLineKV = info->preLineTagKV;
  bool    isSuperKVInit = true;
  SArray *superKV = NULL;
  if(info->dataFormat){
    if(!isSameMeasure){
      SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);

      if(unlikely(sMeta == NULL)){
        sMeta = smlBuildSTableMeta(info->dataFormat);
        STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
        sMeta->tableMeta = pTableMeta;
        if(pTableMeta == NULL){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta);
      }
      info->currSTableMeta = sMeta->tableMeta;
      superKV = sMeta->tags;

      if(unlikely(taosArrayGetSize(superKV) == 0)){
        isSuperKVInit = false;
      }
      taosArraySetSize(preLineKV, 0);
    }
  }else{
    taosArraySetSize(preLineKV, 0);
  }

wmmhello's avatar
wmmhello 已提交
1724
  const char *sql = data;
X
Xiaoyu Wang 已提交
1725
  size_t      childTableNameLen = strlen(tsSmlChildTableName);
wmmhello's avatar
wmmhello 已提交
1726 1727
  while (sql < sqlEnd) {
    JUMP_SPACE(sql, sqlEnd)
1728
    if (unlikely(*sql == '\0')) break;
wmmhello's avatar
wmmhello 已提交
1729

wmmhello's avatar
wmmhello 已提交
1730
    const char *key = sql;
X
Xiaoyu Wang 已提交
1731
    int32_t     keyLen = 0;
wmmhello's avatar
wmmhello 已提交
1732 1733

    // parse key
wmmhello's avatar
wmmhello 已提交
1734
    while (sql < sqlEnd) {
1735
      if (unlikely(*sql == SPACE)) {
wmmhello's avatar
wmmhello 已提交
1736 1737 1738
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
1739
      if (unlikely(*sql == EQUAL)) {
wmmhello's avatar
wmmhello 已提交
1740 1741
        keyLen = sql - key;
        sql++;
1742 1743
        break;
      }
wmmhello's avatar
wmmhello 已提交
1744
      sql++;
1745
    }
wmmhello's avatar
wmmhello 已提交
1746

1747
    if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
1748
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
wmmhello's avatar
wmmhello 已提交
1749
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
1750
    }
1751 1752 1753 1754
//    if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
//      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
//      return TSDB_CODE_TSC_DUP_NAMES;
//    }
1755 1756

    // parse value
wmmhello's avatar
wmmhello 已提交
1757
    const char *value = sql;
X
Xiaoyu Wang 已提交
1758
    int32_t     valueLen = 0;
wmmhello's avatar
wmmhello 已提交
1759
    while (sql < sqlEnd) {
wmmhello's avatar
wmmhello 已提交
1760
      // parse value
1761
      if (unlikely(*sql == SPACE)) {
1762 1763
        break;
      }
1764
      if (unlikely(*sql == EQUAL)) {
wmmhello's avatar
wmmhello 已提交
1765 1766 1767 1768
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
1769
    }
wmmhello's avatar
wmmhello 已提交
1770
    valueLen = sql - value;
wmmhello's avatar
wmmhello 已提交
1771

1772
    if (unlikely(valueLen == 0)) {
1773
      smlBuildInvalidDataMsg(msg, "invalid value", value);
wmmhello's avatar
wmmhello 已提交
1774
      return TSDB_CODE_TSC_INVALID_VALUE;
1775
    }
wmmhello's avatar
wmmhello 已提交
1776

1777
    if (unlikely(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
wmmhello's avatar
wmmhello 已提交
1778 1779 1780
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }

1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822
    SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen, .type = TSDB_DATA_TYPE_NCHAR};

    if(info->dataFormat){
      if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
        info->needModifySchema = true;
      }

      if(isSameMeasure){
        if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
        if(unlikely(kv.length > preKV->length)){
          preKV->length = kv.length;
          SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
          ASSERT(tableMeta != NULL);

          SSmlKv *oldKV = taosArrayGet(tableMeta->tags, cnt);
          oldKV->length = kv.length;
          info->needModifySchema = true;
        }
        if(unlikely(!IS_SAME_KEY)){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
      }else{
        if(isSuperKVInit){
          if(unlikely(cnt >= taosArrayGetSize(superKV))) {
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
          SSmlKv *preKV = taosArrayGet(superKV, cnt);
          if(unlikely(kv.length > preKV->length)) {
            preKV->length = kv.length;
          }else{
            kv.length = preKV->length;
          }
          info->needModifySchema = true;
1823

1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837
          if(unlikely(!IS_SAME_KEY)){
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
        }else{
          taosArrayPush(superKV, &kv);
        }
        taosArrayPush(preLineKV, &kv);
      }
    }else{
      taosArrayPush(preLineKV, &kv);
    }
    cnt++;
1838
  }
1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856
  SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->measure, elements->measureTagsLen, NULL);
  if (unlikely(tinfo == NULL)) {
    tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
    if (!tinfo) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    for(int i = 0; i < taosArrayGetSize(preLineKV); i++){
      taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
    }
    smlSetCTableName(tinfo);
    if (info->dataFormat) {
      info->currSTableMeta->uid = tinfo->uid;
      tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
      if (tinfo->tableDataCtx == NULL) {
        smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
        return TSDB_CODE_SML_INVALID_DATA;
      }
    }
1857

1858 1859
    nodeListSet(&info->childTables, elements->measure, elements->measureTagsLen, tinfo);
  }
1860 1861
  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1862

1863
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
1864
static int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements) {
X
Xiaoyu Wang 已提交
1865
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
1866 1867

  // parse metric
1868 1869
  smlParseTelnetElement(&sql, sqlEnd, &elements->measure, &elements->measureLen);
  if (unlikely((!(elements->measure) || IS_INVALID_TABLE_LEN(elements->measureLen)))) {
1870
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
wmmhello's avatar
wmmhello 已提交
1871
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
1872 1873 1874
  }

  // parse timestamp
1875 1876
  smlParseTelnetElement(&sql, sqlEnd, &elements->timestamp, &elements->timestampLen);
  if (unlikely(!elements->timestamp || elements->timestampLen == 0)) {
1877 1878 1879 1880
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }

1881 1882 1883 1884 1885 1886
  bool needConverTime = false;  // get TS before parse tag(get meta), so need conver time
  if(info->dataFormat && info->currSTableMeta == NULL){
    needConverTime = true;
  }
  int64_t ts = smlParseTS(info, elements->timestamp, elements->timestampLen);
  if (unlikely(ts < 0)) {
1887
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
1888
    return TSDB_CODE_INVALID_TIMESTAMP;
1889
  }
1890
  SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .i = ts, .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
1891 1892

  // parse value
1893 1894
  smlParseTelnetElement(&sql, sqlEnd, &elements->cols, &elements->colsLen);
  if (unlikely(!elements->cols || elements->colsLen == 0)) {
1895 1896 1897
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }
1898

1899 1900 1901 1902 1903 1904
  SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = elements->colsLen};
  if (smlParseNumber(&kv, &info->msgBuf)) {
    kv.length = (int16_t)tDataTypes[kv.type].bytes;
    return TSDB_CODE_SUCCESS;
  }else{
    return TSDB_CODE_TSC_INVALID_VALUE;
1905
  }
1906

1907 1908 1909 1910 1911 1912 1913 1914
  // move measure before tags to combine keys to identify child table
  memcpy(sql - elements->measureLen, elements->measure, elements->measureLen);
  elements->measure = sql - elements->measureLen;
  elements->measureLen += sqlEnd - sql;


  int ret = smlParseTelnetTags(info, sql, sqlEnd, elements, &info->msgBuf);
  if (unlikely(ret != TSDB_CODE_SUCCESS)) {
1915
    return ret;
wmmhello's avatar
wmmhello 已提交
1916
  }
wmmhello's avatar
wmmhello 已提交
1917

1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942
  if(unlikely(info->reRun)){
    return TSDB_CODE_SUCCESS;
  }

  if(info->dataFormat){
    if(needConverTime) {
      kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
    }
    ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
    if(ret == TSDB_CODE_SUCCESS){
      ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
    }
    if(ret == TSDB_CODE_SUCCESS){
      ret = smlBuildRow(info->currTableDataCtx);
    }
    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
      return ret;
    }
  }else{
    taosArrayPush(elements->colArray, &kvTs);
    taosArrayPush(elements->colArray, &kv);
  }
  info->preLine = *elements;

wmmhello's avatar
wmmhello 已提交
1943 1944 1945
  return TSDB_CODE_SUCCESS;
}

1946
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
1947
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
1948
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
wmmhello's avatar
wmmhello 已提交
1949

wmmhello's avatar
wmmhello 已提交
1950
    int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);
X
Xiaoyu Wang 已提交
1951
    if (index) {
1952 1953 1954 1955 1956 1957 1958 1959
      SSmlKv *value = (SSmlKv *)taosArrayGet(metaArray, *index);
      if (isTag){
        if (kv->length > value->length) {
          value->length = kv->length;
        }
        continue;
      }
      if (kv->type != value->type) {
wmmhello's avatar
wmmhello 已提交
1960
        smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
wmmhello's avatar
wmmhello 已提交
1961
        return TSDB_CODE_SML_NOT_SAME_TYPE;
1962 1963 1964 1965
      }

      if (IS_VAR_DATA_TYPE(kv->type) && (kv->length > value->length)) {  // update string len, if bigger
        value->length = kv->length;
1966
      }
X
Xiaoyu Wang 已提交
1967
    } else {
wmmhello's avatar
wmmhello 已提交
1968 1969 1970 1971 1972
      size_t tmp = taosArrayGetSize(metaArray);
      ASSERT(tmp <= INT16_MAX);
      int16_t size = tmp;
      taosArrayPush(metaArray, &kv);
      taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
1973
    }
wmmhello's avatar
wmmhello 已提交
1974
  }
wmmhello's avatar
wmmhello 已提交
1975

wmmhello's avatar
wmmhello 已提交
1976
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1977 1978
}

wmmhello's avatar
wmmhello 已提交
1979
static void smlDestroyTableInfo(SSmlTableInfo *tag) {
1980 1981 1982
  for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
    SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
    taosHashCleanup(kvHash);
wmmhello's avatar
wmmhello 已提交
1983
  }
1984

1985 1986 1987 1988 1989
  taosArrayDestroy(tag->cols);
  taosArrayDestroy(tag->tags);
  taosMemoryFree(tag);
}

1990
static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
1991
  SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
1992
  if (!kvHash) {
1993
    uError("SML:smlDealCols failed to allocate memory");
1994
    return TSDB_CODE_OUT_OF_MEMORY;
1995
  }
X
Xiaoyu Wang 已提交
1996
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
1997
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
wmmhello's avatar
wmmhello 已提交
1998
    taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
1999 2000
  }

2001
  taosArrayPush(colsArray, &kvHash);
2002 2003 2004
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
2005 2006
void smlDestroyInfo(SSmlHandle *info) {
  if (!info) return;
2007 2008 2009
  qDestroyQuery(info->pQuery);

  // destroy info->childTables
wmmhello's avatar
wmmhello 已提交
2010 2011 2012 2013 2014 2015 2016 2017
  NodeList* tmp = info->childTables;
  while (tmp) {
    if(tmp->data.used) {
      smlDestroyTableInfo(tmp->data.value);
    }
    NodeList* t = tmp->next;
    taosMemoryFree(tmp);
    tmp = t;
2018 2019 2020
  }

  // destroy info->superTables
wmmhello's avatar
wmmhello 已提交
2021 2022 2023 2024 2025 2026 2027 2028
  tmp = info->superTables;
  while (tmp) {
    if(tmp->data.used) {
      smlDestroySTableMeta(tmp->data.value);
    }
    NodeList* t = tmp->next;
    taosMemoryFree(tmp);
    tmp = t;
2029 2030 2031 2032
  }

  // destroy info->pVgHash
  taosHashCleanup(info->pVgHash);
wmmhello's avatar
wmmhello 已提交
2033

2034 2035 2036
  taosArrayDestroy(info->preLineTagKV);
  taosArrayDestroy(info->preLineColKV);

wmmhello's avatar
wmmhello 已提交
2037 2038 2039 2040
  if(!info->dataFormat){
    for(int i = 0; i < info->lineNum; i++){
      taosArrayDestroy(info->lines[i].colArray);
    }
wmmhello's avatar
wmmhello 已提交
2041
    taosMemoryFree(info->lines);
wmmhello's avatar
wmmhello 已提交
2042
  }
wmmhello's avatar
wmmhello 已提交
2043

wmmhello's avatar
wmmhello 已提交
2044
  cJSON_Delete(info->root);
wmmhello's avatar
wmmhello 已提交
2045
  taosMemoryFreeClear(info);
wmmhello's avatar
wmmhello 已提交
2046
}
2047

wmmhello's avatar
wmmhello 已提交
2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059
static SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
  if (NULL == info) {
    return NULL;
  }
  info->taos = acquireTscObj(*(int64_t *)taos);
  code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
    goto cleanup;
  }
X
Xiaoyu Wang 已提交
2060
  info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
wmmhello's avatar
wmmhello 已提交
2061 2062 2063
  info->id = smlGenId();
  info->pQuery = smlInitHandle();
  info->dataFormat = true;
2064

2065 2066 2067
  info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
  info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv));

wmmhello's avatar
wmmhello 已提交
2068 2069
  if (NULL == info->pVgHash) {
    uError("create SSmlHandle failed");
2070 2071 2072 2073
    goto cleanup;
  }

  return info;
wmmhello's avatar
wmmhello 已提交
2074 2075

cleanup:
2076 2077 2078 2079 2080
  smlDestroyInfo(info);
  return NULL;
}

/************* TSDB_SML_JSON_PROTOCOL function start **************/
2081
static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
2082 2083
  cJSON *metric = cJSON_GetObjectItem(root, "metric");
  if (!cJSON_IsString(metric)) {
X
Xiaoyu Wang 已提交
2084
    return TSDB_CODE_TSC_INVALID_JSON;
2085 2086
  }

2087 2088
  elements->measureLen = strlen(metric->valuestring);
  if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
X
Xiaoyu Wang 已提交
2089
    uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id);
2090 2091 2092
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

2093
  elements->measure = metric->valuestring;
wmmhello's avatar
wmmhello 已提交
2094
  return TSDB_CODE_SUCCESS;
2095 2096
}

2097
static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPrecision) {
2098
  int32_t size = cJSON_GetArraySize(root);
2099 2100 2101
  if (unlikely(size != OTD_JSON_SUB_FIELDS_NUM)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
    return -1;
2102 2103 2104
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
2105 2106 2107
  if (unlikely(!cJSON_IsNumber(value))) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
    return -1;
2108 2109 2110
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
2111 2112 2113
  if (unlikely(!cJSON_IsString(type))) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
    return -1;
2114 2115 2116
  }

  double timeDouble = value->valuedouble;
2117
  if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
2118
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
2119
    return -1;
2120
  }
wmmhello's avatar
wmmhello 已提交
2121 2122

  if (timeDouble == 0) {
2123
    return taosGetTimestampNs()/smlFactorNS[toPrecision];
wmmhello's avatar
wmmhello 已提交
2124 2125 2126
  }

  if (timeDouble < 0) {
2127
    return timeDouble;
2128 2129
  }

2130
  int64_t tsInt64 = timeDouble;
2131
  size_t typeLen = strlen(type->valuestring);
wmmhello's avatar
wmmhello 已提交
2132
  if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
X
Xiaoyu Wang 已提交
2133
    // seconds
2134 2135 2136
    int8_t fromPrecision = TSDB_TIME_PRECISION_SECONDS;
    if(smlFactorS[toPrecision] < INT64_MAX / tsInt64){
      return tsInt64 * smlFactorS[toPrecision];
2137
    }
2138
    return -1;
wmmhello's avatar
wmmhello 已提交
2139
  } else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
2140 2141
    switch (type->valuestring[0]) {
      case 'm':
wmmhello's avatar
wmmhello 已提交
2142
      case 'M':
X
Xiaoyu Wang 已提交
2143
        // milliseconds
2144
        return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MILLI, toPrecision);
2145 2146
        break;
      case 'u':
wmmhello's avatar
wmmhello 已提交
2147
      case 'U':
X
Xiaoyu Wang 已提交
2148
        // microseconds
2149
        return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MICRO, toPrecision);
2150 2151
        break;
      case 'n':
wmmhello's avatar
wmmhello 已提交
2152
      case 'N':
2153
        return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_NANO, toPrecision);
2154 2155
        break;
      default:
2156
        return -1;
2157 2158
    }
  } else {
2159
    return -1;
2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171
  }
}

static uint8_t smlGetTimestampLen(int64_t num) {
  uint8_t len = 0;
  while ((num /= 10) != 0) {
    len++;
  }
  len++;
  return len;
}

2172
static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root) {
X
Xiaoyu Wang 已提交
2173
  // Timestamp must be the first KV to parse
2174
  int32_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;
2175 2176
  cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
  if (cJSON_IsNumber(timestamp)) {
X
Xiaoyu Wang 已提交
2177
    // timestamp value 0 indicates current system time
2178
    double timeDouble = timestamp->valuedouble;
2179
    if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
2180
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
2181
      return -1;
2182
    }
wmmhello's avatar
wmmhello 已提交
2183

2184 2185 2186 2187 2188 2189
    if (unlikely(timeDouble < 0)) {
      smlBuildInvalidDataMsg(&info->msgBuf,
                             "timestamp is negative", NULL);
      return timeDouble;
    }else if (unlikely(timeDouble == 0)) {
      return taosGetTimestampNs()/smlFactorNS[toPrecision];
2190
    }
2191

2192
    uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
2193 2194 2195 2196 2197
    int8_t fromPrecision = smlGetTsTypeByLen(tsLen);
    if (unlikely(fromPrecision == -1)) {
      smlBuildInvalidDataMsg(&info->msgBuf,
                             "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", NULL);
      return -1;
2198
    }
2199 2200

    return convertTimePrecision(timeDouble, fromPrecision, toPrecision);
2201
  } else if (cJSON_IsObject(timestamp)) {
2202
    return smlParseTSFromJSONObj(info, timestamp, toPrecision);
2203
  } else {
2204 2205 2206
    smlBuildInvalidDataMsg(&info->msgBuf,
                           "invalidate json", NULL);
    return -1;
wmmhello's avatar
wmmhello 已提交
2207
  }
wmmhello's avatar
wmmhello 已提交
2208 2209
}

X
Xiaoyu Wang 已提交
2210
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
2211 2212 2213 2214 2215 2216 2217
  if (strcasecmp(typeStr, "bool") != 0) {
    uError("OTD:invalid type(%s) for JSON Bool", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }
  pVal->type = TSDB_DATA_TYPE_BOOL;
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  pVal->i = value->valueint;
wmmhello's avatar
wmmhello 已提交
2218

2219 2220
  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
2221

X
Xiaoyu Wang 已提交
2222 2223 2224
static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
  // tinyint
  if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) {
2225 2226 2227 2228 2229 2230 2231 2232 2233
    if (!IS_VALID_TINYINT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(tinyint)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_TINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
2234 2235
  // smallint
  if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) {
2236 2237 2238 2239 2240 2241 2242 2243 2244
    if (!IS_VALID_SMALLINT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(smallint)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_SMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
2245 2246
  // int
  if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) {
2247 2248 2249 2250 2251 2252 2253 2254 2255
    if (!IS_VALID_INT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(int)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_INT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
2256 2257
  // bigint
  if (strcasecmp(typeStr, "i64") == 0 || strcasecmp(typeStr, "bigint") == 0) {
2258 2259
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
X
Xiaoyu Wang 已提交
2260
    if (value->valuedouble >= (double)INT64_MAX) {
2261
      pVal->i = INT64_MAX;
X
Xiaoyu Wang 已提交
2262
    } else if (value->valuedouble <= (double)INT64_MIN) {
2263
      pVal->i = INT64_MIN;
X
Xiaoyu Wang 已提交
2264
    } else {
2265
      pVal->i = value->valuedouble;
2266 2267 2268
    }
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
2269 2270
  // float
  if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) {
2271 2272 2273
    if (!IS_VALID_FLOAT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(float)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
wmmhello's avatar
wmmhello 已提交
2274
    }
2275 2276 2277 2278 2279
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->f = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
2280 2281
  // double
  if (strcasecmp(typeStr, "f64") == 0 || strcasecmp(typeStr, "double") == 0) {
2282 2283 2284 2285
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->d = value->valuedouble;
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2286 2287
  }

X
Xiaoyu Wang 已提交
2288
  // if reach here means type is unsupported
2289 2290 2291
  uError("OTD:invalid type(%s) for JSON Number", typeStr);
  return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
2292

X
Xiaoyu Wang 已提交
2293
static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
2294 2295 2296 2297 2298 2299 2300 2301 2302
  if (strcasecmp(typeStr, "binary") == 0) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
  } else if (strcasecmp(typeStr, "nchar") == 0) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
  } else {
    uError("OTD:invalid type(%s) for JSON String", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }
  pVal->length = (int16_t)strlen(value->valuestring);
wmmhello's avatar
wmmhello 已提交
2303

2304
  if (pVal->type == TSDB_DATA_TYPE_BINARY && pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
wmmhello's avatar
wmmhello 已提交
2305 2306
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }
2307 2308
  if (pVal->type == TSDB_DATA_TYPE_NCHAR &&
      pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
wmmhello's avatar
wmmhello 已提交
2309 2310 2311
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }

wmmhello's avatar
wmmhello 已提交
2312 2313
  pVal->value = value->valuestring;
  return TSDB_CODE_SUCCESS;
2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339
}

static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) {
  int32_t ret = TSDB_CODE_SUCCESS;
  int32_t size = cJSON_GetArraySize(root);

  if (size != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (value == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  switch (value->type) {
    case cJSON_True:
    case cJSON_False: {
      ret = smlConvertJSONBool(kv, type->valuestring, value);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
wmmhello's avatar
wmmhello 已提交
2340
      }
2341
      break;
wmmhello's avatar
wmmhello 已提交
2342
    }
2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358
    case cJSON_Number: {
      ret = smlConvertJSONNumber(kv, type->valuestring, value);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
      break;
    }
    case cJSON_String: {
      ret = smlConvertJSONString(kv, type->valuestring, value);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON_TYPE;
wmmhello's avatar
wmmhello 已提交
2359
  }
2360 2361

  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2362 2363
}

2364 2365 2366 2367 2368 2369 2370 2371
static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
  switch (root->type) {
    case cJSON_True:
    case cJSON_False: {
      kv->type = TSDB_DATA_TYPE_BOOL;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->i = root->valueint;
      break;
wmmhello's avatar
wmmhello 已提交
2372
    }
2373 2374 2375 2376 2377 2378 2379 2380 2381 2382
    case cJSON_Number: {
      kv->type = TSDB_DATA_TYPE_DOUBLE;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->d = root->valuedouble;
      break;
    }
    case cJSON_String: {
      /* set default JSON type to binary/nchar according to
       * user configured parameter tsDefaultJSONStrType
       */
wmmhello's avatar
wmmhello 已提交
2383

X
Xiaoyu Wang 已提交
2384
      char *tsDefaultJSONStrType = "nchar";  // todo
2385 2386
      smlConvertJSONString(kv, tsDefaultJSONStrType, root);
      break;
wmmhello's avatar
wmmhello 已提交
2387
    }
2388 2389 2390 2391 2392 2393 2394 2395 2396 2397
    case cJSON_Object: {
      int32_t ret = smlParseValueFromJSONObj(root, kv);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON Obj");
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
2398
  }
2399 2400 2401 2402

  return TSDB_CODE_SUCCESS;
}

2403
static int32_t smlParseColsFromJSON(cJSON *root, SSmlKv *kv) {
2404 2405 2406 2407 2408
  cJSON *metricVal = cJSON_GetObjectItem(root, "value");
  if (metricVal == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

2409
  int32_t ret = smlParseValueFromJSON(metricVal, kv);
2410 2411 2412
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }
2413

2414
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2415 2416
}

2417
static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
2418
  int32_t ret = TSDB_CODE_SUCCESS;
2419

2420
  cJSON *tags = cJSON_GetObjectItem(root, "tags");
2421
  if (unlikely(tags == NULL || tags->type != cJSON_Object)) {
2422 2423
    return TSDB_CODE_TSC_INVALID_JSON;
  }
wmmhello's avatar
wmmhello 已提交
2424

2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467
  // add measure to tags to identify one child table
  cJSON *cMeasure = cJSON_AddStringToObject(tags, JSON_METERS_NAME, elements->measure);
  if(unlikely(cMeasure == NULL)){
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  if(is_same_child_table_json(elements->tags, info->preLine.tags) == 0){
    return TSDB_CODE_SUCCESS;
  }

  bool isSameMeasure = IS_SAME_SUPER_TABLE;

  int     cnt = 0;
  SArray *preLineKV = info->preLineTagKV;
  bool    isSuperKVInit = true;
  SArray *superKV = NULL;
  if(info->dataFormat){
    if(unlikely(!isSameMeasure)){
      SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);

      if(unlikely(sMeta == NULL)){
        sMeta = smlBuildSTableMeta(info->dataFormat);
        STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
        sMeta->tableMeta = pTableMeta;
        if(pTableMeta == NULL){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta);
      }
      info->currSTableMeta = sMeta->tableMeta;
      superKV = sMeta->tags;

      if(unlikely(taosArrayGetSize(superKV) == 0)){
        isSuperKVInit = false;
      }
      taosArraySetSize(preLineKV, 0);
    }
  }else{
    taosArraySetSize(preLineKV, 0);
  }

2468 2469 2470
  int32_t tagNum = cJSON_GetArraySize(tags);
  for (int32_t i = 0; i < tagNum; ++i) {
    cJSON *tag = cJSON_GetArrayItem(tags, i);
2471
    if (unlikely(tag == NULL)) {
2472
      return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
2473
    }
2474
    if(unlikely(tag == cMeasure)) continue;
2475
    size_t keyLen = strlen(tag->string);
2476
    if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
2477 2478 2479 2480
      uError("OTD:Tag key length is 0 or too large than 64");
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

2481
    // add kv to SSmlKv
2482
    SSmlKv kv ={.key = tag->string, .keyLen = keyLen};
X
Xiaoyu Wang 已提交
2483
    // value
2484
    ret = smlParseValueFromJSON(tag, &kv);
2485
    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
2486 2487
      return ret;
    }
2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565

    if(info->dataFormat){
      if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
        info->needModifySchema = true;
      }

      if(isSameMeasure){
        if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
        if(unlikely(kv.length > preKV->length)){
          preKV->length = kv.length;
          SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
          ASSERT(tableMeta != NULL);

          SSmlKv *oldKV = taosArrayGet(tableMeta->tags, cnt);
          oldKV->length = kv.length;
          info->needModifySchema = true;
        }
        if(unlikely(!IS_SAME_KEY)){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
      }else{
        if(isSuperKVInit){
          if(unlikely(cnt >= taosArrayGetSize(superKV))) {
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
          SSmlKv *preKV = taosArrayGet(superKV, cnt);
          if(unlikely(kv.length > preKV->length)) {
            preKV->length = kv.length;
          }else{
            kv.length = preKV->length;
          }
          info->needModifySchema = true;

          if(unlikely(!IS_SAME_KEY)){
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
        }else{
          taosArrayPush(superKV, &kv);
        }
        taosArrayPush(preLineKV, &kv);
      }
    }else{
      taosArrayPush(preLineKV, &kv);
    }
    cnt++;
  }

  void* oneTable = nodeListGet(info->childTables, elements->tags, POINTER_BYTES, is_same_child_table_json);
  if ((oneTable != NULL)) {
    return TSDB_CODE_SUCCESS;
  }

  SSmlTableInfo *tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
  if (unlikely(!tinfo)) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  for(int i = 0; i < taosArrayGetSize(preLineKV); i++){
    taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
  }
  smlSetCTableName(tinfo);
  if(info->dataFormat) {
    info->currSTableMeta->uid = tinfo->uid;
    tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
    if(tinfo->tableDataCtx == NULL){
      smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
      return TSDB_CODE_SML_INVALID_DATA;
    }
wmmhello's avatar
wmmhello 已提交
2566 2567
  }

2568 2569
  nodeListSet(&info->childTables, tags, POINTER_BYTES, tinfo);

2570
  return ret;
wmmhello's avatar
wmmhello 已提交
2571 2572
}

2573
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
2574
  int32_t ret = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2575

2576
  int32_t size = cJSON_GetArraySize(root);
X
Xiaoyu Wang 已提交
2577
  // outmost json fields has to be exactly 4
2578
  if (unlikely(size != OTD_JSON_FIELDS_NUM)) {
X
Xiaoyu Wang 已提交
2579
    uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
2580
    return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
2581
  }
2582

X
Xiaoyu Wang 已提交
2583
  // Parse metric
2584 2585
  ret = smlParseMetricFromJSON(info, root, elements);
  if (unlikely(ret != TSDB_CODE_SUCCESS)) {
X
Xiaoyu Wang 已提交
2586
    uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
2587
    return ret;
wmmhello's avatar
wmmhello 已提交
2588
  }
X
Xiaoyu Wang 已提交
2589
  uDebug("OTD:0x%" PRIx64 " Parse metric from JSON payload finished", info->id);
wmmhello's avatar
wmmhello 已提交
2590

X
Xiaoyu Wang 已提交
2591
  // Parse metric value
2592 2593 2594
  SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN};
  ret = smlParseColsFromJSON(root, &kv);
  if (unlikely(ret)) {
X
Xiaoyu Wang 已提交
2595
    uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
2596
    return ret;
2597
  }
X
Xiaoyu Wang 已提交
2598
  uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id);
2599

2600 2601 2602
  // Parse tags
  ret = smlParseTagsFromJSON(info, root, elements);
  if (unlikely(ret)) {
X
Xiaoyu Wang 已提交
2603
    uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
2604
    return ret;
2605
  }
X
Xiaoyu Wang 已提交
2606
  uDebug("OTD:0x%" PRIx64 " Parse tags from JSON payload finished", info->id);
wmmhello's avatar
wmmhello 已提交
2607

2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639
  if(unlikely(info->reRun)){
    return TSDB_CODE_SUCCESS;
  }

  // Parse timestamp
  // notice!!! put ts back to tag to ensure get meta->precision
  int64_t ts = smlParseTSFromJSON(info, root);
  if (unlikely(ts < 0)) {
    uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
    return TSDB_CODE_INVALID_TIMESTAMP;
  }
  uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);
  SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .i = ts, .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};

  if(info->dataFormat){
    ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
    if(ret == TSDB_CODE_SUCCESS){
      ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
    }
    if(ret == TSDB_CODE_SUCCESS){
      ret = smlBuildRow(info->currTableDataCtx);
    }
    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
      return ret;
    }
  }else{
    taosArrayPush(elements->colArray, &kvTs);
    taosArrayPush(elements->colArray, &kv);
  }
  info->preLine = *elements;

2640
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2641
}
2642
/************* TSDB_SML_JSON_PROTOCOL function end **************/
2643 2644 2645
static int32_t smlParseLineBottom(SSmlHandle *info) {
  if(info->dataFormat) return TSDB_CODE_SUCCESS;

wmmhello's avatar
wmmhello 已提交
2646 2647
  for(int32_t i = 0; i < info->lineNum; i ++){
    SSmlLineInfo* elements = info->lines + i;
2648 2649 2650 2651 2652 2653 2654
    SSmlTableInfo *tinfo = NULL;
    if(info->protocol != TSDB_SML_JSON_PROTOCOL){
      tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->measure, elements->measureTagsLen, NULL);
    }else{
      tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->tags, POINTER_BYTES, is_same_child_table_json);
    }

wmmhello's avatar
wmmhello 已提交
2655
    if(tinfo == NULL){
2656 2657 2658
      uError("SML:0x%" PRIx64 "get oneTable failed, line num:%d", info->id, i);
      smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure);
      return TSDB_CODE_SML_INVALID_DATA;
2659
    }
wmmhello's avatar
wmmhello 已提交
2660

2661 2662 2663 2664 2665
    if (taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
      return TSDB_CODE_PAR_INVALID_TAGS_NUM;
    }

2666 2667 2668
    if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
      return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
wmmhello's avatar
wmmhello 已提交
2669
    }
wmmhello's avatar
wmmhello 已提交
2670

2671
    int ret = smlPushCols(tinfo->cols, elements->colArray);
X
Xiaoyu Wang 已提交
2672
    if (ret != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2673 2674 2675
      return ret;
    }

2676
    SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
2677
    if (tableMeta) {  // update meta
wmmhello's avatar
wmmhello 已提交
2678
      ret = smlUpdateMeta(tableMeta->colHash, tableMeta->cols, elements->colArray, false, &info->msgBuf);
2679
      if (ret == TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2680
        ret = smlUpdateMeta(tableMeta->tagHash, tableMeta->tags, tinfo->tags, true, &info->msgBuf);
2681 2682 2683 2684 2685
      }
      if (ret != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
        return ret;
      }
X
Xiaoyu Wang 已提交
2686
    } else {
2687 2688 2689 2690 2691
      ret = smlJudgeDupColName(elements->colArray, tinfo->tags, &info->msgBuf);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
        return ret;
      }
wmmhello's avatar
wmmhello 已提交
2692

2693 2694 2695
      SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
      smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
      smlInsertMeta(meta->colHash, meta->cols, elements->colArray);
wmmhello's avatar
wmmhello 已提交
2696
      nodeListSet(&info->superTables, elements->measure, elements->measureLen, meta);
wmmhello's avatar
wmmhello 已提交
2697
    }
wmmhello's avatar
wmmhello 已提交
2698
  }
2699

wmmhello's avatar
wmmhello 已提交
2700 2701 2702
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2703
static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
2704 2705
  int32_t payloadNum = 0;
  int32_t ret = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2706

2707
  if (unlikely(payload == NULL)) {
X
Xiaoyu Wang 已提交
2708
    uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
2709
    return TSDB_CODE_TSC_INVALID_JSON;
2710
  }
2711

wmmhello's avatar
wmmhello 已提交
2712
  info->root = cJSON_Parse(payload);
2713
  if (unlikely(info->root == NULL)) {
X
Xiaoyu Wang 已提交
2714
    uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
2715 2716
    return TSDB_CODE_TSC_INVALID_JSON;
  }
X
Xiaoyu Wang 已提交
2717
  // multiple data points must be sent in JSON array
2718
  if (cJSON_IsArray(info->root)) {
wmmhello's avatar
wmmhello 已提交
2719
    payloadNum = cJSON_GetArraySize(info->root);
2720 2721
  } else if (cJSON_IsObject(info->root)) {
    payloadNum = 1;
2722
  } else {
X
Xiaoyu Wang 已提交
2723
    uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
2724
    return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
2725
  }
wmmhello's avatar
wmmhello 已提交
2726

2727 2728
  int32_t i = 0;
  while (i < payloadNum) {
wmmhello's avatar
wmmhello 已提交
2729
    cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : cJSON_GetArrayItem(info->root, i);
2730 2731 2732 2733 2734 2735 2736
    if(info->dataFormat) {
      SSmlLineInfo element = {0};
      ret = smlParseJSONString(info, dataPoint, &element);
    }else{
      ret = smlParseJSONString(info, dataPoint, info->lines + i);
    }
    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
X
Xiaoyu Wang 已提交
2737
      uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777
      return ret;
    }

    if(unlikely(info->reRun)){
      i = 0;
      info->reRun = false;
      // clear info->childTables
      NodeList* pList = info->childTables;
      while (pList) {
        if(pList->data.used) {
          smlDestroyTableInfo(pList->data.value);
          pList->data.used = false;
        }
        pList = pList->next;
      }

      // clear info->superTables
      pList = info->superTables;
      while (pList) {
        if(pList->data.used) {
          smlDestroySTableMeta(pList->data.value);
          pList->data.used = false;
        }
        pList = pList->next;
      }

      if(unlikely(info->lines != NULL)){
        uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      info->lineNum = payloadNum;
      info->lines = taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
      for(int j = 0; j < info->lineNum; j++){
        info->lines[j].colArray = taosArrayInit(8, sizeof(SSmlKv));
      }
      memset(&info->preLine, 0, sizeof(SSmlLineInfo));
      SVnodeModifOpStmt* stmt= (SVnodeModifOpStmt*)(info->pQuery->pRoot);
      stmt->freeHashFunc(stmt->pTableBlockHashObj);
      stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
      continue;
2778 2779 2780
    }
  }

2781
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2782
}
2783

X
Xiaoyu Wang 已提交
2784
static int32_t smlInsertData(SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
2785 2786
  int32_t code = TSDB_CODE_SUCCESS;

wmmhello's avatar
wmmhello 已提交
2787 2788 2789
  NodeList* tmp = info->childTables;
  while (tmp) {
    SSmlTableInfo *tableData = (SSmlTableInfo *)tmp->data.value;
wmmhello's avatar
wmmhello 已提交
2790 2791

    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
wmmhello's avatar
wmmhello 已提交
2792
    tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
wmmhello's avatar
wmmhello 已提交
2793
    memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
D
dapan1121 已提交
2794 2795 2796 2797 2798 2799

    SRequestConnInfo conn = {0};
    conn.pTrans = info->taos->pAppInfo->pTransporter;
    conn.requestId = info->pRequest->requestId;
    conn.requestObjRefId = info->pRequest->self;
    conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
X
Xiaoyu Wang 已提交
2800

wmmhello's avatar
wmmhello 已提交
2801
    SVgroupInfo vg;
D
dapan1121 已提交
2802
    code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
2803
    if (code != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2804
      uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
wmmhello's avatar
wmmhello 已提交
2805 2806
      return code;
    }
X
Xiaoyu Wang 已提交
2807
    taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
wmmhello's avatar
wmmhello 已提交
2808

wmmhello's avatar
wmmhello 已提交
2809
    SSmlSTableMeta *pMeta =
2810
        (SSmlSTableMeta *)nodeListGet(info->superTables, tableData->sTableName, tableData->sTableNameLen, NULL);
wmmhello's avatar
wmmhello 已提交
2811
    ASSERT(NULL != pMeta);
wmmhello's avatar
wmmhello 已提交
2812

2813
    // use tablemeta of stable to save vgid and uid of child table
wmmhello's avatar
wmmhello 已提交
2814 2815
    pMeta->tableMeta->vgId = vg.vgId;
    pMeta->tableMeta->uid = tableData->uid;  // one table merge data block together according uid
wmmhello's avatar
wmmhello 已提交
2816

wmmhello's avatar
wmmhello 已提交
2817 2818
    code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, pMeta->cols, tableData->cols,
                       pMeta->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
2819
                       info->ttl, info->msgBuf.buf, info->msgBuf.len);
X
Xiaoyu Wang 已提交
2820 2821
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
wmmhello's avatar
wmmhello 已提交
2822 2823
      return code;
    }
wmmhello's avatar
wmmhello 已提交
2824
    tmp = tmp->next;
wmmhello's avatar
wmmhello 已提交
2825
  }
wmmhello's avatar
wmmhello 已提交
2826

wmmhello's avatar
wmmhello 已提交
2827
  code = smlBuildOutput(info->pQuery, info->pVgHash);
2828
  if (code != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2829
    uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
2830 2831
    return code;
  }
2832 2833
  info->cost.insertRpcTime = taosGetTimestampUs();

2834 2835 2836
  SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
  atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);

wmmhello's avatar
wmmhello 已提交
2837 2838
  launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
  return info->pRequest->code;
wmmhello's avatar
wmmhello 已提交
2839 2840
}

X
Xiaoyu Wang 已提交
2841 2842
static void smlPrintStatisticInfo(SSmlHandle *info) {
  uError("SML:0x%" PRIx64
2843
             " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
X
Xiaoyu Wang 已提交
2844
        parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
2845
             "",
X
Xiaoyu Wang 已提交
2846
         info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
2847 2848
         info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables,
         info->cost.schemaTime - info->cost.parseTime,
X
Xiaoyu Wang 已提交
2849 2850
         info->cost.insertBindTime - info->cost.schemaTime, info->cost.insertRpcTime - info->cost.insertBindTime,
         info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime);
2851 2852
}

dengyihao's avatar
dengyihao 已提交
2853
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
wmmhello's avatar
wmmhello 已提交
2854
  int32_t code = TSDB_CODE_SUCCESS;
2855
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
dengyihao's avatar
dengyihao 已提交
2856
    if (lines) {
wmmhello's avatar
wmmhello 已提交
2857
      code = smlParseJSON(info, *lines);
dengyihao's avatar
dengyihao 已提交
2858
    } else if (rawLine) {
wmmhello's avatar
wmmhello 已提交
2859 2860
      code = smlParseJSON(info, rawLine);
    }
2861
    if (code != TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
2862
      uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines ? *lines : rawLine);
2863 2864
      return code;
    }
wmmhello's avatar
wmmhello 已提交
2865
    return code;
wmmhello's avatar
wmmhello 已提交
2866
  }
wmmhello's avatar
wmmhello 已提交
2867

2868 2869
  int32_t i = 0;
  while (i < numLines) {
wmmhello's avatar
wmmhello 已提交
2870
    char *tmp = NULL;
dengyihao's avatar
dengyihao 已提交
2871 2872
    int   len = 0;
    if (lines) {
wmmhello's avatar
wmmhello 已提交
2873 2874
      tmp = lines[i];
      len = strlen(tmp);
dengyihao's avatar
dengyihao 已提交
2875
    } else if (rawLine) {
wmmhello's avatar
wmmhello 已提交
2876
      tmp = rawLine;
dengyihao's avatar
dengyihao 已提交
2877 2878
      while (rawLine < rawLineEnd) {
        if (*(rawLine++) == '\n') {
wmmhello's avatar
wmmhello 已提交
2879 2880 2881 2882
          break;
        }
        len++;
      }
dengyihao's avatar
dengyihao 已提交
2883
      if (info->protocol == TSDB_SML_LINE_PROTOCOL && tmp[0] == '#') {  // this line is comment
2884
        i++;
wmmhello's avatar
wmmhello 已提交
2885 2886
        continue;
      }
wmmhello's avatar
wmmhello 已提交
2887 2888
    }

2889 2890
    uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? "rawdata" : tmp));

X
Xiaoyu Wang 已提交
2891
    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
wmmhello's avatar
wmmhello 已提交
2892 2893 2894 2895 2896 2897
      if(info->dataFormat){
        SSmlLineInfo element = {0};
        code = smlParseInfluxString(info, tmp, tmp + len, &element);
      }else{
        code = smlParseInfluxString(info, tmp, tmp + len, info->lines + i);
      }
X
Xiaoyu Wang 已提交
2898
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
2899 2900 2901 2902 2903 2904 2905
      if(info->dataFormat) {
        SSmlLineInfo element = {0};
        code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, &element);
      }else{
        code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, info->lines + i);
      }

X
Xiaoyu Wang 已提交
2906
    } else {
2907 2908
      ASSERT(0);
    }
wmmhello's avatar
wmmhello 已提交
2909
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2910
      uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, tmp);
2911
      return code;
wmmhello's avatar
wmmhello 已提交
2912
    }
2913 2914 2915 2916
    if(info->reRun){
      i = 0;
      info->reRun = false;
      // clear info->childTables
wmmhello's avatar
wmmhello 已提交
2917 2918 2919 2920 2921 2922 2923
      NodeList* pList = info->childTables;
      while (pList) {
        if(pList->data.used) {
          smlDestroyTableInfo(pList->data.value);
          pList->data.used = false;
        }
        pList = pList->next;
2924 2925 2926
      }

      // clear info->superTables
wmmhello's avatar
wmmhello 已提交
2927 2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938
      pList = info->superTables;
      while (pList) {
        if(pList->data.used) {
          smlDestroySTableMeta(pList->data.value);
          pList->data.used = false;
        }
        pList = pList->next;
      }

      if(info->lines != NULL){
        uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
        return TSDB_CODE_SML_INVALID_DATA;
2939
      }
wmmhello's avatar
wmmhello 已提交
2940 2941
      info->lines = taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));

2942 2943 2944 2945 2946 2947 2948
      for(int j = 0; j < info->lineNum; j++){
        info->lines[j].colArray = taosArrayInit(8, sizeof(SSmlKv));
      }
      memset(&info->preLine, 0, sizeof(SSmlLineInfo));
      SVnodeModifOpStmt* stmt= (SVnodeModifOpStmt*)(info->pQuery->pRoot);
      stmt->freeHashFunc(stmt->pTableBlockHashObj);
      stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
2949
      continue;
2950
    }
2951
    i++;
wmmhello's avatar
wmmhello 已提交
2952
  }
2953

2954 2955 2956
  return code;
}

dengyihao's avatar
dengyihao 已提交
2957
static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
2958
  int32_t code = TSDB_CODE_SUCCESS;
2959 2960
  int32_t retryNum = 0;

2961 2962
  info->cost.parseTime = taosGetTimestampUs();

wmmhello's avatar
wmmhello 已提交
2963
  code = smlParseLine(info, lines, rawLine, rawLineEnd, numLines);
2964
  if (code != 0) {
X
Xiaoyu Wang 已提交
2965
    uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
2966
    return code;
2967
  }
wmmhello's avatar
wmmhello 已提交
2968

2969 2970 2971 2972 2973 2974
  code = smlParseLineBottom(info);
  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlParseLineBottom error : %s", info->id, tstrerror(code));
    return code;
  }

2975
  info->cost.lineNum = numLines;
wmmhello's avatar
wmmhello 已提交
2976 2977
  info->cost.numOfSTables = nodeListSize(info->superTables);
  info->cost.numOfCTables = nodeListSize(info->childTables);
2978 2979

  info->cost.schemaTime = taosGetTimestampUs();
2980

X
Xiaoyu Wang 已提交
2981
  do {
2982 2983
    code = smlModifyDBSchemas(info);
    if (code == 0) break;
wmmhello's avatar
wmmhello 已提交
2984
  } while (retryNum++ < nodeListSize(info->superTables) * MAX_RETRY_TIMES);
2985

wmmhello's avatar
wmmhello 已提交
2986
  if (code != 0) {
X
Xiaoyu Wang 已提交
2987
    uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
2988
    return code;
wmmhello's avatar
wmmhello 已提交
2989
  }
wmmhello's avatar
wmmhello 已提交
2990

2991
  info->cost.insertBindTime = taosGetTimestampUs();
wmmhello's avatar
wmmhello 已提交
2992 2993
  code = smlInsertData(info);
  if (code != 0) {
X
Xiaoyu Wang 已提交
2994
    uError("SML:0x%" PRIx64 " smlInsertData error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
2995
    return code;
wmmhello's avatar
wmmhello 已提交
2996 2997 2998 2999 3000
  }

  return code;
}

wmmhello's avatar
wmmhello 已提交
3001 3002 3003 3004 3005
TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd,
                                       int numLines, int protocol, int precision, int32_t ttl, int64_t reqid) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
3006
  }
3007

wmmhello's avatar
wmmhello 已提交
3008 3009 3010 3011
  SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
  if (request == NULL) {
    uError("SML:taos_schemaless_insert error request is null");
    return NULL;
wmmhello's avatar
wmmhello 已提交
3012
  }
3013

wmmhello's avatar
wmmhello 已提交
3014 3015 3016 3017
  SSmlHandle *info = smlBuildSmlInfo(taos);
  if (info == NULL) {
    request->code = TSDB_CODE_OUT_OF_MEMORY;
    uError("SML:taos_schemaless_insert error SSmlHandle is null");
wmmhello's avatar
wmmhello 已提交
3018 3019
    goto end;
  }
wmmhello's avatar
wmmhello 已提交
3020 3021 3022 3023 3024 3025 3026 3027
  info->pRequest = request;
  info->isRawLine = rawLine != NULL;
  info->ttl       = ttl;
  info->precision = precision;
  info->protocol = protocol;
  info->msgBuf.buf = info->pRequest->msgBuf;
  info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
  info->lineNum = numLines;
wmmhello's avatar
wmmhello 已提交
3028

wmmhello's avatar
wmmhello 已提交
3029
  SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
wmmhello's avatar
wmmhello 已提交
3030 3031 3032
  if (request->pDb == NULL) {
    request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
wmmhello's avatar
wmmhello 已提交
3033 3034 3035
    goto end;
  }

X
Xiaoyu Wang 已提交
3036
  if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
3037
    request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
wmmhello's avatar
wmmhello 已提交
3038
    smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
3039
    goto end;
wmmhello's avatar
wmmhello 已提交
3040 3041
  }

X
Xiaoyu Wang 已提交
3042 3043
  if (protocol == TSDB_SML_LINE_PROTOCOL &&
      (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
3044
    request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
wmmhello's avatar
wmmhello 已提交
3045
    smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
3046 3047 3048
    goto end;
  }

3049
  if (protocol == TSDB_SML_JSON_PROTOCOL) {
wmmhello's avatar
wmmhello 已提交
3050
    numLines = 1;
3051
  } else if (numLines <= 0) {
wmmhello's avatar
wmmhello 已提交
3052 3053 3054 3055 3056
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
3057 3058 3059 3060 3061
  int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, numLines);
  request->code = code;
  info->cost.endTime = taosGetTimestampUs();
  info->cost.code = code;
  smlPrintStatisticInfo(info);
3062

wmmhello's avatar
wmmhello 已提交
3063
end:
3064
  uDebug("resultend:%s", request->msgBuf);
wmmhello's avatar
wmmhello 已提交
3065
  smlDestroyInfo(info);
3066
  return (TAOS_RES *)request;
wmmhello's avatar
wmmhello 已提交
3067
}
wmmhello's avatar
wmmhello 已提交
3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084

/**
 * taos_schemaless_insert() parse and insert data points into database according to
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
dengyihao's avatar
dengyihao 已提交
3085
 * @return TAOS_RES
wmmhello's avatar
wmmhello 已提交
3086 3087
 */

3088 3089
TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                                int32_t ttl, int64_t reqid) {
wmmhello's avatar
wmmhello 已提交
3090
  return taos_schemaless_insert_inner(taos, lines, NULL, NULL, numLines, protocol, precision, ttl, reqid);
dengyihao's avatar
dengyihao 已提交
3091 3092
}

3093 3094 3095
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, TSDB_DEFAULT_TABLE_TTL, 0);
}
wmmhello's avatar
wmmhello 已提交
3096

3097 3098 3099
TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int32_t ttl) {
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, 0);
}
wmmhello's avatar
wmmhello 已提交
3100

3101 3102
TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int64_t reqid) {
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, TSDB_DEFAULT_TABLE_TTL, reqid);
wmmhello's avatar
wmmhello 已提交
3103 3104
}

3105
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
3106
                                                    int precision, int32_t ttl, int64_t reqid) {
dengyihao's avatar
dengyihao 已提交
3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118
  int numLines = 0;
  *totalRows = 0;
  char *tmp = lines;
  for (int i = 0; i < len; i++) {
    if (lines[i] == '\n' || i == len - 1) {
      numLines++;
      if (tmp[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL) {  // ignore comment
        (*totalRows)++;
      }
      tmp = lines + i + 1;
    }
  }
wmmhello's avatar
wmmhello 已提交
3119
  return taos_schemaless_insert_inner(taos, NULL, lines, lines + len, *totalRows, protocol, precision, ttl, reqid);
dengyihao's avatar
dengyihao 已提交
3120 3121
}

3122 3123 3124 3125 3126 3127
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int64_t reqid) {
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, TSDB_DEFAULT_TABLE_TTL, reqid);
}
TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int32_t ttl) {
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, 0);
}
wmmhello's avatar
wmmhello 已提交
3128

3129 3130
TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision) {
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, TSDB_DEFAULT_TABLE_TTL, 0);
wmmhello's avatar
wmmhello 已提交
3131
}