clientSml.c 101.9 KB
Newer Older
wmmhello's avatar
wmmhello 已提交
1 2 3 4 5
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

X
Xiaoyu Wang 已提交
6 7 8 9 10
#include "cJSON.h"
#include "catalog.h"
#include "clientInt.h"
#include "osSemaphore.h"
#include "osThread.h"
wmmhello's avatar
wmmhello 已提交
11 12
#include "query.h"
#include "taos.h"
wmmhello's avatar
wmmhello 已提交
13
#include "taoserror.h"
X
Xiaoyu Wang 已提交
14
#include "tcommon.h"
wmmhello's avatar
wmmhello 已提交
15
#include "tdef.h"
X
Xiaoyu Wang 已提交
16
#include "tglobal.h"
wmmhello's avatar
wmmhello 已提交
17 18
#include "tlog.h"
#include "tmsg.h"
X
Xiaoyu Wang 已提交
19
#include "tname.h"
wmmhello's avatar
wmmhello 已提交
20 21
#include "ttime.h"
#include "ttypes.h"
wmmhello's avatar
wmmhello 已提交
22

23 24 25 26 27 28 29 30 31 32 33 34 35
// Branch-prediction hint. Uses __builtin_expect on GCC >= 3, ICC >= 8.0 and
// clang; on any other compiler the hint is dropped and only the condition remains.
#if (defined(__GNUC__) && (__GNUC__ >= 3)) || (defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 800)) || defined(__clang__)
#define expect(cond, hint) (__builtin_expect((cond), (hint)))
#else
#define expect(cond, hint) (cond)
#endif

// likely(c): c is normalised to 0/1 and is expected to be true.
#ifndef likely
#define likely(cond) expect((cond) != 0, 1)
#endif
// unlikely(c): c is normalised to 0/1 and is expected to be false.
#ifndef unlikely
#define unlikely(cond) expect((cond) != 0, 0)
#endif

wmmhello's avatar
wmmhello 已提交
36
//=================================================================================================
wmmhello's avatar
wmmhello 已提交
37 38 39 40 41 42 43

// Characters the IS_* tests in this file compare against.
// SLASH is a single backslash; those tests check whether it precedes a delimiter.
#define SPACE ' '
#define COMMA ','
#define EQUAL '='
#define QUOTE '"'
#define SLASH '\\'

dengyihao's avatar
dengyihao 已提交
44 45
// Advance `sql` over consecutive SPACE characters without going past `sqlEnd`.
// Expands to a bare while-loop, so `sql` has to be a modifiable lvalue.
#define JUMP_SPACE(sql, sqlEnd)                          \
  while ((sql < sqlEnd) && unlikely(*sql == SPACE)) {    \
    sql++;                                               \
  }
wmmhello's avatar
wmmhello 已提交
51
// Delimiter tests: true when the character at `sql` is the delimiter and the
// character just before it is not a backslash. Each one reads sql[-1], so
// `sql` must not point at the first byte of its buffer.
#define IS_COMMA(sql) (*(sql) == COMMA && *((sql)-1) != SLASH)
#define IS_SPACE(sql) (*(sql) == SPACE && *((sql)-1) != SLASH)
#define IS_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) != SLASH)
#define IS_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) != SLASH)

// True when the character at `sql` is one of , space = " \ and the character
// just before it is a backslash.
#define IS_SLASH_LETTER(sql) \
  (*((sql)-1) == SLASH &&    \
   (*(sql) == COMMA || *(sql) == SPACE || *(sql) == EQUAL || *(sql) == QUOTE || *(sql) == SLASH))

// Move `len` bytes starting at `sql` one position towards the front,
// overwriting the byte that preceded `sql`.
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len))

// Remove, in place, every backslash in key[0..keyLen) that IS_SLASH_LETTER
// reports as preceding a special character. `keyLen` must be a modifiable
// lvalue; it is decremented once per removed backslash. After a removal the
// same index is examined again.
#define PROCESS_SLASH(key, keyLen)         \
  for (int i = 1; i < keyLen; ++i) {       \
    if (!IS_SLASH_LETTER(key + i)) {       \
      continue;                            \
    }                                      \
    MOVE_FORWARD_ONE(key + i, keyLen - i); \
    --i;                                   \
    --keyLen;                              \
  }
wmmhello's avatar
wmmhello 已提交
80

81 82 83
// Name-length validation: a length is rejected when it is not positive or
// when it reaches the corresponding TSDB_*_NAME_LEN limit.
#define IS_INVALID_COL_LEN(len)   ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)

// Field counts used with the OpenTSDB JSON format.
#define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM     4

// Reserved key names and their lengths (without the terminating NUL).
#define TS        "_ts"
#define TS_LEN    3
#define VALUE     "_value"
#define VALUE_LEN 6

#define JSON_METERS_NAME "__JM"

#define BINARY_ADD_LEN 2  // "binary"   2 means " "
#define NCHAR_ADD_LEN  3  // L"nchar"   3 means L" "

#define MAX_RETRY_TIMES 5
wmmhello's avatar
wmmhello 已提交
98 99 100 101
//=================================================================================================
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;

// Schema change that a parsed key/value requires on the target super table.
typedef enum {
  SCHEMA_ACTION_NULL,                // nothing to change
  SCHEMA_ACTION_CREATE_STABLE,       // the super table has to be created
  SCHEMA_ACTION_ADD_COLUMN,          // key is unknown and is a column
  SCHEMA_ACTION_ADD_TAG,             // key is unknown and is a tag
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,  // existing var-length column is too narrow
  SCHEMA_ACTION_CHANGE_TAG_SIZE,     // existing var-length tag is too narrow
} ESchemaAction;

wmmhello's avatar
wmmhello 已提交
110 111 112 113 114 115 116 117 118 119 120 121 122
/*********************** list start *********************************/
// One key/value slot of the list below.
typedef struct {
  const void  *key;     // stored as passed to nodeListSet(); the bytes are not copied
  int32_t      keyLen;  // number of key bytes compared with memcmp()
  void        *value;
  bool         used;    // true once the slot holds an entry; lookups skip slots that are not used
}Node;

// Singly linked list of slots. nodeListSet() reuses the first unused slot it
// reaches, otherwise it prepends a new node.
typedef struct NodeList{
  Node             data;
  struct NodeList* next;
}NodeList;

123 124 125
typedef int32_t (*_equal_fn_sml)(const void *, const void *);

// Look up `key` in `list` and return the stored value, or NULL when absent.
// With fn == NULL a slot matches when its key has the same length and bytes;
// otherwise a slot matches when fn(slotKey, key) returns 0 (len is ignored).
// Slots that are not marked used never match.
static void* nodeListGet(NodeList* list, const void *key, int32_t len, _equal_fn_sml fn){
  for (NodeList *cur = list; cur != NULL; cur = cur->next) {
    if (!cur->data.used) {
      continue;
    }
    bool hit = (fn == NULL) ? (cur->data.keyLen == len && memcmp(cur->data.key, key, len) == 0)
                            : (fn(cur->data.key, key) == 0);
    if (hit) {
      return cur->data.value;
    }
  }
  return NULL;
}

// Insert key -> value into *list.
// Returns 0 on success, -1 when an identical key is already stored among the
// leading used slots or when allocating a new node fails.
// The first unused slot reached is recycled; if there is none, a new node is
// allocated and becomes the new head of the list. The key pointer is stored
// as given.
static int nodeListSet(NodeList** list, const void *key, int32_t len, void* value){
  NodeList *slot = *list;
  for (; slot != NULL && slot->data.used; slot = slot->next) {
    if (slot->data.keyLen == len && memcmp(slot->data.key, key, len) == 0) {
      return -1;
    }
  }

  bool fresh = (slot == NULL);
  if (fresh) {
    slot = taosMemoryCalloc(1, sizeof(NodeList));
    if (slot == NULL) {
      return -1;
    }
  }

  slot->data.key = key;
  slot->data.keyLen = len;
  slot->data.value = value;
  slot->data.used = true;

  if (fresh) {
    slot->next = *list;
    *list = slot;
  }
  return 0;
}

// Count the used slots at the front of `list`; counting stops at the first
// slot that is not used.
static int nodeListSize(NodeList* list){
  int used = 0;
  for (NodeList *cur = list; cur != NULL && cur->data.used; cur = cur->next) {
    used++;
  }
  return used;
}
/*********************** list end *********************************/

wmmhello's avatar
wmmhello 已提交
183
typedef struct {
184 185 186 187
  char *measure;
  char *tags;
  char *cols;
  char *timestamp;
wmmhello's avatar
wmmhello 已提交
188 189 190 191 192 193

  int32_t measureLen;
  int32_t measureTagsLen;
  int32_t tagsLen;
  int32_t colsLen;
  int32_t timestampLen;
194 195

  SArray *colArray;
wmmhello's avatar
wmmhello 已提交
196 197 198
} SSmlLineInfo;

typedef struct {
X
Xiaoyu Wang 已提交
199 200 201 202
  const char *sTableName;  // super table name
  int32_t     sTableNameLen;
  char        childTableName[TSDB_TABLE_NAME_LEN];
  uint64_t    uid;
203
  void       *key;        // for openTsdb telnet
wmmhello's avatar
wmmhello 已提交
204

X
Xiaoyu Wang 已提交
205
  SArray *tags;
wmmhello's avatar
wmmhello 已提交
206

207
  // elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
X
Xiaoyu Wang 已提交
208
  SArray *cols;
209
  STableDataCxt *tableDataCtx;
wmmhello's avatar
wmmhello 已提交
210 211 212
} SSmlTableInfo;

typedef struct {
X
Xiaoyu Wang 已提交
213 214
  SArray   *tags;     // save the origin order to create table
  SHashObj *tagHash;  // elements are <key, index in tags>
215

X
Xiaoyu Wang 已提交
216 217
  SArray   *cols;
  SHashObj *colHash;
218

wmmhello's avatar
wmmhello 已提交
219 220 221 222
  STableMeta *tableMeta;
} SSmlSTableMeta;

// Caller-provided buffer that receives error text.
typedef struct {
  int32_t len;  // capacity of buf in bytes
  char   *buf;  // may be NULL, in which case no text is written
} SSmlMsgBuf;

227 228 229 230 231 232 233
// Counters and timestamps collected while handling one request.
typedef struct {
  int32_t code;
  int32_t lineNum;

  int32_t numOfSTables;
  int32_t numOfCTables;
  int32_t numOfCreateSTables;    // incremented after a super table is created
  int32_t numOfAlterColSTables;  // incremented after a column schema change
  int32_t numOfAlterTagSTables;  // incremented after a tag schema change

  int64_t parseTime;
  int64_t schemaTime;
  int64_t insertBindTime;
  int64_t insertRpcTime;
  int64_t endTime;
} SSmlCostInfo;

wmmhello's avatar
wmmhello 已提交
244
typedef struct {
X
Xiaoyu Wang 已提交
245 246 247 248
  int64_t id;

  SMLProtocolType protocol;
  int8_t          precision;
249
  bool            reRun;
250 251 252
  bool            dataFormat;  // true means that the name and order of keys in each line are the same(only for influx protocol)
  bool            isRawLine;
  int32_t         ttl;
X
Xiaoyu Wang 已提交
253

wmmhello's avatar
wmmhello 已提交
254 255
  NodeList *childTables;
  NodeList *superTables;
X
Xiaoyu Wang 已提交
256 257 258 259 260 261 262 263
  SHashObj *pVgHash;

  STscObj     *taos;
  SCatalog    *pCatalog;
  SRequestObj *pRequest;
  SQuery      *pQuery;

  SSmlCostInfo cost;
wmmhello's avatar
wmmhello 已提交
264
  int32_t      lineNum;
X
Xiaoyu Wang 已提交
265
  SSmlMsgBuf   msgBuf;
wmmhello's avatar
wmmhello 已提交
266 267

  cJSON       *root;  // for parse json
wmmhello's avatar
wmmhello 已提交
268
  SSmlLineInfo      *lines; // element is SSmlLineInfo
269 270 271 272 273 274 275 276

  //
  SArray      *preLineTagKV;
  SArray      *preLineColKV;

  SSmlLineInfo preLine;
  STableMeta  *currSTableMeta;
  STableDataCxt *currTableDataCtx;
wmmhello's avatar
wmmhello 已提交
277
  bool         needModifySchema;
wmmhello's avatar
wmmhello 已提交
278
} SSmlHandle;
wmmhello's avatar
wmmhello 已提交
279 280
//=================================================================================================

wmmhello's avatar
wmmhello 已提交
281
//=================================================================================================
282
static volatile int64_t linesSmlHandleId = 0;
X
Xiaoyu Wang 已提交
283
static int64_t          smlGenId() {
284
  int64_t id;
wmmhello's avatar
wmmhello 已提交
285

286 287
  do {
    id = atomic_add_fetch_64(&linesSmlHandleId, 1);
wmmhello's avatar
wmmhello 已提交
288 289
  } while (id == 0);

290
  return id;
wmmhello's avatar
wmmhello 已提交
291 292
}

293
// True when `num` lies on or outside the bounds (double)INT64_MIN and
// (double)INT64_MAX, i.e. when casting it to int64_t cannot be trusted.
// NaN compares false on both sides and therefore yields false.
static inline bool smlDoubleToInt64OverFlow(double num) {
  return num >= (double)INT64_MAX || num <= (double)INT64_MIN;
}

// Return true when `key` is already present in `pHash`. Otherwise record it
// (one byte of the key itself is stored as the value) and return false.
static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashObj *pHash) {
  if (taosHashGet(pHash, key, keyLen) != NULL) {
    return true;
  }
  taosHashPut(pHash, key, keyLen, key, 1);
  return false;
}

X
Xiaoyu Wang 已提交
307
/**
 * Write "<msg1>:<msg2>" into pBuf->buf, truncating to fit, and return
 * TSDB_CODE_SML_INVALID_DATA.
 *
 * Nothing is written when pBuf->buf is NULL or pBuf->len is not positive.
 * The ":<msg2>" part is appended only when msg2 is non-NULL and more than two
 * bytes remain after msg1.
 *
 * @param pBuf  destination buffer of pBuf->len bytes
 * @param msg1  leading text, may be NULL
 * @param msg2  detail text, may be NULL
 * @return always TSDB_CODE_SML_INVALID_DATA
 */
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
  if (pBuf->buf && pBuf->len > 0) {
    memset(pBuf->buf, 0, pBuf->len);
    // strncat() writes up to n characters PLUS a terminating NUL, so the limit
    // has to leave one byte free.
    if (msg1) strncat(pBuf->buf, msg1, pBuf->len - 1);
    int32_t left = pBuf->len - (int32_t)strlen(pBuf->buf);
    if (left > 2 && msg2) {
      strncat(pBuf->buf, ":", left - 1);
      strncat(pBuf->buf, msg2, left - 2);
    }
  }
  return TSDB_CODE_SML_INVALID_DATA;
}

X
Xiaoyu Wang 已提交
320
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
321
                                       ESchemaAction *action, SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
322
  uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
323 324
  if (index) {
    if (colField[*index].type != kv->type) {
X
Xiaoyu Wang 已提交
325 326
      uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id,
             kv->key, colField[*index].type, kv->type);
327 328 329
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

X
Xiaoyu Wang 已提交
330 331 332 333
    if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR &&
         (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
        (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
         ((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
334
      if (isTag) {
wmmhello's avatar
wmmhello 已提交
335
        *action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
336
      } else {
wmmhello's avatar
wmmhello 已提交
337
        *action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
338 339 340 341
      }
    }
  } else {
    if (isTag) {
wmmhello's avatar
wmmhello 已提交
342
      *action = SCHEMA_ACTION_ADD_TAG;
343
    } else {
wmmhello's avatar
wmmhello 已提交
344
      *action = SCHEMA_ACTION_ADD_COLUMN;
345 346
    }
  }
wmmhello's avatar
wmmhello 已提交
347 348 349
  return 0;
}

wmmhello's avatar
wmmhello 已提交
350
static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
wmmhello's avatar
wmmhello 已提交
351
  int32_t result = 1;
X
Xiaoyu Wang 已提交
352
  while (result <= length) {
wmmhello's avatar
wmmhello 已提交
353 354
    result *= 2;
  }
355
  if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
wmmhello's avatar
wmmhello 已提交
356
    result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
357
  } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
wmmhello's avatar
wmmhello 已提交
358 359
    result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  }
wmmhello's avatar
wmmhello 已提交
360

361
  if (type == TSDB_DATA_TYPE_NCHAR) {
362
    result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
363
  } else if (type == TSDB_DATA_TYPE_BINARY) {
364
    result = result + VARSTR_HEADER_SIZE;
wmmhello's avatar
wmmhello 已提交
365
  }
366
  return result;
wmmhello's avatar
wmmhello 已提交
367 368
}

X
Xiaoyu Wang 已提交
369
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
370
                                      ESchemaAction *action, bool isTag) {
371 372
  int32_t code = TSDB_CODE_SUCCESS;
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
373
    if (j == 0 && !isTag) continue;
374
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
375
    code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
X
Xiaoyu Wang 已提交
376
    if (code != TSDB_CODE_SUCCESS) {
377 378 379 380 381 382
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

383
// Verify that every key in `cols` exists among the `length` entries of
// `schema`. For columns (isTag == false) entry 0 of `cols` is not checked.
// Returns 0 when all keys are present, -1 as soon as one is missing.
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
  SHashObj *names = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  for (int32_t i = 0; i < length; i++) {
    // Only the presence of the name matters; the stored value is never read back.
    taosHashPut(names, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
  }

  int32_t result = 0;
  for (int32_t k = isTag ? 0 : 1; k < taosArrayGetSize(cols); k++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, k);
    if (taosHashGet(names, kv->key, kv->keyLen) == NULL) {
      result = -1;
      break;
    }
  }

  taosHashCleanup(names);
  return result;
}

406
static int32_t getBytes(uint8_t type, int32_t length) {
407 408 409 410 411 412 413
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
    return smlFindNearestPowerOf2(length, type);
  } else {
    return tDataTypes[type].bytes;
  }
}

414 415
// Apply the schema change each entry of `cols` needs to the SField array
// `results`:
//   - unknown key      -> a new SField is appended
//   - key too narrow   -> the `bytes` of the existing SField is enlarged
// For tags the index stored in schemaHash is rebased by numOfCols before it is
// used as an index into `results`. The status of smlGenerateSchemaAction() is
// not inspected here. Always returns TSDB_CODE_SUCCESS.
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                  SArray *results, int32_t numOfCols, bool isTag) {
  for (int pos = 0; pos < taosArrayGetSize(cols); ++pos) {
    SSmlKv       *kv = (SSmlKv *)taosArrayGet(cols, pos);
    ESchemaAction action = SCHEMA_ACTION_NULL;
    smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);

    switch (action) {
      case SCHEMA_ACTION_ADD_COLUMN:
      case SCHEMA_ACTION_ADD_TAG: {
        SField added = {0};
        added.type = kv->type;
        added.bytes = getBytes(kv->type, kv->length);
        memcpy(added.name, kv->key, kv->keyLen);
        taosArrayPush(results, &added);
        break;
      }
      case SCHEMA_ACTION_CHANGE_COLUMN_SIZE:
      case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
        uint16_t slot = *(uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
        if (isTag) slot -= numOfCols;
        SField *existing = (SField *)taosArrayGet(results, slot);
        existing->bytes = getBytes(kv->type, kv->length);
        break;
      }
      default:
        break;
    }
  }
  return TSDB_CODE_SUCCESS;
}

437 438 439 440 441
// static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
//                               int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){
/**
 * Build a TDMT_MND_CREATE_STB request from pColumns/pTags and run it through
 * launchQueryImpl(); the request's result code is read right after the call.
 *
 * The column/tag versions, suid and source of the request depend on `action`:
 * CREATE_STABLE starts at version 1; tag changes bump pTableMeta->tversion;
 * column changes bump pTableMeta->sversion. When no tags are supplied a single
 * NCHAR tag named tsSmlTagName is added.
 *
 * pColumns and pTags are attached to the request before anything can fail and
 * are handed to tFreeSMCreateStbReq() on every path, so callers must not use
 * them afterwards (presumably they are released there - confirm).
 *
 * @param pTableMeta  current meta of the table; may be NULL only for
 *                    SCHEMA_ACTION_CREATE_STABLE
 * @return TSDB_CODE_SUCCESS or the failing step's error code
 */
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray *pTags, STableMeta *pTableMeta,
                              ESchemaAction action) {
  SRequestObj   *pRequest = NULL;
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SCmdMsgInfo    pCmdMsg = {0};
  SQuery         pQuery;

  // put front for free
  pReq.numOfColumns = taosArrayGetSize(pColumns);
  pReq.pColumns = pColumns;
  pReq.numOfTags = taosArrayGetSize(pTags);
  pReq.pTags = pTags;

  code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  if (action == SCHEMA_ACTION_CREATE_STABLE) {
    pReq.colVer = 1;
    pReq.tagVer = 1;
    pReq.suid = 0;
    pReq.source = TD_REQ_FROM_APP;
  } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
    pReq.colVer = pTableMeta->sversion;
    pReq.tagVer = pTableMeta->tversion + 1;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  } else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) {
    pReq.colVer = pTableMeta->sversion + 1;
    pReq.tagVer = pTableMeta->tversion;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  }

  if (pReq.numOfTags == 0) {
    pReq.numOfTags = 1;
    SField field = {0};
    field.type = TSDB_DATA_TYPE_NCHAR;
    field.bytes = 1;
    // Bounded copy: tsSmlTagName is a configurable global.
    tstrncpy(field.name, tsSmlTagName, sizeof(field.name));
    taosArrayPush(pReq.pTags, &field);
  }

  pReq.commentLen = -1;
  pReq.igExists = true;
  tNameExtractFullName(pName, pReq.name);

  pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  if (pCmdMsg.msgLen < 0) {
    // A failed size computation must not be turned into an allocation size.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  if (tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) < 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pCmdMsg.pMsg);
    goto end;
  }

  memset(&pQuery, 0, sizeof(pQuery));
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // Drop the cached meta so the next lookup sees the new schema.
    catalogRemoveTableMeta(info->pCatalog, pName);
  }
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  return code;
}

X
Xiaoyu Wang 已提交
524
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
525 526 527
  if(info->dataFormat && !info->needModifySchema){
    return TSDB_CODE_SUCCESS;
  }
528 529 530
  int32_t     code = 0;
  SHashObj   *hashTmp = NULL;
  STableMeta *pTableMeta = NULL;
wmmhello's avatar
wmmhello 已提交
531

532
  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
wmmhello's avatar
wmmhello 已提交
533
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
wmmhello's avatar
wmmhello 已提交
534

D
dapan1121 已提交
535 536 537 538 539
  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
X
Xiaoyu Wang 已提交
540

wmmhello's avatar
wmmhello 已提交
541 542 543
  NodeList *tmp = info->superTables;
  while (tmp) {
    SSmlSTableMeta *sTableData = tmp->data.value;
X
Xiaoyu Wang 已提交
544
    bool            needCheckMeta = false;  // for multi thread
wmmhello's avatar
wmmhello 已提交
545

wmmhello's avatar
wmmhello 已提交
546 547
    size_t superTableLen = (size_t)tmp->data.keyLen;
    const void  *superTable = tmp->data.key;
548
    memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
wmmhello's avatar
wmmhello 已提交
549
    memcpy(pName.tname, superTable, superTableLen);
wmmhello's avatar
wmmhello 已提交
550

D
dapan1121 已提交
551
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
wmmhello's avatar
wmmhello 已提交
552

553
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
554 555
      SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
      SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
wmmhello's avatar
wmmhello 已提交
556 557 558 559
      smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
      smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);

      code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
560
      if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
561
        uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
562
        goto end;
wmmhello's avatar
wmmhello 已提交
563
      }
564
      info->cost.numOfCreateSTables++;
wmmhello's avatar
wmmhello 已提交
565 566 567 568 569 570 571
      taosMemoryFreeClear(pTableMeta);

      code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
        goto end;
      }
X
Xiaoyu Wang 已提交
572
    } else if (code == TSDB_CODE_SUCCESS) {
573 574
      hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
                             HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
575 576
      for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
           i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
577 578
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
wmmhello's avatar
wmmhello 已提交
579

580 581
      ESchemaAction action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
582
      if (code != TSDB_CODE_SUCCESS) {
583
        goto end;
584
      }
585 586 587 588 589
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
wmmhello's avatar
wmmhello 已提交
590 591 592 593 594 595

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
596
          if (i < pTableMeta->tableInfo.numOfColumns) {
wmmhello's avatar
wmmhello 已提交
597
            taosArrayPush(pColumns, &field);
598
          } else {
wmmhello's avatar
wmmhello 已提交
599 600 601
            taosArrayPush(pTags, &field);
          }
        }
602 603
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
                           pTableMeta->tableInfo.numOfColumns, true);
wmmhello's avatar
wmmhello 已提交
604 605

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
606
        if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
607
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
608 609 610
          goto end;
        }

611
        info->cost.numOfAlterTagSTables++;
wmmhello's avatar
wmmhello 已提交
612 613 614 615 616 617 618 619 620
        taosMemoryFreeClear(pTableMeta);
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
621
      }
622 623

      taosHashClear(hashTmp);
wmmhello's avatar
wmmhello 已提交
624
      for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
625 626
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
627 628
      action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
629
      if (code != TSDB_CODE_SUCCESS) {
630
        goto end;
wmmhello's avatar
wmmhello 已提交
631
      }
632 633 634 635 636
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
wmmhello's avatar
wmmhello 已提交
637 638 639 640 641 642

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
643
          if (i < pTableMeta->tableInfo.numOfColumns) {
wmmhello's avatar
wmmhello 已提交
644
            taosArrayPush(pColumns, &field);
645
          } else {
wmmhello's avatar
wmmhello 已提交
646 647 648 649
            taosArrayPush(pTags, &field);
          }
        }

650 651
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
                           pTableMeta->tableInfo.numOfColumns, false);
wmmhello's avatar
wmmhello 已提交
652 653

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
654
        if (code != TSDB_CODE_SUCCESS) {
655
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
656 657
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
658

659
        info->cost.numOfAlterColSTables++;
wmmhello's avatar
wmmhello 已提交
660
        taosMemoryFreeClear(pTableMeta);
wmmhello's avatar
wmmhello 已提交
661 662 663 664
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
665 666 667 668 669
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
670
      }
wmmhello's avatar
wmmhello 已提交
671

672
      needCheckMeta = true;
wmmhello's avatar
wmmhello 已提交
673 674
      taosHashCleanup(hashTmp);
      hashTmp = NULL;
wmmhello's avatar
wmmhello 已提交
675
    } else {
X
Xiaoyu Wang 已提交
676
      uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
677
      goto end;
wmmhello's avatar
wmmhello 已提交
678
    }
679

X
Xiaoyu Wang 已提交
680 681
    if (needCheckMeta) {
      code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
682
                          sTableData->tags, true);
683
      if (code != TSDB_CODE_SUCCESS) {
684
        uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, pName.tname);
685 686
        goto end;
      }
687
      code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
688
      if (code != TSDB_CODE_SUCCESS) {
689
        uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, pName.tname);
690 691 692 693
        goto end;
      }
    }

694
    sTableData->tableMeta = pTableMeta;
wmmhello's avatar
wmmhello 已提交
695

wmmhello's avatar
wmmhello 已提交
696
    tmp = tmp->next;
wmmhello's avatar
wmmhello 已提交
697 698
  }
  return 0;
699

700
  end:
wmmhello's avatar
wmmhello 已提交
701 702
  taosHashCleanup(hashTmp);
  taosMemoryFreeClear(pTableMeta);
wmmhello's avatar
wmmhello 已提交
703
//  catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
704
  return code;
wmmhello's avatar
wmmhello 已提交
705 706
}

707
/******************************* parse basic type function **********************/
X
Xiaoyu Wang 已提交
708
/**
 * Parse a numeric literal with an optional type suffix.
 *
 * The text at kvVal->value is first parsed as a double. The bytes of
 * kvVal->length that remain after the number select the target type:
 *   none / "f64" -> DOUBLE      "f32" -> FLOAT
 *   "i" / "i64"  -> BIGINT      "u" / "u64" -> UBIGINT
 *   "i32" "u32" "i16" "u16" "i8" "u8" -> the matching (unsigned) integer type
 * Multi-character suffixes are compared case-insensitively; the one-character
 * forms "i" and "u" must be lower case.
 *
 * @param kvVal  in: value/length; out: type plus one of d, f, i, u
 * @param msg    receives a diagnostic when the literal is rejected
 * @return true when the literal was accepted, false when it is not a number,
 *         carries an unknown suffix, or does not fit the requested type
 */
static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
  const char *pVal = kvVal->value;
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (pVal == endptr) {
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
    return false;
  }

  // Number of bytes after the numeric part, i.e. the length of the type suffix.
  int32_t left = len - (endptr - pVal);
  if (left == 0 || (left == 3 && strncasecmp(endptr, "f64", left) == 0)) {
    kvVal->type = TSDB_DATA_TYPE_DOUBLE;
    kvVal->d = result;
  } else if ((left == 3 && strncasecmp(endptr, "f32", left) == 0)) {
    if (!IS_VALID_FLOAT(result)) {
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_FLOAT;
    kvVal->f = (float)result;
  } else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)) {
    if (smlDoubleToInt64OverFlow(result)) {
      // The double is at or beyond the int64 edge: re-parse as an integer and let errno decide.
      errno = 0;
      int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
      if (errno == ERANGE) {
        smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_BIGINT;
      kvVal->i = tmp;
      return true;
    }
    kvVal->type = TSDB_DATA_TYPE_BIGINT;
    kvVal->i = (int64_t)result;
  } else if ((left == 1 && *endptr == 'u') || (left == 3 && strncasecmp(endptr, "u64", left) == 0)) {
    if (result >= (double)UINT64_MAX || result < 0) {
      // Same edge handling as for i64; negative input is always rejected.
      errno = 0;
      uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
      if (errno == ERANGE || result < 0) {
        smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_UBIGINT;
      kvVal->u = tmp;
      return true;
    }
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;
    kvVal->u = result;
  } else if (left == 3 && strncasecmp(endptr, "i32", left) == 0) {
    if (!IS_VALID_INT(result)) {
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_INT;
    kvVal->i = result;
  } else if (left == 3 && strncasecmp(endptr, "u32", left) == 0) {
    if (!IS_VALID_UINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UINT;
    kvVal->u = result;
  } else if (left == 3 && strncasecmp(endptr, "i16", left) == 0) {
    if (!IS_VALID_SMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "small int out of range[-32768,32767]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_SMALLINT;
    kvVal->i = result;
  } else if (left == 3 && strncasecmp(endptr, "u16", left) == 0) {
    if (!IS_VALID_USMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned small int out of range[0,65535]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_USMALLINT;
    kvVal->u = result;
  } else if (left == 2 && strncasecmp(endptr, "i8", left) == 0) {
    if (!IS_VALID_TINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_TINYINT;
    kvVal->i = result;
  } else if (left == 2 && strncasecmp(endptr, "u8", left) == 0) {
    if (!IS_VALID_UTINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UTINYINT;
    kvVal->u = result;
  } else {
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
    return false;
  }
  return true;
}

wmmhello's avatar
wmmhello 已提交
806
/**
 * Recognise a boolean literal: t/T, f/F, or "true"/"false" in any letter case.
 *
 * @param kvVal  in: value/length; out: kvVal->i is set to TSDB_TRUE or TSDB_FALSE on a match
 * @return true when the text is a boolean literal, false otherwise (kvVal untouched)
 */
static bool smlParseBool(SSmlKv *kvVal) {
  const char *text = kvVal->value;
  int32_t     len = kvVal->length;

  switch (len) {
    case 1:
      if (text[0] == 't' || text[0] == 'T') {
        kvVal->i = TSDB_TRUE;
        return true;
      }
      if (text[0] == 'f' || text[0] == 'F') {
        kvVal->i = TSDB_FALSE;
        return true;
      }
      return false;

    case 4:
      if (strncasecmp(text, "true", len) != 0) {
        return false;
      }
      kvVal->i = TSDB_TRUE;
      return true;

    case 5:
      if (strncasecmp(text, "false", len) != 0) {
        return false;
      }
      kvVal->i = TSDB_FALSE;
      return true;

    default:
      return false;
  }
}

wmmhello's avatar
wmmhello 已提交
830
/**
 * Tell whether a value is written as a binary literal, i.e. "abc".
 *
 * @param pVal  value text (only the first len bytes are inspected)
 * @param len   number of bytes in pVal
 * @return true when the text is at least two bytes long and both its first
 *         and last byte are a double quote
 */
static bool smlIsBinary(const char *pVal, uint16_t len) {
  // binary: "abc"
  return len >= 2 && pVal[0] == '"' && pVal[len - 1] == '"';
}

wmmhello's avatar
wmmhello 已提交
841
/**
 * Tell whether a value is written as an nchar literal, i.e. L"abc".
 *
 * @param pVal  value text (only the first len bytes are inspected)
 * @param len   number of bytes in pVal
 * @return true when the text is at least three bytes long, starts with 'L' or
 *         'l' followed by a double quote, and ends with a double quote
 */
static bool smlIsNchar(const char *pVal, uint16_t len) {
  // nchar: L"abc"
  if (len < 3) {
    return false;
  }

  const bool hasPrefix = (pVal[0] == 'L' || pVal[0] == 'l');
  const bool isQuoted = (pVal[1] == '"' && pVal[len - 1] == '"');
  return isQuoted && hasPrefix;
}
851
/******************************* parse basic type function end **********************/
wmmhello's avatar
wmmhello 已提交
852

853
/******************************* time function **********************/
854
static uint8_t smlPrecisionConvert[7] = {TSDB_TIME_PRECISION_NANO, TSDB_TIME_PRECISION_HOURS, TSDB_TIME_PRECISION_MINUTES,
wmmhello's avatar
wmmhello 已提交
855 856
                                     TSDB_TIME_PRECISION_SECONDS, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_MICRO,
                                     TSDB_TIME_PRECISION_NANO};
857 858
static int64_t smlFactorNS[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
static int64_t smlFactorS[3] = {1000LL, 1000000LL, 1000000000LL};
859
static int64_t smlToMilli[3] = {3600000LL, 60000LL, 1000LL};
wmmhello's avatar
wmmhello 已提交
860

861
static int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, uint8_t toPrecision) {
X
Xiaoyu Wang 已提交
862
  char   *endPtr = NULL;
wafwerar's avatar
wafwerar 已提交
863
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
864
  if (unlikely(value + len != endPtr)) {
865
    return -1;
wmmhello's avatar
wmmhello 已提交
866
  }
867 868 869 870 871 872 873

  if(unlikely(fromPrecision >= TSDB_TIME_PRECISION_HOURS)){
    int64_t unit = smlToMilli[fromPrecision - TSDB_TIME_PRECISION_HOURS];
    if(unit > INT64_MAX / tsInt64){
      return -1;
    }
    tsInt64 *= unit;
874
    fromPrecision = TSDB_TIME_PRECISION_MILLI;
wmmhello's avatar
wmmhello 已提交
875 876
  }

877
  return convertTimePrecision(tsInt64, fromPrecision, toPrecision);
878 879 880 881 882 883
}

static int8_t smlGetTsTypeByLen(int32_t len) {
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
    return TSDB_TIME_PRECISION_SECONDS;
  } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
884
    return TSDB_TIME_PRECISION_MILLI;
885 886
  } else {
    return -1;
wmmhello's avatar
wmmhello 已提交
887
  }
888 889
}

X
Xiaoyu Wang 已提交
890
/**
 * Convert a line-protocol timestamp to the precision of the current super table.
 *
 * When no super table meta is set on info the target precision is nanoseconds.
 * An empty timestamp, or the single character '0', yields the current time.
 *
 * @param info  handle supplying precision, currSTableMeta and msgBuf
 * @param data  timestamp text
 * @param len   number of bytes in data
 * @return the converted timestamp, or -1 (with a message in info->msgBuf) when invalid
 */
static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) {
  uint8_t toPrecision = TSDB_TIME_PRECISION_NANO;
  if (info->currSTableMeta) {
    toPrecision = info->currSTableMeta->tableInfo.precision;
  }

  const bool useNow = (len == 0) || (len == 1 && data[0] == '0');
  if (unlikely(useNow)) {
    return taosGetTimestampNs() / smlFactorNS[toPrecision];
  }

  int64_t ts = smlGetTimeValue(data, len, smlPrecisionConvert[info->precision], toPrecision);
  if (likely(ts != -1)) {
    return ts;
  }

  smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
  return -1;
}

X
Xiaoyu Wang 已提交
907
/**
 * Convert an OpenTSDB telnet timestamp to the precision of the current super table.
 *
 * When no super table meta is set on info the target precision is nanoseconds.
 * The single character '0' yields the current time. Otherwise the digit count
 * decides whether the input is in seconds or milliseconds.
 *
 * @param info  handle supplying currSTableMeta and msgBuf
 * @param data  timestamp text; must not be NULL
 * @param len   number of bytes in data
 * @return the converted timestamp, or -1 (with a message in info->msgBuf) when
 *         data is NULL, has an unsupported digit count, or is not a valid number
 */
static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
  uint8_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;

  if (unlikely(!data)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
    return -1;
  }
  if (unlikely(len == 1 && data[0] == '0')) {
    return taosGetTimestampNs()/smlFactorNS[toPrecision];
  }

  // Keep the signed type returned by smlGetTsTypeByLen: stored in a uint8_t the
  // -1 sentinel becomes 255, the comparison below never fires and the bogus
  // precision is later used to index smlToMilli out of bounds.
  int8_t fromPrecision = smlGetTsTypeByLen(len);
  if (unlikely(fromPrecision < 0)) {
    smlBuildInvalidDataMsg(&info->msgBuf,
                           "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
    return -1;
  }

  int64_t ts = smlGetTimeValue(data, len, (uint8_t)fromPrecision, toPrecision);
  if (unlikely(ts == -1)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
  }
  return ts;
}

931
/**
 * Parse a timestamp with the parser that matches info->protocol.
 *
 * Line protocol goes through smlParseInfluxTime, telnet protocol through
 * smlParseOpenTsdbTime; any other protocol trips ASSERT(0) and yields 0.
 * The result is also written to the debug log.
 *
 * @return the value produced by the selected parser (-1 on invalid input)
 */
static int64_t smlParseTS(SSmlHandle *info, const char *data, int32_t len) {
  int64_t parsed = 0;

  if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
    parsed = smlParseInfluxTime(info, data, len);
  } else {
    if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
      parsed = smlParseOpenTsdbTime(info, data, len);
    } else {
      ASSERT(0);
    }
  }

  uDebug("SML:0x%" PRIx64 " smlParseTS:%" PRId64, info->id, parsed);
  return parsed;
}
/******************************* time function end **********************/

/******************************* Sml struct related function **********************/
/**
 * Allocate a child-table descriptor with empty cols and tags arrays.
 *
 * The measure pointer is stored as-is (not copied), so it must outlive the
 * returned object.
 *
 * @param numRows     initial capacity of the cols array
 * @param measure     super table name
 * @param measureLen  length of measure in bytes
 * @return the new descriptor, or NULL when any allocation fails (nothing is leaked)
 */
static SSmlTableInfo *smlBuildTableInfo(int numRows, const char* measure, int32_t measureLen) {
  SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(1, sizeof(SSmlTableInfo));
  if (!tag) {
    return NULL;
  }

  tag->sTableName = measure;
  tag->sTableNameLen = measureLen;

  tag->cols = taosArrayInit(numRows, POINTER_BYTES);
  if (tag->cols == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
  }

  tag->tags = taosArrayInit(16, sizeof(SSmlKv));
  if (tag->tags == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
  }
  return tag;

  cleanup:
  // cols may already exist when the tags allocation is the one that failed.
  if (tag->cols != NULL) {
    taosArrayDestroy(tag->cols);
  }
  taosMemoryFree(tag);
  return NULL;
}

/**
 * Run every key of one key/value array through smlCheckDuplicateKey.
 *
 * @param dumplicateKey  hash handed to smlCheckDuplicateKey for each key
 * @param tags           array of SSmlKv to check
 * @param msg            receives a diagnostic naming the first duplicate
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_DUP_NAMES at the first duplicate
 */
static int32_t smlCheckDupUnit(SHashObj *dumplicateKey, SArray *tags, SSmlMsgBuf *msg){
  int idx = 0;
  while (idx < taosArrayGetSize(tags)) {
    SSmlKv *kv = taosArrayGet(tags, idx);
    const bool isDup = smlCheckDuplicateKey(kv->key, kv->keyLen, dumplicateKey);
    if (isDup) {
      smlBuildInvalidDataMsg(msg, "dumplicate key", kv->key);
      return TSDB_CODE_TSC_DUP_NAMES;
    }
    idx++;
  }
  return TSDB_CODE_SUCCESS;
}

/**
 * Check that no key occurs twice across the column and tag arrays of one row.
 *
 * A temporary hash is shared by both passes, so a tag that reuses a column
 * name is reported as well.
 *
 * @param cols  array of SSmlKv holding the columns
 * @param tags  array of SSmlKv holding the tags
 * @param msg   receives a diagnostic naming the first duplicate
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_DUP_NAMES on a duplicate, or
 *         TSDB_CODE_OUT_OF_MEMORY when the temporary hash cannot be created
 */
static int32_t smlJudgeDupColName(SArray *cols, SArray *tags, SSmlMsgBuf *msg) {
  SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (dumplicateKey == NULL) {
    uError("SML:smlJudgeDupColName failed to allocate memory");
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t ret = smlCheckDupUnit(dumplicateKey, cols, msg);
  if (ret == TSDB_CODE_SUCCESS) {
    ret = smlCheckDupUnit(dumplicateKey, tags, msg);
  }

  taosHashCleanup(dumplicateKey);
  return ret;
}

/**
 * Take the child table name from the tag whose key equals tsSmlChildTableName.
 *
 * Nothing happens when tsSmlChildTableName is empty or no tag matches. On a
 * match the destination is zeroed and the tag value is copied into it,
 * truncated so that the result is always NUL-terminated.
 *
 * @param tags            array of SSmlKv tags of the current row
 * @param childTableName  destination buffer of at least TSDB_TABLE_NAME_LEN bytes
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t smlParseTableName(SArray *tags, char *childTableName) {
  size_t      childTableNameLen = strlen(tsSmlChildTableName);
  if (childTableNameLen == 0) return TSDB_CODE_SUCCESS;

  for(int i = 0; i < taosArrayGetSize(tags); i++){
    SSmlKv *tag = taosArrayGet(tags, i);
    // handle child table name
    if (childTableNameLen == tag->keyLen && strncmp(tag->key, tsSmlChildTableName, tag->keyLen) == 0) {
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      // Copy at most TSDB_TABLE_NAME_LEN - 1 bytes: the last zeroed byte stays
      // in place as terminator, so the strlen() done by the caller is bounded.
      strncpy(childTableName, tag->value,
              (tag->length < TSDB_TABLE_NAME_LEN ? tag->length : TSDB_TABLE_NAME_LEN - 1));
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

1019 1020 1021 1022
/**
 * Fill in the child table name and uid of a table descriptor.
 *
 * The name is taken from the tags via smlParseTableName. When that leaves the
 * name empty, buildChildTableName derives name and uid from a copy of the tags
 * and the super table name; otherwise the uid is the first eight bytes of the
 * name buffer.
 *
 * @param oneTable  descriptor whose childTableName and uid are set
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t smlSetCTableName(SSmlTableInfo *oneTable){
  smlParseTableName(oneTable->tags, oneTable->childTableName);

  if (strlen(oneTable->childTableName) == 0) {
    SArray* dst = taosArrayDup(oneTable->tags, NULL);
    RandTableName rName = {dst, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen,
                           oneTable->childTableName, 0};

    buildChildTableName(&rName);
    taosArrayDestroy(dst);
    oneTable->uid = rName.uid;
  } else {
    // Read the bytes with memcpy rather than through a uint64_t pointer: the
    // name is a char buffer, so a cast would break aliasing/alignment rules.
    uint64_t uid = 0;
    memcpy(&uid, oneTable->childTableName, sizeof(uid));
    oneTable->uid = uid;
  }
  return TSDB_CODE_SUCCESS;
}

/**
 * Allocate a super-table bookkeeping object.
 *
 * The tags and cols arrays are always created. The tagHash and colHash lookup
 * tables are created only when isDataFormat is false; otherwise they stay NULL.
 *
 * @param isDataFormat  whether the caller runs in data-format mode
 * @return the new object, or NULL when any allocation fails (nothing is leaked)
 */
static SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
  SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(1, sizeof(SSmlSTableMeta));
  if (!meta) {
    return NULL;
  }

  if(unlikely(!isDataFormat)){
    meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->tagHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }

    meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->colHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }
  }

  meta->tags = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->cols = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
  return meta;

  cleanup:
  // Release whatever was created before the failing step. The hashes are NULL
  // in data-format mode; smlDestroySTableMeta passes them to taosHashCleanup
  // in that state as well.
  taosHashCleanup(meta->tagHash);
  taosHashCleanup(meta->colHash);
  if (meta->tags != NULL) {
    taosArrayDestroy(meta->tags);
  }
  taosMemoryFree(meta);
  return NULL;
}

/**
 * Append every key/value of cols to metaArray and, when a hash is supplied,
 * record the position of each key in it.
 *
 * @param metaHash   optional key -> int16_t index map (may be NULL)
 * @param metaArray  destination array of SSmlKv
 * @param cols       source array of SSmlKv
 */
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols){
  int16_t pos = 0;
  while (pos < taosArrayGetSize(cols)) {
    SSmlKv *item = (SSmlKv *)taosArrayGet(cols, pos);
    taosArrayPush(metaArray, item);
    if(unlikely(metaHash != NULL)) {
      // The stored value is the two-byte position of the key within cols.
      taosHashPut(metaHash, item->key, item->keyLen, &pos, SHORT_BYTES);
    }
    ++pos;
  }
}

/**
 * Look up the meta of a super table through the catalog.
 *
 * The table name is built from the account id of the connection, the database
 * of the current request and the given measure.
 *
 * @param info        handle supplying the connection, request and catalog
 * @param measure     super table name (not NUL-terminated)
 * @param measureLen  length of measure in bytes
 * @return the pointer filled in by catalogGetSTableMeta; it stays NULL when
 *         the call does not set it
 */
static STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen){
  // Fully qualified table name: account, database, table.
  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
  memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
  memcpy(pName.tname, measure, measureLen);

  // Connection details the catalog needs to reach the management node.
  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

  STableMeta *pTableMeta = NULL;
  catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
  return pTableMeta;
}

/**
 * Release a super-table bookkeeping object together with everything it owns:
 * both key arrays, both lookup hashes and the attached table meta.
 *
 * @param meta  object to destroy
 */
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
  // lookup hashes
  taosHashCleanup(meta->tagHash);
  taosHashCleanup(meta->colHash);

  // key/value arrays
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);

  // attached table meta, then the object itself
  taosMemoryFree(meta->tableMeta);
  taosMemoryFree(meta);
}
/******************************* Sml struct related function end **********************/

wmmhello's avatar
wmmhello 已提交
1112
/**
 * Determine the type of a column value and normalise it in place.
 *
 * The forms are tried in this order: binary ("..."), nchar (L"..."), boolean,
 * number. For the two string forms value/length are adjusted by
 * BINARY_ADD_LEN / NCHAR_ADD_LEN; for booleans and numbers length becomes
 * the byte width of the detected type.
 *
 * @param pVal  in: value/length; out: type, value, length and parsed payload
 * @param msg   receives a diagnostic from the number parser
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN when a string
 *         is too long, or TSDB_CODE_TSC_INVALID_VALUE when no form matches
 */
static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
  // binary
  if (smlIsBinary(pVal->value, pVal->length)) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
    pVal->length -= BINARY_ADD_LEN;
    const bool fits = (pVal->length <= TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE);
    if (fits) {
      pVal->value += (BINARY_ADD_LEN - 1);
    }
    return fits ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }

  // nchar
  if (smlIsNchar(pVal->value, pVal->length)) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
    pVal->length -= NCHAR_ADD_LEN;
    const bool fits = (pVal->length <= (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE);
    if (fits) {
      pVal->value += (NCHAR_ADD_LEN - 1);
    }
    return fits ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }

  // bool first, number second; the number parser sets the type itself
  const bool isBool = smlParseBool(pVal);
  if (isBool) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
  }
  if (isBool || smlParseNumber(pVal, msg)) {
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_TSC_INVALID_VALUE;
}

1149 1150 1151
/**
 * Comparator over two cJSON nodes.
 *
 * @return 0 when cJSON_Compare (case sensitive) reports the nodes as equal, 1 otherwise
 */
int32_t is_same_child_table_json(const void *a, const void *b){
  const cJSON *lhs = (const cJSON *)a;
  const cJSON *rhs = (const cJSON *)b;
  if (cJSON_Compare(lhs, rhs, true)) {
    return 0;
  }
  return 1;
}
wmmhello's avatar
wmmhello 已提交
1152

1153 1154 1155 1156 1157 1158 1159
int32_t is_same_child_table_telnet(const void *a, const void *b){
  SSmlLineInfo *t1 = (SSmlLineInfo *)a;
  SSmlLineInfo *t2 = (SSmlLineInfo *)b;
  return (((t1->measureLen == t2->measureLen) && memcmp(t1->measure, t2->measure, t1->measureLen) == 0)
      && ((t1->tagsLen == t2->tagsLen) && memcmp(t1->tags, t2->tags, t1->tagsLen) == 0)) ? 0 : 1;
}

1160 1161 1162 1163 1164
// True when the current line has the same "measure + tags" prefix as the
// previous line (same length, same bytes). Expands in place and needs
// `elements` (current line) and `info` (handle holding preLine) in scope.
#define IS_SAME_CHILD_TABLE (elements->measureTagsLen == info->preLine.measureTagsLen \
&& memcmp(elements->measure, info->preLine.measure, elements->measureTagsLen) == 0)

// True when the current line has the same measure as the previous line
// (same length, same bytes). Needs `elements` and `info` in scope.
#define IS_SAME_SUPER_TABLE (elements->measureLen == info->preLine.measureLen \
&& memcmp(elements->measure, info->preLine.measure, elements->measureLen) == 0)
1165

1166 1167 1168 1169 1170 1171 1172
// True when the key of the local `kv` equals the key of `*preKV` (same length,
// same bytes). Needs an SSmlKv named `kv` and an SSmlKv pointer named `preKV` in scope.
#define IS_SAME_KEY (preKV->keyLen == kv.keyLen && memcmp(preKV->key, kv.key, kv.keyLen) == 0)

/**
 * Parse the tag section ("k1=v1,k2=v2 ...") of one line and register the child
 * table it describes.
 *
 * Nothing is parsed when isSameCTable is true. Otherwise every key=value pair
 * up to the first unescaped space is read as an NCHAR tag and collected in
 * info->preLineTagKV. In data-format mode the pairs are additionally compared
 * position by position with the previous line (same measure) or with the tags
 * recorded for the super table; any mismatch in count or key clears
 * info->dataFormat, sets info->reRun and returns success. Once the tags are
 * read, a child table entry is created in info->childTables unless one
 * already exists for this measure+tags text.
 *
 * @param info           parser handle
 * @param sql            in/out cursor into the line; advanced past the tags
 * @param sqlEnd         end of the line
 * @param currElement    measure / measureTags spans of the current line
 * @param isSameMeasure  current line has the same measure as the previous one
 * @param isSameCTable   current line has the same measure and tags as the previous one
 * @return TSDB_CODE_SUCCESS (also when a re-run was requested) or an error code
 */
static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd,
                          SSmlLineInfo* currElement, bool isSameMeasure, bool isSameCTable){
  if(isSameCTable){
    return TSDB_CODE_SUCCESS;
  }

  int     cnt = 0;
  SArray *preLineKV = info->preLineTagKV;
  bool    isSuperKVInit = true;
  SArray *superKV = NULL;
  if(info->dataFormat){
    if(!isSameMeasure){
      SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);

      if(unlikely(sMeta == NULL)){
        // Ask the catalog first: when the super table is unknown there is then
        // nothing allocated yet that would have to be released.
        STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
        if(pTableMeta == NULL){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        sMeta = smlBuildSTableMeta(info->dataFormat);
        if(unlikely(sMeta == NULL)){
          taosMemoryFree(pTableMeta);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        sMeta->tableMeta = pTableMeta;
        nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta);
      }
      info->currSTableMeta = sMeta->tableMeta;
      superKV = sMeta->tags;

      // An empty tag list means this is the first line seen for the super table.
      if(unlikely(taosArrayGetSize(superKV) == 0)){
        isSuperKVInit = false;
      }
      taosArraySetSize(preLineKV, 0);
    }
  }else{
    taosArraySetSize(preLineKV, 0);
  }


  while (*sql < sqlEnd) {
    if (unlikely(IS_SPACE(*sql))) {
      break;
    }

    bool hasSlash = false;
    // parse key
    const char *key = *sql;
    int32_t     keyLen = 0;
    while (*sql < sqlEnd) {
      if (unlikely(IS_COMMA(*sql))) {
        smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if (unlikely(IS_EQUAL(*sql))) {
        keyLen = *sql - key;
        (*sql)++;
        break;
      }
      if(!hasSlash){
        hasSlash = (*(*sql) == SLASH);
      }
      (*sql)++;
    }
    if(unlikely(hasSlash)) {
      PROCESS_SLASH(key, keyLen)
    }

    if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

    // parse value
    const char *value = *sql;
    int32_t     valueLen = 0;
    hasSlash = false;
    while (*sql < sqlEnd) {
      // parse value
      if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
        break;
      }else if (unlikely(IS_EQUAL(*sql))) {
        smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }

      if(!hasSlash){
        hasSlash = (*(*sql) == SLASH);
      }

      (*sql)++;
    }
    valueLen = *sql - value;

    if (unlikely(valueLen == 0)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }

    if(unlikely(hasSlash)) {
      PROCESS_SLASH(value, valueLen)
    }

    if (unlikely(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }

    SSmlKv kv = {.key = key, .type = TSDB_DATA_TYPE_NCHAR, .keyLen = keyLen, .value = value, .length = valueLen};
    if(info->dataFormat){
      // More tags on this line than the stored schema has.
      if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
        info->needModifySchema = true;
      }

      if(isSameMeasure){
        // Compare with the tag at the same position of the previous line.
        if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
        if(unlikely(kv.length > preKV->length)){
          // Longer value than seen so far: widen both records.
          preKV->length = kv.length;
          SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
          ASSERT(tableMeta != NULL);

          SSmlKv *oldKV = taosArrayGet(tableMeta->tags, cnt);
          oldKV->length = kv.length;
          info->needModifySchema = true;
        }
        if(unlikely(!IS_SAME_KEY)){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
      }else{
        if(isSuperKVInit){
          // Compare with the tag recorded for the super table at this position.
          if(unlikely(cnt >= taosArrayGetSize(superKV))) {
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
          SSmlKv *preKV = taosArrayGet(superKV, cnt);
          if(unlikely(kv.length > preKV->length)) {
            preKV->length = kv.length;
          }else{
            kv.length = preKV->length;
          }
          info->needModifySchema = true;

          if(unlikely(!IS_SAME_KEY)){
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
        }else{
          taosArrayPush(superKV, &kv);
        }
        taosArrayPush(preLineKV, &kv);
      }
    }else{
      taosArrayPush(preLineKV, &kv);
    }

    cnt++;
    if(IS_SPACE(*sql)){
      break;
    }
    (*sql)++;
  }

  void* oneTable = nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen, NULL);
  if ((oneTable != NULL)) {
    return TSDB_CODE_SUCCESS;
  }

  SSmlTableInfo *tinfo = smlBuildTableInfo(1, currElement->measure, currElement->measureLen);
  if (!tinfo) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  for(int i = 0; i < taosArrayGetSize(preLineKV); i++){
    taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
  }
  smlSetCTableName(tinfo);
  if(info->dataFormat) {
    info->currSTableMeta->uid = tinfo->uid;
    tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
    if(tinfo->tableDataCtx == NULL){
      smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
      // tinfo has not been registered anywhere yet, so release it here.
      taosArrayDestroy(tinfo->cols);
      taosArrayDestroy(tinfo->tags);
      taosMemoryFree(tinfo);
      return TSDB_CODE_SML_INVALID_DATA;
    }
  }

  nodeListSet(&info->childTables, currElement->measure, currElement->measureTagsLen, tinfo);

  return TSDB_CODE_SUCCESS;
}
1362

1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374
/**
 * Parse the column section ("k1=v1,k2=v2 ...") of one line.
 *
 * Every key=value pair up to the first unescaped, unquoted space is typed via
 * smlParseValue. In data-format mode each value is bound directly with
 * smlBuildCol and compared position by position with the previous line (same
 * measure) or with the columns recorded for the super table; any mismatch in
 * count, type or key clears info->dataFormat, sets info->reRun and returns
 * success. Outside data-format mode the pairs are appended to
 * currElement->colArray, whose slot 0 is reserved for the timestamp.
 *
 * @param info           parser handle
 * @param sql            in/out cursor into the line; advanced past the columns
 * @param sqlEnd         end of the line
 * @param currElement    measure / measureTags spans and colArray of the current line
 * @param isSameMeasure  current line has the same measure as the previous one
 * @param isSameCTable   current line has the same measure and tags as the previous one
 * @return TSDB_CODE_SUCCESS (also when a re-run was requested) or an error code
 */
static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd,
                          SSmlLineInfo* currElement, bool isSameMeasure, bool isSameCTable){
  int     cnt = 0;
  SArray *preLineKV = info->preLineColKV;
  bool    isSuperKVInit = true;
  SArray *superKV = NULL;
  if(info->dataFormat){
    if(unlikely(!isSameCTable)){
      SSmlTableInfo *oneTable = (SSmlTableInfo *)nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen, NULL);
      if (unlikely(oneTable == NULL)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      info->currTableDataCtx = oneTable->tableDataCtx;
    }

    if(unlikely(!isSameMeasure)){
      SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);

      if(unlikely(sMeta == NULL)){
        // Ask the catalog first: when the super table is unknown there is then
        // nothing allocated yet that would have to be released.
        STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
        if(pTableMeta == NULL){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        sMeta = smlBuildSTableMeta(info->dataFormat);
        if(unlikely(sMeta == NULL)){
          taosMemoryFree(pTableMeta);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        sMeta->tableMeta = pTableMeta;
        nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta);
      }
      info->currSTableMeta = sMeta->tableMeta;
      superKV = sMeta->cols;
      // An empty column list means this is the first line seen for the super table.
      if(unlikely(taosArrayGetSize(superKV) == 0)){
        isSuperKVInit = false;
      }
      taosArraySetSize(preLineKV, 0);
    }
  }

  while (*sql < sqlEnd) {
    if (unlikely(IS_SPACE(*sql))) {
      break;
    }

    bool hasSlash = false;
    // parse key
    const char *key = *sql;
    int32_t     keyLen = 0;
    while (*sql < sqlEnd) {
      if (unlikely(IS_COMMA(*sql))) {
        smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if (unlikely(IS_EQUAL(*sql))) {
        keyLen = *sql - key;
        (*sql)++;
        break;
      }
      if(!hasSlash){
        hasSlash = (*(*sql) == SLASH);
      }
      (*sql)++;
    }
    if(unlikely(hasSlash)) {
      PROCESS_SLASH(key, keyLen)
    }

    if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

    // parse value
    const char *value = *sql;
    int32_t     valueLen = 0;
    hasSlash              = false;
    bool        isInQuote = false;
    while (*sql < sqlEnd) {
      // parse value
      if (IS_QUOTE(*sql)) {
        isInQuote = !isInQuote;
        (*sql)++;
        continue;
      }
      // Separators only count outside of a quoted string.
      if (!isInQuote){
        if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
          break;
        } else if (unlikely(IS_EQUAL(*sql))) {
          smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
          return TSDB_CODE_SML_INVALID_DATA;
        }
      }
      if(!hasSlash){
        hasSlash = (*(*sql) == SLASH);
      }

      (*sql)++;
    }
    valueLen = *sql - value;

    if (unlikely(isInQuote)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "only one quote", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if (unlikely(valueLen == 0)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if(unlikely(hasSlash)) {
      PROCESS_SLASH(value, valueLen)
    }

    SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen};
    int32_t ret = smlParseValue(&kv, &info->msgBuf);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    if(info->dataFormat){
      //cnt begin 0, add ts so + 2
      if(unlikely(cnt + 2 > info->currSTableMeta->tableInfo.numOfColumns)){
        info->needModifySchema = true;
      }
      // bind data
      ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, cnt + 1);
      if (unlikely(ret != TSDB_CODE_SUCCESS)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
        return ret;
      }

      if(isSameMeasure){
        // Compare with the column at the same position of the previous line.
        if(cnt >= taosArrayGetSize(preLineKV)) {
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
        if(kv.type != preKV->type){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }

        if(unlikely(IS_VAR_DATA_TYPE(kv.type) && kv.length > preKV->length)){
          // Longer variable-length value than seen so far: widen both records.
          preKV->length = kv.length;
          SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
          ASSERT(tableMeta != NULL);

          SSmlKv *oldKV = taosArrayGet(tableMeta->cols, cnt);
          oldKV->length = kv.length;
          info->needModifySchema = true;
        }
        if(unlikely(!IS_SAME_KEY)){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
      }else{
        if(isSuperKVInit){
          // Compare with the column recorded for the super table at this position.
          if(unlikely(cnt >= taosArrayGetSize(superKV))) {
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
          SSmlKv *preKV = taosArrayGet(superKV, cnt);
          if(unlikely(kv.type != preKV->type)){
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }

          if(IS_VAR_DATA_TYPE(kv.type)){
            if(kv.length > preKV->length) {
              preKV->length = kv.length;
            }else{
              kv.length = preKV->length;
            }
            info->needModifySchema = true;
          }
          if(unlikely(!IS_SAME_KEY)){
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
        }else{
          taosArrayPush(superKV, &kv);
        }
        taosArrayPush(preLineKV, &kv);
      }
    }else{
      if(currElement->colArray == NULL){
        currElement->colArray = taosArrayInit(16, sizeof(SSmlKv));
        if(unlikely(currElement->colArray == NULL)){
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        // reserve slot 0 for the timestamp
        taosArraySetSize(currElement->colArray, 1);
      }
      taosArrayPush(currElement->colArray, &kv);
    }

    cnt++;
    if(IS_SPACE(*sql)){
      break;
    }
    (*sql)++;
  }

  return TSDB_CODE_SUCCESS;
}
1569

1570 1571 1572 1573 1574
// Parse one InfluxDB line-protocol record located in [sql, sqlEnd):
//   <measurement>[,<tag_key>=<tag_val>...] <field_key>=<field_val>[,...] <timestamp>
//
// The measurement, tag, column and timestamp spans are recorded in `elements` as
// pointers into the input buffer (no copies are made). Tags and columns are handed to
// smlParseTagKv / smlParseColKv; the timestamp is converted by smlParseTS and stored
// as column 0 of the row.
//
// Returns TSDB_CODE_SUCCESS on success, and also when a helper set info->reRun (the
// caller is expected to check that flag); otherwise an error code.
static int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements) {
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
  JUMP_SPACE(sql, sqlEnd)
  if (unlikely(*sql == COMMA)) return TSDB_CODE_SML_INVALID_DATA;
  elements->measure = sql;

  // parse measure
  while (sql < sqlEnd) {
    // NOTE(review): presumably removes an escaping backslash in place; the buffer
    // becomes one byte shorter, hence the sqlEnd adjustment.
    if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
      MOVE_FORWARD_ONE(sql, sqlEnd - sql);
      sqlEnd--;
      continue;
    }
    if (IS_COMMA(sql)) {
      break;
    }

    if (IS_SPACE(sql)) {
      break;
    }
    sql++;
  }
  elements->measureLen = sql - elements->measure;
  if (unlikely(IS_INVALID_TABLE_LEN(elements->measureLen))) {
    smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  // to get measureTagsLen before the tags are parsed: scan ahead to the space that
  // ends the "<measurement>,<tags>" section
  const char *tmp = sql;
  while (tmp < sqlEnd) {
    if (IS_SPACE(tmp)) {
      break;
    }
    tmp++;
  }
  elements->measureTagsLen = tmp - elements->measure;

  bool isSameCTable = false;
  bool isSameMeasure = false;
  if (IS_SAME_CHILD_TABLE) {
    isSameCTable = true;
    isSameMeasure = true;
  } else if (info->dataFormat) {
    isSameMeasure = IS_SAME_SUPER_TABLE;
  }

  int ret = TSDB_CODE_SUCCESS;

  // parse tag
  if (*sql == SPACE) {
    elements->tagsLen = 0;
  } else {
    if (*sql == COMMA) sql++;
    elements->tags = sql;

    ret = smlParseTagKv(info, &sql, sqlEnd, elements, isSameMeasure, isSameCTable);
    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
      return ret;
    }
    if (unlikely(info->reRun)) {
      return TSDB_CODE_SUCCESS;
    }

    // continue from the end of the tag section computed above, wherever the tag
    // parser stopped
    sql = elements->measure + elements->measureTagsLen;

    elements->tagsLen = sql - elements->tags;
  }

  // parse cols
  JUMP_SPACE(sql, sqlEnd)
  elements->cols = sql;

  ret = smlParseColKv(info, &sql, sqlEnd, elements, isSameMeasure, isSameCTable);
  if (unlikely(ret != TSDB_CODE_SUCCESS)) {
    return ret;
  }

  if (unlikely(info->reRun)) {
    return TSDB_CODE_SUCCESS;
  }

  elements->colsLen = sql - elements->cols;
  if (unlikely(elements->colsLen == 0)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "cols is empty", NULL);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  // parse timestamp
  JUMP_SPACE(sql, sqlEnd)
  elements->timestamp = sql;
  while (sql < sqlEnd) {
    // <ctype.h> functions require a value representable as unsigned char
    if (isspace((unsigned char)*sql)) {
      break;
    }
    sql++;
  }
  elements->timestampLen = sql - elements->timestamp;

  int64_t ts = smlParseTS(info, elements->timestamp, elements->timestampLen);
  if (ts <= 0) {
    uError("SML:0x%" PRIx64 " smlParseTS error:%" PRId64, info->id, ts);
    return TSDB_CODE_INVALID_TIMESTAMP;
  }

  // add ts as column 0
  SSmlKv kv = {.key = TS,
               .keyLen = TS_LEN,
               .i = ts,
               .type = TSDB_DATA_TYPE_TIMESTAMP,
               .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
  if (info->dataFormat) {
    // propagate build failures instead of silently dropping the row
    // (same handling as smlParseTelnetString)
    ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
    if (ret == TSDB_CODE_SUCCESS) {
      ret = smlBuildRow(info->currTableDataCtx);
    }
    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
      return ret;
    }
  } else {
    // slot 0 of colArray was reserved for the timestamp by the column parser
    taosArraySet(elements->colArray, 0, &kv);
  }
  info->preLine = *elements;

  return ret;
}

1685
static void smlParseTelnetElement(char **sql, char *sqlEnd, char **data, int32_t *len) {
wmmhello's avatar
wmmhello 已提交
1686
  while (*sql < sqlEnd) {
1687
    if (unlikely((**sql != SPACE && !(*data)))) {
1688
      *data = *sql;
1689
    } else if (unlikely(**sql == SPACE && *data)) {
1690 1691 1692 1693 1694 1695 1696
      *len = *sql - *data;
      break;
    }
    (*sql)++;
  }
}

1697
// Parse the tag section ("k1=v1 k2=v2 ...") of an OpenTSDB telnet line located in
// [data, sqlEnd) and resolve the child table the line belongs to.
//
// Every tag is recorded as an NCHAR key/value that points into the input buffer.
// In dataFormat mode the tags are compared position by position with the tags of the
// previous line / the cached super table; any mismatch in count or key switches
// info->dataFormat off, sets info->reRun and returns TSDB_CODE_SUCCESS so the caller
// can restart. On success info->currTableDataCtx refers to the child table's context.
static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
  // NOTE(review): a 0 result is presumably "same child table as the previous line",
  // in which case the previously resolved state is reused as is.
  if (is_same_child_table_telnet(elements, &info->preLine) == 0) {
    return TSDB_CODE_SUCCESS;
  }

  bool isSameMeasure = IS_SAME_SUPER_TABLE;

  int     cnt = 0;
  SArray *preLineKV = info->preLineTagKV;
  bool    isSuperKVInit = true;
  SArray *superKV = NULL;
  if (info->dataFormat) {
    if (!isSameMeasure) {
      SSmlSTableMeta *sMeta =
          (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);

      if (unlikely(sMeta == NULL)) {
        // Look the table up before allocating the cache entry, so a failed lookup
        // does not leave an unregistered SSmlSTableMeta behind.
        STableMeta *pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
        if (pTableMeta == NULL) {
          info->dataFormat = false;
          info->reRun = true;
          return TSDB_CODE_SUCCESS;
        }
        sMeta = smlBuildSTableMeta(info->dataFormat);
        if (sMeta == NULL) {
          taosMemoryFree(pTableMeta);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        sMeta->tableMeta = pTableMeta;
        nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta);
      }
      info->currSTableMeta = sMeta->tableMeta;
      superKV = sMeta->tags;

      // an empty tag list means this line defines the super table's tag layout
      if (unlikely(taosArrayGetSize(superKV) == 0)) {
        isSuperKVInit = false;
      }
      taosArraySetSize(preLineKV, 0);
    }
  } else {
    taosArraySetSize(preLineKV, 0);
  }

  const char *sql = data;
  while (sql < sqlEnd) {
    JUMP_SPACE(sql, sqlEnd)
    if (unlikely(*sql == '\0')) break;

    const char *key = sql;
    int32_t     keyLen = 0;

    // parse key: runs up to '='; a space inside the key is an error
    while (sql < sqlEnd) {
      if (unlikely(*sql == SPACE)) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if (unlikely(*sql == EQUAL)) {
        keyLen = sql - key;
        sql++;
        break;
      }
      sql++;
    }

    if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

    // parse value: runs up to the next space; a second '=' is an error
    const char *value = sql;
    int32_t     valueLen = 0;
    while (sql < sqlEnd) {
      if (unlikely(*sql == SPACE)) {
        break;
      }
      if (unlikely(*sql == EQUAL)) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
    }
    valueLen = sql - value;

    if (unlikely(valueLen == 0)) {
      smlBuildInvalidDataMsg(msg, "invalid value", value);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    if (unlikely(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }

    SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen, .type = TSDB_DATA_TYPE_NCHAR};

    if (info->dataFormat) {
      if (unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)) {
        info->needModifySchema = true;
      }

      if (isSameMeasure) {
        // same super table as the previous line: compare with that line's tags
        if (unlikely(cnt >= taosArrayGetSize(preLineKV))) {
          info->dataFormat = false;
          info->reRun = true;
          return TSDB_CODE_SUCCESS;
        }
        SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
        if (unlikely(kv.length > preKV->length)) {
          preKV->length = kv.length;
          SSmlSTableMeta *tableMeta =
              (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
          ASSERT(tableMeta != NULL);

          SSmlKv *oldKV = taosArrayGet(tableMeta->tags, cnt);
          oldKV->length = kv.length;
          info->needModifySchema = true;
        }
        if (unlikely(!IS_SAME_KEY)) {
          info->dataFormat = false;
          info->reRun = true;
          return TSDB_CODE_SUCCESS;
        }
      } else {
        if (isSuperKVInit) {
          // different super table with a known tag layout: compare with that layout
          if (unlikely(cnt >= taosArrayGetSize(superKV))) {
            info->dataFormat = false;
            info->reRun = true;
            return TSDB_CODE_SUCCESS;
          }
          SSmlKv *preKV = taosArrayGet(superKV, cnt);
          if (unlikely(kv.length > preKV->length)) {
            preKV->length = kv.length;
          } else {
            kv.length = preKV->length;
          }
          info->needModifySchema = true;

          if (unlikely(!IS_SAME_KEY)) {
            info->dataFormat = false;
            info->reRun = true;
            return TSDB_CODE_SUCCESS;
          }
        } else {
          taosArrayPush(superKV, &kv);
        }
        taosArrayPush(preLineKV, &kv);
      }
    } else {
      taosArrayPush(preLineKV, &kv);
    }
    cnt++;
  }

  SSmlTableInfo *tinfo =
      (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet);
  if (unlikely(tinfo == NULL)) {
    // first line for this child table: build and register its descriptor
    tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
    if (!tinfo) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    for (int i = 0; i < taosArrayGetSize(preLineKV); i++) {
      taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
    }
    smlSetCTableName(tinfo);
    if (info->dataFormat) {
      info->currSTableMeta->uid = tinfo->uid;
      tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
      if (tinfo->tableDataCtx == NULL) {
        smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
        return TSDB_CODE_SML_INVALID_DATA;
      }
    }

    // the list key is a private copy of the line info, owned by tinfo
    SSmlLineInfo *key = taosMemoryMalloc(sizeof(SSmlLineInfo));
    if (key == NULL) {
      // NOTE(review): as on the smlInitTableDataCtx failure path above, tinfo is not
      // registered yet and is not released here.
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    *key = *elements;
    tinfo->key = key;
    nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo);
  }
  info->currTableDataCtx = tinfo->tableDataCtx;

  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1877

1878
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
1879
static int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements) {
X
Xiaoyu Wang 已提交
1880
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
1881 1882

  // parse metric
1883 1884
  smlParseTelnetElement(&sql, sqlEnd, &elements->measure, &elements->measureLen);
  if (unlikely((!(elements->measure) || IS_INVALID_TABLE_LEN(elements->measureLen)))) {
1885
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
wmmhello's avatar
wmmhello 已提交
1886
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
1887 1888 1889
  }

  // parse timestamp
1890 1891
  smlParseTelnetElement(&sql, sqlEnd, &elements->timestamp, &elements->timestampLen);
  if (unlikely(!elements->timestamp || elements->timestampLen == 0)) {
1892 1893 1894 1895
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }

1896 1897 1898 1899 1900 1901
  bool needConverTime = false;  // get TS before parse tag(get meta), so need conver time
  if(info->dataFormat && info->currSTableMeta == NULL){
    needConverTime = true;
  }
  int64_t ts = smlParseTS(info, elements->timestamp, elements->timestampLen);
  if (unlikely(ts < 0)) {
1902
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
1903
    return TSDB_CODE_INVALID_TIMESTAMP;
1904
  }
1905
  SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .i = ts, .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
1906 1907

  // parse value
1908 1909
  smlParseTelnetElement(&sql, sqlEnd, &elements->cols, &elements->colsLen);
  if (unlikely(!elements->cols || elements->colsLen == 0)) {
1910 1911 1912
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }
1913

1914 1915 1916 1917 1918
  SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = elements->colsLen};
  if (smlParseNumber(&kv, &info->msgBuf)) {
    kv.length = (int16_t)tDataTypes[kv.type].bytes;
  }else{
    return TSDB_CODE_TSC_INVALID_VALUE;
1919
  }
1920
  JUMP_SPACE(sql, sqlEnd)
1921

1922 1923
  elements->tags = sql;
  elements->tagsLen = sqlEnd - sql;
1924 1925 1926

  int ret = smlParseTelnetTags(info, sql, sqlEnd, elements, &info->msgBuf);
  if (unlikely(ret != TSDB_CODE_SUCCESS)) {
1927
    return ret;
wmmhello's avatar
wmmhello 已提交
1928
  }
wmmhello's avatar
wmmhello 已提交
1929

1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949
  if(unlikely(info->reRun)){
    return TSDB_CODE_SUCCESS;
  }

  if(info->dataFormat){
    if(needConverTime) {
      kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
    }
    ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
    if(ret == TSDB_CODE_SUCCESS){
      ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
    }
    if(ret == TSDB_CODE_SUCCESS){
      ret = smlBuildRow(info->currTableDataCtx);
    }
    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
      return ret;
    }
  }else{
1950 1951 1952
    if(elements->colArray == NULL){
      elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
    }
1953 1954 1955 1956 1957
    taosArrayPush(elements->colArray, &kvTs);
    taosArrayPush(elements->colArray, &kv);
  }
  info->preLine = *elements;

wmmhello's avatar
wmmhello 已提交
1958 1959 1960
  return TSDB_CODE_SUCCESS;
}

1961
// Merge the key/values in `cols` into a schema description made of `metaArray`
// (the entries) and `metaHash` (key -> int16_t index into metaArray).
//   - unknown key: appended to metaArray and indexed in metaHash
//   - known tag (isTag): only the stored length may grow
//   - known column: the type must match, otherwise TSDB_CODE_SML_NOT_SAME_TYPE is
//     returned; the stored length may grow for variable-length types
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg) {
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv  *incoming = (SSmlKv *)taosArrayGet(cols, i);
    int16_t *slot = (int16_t *)taosHashGet(metaHash, incoming->key, incoming->keyLen);

    if (slot == NULL) {
      // first occurrence: its index is the array size before the push
      size_t count = taosArrayGetSize(metaArray);
      ASSERT(count <= INT16_MAX);
      int16_t position = count;
      taosArrayPush(metaArray, incoming);
      taosHashPut(metaHash, incoming->key, incoming->keyLen, &position, SHORT_BYTES);
      continue;
    }

    SSmlKv *known = (SSmlKv *)taosArrayGet(metaArray, *slot);
    if (!isTag && incoming->type != known->type) {
      smlBuildInvalidDataMsg(msg, "the type is not the same like before", incoming->key);
      return TSDB_CODE_SML_NOT_SAME_TYPE;
    }

    // update string len, if bigger (tags are always treated as growable)
    if ((isTag || IS_VAR_DATA_TYPE(incoming->type)) && incoming->length > known->length) {
      known->length = incoming->length;
    }
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1994
// Release a child-table descriptor: every element of tag->cols is cleaned up as a
// hash object, then the key, both arrays and the descriptor itself are freed.
static void smlDestroyTableInfo(SSmlTableInfo *tag) {
  size_t idx = 0;
  while (idx < taosArrayGetSize(tag->cols)) {
    taosHashCleanup((SHashObj *)taosArrayGetP(tag->cols, idx));
    idx++;
  }

  taosMemoryFree(tag->key);
  taosArrayDestroy(tag->cols);
  taosArrayDestroy(tag->tags);
  taosMemoryFree(tag);
}

2006
// Build a hash over the key/values of one row (`cols`), keyed by column name, and
// append the hash handle to `colsArray`. The hash stores pointers to the SSmlKv
// elements obtained from `cols`, not copies of them.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the hash cannot be created
// or cannot be appended (in which case the hash is released again).
static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
  SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (!kvHash) {
    uError("SML:smlPushCols failed to allocate memory");
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
    taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
  }

  // taosArrayPush yields NULL when the element could not be stored; without this
  // check the hash would be unreachable and never cleaned up
  if (taosArrayPush(colsArray, &kvHash) == NULL) {
    taosHashCleanup(kvHash);
    uError("SML:smlPushCols failed to allocate memory");
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
2021 2022
// Tear down a schemaless-ingest handle and the resources reachable from it: the
// query, both table node lists (child tables and super tables), the vgroup hash, the
// previous-line tag/column arrays, the per-line column arrays (only when dataFormat
// is off), the JSON root and finally the handle itself. A NULL handle is ignored.
void smlDestroyInfo(SSmlHandle *info) {
  if (info == NULL) {
    return;
  }
  qDestroyQuery(info->pQuery);

  // destroy info->childTables
  for (NodeList *node = info->childTables, *following = NULL; node != NULL; node = following) {
    if (node->data.used) {
      smlDestroyTableInfo(node->data.value);
    }
    following = node->next;
    taosMemoryFree(node);
  }

  // destroy info->superTables
  for (NodeList *node = info->superTables, *following = NULL; node != NULL; node = following) {
    if (node->data.used) {
      smlDestroySTableMeta(node->data.value);
    }
    following = node->next;
    taosMemoryFree(node);
  }

  // destroy info->pVgHash
  taosHashCleanup(info->pVgHash);

  taosArrayDestroy(info->preLineTagKV);
  taosArrayDestroy(info->preLineColKV);

  if (!info->dataFormat) {
    int lineIdx = 0;
    while (lineIdx < info->lineNum) {
      taosArrayDestroy(info->lines[lineIdx].colArray);
      lineIdx++;
    }
    taosMemoryFree(info->lines);
  }

  cJSON_Delete(info->root);
  taosMemoryFreeClear(info);
}
2063

wmmhello's avatar
wmmhello 已提交
2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075
// Allocate and initialise a schemaless-ingest handle for the connection `taos`.
// The handle starts in dataFormat mode with empty previous-line tag/column arrays.
// Returns NULL if the connection object cannot be acquired, the catalog handle cannot
// be obtained, or any allocation fails; everything created so far is then released
// through smlDestroyInfo.
static SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
  if (NULL == info) {
    return NULL;
  }

  // assign the id first so that the error logs below carry it
  info->id = smlGenId();

  info->taos = acquireTscObj(*(int64_t *)taos);
  if (NULL == info->taos) {
    uError("SML:0x%" PRIx64 " failed to acquire connection object", info->id);
    goto cleanup;
  }

  code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
    goto cleanup;
  }

  info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  info->pQuery = smlInitHandle();
  info->dataFormat = true;

  info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
  info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv));

  // every member created above is required by the parsers; fail as a whole if any
  // of them is missing
  if (NULL == info->pVgHash || NULL == info->pQuery || NULL == info->preLineTagKV || NULL == info->preLineColKV) {
    uError("create SSmlHandle failed");
    goto cleanup;
  }

  return info;

cleanup:
  smlDestroyInfo(info);
  return NULL;
}

/************* TSDB_SML_JSON_PROTOCOL function start **************/
2097
// Read the "metric" member of a JSON data point and record it as the measurement of
// `elements`. The name is not copied: elements->measure points at the string held by
// the cJSON node. Returns TSDB_CODE_TSC_INVALID_JSON if "metric" is not a string and
// TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH if its length is rejected.
static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
  cJSON *metricNode = cJSON_GetObjectItem(root, "metric");
  if (!cJSON_IsString(metricNode)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  char *metricName = metricNode->valuestring;
  elements->measureLen = strlen(metricName);
  if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
    uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  elements->measure = metricName;
  return TSDB_CODE_SUCCESS;
}

2113
// Convert a JSON timestamp object {"value": <number>, "type": "<unit>"} to
// `toPrecision`. Accepted units (case-insensitive): "s", "ms", "us", "ns".
//   value == 0 : the current time is used
//   value <  0 : a negative number is returned
// Returns the converted timestamp, or a negative number on any error.
static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPrecision) {
  int32_t size = cJSON_GetArraySize(root);
  if (unlikely(size != OTD_JSON_SUB_FIELDS_NUM)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
    return -1;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (unlikely(!cJSON_IsNumber(value))) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
    return -1;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (unlikely(!cJSON_IsString(type))) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
    return -1;
  }

  double timeDouble = value->valuedouble;
  if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
    return -1;
  }

  if (timeDouble == 0) {
    return taosGetTimestampNs() / smlFactorNS[toPrecision];
  }

  if (timeDouble < 0) {
    // A value in (-1, 0) truncates to 0, which would no longer look like an error
    // to the caller; report -1 for it instead.
    int64_t negative = (int64_t)timeDouble;
    return negative < 0 ? negative : -1;
  }

  int64_t tsInt64 = timeDouble;
  size_t  typeLen = strlen(type->valuestring);
  if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
    // seconds: multiply by hand with an overflow guard. tsInt64 is 0 for a value in
    // (0, 1) and must not be used as a divisor.
    if (tsInt64 == 0 || smlFactorS[toPrecision] < INT64_MAX / tsInt64) {
      return tsInt64 * smlFactorS[toPrecision];
    }
    return -1;
  }

  if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
    switch (type->valuestring[0]) {
      case 'm':
      case 'M':
        // milliseconds
        return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MILLI, toPrecision);
      case 'u':
      case 'U':
        // microseconds
        return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MICRO, toPrecision);
      case 'n':
      case 'N':
        // nanoseconds
        return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_NANO, toPrecision);
      default:
        return -1;
    }
  }

  return -1;
}

// Count the decimal digits of `num`; the sign is not counted and 0 has one digit.
static uint8_t smlGetTimestampLen(int64_t num) {
  uint8_t digits = 1;
  for (num /= 10; num != 0; num /= 10) {
    digits++;
  }
  return digits;
}

2188
// Extract the "timestamp" member of a JSON data point and convert it to the precision
// of the current super table (nanoseconds when no table meta is set yet).
//   - number: 0 means "now"; otherwise the unit is chosen from the digit count via
//     smlGetTsTypeByLen
//   - object: delegated to smlParseTSFromJSONObj
// Returns the converted timestamp, or a negative number on any error.
static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root) {
  // Timestamp must be the first KV to parse
  int32_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;
  cJSON  *timestamp = cJSON_GetObjectItem(root, "timestamp");
  if (cJSON_IsNumber(timestamp)) {
    // timestamp value 0 indicates current system time
    double timeDouble = timestamp->valuedouble;
    if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
      return -1;
    }

    if (unlikely(timeDouble < 0)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is negative", NULL);
      // A value in (-1, 0) truncates to 0, which would no longer look like an error
      // to the caller; report -1 for it instead.
      int64_t negative = (int64_t)timeDouble;
      return negative < 0 ? negative : -1;
    } else if (unlikely(timeDouble == 0)) {
      return taosGetTimestampNs() / smlFactorNS[toPrecision];
    }

    int64_t tsInt64 = timeDouble;
    uint8_t tsLen = smlGetTimestampLen(tsInt64);
    int8_t  fromPrecision = smlGetTsTypeByLen(tsLen);
    if (unlikely(fromPrecision == -1)) {
      smlBuildInvalidDataMsg(&info->msgBuf,
                             "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", NULL);
      return -1;
    }

    if (fromPrecision == TSDB_TIME_PRECISION_SECONDS) {
      // seconds are scaled by hand with an overflow guard
      if (smlFactorS[toPrecision] < INT64_MAX / tsInt64) {
        return tsInt64 * smlFactorS[toPrecision];
      }
      return -1;
    } else {
      return convertTimePrecision(tsInt64, fromPrecision, toPrecision);
    }
  } else if (cJSON_IsObject(timestamp)) {
    return smlParseTSFromJSONObj(info, timestamp, toPrecision);
  } else {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
    return -1;
  }
}

X
Xiaoyu Wang 已提交
2233
// Store a JSON true/false value in `pVal` as a BOOL. The declared type string must be
// "bool" (case-insensitive); anything else yields TSDB_CODE_TSC_INVALID_JSON_TYPE.
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
  if (strcasecmp(typeStr, "bool") == 0) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valueint;
    return TSDB_CODE_SUCCESS;
  }

  uError("OTD:invalid type(%s) for JSON Bool", typeStr);
  return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
wmmhello's avatar
wmmhello 已提交
2244

X
Xiaoyu Wang 已提交
2245 2246 2247
// Store a JSON number in `pVal` using the type named by `typeStr` (case-insensitive):
//   i8/tinyint, i16/smallint, i32/int : range-checked, stored in pVal->i
//   i64/bigint                        : clamped to [INT64_MIN, INT64_MAX], stored in pVal->i
//   f32/float                         : range-checked, stored in pVal->f
//   f64/double                        : stored in pVal->d
// Returns TSDB_CODE_TSC_VALUE_OUT_OF_RANGE when a range check fails and
// TSDB_CODE_TSC_INVALID_JSON_TYPE for any other type name; pVal is untouched then.
static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
  const double num = value->valuedouble;
  int32_t      dataType;

  if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) {
    if (!IS_VALID_TINYINT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(tinyint)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    dataType = TSDB_DATA_TYPE_TINYINT;
    pVal->i = num;
  } else if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) {
    if (!IS_VALID_SMALLINT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(smallint)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    dataType = TSDB_DATA_TYPE_SMALLINT;
    pVal->i = num;
  } else if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) {
    if (!IS_VALID_INT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(int)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    dataType = TSDB_DATA_TYPE_INT;
    pVal->i = num;
  } else if (strcasecmp(typeStr, "i64") == 0 || strcasecmp(typeStr, "bigint") == 0) {
    dataType = TSDB_DATA_TYPE_BIGINT;
    // saturate instead of rejecting values outside the int64 range
    if (num >= (double)INT64_MAX) {
      pVal->i = INT64_MAX;
    } else if (num <= (double)INT64_MIN) {
      pVal->i = INT64_MIN;
    } else {
      pVal->i = num;
    }
  } else if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) {
    if (!IS_VALID_FLOAT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(float)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    dataType = TSDB_DATA_TYPE_FLOAT;
    pVal->f = num;
  } else if (strcasecmp(typeStr, "f64") == 0 || strcasecmp(typeStr, "double") == 0) {
    dataType = TSDB_DATA_TYPE_DOUBLE;
    pVal->d = num;
  } else {
    // if reach here means type is unsupported
    uError("OTD:invalid type(%s) for JSON Number", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }

  pVal->type = dataType;
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  return TSDB_CODE_SUCCESS;
}
2315

X
Xiaoyu Wang 已提交
2316
// Store a JSON string in `pVal` as BINARY or NCHAR, as named by `typeStr`
// (case-insensitive). The text is not copied: pVal->value points at the string held
// by the cJSON node. Returns TSDB_CODE_TSC_INVALID_JSON_TYPE for any other type name
// and TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN when the string exceeds the limit of the
// chosen type.
static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
  if (strcasecmp(typeStr, "binary") == 0) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
  } else if (strcasecmp(typeStr, "nchar") == 0) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
  } else {
    uError("OTD:invalid type(%s) for JSON String", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }

  // Check the limit on the full-width length. Narrowing to int16_t first would let
  // an over-long string wrap around to a small or negative value and pass the check.
  size_t len = strlen(value->valuestring);

  if (pVal->type == TSDB_DATA_TYPE_BINARY && len > (size_t)(TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE)) {
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }
  if (pVal->type == TSDB_DATA_TYPE_NCHAR &&
      len > (size_t)((TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }

  pVal->length = (int16_t)len;
  pVal->value = value->valuestring;
  return TSDB_CODE_SUCCESS;
}

/**
 * Parse a typed value wrapper object, i.e. an object carrying a "value" member and a
 * string "type" member, and convert it into `kv` according to the declared type.
 *
 * @param root the wrapper object; must have exactly OTD_JSON_SUB_FIELDS_NUM members
 * @param kv   kv to fill
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON for a malformed wrapper,
 *         TSDB_CODE_TSC_INVALID_JSON_TYPE for an unsupported value kind, or whatever
 *         the type-specific converter reports
 */
static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) {
  if (cJSON_GetArraySize(root) != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (value == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // dispatch on the JSON kind of the value; each converter validates the declared type name
  int32_t code = TSDB_CODE_TSC_INVALID_JSON_TYPE;
  switch (value->type) {
    case cJSON_True:
    case cJSON_False:
      code = smlConvertJSONBool(kv, type->valuestring, value);
      break;
    case cJSON_Number:
      code = smlConvertJSONNumber(kv, type->valuestring, value);
      break;
    case cJSON_String:
      code = smlConvertJSONString(kv, type->valuestring, value);
      break;
    default:
      break;
  }

  return code;
}

2387 2388 2389 2390 2391 2392 2393 2394
/**
 * Convert a JSON value node into `kv`.
 *
 * Bare booleans become BOOL, bare numbers become DOUBLE, bare strings use the default
 * string type, and objects are treated as typed value wrappers.
 *
 * @param root JSON node holding the value
 * @param kv   kv to fill
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON for an unsupported node kind,
 *         or the error reported by the string / object converter
 */
static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
  switch (root->type) {
    case cJSON_True:
    case cJSON_False: {
      kv->type = TSDB_DATA_TYPE_BOOL;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->i = root->valueint;
      break;
    }
    case cJSON_Number: {
      kv->type = TSDB_DATA_TYPE_DOUBLE;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->d = root->valuedouble;
      break;
    }
    case cJSON_String: {
      /* set default JSON type to binary/nchar according to
       * user configured parameter tsDefaultJSONStrType
       */
      char *tsDefaultJSONStrType = "nchar";  // todo
      // propagate converter failures (e.g. over-long text) instead of reporting success
      // with a kv whose value pointer was never set
      int32_t ret = smlConvertJSONString(kv, tsDefaultJSONStrType, root);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON String");
        return ret;
      }
      break;
    }
    case cJSON_Object: {
      int32_t ret = smlParseValueFromJSONObj(root, kv);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON Obj");
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON;
  }

  return TSDB_CODE_SUCCESS;
}

2426
/**
 * Extract the metric value (the "value" member) of a data point into `kv`.
 *
 * @param root the data point object
 * @param kv   kv to fill
 * @return TSDB_CODE_TSC_INVALID_JSON when "value" is missing, otherwise the result of
 *         smlParseValueFromJSON
 */
static int32_t smlParseColsFromJSON(cJSON *root, SSmlKv *kv) {
  cJSON *metricVal = cJSON_GetObjectItem(root, "value");
  if (metricVal != NULL) {
    return smlParseValueFromJSON(metricVal, kv);
  }
  return TSDB_CODE_TSC_INVALID_JSON;
}

2440
/**
 * Parse the "tags" object of one JSON data point, record its tag kvs and resolve (creating
 * it when missing) the child-table entry the point belongs to.
 *
 * When the tag layout cannot be handled in data-format mode, the function clears
 * info->dataFormat, sets info->reRun and still returns TSDB_CODE_SUCCESS; the caller is
 * expected to check info->reRun.
 *
 * NOTE(review): the locals `kv` / `preKV` and the parameters `info` / `elements` keep their
 * names because IS_SAME_KEY and IS_SAME_SUPER_TABLE presumably expand to expressions over
 * them -- confirm against the macro definitions before renaming anything here.
 *
 * @param info     ingest handle (super/child table lists, previous-line state)
 * @param root     the data point object
 * @param elements line info of the current point; elements->tags is set to the tags node
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON, TSDB_CODE_TSC_INVALID_COLUMN_LENGTH,
 *         TSDB_CODE_OUT_OF_MEMORY, TSDB_CODE_SML_INVALID_DATA, or a value-conversion error
 */
static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
  int32_t ret = TSDB_CODE_SUCCESS;

  cJSON *tags = cJSON_GetObjectItem(root, "tags");
  if (unlikely(tags == NULL || tags->type != cJSON_Object)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // add measure to tags to identify one child table
  cJSON *cMeasure = cJSON_AddStringToObject(tags, JSON_METERS_NAME, elements->measure);
  if (unlikely(cMeasure == NULL)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }
  elements->tags = (char *)tags;
  if (is_same_child_table_json(elements->tags, info->preLine.tags) == 0) {
    // same child table as the previous line: nothing else to resolve
    cJSON_DeleteItemFromObjectCaseSensitive(tags, JSON_METERS_NAME);
    return TSDB_CODE_SUCCESS;
  }
  cJSON_DeleteItemFromObjectCaseSensitive(tags, JSON_METERS_NAME);

  bool isSameMeasure = IS_SAME_SUPER_TABLE;

  int     cnt = 0;
  SArray *preLineKV = info->preLineTagKV;
  bool    isSuperKVInit = true;
  SArray *superKV = NULL;
  if (info->dataFormat) {
    if (unlikely(!isSameMeasure)) {
      SSmlSTableMeta *sMeta =
          (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);

      if (unlikely(sMeta == NULL)) {
        sMeta = smlBuildSTableMeta(info->dataFormat);
        if (unlikely(sMeta == NULL)) {
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        STableMeta *pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
        sMeta->tableMeta = pTableMeta;
        if (pTableMeta == NULL) {
          // sMeta was never registered in info->superTables, so release it here
          // before falling back to the re-run path
          smlDestroySTableMeta(sMeta);
          info->dataFormat = false;
          info->reRun = true;
          return TSDB_CODE_SUCCESS;
        }
        nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta);
      }
      info->currSTableMeta = sMeta->tableMeta;
      superKV = sMeta->tags;

      if (unlikely(taosArrayGetSize(superKV) == 0)) {
        isSuperKVInit = false;
      }
      taosArraySetSize(preLineKV, 0);
    }
  } else {
    taosArraySetSize(preLineKV, 0);
  }

  int32_t tagNum = cJSON_GetArraySize(tags);
  for (int32_t i = 0; i < tagNum; ++i) {
    cJSON *tag = cJSON_GetArrayItem(tags, i);
    if (unlikely(tag == NULL)) {
      return TSDB_CODE_TSC_INVALID_JSON;
    }
    // skip the helper measure item should it still be attached to the tags object
    if (unlikely(tag == cMeasure)) continue;
    size_t keyLen = strlen(tag->string);
    if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
      uError("OTD:Tag key length is 0 or too large than 64");
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

    // add kv to SSmlKv
    SSmlKv kv = {.key = tag->string, .keyLen = keyLen};
    // value
    ret = smlParseValueFromJSON(tag, &kv);
    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
      return ret;
    }

    if (info->dataFormat) {
      if (unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)) {
        info->needModifySchema = true;
      }

      if (isSameMeasure) {
        // compare against the tag list recorded for the previous line
        if (unlikely(cnt >= taosArrayGetSize(preLineKV))) {
          info->dataFormat = false;
          info->reRun = true;
          return TSDB_CODE_SUCCESS;
        }
        SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
        if (unlikely(kv.length > preKV->length)) {
          // a longer value widens the recorded tag and requires a schema change
          preKV->length = kv.length;
          SSmlSTableMeta *tableMeta =
              (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
          ASSERT(tableMeta != NULL);

          SSmlKv *oldKV = taosArrayGet(tableMeta->tags, cnt);
          oldKV->length = kv.length;
          info->needModifySchema = true;
        }
        if (unlikely(!IS_SAME_KEY)) {
          info->dataFormat = false;
          info->reRun = true;
          return TSDB_CODE_SUCCESS;
        }
      } else {
        if (isSuperKVInit) {
          // compare against the tag list already recorded for this super table
          if (unlikely(cnt >= taosArrayGetSize(superKV))) {
            info->dataFormat = false;
            info->reRun = true;
            return TSDB_CODE_SUCCESS;
          }
          SSmlKv *preKV = taosArrayGet(superKV, cnt);
          if (unlikely(kv.length > preKV->length)) {
            preKV->length = kv.length;
          } else {
            kv.length = preKV->length;
          }
          info->needModifySchema = true;

          if (unlikely(!IS_SAME_KEY)) {
            info->dataFormat = false;
            info->reRun = true;
            return TSDB_CODE_SUCCESS;
          }
        } else {
          taosArrayPush(superKV, &kv);
        }
        taosArrayPush(preLineKV, &kv);
      }
    } else {
      taosArrayPush(preLineKV, &kv);
    }
    cnt++;
  }

  SSmlTableInfo *tinfo =
      (SSmlTableInfo *)nodeListGet(info->childTables, elements->tags, POINTER_BYTES, is_same_child_table_json);
  if (unlikely(tinfo == NULL)) {
    tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
    if (unlikely(!tinfo)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    for (int i = 0; i < taosArrayGetSize(preLineKV); i++) {
      taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
    }
    smlSetCTableName(tinfo);
    if (info->dataFormat) {
      info->currSTableMeta->uid = tinfo->uid;
      tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
      if (tinfo->tableDataCtx == NULL) {
        smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
        // tinfo has not been handed to info->childTables yet, so free it on this path
        smlDestroyTableInfo(tinfo);
        return TSDB_CODE_SML_INVALID_DATA;
      }
    }

    nodeListSet(&info->childTables, tags, POINTER_BYTES, tinfo);
  }
  info->currTableDataCtx = tinfo->tableDataCtx;

  return ret;
}

2597
/**
 * Parse one JSON data point: metric, metric value, tags and timestamp, in that order.
 *
 * In data-format mode the timestamp and value are written straight into the current table
 * data context; otherwise they are appended to elements->colArray. When tag parsing
 * requested a re-run (info->reRun), the function returns TSDB_CODE_SUCCESS early without
 * touching the timestamp or info->preLine.
 *
 * @param info     ingest handle
 * @param root     the data point object; must have exactly OTD_JSON_FIELDS_NUM members
 * @param elements line info filled while parsing; copied into info->preLine on success
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
  int32_t ret = TSDB_CODE_SUCCESS;

  int32_t size = cJSON_GetArraySize(root);
  // outmost json fields has to be exactly OTD_JSON_FIELDS_NUM
  if (unlikely(size != OTD_JSON_FIELDS_NUM)) {
    uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // Parse metric
  ret = smlParseMetricFromJSON(info, root, elements);
  if (unlikely(ret != TSDB_CODE_SUCCESS)) {
    uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
    return ret;
  }
  uDebug("OTD:0x%" PRIx64 " Parse metric from JSON payload finished", info->id);

  // Parse metric value
  SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN};
  ret = smlParseColsFromJSON(root, &kv);
  if (unlikely(ret)) {
    uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
    return ret;
  }
  uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id);

  // Parse tags
  ret = smlParseTagsFromJSON(info, root, elements);
  if (unlikely(ret)) {
    uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
    return ret;
  }
  uDebug("OTD:0x%" PRIx64 " Parse tags from JSON payload finished", info->id);

  if (unlikely(info->reRun)) {
    return TSDB_CODE_SUCCESS;
  }

  // Parse timestamp
  // notice!!! put ts back to tag to ensure get meta->precision
  int64_t ts = smlParseTSFromJSON(info, root);
  if (unlikely(ts < 0)) {
    uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
    return TSDB_CODE_INVALID_TIMESTAMP;
  }
  uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);
  SSmlKv kvTs = {.key = TS,
                 .keyLen = TS_LEN,
                 .i = ts,
                 .type = TSDB_DATA_TYPE_TIMESTAMP,
                 .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};

  if (info->dataFormat) {
    // column 0 is the timestamp, column 1 the metric value; stop at the first failure
    ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
    if (ret == TSDB_CODE_SUCCESS) {
      ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
    }
    if (ret == TSDB_CODE_SUCCESS) {
      ret = smlBuildRow(info->currTableDataCtx);
    }
    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
      return ret;
    }
  } else {
    if (elements->colArray == NULL) {
      elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
      // do not push into an array that could not be allocated
      if (unlikely(elements->colArray == NULL)) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
    taosArrayPush(elements->colArray, &kvTs);
    taosArrayPush(elements->colArray, &kv);
  }
  info->preLine = *elements;

  return TSDB_CODE_SUCCESS;
}
2669
/************* TSDB_SML_JSON_PROTOCOL function end **************/
2670 2671 2672
/**
 * Second parsing stage for the non-data-format path: for every parsed line, attach its
 * columns to the owning child table and merge its tags/columns into the super table meta
 * (creating that meta when the super table has not been seen yet).
 *
 * Does nothing when info->dataFormat is set.
 *
 * @param info ingest handle holding info->lines / info->lineNum
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t smlParseLineBottom(SSmlHandle *info) {
  if (info->dataFormat) return TSDB_CODE_SUCCESS;

  for (int32_t i = 0; i < info->lineNum; i++) {
    SSmlLineInfo  *elements = info->lines + i;
    SSmlTableInfo *tinfo = NULL;
    // the child-table lookup key depends on the protocol the line came from
    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
      tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->measure, elements->measureTagsLen, NULL);
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
      tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet);
    } else {
      tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->tags, POINTER_BYTES, is_same_child_table_json);
    }

    if (tinfo == NULL) {
      uError("SML:0x%" PRIx64 "get oneTable failed, line num:%d", info->id, i);
      smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure);
      return TSDB_CODE_SML_INVALID_DATA;
    }

    if (taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
      return TSDB_CODE_PAR_INVALID_TAGS_NUM;
    }

    if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
      return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
    }

    int ret = smlPushCols(tinfo->cols, elements->colArray);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    SSmlSTableMeta *tableMeta =
        (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
    if (tableMeta) {  // update meta
      ret = smlUpdateMeta(tableMeta->colHash, tableMeta->cols, elements->colArray, false, &info->msgBuf);
      if (ret == TSDB_CODE_SUCCESS) {
        ret = smlUpdateMeta(tableMeta->tagHash, tableMeta->tags, tinfo->tags, true, &info->msgBuf);
      }
      if (ret != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
        return ret;
      }
    } else {
      ret = smlJudgeDupColName(elements->colArray, tinfo->tags, &info->msgBuf);
      if (ret != TSDB_CODE_SUCCESS) {
        // name the call that actually failed (the log used to blame smlUpdateMeta)
        uError("SML:0x%" PRIx64 " smlJudgeDupColName failed", info->id);
        return ret;
      }

      SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
      if (meta == NULL) {
        // avoid dereferencing a meta that could not be built
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
      smlInsertMeta(meta->colHash, meta->cols, elements->colArray);
      nodeListSet(&info->superTables, elements->measure, elements->measureLen, meta);
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2732
/**
 * Parse a whole JSON payload (a single data point object or an array of them).
 *
 * The parsed tree is stored in info->root. Each data point goes through smlParseJSONString.
 * When a point requests a re-run (info->reRun), all child/super table state collected so
 * far is discarded, info->lines is allocated for the non-data-format path and parsing
 * restarts from the first point.
 *
 * @param info    ingest handle
 * @param payload NUL-terminated JSON text
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON, TSDB_CODE_SML_INVALID_DATA,
 *         TSDB_CODE_OUT_OF_MEMORY, or the error reported for a data point
 */
static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
  int32_t payloadNum = 0;
  int32_t ret = TSDB_CODE_SUCCESS;

  if (unlikely(payload == NULL)) {
    uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  info->root = cJSON_Parse(payload);
  if (unlikely(info->root == NULL)) {
    uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
    return TSDB_CODE_TSC_INVALID_JSON;
  }
  // multiple data points must be sent in JSON array
  if (cJSON_IsArray(info->root)) {
    payloadNum = cJSON_GetArraySize(info->root);
  } else if (cJSON_IsObject(info->root)) {
    payloadNum = 1;
  } else {
    uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  int32_t i = 0;
  while (i < payloadNum) {
    cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : cJSON_GetArrayItem(info->root, i);
    if (info->dataFormat) {
      // data-format mode keeps no per-line record, a scratch element is enough
      SSmlLineInfo element = {0};
      ret = smlParseJSONString(info, dataPoint, &element);
    } else {
      ret = smlParseJSONString(info, dataPoint, info->lines + i);
    }
    if (unlikely(ret != TSDB_CODE_SUCCESS)) {
      uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
      return ret;
    }

    if (unlikely(info->reRun)) {
      i = 0;
      info->reRun = false;
      // clear info->childTables
      NodeList *pList = info->childTables;
      while (pList) {
        if (pList->data.used) {
          smlDestroyTableInfo(pList->data.value);
          pList->data.used = false;
        }
        pList = pList->next;
      }

      // clear info->superTables
      pList = info->superTables;
      while (pList) {
        if (pList->data.used) {
          smlDestroySTableMeta(pList->data.value);
          pList->data.used = false;
        }
        pList = pList->next;
      }

      if (unlikely(info->lines != NULL)) {
        uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      info->lineNum = payloadNum;
      info->lines = taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
      if (unlikely(info->lines == NULL)) {
        // the restarted loop indexes info->lines, so an allocation failure must stop here
        uError("SML:0x%" PRIx64 " failed to allocate line buffer", info->id);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      memset(&info->preLine, 0, sizeof(SSmlLineInfo));
      info->currSTableMeta = NULL;
      info->currTableDataCtx = NULL;

      // drop the table blocks built so far and start from an empty hash
      SVnodeModifOpStmt *stmt = (SVnodeModifOpStmt *)(info->pQuery->pRoot);
      stmt->freeHashFunc(stmt->pTableBlockHashObj);
      stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
      continue;
    }
    i++;
  }

  return TSDB_CODE_SUCCESS;
}
2814

X
Xiaoyu Wang 已提交
2815
/**
 * Bind the parsed data of every child table to the query, build the output and launch
 * the insert request.
 *
 * For each child table the owning vgroup is looked up and remembered in info->pVgHash,
 * and the super table's tableMeta is reused to carry the child's vgId and uid.
 *
 * @param info ingest handle
 * @return TSDB_CODE_SUCCESS path returns info->pRequest->code after launching the request;
 *         otherwise the first error from the vgroup lookup, bind or output build
 */
static int32_t smlInsertData(SSmlHandle *info) {
  int32_t code = TSDB_CODE_SUCCESS;

  NodeList *tmp = info->childTables;
  while (tmp) {
    // only nodes flagged as used carry a live table info
    if (!tmp->data.used) {
      tmp = tmp->next;
      continue;
    }
    SSmlTableInfo *tableData = (SSmlTableInfo *)tmp->data.value;

    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
    tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
    // bounded copy, same as for dbname, so an over-long table name cannot overrun tname
    tstrncpy(pName.tname, tableData->childTableName, sizeof(pName.tname));

    SRequestConnInfo conn = {0};
    conn.pTrans = info->taos->pAppInfo->pTransporter;
    conn.requestId = info->pRequest->requestId;
    conn.requestObjRefId = info->pRequest->self;
    conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

    SVgroupInfo vg;
    code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
      return code;
    }
    taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));

    SSmlSTableMeta *pMeta =
        (SSmlSTableMeta *)nodeListGet(info->superTables, tableData->sTableName, tableData->sTableNameLen, NULL);
    ASSERT(NULL != pMeta);

    // use tablemeta of stable to save vgid and uid of child table
    pMeta->tableMeta->vgId = vg.vgId;
    pMeta->tableMeta->uid = tableData->uid;  // one table merge data block together according uid

    code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, pMeta->cols, tableData->cols,
                       pMeta->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
                       info->ttl, info->msgBuf.buf, info->msgBuf.len);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
      return code;
    }
    tmp = tmp->next;
  }

  code = smlBuildOutput(info->pQuery, info->pVgHash);
  if (code != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
    return code;
  }
  info->cost.insertRpcTime = taosGetTimestampUs();

  SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
  atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);

  launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
  return info->pRequest->code;
}

X
Xiaoyu Wang 已提交
2872 2873
/**
 * Log the counters and the per-stage time differences recorded in info->cost.
 *
 * The run of spaces before "parse cost" is part of the emitted message; it is spelled out
 * as its own literal piece here so the logged text stays unchanged.
 */
static void smlPrintStatisticInfo(SSmlHandle *info) {
  uError("SML:0x%" PRIx64 " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,"
         "create stable num:%d,alter stable tag num:%d,alter stable col num:%d "
         "        parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64
         ",total cost:%" PRId64 "",
         info->id,
         info->cost.code,
         info->cost.lineNum,
         info->cost.numOfSTables,
         info->cost.numOfCTables,
         info->cost.numOfCreateSTables,
         info->cost.numOfAlterTagSTables,
         info->cost.numOfAlterColSTables,
         info->cost.schemaTime - info->cost.parseTime,
         info->cost.insertBindTime - info->cost.schemaTime,
         info->cost.insertRpcTime - info->cost.insertBindTime,
         info->cost.endTime - info->cost.insertRpcTime,
         info->cost.endTime - info->cost.parseTime);
}

dengyihao's avatar
dengyihao 已提交
2884
/**
 * First parsing stage: feed the input to the protocol-specific parser.
 *
 * JSON payloads are handed to smlParseJSON as a whole. For the line and telnet protocols
 * the input is consumed line by line, either from `lines[]` or from the raw buffer
 * [rawLine, rawLineEnd) split on '\n'. When a line requests a re-run (info->reRun), the
 * table state collected so far is discarded, info->lines is allocated and parsing restarts
 * from the first line.
 *
 * @param info       ingest handle
 * @param lines      array of NUL-terminated lines, or NULL when raw input is used
 * @param rawLine    start of the raw buffer, used only when `lines` is NULL
 * @param rawLineEnd one past the last byte of the raw buffer
 * @param numLines   number of lines to parse
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    if (lines) {
      code = smlParseJSON(info, *lines);
    } else if (rawLine) {
      code = smlParseJSON(info, rawLine);
    }
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines ? *lines : rawLine);
    }
    return code;
  }

  // remembered so that a re-run can rewind the raw buffer together with the line index
  char   *rawStart = rawLine;
  int32_t i = 0;
  while (i < numLines) {
    char *tmp = NULL;
    int   len = 0;
    if (lines) {
      tmp = lines[i];
      len = strlen(tmp);
    } else if (rawLine) {
      tmp = rawLine;
      while (rawLine < rawLineEnd) {
        if (*(rawLine++) == '\n') {
          break;
        }
        len++;
      }
      // only peek at the first byte while the line start is still inside the buffer
      if (info->protocol == TSDB_SML_LINE_PROTOCOL && tmp < rawLineEnd && tmp[0] == '#') {  // this line is comment
        i++;
        continue;
      }
    }

    uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len,
           (info->isRawLine ? "rawdata" : tmp));

    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
      if (info->dataFormat) {
        SSmlLineInfo element = {0};
        code = smlParseInfluxString(info, tmp, tmp + len, &element);
      } else {
        code = smlParseInfluxString(info, tmp, tmp + len, info->lines + i);
      }
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
      if (info->dataFormat) {
        SSmlLineInfo element = {0};
        code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, &element);
      } else {
        code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, info->lines + i);
      }
    } else {
      ASSERT(0);
    }
    if (code != TSDB_CODE_SUCCESS) {
      // raw lines are not NUL-terminated individually, so do not print them with %s
      uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, (info->isRawLine ? "rawdata" : tmp));
      return code;
    }
    if (info->reRun) {
      i = 0;
      rawLine = rawStart;  // restart from the first raw line as well
      info->reRun = false;
      // clear info->childTables
      NodeList *pList = info->childTables;
      while (pList) {
        if (pList->data.used) {
          smlDestroyTableInfo(pList->data.value);
          pList->data.used = false;
        }
        pList = pList->next;
      }

      // clear info->superTables
      pList = info->superTables;
      while (pList) {
        if (pList->data.used) {
          smlDestroySTableMeta(pList->data.value);
          pList->data.used = false;
        }
        pList = pList->next;
      }

      if (info->lines != NULL) {
        uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      info->lines = taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
      if (info->lines == NULL) {
        // the restarted loop indexes info->lines, so an allocation failure must stop here
        uError("SML:0x%" PRIx64 " failed to allocate line buffer", info->id);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      memset(&info->preLine, 0, sizeof(SSmlLineInfo));
      info->currSTableMeta = NULL;
      info->currTableDataCtx = NULL;

      // drop the table blocks built so far and start from an empty hash
      SVnodeModifOpStmt *stmt = (SVnodeModifOpStmt *)(info->pQuery->pRoot);
      stmt->freeHashFunc(stmt->pTableBlockHashObj);
      stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
      continue;
    }
    i++;
  }

  return code;
}

dengyihao's avatar
dengyihao 已提交
2988
/*
 * Drive one schemaless request through every stage, in order:
 *   1. smlParseLine        - parse the caller's input (line array or raw buffer)
 *   2. smlParseLineBottom  - second parsing step
 *   3. smlModifyDBSchemas  - schema step, retried on failure
 *   4. smlInsertData       - insert step
 *
 * The start time of each stage and the line/table counters are stored in
 * info->cost.
 *
 * Returns 0 when every stage succeeds, otherwise the error code of the first
 * stage that failed (later stages are not run).
 */
static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  int32_t rc = TSDB_CODE_SUCCESS;
  int32_t attempts = 0;

  info->cost.parseTime = taosGetTimestampUs();

  rc = smlParseLine(info, lines, rawLine, rawLineEnd, numLines);
  if (rc != 0) {
    uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(rc));
    return rc;
  }

  rc = smlParseLineBottom(info);
  if (rc != 0) {
    uError("SML:0x%" PRIx64 " smlParseLineBottom error : %s", info->id, tstrerror(rc));
    return rc;
  }

  info->cost.lineNum = numLines;
  info->cost.numOfSTables = nodeListSize(info->superTables);
  info->cost.numOfCTables = nodeListSize(info->childTables);

  info->cost.schemaTime = taosGetTimestampUs();

  // Keep retrying the schema step until it succeeds or the retry budget
  // (number of super tables * MAX_RETRY_TIMES) is used up. The budget is
  // re-read on every failed attempt.
  do {
    rc = smlModifyDBSchemas(info);
  } while (rc != 0 && attempts++ < nodeListSize(info->superTables) * MAX_RETRY_TIMES);

  if (rc != 0) {
    uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(rc));
    return rc;
  }

  info->cost.insertBindTime = taosGetTimestampUs();
  rc = smlInsertData(info);
  if (rc != 0) {
    uError("SML:0x%" PRIx64 " smlInsertData error : %s", info->id, tstrerror(rc));
  }

  // rc is the insert result here: 0 on success, the insert error otherwise.
  return rc;
}

wmmhello's avatar
wmmhello 已提交
3032 3033 3034 3035 3036
/*
 * Shared implementation behind the public taos_schemaless_insert* entry points.
 *
 * Input is given either as an array of lines (`lines`, with `rawLine` NULL) or
 * as one raw buffer delimited by `rawLine` / `rawLineEnd`.
 *
 * Returns NULL when `taos` is NULL (terrno is set to TSDB_CODE_TSC_DISCONNECTED)
 * or when no request object could be created. In every other case the request
 * object is returned as the TAOS_RES, and the outcome is stored in its `code`
 * field. Rejected arguments also get a message built with
 * smlBuildInvalidDataMsg() on top of the request's msgBuf.
 *
 * For the JSON protocol the caller's numLines is ignored and 1 is used.
 */
TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd,
                                       int numLines, int protocol, int precision, int32_t ttl, int64_t reqid) {
  if (taos == NULL) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SRequestObj *pReq = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
  if (NULL == pReq) {
    uError("SML:taos_schemaless_insert error request is null");
    return NULL;
  }

  SSmlHandle *pInfo = smlBuildSmlInfo(taos);
  if (NULL == pInfo) {
    pReq->code = TSDB_CODE_OUT_OF_MEMORY;
    uError("SML:taos_schemaless_insert error SSmlHandle is null");
    goto done;
  }

  // Bind the handle to this request and copy the caller's options into it.
  pInfo->pRequest = pReq;
  pInfo->isRawLine = rawLine != NULL;
  pInfo->ttl = ttl;
  pInfo->precision = precision;
  pInfo->protocol = protocol;
  pInfo->msgBuf.buf = pInfo->pRequest->msgBuf;
  pInfo->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
  pInfo->lineNum = numLines;

  // Validation messages are built through this view of the request's msgBuf.
  SSmlMsgBuf errBuf = {ERROR_MSG_BUF_DEFAULT_SIZE, pReq->msgBuf};

  if (NULL == pReq->pDb) {
    pReq->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    smlBuildInvalidDataMsg(&errBuf, "Database not specified", NULL);
    goto done;
  }

  if (!(protocol >= TSDB_SML_LINE_PROTOCOL && protocol <= TSDB_SML_JSON_PROTOCOL)) {
    pReq->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
    smlBuildInvalidDataMsg(&errBuf, "protocol invalidate", NULL);
    goto done;
  }

  // The precision range is only checked for the line protocol.
  if (protocol == TSDB_SML_LINE_PROTOCOL &&
      !(precision >= TSDB_SML_TIMESTAMP_NOT_CONFIGURED && precision <= TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
    pReq->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
    smlBuildInvalidDataMsg(&errBuf, "precision invalidate for line protocol", NULL);
    goto done;
  }

  if (protocol != TSDB_SML_JSON_PROTOCOL) {
    if (numLines <= 0) {
      pReq->code = TSDB_CODE_SML_INVALID_DATA;
      smlBuildInvalidDataMsg(&errBuf, "line num is invalid", NULL);
      goto done;
    }
  } else {
    // JSON input is processed as a single line, whatever the caller passed.
    numLines = 1;
  }

  int32_t rc = smlProcess(pInfo, lines, rawLine, rawLineEnd, numLines);
  pReq->code = rc;
  pInfo->cost.endTime = taosGetTimestampUs();
  pInfo->cost.code = rc;
  smlPrintStatisticInfo(pInfo);

done:
  uDebug("resultend:%s", pReq->msgBuf);
  smlDestroyInfo(pInfo);
  return (TAOS_RES *)pReq;
}
wmmhello's avatar
wmmhello 已提交
3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115

/**
 * taos_schemaless_insert() parse and insert data points into database according to
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
dengyihao's avatar
dengyihao 已提交
3116
 * @return TAOS_RES
wmmhello's avatar
wmmhello 已提交
3117 3118
 */

3119 3120
TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                                int32_t ttl, int64_t reqid) {
wmmhello's avatar
wmmhello 已提交
3121
  return taos_schemaless_insert_inner(taos, lines, NULL, NULL, numLines, protocol, precision, ttl, reqid);
dengyihao's avatar
dengyihao 已提交
3122 3123
}

3124 3125 3126
// Line-array insert using the default table TTL and a request id of 0.
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  const int64_t reqid = 0;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
3127

3128 3129 3130
// Line-array insert with a caller-supplied TTL and a request id of 0.
TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int32_t ttl) {
  const int64_t reqid = 0;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
3131

3132 3133
// Line-array insert with a caller-supplied request id and the default table TTL.
TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int64_t reqid) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}

3136
/**
 * Insert schemaless data supplied as one contiguous buffer of '\n'-separated
 * lines.
 *
 * The first len bytes of the buffer are scanned once to count rows: every '\n'
 * ends a line, and the final byte ends the last line even when there is no
 * trailing newline. For the line protocol, a line whose first byte is '#' is a
 * comment and is not counted. The resulting row count is passed on to
 * taos_schemaless_insert_inner() as the number of lines to process.
 *
 * @param taos      connection handle.
 * @param lines     start of the input buffer.
 * @param len       number of bytes in the buffer.
 * @param totalRows out: number of counted (non-comment) rows. May be NULL when
 *                  the caller does not need the count.
 * @param protocol  parsing protocol (TSDB_SML_*_PROTOCOL).
 * @param precision timestamp precision selector, passed on unchanged.
 * @param ttl       table TTL value, passed on unchanged.
 * @param reqid     request id, passed on unchanged.
 * @return TAOS_RES as produced by taos_schemaless_insert_inner().
 */
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                                    int precision, int32_t ttl, int64_t reqid) {
  // Count into a local so that a NULL totalRows is never dereferenced.
  int32_t rows = 0;
  char   *lineStart = lines;
  for (int i = 0; i < len; i++) {
    if (lines[i] == '\n' || i == len - 1) {
      if (lineStart[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL) {  // ignore comment
        rows++;
      }
      lineStart = lines + i + 1;
    }
  }

  if (totalRows != NULL) {
    *totalRows = rows;
  }

  return taos_schemaless_insert_inner(taos, NULL, lines, lines + len, rows, protocol, precision, ttl, reqid);
}

3153 3154 3155 3156 3157 3158
// Raw-buffer insert with a caller-supplied request id and the default table TTL.
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int64_t reqid) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}
// Raw-buffer insert with a caller-supplied TTL and a request id of 0.
TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int32_t ttl) {
  const int64_t reqid = 0;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
3159

3160 3161
// Raw-buffer insert using the default table TTL and a request id of 0.
TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  const int64_t reqid = 0;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}