clientSml.c 82.6 KB
Newer Older
wmmhello's avatar
wmmhello 已提交
1 2 3 4 5
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

X
Xiaoyu Wang 已提交
6 7 8 9 10
#include "cJSON.h"
#include "catalog.h"
#include "clientInt.h"
#include "osSemaphore.h"
#include "osThread.h"
wmmhello's avatar
wmmhello 已提交
11 12
#include "query.h"
#include "taos.h"
wmmhello's avatar
wmmhello 已提交
13
#include "taoserror.h"
X
Xiaoyu Wang 已提交
14
#include "tcommon.h"
wmmhello's avatar
wmmhello 已提交
15
#include "tdef.h"
X
Xiaoyu Wang 已提交
16
#include "tglobal.h"
wmmhello's avatar
wmmhello 已提交
17 18
#include "tlog.h"
#include "tmsg.h"
X
Xiaoyu Wang 已提交
19
#include "tname.h"
wmmhello's avatar
wmmhello 已提交
20 21
#include "ttime.h"
#include "ttypes.h"
wmmhello's avatar
wmmhello 已提交
22

wmmhello's avatar
wmmhello 已提交
23
//=================================================================================================
wmmhello's avatar
wmmhello 已提交
24 25 26 27 28 29 30

#define SPACE ' '
#define COMMA ','
#define EQUAL '='
#define QUOTE '"'
#define SLASH '\\'

X
Xiaoyu Wang 已提交
31 32 33 34 35 36 37
#define JUMP_SPACE(sql)  \
  while (*sql != '\0') { \
    if (*sql == SPACE)   \
      sql++;             \
    else                 \
      break;             \
  }
wmmhello's avatar
wmmhello 已提交
38
// comma ,
X
Xiaoyu Wang 已提交
39 40
#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH)
#define IS_COMMA(sql)       (*(sql) == COMMA && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
41
// space
X
Xiaoyu Wang 已提交
42 43
#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql)-1) == SLASH)
#define IS_SPACE(sql)       (*(sql) == SPACE && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
44
// equal =
X
Xiaoyu Wang 已提交
45 46
#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) == SLASH)
#define IS_EQUAL(sql)       (*(sql) == EQUAL && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
47
// quote "
X
Xiaoyu Wang 已提交
48 49
#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) == SLASH)
#define IS_QUOTE(sql)       (*(sql) == QUOTE && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
50
// SLASH
X
Xiaoyu Wang 已提交
51
#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql)-1) == SLASH)
wmmhello's avatar
wmmhello 已提交
52

X
Xiaoyu Wang 已提交
53 54
#define IS_SLASH_LETTER(sql) \
  (IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql))
wmmhello's avatar
wmmhello 已提交
55

X
Xiaoyu Wang 已提交
56
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len))
wmmhello's avatar
wmmhello 已提交
57

X
Xiaoyu Wang 已提交
58 59 60 61 62 63 64 65
#define PROCESS_SLASH(key, keyLen)           \
  for (int i = 1; i < keyLen; ++i) {         \
    if (IS_SLASH_LETTER(key + i)) {          \
      MOVE_FORWARD_ONE(key + i, keyLen - i); \
      i--;                                   \
      keyLen--;                              \
    }                                        \
  }
wmmhello's avatar
wmmhello 已提交
66

67 68 69
#define IS_INVALID_COL_LEN(len)   ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)

70 71 72
#define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM     4

X
Xiaoyu Wang 已提交
73 74 75 76
#define TS        "_ts"
#define TS_LEN    3
#define VALUE     "_value"
#define VALUE_LEN 6
77

X
Xiaoyu Wang 已提交
78 79
#define BINARY_ADD_LEN 2  // "binary"   2 means " "
#define NCHAR_ADD_LEN  3  // L"nchar"   3 means L" "
wmmhello's avatar
wmmhello 已提交
80 81

#define MAX_RETRY_TIMES 5
82
#define LINE_BATCH      2000
wmmhello's avatar
wmmhello 已提交
83 84 85 86
//=================================================================================================
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;

typedef enum {
87
  SCHEMA_ACTION_NULL,
wmmhello's avatar
wmmhello 已提交
88 89 90 91 92
  SCHEMA_ACTION_CREATE_STABLE,
  SCHEMA_ACTION_ADD_COLUMN,
  SCHEMA_ACTION_ADD_TAG,
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,
  SCHEMA_ACTION_CHANGE_TAG_SIZE,
wmmhello's avatar
wmmhello 已提交
93 94 95
} ESchemaAction;

typedef struct {
X
Xiaoyu Wang 已提交
96 97 98 99
  const char *measure;
  const char *tags;
  const char *cols;
  const char *timestamp;
wmmhello's avatar
wmmhello 已提交
100 101 102 103 104 105 106 107 108

  int32_t measureLen;
  int32_t measureTagsLen;
  int32_t tagsLen;
  int32_t colsLen;
  int32_t timestampLen;
} SSmlLineInfo;

typedef struct {
X
Xiaoyu Wang 已提交
109 110 111 112
  const char *sTableName;  // super table name
  int32_t     sTableNameLen;
  char        childTableName[TSDB_TABLE_NAME_LEN];
  uint64_t    uid;
wmmhello's avatar
wmmhello 已提交
113

X
Xiaoyu Wang 已提交
114
  SArray *tags;
wmmhello's avatar
wmmhello 已提交
115

116 117
  // if info->formatData is true, elements are SArray<SSmlKv*>.
  // if info->formatData is false, elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
X
Xiaoyu Wang 已提交
118
  SArray *cols;
wmmhello's avatar
wmmhello 已提交
119 120 121
} SSmlTableInfo;

typedef struct {
X
Xiaoyu Wang 已提交
122 123
  SArray   *tags;     // save the origin order to create table
  SHashObj *tagHash;  // elements are <key, index in tags>
124

X
Xiaoyu Wang 已提交
125 126
  SArray   *cols;
  SHashObj *colHash;
127

wmmhello's avatar
wmmhello 已提交
128 129 130 131
  STableMeta *tableMeta;
} SSmlSTableMeta;

typedef struct {
X
Xiaoyu Wang 已提交
132 133
  int32_t len;
  char   *buf;
wmmhello's avatar
wmmhello 已提交
134 135
} SSmlMsgBuf;

136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
typedef struct {
  int32_t code;
  int32_t lineNum;

  int32_t numOfSTables;
  int32_t numOfCTables;
  int32_t numOfCreateSTables;

  int64_t parseTime;
  int64_t schemaTime;
  int64_t insertBindTime;
  int64_t insertRpcTime;
  int64_t endTime;
} SSmlCostInfo;

X
Xiaoyu Wang 已提交
151 152 153
typedef struct {
  SRequestObj     *request;
  tsem_t           sem;
154 155
  int32_t          cnt;
  int32_t          total;
wmmhello's avatar
wmmhello 已提交
156 157 158
  TdThreadSpinlock lock;
} Params;

wmmhello's avatar
wmmhello 已提交
159
typedef struct {
X
Xiaoyu Wang 已提交
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
  int64_t id;
  Params *params;

  SMLProtocolType protocol;
  int8_t          precision;
  bool dataFormat;  // true means that the name and order of keys in each line are the same(only for influx protocol)

  SHashObj *childTables;
  SHashObj *superTables;
  SHashObj *pVgHash;
  void     *exec;

  STscObj     *taos;
  SCatalog    *pCatalog;
  SRequestObj *pRequest;
  SQuery      *pQuery;

  SSmlCostInfo cost;
  int32_t      affectedRows;
  SSmlMsgBuf   msgBuf;
  SHashObj    *dumplicateKey;  // for dumplicate key
  SArray      *colsContainer;  // for cols parse, if dataFormat == false
wmmhello's avatar
wmmhello 已提交
182
} SSmlHandle;
wmmhello's avatar
wmmhello 已提交
183 184
//=================================================================================================

wmmhello's avatar
wmmhello 已提交
185
//=================================================================================================
186
static volatile int64_t linesSmlHandleId = 0;
X
Xiaoyu Wang 已提交
187 188
static int64_t          smlGenId() {
           int64_t id;
wmmhello's avatar
wmmhello 已提交
189

X
Xiaoyu Wang 已提交
190 191
           do {
             id = atomic_add_fetch_64(&linesSmlHandleId, 1);
wmmhello's avatar
wmmhello 已提交
192 193
  } while (id == 0);

X
Xiaoyu Wang 已提交
194
           return id;
wmmhello's avatar
wmmhello 已提交
195 196
}

197
static inline bool smlDoubleToInt64OverFlow(double num) {
X
Xiaoyu Wang 已提交
198
  if (num >= (double)INT64_MAX || num <= (double)INT64_MIN) return true;
199 200 201 202 203 204 205 206 207 208 209 210
  return false;
}

static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashObj *pHash) {
  void *val = taosHashGet(pHash, key, keyLen);
  if (val) {
    return true;
  }
  taosHashPut(pHash, key, keyLen, key, 1);
  return false;
}

X
Xiaoyu Wang 已提交
211
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
212 213 214 215 216 217 218 219
  if(pBuf->buf){
    memset(pBuf->buf, 0, pBuf->len);
    if (msg1) strncat(pBuf->buf, msg1, pBuf->len);
    int32_t left = pBuf->len - strlen(pBuf->buf);
    if (left > 2 && msg2) {
      strncat(pBuf->buf, ":", left - 1);
      strncat(pBuf->buf, msg2, left - 2);
    }
wmmhello's avatar
wmmhello 已提交
220
  }
wmmhello's avatar
wmmhello 已提交
221 222 223
  return TSDB_CODE_SML_INVALID_DATA;
}

X
Xiaoyu Wang 已提交
224
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
225
                                       ESchemaAction *action, SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
226
  uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
227 228
  if (index) {
    if (colField[*index].type != kv->type) {
X
Xiaoyu Wang 已提交
229 230
      uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id,
             kv->key, colField[*index].type, kv->type);
231 232 233
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

X
Xiaoyu Wang 已提交
234 235 236 237
    if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR &&
         (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
        (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
         ((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
238
      if (isTag) {
wmmhello's avatar
wmmhello 已提交
239
        *action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
240
      } else {
wmmhello's avatar
wmmhello 已提交
241
        *action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
242 243 244 245
      }
    }
  } else {
    if (isTag) {
wmmhello's avatar
wmmhello 已提交
246
      *action = SCHEMA_ACTION_ADD_TAG;
247
    } else {
wmmhello's avatar
wmmhello 已提交
248
      *action = SCHEMA_ACTION_ADD_COLUMN;
249 250
    }
  }
wmmhello's avatar
wmmhello 已提交
251 252 253
  return 0;
}

wmmhello's avatar
wmmhello 已提交
254
static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
wmmhello's avatar
wmmhello 已提交
255
  int32_t result = 1;
X
Xiaoyu Wang 已提交
256
  while (result <= length) {
wmmhello's avatar
wmmhello 已提交
257 258
    result *= 2;
  }
wmmhello's avatar
wmmhello 已提交
259 260 261 262 263
  if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE){
    result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
    result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  }
wmmhello's avatar
wmmhello 已提交
264

265 266 267 268
  if (type == TSDB_DATA_TYPE_NCHAR){
    result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
  }else if (type == TSDB_DATA_TYPE_BINARY){
    result = result + VARSTR_HEADER_SIZE;
wmmhello's avatar
wmmhello 已提交
269
  }
270
  return result;
wmmhello's avatar
wmmhello 已提交
271 272
}

X
Xiaoyu Wang 已提交
273
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
274
                                      ESchemaAction *action, bool isTag) {
275 276
  int32_t code = TSDB_CODE_SUCCESS;
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
277
    if(j == 0 && !isTag) continue;
X
Xiaoyu Wang 已提交
278
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j);
279
    code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
X
Xiaoyu Wang 已提交
280
    if (code != TSDB_CODE_SUCCESS) {
281 282 283 284 285 286
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

287
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
288
  SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
289 290
  int32_t i = 0;
  for ( ;i < length; i++) {
291 292 293
    taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
  }

294 295 296 297 298 299
  if (isTag){
    i = 0;
  } else {
    i = 1;
  }
  for (; i < taosArrayGetSize(cols); i++) {
X
Xiaoyu Wang 已提交
300 301
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
    if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
302 303 304
      return -1;
    }
  }
305
  taosHashCleanup(hashTmp);
306 307 308
  return 0;
}

309 310 311 312 313 314 315 316
static int32_t getBytes(uint8_t type, int32_t length){
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
    return smlFindNearestPowerOf2(length, type);
  } else {
    return tDataTypes[type].bytes;
  }
}

wmmhello's avatar
wmmhello 已提交
317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, SArray* results, int32_t numOfCols, bool isTag) {
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j);
    ESchemaAction action = SCHEMA_ACTION_NULL;
    smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
    if(action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG){
      SField field = {0};
      field.type = kv->type;
      field.bytes = getBytes(kv->type, kv->length);
      memcpy(field.name, kv->key, kv->keyLen);
      taosArrayPush(results, &field);
    }else if(action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE){
      uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
      uint16_t newIndex = *index;
      if(isTag) newIndex -= numOfCols;
      SField *field = (SField *)taosArrayGet(results, newIndex);
      field->bytes = getBytes(kv->type, kv->length);
    }
  }
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
339 340
//static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
//                              int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){
wmmhello's avatar
wmmhello 已提交
341
static int32_t  smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray* pColumns, SArray* pTags,
wmmhello's avatar
wmmhello 已提交
342 343
                               STableMeta *pTableMeta, ESchemaAction action){

344 345 346 347 348
  SRequestObj*   pRequest = NULL;
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SCmdMsgInfo    pCmdMsg = {0};

wmmhello's avatar
wmmhello 已提交
349 350 351 352 353 354
  // put front for free
  pReq.numOfColumns = taosArrayGetSize(pColumns);
  pReq.pColumns = pColumns;
  pReq.numOfTags = taosArrayGetSize(pTags);
  pReq.pTags = pTags;

355 356 357 358 359
  code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
360
  pRequest->syncQuery = true;
361 362 363 364 365
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
366
  if (action == SCHEMA_ACTION_CREATE_STABLE){
wmmhello's avatar
wmmhello 已提交
367 368 369 370
    pReq.colVer = 1;
    pReq.tagVer = 1;
    pReq.suid = 0;
    pReq.source = TD_REQ_FROM_APP;
wmmhello's avatar
wmmhello 已提交
371
  } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE){
wmmhello's avatar
wmmhello 已提交
372 373 374 375
    pReq.colVer = pTableMeta->sversion;
    pReq.tagVer = pTableMeta->tversion + 1;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
wmmhello's avatar
wmmhello 已提交
376
  } else if (action == SCHEMA_ACTION_ADD_COLUMN  || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE){
wmmhello's avatar
wmmhello 已提交
377 378 379 380 381 382
    pReq.colVer = pTableMeta->sversion + 1;
    pReq.tagVer = pTableMeta->tversion;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  }

wmmhello's avatar
wmmhello 已提交
383 384 385 386 387 388 389 390 391
  if (pReq.numOfTags == 0){
    pReq.numOfTags = 1;
    SField field = {0};
    field.type = TSDB_DATA_TYPE_NCHAR;
    field.bytes = 1;
    strcpy(field.name, tsSmlTagName);
    taosArrayPush(pReq.pTags, &field);
  }

392 393 394 395 396 397 398 399 400 401 402 403 404 405
  pReq.commentLen = -1;
  pReq.igExists = true;
  tNameExtractFullName(pName, pReq.name);

  pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

D
dapan1121 已提交
406 407
  SQuery pQuery;
  memset(&pQuery, 0, sizeof(pQuery));
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  if(pRequest->code == TSDB_CODE_SUCCESS){
    catalogRemoveTableMeta(info->pCatalog, pName);
  }
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  return code;
}

X
Xiaoyu Wang 已提交
427
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
428 429 430 431
  int32_t         code = 0;
  SHashObj       *hashTmp = NULL;
  STableMeta     *pTableMeta = NULL;

X
Xiaoyu Wang 已提交
432
  SName   pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
433
  strcpy(pName.dbname, info->pRequest->pDb);
wmmhello's avatar
wmmhello 已提交
434

D
dapan1121 已提交
435 436 437 438 439
  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
X
Xiaoyu Wang 已提交
440 441

  SSmlSTableMeta **tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
wmmhello's avatar
wmmhello 已提交
442
  while (tableMetaSml) {
X
Xiaoyu Wang 已提交
443 444
    SSmlSTableMeta *sTableData = *tableMetaSml;
    bool            needCheckMeta = false;  // for multi thread
wmmhello's avatar
wmmhello 已提交
445

wmmhello's avatar
wmmhello 已提交
446
    size_t superTableLen = 0;
X
Xiaoyu Wang 已提交
447
    void  *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
448
    memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
wmmhello's avatar
wmmhello 已提交
449
    memcpy(pName.tname, superTable, superTableLen);
wmmhello's avatar
wmmhello 已提交
450

D
dapan1121 已提交
451
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
wmmhello's avatar
wmmhello 已提交
452

453
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
wmmhello's avatar
wmmhello 已提交
454 455 456 457 458 459
      SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
      SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
      smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
      smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);

      code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
460
      if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
461
        uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
462
        goto end;
wmmhello's avatar
wmmhello 已提交
463
      }
464
      info->cost.numOfCreateSTables++;
X
Xiaoyu Wang 已提交
465
    } else if (code == TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
466
      hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags,
X
Xiaoyu Wang 已提交
467 468 469
                                       taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
      for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
           i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
470 471
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
wmmhello's avatar
wmmhello 已提交
472

473 474
      ESchemaAction action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
475
      if (code != TSDB_CODE_SUCCESS) {
476
        goto end;
477
      }
wmmhello's avatar
wmmhello 已提交
478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495
      if (action != SCHEMA_ACTION_NULL){
        SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if(i < pTableMeta->tableInfo.numOfColumns){
            taosArrayPush(pColumns, &field);
          }else{
            taosArrayPush(pTags, &field);
          }
        }
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags, pTableMeta->tableInfo.numOfColumns, true);

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
496
        if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
497
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
498 499 500 501
          goto end;
        }
      }

wmmhello's avatar
wmmhello 已提交
502
      taosMemoryFreeClear(pTableMeta);
503 504 505 506
      code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
wmmhello's avatar
wmmhello 已提交
507 508 509 510
      code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
511 512

      taosHashClear(hashTmp);
wmmhello's avatar
wmmhello 已提交
513
      for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
514 515
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
516 517
      action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
518
      if (code != TSDB_CODE_SUCCESS) {
519
        goto end;
wmmhello's avatar
wmmhello 已提交
520
      }
wmmhello's avatar
wmmhello 已提交
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539
      if (action != SCHEMA_ACTION_NULL){
        SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if(i < pTableMeta->tableInfo.numOfColumns){
            taosArrayPush(pColumns, &field);
          }else{
            taosArrayPush(pTags, &field);
          }
        }

        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns, pTableMeta->tableInfo.numOfColumns, false);

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
540
        if (code != TSDB_CODE_SUCCESS) {
541
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
542 543 544
          goto end;
        }
      }
wmmhello's avatar
wmmhello 已提交
545

D
dapan1121 已提交
546
      code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
wmmhello's avatar
wmmhello 已提交
547
      if (code != TSDB_CODE_SUCCESS) {
548
        goto end;
wmmhello's avatar
wmmhello 已提交
549
      }
550
      needCheckMeta = true;
wmmhello's avatar
wmmhello 已提交
551 552
      taosHashCleanup(hashTmp);
      hashTmp = NULL;
wmmhello's avatar
wmmhello 已提交
553
    } else {
X
Xiaoyu Wang 已提交
554
      uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
555
      goto end;
wmmhello's avatar
wmmhello 已提交
556
    }
wmmhello's avatar
wmmhello 已提交
557
    taosMemoryFreeClear(pTableMeta);
558

D
dapan1121 已提交
559
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
560
    if (code != TSDB_CODE_SUCCESS) {
561
      uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
562
      goto end;
563
    }
564

X
Xiaoyu Wang 已提交
565 566
    if (needCheckMeta) {
      code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
567
                          sTableData->tags, true);
568
      if (code != TSDB_CODE_SUCCESS) {
569
        uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, pName.tname);
570 571
        goto end;
      }
572
      code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
573
      if (code != TSDB_CODE_SUCCESS) {
574
        uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, pName.tname);
575 576 577 578
        goto end;
      }
    }

579
    sTableData->tableMeta = pTableMeta;
wmmhello's avatar
wmmhello 已提交
580

X
Xiaoyu Wang 已提交
581
    tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, tableMetaSml);
wmmhello's avatar
wmmhello 已提交
582 583
  }
  return 0;
584 585

end:
wmmhello's avatar
wmmhello 已提交
586 587
  taosHashCleanup(hashTmp);
  taosMemoryFreeClear(pTableMeta);
D
dapan1121 已提交
588
  catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
589
  return code;
wmmhello's avatar
wmmhello 已提交
590 591
}

X
Xiaoyu Wang 已提交
592
static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
593
  const char *pVal = kvVal->value;
X
Xiaoyu Wang 已提交
594 595 596 597
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (pVal == endptr) {
598
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
wmmhello's avatar
wmmhello 已提交
599 600 601
    return false;
  }

602
  int32_t left = len - (endptr - pVal);
X
Xiaoyu Wang 已提交
603
  if (left == 0 || (left == 3 && strncasecmp(endptr, "f64", left) == 0)) {
604 605
    kvVal->type = TSDB_DATA_TYPE_DOUBLE;
    kvVal->d = result;
X
Xiaoyu Wang 已提交
606 607
  } else if ((left == 3 && strncasecmp(endptr, "f32", left) == 0)) {
    if (!IS_VALID_FLOAT(result)) {
608 609
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
610
    }
611 612
    kvVal->type = TSDB_DATA_TYPE_FLOAT;
    kvVal->f = (float)result;
X
Xiaoyu Wang 已提交
613 614
  } else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)) {
    if (smlDoubleToInt64OverFlow(result)) {
wmmhello's avatar
wmmhello 已提交
615 616
      errno = 0;
      int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
X
Xiaoyu Wang 已提交
617
      if (errno == ERANGE) {
wmmhello's avatar
wmmhello 已提交
618 619 620 621 622 623
        smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_BIGINT;
      kvVal->i = tmp;
      return true;
wmmhello's avatar
wmmhello 已提交
624
    }
625
    kvVal->type = TSDB_DATA_TYPE_BIGINT;
wmmhello's avatar
wmmhello 已提交
626
    kvVal->i = (int64_t)result;
wmmhello's avatar
wmmhello 已提交
627
  } else if ((left == 1 && *endptr == 'u') || (left == 3 && strncasecmp(endptr, "u64", left) == 0)) {
X
Xiaoyu Wang 已提交
628
    if (result >= (double)UINT64_MAX || result < 0) {
wmmhello's avatar
wmmhello 已提交
629 630
      errno = 0;
      uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
X
Xiaoyu Wang 已提交
631
      if (errno == ERANGE || result < 0) {
wmmhello's avatar
wmmhello 已提交
632 633 634 635 636 637
        smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_UBIGINT;
      kvVal->u = tmp;
      return true;
638
    }
639
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;
wmmhello's avatar
wmmhello 已提交
640
    kvVal->u = result;
X
Xiaoyu Wang 已提交
641 642
  } else if (left == 3 && strncasecmp(endptr, "i32", left) == 0) {
    if (!IS_VALID_INT(result)) {
643 644
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
645
    }
646 647
    kvVal->type = TSDB_DATA_TYPE_INT;
    kvVal->i = result;
X
Xiaoyu Wang 已提交
648 649
  } else if (left == 3 && strncasecmp(endptr, "u32", left) == 0) {
    if (!IS_VALID_UINT(result)) {
650 651
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
652
    }
653 654
    kvVal->type = TSDB_DATA_TYPE_UINT;
    kvVal->u = result;
X
Xiaoyu Wang 已提交
655 656
  } else if (left == 3 && strncasecmp(endptr, "i16", left) == 0) {
    if (!IS_VALID_SMALLINT(result)) {
657 658
      smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
659
    }
660 661
    kvVal->type = TSDB_DATA_TYPE_SMALLINT;
    kvVal->i = result;
X
Xiaoyu Wang 已提交
662 663
  } else if (left == 3 && strncasecmp(endptr, "u16", left) == 0) {
    if (!IS_VALID_USMALLINT(result)) {
664 665
      smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
666
    }
667 668
    kvVal->type = TSDB_DATA_TYPE_USMALLINT;
    kvVal->u = result;
X
Xiaoyu Wang 已提交
669 670
  } else if (left == 2 && strncasecmp(endptr, "i8", left) == 0) {
    if (!IS_VALID_TINYINT(result)) {
671 672
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
673
    }
674 675
    kvVal->type = TSDB_DATA_TYPE_TINYINT;
    kvVal->i = result;
X
Xiaoyu Wang 已提交
676 677
  } else if (left == 2 && strncasecmp(endptr, "u8", left) == 0) {
    if (!IS_VALID_UTINYINT(result)) {
678 679
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
680
    }
681 682
    kvVal->type = TSDB_DATA_TYPE_UTINYINT;
    kvVal->u = result;
X
Xiaoyu Wang 已提交
683
  } else {
684
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
wmmhello's avatar
wmmhello 已提交
685 686
    return false;
  }
687
  return true;
wmmhello's avatar
wmmhello 已提交
688 689
}

wmmhello's avatar
wmmhello 已提交
690
static bool smlParseBool(SSmlKv *kvVal) {
wmmhello's avatar
wmmhello 已提交
691
  const char *pVal = kvVal->value;
X
Xiaoyu Wang 已提交
692
  int32_t     len = kvVal->length;
693
  if ((len == 1) && (pVal[0] == 't' || pVal[0] == 'T')) {
wmmhello's avatar
wmmhello 已提交
694
    kvVal->i = true;
wmmhello's avatar
wmmhello 已提交
695 696 697
    return true;
  }

698
  if ((len == 1) && (pVal[0] == 'f' || pVal[0] == 'F')) {
wmmhello's avatar
wmmhello 已提交
699
    kvVal->i = false;
wmmhello's avatar
wmmhello 已提交
700 701 702
    return true;
  }

X
Xiaoyu Wang 已提交
703
  if ((len == 4) && !strncasecmp(pVal, "true", len)) {
wmmhello's avatar
wmmhello 已提交
704
    kvVal->i = true;
wmmhello's avatar
wmmhello 已提交
705 706
    return true;
  }
X
Xiaoyu Wang 已提交
707
  if ((len == 5) && !strncasecmp(pVal, "false", len)) {
wmmhello's avatar
wmmhello 已提交
708
    kvVal->i = false;
wmmhello's avatar
wmmhello 已提交
709 710 711 712 713
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
714
static bool smlIsBinary(const char *pVal, uint16_t len) {
X
Xiaoyu Wang 已提交
715
  // binary: "abc"
wmmhello's avatar
wmmhello 已提交
716 717 718 719 720 721 722 723 724
  if (len < 2) {
    return false;
  }
  if (pVal[0] == '"' && pVal[len - 1] == '"') {
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
725
static bool smlIsNchar(const char *pVal, uint16_t len) {
X
Xiaoyu Wang 已提交
726
  // nchar: L"abc"
wmmhello's avatar
wmmhello 已提交
727 728 729
  if (len < 3) {
    return false;
  }
X
Xiaoyu Wang 已提交
730
  if ((pVal[0] == 'l' || pVal[0] == 'L') && pVal[1] == '"' && pVal[len - 1] == '"') {
wmmhello's avatar
wmmhello 已提交
731 732 733 734 735
    return true;
  }
  return false;
}

736
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
X
Xiaoyu Wang 已提交
737
  char   *endPtr = NULL;
wafwerar's avatar
wafwerar 已提交
738
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
X
Xiaoyu Wang 已提交
739
  if (value + len != endPtr) {
740
    return -1;
wmmhello's avatar
wmmhello 已提交
741
  }
742
  double ts = tsInt64;
743 744
  switch (type) {
    case TSDB_TIME_PRECISION_HOURS:
wmmhello's avatar
wmmhello 已提交
745 746
      ts *= NANOSECOND_PER_HOUR;
      tsInt64 *= NANOSECOND_PER_HOUR;
747 748
      break;
    case TSDB_TIME_PRECISION_MINUTES:
wmmhello's avatar
wmmhello 已提交
749 750
      ts *= NANOSECOND_PER_MINUTE;
      tsInt64 *= NANOSECOND_PER_MINUTE;
751 752
      break;
    case TSDB_TIME_PRECISION_SECONDS:
wmmhello's avatar
wmmhello 已提交
753 754
      ts *= NANOSECOND_PER_SEC;
      tsInt64 *= NANOSECOND_PER_SEC;
755 756
      break;
    case TSDB_TIME_PRECISION_MILLI:
wmmhello's avatar
wmmhello 已提交
757 758
      ts *= NANOSECOND_PER_MSEC;
      tsInt64 *= NANOSECOND_PER_MSEC;
759 760
      break;
    case TSDB_TIME_PRECISION_MICRO:
wmmhello's avatar
wmmhello 已提交
761 762
      ts *= NANOSECOND_PER_USEC;
      tsInt64 *= NANOSECOND_PER_USEC;
763 764 765 766 767
      break;
    case TSDB_TIME_PRECISION_NANO:
      break;
    default:
      ASSERT(0);
wmmhello's avatar
wmmhello 已提交
768
  }
X
Xiaoyu Wang 已提交
769
  if (ts >= (double)INT64_MAX || ts < 0) {
770
    return -1;
wmmhello's avatar
wmmhello 已提交
771 772
  }

773
  return tsInt64;
774 775 776 777 778 779
}

static int8_t smlGetTsTypeByLen(int32_t len) {
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
    return TSDB_TIME_PRECISION_SECONDS;
  } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
780
    return TSDB_TIME_PRECISION_MILLI;
781 782
  } else {
    return -1;
wmmhello's avatar
wmmhello 已提交
783
  }
784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802
}

static int8_t smlGetTsTypeByPrecision(int8_t precision) {
  switch (precision) {
    case TSDB_SML_TIMESTAMP_HOURS:
      return TSDB_TIME_PRECISION_HOURS;
    case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
      return TSDB_TIME_PRECISION_MILLI;
    case TSDB_SML_TIMESTAMP_NANO_SECONDS:
    case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
      return TSDB_TIME_PRECISION_NANO;
    case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
      return TSDB_TIME_PRECISION_MICRO;
    case TSDB_SML_TIMESTAMP_SECONDS:
      return TSDB_TIME_PRECISION_SECONDS;
    case TSDB_SML_TIMESTAMP_MINUTES:
      return TSDB_TIME_PRECISION_MINUTES;
    default:
      return -1;
wmmhello's avatar
wmmhello 已提交
803
  }
804 805
}

X
Xiaoyu Wang 已提交
806 807
static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) {
  if (len == 0 || (len == 1 && data[0] == '0')) {
808
    return taosGetTimestampNs();
wmmhello's avatar
wmmhello 已提交
809 810
  }

811 812 813 814
  int8_t tsType = smlGetTsTypeByPrecision(info->precision);
  if (tsType == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL);
    return -1;
wmmhello's avatar
wmmhello 已提交
815
  }
816 817

  int64_t ts = smlGetTimeValue(data, len, tsType);
X
Xiaoyu Wang 已提交
818
  if (ts == -1) {
819 820
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
wmmhello's avatar
wmmhello 已提交
821
  }
822 823 824
  return ts;
}

X
Xiaoyu Wang 已提交
825 826
static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
  if (!data) {
827 828
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
    return -1;
wmmhello's avatar
wmmhello 已提交
829
  }
X
Xiaoyu Wang 已提交
830
  if (len == 1 && data[0] == '0') {
831 832
    return taosGetTimestampNs();
  }
833 834
  int8_t tsType = smlGetTsTypeByLen(len);
  if (tsType == -1) {
X
Xiaoyu Wang 已提交
835 836
    smlBuildInvalidDataMsg(&info->msgBuf,
                           "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
837 838 839
    return -1;
  }
  int64_t ts = smlGetTimeValue(data, len, tsType);
X
Xiaoyu Wang 已提交
840
  if (ts == -1) {
841 842 843 844 845 846
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
  }
  return ts;
}

X
Xiaoyu Wang 已提交
847
static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArray *cols) {
848
  int64_t ts = 0;
X
Xiaoyu Wang 已提交
849
  if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
850
//    uError("SML:data:%s,len:%d", data, len);
851
    ts = smlParseInfluxTime(info, data, len);
X
Xiaoyu Wang 已提交
852
  } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
853
    ts = smlParseOpenTsdbTime(info, data, len);
X
Xiaoyu Wang 已提交
854
  } else {
855
    ASSERT(0);
856
  }
857

wmmhello's avatar
wmmhello 已提交
858
  if (ts == -1) return TSDB_CODE_INVALID_TIMESTAMP;
859 860 861

  // add ts to
  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
862
  if (!kv) {
863 864 865 866 867 868 869 870
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  kv->key = TS;
  kv->keyLen = TS_LEN;
  kv->i = ts;
  kv->type = TSDB_DATA_TYPE_TIMESTAMP;
  kv->length = (int16_t)tDataTypes[kv->type].bytes;
X
Xiaoyu Wang 已提交
871
  if (cols) taosArrayPush(cols, &kv);
872 873 874
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
875
static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
X
Xiaoyu Wang 已提交
876
  // binary
wmmhello's avatar
wmmhello 已提交
877
  if (smlIsBinary(pVal->value, pVal->length)) {
878
    pVal->type = TSDB_DATA_TYPE_BINARY;
wmmhello's avatar
wmmhello 已提交
879
    pVal->length -= BINARY_ADD_LEN;
wmmhello's avatar
wmmhello 已提交
880 881 882
    if (pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE){
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
883
    pVal->value += (BINARY_ADD_LEN - 1);
wmmhello's avatar
wmmhello 已提交
884
    return TSDB_CODE_SUCCESS;
885
  }
X
Xiaoyu Wang 已提交
886
  // nchar
wmmhello's avatar
wmmhello 已提交
887
  if (smlIsNchar(pVal->value, pVal->length)) {
888
    pVal->type = TSDB_DATA_TYPE_NCHAR;
wmmhello's avatar
wmmhello 已提交
889
    pVal->length -= NCHAR_ADD_LEN;
wmmhello's avatar
wmmhello 已提交
890 891 892
    if(pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
893
    pVal->value += (NCHAR_ADD_LEN - 1);
wmmhello's avatar
wmmhello 已提交
894
    return TSDB_CODE_SUCCESS;
895 896
  }

X
Xiaoyu Wang 已提交
897
  // bool
898 899 900
  if (smlParseBool(pVal)) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
wmmhello's avatar
wmmhello 已提交
901
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
902
  }
X
Xiaoyu Wang 已提交
903
  // number
904
  if (smlParseNumber(pVal, msg)) {
wmmhello's avatar
wmmhello 已提交
905
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
wmmhello's avatar
wmmhello 已提交
906
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
907 908
  }

wmmhello's avatar
wmmhello 已提交
909
  return TSDB_CODE_TSC_INVALID_VALUE;
wmmhello's avatar
wmmhello 已提交
910 911
}

X
Xiaoyu Wang 已提交
912 913
static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
wmmhello's avatar
wmmhello 已提交
914
  JUMP_SPACE(sql)
X
Xiaoyu Wang 已提交
915
  if (*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA;
wmmhello's avatar
wmmhello 已提交
916
  elements->measure = sql;
wmmhello's avatar
wmmhello 已提交
917

wmmhello's avatar
wmmhello 已提交
918
  // parse measure
wmmhello's avatar
wmmhello 已提交
919
  while (*sql != '\0') {
X
Xiaoyu Wang 已提交
920 921
    if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
      MOVE_FORWARD_ONE(sql, strlen(sql) + 1);
wmmhello's avatar
wmmhello 已提交
922 923
      continue;
    }
X
Xiaoyu Wang 已提交
924
    if (IS_COMMA(sql)) {
wmmhello's avatar
wmmhello 已提交
925 926 927
      break;
    }

X
Xiaoyu Wang 已提交
928
    if (IS_SPACE(sql)) {
wmmhello's avatar
wmmhello 已提交
929 930
      break;
    }
wmmhello's avatar
wmmhello 已提交
931 932
    sql++;
  }
wmmhello's avatar
wmmhello 已提交
933
  elements->measureLen = sql - elements->measure;
X
Xiaoyu Wang 已提交
934
  if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
935
    smlBuildInvalidDataMsg(msg, "measure is empty or too large than 192", NULL);
wmmhello's avatar
wmmhello 已提交
936
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
wmmhello's avatar
wmmhello 已提交
937
  }
wmmhello's avatar
wmmhello 已提交
938

wmmhello's avatar
wmmhello 已提交
939
  // parse tag
X
Xiaoyu Wang 已提交
940
  if (*sql == SPACE) {
wmmhello's avatar
wmmhello 已提交
941
    elements->tagsLen = 0;
X
Xiaoyu Wang 已提交
942 943
  } else {
    if (*sql == COMMA) sql++;
wmmhello's avatar
wmmhello 已提交
944 945
    elements->tags = sql;
    while (*sql != '\0') {
X
Xiaoyu Wang 已提交
946
      if (IS_SPACE(sql)) {
wmmhello's avatar
wmmhello 已提交
947 948 949
        break;
      }
      sql++;
wmmhello's avatar
wmmhello 已提交
950
    }
wmmhello's avatar
wmmhello 已提交
951
    elements->tagsLen = sql - elements->tags;
952
  }
wmmhello's avatar
wmmhello 已提交
953
  elements->measureTagsLen = sql - elements->measure;
wmmhello's avatar
wmmhello 已提交
954

wmmhello's avatar
wmmhello 已提交
955 956 957
  // parse cols
  JUMP_SPACE(sql)
  elements->cols = sql;
wmmhello's avatar
wmmhello 已提交
958
  bool isInQuote = false;
wmmhello's avatar
wmmhello 已提交
959
  while (*sql != '\0') {
X
Xiaoyu Wang 已提交
960
    if (IS_QUOTE(sql)) {
wmmhello's avatar
wmmhello 已提交
961 962
      isInQuote = !isInQuote;
    }
X
Xiaoyu Wang 已提交
963
    if (!isInQuote && IS_SPACE(sql)) {
wmmhello's avatar
wmmhello 已提交
964 965 966 967
      break;
    }
    sql++;
  }
X
Xiaoyu Wang 已提交
968
  if (isInQuote) {
969 970 971
    smlBuildInvalidDataMsg(msg, "only one quote", elements->cols);
    return TSDB_CODE_SML_INVALID_DATA;
  }
wmmhello's avatar
wmmhello 已提交
972
  elements->colsLen = sql - elements->cols;
X
Xiaoyu Wang 已提交
973
  if (elements->colsLen == 0) {
wmmhello's avatar
wmmhello 已提交
974 975 976
    smlBuildInvalidDataMsg(msg, "cols is empty", NULL);
    return TSDB_CODE_SML_INVALID_DATA;
  }
wmmhello's avatar
wmmhello 已提交
977

wmmhello's avatar
wmmhello 已提交
978 979 980
  // parse timestamp
  JUMP_SPACE(sql)
  elements->timestamp = sql;
wmmhello's avatar
wmmhello 已提交
981
  while (*sql != '\0') {
X
Xiaoyu Wang 已提交
982
    if (*sql == SPACE) {
wmmhello's avatar
wmmhello 已提交
983 984 985 986
      break;
    }
    sql++;
  }
wmmhello's avatar
wmmhello 已提交
987
  elements->timestampLen = sql - elements->timestamp;
wmmhello's avatar
wmmhello 已提交
988 989 990 991

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
992
static void smlParseTelnetElement(const char **sql, const char **data, int32_t *len) {
993
  while (**sql != '\0') {
X
Xiaoyu Wang 已提交
994
    if (**sql != SPACE && !(*data)) {
995
      *data = *sql;
X
Xiaoyu Wang 已提交
996
    } else if (**sql == SPACE && *data) {
997 998 999 1000 1001 1002 1003
      *len = *sql - *data;
      break;
    }
    (*sql)++;
  }
}

X
Xiaoyu Wang 已提交
1004 1005
static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTableName, SHashObj *dumplicateKey,
                                  SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
1006
  const char *sql = data;
X
Xiaoyu Wang 已提交
1007 1008
  size_t      childTableNameLen = strlen(tsSmlChildTableName);
  while (*sql != '\0') {
wmmhello's avatar
wmmhello 已提交
1009
    JUMP_SPACE(sql)
X
Xiaoyu Wang 已提交
1010
    if (*sql == '\0') break;
wmmhello's avatar
wmmhello 已提交
1011

wmmhello's avatar
wmmhello 已提交
1012
    const char *key = sql;
X
Xiaoyu Wang 已提交
1013
    int32_t     keyLen = 0;
wmmhello's avatar
wmmhello 已提交
1014 1015

    // parse key
X
Xiaoyu Wang 已提交
1016 1017
    while (*sql != '\0') {
      if (*sql == SPACE) {
wmmhello's avatar
wmmhello 已提交
1018 1019 1020
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
X
Xiaoyu Wang 已提交
1021
      if (*sql == EQUAL) {
wmmhello's avatar
wmmhello 已提交
1022 1023
        keyLen = sql - key;
        sql++;
1024 1025
        break;
      }
wmmhello's avatar
wmmhello 已提交
1026
      sql++;
1027
    }
wmmhello's avatar
wmmhello 已提交
1028

X
Xiaoyu Wang 已提交
1029
    if (IS_INVALID_COL_LEN(keyLen)) {
1030
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
wmmhello's avatar
wmmhello 已提交
1031
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
1032
    }
X
Xiaoyu Wang 已提交
1033
    if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
1034
      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
wmmhello's avatar
wmmhello 已提交
1035
      return TSDB_CODE_TSC_DUP_NAMES;
1036 1037 1038
    }

    // parse value
wmmhello's avatar
wmmhello 已提交
1039
    const char *value = sql;
X
Xiaoyu Wang 已提交
1040 1041
    int32_t     valueLen = 0;
    while (*sql != '\0') {
wmmhello's avatar
wmmhello 已提交
1042 1043
      // parse value
      if (*sql == SPACE) {
1044 1045
        break;
      }
wmmhello's avatar
wmmhello 已提交
1046 1047 1048 1049 1050
      if (*sql == EQUAL) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
1051
    }
wmmhello's avatar
wmmhello 已提交
1052
    valueLen = sql - value;
wmmhello's avatar
wmmhello 已提交
1053

X
Xiaoyu Wang 已提交
1054
    if (valueLen == 0) {
1055
      smlBuildInvalidDataMsg(msg, "invalid value", value);
wmmhello's avatar
wmmhello 已提交
1056
      return TSDB_CODE_TSC_INVALID_VALUE;
1057
    }
wmmhello's avatar
wmmhello 已提交
1058

X
Xiaoyu Wang 已提交
1059 1060
    // handle child table name
    if (childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0) {
1061 1062 1063 1064 1065
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN));
      continue;
    }

wmmhello's avatar
wmmhello 已提交
1066 1067 1068 1069
    if(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }

1070 1071
    // add kv to SSmlKv
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1072
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
1073 1074 1075
    kv->key = key;
    kv->keyLen = keyLen;
    kv->value = value;
wmmhello's avatar
wmmhello 已提交
1076
    kv->length = valueLen;
wmmhello's avatar
wmmhello 已提交
1077
    kv->type = TSDB_DATA_TYPE_NCHAR;
1078

X
Xiaoyu Wang 已提交
1079
    if (cols) taosArrayPush(cols, &kv);
1080 1081 1082 1083
  }

  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1084

1085
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
X
Xiaoyu Wang 已提交
1086 1087
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTableInfo *tinfo, SArray *cols) {
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
1088 1089 1090

  // parse metric
  smlParseTelnetElement(&sql, &tinfo->sTableName, &tinfo->sTableNameLen);
1091
  if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
1092
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
wmmhello's avatar
wmmhello 已提交
1093
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
1094 1095 1096 1097
  }

  // parse timestamp
  const char *timestamp = NULL;
X
Xiaoyu Wang 已提交
1098
  int32_t     tLen = 0;
1099 1100 1101 1102 1103 1104 1105 1106 1107
  smlParseTelnetElement(&sql, &timestamp, &tLen);
  if (!timestamp || tLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  int32_t ret = smlParseTS(info, timestamp, tLen, cols);
  if (ret != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
wmmhello's avatar
wmmhello 已提交
1108
    return ret;
1109 1110 1111 1112
  }

  // parse value
  const char *value = NULL;
X
Xiaoyu Wang 已提交
1113
  int32_t     valueLen = 0;
1114 1115 1116
  smlParseTelnetElement(&sql, &value, &valueLen);
  if (!value || valueLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
wmmhello's avatar
wmmhello 已提交
1117
    return TSDB_CODE_TSC_INVALID_VALUE;
1118 1119 1120
  }

  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1121
  if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
1122 1123 1124 1125
  taosArrayPush(cols, &kv);
  kv->key = VALUE;
  kv->keyLen = VALUE_LEN;
  kv->value = value;
wmmhello's avatar
wmmhello 已提交
1126
  kv->length = valueLen;
wmmhello's avatar
wmmhello 已提交
1127 1128
  if ((ret = smlParseValue(kv, &info->msgBuf)) != TSDB_CODE_SUCCESS) {
    return ret;
1129 1130 1131
  }

  // parse tags
1132
  ret = smlParseTelnetTags(sql, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
1133 1134
  if (ret != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
wmmhello's avatar
wmmhello 已提交
1135
    return ret;
1136 1137 1138 1139 1140
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1141 1142 1143
static int32_t smlParseCols(const char *data, int32_t len, SArray *cols, char *childTableName, bool isTag,
                            SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
  if (len == 0) {
wmmhello's avatar
wmmhello 已提交
1144
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1145 1146
  }

X
Xiaoyu Wang 已提交
1147
  size_t      childTableNameLen = strlen(tsSmlChildTableName);
wmmhello's avatar
wmmhello 已提交
1148
  const char *sql = data;
X
Xiaoyu Wang 已提交
1149
  while (sql < data + len) {
wmmhello's avatar
wmmhello 已提交
1150
    const char *key = sql;
X
Xiaoyu Wang 已提交
1151
    int32_t     keyLen = 0;
wmmhello's avatar
wmmhello 已提交
1152

X
Xiaoyu Wang 已提交
1153
    while (sql < data + len) {
wmmhello's avatar
wmmhello 已提交
1154
      // parse key
X
Xiaoyu Wang 已提交
1155
      if (IS_COMMA(sql)) {
wmmhello's avatar
wmmhello 已提交
1156 1157 1158
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
X
Xiaoyu Wang 已提交
1159
      if (IS_EQUAL(sql)) {
wmmhello's avatar
wmmhello 已提交
1160 1161
        keyLen = sql - key;
        sql++;
wmmhello's avatar
wmmhello 已提交
1162 1163
        break;
      }
wmmhello's avatar
wmmhello 已提交
1164
      sql++;
wmmhello's avatar
wmmhello 已提交
1165
    }
wmmhello's avatar
wmmhello 已提交
1166

X
Xiaoyu Wang 已提交
1167
    if (IS_INVALID_COL_LEN(keyLen)) {
wmmhello's avatar
wmmhello 已提交
1168
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
wmmhello's avatar
wmmhello 已提交
1169
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
wmmhello's avatar
wmmhello 已提交
1170
    }
X
Xiaoyu Wang 已提交
1171
    if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
1172
      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
wmmhello's avatar
wmmhello 已提交
1173
      return TSDB_CODE_TSC_DUP_NAMES;
1174 1175
    }

wmmhello's avatar
wmmhello 已提交
1176
    // parse value
wmmhello's avatar
wmmhello 已提交
1177
    const char *value = sql;
X
Xiaoyu Wang 已提交
1178 1179 1180
    int32_t     valueLen = 0;
    bool        isInQuote = false;
    while (sql < data + len) {
wmmhello's avatar
wmmhello 已提交
1181
      // parse value
X
Xiaoyu Wang 已提交
1182
      if (!isTag && IS_QUOTE(sql)) {
wmmhello's avatar
wmmhello 已提交
1183
        isInQuote = !isInQuote;
wmmhello's avatar
wmmhello 已提交
1184 1185
        sql++;
        continue;
wmmhello's avatar
wmmhello 已提交
1186
      }
wmmhello's avatar
wmmhello 已提交
1187
      if (!isInQuote && IS_COMMA(sql)) {
wmmhello's avatar
wmmhello 已提交
1188 1189
        break;
      }
wmmhello's avatar
wmmhello 已提交
1190 1191 1192 1193 1194
      if (!isInQuote && IS_EQUAL(sql)) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
wmmhello's avatar
wmmhello 已提交
1195
    }
wmmhello's avatar
wmmhello 已提交
1196 1197 1198
    valueLen = sql - value;
    sql++;

X
Xiaoyu Wang 已提交
1199
    if (isInQuote) {
wmmhello's avatar
wmmhello 已提交
1200 1201 1202
      smlBuildInvalidDataMsg(msg, "only one quote", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
X
Xiaoyu Wang 已提交
1203
    if (valueLen == 0) {
wmmhello's avatar
wmmhello 已提交
1204
      smlBuildInvalidDataMsg(msg, "invalid value", value);
wmmhello's avatar
wmmhello 已提交
1205 1206
      return TSDB_CODE_SML_INVALID_DATA;
    }
wmmhello's avatar
wmmhello 已提交
1207 1208
    PROCESS_SLASH(key, keyLen)
    PROCESS_SLASH(value, valueLen)
wmmhello's avatar
wmmhello 已提交
1209

X
Xiaoyu Wang 已提交
1210 1211
    // handle child table name
    if (childTableName && childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0) {
1212 1213 1214 1215 1216
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN));
      continue;
    }

wmmhello's avatar
wmmhello 已提交
1217
    // add kv to SSmlKv
wmmhello's avatar
wmmhello 已提交
1218
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1219 1220
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
    if (cols) taosArrayPush(cols, &kv);
1221

wmmhello's avatar
wmmhello 已提交
1222 1223 1224
    kv->key = key;
    kv->keyLen = keyLen;
    kv->value = value;
wmmhello's avatar
wmmhello 已提交
1225
    kv->length = valueLen;
X
Xiaoyu Wang 已提交
1226
    if (isTag) {
1227 1228 1229
      if(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
        return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
      }
wmmhello's avatar
wmmhello 已提交
1230
      kv->type = TSDB_DATA_TYPE_NCHAR;
X
Xiaoyu Wang 已提交
1231
    } else {
wmmhello's avatar
wmmhello 已提交
1232 1233 1234
      int32_t ret = smlParseValue(kv, msg);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
wmmhello's avatar
wmmhello 已提交
1235
      }
wmmhello's avatar
wmmhello 已提交
1236 1237
    }
  }
wmmhello's avatar
wmmhello 已提交
1238

wmmhello's avatar
wmmhello 已提交
1239 1240 1241
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1242 1243
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, SSmlMsgBuf *msg) {
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
wmmhello's avatar
wmmhello 已提交
1244
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
wmmhello's avatar
wmmhello 已提交
1245

wmmhello's avatar
wmmhello 已提交
1246
    int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);
X
Xiaoyu Wang 已提交
1247
    if (index) {
wmmhello's avatar
wmmhello 已提交
1248
      SSmlKv **value = (SSmlKv **)taosArrayGet(metaArray, *index);
X
Xiaoyu Wang 已提交
1249
      if (kv->type != (*value)->type) {
wmmhello's avatar
wmmhello 已提交
1250
        smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
wmmhello's avatar
wmmhello 已提交
1251
        return TSDB_CODE_SML_NOT_SAME_TYPE;
X
Xiaoyu Wang 已提交
1252 1253 1254
      } else {
        if (IS_VAR_DATA_TYPE(kv->type)) {  // update string len, if bigger
          if (kv->length > (*value)->length) {
wmmhello's avatar
wmmhello 已提交
1255
            *value = kv;
1256 1257 1258
          }
        }
      }
X
Xiaoyu Wang 已提交
1259
    } else {
wmmhello's avatar
wmmhello 已提交
1260 1261 1262 1263 1264
      size_t tmp = taosArrayGetSize(metaArray);
      ASSERT(tmp <= INT16_MAX);
      int16_t size = tmp;
      taosArrayPush(metaArray, &kv);
      taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
1265
    }
wmmhello's avatar
wmmhello 已提交
1266
  }
wmmhello's avatar
wmmhello 已提交
1267

wmmhello's avatar
wmmhello 已提交
1268
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1269 1270
}

X
Xiaoyu Wang 已提交
1271
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols) {
wmmhello's avatar
wmmhello 已提交
1272 1273 1274 1275
  for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
    taosArrayPush(metaArray, &kv);
    taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
wmmhello's avatar
wmmhello 已提交
1276 1277 1278
  }
}

X
Xiaoyu Wang 已提交
1279
static SSmlTableInfo *smlBuildTableInfo() {
1280
  SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
X
Xiaoyu Wang 已提交
1281
  if (!tag) {
1282
    return NULL;
wmmhello's avatar
wmmhello 已提交
1283
  }
1284 1285 1286 1287 1288

  tag->cols = taosArrayInit(16, POINTER_BYTES);
  if (tag->cols == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
wmmhello's avatar
wmmhello 已提交
1289
  }
1290 1291 1292 1293 1294

  tag->tags = taosArrayInit(16, POINTER_BYTES);
  if (tag->tags == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
wmmhello's avatar
wmmhello 已提交
1295
  }
1296 1297 1298 1299 1300
  return tag;

cleanup:
  taosMemoryFree(tag);
  return NULL;
wmmhello's avatar
wmmhello 已提交
1301 1302
}

X
Xiaoyu Wang 已提交
1303 1304 1305
static void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
  if (info->dataFormat) {
    for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
1306 1307
      SArray *kvArray = (SArray *)taosArrayGetP(tag->cols, i);
      for (int j = 0; j < taosArrayGetSize(kvArray); ++j) {
wmmhello's avatar
wmmhello 已提交
1308
        SSmlKv *p = (SSmlKv *)taosArrayGetP(kvArray, j);
X
Xiaoyu Wang 已提交
1309 1310 1311
        if (info->protocol == TSDB_SML_JSON_PROTOCOL &&
            (p->type == TSDB_DATA_TYPE_NCHAR || p->type == TSDB_DATA_TYPE_BINARY)) {
          taosMemoryFree((void *)p->value);
wmmhello's avatar
wmmhello 已提交
1312
        }
1313 1314 1315 1316
        taosMemoryFree(p);
      }
      taosArrayDestroy(kvArray);
    }
X
Xiaoyu Wang 已提交
1317 1318
  } else {
    for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
1319
      SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
X
Xiaoyu Wang 已提交
1320
      void    **p1 = (void **)taosHashIterate(kvHash, NULL);
1321 1322
      while (p1) {
        taosMemoryFree(*p1);
X
Xiaoyu Wang 已提交
1323
        p1 = (void **)taosHashIterate(kvHash, p1);
1324 1325 1326 1327
      }
      taosHashCleanup(kvHash);
    }
  }
X
Xiaoyu Wang 已提交
1328
  for (size_t i = 0; i < taosArrayGetSize(tag->tags); i++) {
wmmhello's avatar
wmmhello 已提交
1329
    SSmlKv *p = (SSmlKv *)taosArrayGetP(tag->tags, i);
X
Xiaoyu Wang 已提交
1330 1331 1332 1333
    if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
      taosMemoryFree((void *)p->key);
      if (p->type == TSDB_DATA_TYPE_NCHAR || p->type == TSDB_DATA_TYPE_BINARY) {
        taosMemoryFree((void *)p->value);
wmmhello's avatar
wmmhello 已提交
1334 1335 1336 1337
      }
    }
    taosMemoryFree(p);
  }
X
Xiaoyu Wang 已提交
1338 1339
  if (info->protocol == TSDB_SML_JSON_PROTOCOL && tag->sTableName) {
    taosMemoryFree((void *)tag->sTableName);
wmmhello's avatar
wmmhello 已提交
1340
  }
1341 1342 1343 1344 1345
  taosArrayDestroy(tag->cols);
  taosArrayDestroy(tag->tags);
  taosMemoryFree(tag);
}

X
Xiaoyu Wang 已提交
1346
static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) {
1347 1348
  SArray *s1 = *(SArray **)key1;
  SArray *s2 = *(SArray **)key2;
1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361
  SSmlKv *kv1 = (SSmlKv *)taosArrayGetP(s1, 0);
  SSmlKv *kv2 = (SSmlKv *)taosArrayGetP(s2, 0);
  ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
  ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
  if (kv1->i < kv2->i) {
    return -1;
  } else if (kv1->i > kv2->i) {
    return 1;
  } else {
    return 0;
  }
}

X
Xiaoyu Wang 已提交
1362
static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
1363 1364
  SHashObj *s1 = *(SHashObj **)key1;
  SHashObj *s2 = *(SHashObj **)key2;
1365 1366
  SSmlKv *kv1 = *(SSmlKv **)taosHashGet(s1, TS, TS_LEN);
  SSmlKv *kv2 = *(SSmlKv **)taosHashGet(s2, TS, TS_LEN);
1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377
  ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
  ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
  if (kv1->i < kv2->i) {
    return -1;
  } else if (kv1->i > kv2->i) {
    return 1;
  } else {
    return 0;
  }
}

1378 1379
static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){
  if(dataFormat){
1380
    void *p = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GT);
1381 1382
    if(p == NULL){
      taosArrayPush(oneTable->cols, &cols);
1383 1384
    }else{
      taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols);
1385
    }
1386 1387 1388 1389
    return TSDB_CODE_SUCCESS;
  }

  SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
1390
  if (!kvHash) {
1391 1392 1393
    uError("SML:smlDealCols failed to allocate memory");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
1394
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
1395
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
wmmhello's avatar
wmmhello 已提交
1396
    taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
1397 1398
  }

1399
  void *p = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GT);
1400 1401
  if(p == NULL){
    taosArrayPush(oneTable->cols, &kvHash);
1402 1403
  }else{
    taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &kvHash);
1404
  }
1405 1406 1407
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1408 1409 1410
static SSmlSTableMeta *smlBuildSTableMeta() {
  SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
  if (!meta) {
1411 1412 1413 1414 1415 1416 1417 1418
    return NULL;
  }
  meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (meta->tagHash == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

wmmhello's avatar
wmmhello 已提交
1419 1420
  meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (meta->colHash == NULL) {
1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->tags = taosArrayInit(32, POINTER_BYTES);
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->cols = taosArrayInit(32, POINTER_BYTES);
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
  return meta;

cleanup:
  taosMemoryFree(meta);
  return NULL;
}

X
Xiaoyu Wang 已提交
1443
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
1444
  taosHashCleanup(meta->tagHash);
wmmhello's avatar
wmmhello 已提交
1445
  taosHashCleanup(meta->colHash);
1446 1447 1448
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);
  taosMemoryFree(meta->tableMeta);
wmmhello's avatar
wmmhello 已提交
1449
  taosMemoryFree(meta);
1450 1451 1452 1453 1454
}

static void smlDestroyCols(SArray *cols) {
  if (!cols) return;
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
wmmhello's avatar
wmmhello 已提交
1455
    void *kv = taosArrayGetP(cols, i);
1456 1457 1458 1459
    taosMemoryFree(kv);
  }
}

X
Xiaoyu Wang 已提交
1460 1461
static void smlDestroyInfo(SSmlHandle *info) {
  if (!info) return;
1462 1463 1464 1465
  qDestroyQuery(info->pQuery);
  smlDestroyHandle(info->exec);

  // destroy info->childTables
X
Xiaoyu Wang 已提交
1466
  void **p1 = (void **)taosHashIterate(info->childTables, NULL);
1467
  while (p1) {
X
Xiaoyu Wang 已提交
1468 1469
    smlDestroyTableInfo(info, (SSmlTableInfo *)(*p1));
    p1 = (void **)taosHashIterate(info->childTables, p1);
1470 1471 1472 1473
  }
  taosHashCleanup(info->childTables);

  // destroy info->superTables
X
Xiaoyu Wang 已提交
1474
  p1 = (void **)taosHashIterate(info->superTables, NULL);
1475
  while (p1) {
X
Xiaoyu Wang 已提交
1476 1477
    smlDestroySTableMeta((SSmlSTableMeta *)(*p1));
    p1 = (void **)taosHashIterate(info->superTables, p1);
1478 1479 1480 1481 1482 1483
  }
  taosHashCleanup(info->superTables);

  // destroy info->pVgHash
  taosHashCleanup(info->pVgHash);
  taosHashCleanup(info->dumplicateKey);
X
Xiaoyu Wang 已提交
1484
  if (!info->dataFormat) {
wmmhello's avatar
wmmhello 已提交
1485 1486
    taosArrayDestroy(info->colsContainer);
  }
wmmhello's avatar
wmmhello 已提交
1487
  destroyRequest(info->pRequest);
1488 1489 1490
  taosMemoryFreeClear(info);
}

1491
static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLProtocolType protocol, int8_t precision){
1492 1493 1494 1495 1496
  int32_t code = TSDB_CODE_SUCCESS;
  SSmlHandle* info = (SSmlHandle*)taosMemoryCalloc(1, sizeof(SSmlHandle));
  if (NULL == info) {
    return NULL;
  }
X
Xiaoyu Wang 已提交
1497
  info->id = smlGenId();
1498

1499
  info->pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
1500
  if (NULL == info->pQuery) {
X
Xiaoyu Wang 已提交
1501
    uError("SML:0x%" PRIx64 " create info->pQuery error", info->id);
1502 1503
    goto cleanup;
  }
X
Xiaoyu Wang 已提交
1504
  info->pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
1505
  info->pQuery->haveResultSet = false;
X
Xiaoyu Wang 已提交
1506 1507 1508 1509
  info->pQuery->msgType = TDMT_VND_SUBMIT;
  info->pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
  if (NULL == info->pQuery->pRoot) {
    uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id);
1510 1511
    goto cleanup;
  }
X
Xiaoyu Wang 已提交
1512
  ((SVnodeModifOpStmt *)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
1513

1514 1515 1516 1517 1518 1519 1520
  if (pTscObj){
    info->taos        = pTscObj;
    code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
      goto cleanup;
    }
wmmhello's avatar
wmmhello 已提交
1521
  }
1522

X
Xiaoyu Wang 已提交
1523 1524 1525
  info->precision = precision;
  info->protocol = protocol;
  if (protocol == TSDB_SML_LINE_PROTOCOL) {
1526
    info->dataFormat = tsSmlDataFormat;
X
Xiaoyu Wang 已提交
1527
  } else {
1528 1529
    info->dataFormat = true;
  }
1530 1531 1532 1533 1534

  if(request){
    info->pRequest = request;
    info->msgBuf.buf = info->pRequest->msgBuf;
    info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
1535
    info->pRequest->stmtType = info->pQuery->pRoot->type;
1536
  }
1537

X
Xiaoyu Wang 已提交
1538
  info->exec = smlInitHandle(info->pQuery);
1539 1540
  info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
1541
  info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
1542 1543

  info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
1544
  if (!info->dataFormat) {
1545
    info->colsContainer = taosArrayInit(32, POINTER_BYTES);
X
Xiaoyu Wang 已提交
1546 1547
    if (NULL == info->colsContainer) {
      uError("SML:0x%" PRIx64 " create info failed", info->id);
1548 1549 1550
      goto cleanup;
    }
  }
X
Xiaoyu Wang 已提交
1551 1552 1553
  if (NULL == info->exec || NULL == info->childTables || NULL == info->superTables || NULL == info->pVgHash ||
      NULL == info->dumplicateKey) {
    uError("SML:0x%" PRIx64 " create info failed", info->id);
1554 1555 1556 1557 1558 1559 1560 1561 1562 1563
    goto cleanup;
  }

  return info;
cleanup:
  smlDestroyInfo(info);
  return NULL;
}

/************* TSDB_SML_JSON_PROTOCOL function start **************/
X
Xiaoyu Wang 已提交
1564
static int32_t smlJsonCreateSring(const char **output, char *input, int32_t inputLen) {
1565
  *output = (const char *)taosMemoryCalloc(1, inputLen);
X
Xiaoyu Wang 已提交
1566
  if (*output == NULL) {
1567 1568 1569
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
1570
  memcpy((void *)(*output), input, inputLen);
1571 1572 1573 1574 1575 1576
  return TSDB_CODE_SUCCESS;
}

static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo) {
  cJSON *metric = cJSON_GetObjectItem(root, "metric");
  if (!cJSON_IsString(metric)) {
X
Xiaoyu Wang 已提交
1577
    return TSDB_CODE_TSC_INVALID_JSON;
1578 1579 1580
  }

  tinfo->sTableNameLen = strlen(metric->valuestring);
1581
  if (IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
X
Xiaoyu Wang 已提交
1582
    uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id);
1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  return smlJsonCreateSring(&tinfo->sTableName, metric->valuestring, tinfo->sTableNameLen);
}

static int32_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int64_t *tsVal) {
  int32_t size = cJSON_GetArraySize(root);
  if (size != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (!cJSON_IsNumber(value)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  double timeDouble = value->valuedouble;
X
Xiaoyu Wang 已提交
1606
  if (smlDoubleToInt64OverFlow(timeDouble)) {
1607
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1608
    return TSDB_CODE_INVALID_TIMESTAMP;
1609
  }
wmmhello's avatar
wmmhello 已提交
1610 1611 1612 1613 1614 1615 1616 1617

  if (timeDouble == 0) {
    *tsVal = taosGetTimestampNs();
    return TSDB_CODE_SUCCESS;
  }

  if (timeDouble < 0) {
    return TSDB_CODE_INVALID_TIMESTAMP;
1618 1619
  }

1620
  *tsVal = timeDouble;
1621
  size_t typeLen = strlen(type->valuestring);
wmmhello's avatar
wmmhello 已提交
1622
  if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
X
Xiaoyu Wang 已提交
1623
    // seconds
1624 1625
    *tsVal = *tsVal * NANOSECOND_PER_SEC;
    timeDouble = timeDouble * NANOSECOND_PER_SEC;
X
Xiaoyu Wang 已提交
1626
    if (smlDoubleToInt64OverFlow(timeDouble)) {
1627
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1628
      return TSDB_CODE_INVALID_TIMESTAMP;
1629
    }
wmmhello's avatar
wmmhello 已提交
1630
  } else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
1631 1632
    switch (type->valuestring[0]) {
      case 'm':
wmmhello's avatar
wmmhello 已提交
1633
      case 'M':
X
Xiaoyu Wang 已提交
1634
        // milliseconds
1635 1636
        *tsVal = *tsVal * NANOSECOND_PER_MSEC;
        timeDouble = timeDouble * NANOSECOND_PER_MSEC;
X
Xiaoyu Wang 已提交
1637
        if (smlDoubleToInt64OverFlow(timeDouble)) {
1638
          smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1639
          return TSDB_CODE_INVALID_TIMESTAMP;
1640 1641 1642
        }
        break;
      case 'u':
wmmhello's avatar
wmmhello 已提交
1643
      case 'U':
X
Xiaoyu Wang 已提交
1644
        // microseconds
1645 1646
        *tsVal = *tsVal * NANOSECOND_PER_USEC;
        timeDouble = timeDouble * NANOSECOND_PER_USEC;
X
Xiaoyu Wang 已提交
1647
        if (smlDoubleToInt64OverFlow(timeDouble)) {
1648
          smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1649
          return TSDB_CODE_INVALID_TIMESTAMP;
1650 1651 1652
        }
        break;
      case 'n':
wmmhello's avatar
wmmhello 已提交
1653
      case 'N':
1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674
        break;
      default:
        return TSDB_CODE_TSC_INVALID_JSON;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  return TSDB_CODE_SUCCESS;
}

static uint8_t smlGetTimestampLen(int64_t num) {
  uint8_t len = 0;
  while ((num /= 10) != 0) {
    len++;
  }
  len++;
  return len;
}

static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
X
Xiaoyu Wang 已提交
1675
  // Timestamp must be the first KV to parse
1676 1677 1678 1679
  int64_t tsVal = 0;

  cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
  if (cJSON_IsNumber(timestamp)) {
X
Xiaoyu Wang 已提交
1680
    // timestamp value 0 indicates current system time
1681
    double timeDouble = timestamp->valuedouble;
X
Xiaoyu Wang 已提交
1682
    if (smlDoubleToInt64OverFlow(timeDouble)) {
1683
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1684
      return TSDB_CODE_INVALID_TIMESTAMP;
1685
    }
wmmhello's avatar
wmmhello 已提交
1686

X
Xiaoyu Wang 已提交
1687
    if (timeDouble < 0) {
wmmhello's avatar
wmmhello 已提交
1688
      return TSDB_CODE_INVALID_TIMESTAMP;
1689
    }
1690

1691
    uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
1692
    tsVal = (int64_t)timeDouble;
1693
    if (tsLen == TSDB_TIME_PRECISION_SEC_DIGITS) {
1694 1695
      tsVal = tsVal * NANOSECOND_PER_SEC;
      timeDouble = timeDouble * NANOSECOND_PER_SEC;
X
Xiaoyu Wang 已提交
1696
      if (smlDoubleToInt64OverFlow(timeDouble)) {
1697
        smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1698
        return TSDB_CODE_INVALID_TIMESTAMP;
1699 1700
      }
    } else if (tsLen == TSDB_TIME_PRECISION_MILLI_DIGITS) {
1701 1702
      tsVal = tsVal * NANOSECOND_PER_MSEC;
      timeDouble = timeDouble * NANOSECOND_PER_MSEC;
X
Xiaoyu Wang 已提交
1703
      if (smlDoubleToInt64OverFlow(timeDouble)) {
1704
        smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1705
        return TSDB_CODE_INVALID_TIMESTAMP;
1706
      }
X
Xiaoyu Wang 已提交
1707
    } else if (timeDouble == 0) {
wmmhello's avatar
wmmhello 已提交
1708
      tsVal = taosGetTimestampNs();
X
Xiaoyu Wang 已提交
1709
    } else {
wmmhello's avatar
wmmhello 已提交
1710
      return TSDB_CODE_INVALID_TIMESTAMP;
1711 1712 1713 1714
    }
  } else if (cJSON_IsObject(timestamp)) {
    int32_t ret = smlParseTSFromJSONObj(info, timestamp, &tsVal);
    if (ret != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
1715
      uError("SML:0x%" PRIx64 " Failed to parse timestamp from JSON Obj", info->id);
1716 1717 1718 1719
      return ret;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
1720 1721
  }

wmmhello's avatar
wmmhello 已提交
1722
  // add ts to
wmmhello's avatar
wmmhello 已提交
1723
  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1724
  if (!kv) {
wmmhello's avatar
wmmhello 已提交
1725 1726 1727
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  kv->key = TS;
1728 1729
  kv->keyLen = TS_LEN;
  kv->i = tsVal;
wmmhello's avatar
wmmhello 已提交
1730
  kv->type = TSDB_DATA_TYPE_TIMESTAMP;
wmmhello's avatar
wmmhello 已提交
1731
  kv->length = (int16_t)tDataTypes[kv->type].bytes;
X
Xiaoyu Wang 已提交
1732
  if (cols) taosArrayPush(cols, &kv);
wmmhello's avatar
wmmhello 已提交
1733 1734 1735
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1736
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
1737 1738 1739 1740 1741 1742 1743
  if (strcasecmp(typeStr, "bool") != 0) {
    uError("OTD:invalid type(%s) for JSON Bool", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }
  pVal->type = TSDB_DATA_TYPE_BOOL;
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  pVal->i = value->valueint;
wmmhello's avatar
wmmhello 已提交
1744

1745 1746
  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1747

X
Xiaoyu Wang 已提交
1748 1749 1750
static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
  // tinyint
  if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) {
1751 1752 1753 1754 1755 1756 1757 1758 1759
    if (!IS_VALID_TINYINT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(tinyint)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_TINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1760 1761
  // smallint
  if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) {
1762 1763 1764 1765 1766 1767 1768 1769 1770
    if (!IS_VALID_SMALLINT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(smallint)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_SMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1771 1772
  // int
  if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) {
1773 1774 1775 1776 1777 1778 1779 1780 1781
    if (!IS_VALID_INT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(int)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_INT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1782 1783
  // bigint
  if (strcasecmp(typeStr, "i64") == 0 || strcasecmp(typeStr, "bigint") == 0) {
1784 1785
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
X
Xiaoyu Wang 已提交
1786
    if (value->valuedouble >= (double)INT64_MAX) {
1787
      pVal->i = INT64_MAX;
X
Xiaoyu Wang 已提交
1788
    } else if (value->valuedouble <= (double)INT64_MIN) {
1789
      pVal->i = INT64_MIN;
X
Xiaoyu Wang 已提交
1790
    } else {
1791
      pVal->i = value->valuedouble;
1792 1793 1794
    }
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1795 1796
  // float
  if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) {
1797 1798 1799
    if (!IS_VALID_FLOAT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(float)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
wmmhello's avatar
wmmhello 已提交
1800
    }
1801 1802 1803 1804 1805
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->f = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1806 1807
  // double
  if (strcasecmp(typeStr, "f64") == 0 || strcasecmp(typeStr, "double") == 0) {
1808 1809 1810 1811
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->d = value->valuedouble;
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1812 1813
  }

X
Xiaoyu Wang 已提交
1814
  // if reach here means type is unsupported
1815 1816 1817
  uError("OTD:invalid type(%s) for JSON Number", typeStr);
  return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
1818

X
Xiaoyu Wang 已提交
1819
static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
1820 1821 1822 1823 1824 1825 1826 1827 1828
  if (strcasecmp(typeStr, "binary") == 0) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
  } else if (strcasecmp(typeStr, "nchar") == 0) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
  } else {
    uError("OTD:invalid type(%s) for JSON String", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }
  pVal->length = (int16_t)strlen(value->valuestring);
wmmhello's avatar
wmmhello 已提交
1829 1830 1831 1832 1833 1834 1835 1836

  if (pVal->type == TSDB_DATA_TYPE_BINARY && pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE){
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }
  if (pVal->type == TSDB_DATA_TYPE_NCHAR  && pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }

wmmhello's avatar
wmmhello 已提交
1837
  return smlJsonCreateSring(&pVal->value, value->valuestring, pVal->length);
1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863
}

static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) {
  int32_t ret = TSDB_CODE_SUCCESS;
  int32_t size = cJSON_GetArraySize(root);

  if (size != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (value == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  switch (value->type) {
    case cJSON_True:
    case cJSON_False: {
      ret = smlConvertJSONBool(kv, type->valuestring, value);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
wmmhello's avatar
wmmhello 已提交
1864
      }
1865
      break;
wmmhello's avatar
wmmhello 已提交
1866
    }
1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882
    case cJSON_Number: {
      ret = smlConvertJSONNumber(kv, type->valuestring, value);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
      break;
    }
    case cJSON_String: {
      ret = smlConvertJSONString(kv, type->valuestring, value);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON_TYPE;
wmmhello's avatar
wmmhello 已提交
1883
  }
1884 1885

  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1886 1887
}

1888 1889 1890 1891 1892 1893 1894 1895
static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
  switch (root->type) {
    case cJSON_True:
    case cJSON_False: {
      kv->type = TSDB_DATA_TYPE_BOOL;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->i = root->valueint;
      break;
wmmhello's avatar
wmmhello 已提交
1896
    }
1897 1898 1899 1900 1901 1902 1903 1904 1905 1906
    case cJSON_Number: {
      kv->type = TSDB_DATA_TYPE_DOUBLE;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->d = root->valuedouble;
      break;
    }
    case cJSON_String: {
      /* set default JSON type to binary/nchar according to
       * user configured parameter tsDefaultJSONStrType
       */
wmmhello's avatar
wmmhello 已提交
1907

X
Xiaoyu Wang 已提交
1908
      char *tsDefaultJSONStrType = "nchar";  // todo
1909 1910
      smlConvertJSONString(kv, tsDefaultJSONStrType, root);
      break;
wmmhello's avatar
wmmhello 已提交
1911
    }
1912 1913 1914 1915 1916 1917 1918 1919 1920 1921
    case cJSON_Object: {
      int32_t ret = smlParseValueFromJSONObj(root, kv);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON Obj");
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
1922
  }
1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933

  return TSDB_CODE_SUCCESS;
}

static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
  cJSON *metricVal = cJSON_GetObjectItem(root, "value");
  if (metricVal == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1934
  if (!kv) {
1935 1936
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
1937
  if (cols) taosArrayPush(cols, &kv);
1938 1939 1940 1941 1942 1943 1944 1945

  kv->key = VALUE;
  kv->keyLen = VALUE_LEN;
  int32_t ret = smlParseValueFromJSON(metricVal, kv);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1946 1947
}

X
Xiaoyu Wang 已提交
1948 1949
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey,
                                    SSmlMsgBuf *msg) {
1950 1951 1952 1953 1954 1955
  int32_t ret = TSDB_CODE_SUCCESS;

  cJSON *tags = cJSON_GetObjectItem(root, "tags");
  if (tags == NULL || tags->type != cJSON_Object) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }
wmmhello's avatar
wmmhello 已提交
1956

X
Xiaoyu Wang 已提交
1957
  size_t  childTableNameLen = strlen(tsSmlChildTableName);
1958 1959 1960 1961 1962
  int32_t tagNum = cJSON_GetArraySize(tags);
  for (int32_t i = 0; i < tagNum; ++i) {
    cJSON *tag = cJSON_GetArrayItem(tags, i);
    if (tag == NULL) {
      return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
1963
    }
1964 1965 1966 1967 1968
    size_t keyLen = strlen(tag->string);
    if (IS_INVALID_COL_LEN(keyLen)) {
      uError("OTD:Tag key length is 0 or too large than 64");
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
X
Xiaoyu Wang 已提交
1969
    // check duplicate keys
1970
    if (smlCheckDuplicateKey(tag->string, keyLen, dumplicateKey)) {
wmmhello's avatar
wmmhello 已提交
1971
      return TSDB_CODE_TSC_DUP_NAMES;
wmmhello's avatar
wmmhello 已提交
1972 1973
    }

X
Xiaoyu Wang 已提交
1974 1975
    // handle child table name
    if (childTableNameLen != 0 && strcmp(tag->string, tsSmlChildTableName) == 0) {
1976 1977 1978 1979 1980 1981 1982 1983 1984
      if (!cJSON_IsString(tag)) {
        uError("OTD:ID must be JSON string");
        return TSDB_CODE_TSC_INVALID_JSON;
      }
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, tag->valuestring, TSDB_TABLE_NAME_LEN);
      continue;
    }

1985 1986
    // add kv to SSmlKv
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1987 1988
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
    if (pKVs) taosArrayPush(pKVs, &kv);
1989

X
Xiaoyu Wang 已提交
1990
    // key
1991
    kv->keyLen = keyLen;
1992 1993 1994 1995
    ret = smlJsonCreateSring(&kv->key, tag->string, kv->keyLen);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
X
Xiaoyu Wang 已提交
1996
    // value
1997 1998 1999 2000
    ret = smlParseValueFromJSON(tag, kv);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
wmmhello's avatar
wmmhello 已提交
2001 2002
  }

2003
  return ret;
wmmhello's avatar
wmmhello 已提交
2004 2005
}

2006 2007
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo, SArray *cols) {
  int32_t ret = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2008

2009
  if (!cJSON_IsObject(root)) {
X
Xiaoyu Wang 已提交
2010
    uError("OTD:0x%" PRIx64 " data point needs to be JSON object", info->id);
2011
    return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
2012
  }
2013

2014
  int32_t size = cJSON_GetArraySize(root);
X
Xiaoyu Wang 已提交
2015
  // outmost json fields has to be exactly 4
2016
  if (size != OTD_JSON_FIELDS_NUM) {
X
Xiaoyu Wang 已提交
2017
    uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
2018
    return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
2019
  }
2020

X
Xiaoyu Wang 已提交
2021
  // Parse metric
2022 2023
  ret = smlParseMetricFromJSON(info, root, tinfo);
  if (ret != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2024
    uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
2025
    return ret;
wmmhello's avatar
wmmhello 已提交
2026
  }
X
Xiaoyu Wang 已提交
2027
  uDebug("OTD:0x%" PRIx64 " Parse metric from JSON payload finished", info->id);
wmmhello's avatar
wmmhello 已提交
2028

X
Xiaoyu Wang 已提交
2029
  // Parse timestamp
2030 2031
  ret = smlParseTSFromJSON(info, root, cols);
  if (ret) {
X
Xiaoyu Wang 已提交
2032
    uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
2033
    return ret;
wmmhello's avatar
wmmhello 已提交
2034
  }
X
Xiaoyu Wang 已提交
2035
  uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);
2036

X
Xiaoyu Wang 已提交
2037
  // Parse metric value
2038 2039
  ret = smlParseColsFromJSON(root, cols);
  if (ret) {
X
Xiaoyu Wang 已提交
2040
    uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
2041
    return ret;
2042
  }
X
Xiaoyu Wang 已提交
2043
  uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id);
2044

X
Xiaoyu Wang 已提交
2045
  // Parse tags
2046
  ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
2047
  if (ret) {
X
Xiaoyu Wang 已提交
2048
    uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
2049
    return ret;
2050
  }
X
Xiaoyu Wang 已提交
2051
  uDebug("OTD:0x%" PRIx64 " Parse tags from JSON payload finished", info->id);
wmmhello's avatar
wmmhello 已提交
2052

2053
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2054
}
2055
/************* TSDB_SML_JSON_PROTOCOL function end **************/
wmmhello's avatar
wmmhello 已提交
2056

X
Xiaoyu Wang 已提交
2057
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) {
wmmhello's avatar
wmmhello 已提交
2058
  SSmlLineInfo elements = {0};
2059
  uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql);
2060

X
Xiaoyu Wang 已提交
2061 2062 2063
  int          ret = smlParseInfluxString(sql, &elements, &info->msgBuf);
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id);
wmmhello's avatar
wmmhello 已提交
2064 2065 2066
    return ret;
  }

2067
  SArray *cols = NULL;
X
Xiaoyu Wang 已提交
2068
  if (info->dataFormat) {  // if dataFormat, cols need new memory to save data
2069 2070
    cols = taosArrayInit(16, POINTER_BYTES);
    if (cols == NULL) {
X
Xiaoyu Wang 已提交
2071
      uError("SML:0x%" PRIx64 " smlParseInfluxLine failed to allocate memory", info->id);
2072 2073
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
2074
  } else {  // if dataFormat is false, cols do not need to save data, there is another new memory to save data
2075
    cols = info->colsContainer;
wmmhello's avatar
wmmhello 已提交
2076
  }
wmmhello's avatar
wmmhello 已提交
2077

wmmhello's avatar
wmmhello 已提交
2078
  ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols);
X
Xiaoyu Wang 已提交
2079 2080 2081
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseTS failed", info->id);
    if (info->dataFormat) taosArrayDestroy(cols);
wmmhello's avatar
wmmhello 已提交
2082 2083
    return ret;
  }
2084
  ret = smlParseCols(elements.cols, elements.colsLen, cols, NULL, false, info->dumplicateKey, &info->msgBuf);
X
Xiaoyu Wang 已提交
2085 2086
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseCols parse cloums fields failed", info->id);
2087
    smlDestroyCols(cols);
X
Xiaoyu Wang 已提交
2088
    if (info->dataFormat) taosArrayDestroy(cols);
wmmhello's avatar
wmmhello 已提交
2089 2090 2091
    return ret;
  }

X
Xiaoyu Wang 已提交
2092 2093 2094 2095 2096
  bool            hasTable = true;
  SSmlTableInfo  *tinfo = NULL;
  SSmlTableInfo **oneTable =
      (SSmlTableInfo **)taosHashGet(info->childTables, elements.measure, elements.measureTagsLen);
  if (!oneTable) {
2097
    tinfo = smlBuildTableInfo();
X
Xiaoyu Wang 已提交
2098
    if (!tinfo) {
wmmhello's avatar
wmmhello 已提交
2099 2100
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
2101
    taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES);
2102
    oneTable = &tinfo;
2103 2104 2105 2106
    hasTable = false;
  }

  ret = smlDealCols(*oneTable, info->dataFormat, cols);
X
Xiaoyu Wang 已提交
2107
  if (ret != TSDB_CODE_SUCCESS) {
2108 2109
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
2110

X
Xiaoyu Wang 已提交
2111 2112 2113 2114 2115
  if (!hasTable) {
    ret = smlParseCols(elements.tags, elements.tagsLen, (*oneTable)->tags, (*oneTable)->childTableName, true,
                       info->dumplicateKey, &info->msgBuf);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseCols parse tag fields failed", info->id);
wmmhello's avatar
wmmhello 已提交
2116 2117 2118
      return ret;
    }

X
Xiaoyu Wang 已提交
2119
    if (taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_TAGS) {
wmmhello's avatar
wmmhello 已提交
2120
      smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
wmmhello's avatar
wmmhello 已提交
2121
      return TSDB_CODE_PAR_INVALID_TAGS_NUM;
wmmhello's avatar
wmmhello 已提交
2122 2123
    }

2124 2125 2126 2127 2128
    if (taosArrayGetSize(cols) + taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_COLUMNS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
      return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
    }

2129 2130
    (*oneTable)->sTableName = elements.measure;
    (*oneTable)->sTableNameLen = elements.measureLen;
X
Xiaoyu Wang 已提交
2131 2132 2133
    if (strlen((*oneTable)->childTableName) == 0) {
      RandTableName rName = {(*oneTable)->tags, (*oneTable)->sTableName, (uint8_t)(*oneTable)->sTableNameLen,
                             (*oneTable)->childTableName, 0};
2134 2135 2136

      buildChildTableName(&rName);
      (*oneTable)->uid = rName.uid;
X
Xiaoyu Wang 已提交
2137 2138
    } else {
      (*oneTable)->uid = *(uint64_t *)((*oneTable)->childTableName);
2139
    }
2140
  }
wmmhello's avatar
wmmhello 已提交
2141

X
Xiaoyu Wang 已提交
2142 2143
  SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements.measure, elements.measureLen);
  if (tableMeta) {  // update meta
wmmhello's avatar
wmmhello 已提交
2144
    ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
wmmhello's avatar
wmmhello 已提交
2145
    if (!hasTable && ret == TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2146 2147
      ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
    }
wmmhello's avatar
wmmhello 已提交
2148
    if (ret != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2149
      uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
wmmhello's avatar
wmmhello 已提交
2150
      return ret;
wmmhello's avatar
wmmhello 已提交
2151
    }
X
Xiaoyu Wang 已提交
2152
  } else {
2153
    SSmlSTableMeta *meta = smlBuildSTableMeta();
wmmhello's avatar
wmmhello 已提交
2154 2155
    smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
    smlInsertMeta(meta->colHash, meta->cols, cols);
2156
    taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
wmmhello's avatar
wmmhello 已提交
2157
  }
2158

X
Xiaoyu Wang 已提交
2159
  if (!info->dataFormat) {
2160 2161 2162
    taosArrayClear(info->colsContainer);
  }
  taosHashClear(info->dumplicateKey);
wmmhello's avatar
wmmhello 已提交
2163 2164 2165
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2166 2167
static int32_t smlParseTelnetLine(SSmlHandle *info, void *data) {
  int            ret = TSDB_CODE_SUCCESS;
2168
  SSmlTableInfo *tinfo = smlBuildTableInfo();
X
Xiaoyu Wang 已提交
2169
  if (!tinfo) {
2170
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
2171 2172
  }

2173 2174
  SArray *cols = taosArrayInit(16, POINTER_BYTES);
  if (cols == NULL) {
X
Xiaoyu Wang 已提交
2175
    uError("SML:0x%" PRIx64 " smlParseTelnetLine failed to allocate memory", info->id);
2176
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
2177 2178
  }

X
Xiaoyu Wang 已提交
2179 2180 2181
  if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
    ret = smlParseTelnetString(info, (const char *)data, tinfo, cols);
  } else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
2182
    ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
X
Xiaoyu Wang 已提交
2183
  } else {
2184 2185
    ASSERT(0);
  }
X
Xiaoyu Wang 已提交
2186 2187
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseTelnetLine failed", info->id);
wmmhello's avatar
wmmhello 已提交
2188
    smlDestroyTableInfo(info, tinfo);
wmmhello's avatar
wmmhello 已提交
2189
    smlDestroyCols(cols);
2190 2191
    taosArrayDestroy(cols);
    return ret;
wmmhello's avatar
wmmhello 已提交
2192
  }
wmmhello's avatar
wmmhello 已提交
2193

X
Xiaoyu Wang 已提交
2194
  if (taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
2195
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL);
wmmhello's avatar
wmmhello 已提交
2196 2197 2198
    smlDestroyTableInfo(info, tinfo);
    smlDestroyCols(cols);
    taosArrayDestroy(cols);
wmmhello's avatar
wmmhello 已提交
2199
    return TSDB_CODE_PAR_INVALID_TAGS_NUM;
wmmhello's avatar
wmmhello 已提交
2200
  }
2201 2202
  taosHashClear(info->dumplicateKey);

X
Xiaoyu Wang 已提交
2203 2204
  if (strlen(tinfo->childTableName) == 0) {
    RandTableName rName = {tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, tinfo->childTableName, 0};
2205 2206
    buildChildTableName(&rName);
    tinfo->uid = rName.uid;
X
Xiaoyu Wang 已提交
2207 2208
  } else {
    tinfo->uid = *(uint64_t *)(tinfo->childTableName);  // generate uid by name simple
2209 2210
  }

X
Xiaoyu Wang 已提交
2211 2212 2213 2214
  bool            hasTable = true;
  SSmlTableInfo **oneTable =
      (SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName));
  if (!oneTable) {
2215
    taosHashPut(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName), &tinfo, POINTER_BYTES);
2216
    oneTable = &tinfo;
2217
    hasTable = false;
X
Xiaoyu Wang 已提交
2218
  } else {
wmmhello's avatar
wmmhello 已提交
2219
    smlDestroyTableInfo(info, tinfo);
wmmhello's avatar
wmmhello 已提交
2220
  }
wmmhello's avatar
wmmhello 已提交
2221

2222
  taosArrayPush((*oneTable)->cols, &cols);
X
Xiaoyu Wang 已提交
2223 2224 2225
  SSmlSTableMeta **tableMeta =
      (SSmlSTableMeta **)taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen);
  if (tableMeta) {  // update meta
wmmhello's avatar
wmmhello 已提交
2226
    ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
wmmhello's avatar
wmmhello 已提交
2227
    if (!hasTable && ret == TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2228 2229
      ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
    }
wmmhello's avatar
wmmhello 已提交
2230
    if (ret != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2231
      uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
wmmhello's avatar
wmmhello 已提交
2232
      return ret;
2233
    }
X
Xiaoyu Wang 已提交
2234
  } else {
2235
    SSmlSTableMeta *meta = smlBuildSTableMeta();
wmmhello's avatar
wmmhello 已提交
2236 2237
    smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
    smlInsertMeta(meta->colHash, meta->cols, cols);
2238
    taosHashPut(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen, &meta, POINTER_BYTES);
wmmhello's avatar
wmmhello 已提交
2239 2240
  }

2241 2242
  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
2243

X
Xiaoyu Wang 已提交
2244
static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
2245 2246
  int32_t payloadNum = 0;
  int32_t ret = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2247

2248
  if (payload == NULL) {
X
Xiaoyu Wang 已提交
2249
    uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
2250
    return TSDB_CODE_TSC_INVALID_JSON;
2251
  }
2252 2253 2254

  cJSON *root = cJSON_Parse(payload);
  if (root == NULL) {
X
Xiaoyu Wang 已提交
2255
    uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
2256 2257
    return TSDB_CODE_TSC_INVALID_JSON;
  }
X
Xiaoyu Wang 已提交
2258
  // multiple data points must be sent in JSON array
2259 2260 2261 2262 2263
  if (cJSON_IsObject(root)) {
    payloadNum = 1;
  } else if (cJSON_IsArray(root)) {
    payloadNum = cJSON_GetArraySize(root);
  } else {
X
Xiaoyu Wang 已提交
2264
    uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
2265 2266
    ret = TSDB_CODE_TSC_INVALID_JSON;
    goto end;
wmmhello's avatar
wmmhello 已提交
2267
  }
wmmhello's avatar
wmmhello 已提交
2268

2269 2270 2271
  for (int32_t i = 0; i < payloadNum; ++i) {
    cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(root)) ? root : cJSON_GetArrayItem(root, i);
    ret = smlParseTelnetLine(info, dataPoint);
X
Xiaoyu Wang 已提交
2272 2273
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
2274 2275 2276 2277 2278 2279 2280
      goto end;
    }
  }

end:
  cJSON_Delete(root);
  return ret;
wmmhello's avatar
wmmhello 已提交
2281
}
2282

X
Xiaoyu Wang 已提交
2283
static int32_t smlInsertData(SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
2284 2285
  int32_t code = TSDB_CODE_SUCCESS;

X
Xiaoyu Wang 已提交
2286
  SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
wmmhello's avatar
wmmhello 已提交
2287
  while (oneTable) {
X
Xiaoyu Wang 已提交
2288
    SSmlTableInfo *tableData = *oneTable;
wmmhello's avatar
wmmhello 已提交
2289 2290 2291 2292

    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
    strcpy(pName.dbname, info->pRequest->pDb);
    memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
D
dapan1121 已提交
2293 2294 2295 2296 2297 2298

    SRequestConnInfo conn = {0};
    conn.pTrans = info->taos->pAppInfo->pTransporter;
    conn.requestId = info->pRequest->requestId;
    conn.requestObjRefId = info->pRequest->self;
    conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
X
Xiaoyu Wang 已提交
2299

wmmhello's avatar
wmmhello 已提交
2300
    SVgroupInfo vg;
D
dapan1121 已提交
2301
    code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
2302
    if (code != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2303
      uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
wmmhello's avatar
wmmhello 已提交
2304 2305
      return code;
    }
X
Xiaoyu Wang 已提交
2306
    taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
wmmhello's avatar
wmmhello 已提交
2307

X
Xiaoyu Wang 已提交
2308 2309 2310
    SSmlSTableMeta **pMeta =
        (SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
    ASSERT(NULL != pMeta && NULL != *pMeta);
wmmhello's avatar
wmmhello 已提交
2311

2312
    // use tablemeta of stable to save vgid and uid of child table
wmmhello's avatar
wmmhello 已提交
2313
    (*pMeta)->tableMeta->vgId = vg.vgId;
X
Xiaoyu Wang 已提交
2314
    (*pMeta)->tableMeta->uid = tableData->uid;  // one table merge data block together according uid
wmmhello's avatar
wmmhello 已提交
2315

2316
    code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat,
wmmhello's avatar
wmmhello 已提交
2317
                       (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, info->msgBuf.buf, info->msgBuf.len);
X
Xiaoyu Wang 已提交
2318 2319
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
wmmhello's avatar
wmmhello 已提交
2320 2321
      return code;
    }
X
Xiaoyu Wang 已提交
2322
    oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
wmmhello's avatar
wmmhello 已提交
2323
  }
wmmhello's avatar
wmmhello 已提交
2324

2325 2326
  code = smlBuildOutput(info->exec, info->pVgHash);
  if (code != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2327
    uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
2328 2329
    return code;
  }
2330 2331
  info->cost.insertRpcTime = taosGetTimestampUs();

X
Xiaoyu Wang 已提交
2332 2333 2334
  // launchQueryImpl(info->pRequest, info->pQuery, false, NULL);
  //  info->affectedRows = taos_affected_rows(info->pRequest);
  //  return info->pRequest->code;
wmmhello's avatar
wmmhello 已提交
2335

2336 2337 2338
  SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
  atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);

D
dapan1121 已提交
2339
  launchAsyncQuery(info->pRequest, info->pQuery, NULL);
wmmhello's avatar
wmmhello 已提交
2340
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2341 2342
}

X
Xiaoyu Wang 已提交
2343 2344 2345 2346 2347 2348 2349 2350 2351
static void smlPrintStatisticInfo(SSmlHandle *info) {
  uError("SML:0x%" PRIx64
         " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d \
        parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
         "",
         info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
         info->cost.numOfCreateSTables, info->cost.schemaTime - info->cost.parseTime,
         info->cost.insertBindTime - info->cost.schemaTime, info->cost.insertRpcTime - info->cost.insertBindTime,
         info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime);
2352 2353
}

X
Xiaoyu Wang 已提交
2354
static int32_t smlParseLine(SSmlHandle *info, char *lines[], int numLines) {
wmmhello's avatar
wmmhello 已提交
2355
  int32_t code = TSDB_CODE_SUCCESS;
2356 2357 2358 2359 2360 2361
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    code = smlParseJSON(info, *lines);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, *lines);
      return code;
    }
wmmhello's avatar
wmmhello 已提交
2362
    return code;
wmmhello's avatar
wmmhello 已提交
2363
  }
wmmhello's avatar
wmmhello 已提交
2364

wmmhello's avatar
wmmhello 已提交
2365
  for (int32_t i = 0; i < numLines; ++i) {
X
Xiaoyu Wang 已提交
2366
    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
2367
      code = smlParseInfluxLine(info, lines[i]);
X
Xiaoyu Wang 已提交
2368
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
2369
      code = smlParseTelnetLine(info, lines[i]);
X
Xiaoyu Wang 已提交
2370
    } else {
2371 2372
      ASSERT(0);
    }
wmmhello's avatar
wmmhello 已提交
2373
    if (code != TSDB_CODE_SUCCESS) {
2374 2375
      uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, lines[i]);
      return code;
wmmhello's avatar
wmmhello 已提交
2376 2377
    }
  }
2378 2379 2380
  return code;
}

X
Xiaoyu Wang 已提交
2381
static int smlProcess(SSmlHandle *info, char *lines[], int numLines) {
2382
  int32_t code = TSDB_CODE_SUCCESS;
2383 2384
  int32_t retryNum = 0;

2385 2386 2387 2388
  info->cost.parseTime = taosGetTimestampUs();

  code = smlParseLine(info, lines, numLines);
  if (code != 0) {
X
Xiaoyu Wang 已提交
2389
    uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
2390
    return code;
2391
  }
wmmhello's avatar
wmmhello 已提交
2392

2393 2394 2395 2396 2397
  info->cost.lineNum = numLines;
  info->cost.numOfSTables = taosHashGetSize(info->superTables);
  info->cost.numOfCTables = taosHashGetSize(info->childTables);

  info->cost.schemaTime = taosGetTimestampUs();
2398

X
Xiaoyu Wang 已提交
2399
  do {
2400 2401
    code = smlModifyDBSchemas(info);
    if (code == 0) break;
wmmhello's avatar
wmmhello 已提交
2402
  } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
2403

wmmhello's avatar
wmmhello 已提交
2404
  if (code != 0) {
X
Xiaoyu Wang 已提交
2405
    uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
2406
    return code;
wmmhello's avatar
wmmhello 已提交
2407
  }
wmmhello's avatar
wmmhello 已提交
2408

2409
  info->cost.insertBindTime = taosGetTimestampUs();
wmmhello's avatar
wmmhello 已提交
2410 2411
  code = smlInsertData(info);
  if (code != 0) {
X
Xiaoyu Wang 已提交
2412
    uError("SML:0x%" PRIx64 " smlInsertData error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
2413
    return code;
wmmhello's avatar
wmmhello 已提交
2414 2415 2416 2417 2418
  }

  return code;
}

X
Xiaoyu Wang 已提交
2419
static int32_t isSchemalessDb(STscObj *taos, SRequestObj *request) {
wmmhello's avatar
wmmhello 已提交
2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447
//  SCatalog *catalog = NULL;
//  int32_t   code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, &catalog);
//  if (code != TSDB_CODE_SUCCESS) {
//    uError("SML get catalog error %d", code);
//    return code;
//  }
//
//  SName name;
//  tNameSetDbName(&name, taos->acctId, taos->db, strlen(taos->db));
//  char dbFname[TSDB_DB_FNAME_LEN] = {0};
//  tNameGetFullDbName(&name, dbFname);
//  SDbCfgInfo pInfo = {0};
//
//  SRequestConnInfo conn = {0};
//  conn.pTrans = taos->pAppInfo->pTransporter;
//  conn.requestId = request->requestId;
//  conn.requestObjRefId = request->self;
//  conn.mgmtEps = getEpSet_s(&taos->pAppInfo->mgmtEp);
//
//  code = catalogGetDBCfg(catalog, &conn, dbFname, &pInfo);
//  if (code != TSDB_CODE_SUCCESS) {
//    return code;
//  }
//  taosArrayDestroy(pInfo.pRetensions);
//
//  if (!pInfo.schemaless) {
//    return TSDB_CODE_SML_INVALID_DB_CONF;
//  }
wmmhello's avatar
wmmhello 已提交
2448 2449 2450
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2451
static void smlInsertCallback(void *param, void *res, int32_t code) {
wmmhello's avatar
wmmhello 已提交
2452
  SRequestObj *pRequest = (SRequestObj *)res;
X
Xiaoyu Wang 已提交
2453
  SSmlHandle  *info = (SSmlHandle *)param;
wmmhello's avatar
wmmhello 已提交
2454
  int32_t rows = taos_affected_rows(pRequest);
wmmhello's avatar
wmmhello 已提交
2455

X
Xiaoyu Wang 已提交
2456
  uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
2457
  Params *pParam = info->params;
wmmhello's avatar
wmmhello 已提交
2458
  // lock
2459 2460
  taosThreadSpinLock(&pParam->lock);
  pParam->cnt++;
X
Xiaoyu Wang 已提交
2461
  if (code != TSDB_CODE_SUCCESS) {
2462 2463
    pParam->request->code = code;
    pParam->request->body.resInfo.numOfRows += rows;
2464
  }else{
2465 2466 2467 2468
    pParam->request->body.resInfo.numOfRows += info->affectedRows;
  }
  if (pParam->cnt == pParam->total) {
    tsem_post(&pParam->sem);
wmmhello's avatar
wmmhello 已提交
2469
  }
2470
  taosThreadSpinUnlock(&pParam->lock);
wmmhello's avatar
wmmhello 已提交
2471
  // unlock
wmmhello's avatar
wmmhello 已提交
2472
  uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
wmmhello's avatar
wmmhello 已提交
2473 2474 2475
  info->cost.endTime = taosGetTimestampUs();
  info->cost.code = code;
  smlPrintStatisticInfo(info);
wmmhello's avatar
wmmhello 已提交
2476 2477 2478
  smlDestroyInfo(info);
}

wmmhello's avatar
wmmhello 已提交
2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499
/**
 * taos_schemaless_insert() parse and insert data points into database according to
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
 * @return return zero for successful insertion. Otherwise return none-zero error code of
 *         failure reason.
 *
 */

wmmhello's avatar
wmmhello 已提交
2500
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
D
dapan1121 已提交
2501
  if (NULL == taos) {
2502 2503 2504
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }
D
dapan1121 已提交
2505

2506
  SRequestObj* request = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
wmmhello's avatar
wmmhello 已提交
2507
  if(!request){
2508
    uError("SML:taos_schemaless_insert error request is null");
2509
    return NULL;
wmmhello's avatar
wmmhello 已提交
2510 2511
  }

wmmhello's avatar
wmmhello 已提交
2512
  int    batchs = 0;
2513 2514
  STscObj* pTscObj = request->pTscObj;

2515
  pTscObj->schemalessType = 1;
wmmhello's avatar
wmmhello 已提交
2516
  SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
wmmhello's avatar
wmmhello 已提交
2517

2518
  Params params = {0};
wmmhello's avatar
wmmhello 已提交
2519
  params.request = request;
wmmhello's avatar
wmmhello 已提交
2520 2521 2522
  tsem_init(&params.sem, 0, 0);
  taosThreadSpinInit(&(params.lock), 0);

X
Xiaoyu Wang 已提交
2523
  if (request->pDb == NULL) {
wmmhello's avatar
wmmhello 已提交
2524
    request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
wmmhello's avatar
wmmhello 已提交
2525
    smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
wmmhello's avatar
wmmhello 已提交
2526 2527 2528
    goto end;
  }

2529
  if(isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS){
wmmhello's avatar
wmmhello 已提交
2530
    request->code = TSDB_CODE_SML_INVALID_DB_CONF;
wmmhello's avatar
wmmhello 已提交
2531
    smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
wmmhello's avatar
wmmhello 已提交
2532 2533 2534
    goto end;
  }

2535
  if (!lines) {
2536
    request->code = TSDB_CODE_SML_INVALID_DATA;
wmmhello's avatar
wmmhello 已提交
2537
    smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
2538 2539 2540
    goto end;
  }

X
Xiaoyu Wang 已提交
2541
  if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
2542
    request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
wmmhello's avatar
wmmhello 已提交
2543
    smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
2544
    goto end;
wmmhello's avatar
wmmhello 已提交
2545 2546
  }

X
Xiaoyu Wang 已提交
2547 2548
  if (protocol == TSDB_SML_LINE_PROTOCOL &&
      (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
2549
    request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
wmmhello's avatar
wmmhello 已提交
2550
    smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
2551 2552 2553
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
2554 2555 2556 2557 2558 2559 2560 2561
  if(protocol == TSDB_SML_JSON_PROTOCOL){
    numLines = 1;
  }else if(numLines <= 0){
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
2562
  batchs = ceil(((double)numLines) / LINE_BATCH);
2563
  params.total = batchs;
wmmhello's avatar
wmmhello 已提交
2564
  for (int i = 0; i < batchs; ++i) {
2565
    SRequestObj* req = (SRequestObj*)createRequest(pTscObj->id, TSDB_SQL_INSERT);
wmmhello's avatar
wmmhello 已提交
2566 2567 2568 2569 2570
    if(!req){
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error request is null");
      goto end;
    }
2571
    SSmlHandle* info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision);
wmmhello's avatar
wmmhello 已提交
2572 2573 2574 2575 2576 2577
    if(!info){
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error SSmlHandle is null");
      goto end;
    }

wmmhello's avatar
wmmhello 已提交
2578 2579
    int32_t perBatch = LINE_BATCH;

X
Xiaoyu Wang 已提交
2580
    if (numLines > perBatch) {
wmmhello's avatar
wmmhello 已提交
2581
      numLines -= perBatch;
X
Xiaoyu Wang 已提交
2582
    } else {
wmmhello's avatar
wmmhello 已提交
2583 2584 2585 2586
      perBatch = numLines;
      numLines = 0;
    }

wmmhello's avatar
wmmhello 已提交
2587
    info->params = &params;
wmmhello's avatar
wmmhello 已提交
2588 2589
    info->affectedRows = perBatch;
    info->pRequest->body.queryFp = smlInsertCallback;
X
Xiaoyu Wang 已提交
2590
    info->pRequest->body.param = info;
wmmhello's avatar
wmmhello 已提交
2591
    int32_t code = smlProcess(info, lines, perBatch);
wmmhello's avatar
wmmhello 已提交
2592
    lines += perBatch;
X
Xiaoyu Wang 已提交
2593
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2594 2595 2596 2597
      info->pRequest->body.queryFp(info, req, code);
    }
  }
  tsem_wait(&params.sem);
2598 2599

end:
wmmhello's avatar
wmmhello 已提交
2600 2601
  taosThreadSpinDestroy(&params.lock);
  tsem_destroy(&params.sem);
2602
//  ((STscObj *)taos)->schemalessType = 0;
2603
  pTscObj->schemalessType = 1;
2604
  uDebug("resultend:%s", request->msgBuf);
wmmhello's avatar
wmmhello 已提交
2605
  return (TAOS_RES*)request;
wmmhello's avatar
wmmhello 已提交
2606
}