clientSml.c 85.2 KB
Newer Older
wmmhello's avatar
wmmhello 已提交
1 2 3 4 5
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

X
Xiaoyu Wang 已提交
6 7 8 9 10
#include "cJSON.h"
#include "catalog.h"
#include "clientInt.h"
#include "osSemaphore.h"
#include "osThread.h"
wmmhello's avatar
wmmhello 已提交
11 12
#include "query.h"
#include "taos.h"
wmmhello's avatar
wmmhello 已提交
13
#include "taoserror.h"
X
Xiaoyu Wang 已提交
14
#include "tcommon.h"
wmmhello's avatar
wmmhello 已提交
15
#include "tdef.h"
X
Xiaoyu Wang 已提交
16
#include "tglobal.h"
wmmhello's avatar
wmmhello 已提交
17 18
#include "tlog.h"
#include "tmsg.h"
X
Xiaoyu Wang 已提交
19
#include "tname.h"
wmmhello's avatar
wmmhello 已提交
20 21
#include "ttime.h"
#include "ttypes.h"
wmmhello's avatar
wmmhello 已提交
22

wmmhello's avatar
wmmhello 已提交
23
//=================================================================================================
wmmhello's avatar
wmmhello 已提交
24 25 26 27 28 29 30

#define SPACE ' '
#define COMMA ','
#define EQUAL '='
#define QUOTE '"'
#define SLASH '\\'

wmmhello's avatar
wmmhello 已提交
31 32
#define JUMP_SPACE(sql, sqlEnd)  \
  while (sql < sqlEnd) { \
X
Xiaoyu Wang 已提交
33 34 35 36 37
    if (*sql == SPACE)   \
      sql++;             \
    else                 \
      break;             \
  }
wmmhello's avatar
wmmhello 已提交
38
// comma ,
X
Xiaoyu Wang 已提交
39 40
#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH)
#define IS_COMMA(sql)       (*(sql) == COMMA && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
41
// space
X
Xiaoyu Wang 已提交
42 43
#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql)-1) == SLASH)
#define IS_SPACE(sql)       (*(sql) == SPACE && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
44
// equal =
X
Xiaoyu Wang 已提交
45 46
#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) == SLASH)
#define IS_EQUAL(sql)       (*(sql) == EQUAL && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
47
// quote "
X
Xiaoyu Wang 已提交
48 49
#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) == SLASH)
#define IS_QUOTE(sql)       (*(sql) == QUOTE && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
50
// SLASH
X
Xiaoyu Wang 已提交
51
#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql)-1) == SLASH)
wmmhello's avatar
wmmhello 已提交
52

X
Xiaoyu Wang 已提交
53 54
#define IS_SLASH_LETTER(sql) \
  (IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql))
wmmhello's avatar
wmmhello 已提交
55

X
Xiaoyu Wang 已提交
56
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len))
wmmhello's avatar
wmmhello 已提交
57

X
Xiaoyu Wang 已提交
58 59 60 61 62 63 64 65
#define PROCESS_SLASH(key, keyLen)           \
  for (int i = 1; i < keyLen; ++i) {         \
    if (IS_SLASH_LETTER(key + i)) {          \
      MOVE_FORWARD_ONE(key + i, keyLen - i); \
      i--;                                   \
      keyLen--;                              \
    }                                        \
  }
wmmhello's avatar
wmmhello 已提交
66

67 68 69
#define IS_INVALID_COL_LEN(len)   ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)

70 71 72
#define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM     4

X
Xiaoyu Wang 已提交
73 74 75 76
#define TS        "_ts"
#define TS_LEN    3
#define VALUE     "_value"
#define VALUE_LEN 6
77

X
Xiaoyu Wang 已提交
78 79
#define BINARY_ADD_LEN 2  // "binary"   2 means " "
#define NCHAR_ADD_LEN  3  // L"nchar"   3 means L" "
wmmhello's avatar
wmmhello 已提交
80 81

#define MAX_RETRY_TIMES 5
82
#define LINE_BATCH      2000
wmmhello's avatar
wmmhello 已提交
83 84 85 86
//=================================================================================================
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;

typedef enum {
87
  SCHEMA_ACTION_NULL,
wmmhello's avatar
wmmhello 已提交
88 89 90 91 92
  SCHEMA_ACTION_CREATE_STABLE,
  SCHEMA_ACTION_ADD_COLUMN,
  SCHEMA_ACTION_ADD_TAG,
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,
  SCHEMA_ACTION_CHANGE_TAG_SIZE,
wmmhello's avatar
wmmhello 已提交
93 94 95
} ESchemaAction;

typedef struct {
X
Xiaoyu Wang 已提交
96 97 98 99
  const char *measure;
  const char *tags;
  const char *cols;
  const char *timestamp;
wmmhello's avatar
wmmhello 已提交
100 101 102 103 104 105 106 107 108

  int32_t measureLen;
  int32_t measureTagsLen;
  int32_t tagsLen;
  int32_t colsLen;
  int32_t timestampLen;
} SSmlLineInfo;

typedef struct {
X
Xiaoyu Wang 已提交
109 110 111 112
  const char *sTableName;  // super table name
  int32_t     sTableNameLen;
  char        childTableName[TSDB_TABLE_NAME_LEN];
  uint64_t    uid;
wmmhello's avatar
wmmhello 已提交
113

X
Xiaoyu Wang 已提交
114
  SArray *tags;
wmmhello's avatar
wmmhello 已提交
115

116 117
  // if info->formatData is true, elements are SArray<SSmlKv*>.
  // if info->formatData is false, elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
X
Xiaoyu Wang 已提交
118
  SArray *cols;
wmmhello's avatar
wmmhello 已提交
119 120 121
} SSmlTableInfo;

typedef struct {
X
Xiaoyu Wang 已提交
122 123
  SArray   *tags;     // save the origin order to create table
  SHashObj *tagHash;  // elements are <key, index in tags>
124

X
Xiaoyu Wang 已提交
125 126
  SArray   *cols;
  SHashObj *colHash;
127

wmmhello's avatar
wmmhello 已提交
128 129 130 131
  STableMeta *tableMeta;
} SSmlSTableMeta;

typedef struct {
X
Xiaoyu Wang 已提交
132 133
  int32_t len;
  char   *buf;
wmmhello's avatar
wmmhello 已提交
134 135
} SSmlMsgBuf;

136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
typedef struct {
  int32_t code;
  int32_t lineNum;

  int32_t numOfSTables;
  int32_t numOfCTables;
  int32_t numOfCreateSTables;

  int64_t parseTime;
  int64_t schemaTime;
  int64_t insertBindTime;
  int64_t insertRpcTime;
  int64_t endTime;
} SSmlCostInfo;

X
Xiaoyu Wang 已提交
151 152 153
typedef struct {
  SRequestObj     *request;
  tsem_t           sem;
154 155
  int32_t          cnt;
  int32_t          total;
wmmhello's avatar
wmmhello 已提交
156 157 158
  TdThreadSpinlock lock;
} Params;

wmmhello's avatar
wmmhello 已提交
159
typedef struct {
X
Xiaoyu Wang 已提交
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
  int64_t id;
  Params *params;

  SMLProtocolType protocol;
  int8_t          precision;
  bool dataFormat;  // true means that the name and order of keys in each line are the same(only for influx protocol)

  SHashObj *childTables;
  SHashObj *superTables;
  SHashObj *pVgHash;
  void     *exec;

  STscObj     *taos;
  SCatalog    *pCatalog;
  SRequestObj *pRequest;
  SQuery      *pQuery;

  SSmlCostInfo cost;
  int32_t      affectedRows;
  SSmlMsgBuf   msgBuf;
  SHashObj    *dumplicateKey;  // for dumplicate key
  SArray      *colsContainer;  // for cols parse, if dataFormat == false
wmmhello's avatar
wmmhello 已提交
182
} SSmlHandle;
wmmhello's avatar
wmmhello 已提交
183 184
//=================================================================================================

wmmhello's avatar
wmmhello 已提交
185
//=================================================================================================
186
static volatile int64_t linesSmlHandleId = 0;
X
Xiaoyu Wang 已提交
187 188
static int64_t          smlGenId() {
           int64_t id;
wmmhello's avatar
wmmhello 已提交
189

X
Xiaoyu Wang 已提交
190 191
           do {
             id = atomic_add_fetch_64(&linesSmlHandleId, 1);
wmmhello's avatar
wmmhello 已提交
192 193
  } while (id == 0);

X
Xiaoyu Wang 已提交
194
           return id;
wmmhello's avatar
wmmhello 已提交
195 196
}

197
static inline bool smlDoubleToInt64OverFlow(double num) {
X
Xiaoyu Wang 已提交
198
  if (num >= (double)INT64_MAX || num <= (double)INT64_MIN) return true;
199 200 201 202 203 204 205 206 207 208 209 210
  return false;
}

static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashObj *pHash) {
  void *val = taosHashGet(pHash, key, keyLen);
  if (val) {
    return true;
  }
  taosHashPut(pHash, key, keyLen, key, 1);
  return false;
}

X
Xiaoyu Wang 已提交
211
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
212
  if (pBuf->buf) {
213 214 215 216 217 218 219
    memset(pBuf->buf, 0, pBuf->len);
    if (msg1) strncat(pBuf->buf, msg1, pBuf->len);
    int32_t left = pBuf->len - strlen(pBuf->buf);
    if (left > 2 && msg2) {
      strncat(pBuf->buf, ":", left - 1);
      strncat(pBuf->buf, msg2, left - 2);
    }
wmmhello's avatar
wmmhello 已提交
220
  }
wmmhello's avatar
wmmhello 已提交
221 222 223
  return TSDB_CODE_SML_INVALID_DATA;
}

X
Xiaoyu Wang 已提交
224
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
225
                                       ESchemaAction *action, SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
226
  uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
227 228
  if (index) {
    if (colField[*index].type != kv->type) {
X
Xiaoyu Wang 已提交
229 230
      uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id,
             kv->key, colField[*index].type, kv->type);
231 232 233
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

X
Xiaoyu Wang 已提交
234 235 236 237
    if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR &&
         (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
        (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
         ((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
238
      if (isTag) {
wmmhello's avatar
wmmhello 已提交
239
        *action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
240
      } else {
wmmhello's avatar
wmmhello 已提交
241
        *action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
242 243 244 245
      }
    }
  } else {
    if (isTag) {
wmmhello's avatar
wmmhello 已提交
246
      *action = SCHEMA_ACTION_ADD_TAG;
247
    } else {
wmmhello's avatar
wmmhello 已提交
248
      *action = SCHEMA_ACTION_ADD_COLUMN;
249 250
    }
  }
wmmhello's avatar
wmmhello 已提交
251 252 253
  return 0;
}

wmmhello's avatar
wmmhello 已提交
254
static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
wmmhello's avatar
wmmhello 已提交
255
  int32_t result = 1;
X
Xiaoyu Wang 已提交
256
  while (result <= length) {
wmmhello's avatar
wmmhello 已提交
257 258
    result *= 2;
  }
259
  if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
wmmhello's avatar
wmmhello 已提交
260
    result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
261
  } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
wmmhello's avatar
wmmhello 已提交
262 263
    result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  }
wmmhello's avatar
wmmhello 已提交
264

265
  if (type == TSDB_DATA_TYPE_NCHAR) {
266
    result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
267
  } else if (type == TSDB_DATA_TYPE_BINARY) {
268
    result = result + VARSTR_HEADER_SIZE;
wmmhello's avatar
wmmhello 已提交
269
  }
270
  return result;
wmmhello's avatar
wmmhello 已提交
271 272
}

X
Xiaoyu Wang 已提交
273
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
274
                                      ESchemaAction *action, bool isTag) {
275 276
  int32_t code = TSDB_CODE_SUCCESS;
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
277
    if (j == 0 && !isTag) continue;
X
Xiaoyu Wang 已提交
278
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j);
279
    code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
X
Xiaoyu Wang 已提交
280
    if (code != TSDB_CODE_SUCCESS) {
281 282 283 284 285 286
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

287
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
288
  SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
289 290
  int32_t   i = 0;
  for (; i < length; i++) {
291 292 293
    taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
  }

294
  if (isTag) {
295 296 297 298 299
    i = 0;
  } else {
    i = 1;
  }
  for (; i < taosArrayGetSize(cols); i++) {
X
Xiaoyu Wang 已提交
300 301
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
    if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
302 303 304
      return -1;
    }
  }
305
  taosHashCleanup(hashTmp);
306 307 308
  return 0;
}

309
static int32_t getBytes(uint8_t type, int32_t length) {
310 311 312 313 314 315 316
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
    return smlFindNearestPowerOf2(length, type);
  } else {
    return tDataTypes[type].bytes;
  }
}

317 318
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                  SArray *results, int32_t numOfCols, bool isTag) {
wmmhello's avatar
wmmhello 已提交
319
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
320
    SSmlKv       *kv = (SSmlKv *)taosArrayGetP(cols, j);
wmmhello's avatar
wmmhello 已提交
321 322
    ESchemaAction action = SCHEMA_ACTION_NULL;
    smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
323
    if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) {
wmmhello's avatar
wmmhello 已提交
324 325 326 327 328
      SField field = {0};
      field.type = kv->type;
      field.bytes = getBytes(kv->type, kv->length);
      memcpy(field.name, kv->key, kv->keyLen);
      taosArrayPush(results, &field);
329
    } else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
wmmhello's avatar
wmmhello 已提交
330
      uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
331 332
      uint16_t  newIndex = *index;
      if (isTag) newIndex -= numOfCols;
wmmhello's avatar
wmmhello 已提交
333 334 335 336 337 338 339
      SField *field = (SField *)taosArrayGet(results, newIndex);
      field->bytes = getBytes(kv->type, kv->length);
    }
  }
  return TSDB_CODE_SUCCESS;
}

340 341 342 343 344
// static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
//                               int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray *pTags, STableMeta *pTableMeta,
                              ESchemaAction action) {
  SRequestObj   *pRequest = NULL;
345 346 347 348
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SCmdMsgInfo    pCmdMsg = {0};

wmmhello's avatar
wmmhello 已提交
349 350 351 352 353 354
  // put front for free
  pReq.numOfColumns = taosArrayGetSize(pColumns);
  pReq.pColumns = pColumns;
  pReq.numOfTags = taosArrayGetSize(pTags);
  pReq.pTags = pTags;

355 356 357 358 359
  code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
360
  pRequest->syncQuery = true;
361 362 363 364 365
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

366
  if (action == SCHEMA_ACTION_CREATE_STABLE) {
wmmhello's avatar
wmmhello 已提交
367 368 369 370
    pReq.colVer = 1;
    pReq.tagVer = 1;
    pReq.suid = 0;
    pReq.source = TD_REQ_FROM_APP;
371
  } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
wmmhello's avatar
wmmhello 已提交
372 373 374 375
    pReq.colVer = pTableMeta->sversion;
    pReq.tagVer = pTableMeta->tversion + 1;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
376
  } else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) {
wmmhello's avatar
wmmhello 已提交
377 378 379 380 381 382
    pReq.colVer = pTableMeta->sversion + 1;
    pReq.tagVer = pTableMeta->tversion;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  }

383
  if (pReq.numOfTags == 0) {
wmmhello's avatar
wmmhello 已提交
384 385 386 387 388 389 390 391
    pReq.numOfTags = 1;
    SField field = {0};
    field.type = TSDB_DATA_TYPE_NCHAR;
    field.bytes = 1;
    strcpy(field.name, tsSmlTagName);
    taosArrayPush(pReq.pTags, &field);
  }

392 393 394 395 396 397 398 399 400 401 402 403 404 405
  pReq.commentLen = -1;
  pReq.igExists = true;
  tNameExtractFullName(pName, pReq.name);

  pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

D
dapan1121 已提交
406 407
  SQuery pQuery;
  memset(&pQuery, 0, sizeof(pQuery));
408 409 410 411 412 413 414
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

415
  if (pRequest->code == TSDB_CODE_SUCCESS) {
416 417 418 419 420 421 422 423 424 425 426
    catalogRemoveTableMeta(info->pCatalog, pName);
  }
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  return code;
}

X
Xiaoyu Wang 已提交
427
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
428 429 430
  int32_t     code = 0;
  SHashObj   *hashTmp = NULL;
  STableMeta *pTableMeta = NULL;
wmmhello's avatar
wmmhello 已提交
431

432
  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
433
  strcpy(pName.dbname, info->pRequest->pDb);
wmmhello's avatar
wmmhello 已提交
434

D
dapan1121 已提交
435 436 437 438 439
  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
X
Xiaoyu Wang 已提交
440 441

  SSmlSTableMeta **tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
wmmhello's avatar
wmmhello 已提交
442
  while (tableMetaSml) {
X
Xiaoyu Wang 已提交
443 444
    SSmlSTableMeta *sTableData = *tableMetaSml;
    bool            needCheckMeta = false;  // for multi thread
wmmhello's avatar
wmmhello 已提交
445

wmmhello's avatar
wmmhello 已提交
446
    size_t superTableLen = 0;
X
Xiaoyu Wang 已提交
447
    void  *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
448
    memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
wmmhello's avatar
wmmhello 已提交
449
    memcpy(pName.tname, superTable, superTableLen);
wmmhello's avatar
wmmhello 已提交
450

D
dapan1121 已提交
451
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
wmmhello's avatar
wmmhello 已提交
452

453
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
454 455
      SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
      SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
wmmhello's avatar
wmmhello 已提交
456 457 458 459
      smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
      smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);

      code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
460
      if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
461
        uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
462
        goto end;
wmmhello's avatar
wmmhello 已提交
463
      }
464
      info->cost.numOfCreateSTables++;
X
Xiaoyu Wang 已提交
465
    } else if (code == TSDB_CODE_SUCCESS) {
466 467
      hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
                             HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
468 469
      for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
           i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
470 471
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
wmmhello's avatar
wmmhello 已提交
472

473 474
      ESchemaAction action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
475
      if (code != TSDB_CODE_SUCCESS) {
476
        goto end;
477
      }
478 479 480 481 482
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
wmmhello's avatar
wmmhello 已提交
483 484 485 486 487 488

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
489
          if (i < pTableMeta->tableInfo.numOfColumns) {
wmmhello's avatar
wmmhello 已提交
490
            taosArrayPush(pColumns, &field);
491
          } else {
wmmhello's avatar
wmmhello 已提交
492 493 494
            taosArrayPush(pTags, &field);
          }
        }
495 496
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
                           pTableMeta->tableInfo.numOfColumns, true);
wmmhello's avatar
wmmhello 已提交
497 498

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
499
        if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
500
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
501 502 503 504
          goto end;
        }
      }

wmmhello's avatar
wmmhello 已提交
505
      taosMemoryFreeClear(pTableMeta);
506 507 508 509
      code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
wmmhello's avatar
wmmhello 已提交
510 511 512 513
      code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
514 515

      taosHashClear(hashTmp);
wmmhello's avatar
wmmhello 已提交
516
      for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
517 518
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
519 520
      action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
521
      if (code != TSDB_CODE_SUCCESS) {
522
        goto end;
wmmhello's avatar
wmmhello 已提交
523
      }
524 525 526 527 528
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
wmmhello's avatar
wmmhello 已提交
529 530 531 532 533 534

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
535
          if (i < pTableMeta->tableInfo.numOfColumns) {
wmmhello's avatar
wmmhello 已提交
536
            taosArrayPush(pColumns, &field);
537
          } else {
wmmhello's avatar
wmmhello 已提交
538 539 540 541
            taosArrayPush(pTags, &field);
          }
        }

542 543
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
                           pTableMeta->tableInfo.numOfColumns, false);
wmmhello's avatar
wmmhello 已提交
544 545

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
546
        if (code != TSDB_CODE_SUCCESS) {
547
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
548 549 550
          goto end;
        }
      }
wmmhello's avatar
wmmhello 已提交
551

D
dapan1121 已提交
552
      code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
wmmhello's avatar
wmmhello 已提交
553
      if (code != TSDB_CODE_SUCCESS) {
554
        goto end;
wmmhello's avatar
wmmhello 已提交
555
      }
556
      needCheckMeta = true;
wmmhello's avatar
wmmhello 已提交
557 558
      taosHashCleanup(hashTmp);
      hashTmp = NULL;
wmmhello's avatar
wmmhello 已提交
559
    } else {
X
Xiaoyu Wang 已提交
560
      uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
561
      goto end;
wmmhello's avatar
wmmhello 已提交
562
    }
wmmhello's avatar
wmmhello 已提交
563
    taosMemoryFreeClear(pTableMeta);
564

D
dapan1121 已提交
565
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
566
    if (code != TSDB_CODE_SUCCESS) {
567
      uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
568
      goto end;
569
    }
570

X
Xiaoyu Wang 已提交
571 572
    if (needCheckMeta) {
      code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
573
                          sTableData->tags, true);
574
      if (code != TSDB_CODE_SUCCESS) {
575
        uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, pName.tname);
576 577
        goto end;
      }
578
      code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
579
      if (code != TSDB_CODE_SUCCESS) {
580
        uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, pName.tname);
581 582 583 584
        goto end;
      }
    }

585
    sTableData->tableMeta = pTableMeta;
wmmhello's avatar
wmmhello 已提交
586

X
Xiaoyu Wang 已提交
587
    tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, tableMetaSml);
wmmhello's avatar
wmmhello 已提交
588 589
  }
  return 0;
590 591

end:
wmmhello's avatar
wmmhello 已提交
592 593
  taosHashCleanup(hashTmp);
  taosMemoryFreeClear(pTableMeta);
D
dapan1121 已提交
594
  catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
595
  return code;
wmmhello's avatar
wmmhello 已提交
596 597
}

X
Xiaoyu Wang 已提交
598
static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
599
  const char *pVal = kvVal->value;
X
Xiaoyu Wang 已提交
600 601 602 603
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (pVal == endptr) {
604
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
wmmhello's avatar
wmmhello 已提交
605 606 607
    return false;
  }

608
  int32_t left = len - (endptr - pVal);
X
Xiaoyu Wang 已提交
609
  if (left == 0 || (left == 3 && strncasecmp(endptr, "f64", left) == 0)) {
610 611
    kvVal->type = TSDB_DATA_TYPE_DOUBLE;
    kvVal->d = result;
X
Xiaoyu Wang 已提交
612 613
  } else if ((left == 3 && strncasecmp(endptr, "f32", left) == 0)) {
    if (!IS_VALID_FLOAT(result)) {
614 615
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
616
    }
617 618
    kvVal->type = TSDB_DATA_TYPE_FLOAT;
    kvVal->f = (float)result;
X
Xiaoyu Wang 已提交
619 620
  } else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)) {
    if (smlDoubleToInt64OverFlow(result)) {
wmmhello's avatar
wmmhello 已提交
621 622
      errno = 0;
      int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
X
Xiaoyu Wang 已提交
623
      if (errno == ERANGE) {
wmmhello's avatar
wmmhello 已提交
624 625 626 627 628 629
        smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_BIGINT;
      kvVal->i = tmp;
      return true;
wmmhello's avatar
wmmhello 已提交
630
    }
631
    kvVal->type = TSDB_DATA_TYPE_BIGINT;
wmmhello's avatar
wmmhello 已提交
632
    kvVal->i = (int64_t)result;
wmmhello's avatar
wmmhello 已提交
633
  } else if ((left == 1 && *endptr == 'u') || (left == 3 && strncasecmp(endptr, "u64", left) == 0)) {
X
Xiaoyu Wang 已提交
634
    if (result >= (double)UINT64_MAX || result < 0) {
wmmhello's avatar
wmmhello 已提交
635 636
      errno = 0;
      uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
X
Xiaoyu Wang 已提交
637
      if (errno == ERANGE || result < 0) {
wmmhello's avatar
wmmhello 已提交
638 639 640 641 642 643
        smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_UBIGINT;
      kvVal->u = tmp;
      return true;
644
    }
645
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;
wmmhello's avatar
wmmhello 已提交
646
    kvVal->u = result;
X
Xiaoyu Wang 已提交
647 648
  } else if (left == 3 && strncasecmp(endptr, "i32", left) == 0) {
    if (!IS_VALID_INT(result)) {
649 650
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
651
    }
652 653
    kvVal->type = TSDB_DATA_TYPE_INT;
    kvVal->i = result;
X
Xiaoyu Wang 已提交
654 655
  } else if (left == 3 && strncasecmp(endptr, "u32", left) == 0) {
    if (!IS_VALID_UINT(result)) {
656 657
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
658
    }
659 660
    kvVal->type = TSDB_DATA_TYPE_UINT;
    kvVal->u = result;
X
Xiaoyu Wang 已提交
661 662
  } else if (left == 3 && strncasecmp(endptr, "i16", left) == 0) {
    if (!IS_VALID_SMALLINT(result)) {
663 664
      smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
665
    }
666 667
    kvVal->type = TSDB_DATA_TYPE_SMALLINT;
    kvVal->i = result;
X
Xiaoyu Wang 已提交
668 669
  } else if (left == 3 && strncasecmp(endptr, "u16", left) == 0) {
    if (!IS_VALID_USMALLINT(result)) {
670 671
      smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
672
    }
673 674
    kvVal->type = TSDB_DATA_TYPE_USMALLINT;
    kvVal->u = result;
X
Xiaoyu Wang 已提交
675 676
  } else if (left == 2 && strncasecmp(endptr, "i8", left) == 0) {
    if (!IS_VALID_TINYINT(result)) {
677 678
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
679
    }
680 681
    kvVal->type = TSDB_DATA_TYPE_TINYINT;
    kvVal->i = result;
X
Xiaoyu Wang 已提交
682 683
  } else if (left == 2 && strncasecmp(endptr, "u8", left) == 0) {
    if (!IS_VALID_UTINYINT(result)) {
684 685
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
686
    }
687 688
    kvVal->type = TSDB_DATA_TYPE_UTINYINT;
    kvVal->u = result;
X
Xiaoyu Wang 已提交
689
  } else {
690
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
wmmhello's avatar
wmmhello 已提交
691 692
    return false;
  }
693
  return true;
wmmhello's avatar
wmmhello 已提交
694 695
}

wmmhello's avatar
wmmhello 已提交
696
static bool smlParseBool(SSmlKv *kvVal) {
wmmhello's avatar
wmmhello 已提交
697
  const char *pVal = kvVal->value;
X
Xiaoyu Wang 已提交
698
  int32_t     len = kvVal->length;
699
  if ((len == 1) && (pVal[0] == 't' || pVal[0] == 'T')) {
wmmhello's avatar
wmmhello 已提交
700
    kvVal->i = true;
wmmhello's avatar
wmmhello 已提交
701 702 703
    return true;
  }

704
  if ((len == 1) && (pVal[0] == 'f' || pVal[0] == 'F')) {
wmmhello's avatar
wmmhello 已提交
705
    kvVal->i = false;
wmmhello's avatar
wmmhello 已提交
706 707 708
    return true;
  }

X
Xiaoyu Wang 已提交
709
  if ((len == 4) && !strncasecmp(pVal, "true", len)) {
wmmhello's avatar
wmmhello 已提交
710
    kvVal->i = true;
wmmhello's avatar
wmmhello 已提交
711 712
    return true;
  }
X
Xiaoyu Wang 已提交
713
  if ((len == 5) && !strncasecmp(pVal, "false", len)) {
wmmhello's avatar
wmmhello 已提交
714
    kvVal->i = false;
wmmhello's avatar
wmmhello 已提交
715 716 717 718 719
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
720
static bool smlIsBinary(const char *pVal, uint16_t len) {
X
Xiaoyu Wang 已提交
721
  // binary: "abc"
wmmhello's avatar
wmmhello 已提交
722 723 724 725 726 727 728 729 730
  if (len < 2) {
    return false;
  }
  if (pVal[0] == '"' && pVal[len - 1] == '"') {
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
731
static bool smlIsNchar(const char *pVal, uint16_t len) {
X
Xiaoyu Wang 已提交
732
  // nchar: L"abc"
wmmhello's avatar
wmmhello 已提交
733 734 735
  if (len < 3) {
    return false;
  }
X
Xiaoyu Wang 已提交
736
  if ((pVal[0] == 'l' || pVal[0] == 'L') && pVal[1] == '"' && pVal[len - 1] == '"') {
wmmhello's avatar
wmmhello 已提交
737 738 739 740 741
    return true;
  }
  return false;
}

742
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
X
Xiaoyu Wang 已提交
743
  char   *endPtr = NULL;
wafwerar's avatar
wafwerar 已提交
744
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
X
Xiaoyu Wang 已提交
745
  if (value + len != endPtr) {
746
    return -1;
wmmhello's avatar
wmmhello 已提交
747
  }
748
  double ts = tsInt64;
749 750
  switch (type) {
    case TSDB_TIME_PRECISION_HOURS:
wmmhello's avatar
wmmhello 已提交
751 752
      ts *= NANOSECOND_PER_HOUR;
      tsInt64 *= NANOSECOND_PER_HOUR;
753 754
      break;
    case TSDB_TIME_PRECISION_MINUTES:
wmmhello's avatar
wmmhello 已提交
755 756
      ts *= NANOSECOND_PER_MINUTE;
      tsInt64 *= NANOSECOND_PER_MINUTE;
757 758
      break;
    case TSDB_TIME_PRECISION_SECONDS:
wmmhello's avatar
wmmhello 已提交
759 760
      ts *= NANOSECOND_PER_SEC;
      tsInt64 *= NANOSECOND_PER_SEC;
761 762
      break;
    case TSDB_TIME_PRECISION_MILLI:
wmmhello's avatar
wmmhello 已提交
763 764
      ts *= NANOSECOND_PER_MSEC;
      tsInt64 *= NANOSECOND_PER_MSEC;
765 766
      break;
    case TSDB_TIME_PRECISION_MICRO:
wmmhello's avatar
wmmhello 已提交
767 768
      ts *= NANOSECOND_PER_USEC;
      tsInt64 *= NANOSECOND_PER_USEC;
769 770 771 772 773
      break;
    case TSDB_TIME_PRECISION_NANO:
      break;
    default:
      ASSERT(0);
wmmhello's avatar
wmmhello 已提交
774
  }
X
Xiaoyu Wang 已提交
775
  if (ts >= (double)INT64_MAX || ts < 0) {
776
    return -1;
wmmhello's avatar
wmmhello 已提交
777 778
  }

779
  return tsInt64;
780 781 782 783 784 785
}

static int8_t smlGetTsTypeByLen(int32_t len) {
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
    return TSDB_TIME_PRECISION_SECONDS;
  } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
786
    return TSDB_TIME_PRECISION_MILLI;
787 788
  } else {
    return -1;
wmmhello's avatar
wmmhello 已提交
789
  }
790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808
}

static int8_t smlGetTsTypeByPrecision(int8_t precision) {
  switch (precision) {
    case TSDB_SML_TIMESTAMP_HOURS:
      return TSDB_TIME_PRECISION_HOURS;
    case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
      return TSDB_TIME_PRECISION_MILLI;
    case TSDB_SML_TIMESTAMP_NANO_SECONDS:
    case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
      return TSDB_TIME_PRECISION_NANO;
    case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
      return TSDB_TIME_PRECISION_MICRO;
    case TSDB_SML_TIMESTAMP_SECONDS:
      return TSDB_TIME_PRECISION_SECONDS;
    case TSDB_SML_TIMESTAMP_MINUTES:
      return TSDB_TIME_PRECISION_MINUTES;
    default:
      return -1;
wmmhello's avatar
wmmhello 已提交
809
  }
810 811
}

X
Xiaoyu Wang 已提交
812 813
static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) {
  if (len == 0 || (len == 1 && data[0] == '0')) {
814
    return taosGetTimestampNs();
wmmhello's avatar
wmmhello 已提交
815 816
  }

817 818 819 820
  int8_t tsType = smlGetTsTypeByPrecision(info->precision);
  if (tsType == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL);
    return -1;
wmmhello's avatar
wmmhello 已提交
821
  }
822 823

  int64_t ts = smlGetTimeValue(data, len, tsType);
X
Xiaoyu Wang 已提交
824
  if (ts == -1) {
825 826
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
wmmhello's avatar
wmmhello 已提交
827
  }
828 829 830
  return ts;
}

X
Xiaoyu Wang 已提交
831 832
static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
  if (!data) {
833 834
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
    return -1;
wmmhello's avatar
wmmhello 已提交
835
  }
X
Xiaoyu Wang 已提交
836
  if (len == 1 && data[0] == '0') {
837 838
    return taosGetTimestampNs();
  }
839 840
  int8_t tsType = smlGetTsTypeByLen(len);
  if (tsType == -1) {
X
Xiaoyu Wang 已提交
841 842
    smlBuildInvalidDataMsg(&info->msgBuf,
                           "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
843 844 845
    return -1;
  }
  int64_t ts = smlGetTimeValue(data, len, tsType);
X
Xiaoyu Wang 已提交
846
  if (ts == -1) {
847 848 849 850 851 852
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
  }
  return ts;
}

X
Xiaoyu Wang 已提交
853
static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArray *cols) {
854
  int64_t ts = 0;
X
Xiaoyu Wang 已提交
855
  if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
856
    //    uError("SML:data:%s,len:%d", data, len);
857
    ts = smlParseInfluxTime(info, data, len);
X
Xiaoyu Wang 已提交
858
  } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
859
    ts = smlParseOpenTsdbTime(info, data, len);
X
Xiaoyu Wang 已提交
860
  } else {
861
    ASSERT(0);
862
  }
863

wmmhello's avatar
wmmhello 已提交
864
  if (ts == -1) return TSDB_CODE_INVALID_TIMESTAMP;
865 866 867

  // add ts to
  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
868
  if (!kv) {
869 870 871 872 873 874 875 876
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  kv->key = TS;
  kv->keyLen = TS_LEN;
  kv->i = ts;
  kv->type = TSDB_DATA_TYPE_TIMESTAMP;
  kv->length = (int16_t)tDataTypes[kv->type].bytes;
X
Xiaoyu Wang 已提交
877
  if (cols) taosArrayPush(cols, &kv);
878 879 880
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
881
static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
X
Xiaoyu Wang 已提交
882
  // binary
wmmhello's avatar
wmmhello 已提交
883
  if (smlIsBinary(pVal->value, pVal->length)) {
884
    pVal->type = TSDB_DATA_TYPE_BINARY;
wmmhello's avatar
wmmhello 已提交
885
    pVal->length -= BINARY_ADD_LEN;
886
    if (pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
wmmhello's avatar
wmmhello 已提交
887 888
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
889
    pVal->value += (BINARY_ADD_LEN - 1);
wmmhello's avatar
wmmhello 已提交
890
    return TSDB_CODE_SUCCESS;
891
  }
X
Xiaoyu Wang 已提交
892
  // nchar
wmmhello's avatar
wmmhello 已提交
893
  if (smlIsNchar(pVal->value, pVal->length)) {
894
    pVal->type = TSDB_DATA_TYPE_NCHAR;
wmmhello's avatar
wmmhello 已提交
895
    pVal->length -= NCHAR_ADD_LEN;
896
    if (pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
wmmhello's avatar
wmmhello 已提交
897 898
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
899
    pVal->value += (NCHAR_ADD_LEN - 1);
wmmhello's avatar
wmmhello 已提交
900
    return TSDB_CODE_SUCCESS;
901 902
  }

X
Xiaoyu Wang 已提交
903
  // bool
904 905 906
  if (smlParseBool(pVal)) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
wmmhello's avatar
wmmhello 已提交
907
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
908
  }
X
Xiaoyu Wang 已提交
909
  // number
910
  if (smlParseNumber(pVal, msg)) {
wmmhello's avatar
wmmhello 已提交
911
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
wmmhello's avatar
wmmhello 已提交
912
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
913 914
  }

wmmhello's avatar
wmmhello 已提交
915
  return TSDB_CODE_TSC_INVALID_VALUE;
wmmhello's avatar
wmmhello 已提交
916 917
}

wmmhello's avatar
wmmhello 已提交
918
static int32_t smlParseInfluxString(const char *sql, const char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
X
Xiaoyu Wang 已提交
919
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
wmmhello's avatar
wmmhello 已提交
920
  JUMP_SPACE(sql, sqlEnd)
X
Xiaoyu Wang 已提交
921
  if (*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA;
wmmhello's avatar
wmmhello 已提交
922
  elements->measure = sql;
wmmhello's avatar
wmmhello 已提交
923

wmmhello's avatar
wmmhello 已提交
924
  // parse measure
wmmhello's avatar
wmmhello 已提交
925
  while (sql < sqlEnd) {
X
Xiaoyu Wang 已提交
926
    if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
wmmhello's avatar
wmmhello 已提交
927 928
      MOVE_FORWARD_ONE(sql, sqlEnd - sql);
      sqlEnd--;
wmmhello's avatar
wmmhello 已提交
929 930
      continue;
    }
X
Xiaoyu Wang 已提交
931
    if (IS_COMMA(sql)) {
wmmhello's avatar
wmmhello 已提交
932 933 934
      break;
    }

X
Xiaoyu Wang 已提交
935
    if (IS_SPACE(sql)) {
wmmhello's avatar
wmmhello 已提交
936 937
      break;
    }
wmmhello's avatar
wmmhello 已提交
938 939
    sql++;
  }
wmmhello's avatar
wmmhello 已提交
940
  elements->measureLen = sql - elements->measure;
X
Xiaoyu Wang 已提交
941
  if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
942
    smlBuildInvalidDataMsg(msg, "measure is empty or too large than 192", NULL);
wmmhello's avatar
wmmhello 已提交
943
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
wmmhello's avatar
wmmhello 已提交
944
  }
wmmhello's avatar
wmmhello 已提交
945

wmmhello's avatar
wmmhello 已提交
946
  // parse tag
X
Xiaoyu Wang 已提交
947
  if (*sql == SPACE) {
wmmhello's avatar
wmmhello 已提交
948
    elements->tagsLen = 0;
X
Xiaoyu Wang 已提交
949 950
  } else {
    if (*sql == COMMA) sql++;
wmmhello's avatar
wmmhello 已提交
951
    elements->tags = sql;
wmmhello's avatar
wmmhello 已提交
952
    while (sql < sqlEnd) {
X
Xiaoyu Wang 已提交
953
      if (IS_SPACE(sql)) {
wmmhello's avatar
wmmhello 已提交
954 955 956
        break;
      }
      sql++;
wmmhello's avatar
wmmhello 已提交
957
    }
wmmhello's avatar
wmmhello 已提交
958
    elements->tagsLen = sql - elements->tags;
959
  }
wmmhello's avatar
wmmhello 已提交
960
  elements->measureTagsLen = sql - elements->measure;
wmmhello's avatar
wmmhello 已提交
961

wmmhello's avatar
wmmhello 已提交
962
  // parse cols
wmmhello's avatar
wmmhello 已提交
963
  JUMP_SPACE(sql, sqlEnd)
wmmhello's avatar
wmmhello 已提交
964
  elements->cols = sql;
wmmhello's avatar
wmmhello 已提交
965
  bool isInQuote = false;
wmmhello's avatar
wmmhello 已提交
966
  while (sql < sqlEnd) {
X
Xiaoyu Wang 已提交
967
    if (IS_QUOTE(sql)) {
wmmhello's avatar
wmmhello 已提交
968 969
      isInQuote = !isInQuote;
    }
X
Xiaoyu Wang 已提交
970
    if (!isInQuote && IS_SPACE(sql)) {
wmmhello's avatar
wmmhello 已提交
971 972 973 974
      break;
    }
    sql++;
  }
X
Xiaoyu Wang 已提交
975
  if (isInQuote) {
976 977 978
    smlBuildInvalidDataMsg(msg, "only one quote", elements->cols);
    return TSDB_CODE_SML_INVALID_DATA;
  }
wmmhello's avatar
wmmhello 已提交
979
  elements->colsLen = sql - elements->cols;
X
Xiaoyu Wang 已提交
980
  if (elements->colsLen == 0) {
wmmhello's avatar
wmmhello 已提交
981 982 983
    smlBuildInvalidDataMsg(msg, "cols is empty", NULL);
    return TSDB_CODE_SML_INVALID_DATA;
  }
wmmhello's avatar
wmmhello 已提交
984

wmmhello's avatar
wmmhello 已提交
985
  // parse timestamp
wmmhello's avatar
wmmhello 已提交
986
  JUMP_SPACE(sql, sqlEnd)
wmmhello's avatar
wmmhello 已提交
987
  elements->timestamp = sql;
wmmhello's avatar
wmmhello 已提交
988 989
  while (sql < sqlEnd) {
    if (isspace(*sql)) {
wmmhello's avatar
wmmhello 已提交
990 991 992 993
      break;
    }
    sql++;
  }
wmmhello's avatar
wmmhello 已提交
994
  elements->timestampLen = sql - elements->timestamp;
wmmhello's avatar
wmmhello 已提交
995 996 997 998

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
999 1000
static void smlParseTelnetElement(const char **sql, const char *sqlEnd, const char **data, int32_t *len) {
  while (*sql < sqlEnd) {
X
Xiaoyu Wang 已提交
1001
    if (**sql != SPACE && !(*data)) {
1002
      *data = *sql;
X
Xiaoyu Wang 已提交
1003
    } else if (**sql == SPACE && *data) {
1004 1005 1006 1007 1008 1009 1010
      *len = *sql - *data;
      break;
    }
    (*sql)++;
  }
}

wmmhello's avatar
wmmhello 已提交
1011
static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *cols, char *childTableName, SHashObj *dumplicateKey,
X
Xiaoyu Wang 已提交
1012
                                  SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
1013
  const char *sql = data;
X
Xiaoyu Wang 已提交
1014
  size_t      childTableNameLen = strlen(tsSmlChildTableName);
wmmhello's avatar
wmmhello 已提交
1015 1016
  while (sql < sqlEnd) {
    JUMP_SPACE(sql, sqlEnd)
X
Xiaoyu Wang 已提交
1017
    if (*sql == '\0') break;
wmmhello's avatar
wmmhello 已提交
1018

wmmhello's avatar
wmmhello 已提交
1019
    const char *key = sql;
X
Xiaoyu Wang 已提交
1020
    int32_t     keyLen = 0;
wmmhello's avatar
wmmhello 已提交
1021 1022

    // parse key
wmmhello's avatar
wmmhello 已提交
1023
    while (sql < sqlEnd) {
X
Xiaoyu Wang 已提交
1024
      if (*sql == SPACE) {
wmmhello's avatar
wmmhello 已提交
1025 1026 1027
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
X
Xiaoyu Wang 已提交
1028
      if (*sql == EQUAL) {
wmmhello's avatar
wmmhello 已提交
1029 1030
        keyLen = sql - key;
        sql++;
1031 1032
        break;
      }
wmmhello's avatar
wmmhello 已提交
1033
      sql++;
1034
    }
wmmhello's avatar
wmmhello 已提交
1035

X
Xiaoyu Wang 已提交
1036
    if (IS_INVALID_COL_LEN(keyLen)) {
1037
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
wmmhello's avatar
wmmhello 已提交
1038
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
1039
    }
X
Xiaoyu Wang 已提交
1040
    if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
1041
      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
wmmhello's avatar
wmmhello 已提交
1042
      return TSDB_CODE_TSC_DUP_NAMES;
1043 1044 1045
    }

    // parse value
wmmhello's avatar
wmmhello 已提交
1046
    const char *value = sql;
X
Xiaoyu Wang 已提交
1047
    int32_t     valueLen = 0;
wmmhello's avatar
wmmhello 已提交
1048
    while (sql < sqlEnd) {
wmmhello's avatar
wmmhello 已提交
1049 1050
      // parse value
      if (*sql == SPACE) {
1051 1052
        break;
      }
wmmhello's avatar
wmmhello 已提交
1053 1054 1055 1056 1057
      if (*sql == EQUAL) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
1058
    }
wmmhello's avatar
wmmhello 已提交
1059
    valueLen = sql - value;
wmmhello's avatar
wmmhello 已提交
1060

X
Xiaoyu Wang 已提交
1061
    if (valueLen == 0) {
1062
      smlBuildInvalidDataMsg(msg, "invalid value", value);
wmmhello's avatar
wmmhello 已提交
1063
      return TSDB_CODE_TSC_INVALID_VALUE;
1064
    }
wmmhello's avatar
wmmhello 已提交
1065

X
Xiaoyu Wang 已提交
1066 1067
    // handle child table name
    if (childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0) {
1068 1069 1070 1071 1072
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN));
      continue;
    }

1073
    if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
wmmhello's avatar
wmmhello 已提交
1074 1075 1076
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }

1077 1078
    // add kv to SSmlKv
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1079
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
1080 1081 1082
    kv->key = key;
    kv->keyLen = keyLen;
    kv->value = value;
wmmhello's avatar
wmmhello 已提交
1083
    kv->length = valueLen;
wmmhello's avatar
wmmhello 已提交
1084
    kv->type = TSDB_DATA_TYPE_NCHAR;
1085

X
Xiaoyu Wang 已提交
1086
    if (cols) taosArrayPush(cols, &kv);
1087 1088 1089 1090
  }

  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1091

1092
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
wmmhello's avatar
wmmhello 已提交
1093
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlTableInfo *tinfo, SArray *cols) {
X
Xiaoyu Wang 已提交
1094
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
1095 1096

  // parse metric
wmmhello's avatar
wmmhello 已提交
1097
  smlParseTelnetElement(&sql, sqlEnd, &tinfo->sTableName, &tinfo->sTableNameLen);
1098
  if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
1099
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
wmmhello's avatar
wmmhello 已提交
1100
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
1101 1102 1103 1104
  }

  // parse timestamp
  const char *timestamp = NULL;
X
Xiaoyu Wang 已提交
1105
  int32_t     tLen = 0;
wmmhello's avatar
wmmhello 已提交
1106
  smlParseTelnetElement(&sql, sqlEnd, &timestamp, &tLen);
1107 1108 1109 1110 1111 1112 1113 1114
  if (!timestamp || tLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  int32_t ret = smlParseTS(info, timestamp, tLen, cols);
  if (ret != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
wmmhello's avatar
wmmhello 已提交
1115
    return ret;
1116 1117 1118 1119
  }

  // parse value
  const char *value = NULL;
X
Xiaoyu Wang 已提交
1120
  int32_t     valueLen = 0;
wmmhello's avatar
wmmhello 已提交
1121
  smlParseTelnetElement(&sql, sqlEnd, &value, &valueLen);
1122 1123
  if (!value || valueLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
wmmhello's avatar
wmmhello 已提交
1124
    return TSDB_CODE_TSC_INVALID_VALUE;
1125 1126 1127
  }

  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1128
  if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
1129 1130 1131 1132
  taosArrayPush(cols, &kv);
  kv->key = VALUE;
  kv->keyLen = VALUE_LEN;
  kv->value = value;
wmmhello's avatar
wmmhello 已提交
1133
  kv->length = valueLen;
wmmhello's avatar
wmmhello 已提交
1134 1135
  if ((ret = smlParseValue(kv, &info->msgBuf)) != TSDB_CODE_SUCCESS) {
    return ret;
1136 1137 1138
  }

  // parse tags
wmmhello's avatar
wmmhello 已提交
1139
  ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
1140 1141
  if (ret != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
wmmhello's avatar
wmmhello 已提交
1142
    return ret;
1143 1144 1145 1146 1147
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1148 1149 1150
static int32_t smlParseCols(const char *data, int32_t len, SArray *cols, char *childTableName, bool isTag,
                            SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
  if (len == 0) {
wmmhello's avatar
wmmhello 已提交
1151
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1152 1153
  }

X
Xiaoyu Wang 已提交
1154
  size_t      childTableNameLen = strlen(tsSmlChildTableName);
wmmhello's avatar
wmmhello 已提交
1155
  const char *sql = data;
X
Xiaoyu Wang 已提交
1156
  while (sql < data + len) {
wmmhello's avatar
wmmhello 已提交
1157
    const char *key = sql;
X
Xiaoyu Wang 已提交
1158
    int32_t     keyLen = 0;
wmmhello's avatar
wmmhello 已提交
1159

X
Xiaoyu Wang 已提交
1160
    while (sql < data + len) {
wmmhello's avatar
wmmhello 已提交
1161
      // parse key
X
Xiaoyu Wang 已提交
1162
      if (IS_COMMA(sql)) {
wmmhello's avatar
wmmhello 已提交
1163 1164 1165
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
X
Xiaoyu Wang 已提交
1166
      if (IS_EQUAL(sql)) {
wmmhello's avatar
wmmhello 已提交
1167 1168
        keyLen = sql - key;
        sql++;
wmmhello's avatar
wmmhello 已提交
1169 1170
        break;
      }
wmmhello's avatar
wmmhello 已提交
1171
      sql++;
wmmhello's avatar
wmmhello 已提交
1172
    }
wmmhello's avatar
wmmhello 已提交
1173

X
Xiaoyu Wang 已提交
1174
    if (IS_INVALID_COL_LEN(keyLen)) {
wmmhello's avatar
wmmhello 已提交
1175
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
wmmhello's avatar
wmmhello 已提交
1176
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
wmmhello's avatar
wmmhello 已提交
1177
    }
X
Xiaoyu Wang 已提交
1178
    if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
1179
      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
wmmhello's avatar
wmmhello 已提交
1180
      return TSDB_CODE_TSC_DUP_NAMES;
1181 1182
    }

wmmhello's avatar
wmmhello 已提交
1183
    // parse value
wmmhello's avatar
wmmhello 已提交
1184
    const char *value = sql;
X
Xiaoyu Wang 已提交
1185 1186 1187
    int32_t     valueLen = 0;
    bool        isInQuote = false;
    while (sql < data + len) {
wmmhello's avatar
wmmhello 已提交
1188
      // parse value
X
Xiaoyu Wang 已提交
1189
      if (!isTag && IS_QUOTE(sql)) {
wmmhello's avatar
wmmhello 已提交
1190
        isInQuote = !isInQuote;
wmmhello's avatar
wmmhello 已提交
1191 1192
        sql++;
        continue;
wmmhello's avatar
wmmhello 已提交
1193
      }
wmmhello's avatar
wmmhello 已提交
1194
      if (!isInQuote && IS_COMMA(sql)) {
wmmhello's avatar
wmmhello 已提交
1195 1196
        break;
      }
wmmhello's avatar
wmmhello 已提交
1197 1198 1199 1200 1201
      if (!isInQuote && IS_EQUAL(sql)) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
wmmhello's avatar
wmmhello 已提交
1202
    }
wmmhello's avatar
wmmhello 已提交
1203 1204 1205
    valueLen = sql - value;
    sql++;

X
Xiaoyu Wang 已提交
1206
    if (isInQuote) {
wmmhello's avatar
wmmhello 已提交
1207 1208 1209
      smlBuildInvalidDataMsg(msg, "only one quote", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
X
Xiaoyu Wang 已提交
1210
    if (valueLen == 0) {
wmmhello's avatar
wmmhello 已提交
1211
      smlBuildInvalidDataMsg(msg, "invalid value", value);
wmmhello's avatar
wmmhello 已提交
1212 1213
      return TSDB_CODE_SML_INVALID_DATA;
    }
wmmhello's avatar
wmmhello 已提交
1214 1215
    PROCESS_SLASH(key, keyLen)
    PROCESS_SLASH(value, valueLen)
wmmhello's avatar
wmmhello 已提交
1216

X
Xiaoyu Wang 已提交
1217 1218
    // handle child table name
    if (childTableName && childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0) {
1219 1220 1221 1222 1223
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN));
      continue;
    }

wmmhello's avatar
wmmhello 已提交
1224
    // add kv to SSmlKv
wmmhello's avatar
wmmhello 已提交
1225
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1226 1227
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
    if (cols) taosArrayPush(cols, &kv);
1228

wmmhello's avatar
wmmhello 已提交
1229 1230 1231
    kv->key = key;
    kv->keyLen = keyLen;
    kv->value = value;
wmmhello's avatar
wmmhello 已提交
1232
    kv->length = valueLen;
X
Xiaoyu Wang 已提交
1233
    if (isTag) {
1234
      if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
1235 1236
        return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
      }
wmmhello's avatar
wmmhello 已提交
1237
      kv->type = TSDB_DATA_TYPE_NCHAR;
X
Xiaoyu Wang 已提交
1238
    } else {
wmmhello's avatar
wmmhello 已提交
1239 1240 1241
      int32_t ret = smlParseValue(kv, msg);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
wmmhello's avatar
wmmhello 已提交
1242
      }
wmmhello's avatar
wmmhello 已提交
1243 1244
    }
  }
wmmhello's avatar
wmmhello 已提交
1245

wmmhello's avatar
wmmhello 已提交
1246 1247 1248
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1249 1250
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, SSmlMsgBuf *msg) {
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
wmmhello's avatar
wmmhello 已提交
1251
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
wmmhello's avatar
wmmhello 已提交
1252

wmmhello's avatar
wmmhello 已提交
1253
    int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);
X
Xiaoyu Wang 已提交
1254
    if (index) {
wmmhello's avatar
wmmhello 已提交
1255
      SSmlKv **value = (SSmlKv **)taosArrayGet(metaArray, *index);
X
Xiaoyu Wang 已提交
1256
      if (kv->type != (*value)->type) {
wmmhello's avatar
wmmhello 已提交
1257
        smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
wmmhello's avatar
wmmhello 已提交
1258
        return TSDB_CODE_SML_NOT_SAME_TYPE;
X
Xiaoyu Wang 已提交
1259 1260 1261
      } else {
        if (IS_VAR_DATA_TYPE(kv->type)) {  // update string len, if bigger
          if (kv->length > (*value)->length) {
wmmhello's avatar
wmmhello 已提交
1262
            *value = kv;
1263 1264 1265
          }
        }
      }
X
Xiaoyu Wang 已提交
1266
    } else {
wmmhello's avatar
wmmhello 已提交
1267 1268 1269 1270 1271
      size_t tmp = taosArrayGetSize(metaArray);
      ASSERT(tmp <= INT16_MAX);
      int16_t size = tmp;
      taosArrayPush(metaArray, &kv);
      taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
1272
    }
wmmhello's avatar
wmmhello 已提交
1273
  }
wmmhello's avatar
wmmhello 已提交
1274

wmmhello's avatar
wmmhello 已提交
1275
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1276 1277
}

X
Xiaoyu Wang 已提交
1278
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols) {
wmmhello's avatar
wmmhello 已提交
1279 1280 1281 1282
  for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
    taosArrayPush(metaArray, &kv);
    taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
wmmhello's avatar
wmmhello 已提交
1283 1284 1285
  }
}

X
Xiaoyu Wang 已提交
1286
static SSmlTableInfo *smlBuildTableInfo() {
1287
  SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
X
Xiaoyu Wang 已提交
1288
  if (!tag) {
1289
    return NULL;
wmmhello's avatar
wmmhello 已提交
1290
  }
1291 1292 1293 1294 1295

  tag->cols = taosArrayInit(16, POINTER_BYTES);
  if (tag->cols == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
wmmhello's avatar
wmmhello 已提交
1296
  }
1297 1298 1299 1300 1301

  tag->tags = taosArrayInit(16, POINTER_BYTES);
  if (tag->tags == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
wmmhello's avatar
wmmhello 已提交
1302
  }
1303 1304 1305 1306 1307
  return tag;

cleanup:
  taosMemoryFree(tag);
  return NULL;
wmmhello's avatar
wmmhello 已提交
1308 1309
}

X
Xiaoyu Wang 已提交
1310 1311 1312
static void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
  if (info->dataFormat) {
    for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
1313 1314
      SArray *kvArray = (SArray *)taosArrayGetP(tag->cols, i);
      for (int j = 0; j < taosArrayGetSize(kvArray); ++j) {
wmmhello's avatar
wmmhello 已提交
1315
        SSmlKv *p = (SSmlKv *)taosArrayGetP(kvArray, j);
X
Xiaoyu Wang 已提交
1316 1317 1318
        if (info->protocol == TSDB_SML_JSON_PROTOCOL &&
            (p->type == TSDB_DATA_TYPE_NCHAR || p->type == TSDB_DATA_TYPE_BINARY)) {
          taosMemoryFree((void *)p->value);
wmmhello's avatar
wmmhello 已提交
1319
        }
1320 1321 1322 1323
        taosMemoryFree(p);
      }
      taosArrayDestroy(kvArray);
    }
X
Xiaoyu Wang 已提交
1324 1325
  } else {
    for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
1326
      SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
X
Xiaoyu Wang 已提交
1327
      void    **p1 = (void **)taosHashIterate(kvHash, NULL);
1328 1329
      while (p1) {
        taosMemoryFree(*p1);
X
Xiaoyu Wang 已提交
1330
        p1 = (void **)taosHashIterate(kvHash, p1);
1331 1332 1333 1334
      }
      taosHashCleanup(kvHash);
    }
  }
X
Xiaoyu Wang 已提交
1335
  for (size_t i = 0; i < taosArrayGetSize(tag->tags); i++) {
wmmhello's avatar
wmmhello 已提交
1336
    SSmlKv *p = (SSmlKv *)taosArrayGetP(tag->tags, i);
X
Xiaoyu Wang 已提交
1337 1338 1339 1340
    if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
      taosMemoryFree((void *)p->key);
      if (p->type == TSDB_DATA_TYPE_NCHAR || p->type == TSDB_DATA_TYPE_BINARY) {
        taosMemoryFree((void *)p->value);
wmmhello's avatar
wmmhello 已提交
1341 1342 1343 1344
      }
    }
    taosMemoryFree(p);
  }
X
Xiaoyu Wang 已提交
1345 1346
  if (info->protocol == TSDB_SML_JSON_PROTOCOL && tag->sTableName) {
    taosMemoryFree((void *)tag->sTableName);
wmmhello's avatar
wmmhello 已提交
1347
  }
1348 1349 1350 1351 1352
  taosArrayDestroy(tag->cols);
  taosArrayDestroy(tag->tags);
  taosMemoryFree(tag);
}

X
Xiaoyu Wang 已提交
1353
static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) {
1354 1355
  SArray *s1 = *(SArray **)key1;
  SArray *s2 = *(SArray **)key2;
1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368
  SSmlKv *kv1 = (SSmlKv *)taosArrayGetP(s1, 0);
  SSmlKv *kv2 = (SSmlKv *)taosArrayGetP(s2, 0);
  ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
  ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
  if (kv1->i < kv2->i) {
    return -1;
  } else if (kv1->i > kv2->i) {
    return 1;
  } else {
    return 0;
  }
}

X
Xiaoyu Wang 已提交
1369
static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
1370 1371
  SHashObj *s1 = *(SHashObj **)key1;
  SHashObj *s2 = *(SHashObj **)key2;
1372 1373
  SSmlKv   *kv1 = *(SSmlKv **)taosHashGet(s1, TS, TS_LEN);
  SSmlKv   *kv2 = *(SSmlKv **)taosHashGet(s2, TS, TS_LEN);
1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384
  ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
  ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
  if (kv1->i < kv2->i) {
    return -1;
  } else if (kv1->i > kv2->i) {
    return 1;
  } else {
    return 0;
  }
}

1385 1386
static int32_t smlDealCols(SSmlTableInfo *oneTable, bool dataFormat, SArray *cols) {
  if (dataFormat) {
1387
    void *p = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GT);
1388
    if (p == NULL) {
1389
      taosArrayPush(oneTable->cols, &cols);
1390
    } else {
1391
      taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols);
1392
    }
1393 1394 1395 1396
    return TSDB_CODE_SUCCESS;
  }

  SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
1397
  if (!kvHash) {
1398 1399 1400
    uError("SML:smlDealCols failed to allocate memory");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
1401
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
1402
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
wmmhello's avatar
wmmhello 已提交
1403
    taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
1404 1405
  }

1406
  void *p = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GT);
1407
  if (p == NULL) {
1408
    taosArrayPush(oneTable->cols, &kvHash);
1409
  } else {
1410
    taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &kvHash);
1411
  }
1412 1413 1414
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1415 1416 1417
static SSmlSTableMeta *smlBuildSTableMeta() {
  SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
  if (!meta) {
1418 1419 1420 1421 1422 1423 1424 1425
    return NULL;
  }
  meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (meta->tagHash == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

wmmhello's avatar
wmmhello 已提交
1426 1427
  meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (meta->colHash == NULL) {
1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->tags = taosArrayInit(32, POINTER_BYTES);
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->cols = taosArrayInit(32, POINTER_BYTES);
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
  return meta;

cleanup:
  taosMemoryFree(meta);
  return NULL;
}

X
Xiaoyu Wang 已提交
1450
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
1451
  taosHashCleanup(meta->tagHash);
wmmhello's avatar
wmmhello 已提交
1452
  taosHashCleanup(meta->colHash);
1453 1454 1455
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);
  taosMemoryFree(meta->tableMeta);
wmmhello's avatar
wmmhello 已提交
1456
  taosMemoryFree(meta);
1457 1458 1459 1460 1461
}

static void smlDestroyCols(SArray *cols) {
  if (!cols) return;
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
wmmhello's avatar
wmmhello 已提交
1462
    void *kv = taosArrayGetP(cols, i);
1463 1464 1465 1466
    taosMemoryFree(kv);
  }
}

X
Xiaoyu Wang 已提交
1467 1468
static void smlDestroyInfo(SSmlHandle *info) {
  if (!info) return;
1469 1470 1471 1472
  qDestroyQuery(info->pQuery);
  smlDestroyHandle(info->exec);

  // destroy info->childTables
X
Xiaoyu Wang 已提交
1473
  void **p1 = (void **)taosHashIterate(info->childTables, NULL);
1474
  while (p1) {
X
Xiaoyu Wang 已提交
1475 1476
    smlDestroyTableInfo(info, (SSmlTableInfo *)(*p1));
    p1 = (void **)taosHashIterate(info->childTables, p1);
1477 1478 1479 1480
  }
  taosHashCleanup(info->childTables);

  // destroy info->superTables
X
Xiaoyu Wang 已提交
1481
  p1 = (void **)taosHashIterate(info->superTables, NULL);
1482
  while (p1) {
X
Xiaoyu Wang 已提交
1483 1484
    smlDestroySTableMeta((SSmlSTableMeta *)(*p1));
    p1 = (void **)taosHashIterate(info->superTables, p1);
1485 1486 1487 1488 1489 1490
  }
  taosHashCleanup(info->superTables);

  // destroy info->pVgHash
  taosHashCleanup(info->pVgHash);
  taosHashCleanup(info->dumplicateKey);
X
Xiaoyu Wang 已提交
1491
  if (!info->dataFormat) {
wmmhello's avatar
wmmhello 已提交
1492 1493
    taosArrayDestroy(info->colsContainer);
  }
wmmhello's avatar
wmmhello 已提交
1494
  destroyRequest(info->pRequest);
1495 1496 1497
  taosMemoryFreeClear(info);
}

1498 1499 1500
static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLProtocolType protocol, int8_t precision) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
1501 1502 1503
  if (NULL == info) {
    return NULL;
  }
X
Xiaoyu Wang 已提交
1504
  info->id = smlGenId();
1505

1506
  info->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY);
1507
  if (NULL == info->pQuery) {
X
Xiaoyu Wang 已提交
1508
    uError("SML:0x%" PRIx64 " create info->pQuery error", info->id);
1509 1510
    goto cleanup;
  }
X
Xiaoyu Wang 已提交
1511
  info->pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
1512
  info->pQuery->haveResultSet = false;
X
Xiaoyu Wang 已提交
1513 1514 1515 1516
  info->pQuery->msgType = TDMT_VND_SUBMIT;
  info->pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
  if (NULL == info->pQuery->pRoot) {
    uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id);
1517 1518
    goto cleanup;
  }
X
Xiaoyu Wang 已提交
1519
  ((SVnodeModifOpStmt *)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
1520

1521 1522
  if (pTscObj) {
    info->taos = pTscObj;
1523 1524 1525 1526 1527
    code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
      goto cleanup;
    }
wmmhello's avatar
wmmhello 已提交
1528
  }
1529

X
Xiaoyu Wang 已提交
1530 1531 1532
  info->precision = precision;
  info->protocol = protocol;
  if (protocol == TSDB_SML_LINE_PROTOCOL) {
1533
    info->dataFormat = tsSmlDataFormat;
X
Xiaoyu Wang 已提交
1534
  } else {
1535 1536
    info->dataFormat = true;
  }
1537

1538
  if (request) {
1539 1540 1541
    info->pRequest = request;
    info->msgBuf.buf = info->pRequest->msgBuf;
    info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
1542
    info->pRequest->stmtType = info->pQuery->pRoot->type;
1543
  }
1544

X
Xiaoyu Wang 已提交
1545
  info->exec = smlInitHandle(info->pQuery);
1546 1547
  info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
1548
  info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
1549 1550

  info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
1551
  if (!info->dataFormat) {
1552
    info->colsContainer = taosArrayInit(32, POINTER_BYTES);
X
Xiaoyu Wang 已提交
1553 1554
    if (NULL == info->colsContainer) {
      uError("SML:0x%" PRIx64 " create info failed", info->id);
1555 1556 1557
      goto cleanup;
    }
  }
X
Xiaoyu Wang 已提交
1558 1559 1560
  if (NULL == info->exec || NULL == info->childTables || NULL == info->superTables || NULL == info->pVgHash ||
      NULL == info->dumplicateKey) {
    uError("SML:0x%" PRIx64 " create info failed", info->id);
1561 1562 1563 1564 1565 1566 1567 1568 1569 1570
    goto cleanup;
  }

  return info;
cleanup:
  smlDestroyInfo(info);
  return NULL;
}

/************* TSDB_SML_JSON_PROTOCOL function start **************/
X
Xiaoyu Wang 已提交
1571
static int32_t smlJsonCreateSring(const char **output, char *input, int32_t inputLen) {
1572
  *output = (const char *)taosMemoryCalloc(1, inputLen);
X
Xiaoyu Wang 已提交
1573
  if (*output == NULL) {
1574 1575 1576
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
1577
  memcpy((void *)(*output), input, inputLen);
1578 1579 1580 1581 1582 1583
  return TSDB_CODE_SUCCESS;
}

static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo) {
  cJSON *metric = cJSON_GetObjectItem(root, "metric");
  if (!cJSON_IsString(metric)) {
X
Xiaoyu Wang 已提交
1584
    return TSDB_CODE_TSC_INVALID_JSON;
1585 1586 1587
  }

  tinfo->sTableNameLen = strlen(metric->valuestring);
1588
  if (IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
X
Xiaoyu Wang 已提交
1589
    uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id);
1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  return smlJsonCreateSring(&tinfo->sTableName, metric->valuestring, tinfo->sTableNameLen);
}

static int32_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int64_t *tsVal) {
  int32_t size = cJSON_GetArraySize(root);
  if (size != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (!cJSON_IsNumber(value)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  double timeDouble = value->valuedouble;
X
Xiaoyu Wang 已提交
1613
  if (smlDoubleToInt64OverFlow(timeDouble)) {
1614
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1615
    return TSDB_CODE_INVALID_TIMESTAMP;
1616
  }
wmmhello's avatar
wmmhello 已提交
1617 1618 1619 1620 1621 1622 1623 1624

  if (timeDouble == 0) {
    *tsVal = taosGetTimestampNs();
    return TSDB_CODE_SUCCESS;
  }

  if (timeDouble < 0) {
    return TSDB_CODE_INVALID_TIMESTAMP;
1625 1626
  }

1627
  *tsVal = timeDouble;
1628
  size_t typeLen = strlen(type->valuestring);
wmmhello's avatar
wmmhello 已提交
1629
  if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
X
Xiaoyu Wang 已提交
1630
    // seconds
1631 1632
    *tsVal = *tsVal * NANOSECOND_PER_SEC;
    timeDouble = timeDouble * NANOSECOND_PER_SEC;
X
Xiaoyu Wang 已提交
1633
    if (smlDoubleToInt64OverFlow(timeDouble)) {
1634
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1635
      return TSDB_CODE_INVALID_TIMESTAMP;
1636
    }
wmmhello's avatar
wmmhello 已提交
1637
  } else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
1638 1639
    switch (type->valuestring[0]) {
      case 'm':
wmmhello's avatar
wmmhello 已提交
1640
      case 'M':
X
Xiaoyu Wang 已提交
1641
        // milliseconds
1642 1643
        *tsVal = *tsVal * NANOSECOND_PER_MSEC;
        timeDouble = timeDouble * NANOSECOND_PER_MSEC;
X
Xiaoyu Wang 已提交
1644
        if (smlDoubleToInt64OverFlow(timeDouble)) {
1645
          smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1646
          return TSDB_CODE_INVALID_TIMESTAMP;
1647 1648 1649
        }
        break;
      case 'u':
wmmhello's avatar
wmmhello 已提交
1650
      case 'U':
X
Xiaoyu Wang 已提交
1651
        // microseconds
1652 1653
        *tsVal = *tsVal * NANOSECOND_PER_USEC;
        timeDouble = timeDouble * NANOSECOND_PER_USEC;
X
Xiaoyu Wang 已提交
1654
        if (smlDoubleToInt64OverFlow(timeDouble)) {
1655
          smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1656
          return TSDB_CODE_INVALID_TIMESTAMP;
1657 1658 1659
        }
        break;
      case 'n':
wmmhello's avatar
wmmhello 已提交
1660
      case 'N':
1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681
        break;
      default:
        return TSDB_CODE_TSC_INVALID_JSON;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  return TSDB_CODE_SUCCESS;
}

static uint8_t smlGetTimestampLen(int64_t num) {
  uint8_t len = 0;
  while ((num /= 10) != 0) {
    len++;
  }
  len++;
  return len;
}

static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
X
Xiaoyu Wang 已提交
1682
  // Timestamp must be the first KV to parse
1683 1684 1685 1686
  int64_t tsVal = 0;

  cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
  if (cJSON_IsNumber(timestamp)) {
X
Xiaoyu Wang 已提交
1687
    // timestamp value 0 indicates current system time
1688
    double timeDouble = timestamp->valuedouble;
X
Xiaoyu Wang 已提交
1689
    if (smlDoubleToInt64OverFlow(timeDouble)) {
1690
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1691
      return TSDB_CODE_INVALID_TIMESTAMP;
1692
    }
wmmhello's avatar
wmmhello 已提交
1693

X
Xiaoyu Wang 已提交
1694
    if (timeDouble < 0) {
wmmhello's avatar
wmmhello 已提交
1695
      return TSDB_CODE_INVALID_TIMESTAMP;
1696
    }
1697

1698
    uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
1699
    tsVal = (int64_t)timeDouble;
1700
    if (tsLen == TSDB_TIME_PRECISION_SEC_DIGITS) {
1701 1702
      tsVal = tsVal * NANOSECOND_PER_SEC;
      timeDouble = timeDouble * NANOSECOND_PER_SEC;
X
Xiaoyu Wang 已提交
1703
      if (smlDoubleToInt64OverFlow(timeDouble)) {
1704
        smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1705
        return TSDB_CODE_INVALID_TIMESTAMP;
1706 1707
      }
    } else if (tsLen == TSDB_TIME_PRECISION_MILLI_DIGITS) {
1708 1709
      tsVal = tsVal * NANOSECOND_PER_MSEC;
      timeDouble = timeDouble * NANOSECOND_PER_MSEC;
X
Xiaoyu Wang 已提交
1710
      if (smlDoubleToInt64OverFlow(timeDouble)) {
1711
        smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1712
        return TSDB_CODE_INVALID_TIMESTAMP;
1713
      }
X
Xiaoyu Wang 已提交
1714
    } else if (timeDouble == 0) {
wmmhello's avatar
wmmhello 已提交
1715
      tsVal = taosGetTimestampNs();
X
Xiaoyu Wang 已提交
1716
    } else {
wmmhello's avatar
wmmhello 已提交
1717
      return TSDB_CODE_INVALID_TIMESTAMP;
1718 1719 1720 1721
    }
  } else if (cJSON_IsObject(timestamp)) {
    int32_t ret = smlParseTSFromJSONObj(info, timestamp, &tsVal);
    if (ret != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
1722
      uError("SML:0x%" PRIx64 " Failed to parse timestamp from JSON Obj", info->id);
1723 1724 1725 1726
      return ret;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
1727 1728
  }

wmmhello's avatar
wmmhello 已提交
1729
  // add ts to
wmmhello's avatar
wmmhello 已提交
1730
  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1731
  if (!kv) {
wmmhello's avatar
wmmhello 已提交
1732 1733 1734
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  kv->key = TS;
1735 1736
  kv->keyLen = TS_LEN;
  kv->i = tsVal;
wmmhello's avatar
wmmhello 已提交
1737
  kv->type = TSDB_DATA_TYPE_TIMESTAMP;
wmmhello's avatar
wmmhello 已提交
1738
  kv->length = (int16_t)tDataTypes[kv->type].bytes;
X
Xiaoyu Wang 已提交
1739
  if (cols) taosArrayPush(cols, &kv);
wmmhello's avatar
wmmhello 已提交
1740 1741 1742
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1743
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
1744 1745 1746 1747 1748 1749 1750
  if (strcasecmp(typeStr, "bool") != 0) {
    uError("OTD:invalid type(%s) for JSON Bool", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }
  pVal->type = TSDB_DATA_TYPE_BOOL;
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  pVal->i = value->valueint;
wmmhello's avatar
wmmhello 已提交
1751

1752 1753
  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1754

X
Xiaoyu Wang 已提交
1755 1756 1757
static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
  // tinyint
  if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) {
1758 1759 1760 1761 1762 1763 1764 1765 1766
    if (!IS_VALID_TINYINT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(tinyint)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_TINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1767 1768
  // smallint
  if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) {
1769 1770 1771 1772 1773 1774 1775 1776 1777
    if (!IS_VALID_SMALLINT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(smallint)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_SMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1778 1779
  // int
  if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) {
1780 1781 1782 1783 1784 1785 1786 1787 1788
    if (!IS_VALID_INT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(int)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_INT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1789 1790
  // bigint
  if (strcasecmp(typeStr, "i64") == 0 || strcasecmp(typeStr, "bigint") == 0) {
1791 1792
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
X
Xiaoyu Wang 已提交
1793
    if (value->valuedouble >= (double)INT64_MAX) {
1794
      pVal->i = INT64_MAX;
X
Xiaoyu Wang 已提交
1795
    } else if (value->valuedouble <= (double)INT64_MIN) {
1796
      pVal->i = INT64_MIN;
X
Xiaoyu Wang 已提交
1797
    } else {
1798
      pVal->i = value->valuedouble;
1799 1800 1801
    }
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1802 1803
  // float
  if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) {
1804 1805 1806
    if (!IS_VALID_FLOAT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(float)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
wmmhello's avatar
wmmhello 已提交
1807
    }
1808 1809 1810 1811 1812
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->f = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1813 1814
  // double
  if (strcasecmp(typeStr, "f64") == 0 || strcasecmp(typeStr, "double") == 0) {
1815 1816 1817 1818
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->d = value->valuedouble;
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1819 1820
  }

X
Xiaoyu Wang 已提交
1821
  // if reach here means type is unsupported
1822 1823 1824
  uError("OTD:invalid type(%s) for JSON Number", typeStr);
  return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
1825

X
Xiaoyu Wang 已提交
1826
static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
1827 1828 1829 1830 1831 1832 1833 1834 1835
  if (strcasecmp(typeStr, "binary") == 0) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
  } else if (strcasecmp(typeStr, "nchar") == 0) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
  } else {
    uError("OTD:invalid type(%s) for JSON String", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }
  pVal->length = (int16_t)strlen(value->valuestring);
wmmhello's avatar
wmmhello 已提交
1836

1837
  if (pVal->type == TSDB_DATA_TYPE_BINARY && pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
wmmhello's avatar
wmmhello 已提交
1838 1839
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }
1840 1841
  if (pVal->type == TSDB_DATA_TYPE_NCHAR &&
      pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
wmmhello's avatar
wmmhello 已提交
1842 1843 1844
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }

wmmhello's avatar
wmmhello 已提交
1845
  return smlJsonCreateSring(&pVal->value, value->valuestring, pVal->length);
1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871
}

static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) {
  int32_t ret = TSDB_CODE_SUCCESS;
  int32_t size = cJSON_GetArraySize(root);

  if (size != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (value == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  switch (value->type) {
    case cJSON_True:
    case cJSON_False: {
      ret = smlConvertJSONBool(kv, type->valuestring, value);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
wmmhello's avatar
wmmhello 已提交
1872
      }
1873
      break;
wmmhello's avatar
wmmhello 已提交
1874
    }
1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890
    case cJSON_Number: {
      ret = smlConvertJSONNumber(kv, type->valuestring, value);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
      break;
    }
    case cJSON_String: {
      ret = smlConvertJSONString(kv, type->valuestring, value);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON_TYPE;
wmmhello's avatar
wmmhello 已提交
1891
  }
1892 1893

  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1894 1895
}

1896 1897 1898 1899 1900 1901 1902 1903
static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
  switch (root->type) {
    case cJSON_True:
    case cJSON_False: {
      kv->type = TSDB_DATA_TYPE_BOOL;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->i = root->valueint;
      break;
wmmhello's avatar
wmmhello 已提交
1904
    }
1905 1906 1907 1908 1909 1910 1911 1912 1913 1914
    case cJSON_Number: {
      kv->type = TSDB_DATA_TYPE_DOUBLE;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->d = root->valuedouble;
      break;
    }
    case cJSON_String: {
      /* set default JSON type to binary/nchar according to
       * user configured parameter tsDefaultJSONStrType
       */
wmmhello's avatar
wmmhello 已提交
1915

X
Xiaoyu Wang 已提交
1916
      char *tsDefaultJSONStrType = "nchar";  // todo
1917 1918
      smlConvertJSONString(kv, tsDefaultJSONStrType, root);
      break;
wmmhello's avatar
wmmhello 已提交
1919
    }
1920 1921 1922 1923 1924 1925 1926 1927 1928 1929
    case cJSON_Object: {
      int32_t ret = smlParseValueFromJSONObj(root, kv);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON Obj");
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
1930
  }
1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941

  return TSDB_CODE_SUCCESS;
}

static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
  cJSON *metricVal = cJSON_GetObjectItem(root, "value");
  if (metricVal == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1942
  if (!kv) {
1943 1944
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
1945
  if (cols) taosArrayPush(cols, &kv);
1946 1947 1948 1949 1950 1951 1952 1953

  kv->key = VALUE;
  kv->keyLen = VALUE_LEN;
  int32_t ret = smlParseValueFromJSON(metricVal, kv);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1954 1955
}

X
Xiaoyu Wang 已提交
1956 1957
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey,
                                    SSmlMsgBuf *msg) {
1958 1959 1960 1961 1962 1963
  int32_t ret = TSDB_CODE_SUCCESS;

  cJSON *tags = cJSON_GetObjectItem(root, "tags");
  if (tags == NULL || tags->type != cJSON_Object) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }
wmmhello's avatar
wmmhello 已提交
1964

X
Xiaoyu Wang 已提交
1965
  size_t  childTableNameLen = strlen(tsSmlChildTableName);
1966 1967 1968 1969 1970
  int32_t tagNum = cJSON_GetArraySize(tags);
  for (int32_t i = 0; i < tagNum; ++i) {
    cJSON *tag = cJSON_GetArrayItem(tags, i);
    if (tag == NULL) {
      return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
1971
    }
1972 1973 1974 1975 1976
    size_t keyLen = strlen(tag->string);
    if (IS_INVALID_COL_LEN(keyLen)) {
      uError("OTD:Tag key length is 0 or too large than 64");
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
X
Xiaoyu Wang 已提交
1977
    // check duplicate keys
1978
    if (smlCheckDuplicateKey(tag->string, keyLen, dumplicateKey)) {
wmmhello's avatar
wmmhello 已提交
1979
      return TSDB_CODE_TSC_DUP_NAMES;
wmmhello's avatar
wmmhello 已提交
1980 1981
    }

X
Xiaoyu Wang 已提交
1982 1983
    // handle child table name
    if (childTableNameLen != 0 && strcmp(tag->string, tsSmlChildTableName) == 0) {
1984 1985 1986 1987 1988 1989 1990 1991 1992
      if (!cJSON_IsString(tag)) {
        uError("OTD:ID must be JSON string");
        return TSDB_CODE_TSC_INVALID_JSON;
      }
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, tag->valuestring, TSDB_TABLE_NAME_LEN);
      continue;
    }

1993 1994
    // add kv to SSmlKv
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1995 1996
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
    if (pKVs) taosArrayPush(pKVs, &kv);
1997

X
Xiaoyu Wang 已提交
1998
    // key
1999
    kv->keyLen = keyLen;
2000 2001 2002 2003
    ret = smlJsonCreateSring(&kv->key, tag->string, kv->keyLen);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
X
Xiaoyu Wang 已提交
2004
    // value
2005 2006 2007 2008
    ret = smlParseValueFromJSON(tag, kv);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
wmmhello's avatar
wmmhello 已提交
2009 2010
  }

2011
  return ret;
wmmhello's avatar
wmmhello 已提交
2012 2013
}

2014 2015
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo, SArray *cols) {
  int32_t ret = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2016

2017
  if (!cJSON_IsObject(root)) {
X
Xiaoyu Wang 已提交
2018
    uError("OTD:0x%" PRIx64 " data point needs to be JSON object", info->id);
2019
    return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
2020
  }
2021

2022
  int32_t size = cJSON_GetArraySize(root);
X
Xiaoyu Wang 已提交
2023
  // outmost json fields has to be exactly 4
2024
  if (size != OTD_JSON_FIELDS_NUM) {
X
Xiaoyu Wang 已提交
2025
    uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
2026
    return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
2027
  }
2028

X
Xiaoyu Wang 已提交
2029
  // Parse metric
2030 2031
  ret = smlParseMetricFromJSON(info, root, tinfo);
  if (ret != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2032
    uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
2033
    return ret;
wmmhello's avatar
wmmhello 已提交
2034
  }
X
Xiaoyu Wang 已提交
2035
  uDebug("OTD:0x%" PRIx64 " Parse metric from JSON payload finished", info->id);
wmmhello's avatar
wmmhello 已提交
2036

X
Xiaoyu Wang 已提交
2037
  // Parse timestamp
2038 2039
  ret = smlParseTSFromJSON(info, root, cols);
  if (ret) {
X
Xiaoyu Wang 已提交
2040
    uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
2041
    return ret;
wmmhello's avatar
wmmhello 已提交
2042
  }
X
Xiaoyu Wang 已提交
2043
  uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);
2044

X
Xiaoyu Wang 已提交
2045
  // Parse metric value
2046 2047
  ret = smlParseColsFromJSON(root, cols);
  if (ret) {
X
Xiaoyu Wang 已提交
2048
    uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
2049
    return ret;
2050
  }
X
Xiaoyu Wang 已提交
2051
  uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id);
2052

X
Xiaoyu Wang 已提交
2053
  // Parse tags
2054
  ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
2055
  if (ret) {
X
Xiaoyu Wang 已提交
2056
    uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
2057
    return ret;
2058
  }
X
Xiaoyu Wang 已提交
2059
  uDebug("OTD:0x%" PRIx64 " Parse tags from JSON payload finished", info->id);
wmmhello's avatar
wmmhello 已提交
2060

2061
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2062
}
2063
/************* TSDB_SML_JSON_PROTOCOL function end **************/
wmmhello's avatar
wmmhello 已提交
2064

wmmhello's avatar
wmmhello 已提交
2065
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) {
wmmhello's avatar
wmmhello 已提交
2066
  SSmlLineInfo elements = {0};
2067
  uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql);
2068

wmmhello's avatar
wmmhello 已提交
2069
  int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf);
X
Xiaoyu Wang 已提交
2070 2071
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id);
wmmhello's avatar
wmmhello 已提交
2072 2073 2074
    return ret;
  }

2075
  SArray *cols = NULL;
X
Xiaoyu Wang 已提交
2076
  if (info->dataFormat) {  // if dataFormat, cols need new memory to save data
2077 2078
    cols = taosArrayInit(16, POINTER_BYTES);
    if (cols == NULL) {
X
Xiaoyu Wang 已提交
2079
      uError("SML:0x%" PRIx64 " smlParseInfluxLine failed to allocate memory", info->id);
2080 2081
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
2082
  } else {  // if dataFormat is false, cols do not need to save data, there is another new memory to save data
2083
    cols = info->colsContainer;
wmmhello's avatar
wmmhello 已提交
2084
  }
wmmhello's avatar
wmmhello 已提交
2085

wmmhello's avatar
wmmhello 已提交
2086
  ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols);
X
Xiaoyu Wang 已提交
2087 2088 2089
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseTS failed", info->id);
    if (info->dataFormat) taosArrayDestroy(cols);
wmmhello's avatar
wmmhello 已提交
2090 2091
    return ret;
  }
2092
  ret = smlParseCols(elements.cols, elements.colsLen, cols, NULL, false, info->dumplicateKey, &info->msgBuf);
X
Xiaoyu Wang 已提交
2093 2094
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseCols parse cloums fields failed", info->id);
2095
    smlDestroyCols(cols);
X
Xiaoyu Wang 已提交
2096
    if (info->dataFormat) taosArrayDestroy(cols);
wmmhello's avatar
wmmhello 已提交
2097 2098 2099
    return ret;
  }

X
Xiaoyu Wang 已提交
2100 2101 2102 2103 2104
  bool            hasTable = true;
  SSmlTableInfo  *tinfo = NULL;
  SSmlTableInfo **oneTable =
      (SSmlTableInfo **)taosHashGet(info->childTables, elements.measure, elements.measureTagsLen);
  if (!oneTable) {
2105
    tinfo = smlBuildTableInfo();
X
Xiaoyu Wang 已提交
2106
    if (!tinfo) {
wmmhello's avatar
wmmhello 已提交
2107 2108
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
2109
    taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES);
2110
    oneTable = &tinfo;
2111 2112 2113 2114
    hasTable = false;
  }

  ret = smlDealCols(*oneTable, info->dataFormat, cols);
X
Xiaoyu Wang 已提交
2115
  if (ret != TSDB_CODE_SUCCESS) {
2116 2117
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
2118

X
Xiaoyu Wang 已提交
2119 2120 2121 2122 2123
  if (!hasTable) {
    ret = smlParseCols(elements.tags, elements.tagsLen, (*oneTable)->tags, (*oneTable)->childTableName, true,
                       info->dumplicateKey, &info->msgBuf);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseCols parse tag fields failed", info->id);
wmmhello's avatar
wmmhello 已提交
2124 2125 2126
      return ret;
    }

X
Xiaoyu Wang 已提交
2127
    if (taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_TAGS) {
wmmhello's avatar
wmmhello 已提交
2128
      smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
wmmhello's avatar
wmmhello 已提交
2129
      return TSDB_CODE_PAR_INVALID_TAGS_NUM;
wmmhello's avatar
wmmhello 已提交
2130 2131
    }

2132 2133 2134 2135 2136
    if (taosArrayGetSize(cols) + taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_COLUMNS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
      return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
    }

2137 2138
    (*oneTable)->sTableName = elements.measure;
    (*oneTable)->sTableNameLen = elements.measureLen;
X
Xiaoyu Wang 已提交
2139 2140 2141
    if (strlen((*oneTable)->childTableName) == 0) {
      RandTableName rName = {(*oneTable)->tags, (*oneTable)->sTableName, (uint8_t)(*oneTable)->sTableNameLen,
                             (*oneTable)->childTableName, 0};
2142 2143 2144

      buildChildTableName(&rName);
      (*oneTable)->uid = rName.uid;
X
Xiaoyu Wang 已提交
2145 2146
    } else {
      (*oneTable)->uid = *(uint64_t *)((*oneTable)->childTableName);
2147
    }
2148
  }
wmmhello's avatar
wmmhello 已提交
2149

X
Xiaoyu Wang 已提交
2150 2151
  SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements.measure, elements.measureLen);
  if (tableMeta) {  // update meta
wmmhello's avatar
wmmhello 已提交
2152
    ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
wmmhello's avatar
wmmhello 已提交
2153
    if (!hasTable && ret == TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2154 2155
      ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
    }
wmmhello's avatar
wmmhello 已提交
2156
    if (ret != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2157
      uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
wmmhello's avatar
wmmhello 已提交
2158
      return ret;
wmmhello's avatar
wmmhello 已提交
2159
    }
X
Xiaoyu Wang 已提交
2160
  } else {
2161
    SSmlSTableMeta *meta = smlBuildSTableMeta();
wmmhello's avatar
wmmhello 已提交
2162 2163
    smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
    smlInsertMeta(meta->colHash, meta->cols, cols);
2164
    taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
wmmhello's avatar
wmmhello 已提交
2165
  }
2166

X
Xiaoyu Wang 已提交
2167
  if (!info->dataFormat) {
2168 2169 2170
    taosArrayClear(info->colsContainer);
  }
  taosHashClear(info->dumplicateKey);
wmmhello's avatar
wmmhello 已提交
2171 2172 2173
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
2174
static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) {
X
Xiaoyu Wang 已提交
2175
  int            ret = TSDB_CODE_SUCCESS;
2176
  SSmlTableInfo *tinfo = smlBuildTableInfo();
X
Xiaoyu Wang 已提交
2177
  if (!tinfo) {
2178
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
2179 2180
  }

2181 2182
  SArray *cols = taosArrayInit(16, POINTER_BYTES);
  if (cols == NULL) {
X
Xiaoyu Wang 已提交
2183
    uError("SML:0x%" PRIx64 " smlParseTelnetLine failed to allocate memory", info->id);
2184
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
2185 2186
  }

X
Xiaoyu Wang 已提交
2187
  if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
wmmhello's avatar
wmmhello 已提交
2188
    ret = smlParseTelnetString(info, (const char *)data, data + len, tinfo, cols);
X
Xiaoyu Wang 已提交
2189
  } else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
2190
    ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
X
Xiaoyu Wang 已提交
2191
  } else {
2192 2193
    ASSERT(0);
  }
X
Xiaoyu Wang 已提交
2194 2195
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseTelnetLine failed", info->id);
wmmhello's avatar
wmmhello 已提交
2196
    smlDestroyTableInfo(info, tinfo);
wmmhello's avatar
wmmhello 已提交
2197
    smlDestroyCols(cols);
2198 2199
    taosArrayDestroy(cols);
    return ret;
wmmhello's avatar
wmmhello 已提交
2200
  }
wmmhello's avatar
wmmhello 已提交
2201

X
Xiaoyu Wang 已提交
2202
  if (taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
2203
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL);
wmmhello's avatar
wmmhello 已提交
2204 2205 2206
    smlDestroyTableInfo(info, tinfo);
    smlDestroyCols(cols);
    taosArrayDestroy(cols);
wmmhello's avatar
wmmhello 已提交
2207
    return TSDB_CODE_PAR_INVALID_TAGS_NUM;
wmmhello's avatar
wmmhello 已提交
2208
  }
2209 2210
  taosHashClear(info->dumplicateKey);

X
Xiaoyu Wang 已提交
2211 2212
  if (strlen(tinfo->childTableName) == 0) {
    RandTableName rName = {tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, tinfo->childTableName, 0};
2213 2214
    buildChildTableName(&rName);
    tinfo->uid = rName.uid;
X
Xiaoyu Wang 已提交
2215 2216
  } else {
    tinfo->uid = *(uint64_t *)(tinfo->childTableName);  // generate uid by name simple
2217 2218
  }

X
Xiaoyu Wang 已提交
2219 2220 2221 2222
  bool            hasTable = true;
  SSmlTableInfo **oneTable =
      (SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName));
  if (!oneTable) {
2223
    taosHashPut(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName), &tinfo, POINTER_BYTES);
2224
    oneTable = &tinfo;
2225
    hasTable = false;
X
Xiaoyu Wang 已提交
2226
  } else {
wmmhello's avatar
wmmhello 已提交
2227
    smlDestroyTableInfo(info, tinfo);
wmmhello's avatar
wmmhello 已提交
2228
  }
wmmhello's avatar
wmmhello 已提交
2229

2230
  taosArrayPush((*oneTable)->cols, &cols);
X
Xiaoyu Wang 已提交
2231 2232 2233
  SSmlSTableMeta **tableMeta =
      (SSmlSTableMeta **)taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen);
  if (tableMeta) {  // update meta
wmmhello's avatar
wmmhello 已提交
2234
    ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
wmmhello's avatar
wmmhello 已提交
2235
    if (!hasTable && ret == TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2236 2237
      ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
    }
wmmhello's avatar
wmmhello 已提交
2238
    if (ret != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2239
      uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
wmmhello's avatar
wmmhello 已提交
2240
      return ret;
2241
    }
X
Xiaoyu Wang 已提交
2242
  } else {
2243
    SSmlSTableMeta *meta = smlBuildSTableMeta();
wmmhello's avatar
wmmhello 已提交
2244 2245
    smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
    smlInsertMeta(meta->colHash, meta->cols, cols);
2246
    taosHashPut(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen, &meta, POINTER_BYTES);
wmmhello's avatar
wmmhello 已提交
2247 2248
  }

2249 2250
  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
2251

X
Xiaoyu Wang 已提交
2252
static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
2253 2254
  int32_t payloadNum = 0;
  int32_t ret = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2255

2256
  if (payload == NULL) {
X
Xiaoyu Wang 已提交
2257
    uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
2258
    return TSDB_CODE_TSC_INVALID_JSON;
2259
  }
2260 2261 2262

  cJSON *root = cJSON_Parse(payload);
  if (root == NULL) {
X
Xiaoyu Wang 已提交
2263
    uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
2264 2265
    return TSDB_CODE_TSC_INVALID_JSON;
  }
X
Xiaoyu Wang 已提交
2266
  // multiple data points must be sent in JSON array
2267 2268 2269 2270 2271
  if (cJSON_IsObject(root)) {
    payloadNum = 1;
  } else if (cJSON_IsArray(root)) {
    payloadNum = cJSON_GetArraySize(root);
  } else {
X
Xiaoyu Wang 已提交
2272
    uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
2273 2274
    ret = TSDB_CODE_TSC_INVALID_JSON;
    goto end;
wmmhello's avatar
wmmhello 已提交
2275
  }
wmmhello's avatar
wmmhello 已提交
2276

2277 2278
  for (int32_t i = 0; i < payloadNum; ++i) {
    cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(root)) ? root : cJSON_GetArrayItem(root, i);
wmmhello's avatar
wmmhello 已提交
2279
    ret = smlParseTelnetLine(info, dataPoint, -1);
X
Xiaoyu Wang 已提交
2280 2281
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
2282 2283 2284 2285 2286 2287 2288
      goto end;
    }
  }

end:
  cJSON_Delete(root);
  return ret;
wmmhello's avatar
wmmhello 已提交
2289
}
2290

X
Xiaoyu Wang 已提交
2291
static int32_t smlInsertData(SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
2292 2293
  int32_t code = TSDB_CODE_SUCCESS;

X
Xiaoyu Wang 已提交
2294
  SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
wmmhello's avatar
wmmhello 已提交
2295
  while (oneTable) {
X
Xiaoyu Wang 已提交
2296
    SSmlTableInfo *tableData = *oneTable;
wmmhello's avatar
wmmhello 已提交
2297 2298 2299 2300

    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
    strcpy(pName.dbname, info->pRequest->pDb);
    memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
D
dapan1121 已提交
2301 2302 2303 2304 2305 2306

    SRequestConnInfo conn = {0};
    conn.pTrans = info->taos->pAppInfo->pTransporter;
    conn.requestId = info->pRequest->requestId;
    conn.requestObjRefId = info->pRequest->self;
    conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
X
Xiaoyu Wang 已提交
2307

wmmhello's avatar
wmmhello 已提交
2308
    SVgroupInfo vg;
D
dapan1121 已提交
2309
    code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
2310
    if (code != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2311
      uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
wmmhello's avatar
wmmhello 已提交
2312 2313
      return code;
    }
X
Xiaoyu Wang 已提交
2314
    taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
wmmhello's avatar
wmmhello 已提交
2315

X
Xiaoyu Wang 已提交
2316 2317 2318
    SSmlSTableMeta **pMeta =
        (SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
    ASSERT(NULL != pMeta && NULL != *pMeta);
wmmhello's avatar
wmmhello 已提交
2319

2320
    // use tablemeta of stable to save vgid and uid of child table
wmmhello's avatar
wmmhello 已提交
2321
    (*pMeta)->tableMeta->vgId = vg.vgId;
X
Xiaoyu Wang 已提交
2322
    (*pMeta)->tableMeta->uid = tableData->uid;  // one table merge data block together according uid
wmmhello's avatar
wmmhello 已提交
2323

2324
    code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat,
2325 2326
                       (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
                       info->msgBuf.buf, info->msgBuf.len);
X
Xiaoyu Wang 已提交
2327 2328
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
wmmhello's avatar
wmmhello 已提交
2329 2330
      return code;
    }
X
Xiaoyu Wang 已提交
2331
    oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
wmmhello's avatar
wmmhello 已提交
2332
  }
wmmhello's avatar
wmmhello 已提交
2333

2334 2335
  code = smlBuildOutput(info->exec, info->pVgHash);
  if (code != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2336
    uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
2337 2338
    return code;
  }
2339 2340
  info->cost.insertRpcTime = taosGetTimestampUs();

X
Xiaoyu Wang 已提交
2341 2342 2343
  // launchQueryImpl(info->pRequest, info->pQuery, false, NULL);
  //  info->affectedRows = taos_affected_rows(info->pRequest);
  //  return info->pRequest->code;
wmmhello's avatar
wmmhello 已提交
2344

2345 2346 2347
  SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
  atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);

2348 2349 2350 2351 2352 2353
  SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
  if (pWrapper == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pWrapper->pRequest = info->pRequest;
  launchAsyncQuery(info->pRequest, info->pQuery, NULL, pWrapper);
wmmhello's avatar
wmmhello 已提交
2354
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2355 2356
}

X
Xiaoyu Wang 已提交
2357 2358 2359 2360 2361 2362 2363 2364 2365
static void smlPrintStatisticInfo(SSmlHandle *info) {
  uError("SML:0x%" PRIx64
         " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d \
        parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
         "",
         info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
         info->cost.numOfCreateSTables, info->cost.schemaTime - info->cost.parseTime,
         info->cost.insertBindTime - info->cost.schemaTime, info->cost.insertRpcTime - info->cost.insertBindTime,
         info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime);
2366 2367
}

wmmhello's avatar
wmmhello 已提交
2368
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char* rawLine, char* rawLineEnd, int numLines) {
wmmhello's avatar
wmmhello 已提交
2369
  int32_t code = TSDB_CODE_SUCCESS;
2370
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
wmmhello's avatar
wmmhello 已提交
2371 2372 2373 2374 2375
    if(lines){
      code = smlParseJSON(info, *lines);
    }else if(rawLine){
      code = smlParseJSON(info, rawLine);
    }
2376 2377 2378 2379
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, *lines);
      return code;
    }
wmmhello's avatar
wmmhello 已提交
2380
    return code;
wmmhello's avatar
wmmhello 已提交
2381
  }
wmmhello's avatar
wmmhello 已提交
2382

wmmhello's avatar
wmmhello 已提交
2383
  for (int32_t i = 0; i < numLines; ++i) {
wmmhello's avatar
wmmhello 已提交
2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398
    char *tmp = NULL;
    int  len  = 0;
    if(lines){
      tmp = lines[i];
      len = strlen(tmp);
    }else if(rawLine){
      tmp = rawLine;
      while(rawLine < rawLineEnd){
        if(*(rawLine++) == '\n'){
          break;
        }
        len++;
      }
    }

X
Xiaoyu Wang 已提交
2399
    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
wmmhello's avatar
wmmhello 已提交
2400
      code = smlParseInfluxLine(info, tmp, len);
X
Xiaoyu Wang 已提交
2401
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
wmmhello's avatar
wmmhello 已提交
2402
      code = smlParseTelnetLine(info, tmp, len);
X
Xiaoyu Wang 已提交
2403
    } else {
2404 2405
      ASSERT(0);
    }
wmmhello's avatar
wmmhello 已提交
2406
    if (code != TSDB_CODE_SUCCESS) {
2407 2408
      uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, lines[i]);
      return code;
wmmhello's avatar
wmmhello 已提交
2409 2410
    }
  }
2411 2412 2413
  return code;
}

wmmhello's avatar
wmmhello 已提交
2414
static int smlProcess(SSmlHandle *info, char *lines[], char* rawLine, char* rawLineEnd, int numLines) {
2415
  int32_t code = TSDB_CODE_SUCCESS;
2416 2417
  int32_t retryNum = 0;

2418 2419
  info->cost.parseTime = taosGetTimestampUs();

wmmhello's avatar
wmmhello 已提交
2420
  code = smlParseLine(info, lines, rawLine, rawLineEnd, numLines);
2421
  if (code != 0) {
X
Xiaoyu Wang 已提交
2422
    uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
2423
    return code;
2424
  }
wmmhello's avatar
wmmhello 已提交
2425

2426 2427 2428 2429 2430
  info->cost.lineNum = numLines;
  info->cost.numOfSTables = taosHashGetSize(info->superTables);
  info->cost.numOfCTables = taosHashGetSize(info->childTables);

  info->cost.schemaTime = taosGetTimestampUs();
2431

X
Xiaoyu Wang 已提交
2432
  do {
2433 2434
    code = smlModifyDBSchemas(info);
    if (code == 0) break;
wmmhello's avatar
wmmhello 已提交
2435
  } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
2436

wmmhello's avatar
wmmhello 已提交
2437
  if (code != 0) {
X
Xiaoyu Wang 已提交
2438
    uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
2439
    return code;
wmmhello's avatar
wmmhello 已提交
2440
  }
wmmhello's avatar
wmmhello 已提交
2441

2442
  info->cost.insertBindTime = taosGetTimestampUs();
wmmhello's avatar
wmmhello 已提交
2443 2444
  code = smlInsertData(info);
  if (code != 0) {
X
Xiaoyu Wang 已提交
2445
    uError("SML:0x%" PRIx64 " smlInsertData error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
2446
    return code;
wmmhello's avatar
wmmhello 已提交
2447 2448 2449 2450 2451
  }

  return code;
}

X
Xiaoyu Wang 已提交
2452
static int32_t isSchemalessDb(STscObj *taos, SRequestObj *request) {
2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480
  //  SCatalog *catalog = NULL;
  //  int32_t   code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, &catalog);
  //  if (code != TSDB_CODE_SUCCESS) {
  //    uError("SML get catalog error %d", code);
  //    return code;
  //  }
  //
  //  SName name;
  //  tNameSetDbName(&name, taos->acctId, taos->db, strlen(taos->db));
  //  char dbFname[TSDB_DB_FNAME_LEN] = {0};
  //  tNameGetFullDbName(&name, dbFname);
  //  SDbCfgInfo pInfo = {0};
  //
  //  SRequestConnInfo conn = {0};
  //  conn.pTrans = taos->pAppInfo->pTransporter;
  //  conn.requestId = request->requestId;
  //  conn.requestObjRefId = request->self;
  //  conn.mgmtEps = getEpSet_s(&taos->pAppInfo->mgmtEp);
  //
  //  code = catalogGetDBCfg(catalog, &conn, dbFname, &pInfo);
  //  if (code != TSDB_CODE_SUCCESS) {
  //    return code;
  //  }
  //  taosArrayDestroy(pInfo.pRetensions);
  //
  //  if (!pInfo.schemaless) {
  //    return TSDB_CODE_SML_INVALID_DB_CONF;
  //  }
wmmhello's avatar
wmmhello 已提交
2481 2482 2483
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2484
static void smlInsertCallback(void *param, void *res, int32_t code) {
wmmhello's avatar
wmmhello 已提交
2485
  SRequestObj *pRequest = (SRequestObj *)res;
X
Xiaoyu Wang 已提交
2486
  SSmlHandle  *info = (SSmlHandle *)param;
2487
  int32_t      rows = taos_affected_rows(pRequest);
wmmhello's avatar
wmmhello 已提交
2488

X
Xiaoyu Wang 已提交
2489
  uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
2490
  Params *pParam = info->params;
wmmhello's avatar
wmmhello 已提交
2491
  // lock
2492 2493
  taosThreadSpinLock(&pParam->lock);
  pParam->cnt++;
X
Xiaoyu Wang 已提交
2494
  if (code != TSDB_CODE_SUCCESS) {
2495 2496
    pParam->request->code = code;
    pParam->request->body.resInfo.numOfRows += rows;
2497
  } else {
2498 2499
    pParam->request->body.resInfo.numOfRows += info->affectedRows;
  }
2500 2501 2502
  // unlock
  taosThreadSpinUnlock(&pParam->lock);

2503 2504
  if (pParam->cnt == pParam->total) {
    tsem_post(&pParam->sem);
wmmhello's avatar
wmmhello 已提交
2505
  }
wmmhello's avatar
wmmhello 已提交
2506
  uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
wmmhello's avatar
wmmhello 已提交
2507 2508 2509
  info->cost.endTime = taosGetTimestampUs();
  info->cost.code = code;
  smlPrintStatisticInfo(info);
wmmhello's avatar
wmmhello 已提交
2510 2511 2512
  smlDestroyInfo(info);
}

wmmhello's avatar
wmmhello 已提交
2513

wmmhello's avatar
wmmhello 已提交
2514
TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd, int numLines, int protocol, int precision) {
2515 2516
  int      batchs = 0;
  STscObj *pTscObj = request->pTscObj;
2517

2518
  pTscObj->schemalessType = 1;
wmmhello's avatar
wmmhello 已提交
2519
  SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
wmmhello's avatar
wmmhello 已提交
2520

2521
  Params params = {0};
wmmhello's avatar
wmmhello 已提交
2522
  params.request = request;
wmmhello's avatar
wmmhello 已提交
2523 2524 2525
  tsem_init(&params.sem, 0, 0);
  taosThreadSpinInit(&(params.lock), 0);

X
Xiaoyu Wang 已提交
2526
  if (request->pDb == NULL) {
wmmhello's avatar
wmmhello 已提交
2527
    request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
wmmhello's avatar
wmmhello 已提交
2528
    smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
wmmhello's avatar
wmmhello 已提交
2529 2530 2531
    goto end;
  }

2532
  if (isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2533
    request->code = TSDB_CODE_SML_INVALID_DB_CONF;
wmmhello's avatar
wmmhello 已提交
2534
    smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
wmmhello's avatar
wmmhello 已提交
2535 2536 2537
    goto end;
  }

X
Xiaoyu Wang 已提交
2538
  if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
2539
    request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
wmmhello's avatar
wmmhello 已提交
2540
    smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
2541
    goto end;
wmmhello's avatar
wmmhello 已提交
2542 2543
  }

X
Xiaoyu Wang 已提交
2544 2545
  if (protocol == TSDB_SML_LINE_PROTOCOL &&
      (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
2546
    request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
wmmhello's avatar
wmmhello 已提交
2547
    smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
2548 2549 2550
    goto end;
  }

2551
  if (protocol == TSDB_SML_JSON_PROTOCOL) {
wmmhello's avatar
wmmhello 已提交
2552
    numLines = 1;
2553
  } else if (numLines <= 0) {
wmmhello's avatar
wmmhello 已提交
2554 2555 2556 2557 2558
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
2559
  batchs = ceil(((double)numLines) / LINE_BATCH);
2560
  params.total = batchs;
wmmhello's avatar
wmmhello 已提交
2561
  for (int i = 0; i < batchs; ++i) {
2562 2563
    SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT);
    if (!req) {
wmmhello's avatar
wmmhello 已提交
2564 2565 2566 2567
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error request is null");
      goto end;
    }
2568 2569
    SSmlHandle *info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision);
    if (!info) {
wmmhello's avatar
wmmhello 已提交
2570 2571 2572 2573 2574
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error SSmlHandle is null");
      goto end;
    }

wmmhello's avatar
wmmhello 已提交
2575 2576
    int32_t perBatch = LINE_BATCH;

X
Xiaoyu Wang 已提交
2577
    if (numLines > perBatch) {
wmmhello's avatar
wmmhello 已提交
2578
      numLines -= perBatch;
X
Xiaoyu Wang 已提交
2579
    } else {
wmmhello's avatar
wmmhello 已提交
2580 2581 2582 2583
      perBatch = numLines;
      numLines = 0;
    }

wmmhello's avatar
wmmhello 已提交
2584
    info->params = &params;
wmmhello's avatar
wmmhello 已提交
2585 2586
    info->affectedRows = perBatch;
    info->pRequest->body.queryFp = smlInsertCallback;
X
Xiaoyu Wang 已提交
2587
    info->pRequest->body.param = info;
wmmhello's avatar
wmmhello 已提交
2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602
    int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch);
    if(lines){
      lines += perBatch;
    }
    if(rawLine){
      int num = 0;
      while(rawLine < rawLineEnd){
        if(*(rawLine++) == '\n'){
          num++;
        }
        if(num == perBatch){
          break;
        }
      }
    }
X
Xiaoyu Wang 已提交
2603
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2604 2605 2606 2607
      info->pRequest->body.queryFp(info, req, code);
    }
  }
  tsem_wait(&params.sem);
2608

wmmhello's avatar
wmmhello 已提交
2609
  end:
wmmhello's avatar
wmmhello 已提交
2610 2611
  taosThreadSpinDestroy(&params.lock);
  tsem_destroy(&params.sem);
2612
  //  ((STscObj *)taos)->schemalessType = 0;
2613
  pTscObj->schemalessType = 1;
2614
  uDebug("resultend:%s", request->msgBuf);
2615
  return (TAOS_RES *)request;
wmmhello's avatar
wmmhello 已提交
2616
}
wmmhello's avatar
wmmhello 已提交
2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688

/**
 * taos_schemaless_insert() parse and insert data points into database according to
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
 * @return return zero for successful insertion. Otherwise return none-zero error code of
 *         failure reason.
 *
 */

TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT);
  if (!request) {
    uError("SML:taos_schemaless_insert error request is null");
    return NULL;
  }

  if (!lines || numLines <= 0) {
    SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
    return (TAOS_RES *)request;
  }

  return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision);
}

TAOS_RES *taos_schemaless_insert_raw(TAOS* taos, char* lines, int len, int32_t *totalRows, int protocol, int precision){
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT);
  if (!request) {
    uError("SML:taos_schemaless_insert error request is null");
    return NULL;
  }

  if (!lines || len <= 0) {
    SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
    return (TAOS_RES *)request;
  }

  int numLines = 0;
  for(int i = 0; i < len; i++){
    if(lines[i] == '\n' || i == len - 1){
      numLines++;
    }
  }
  *totalRows = numLines;
  return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision);
}