clientSml.c 82.5 KB
Newer Older
wmmhello's avatar
wmmhello 已提交
1 2 3 4 5
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

X
Xiaoyu Wang 已提交
6 7 8 9 10
#include "cJSON.h"
#include "catalog.h"
#include "clientInt.h"
#include "osSemaphore.h"
#include "osThread.h"
wmmhello's avatar
wmmhello 已提交
11 12
#include "query.h"
#include "taos.h"
wmmhello's avatar
wmmhello 已提交
13
#include "taoserror.h"
X
Xiaoyu Wang 已提交
14
#include "tcommon.h"
wmmhello's avatar
wmmhello 已提交
15
#include "tdef.h"
X
Xiaoyu Wang 已提交
16
#include "tglobal.h"
wmmhello's avatar
wmmhello 已提交
17 18
#include "tlog.h"
#include "tmsg.h"
X
Xiaoyu Wang 已提交
19
#include "tname.h"
wmmhello's avatar
wmmhello 已提交
20 21
#include "ttime.h"
#include "ttypes.h"
wmmhello's avatar
wmmhello 已提交
22

wmmhello's avatar
wmmhello 已提交
23
//=================================================================================================
wmmhello's avatar
wmmhello 已提交
24 25 26 27 28 29 30

// Characters that have a special meaning while scanning a line.
#define SPACE ' '
#define COMMA ','
#define EQUAL '='
#define QUOTE '"'
#define SLASH '\\'

// Advances `sql` (which must be a modifiable pointer lvalue) past a run of spaces.
// Expands to a bare while statement, so it can be used with or without a trailing ';'.
#define JUMP_SPACE(sql)    \
  while (*(sql) != '\0') { \
    if (*(sql) == SPACE)   \
      (sql)++;             \
    else                   \
      break;               \
  }

// All IS_* macros below also read the character at (sql)-1, so `sql` must never
// point at the first byte of its buffer.

// comma ,
#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH)
#define IS_COMMA(sql)       (*(sql) == COMMA && *((sql)-1) != SLASH)
// space
#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql)-1) == SLASH)
#define IS_SPACE(sql)       (*(sql) == SPACE && *((sql)-1) != SLASH)
// equal =
#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) == SLASH)
#define IS_EQUAL(sql)       (*(sql) == EQUAL && *((sql)-1) != SLASH)
// quote "
#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) == SLASH)
#define IS_QUOTE(sql)       (*(sql) == QUOTE && *((sql)-1) != SLASH)
// SLASH
#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql)-1) == SLASH)

// True when the character at `sql` is one of , space = " \ and is preceded by a backslash.
#define IS_SLASH_LETTER(sql) \
  (IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql))

// Shifts `len` bytes starting at `sql` one position to the left (overwrites the byte at sql-1).
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), (len)))

// Removes, in place, every backslash that escapes one of the special characters in
// key[0..keyLen). `keyLen` must be a modifiable lvalue; it is decremented once per
// removed backslash. Bytes beyond the new keyLen are left as they were.
#define PROCESS_SLASH(key, keyLen)                 \
  for (int i = 1; i < (keyLen); ++i) {             \
    if (IS_SLASH_LETTER((key) + i)) {              \
      MOVE_FORWARD_ONE((key) + i, (keyLen) - i);   \
      i--;                                         \
      (keyLen)--;                                  \
    }                                              \
  }

// A name length is valid when it is in [1, limit - 1].
#define IS_INVALID_COL_LEN(len)   ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)

#define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM     4

// Reserved key names and their string lengths.
#define TS        "_ts"
#define TS_LEN    3
#define VALUE     "_value"
#define VALUE_LEN 6

#define BINARY_ADD_LEN 2  // "binary"   2 means " "
#define NCHAR_ADD_LEN  3  // L"nchar"   3 means L" "

#define MAX_RETRY_TIMES 5
#define LINE_BATCH      20000
wmmhello's avatar
wmmhello 已提交
83 84 85 86
//=================================================================================================
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;

// Kind of schema change required on a super table.
typedef enum {
  SCHEMA_ACTION_NULL,                // nothing to change (initial value used by callers)
  SCHEMA_ACTION_CREATE_STABLE,       // super table does not exist yet
  SCHEMA_ACTION_ADD_COLUMN,          // column key not found in the existing schema
  SCHEMA_ACTION_ADD_TAG,             // tag key not found in the existing schema
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,  // existing varchar/nchar column is too short
  SCHEMA_ACTION_CHANGE_TAG_SIZE,     // existing varchar/nchar tag is too short
} ESchemaAction;

// Sections of one input line, each stored as a (pointer, length) pair.
// NOTE(review): the pointers presumably reference the caller's line buffer rather
// than owned copies - confirm against the parser.
typedef struct {
  const char *measure;
  const char *tags;
  const char *cols;
  const char *timestamp;

  int32_t measureLen;
  int32_t measureTagsLen;  // NOTE(review): looks like the combined measure + tags length - confirm
  int32_t tagsLen;
  int32_t colsLen;
  int32_t timestampLen;
} SSmlLineInfo;

typedef struct {
X
Xiaoyu Wang 已提交
109 110 111 112
  const char *sTableName;  // super table name
  int32_t     sTableNameLen;
  char        childTableName[TSDB_TABLE_NAME_LEN];
  uint64_t    uid;
wmmhello's avatar
wmmhello 已提交
113

X
Xiaoyu Wang 已提交
114
  SArray *tags;
wmmhello's avatar
wmmhello 已提交
115

116 117
  // if info->formatData is true, elements are SArray<SSmlKv*>.
  // if info->formatData is false, elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
X
Xiaoyu Wang 已提交
118
  SArray *cols;
wmmhello's avatar
wmmhello 已提交
119 120 121
} SSmlTableInfo;

typedef struct {
X
Xiaoyu Wang 已提交
122 123
  SArray   *tags;     // save the origin order to create table
  SHashObj *tagHash;  // elements are <key, index in tags>
124

X
Xiaoyu Wang 已提交
125 126
  SArray   *cols;
  SHashObj *colHash;
127

wmmhello's avatar
wmmhello 已提交
128 129 130 131
  STableMeta *tableMeta;
} SSmlSTableMeta;

// Destination buffer for error messages (see smlBuildInvalidDataMsg()).
typedef struct {
  int32_t len;  // capacity of buf in bytes
  char   *buf;  // may be NULL, in which case no message is written
} SSmlMsgBuf;

136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
// Counters and timing marks collected for one insert.
// NOTE(review): the *Time fields look like timestamps taken at the end of each phase;
// their unit is not visible from here - confirm where they are filled in.
typedef struct {
  int32_t code;
  int32_t lineNum;

  int32_t numOfSTables;
  int32_t numOfCTables;
  int32_t numOfCreateSTables;  // incremented each time a super table is created

  int64_t parseTime;
  int64_t schemaTime;
  int64_t insertBindTime;
  int64_t insertRpcTime;
  int64_t endTime;
} SSmlCostInfo;

X
Xiaoyu Wang 已提交
151 152 153
typedef struct {
  SRequestObj     *request;
  tsem_t           sem;
wmmhello's avatar
wmmhello 已提交
154 155 156
  TdThreadSpinlock lock;
} Params;

wmmhello's avatar
wmmhello 已提交
157
typedef struct {
X
Xiaoyu Wang 已提交
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
  int64_t id;
  Params *params;
  bool    isLast;

  SMLProtocolType protocol;
  int8_t          precision;
  bool dataFormat;  // true means that the name and order of keys in each line are the same(only for influx protocol)

  SHashObj *childTables;
  SHashObj *superTables;
  SHashObj *pVgHash;
  void     *exec;

  STscObj     *taos;
  SCatalog    *pCatalog;
  SRequestObj *pRequest;
  SQuery      *pQuery;

  SSmlCostInfo cost;
  int32_t      affectedRows;
  SSmlMsgBuf   msgBuf;
  SHashObj    *dumplicateKey;  // for dumplicate key
  SArray      *colsContainer;  // for cols parse, if dataFormat == false
wmmhello's avatar
wmmhello 已提交
181
} SSmlHandle;
wmmhello's avatar
wmmhello 已提交
182 183
//=================================================================================================

wmmhello's avatar
wmmhello 已提交
184
//=================================================================================================
185
// Counter behind smlGenId(); only modified through atomic_add_fetch_64().
static volatile int64_t linesSmlHandleId = 0;

// Returns the next handle id. The counter is incremented atomically and the value 0
// is never returned: if the increment yields 0 the loop takes the next value.
static int64_t smlGenId() {
  int64_t id;

  do {
    id = atomic_add_fetch_64(&linesSmlHandleId, 1);
  } while (id == 0);

  return id;
}

196
// Returns true when `num` must not be converted to int64_t with a plain cast.
//
// Both bounds are inclusive: (double)INT64_MAX rounds up to 2^63, which is already
// out of range, and (double)INT64_MIN (exactly -2^63) is reported as well, so callers
// handle that boundary through their exact integer re-parse.
// NaN compares false against both bounds and therefore yields false.
static inline bool smlDoubleToInt64OverFlow(double num) {
  return num >= (double)INT64_MAX || num <= (double)INT64_MIN;
}

// Reports whether `key` (keyLen bytes) has been seen before in pHash.
// A key that is not present yet is recorded (its first byte is stored as the value)
// and false is returned; a key that is already present returns true.
static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashObj *pHash) {
  if (taosHashGet(pHash, key, keyLen) != NULL) {
    return true;
  }

  // first occurrence: remember it for the following lookups
  taosHashPut(pHash, key, keyLen, key, 1);
  return false;
}

X
Xiaoyu Wang 已提交
210
// Writes "<msg1>:<msg2>" into pBuf->buf, truncated to fit pBuf->len bytes including
// the terminating NUL, and returns TSDB_CODE_SML_INVALID_DATA.
//
// pBuf - destination; nothing is written when buf is NULL or len is not positive
// msg1 - first part, may be NULL
// msg2 - second part, may be NULL; ":" and msg2 are appended only when more than two
//        bytes are still free after msg1
//
// The return value is always TSDB_CODE_SML_INVALID_DATA so that callers can write
// `return smlBuildInvalidDataMsg(...)`.
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
  if (pBuf->buf && pBuf->len > 0) {
    memset(pBuf->buf, 0, pBuf->len);
    // snprintf never writes more than the given size, NUL included
    if (msg1) snprintf(pBuf->buf, (size_t)pBuf->len, "%s", msg1);
    size_t  used = strlen(pBuf->buf);
    int32_t left = pBuf->len - (int32_t)used;
    if (left > 2 && msg2) {
      snprintf(pBuf->buf + used, (size_t)left, ":%s", msg2);
    }
  }
  return TSDB_CODE_SML_INVALID_DATA;
}

X
Xiaoyu Wang 已提交
223
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
224
                                       ESchemaAction *action, SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
225
  uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
226 227
  if (index) {
    if (colField[*index].type != kv->type) {
X
Xiaoyu Wang 已提交
228 229
      uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id,
             kv->key, colField[*index].type, kv->type);
230 231 232
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

X
Xiaoyu Wang 已提交
233 234 235 236
    if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR &&
         (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
        (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
         ((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
237
      if (isTag) {
wmmhello's avatar
wmmhello 已提交
238
        *action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
239
      } else {
wmmhello's avatar
wmmhello 已提交
240
        *action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
241 242 243 244
      }
    }
  } else {
    if (isTag) {
wmmhello's avatar
wmmhello 已提交
245
      *action = SCHEMA_ACTION_ADD_TAG;
246
    } else {
wmmhello's avatar
wmmhello 已提交
247
      *action = SCHEMA_ACTION_ADD_COLUMN;
248 249
    }
  }
wmmhello's avatar
wmmhello 已提交
250 251 252
  return 0;
}

wmmhello's avatar
wmmhello 已提交
253
// Computes the storage size to reserve for a value of `length` characters.
//
// The character count is rounded up to the smallest power of two that is strictly
// greater than `length`. For BINARY and NCHAR it is then capped so that the byte
// size stays within TSDB_MAX_BINARY_LEN and converted to bytes (var-string header
// included, TSDB_NCHAR_SIZE bytes per NCHAR character). For any other type the bare
// power of two is returned.
static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
  int32_t result = 1;
  // stop doubling before `result * 2` could overflow int32_t (length >= 2^30)
  while (result <= length && result <= INT32_MAX / 2) {
    result *= 2;
  }
  if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE){
    result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
    result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  }

  if (type == TSDB_DATA_TYPE_NCHAR){
    result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
  }else if (type == TSDB_DATA_TYPE_BINARY){
    result = result + VARSTR_HEADER_SIZE;
  }
  return result;
}

X
Xiaoyu Wang 已提交
272
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
273
                                      ESchemaAction *action, bool isTag) {
274 275
  int32_t code = TSDB_CODE_SUCCESS;
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
276
    if(j == 0 && !isTag) continue;
X
Xiaoyu Wang 已提交
277
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j);
278
    code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
X
Xiaoyu Wang 已提交
279
    if (code != TSDB_CODE_SUCCESS) {
280 281 282 283 284 285
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

286
// Verifies that every key in `cols` exists by name in schema[0..length).
//
// For columns (isTag == false) the first entry of `cols` is not checked.
// Returns 0 when all keys are found, -1 otherwise. The temporary lookup table is
// released on both paths.
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
  SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  int32_t   code = 0;
  int32_t   i = 0;
  for ( ;i < length; i++) {
    // only the presence of the name matters below; the stored value is never read
    taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
  }

  if (isTag){
    i = 0;
  } else {
    i = 1;
  }
  for (; i < taosArrayGetSize(cols); i++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
    if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
      code = -1;
      break;
    }
  }
  taosHashCleanup(hashTmp);
  return code;
}

308 309 310 311 312 313 314 315
// Byte size to declare for a field of the given type: variable-length types
// (BINARY / NCHAR) are sized from `length` by smlFindNearestPowerOf2(), every other
// type uses its fixed size from tDataTypes.
static int32_t getBytes(uint8_t type, int32_t length){
  const bool isVarLen = (type == TSDB_DATA_TYPE_BINARY) || (type == TSDB_DATA_TYPE_NCHAR);
  if (!isVarLen) {
    return tDataTypes[type].bytes;
  }
  return smlFindNearestPowerOf2(length, type);
}

wmmhello's avatar
wmmhello 已提交
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337
// Applies the entries of `cols` to the field list `results` (SArray<SField>):
// keys unknown to the schema are appended as new fields, and existing fields that
// are too short get their byte size enlarged in place.
//
// schemaField / schemaHash - existing schema and its key -> index table (both NULL
//                            when the table does not exist yet)
// numOfCols                - subtracted from a tag's schema index to get its
//                            position inside `results`
// Always returns TSDB_CODE_SUCCESS.
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, SArray* results, int32_t numOfCols, bool isTag) {
  for (int idx = 0; idx < taosArrayGetSize(cols); ++idx) {
    SSmlKv       *item = (SSmlKv *)taosArrayGetP(cols, idx);
    ESchemaAction todo = SCHEMA_ACTION_NULL;
    smlGenerateSchemaAction(schemaField, schemaHash, item, isTag, &todo, info);

    switch (todo) {
      case SCHEMA_ACTION_ADD_COLUMN:
      case SCHEMA_ACTION_ADD_TAG: {
        // zero-initialised, so the copied name stays NUL-terminated
        SField added = {0};
        added.type = item->type;
        added.bytes = getBytes(item->type, item->length);
        memcpy(added.name, item->key, item->keyLen);
        taosArrayPush(results, &added);
        break;
      }
      case SCHEMA_ACTION_CHANGE_COLUMN_SIZE:
      case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
        // a CHANGE_* action is only produced for keys found in schemaHash
        uint16_t *found = (uint16_t *)taosHashGet(schemaHash, item->key, item->keyLen);
        uint16_t  pos = *found;
        if (isTag) pos -= numOfCols;
        SField *existing = (SField *)taosArrayGet(results, pos);
        existing->bytes = getBytes(item->type, item->length);
        break;
      }
      default:
        break;
    }
  }
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
338 339
//static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
//                              int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){
wmmhello's avatar
wmmhello 已提交
340
static int32_t  smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray* pColumns, SArray* pTags,
wmmhello's avatar
wmmhello 已提交
341 342
                               STableMeta *pTableMeta, ESchemaAction action){

343 344 345 346 347
  SRequestObj*   pRequest = NULL;
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SCmdMsgInfo    pCmdMsg = {0};

wmmhello's avatar
wmmhello 已提交
348 349 350 351 352 353
  // put front for free
  pReq.numOfColumns = taosArrayGetSize(pColumns);
  pReq.pColumns = pColumns;
  pReq.numOfTags = taosArrayGetSize(pTags);
  pReq.pTags = pTags;

354 355 356 357 358
  code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
359
  pRequest->syncQuery = true;
360 361 362 363 364
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
365
  if (action == SCHEMA_ACTION_CREATE_STABLE){
wmmhello's avatar
wmmhello 已提交
366 367 368 369
    pReq.colVer = 1;
    pReq.tagVer = 1;
    pReq.suid = 0;
    pReq.source = TD_REQ_FROM_APP;
wmmhello's avatar
wmmhello 已提交
370
  } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE){
wmmhello's avatar
wmmhello 已提交
371 372 373 374
    pReq.colVer = pTableMeta->sversion;
    pReq.tagVer = pTableMeta->tversion + 1;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
wmmhello's avatar
wmmhello 已提交
375
  } else if (action == SCHEMA_ACTION_ADD_COLUMN  || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE){
wmmhello's avatar
wmmhello 已提交
376 377 378 379 380 381
    pReq.colVer = pTableMeta->sversion + 1;
    pReq.tagVer = pTableMeta->tversion;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  }

wmmhello's avatar
wmmhello 已提交
382 383 384 385 386 387 388 389 390
  if (pReq.numOfTags == 0){
    pReq.numOfTags = 1;
    SField field = {0};
    field.type = TSDB_DATA_TYPE_NCHAR;
    field.bytes = 1;
    strcpy(field.name, tsSmlTagName);
    taosArrayPush(pReq.pTags, &field);
  }

391 392 393 394 395 396 397 398 399 400 401 402 403 404
  pReq.commentLen = -1;
  pReq.igExists = true;
  tNameExtractFullName(pName, pReq.name);

  pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

D
dapan1121 已提交
405 406
  SQuery pQuery;
  memset(&pQuery, 0, sizeof(pQuery));
407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  if(pRequest->code == TSDB_CODE_SUCCESS){
    catalogRemoveTableMeta(info->pCatalog, pName);
  }
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  return code;
}

X
Xiaoyu Wang 已提交
426
// Brings the database schema in line with every super table collected in
// info->superTables and stores the resulting table meta in each entry.
//
// Per super table:
//   - table missing  -> create it from the collected tags and columns
//   - table present  -> add / widen tags first, reload the meta, then add / widen
//                       columns, and afterwards verify that all collected keys exist
//   - any other lookup error aborts
// In all successful cases the meta is loaded once more and handed over to
// sTableData->tableMeta.
//
// Returns 0 on success. On failure the error code is returned after the hash
// iteration has been cancelled, temporaries have been released and the catalog entry
// of the current table has been refreshed.
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
  int32_t         code = 0;
  SHashObj       *hashTmp = NULL;
  STableMeta     *pTableMeta = NULL;

  SName   pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  strcpy(pName.dbname, info->pRequest->pDb);

  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

  SSmlSTableMeta **tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
  while (tableMetaSml) {
    SSmlSTableMeta *sTableData = *tableMetaSml;
    bool            needCheckMeta = false;  // for multi thread

    // the hash key is the super table name
    size_t superTableLen = 0;
    void  *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
    memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
    memcpy(pName.tname, superTable, superTableLen);

    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);

    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
      SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
      SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
      smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
      smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);

      code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
        goto end;
      }
      info->cost.numOfCreateSTables++;
    } else if (code == TSDB_CODE_SUCCESS) {
      // ---- tags: index the existing tag schema (tags follow the columns in schema[]) ----
      hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags,
                                       taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
      for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
           i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }

      ESchemaAction action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
      if (action != SCHEMA_ACTION_NULL){
        SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));

        // start from the current schema, split into columns and tags
        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if(i < pTableMeta->tableInfo.numOfColumns){
            taosArrayPush(pColumns, &field);
          }else{
            taosArrayPush(pTags, &field);
          }
        }
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags, pTableMeta->tableInfo.numOfColumns, true);

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
          goto end;
        }
      }

      // reload the meta so that the column pass works on the updated schema
      taosMemoryFreeClear(pTableMeta);
      code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
      code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }

      // ---- columns: same procedure with the column part of the schema ----
      taosHashClear(hashTmp);
      for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
      action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
      if (action != SCHEMA_ACTION_NULL){
        SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if(i < pTableMeta->tableInfo.numOfColumns){
            taosArrayPush(pColumns, &field);
          }else{
            taosArrayPush(pTags, &field);
          }
        }

        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns, pTableMeta->tableInfo.numOfColumns, false);

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
          goto end;
        }
      }

      code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
      needCheckMeta = true;
      taosHashCleanup(hashTmp);
      hashTmp = NULL;
    } else {
      uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
      goto end;
    }
    taosMemoryFreeClear(pTableMeta);

    // final meta, as it is after all changes above
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
      goto end;
    }

    if (needCheckMeta) {
      code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
                          sTableData->tags, true);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, pName.tname);
        goto end;
      }
      code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, pName.tname);
        goto end;
      }
    }

    // Ownership of the meta moves to sTableData. Clear the local pointer so that a
    // failure while handling the next super table cannot free memory that this
    // entry still references.
    sTableData->tableMeta = pTableMeta;
    pTableMeta = NULL;

    tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, tableMetaSml);
  }
  return 0;

end:
  // the iteration was left early: release the iterator before bailing out
  taosHashCancelIterate(info->superTables, tableMetaSml);
  taosHashCleanup(hashTmp);
  taosMemoryFreeClear(pTableMeta);
  catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
  return code;
}

X
Xiaoyu Wang 已提交
591
// Parses kvVal->value (kvVal->length bytes) as a number with an optional type suffix
// and stores the result and its type in kvVal.
//
// Suffixes (case-insensitive unless noted): none or f64 -> DOUBLE, f32 -> FLOAT,
// i (lower case only) or i64 -> BIGINT, u (lower case only) or u64 -> UBIGINT,
// i32/u32/i16/u16/i8/u8 -> the matching integer type.
//
// The text is first read as a double. For 64-bit integers whose double value is
// outside the safely castable range (or is NaN) the text is read again as an exact
// integer; that second read must stop exactly where the suffix starts, otherwise
// inputs such as "1e30i64" would silently be stored as 1.
// NOTE(review): as a consequence, decimal or exponent spellings of boundary values
// (e.g. "9223372036854775807.0i64") are rejected on that path - confirm this is acceptable.
//
// Returns true on success; on failure writes a message to `msg` and returns false.
static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
  const char *pVal = kvVal->value;
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (pVal == endptr) {
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
    return false;
  }

  // number of bytes after the numeric part, i.e. the length of the suffix
  int32_t left = len - (endptr - pVal);
  if (left == 0 || (left == 3 && strncasecmp(endptr, "f64", left) == 0)) {
    kvVal->type = TSDB_DATA_TYPE_DOUBLE;
    kvVal->d = result;
  } else if ((left == 3 && strncasecmp(endptr, "f32", left) == 0)) {
    if (!IS_VALID_FLOAT(result)) {
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_FLOAT;
    kvVal->f = (float)result;
  } else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)) {
    // `result != result` is true only for NaN, which must not be cast to an integer
    if (smlDoubleToInt64OverFlow(result) || result != result) {
      char *intEnd = NULL;
      errno = 0;
      int64_t tmp = taosStr2Int64(pVal, &intEnd, 10);
      if (errno == ERANGE || intEnd != endptr) {
        smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_BIGINT;
      kvVal->i = tmp;
      return true;
    }
    kvVal->type = TSDB_DATA_TYPE_BIGINT;
    kvVal->i = (int64_t)result;
  } else if ((left == 1 && *endptr == 'u') || (left == 3 && strncasecmp(endptr, "u64", left) == 0)) {
    if (result >= (double)UINT64_MAX || result < 0 || result != result) {
      char *intEnd = NULL;
      errno = 0;
      uint64_t tmp = taosStr2UInt64(pVal, &intEnd, 10);
      if (errno == ERANGE || result < 0 || intEnd != endptr) {
        smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_UBIGINT;
      kvVal->u = tmp;
      return true;
    }
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;
    kvVal->u = result;
  } else if (left == 3 && strncasecmp(endptr, "i32", left) == 0) {
    if (!IS_VALID_INT(result)) {
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_INT;
    kvVal->i = result;
  } else if (left == 3 && strncasecmp(endptr, "u32", left) == 0) {
    if (!IS_VALID_UINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UINT;
    kvVal->u = result;
  } else if (left == 3 && strncasecmp(endptr, "i16", left) == 0) {
    if (!IS_VALID_SMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_SMALLINT;
    kvVal->i = result;
  } else if (left == 3 && strncasecmp(endptr, "u16", left) == 0) {
    if (!IS_VALID_USMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_USMALLINT;
    kvVal->u = result;
  } else if (left == 2 && strncasecmp(endptr, "i8", left) == 0) {
    if (!IS_VALID_TINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_TINYINT;
    kvVal->i = result;
  } else if (left == 2 && strncasecmp(endptr, "u8", left) == 0) {
    if (!IS_VALID_UTINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UTINYINT;
    kvVal->u = result;
  } else {
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
    return false;
  }
  return true;
}

wmmhello's avatar
wmmhello 已提交
689
// Recognises a boolean literal in kvVal->value (kvVal->length bytes).
//
// Accepted spellings: t / T / f / F, and "true" / "false" in any letter case.
// On a match kvVal->i is set to true or false and true is returned; otherwise kvVal
// is left unchanged and false is returned. kvVal->type is not touched here.
static bool smlParseBool(SSmlKv *kvVal) {
  const char *pVal = kvVal->value;
  int32_t     len = kvVal->length;
  if ((len == 1) && (pVal[0] == 't' || pVal[0] == 'T')) {
    kvVal->i = true;
    return true;
  }

  if ((len == 1) && (pVal[0] == 'f' || pVal[0] == 'F')) {
    kvVal->i = false;
    return true;
  }

  if ((len == 4) && !strncasecmp(pVal, "true", len)) {
    kvVal->i = true;
    return true;
  }
  if ((len == 5) && !strncasecmp(pVal, "false", len)) {
    kvVal->i = false;
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
713
// Report whether the len-byte text at pVal is a binary literal, i.e. it is
// wrapped in double quotes: "abc". The shortest accepted form is the empty
// literal "" (two bytes). A NULL pVal is treated as "not a binary literal".
static bool smlIsBinary(const char *pVal, uint16_t len) {
  if (pVal == NULL || len < 2) {
    return false;
  }
  return pVal[0] == '"' && pVal[len - 1] == '"';
}

wmmhello's avatar
wmmhello 已提交
724
// Report whether the len-byte text at pVal is an nchar literal, i.e. a
// double-quoted string prefixed with 'L' or 'l': L"abc". The shortest
// accepted form is L"" (three bytes). A NULL pVal is treated as "not an
// nchar literal".
static bool smlIsNchar(const char *pVal, uint16_t len) {
  if (pVal == NULL || len < 3) {
    return false;
  }
  const bool hasPrefix = (pVal[0] == 'l' || pVal[0] == 'L');
  return hasPrefix && pVal[1] == '"' && pVal[len - 1] == '"';
}

735
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
X
Xiaoyu Wang 已提交
736
  char   *endPtr = NULL;
wafwerar's avatar
wafwerar 已提交
737
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
X
Xiaoyu Wang 已提交
738
  if (value + len != endPtr) {
739
    return -1;
wmmhello's avatar
wmmhello 已提交
740
  }
741
  double ts = tsInt64;
742 743
  switch (type) {
    case TSDB_TIME_PRECISION_HOURS:
wmmhello's avatar
wmmhello 已提交
744 745
      ts *= NANOSECOND_PER_HOUR;
      tsInt64 *= NANOSECOND_PER_HOUR;
746 747
      break;
    case TSDB_TIME_PRECISION_MINUTES:
wmmhello's avatar
wmmhello 已提交
748 749
      ts *= NANOSECOND_PER_MINUTE;
      tsInt64 *= NANOSECOND_PER_MINUTE;
750 751
      break;
    case TSDB_TIME_PRECISION_SECONDS:
wmmhello's avatar
wmmhello 已提交
752 753
      ts *= NANOSECOND_PER_SEC;
      tsInt64 *= NANOSECOND_PER_SEC;
754 755
      break;
    case TSDB_TIME_PRECISION_MILLI:
wmmhello's avatar
wmmhello 已提交
756 757
      ts *= NANOSECOND_PER_MSEC;
      tsInt64 *= NANOSECOND_PER_MSEC;
758 759
      break;
    case TSDB_TIME_PRECISION_MICRO:
wmmhello's avatar
wmmhello 已提交
760 761
      ts *= NANOSECOND_PER_USEC;
      tsInt64 *= NANOSECOND_PER_USEC;
762 763 764 765 766
      break;
    case TSDB_TIME_PRECISION_NANO:
      break;
    default:
      ASSERT(0);
wmmhello's avatar
wmmhello 已提交
767
  }
X
Xiaoyu Wang 已提交
768
  if (ts >= (double)INT64_MAX || ts < 0) {
769
    return -1;
wmmhello's avatar
wmmhello 已提交
770 771
  }

772
  return tsInt64;
773 774 775 776 777 778
}

static int8_t smlGetTsTypeByLen(int32_t len) {
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
    return TSDB_TIME_PRECISION_SECONDS;
  } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
779
    return TSDB_TIME_PRECISION_MILLI;
780 781
  } else {
    return -1;
wmmhello's avatar
wmmhello 已提交
782
  }
783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801
}

// Map a TSDB_SML_TIMESTAMP_* precision selector onto the matching
// TSDB_TIME_PRECISION_* unit. TSDB_SML_TIMESTAMP_NOT_CONFIGURED maps to
// nanoseconds, the same as TSDB_SML_TIMESTAMP_NANO_SECONDS. Unknown
// selectors return -1.
static int8_t smlGetTsTypeByPrecision(int8_t precision) {
  int8_t unit = -1;
  switch (precision) {
    case TSDB_SML_TIMESTAMP_HOURS:
      unit = TSDB_TIME_PRECISION_HOURS;
      break;
    case TSDB_SML_TIMESTAMP_MINUTES:
      unit = TSDB_TIME_PRECISION_MINUTES;
      break;
    case TSDB_SML_TIMESTAMP_SECONDS:
      unit = TSDB_TIME_PRECISION_SECONDS;
      break;
    case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
      unit = TSDB_TIME_PRECISION_MILLI;
      break;
    case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
      unit = TSDB_TIME_PRECISION_MICRO;
      break;
    case TSDB_SML_TIMESTAMP_NANO_SECONDS:
    case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
      unit = TSDB_TIME_PRECISION_NANO;
      break;
    default:
      break;
  }
  return unit;
}

X
Xiaoyu Wang 已提交
805 806
// Parse the timestamp field of a line-protocol record.
// An empty field, or the single character '0', yields the current time from
// taosGetTimestampNs(). Otherwise the text is scaled according to
// info->precision. Returns -1 (after recording a message in info->msgBuf)
// when the precision is not recognised or the text cannot be converted.
static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) {
  const bool useNow = (len == 0) || (len == 1 && data[0] == '0');
  if (useNow) {
    return taosGetTimestampNs();
  }

  int8_t unit = smlGetTsTypeByPrecision(info->precision);
  if (unit == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL);
    return -1;
  }

  int64_t result = smlGetTimeValue(data, len, unit);
  if (result == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
  }
  return result;
}

X
Xiaoyu Wang 已提交
824 825
// Parse the timestamp field of a telnet-protocol record.
// A NULL field is an error. The single character '0' yields the current time
// from taosGetTimestampNs(). Otherwise the unit is inferred from the number
// of digits via smlGetTsTypeByLen. Returns -1 (after recording a message in
// info->msgBuf) on any failure.
static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
  if (data == NULL) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
    return -1;
  }
  if (len == 1 && data[0] == '0') {
    return taosGetTimestampNs();
  }

  int8_t unit = smlGetTsTypeByLen(len);
  if (unit == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf,
                           "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
    return -1;
  }

  int64_t result = smlGetTimeValue(data, len, unit);
  if (result == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
  }
  return result;
}

X
Xiaoyu Wang 已提交
846
// Parse the timestamp text according to info->protocol and, when cols is
// given, append a TIMESTAMP key/value entry (key TS) holding the result.
//
// Returns TSDB_CODE_INVALID_TIMESTAMP when the protocol-specific parser
// reports -1, TSDB_CODE_OUT_OF_MEMORY when the entry cannot be allocated or
// stored, and TSDB_CODE_SUCCESS otherwise. On success the entry is owned by
// cols.
static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArray *cols) {
  int64_t ts = 0;
  if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
    ts = smlParseInfluxTime(info, data, len);
  } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
    ts = smlParseOpenTsdbTime(info, data, len);
  } else {
    ASSERT(0);
  }

  if (ts == -1) return TSDB_CODE_INVALID_TIMESTAMP;

  // Without a destination array nobody would own the entry, so do not
  // allocate one (previously it was allocated and leaked).
  if (cols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
  if (!kv) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  kv->key = TS;
  kv->keyLen = TS_LEN;
  kv->i = ts;
  kv->type = TSDB_DATA_TYPE_TIMESTAMP;
  kv->length = (int16_t)tDataTypes[kv->type].bytes;

  // NOTE(review): relies on taosArrayPush returning NULL when the array
  // cannot grow — confirm against tarray.h.
  if (taosArrayPush(cols, &kv) == NULL) {
    taosMemoryFree(kv);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
874
// Classify the raw text in pVal and fill in pVal->type / pVal->length
// (and the parsed value where applicable). Candidates are tried in order:
// quoted binary, L-prefixed nchar, boolean, number.
//
// For binary and nchar, length is reduced by the quoting overhead
// (BINARY_ADD_LEN / NCHAR_ADD_LEN) and value is advanced past the opening
// quote. Returns TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN when a string exceeds
// the storable size, TSDB_CODE_TSC_INVALID_VALUE when nothing matches.
static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
  // binary: "abc"
  if (smlIsBinary(pVal->value, pVal->length)) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
    pVal->length -= BINARY_ADD_LEN;
    const bool tooLong = pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
    if (tooLong) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
    pVal->value += (BINARY_ADD_LEN - 1);
    return TSDB_CODE_SUCCESS;
  }

  // nchar: L"abc"
  if (smlIsNchar(pVal->value, pVal->length)) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
    pVal->length -= NCHAR_ADD_LEN;
    const bool tooLong = pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
    if (tooLong) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
    pVal->value += (NCHAR_ADD_LEN - 1);
    return TSDB_CODE_SUCCESS;
  }

  // bool: the parser only stores the value, the type is set here
  if (smlParseBool(pVal)) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    return TSDB_CODE_SUCCESS;
  }

  // number: the parser stores both value and type
  if (!smlParseNumber(pVal, msg)) {
    return TSDB_CODE_TSC_INVALID_VALUE;
  }
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
911 912
// Split one line-protocol record into its four sections without copying:
//   measure[,tags] cols [timestamp]
// The start pointer and length of each section are stored in elements
// (measure/measureLen, tags/tagsLen, cols/colsLen, timestamp/timestampLen,
// plus measureTagsLen covering measure and tags together).
//
// Sections are separated by spaces that are not preceded by a backslash;
// inside the cols section, spaces within an unescaped double-quoted string
// do not terminate the section.
//
// Returns TSDB_CODE_SML_INVALID_DATA for a NULL line, a line starting with a
// comma, an unbalanced quote or an empty cols section, and
// TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH when the measure length is rejected
// by IS_INVALID_TABLE_LEN.
static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
  if (sql == NULL) return TSDB_CODE_SML_INVALID_DATA;
  JUMP_SPACE(sql)
  if (*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA;

  // ---- measure ----
  elements->measure = sql;
  while (*sql != '\0') {
    if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
      // the cursor is intentionally not advanced here; the same position is
      // examined again after MOVE_FORWARD_ONE
      MOVE_FORWARD_ONE(sql, strlen(sql) + 1);
      continue;
    }
    if (IS_COMMA(sql) || IS_SPACE(sql)) {
      break;
    }
    sql++;
  }
  elements->measureLen = sql - elements->measure;
  if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
    smlBuildInvalidDataMsg(msg, "measure is empty or too large than 192", NULL);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  // ---- tags ----
  if (*sql == SPACE) {
    // measure is directly followed by the cols section
    elements->tagsLen = 0;
  } else {
    if (*sql == COMMA) sql++;
    elements->tags = sql;
    for (; *sql != '\0'; sql++) {
      if (IS_SPACE(sql)) {
        break;
      }
    }
    elements->tagsLen = sql - elements->tags;
  }
  elements->measureTagsLen = sql - elements->measure;

  // ---- cols ----
  JUMP_SPACE(sql)
  elements->cols = sql;
  bool isInQuote = false;
  for (; *sql != '\0'; sql++) {
    if (IS_QUOTE(sql)) {
      isInQuote = !isInQuote;
    }
    if (!isInQuote && IS_SPACE(sql)) {
      break;
    }
  }
  if (isInQuote) {
    smlBuildInvalidDataMsg(msg, "only one quote", elements->cols);
    return TSDB_CODE_SML_INVALID_DATA;
  }
  elements->colsLen = sql - elements->cols;
  if (elements->colsLen == 0) {
    smlBuildInvalidDataMsg(msg, "cols is empty", NULL);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  // ---- timestamp ----
  JUMP_SPACE(sql)
  elements->timestamp = sql;
  for (; *sql != '\0'; sql++) {
    if (*sql == SPACE) {
      break;
    }
  }
  elements->timestampLen = sql - elements->timestamp;

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
991
static void smlParseTelnetElement(const char **sql, const char **data, int32_t *len) {
992
  while (**sql != '\0') {
X
Xiaoyu Wang 已提交
993
    if (**sql != SPACE && !(*data)) {
994
      *data = *sql;
X
Xiaoyu Wang 已提交
995
    } else if (**sql == SPACE && *data) {
996 997 998 999 1000 1001 1002
      *len = *sql - *data;
      break;
    }
    (*sql)++;
  }
}

X
Xiaoyu Wang 已提交
1003 1004
// Parse the space-separated "key=value" tag list of a telnet-protocol record.
//
//   data           : NUL-terminated tag text.
//   cols           : receives one NCHAR SSmlKv per tag (pointing into data);
//                    may be NULL, in which case tags are only validated.
//   childTableName : buffer of TSDB_TABLE_NAME_LEN bytes that receives the
//                    value of the tag named tsSmlChildTableName, if present.
//   dumplicateKey  : hash used by smlCheckDuplicateKey to reject repeated keys.
//   msg            : receives a description of the first error.
//
// Returns TSDB_CODE_SUCCESS, or an error code for malformed input, an invalid
// key length, a duplicate key, an empty or over-long value, or allocation
// failure.
static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTableName, SHashObj *dumplicateKey,
                                  SSmlMsgBuf *msg) {
  const char *sql = data;
  size_t      childTableNameLen = strlen(tsSmlChildTableName);
  while (*sql != '\0') {
    JUMP_SPACE(sql)
    if (*sql == '\0') break;

    const char *key = sql;
    int32_t     keyLen = 0;

    // parse key: runs up to the '='; a space inside the key is an error
    while (*sql != '\0') {
      if (*sql == SPACE) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if (*sql == EQUAL) {
        keyLen = sql - key;
        sql++;
        break;
      }
      sql++;
    }

    if (IS_INVALID_COL_LEN(keyLen)) {
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
    if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
      return TSDB_CODE_TSC_DUP_NAMES;
    }

    // parse value: runs up to the next space; a second '=' is an error
    const char *value = sql;
    int32_t     valueLen = 0;
    while (*sql != '\0') {
      if (*sql == SPACE) {
        break;
      }
      if (*sql == EQUAL) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
    }
    valueLen = sql - value;

    if (valueLen == 0) {
      smlBuildInvalidDataMsg(msg, "invalid value", value);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    // handle child table name: the key must equal the configured name exactly.
    // Comparing only keyLen bytes would also accept any key that is a mere
    // prefix of the configured name, hence the explicit length check.
    if (childTableName && childTableNameLen != 0 && (size_t)keyLen == childTableNameLen &&
        strncmp(key, tsSmlChildTableName, keyLen) == 0) {
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      // copy at most TSDB_TABLE_NAME_LEN - 1 bytes so the zero-filled buffer
      // always stays NUL-terminated
      int32_t copyLen = (valueLen < TSDB_TABLE_NAME_LEN - 1) ? valueLen : (TSDB_TABLE_NAME_LEN - 1);
      memcpy(childTableName, value, copyLen);
      continue;
    }

    if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }

    // Without a destination array nobody would own the entry, so do not
    // allocate one (previously it was allocated and leaked).
    if (cols == NULL) {
      continue;
    }

    // add kv to SSmlKv
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
    kv->key = key;
    kv->keyLen = keyLen;
    kv->value = value;
    kv->length = valueLen;
    kv->type = TSDB_DATA_TYPE_NCHAR;

    // NOTE(review): relies on taosArrayPush returning NULL when the array
    // cannot grow — confirm against tarray.h.
    if (taosArrayPush(cols, &kv) == NULL) {
      taosMemoryFree(kv);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1083

1084
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
X
Xiaoyu Wang 已提交
1085 1086
// Parse one telnet-protocol record: metric, timestamp, value, then tags.
// The metric name is stored in tinfo->sTableName / sTableNameLen, the
// timestamp and value are appended to cols as SSmlKv entries (the value
// under key VALUE), and the tags are handed to smlParseTelnetTags together
// with tinfo->tags and tinfo->childTableName.
// Returns TSDB_CODE_SUCCESS or the first error encountered; an explanation
// is recorded in info->msgBuf.
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTableInfo *tinfo, SArray *cols) {
  if (sql == NULL) return TSDB_CODE_SML_INVALID_DATA;

  // metric
  smlParseTelnetElement(&sql, &tinfo->sTableName, &tinfo->sTableNameLen);
  if (tinfo->sTableName == NULL || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  // timestamp
  const char *tsText = NULL;
  int32_t     tsLen = 0;
  smlParseTelnetElement(&sql, &tsText, &tsLen);
  if (tsText == NULL || tsLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  int32_t code = smlParseTS(info, tsText, tsLen, cols);
  if (code != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
    return code;
  }

  // value
  const char *valText = NULL;
  int32_t     valLen = 0;
  smlParseTelnetElement(&sql, &valText, &valLen);
  if (valText == NULL || valLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
  if (kv == NULL) return TSDB_CODE_OUT_OF_MEMORY;
  // the entry is handed to cols first and filled in afterwards; cols stores
  // the pointer, so the later writes are visible through it
  taosArrayPush(cols, &kv);
  kv->key = VALUE;
  kv->keyLen = VALUE_LEN;
  kv->value = valText;
  kv->length = valLen;
  code = smlParseValue(kv, &info->msgBuf);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // tags
  code = smlParseTelnetTags(sql, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
  if (code != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
  }
  return code;
}

X
Xiaoyu Wang 已提交
1140 1141 1142
// Parse a comma-separated "key=value" list (the tag or field section of a
// line-protocol record).
//
//   data, len      : the section text; only the first len bytes are read.
//   cols           : receives one SSmlKv per pair (pointing into data); may
//                    be NULL, in which case pairs are only validated.
//   childTableName : optional buffer of TSDB_TABLE_NAME_LEN bytes receiving
//                    the value of the pair named tsSmlChildTableName.
//   isTag          : true  -> every value is stored as NCHAR;
//                    false -> values are typed by smlParseValue and
//                             double-quoted strings may contain ',' and '='.
//   dumplicateKey  : hash used by smlCheckDuplicateKey to reject repeated keys.
//   msg            : receives a description of the first error.
//
// Commas and equals signs preceded by a backslash are not treated as
// separators (see IS_COMMA / IS_EQUAL). Entries are appended to cols only
// after they have been validated.
static int32_t smlParseCols(const char *data, int32_t len, SArray *cols, char *childTableName, bool isTag,
                            SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
  if (len == 0) {
    return TSDB_CODE_SUCCESS;
  }

  size_t      childTableNameLen = strlen(tsSmlChildTableName);
  const char *sql = data;
  while (sql < data + len) {
    const char *key = sql;
    int32_t     keyLen = 0;

    // parse key: runs up to the first unescaped '='
    while (sql < data + len) {
      if (IS_COMMA(sql)) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if (IS_EQUAL(sql)) {
        keyLen = sql - key;
        sql++;
        break;
      }
      sql++;
    }

    if (IS_INVALID_COL_LEN(keyLen)) {
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
    if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
      return TSDB_CODE_TSC_DUP_NAMES;
    }

    // parse value: runs up to the next unescaped ',' outside quotes
    const char *value = sql;
    int32_t     valueLen = 0;
    bool        isInQuote = false;
    while (sql < data + len) {
      if (!isTag && IS_QUOTE(sql)) {
        isInQuote = !isInQuote;
        sql++;
        continue;
      }
      if (!isInQuote && IS_COMMA(sql)) {
        break;
      }
      if (!isInQuote && IS_EQUAL(sql)) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
    }
    valueLen = sql - value;
    sql++;  // step over the separating comma (or past the end of the section)

    if (isInQuote) {
      smlBuildInvalidDataMsg(msg, "only one quote", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if (valueLen == 0) {
      smlBuildInvalidDataMsg(msg, "invalid value", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    PROCESS_SLASH(key, keyLen)
    PROCESS_SLASH(value, valueLen)

    // handle child table name: the key must equal the configured name exactly.
    // Comparing only keyLen bytes would also accept any key that is a mere
    // prefix of the configured name, hence the explicit length check.
    if (childTableName && childTableNameLen != 0 && (size_t)keyLen == childTableNameLen &&
        strncmp(key, tsSmlChildTableName, keyLen) == 0) {
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      // copy at most TSDB_TABLE_NAME_LEN - 1 bytes so the zero-filled buffer
      // always stays NUL-terminated
      int32_t copyLen = (valueLen < TSDB_TABLE_NAME_LEN - 1) ? valueLen : (TSDB_TABLE_NAME_LEN - 1);
      memcpy(childTableName, value, copyLen);
      continue;
    }

    // build the entry and validate it before handing it over
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;

    kv->key = key;
    kv->keyLen = keyLen;
    kv->value = value;
    kv->length = valueLen;

    int32_t ret = TSDB_CODE_SUCCESS;
    if (isTag) {
      if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        ret = TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
      } else {
        kv->type = TSDB_DATA_TYPE_NCHAR;
      }
    } else {
      ret = smlParseValue(kv, msg);
    }

    if (ret != TSDB_CODE_SUCCESS) {
      taosMemoryFree(kv);
      return ret;
    }

    if (cols == NULL) {
      // validation-only call: nobody takes ownership, so release the entry
      // (previously it was leaked)
      taosMemoryFree(kv);
      continue;
    }

    // NOTE(review): relies on taosArrayPush returning NULL when the array
    // cannot grow — confirm against tarray.h.
    if (taosArrayPush(cols, &kv) == NULL) {
      taosMemoryFree(kv);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1241 1242
// Merge the key/value entries in cols into the running schema description
// (metaArray plus the key -> index lookup metaHash).
//   - unknown key: append the entry to metaArray and record its index;
//   - known key, different type: fail with TSDB_CODE_SML_NOT_SAME_TYPE;
//   - known key, variable-length type: keep whichever entry is longer.
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, SSmlMsgBuf *msg) {
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv  *kv = (SSmlKv *)taosArrayGetP(cols, i);
    int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);

    if (index == NULL) {
      // first time this key is seen: its index is the current array size
      size_t tmp = taosArrayGetSize(metaArray);
      ASSERT(tmp <= INT16_MAX);
      int16_t size = tmp;
      taosArrayPush(metaArray, &kv);
      taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
      continue;
    }

    SSmlKv **value = (SSmlKv **)taosArrayGet(metaArray, *index);
    if (kv->type != (*value)->type) {
      smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
      return TSDB_CODE_SML_NOT_SAME_TYPE;
    }
    // update string len, if bigger
    if (IS_VAR_DATA_TYPE(kv->type) && kv->length > (*value)->length) {
      *value = kv;
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1270
// Seed an empty schema description from cols: every entry is appended to
// metaArray and its position in cols is recorded in metaHash under the
// entry's key.
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols) {
  int16_t i = 0;  // stored in the hash as a SHORT_BYTES value, hence int16_t
  while (i < taosArrayGetSize(cols)) {
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
    taosArrayPush(metaArray, &kv);
    taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
    ++i;
  }
}

X
Xiaoyu Wang 已提交
1278
static SSmlTableInfo *smlBuildTableInfo() {
1279
  SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
X
Xiaoyu Wang 已提交
1280
  if (!tag) {
1281
    return NULL;
wmmhello's avatar
wmmhello 已提交
1282
  }
1283 1284 1285 1286 1287

  tag->cols = taosArrayInit(16, POINTER_BYTES);
  if (tag->cols == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
wmmhello's avatar
wmmhello 已提交
1288
  }
1289 1290 1291 1292 1293

  tag->tags = taosArrayInit(16, POINTER_BYTES);
  if (tag->tags == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
wmmhello's avatar
wmmhello 已提交
1294
  }
1295 1296 1297 1298 1299
  return tag;

cleanup:
  taosMemoryFree(tag);
  return NULL;
wmmhello's avatar
wmmhello 已提交
1300 1301
}

X
Xiaoyu Wang 已提交
1302 1303 1304
// Release a table description and everything it owns.
// tag->cols holds one container per row: an SArray of SSmlKv* when
// info->dataFormat is set, otherwise an SHashObj whose values are freed
// pointers. For the JSON protocol the strings referenced by the entries
// (NCHAR/BINARY values, tag keys, the super-table name) are freed as well.
static void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
  const bool isJson = (info->protocol == TSDB_SML_JSON_PROTOCOL);

  // rows
  for (size_t row = 0; row < taosArrayGetSize(tag->cols); row++) {
    if (info->dataFormat) {
      SArray *kvArray = (SArray *)taosArrayGetP(tag->cols, row);
      for (int j = 0; j < taosArrayGetSize(kvArray); ++j) {
        SSmlKv *kv = (SSmlKv *)taosArrayGetP(kvArray, j);
        if (isJson && (kv->type == TSDB_DATA_TYPE_NCHAR || kv->type == TSDB_DATA_TYPE_BINARY)) {
          taosMemoryFree((void *)kv->value);
        }
        taosMemoryFree(kv);
      }
      taosArrayDestroy(kvArray);
    } else {
      SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, row);
      for (void **it = (void **)taosHashIterate(kvHash, NULL); it != NULL;
           it = (void **)taosHashIterate(kvHash, it)) {
        taosMemoryFree(*it);
      }
      taosHashCleanup(kvHash);
    }
  }

  // tags
  for (size_t t = 0; t < taosArrayGetSize(tag->tags); t++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(tag->tags, t);
    if (isJson) {
      taosMemoryFree((void *)kv->key);
      if (kv->type == TSDB_DATA_TYPE_NCHAR || kv->type == TSDB_DATA_TYPE_BINARY) {
        taosMemoryFree((void *)kv->value);
      }
    }
    taosMemoryFree(kv);
  }

  if (isJson && tag->sTableName) {
    taosMemoryFree((void *)tag->sTableName);
  }

  // containers, then the description itself
  taosArrayDestroy(tag->cols);
  taosArrayDestroy(tag->tags);
  taosMemoryFree(tag);
}

X
Xiaoyu Wang 已提交
1345
static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) {
1346 1347
  SArray *s1 = *(SArray **)key1;
  SArray *s2 = *(SArray **)key2;
1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360
  SSmlKv *kv1 = (SSmlKv *)taosArrayGetP(s1, 0);
  SSmlKv *kv2 = (SSmlKv *)taosArrayGetP(s2, 0);
  ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
  ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
  if (kv1->i < kv2->i) {
    return -1;
  } else if (kv1->i > kv2->i) {
    return 1;
  } else {
    return 0;
  }
}

X
Xiaoyu Wang 已提交
1361
static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
1362 1363
  SHashObj *s1 = *(SHashObj **)key1;
  SHashObj *s2 = *(SHashObj **)key2;
1364 1365
  SSmlKv *kv1 = *(SSmlKv **)taosHashGet(s1, TS, TS_LEN);
  SSmlKv *kv2 = *(SSmlKv **)taosHashGet(s2, TS, TS_LEN);
1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376
  ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
  ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
  if (kv1->i < kv2->i) {
    return -1;
  } else if (kv1->i > kv2->i) {
    return 1;
  } else {
    return 0;
  }
}

1377 1378
// Add one parsed row to oneTable->cols, keeping the rows ordered by
// timestamp. With dataFormat set the row is stored as the SArray itself;
// otherwise its entries are first indexed by key in a new SHashObj and the
// hash is stored. The row is inserted in front of the element returned by
// taosArraySearch(..., TD_GT), or appended when the search finds nothing.
static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){
  if (dataFormat) {
    void *pos = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GT);
    if (pos != NULL) {
      taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, pos), &cols);
    } else {
      taosArrayPush(oneTable->cols, &cols);
    }
    return TSDB_CODE_SUCCESS;
  }

  SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (kvHash == NULL) {
    uError("SML:smlDealCols failed to allocate memory");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // index every entry of the row by its key
  for (size_t idx = 0; idx < taosArrayGetSize(cols); idx++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, idx);
    taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
  }

  void *pos = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GT);
  if (pos != NULL) {
    taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, pos), &kvHash);
  } else {
    taosArrayPush(oneTable->cols, &kvHash);
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1407 1408 1409
// Allocate a zero-initialised SSmlSTableMeta together with its tag/column
// lookup hashes and tag/column arrays. Returns NULL when any allocation
// fails; in that case everything allocated so far is released again.
static SSmlSTableMeta *smlBuildSTableMeta() {
  SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
  if (!meta) {
    return NULL;
  }
  meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (meta->tagHash == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (meta->colHash == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->tags = taosArrayInit(32, POINTER_BYTES);
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->cols = taosArrayInit(32, POINTER_BYTES);
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
  return meta;

cleanup:
  // meta came from calloc, so members that were never allocated are NULL.
  // Previously only meta itself was freed and the members were leaked.
  if (meta->cols != NULL) taosArrayDestroy(meta->cols);
  if (meta->tags != NULL) taosArrayDestroy(meta->tags);
  if (meta->colHash != NULL) taosHashCleanup(meta->colHash);
  if (meta->tagHash != NULL) taosHashCleanup(meta->tagHash);
  taosMemoryFree(meta);
  return NULL;
}

X
Xiaoyu Wang 已提交
1442
// Release a super-table description: both arrays, both lookup hashes, the
// attached tableMeta, and finally the description itself. The entries
// referenced from the arrays are not freed here.
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
  // containers
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);
  taosHashCleanup(meta->tagHash);
  taosHashCleanup(meta->colHash);

  // attached table meta, then the owner
  taosMemoryFree(meta->tableMeta);
  taosMemoryFree(meta);
}

// Free every entry referenced from cols. The array itself is neither
// cleared nor destroyed. A NULL cols is ignored.
static void smlDestroyCols(SArray *cols) {
  if (cols == NULL) {
    return;
  }
  int idx = 0;
  while (idx < taosArrayGetSize(cols)) {
    void *entry = taosArrayGetP(cols, idx);
    taosMemoryFree(entry);
    ++idx;
  }
}

X
Xiaoyu Wang 已提交
1459 1460
// Tear down a schemaless handle: the query, the exec handle, every child
// table and super-table entry with their hashes, the remaining hashes, the
// shared column container (only used when dataFormat is off), the request,
// and finally the handle itself. NULL is ignored.
static void smlDestroyInfo(SSmlHandle *info) {
  if (info == NULL) {
    return;
  }

  qDestroyQuery(info->pQuery);
  smlDestroyHandle(info->exec);

  // child tables: destroy each stored entry, then the hash that indexed them
  for (void **entry = (void **)taosHashIterate(info->childTables, NULL); entry != NULL;
       entry = (void **)taosHashIterate(info->childTables, entry)) {
    smlDestroyTableInfo(info, (SSmlTableInfo *)(*entry));
  }
  taosHashCleanup(info->childTables);

  // super tables: same walk, different element destructor
  for (void **entry = (void **)taosHashIterate(info->superTables, NULL); entry != NULL;
       entry = (void **)taosHashIterate(info->superTables, entry)) {
    smlDestroySTableMeta((SSmlSTableMeta *)(*entry));
  }
  taosHashCleanup(info->superTables);

  taosHashCleanup(info->pVgHash);
  taosHashCleanup(info->dumplicateKey);

  // colsContainer is only allocated when dataFormat is false (see smlBuildSmlInfo)
  if (!info->dataFormat) {
    taosArrayDestroy(info->colsContainer);
  }

  destroyRequest(info->pRequest);
  taosMemoryFreeClear(info);
}

1490
// Create and initialise a schemaless handle.
//
// pTscObj   - connection object; when non-NULL the catalog handle is fetched
// request   - request object; when non-NULL its message buffer is reused for
//             error messages
// protocol  - line / telnet / json protocol selector
// precision - timestamp precision stored on the handle
//
// Returns the handle, or NULL on any failure (everything built so far is torn
// down through smlDestroyInfo).
static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLProtocolType protocol, int8_t precision) {
  SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
  if (info == NULL) {
    return NULL;
  }
  info->id = smlGenId();

  // query node with a vnode-modify statement as root
  info->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY);
  if (info->pQuery == NULL) {
    uError("SML:0x%" PRIx64 " create info->pQuery error", info->id);
    goto cleanup;
  }
  info->pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  info->pQuery->haveResultSet = false;
  info->pQuery->msgType = TDMT_VND_SUBMIT;
  info->pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
  if (info->pQuery->pRoot == NULL) {
    uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id);
    goto cleanup;
  }
  ((SVnodeModifOpStmt *)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;

  if (pTscObj) {
    info->taos = pTscObj;
    int32_t code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
      goto cleanup;
    }
  }

  info->precision = precision;
  info->protocol = protocol;
  // only the line protocol honours the configured data format; others force it on
  if (protocol != TSDB_SML_LINE_PROTOCOL) {
    info->dataFormat = true;
  } else {
    info->dataFormat = tsSmlDataFormat;
  }

  if (request) {
    info->pRequest = request;
    info->msgBuf.buf = info->pRequest->msgBuf;
    info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
  }

  info->exec = smlInitHandle(info->pQuery);
  info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);

  if (!info->dataFormat) {
    info->colsContainer = taosArrayInit(32, POINTER_BYTES);
    if (info->colsContainer == NULL) {
      uError("SML:0x%" PRIx64 " create info failed", info->id);
      goto cleanup;
    }
  }

  // the handles created above are validated together
  bool allCreated = info->exec != NULL && info->childTables != NULL && info->superTables != NULL &&
                    info->pVgHash != NULL && info->dumplicateKey != NULL;
  if (!allCreated) {
    uError("SML:0x%" PRIx64 " create info failed", info->id);
    goto cleanup;
  }

  return info;

cleanup:
  smlDestroyInfo(info);
  return NULL;
}

/************* TSDB_SML_JSON_PROTOCOL function start **************/
X
Xiaoyu Wang 已提交
1562
static int32_t smlJsonCreateSring(const char **output, char *input, int32_t inputLen) {
1563
  *output = (const char *)taosMemoryCalloc(1, inputLen);
X
Xiaoyu Wang 已提交
1564
  if (*output == NULL) {
1565 1566 1567
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
1568
  memcpy((void *)(*output), input, inputLen);
1569 1570 1571 1572 1573 1574
  return TSDB_CODE_SUCCESS;
}

// Read the "metric" string of a JSON data point and store a copy of it as the
// super table name of `tinfo`.
//
// Returns TSDB_CODE_TSC_INVALID_JSON when "metric" is missing or not a string,
// TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH when its length is rejected by
// IS_INVALID_TABLE_LEN, otherwise the result of copying the name.
static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo) {
  cJSON *metric = cJSON_GetObjectItem(root, "metric");
  if (!cJSON_IsString(metric)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  char *name = metric->valuestring;
  tinfo->sTableNameLen = strlen(name);
  if (!IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
    return smlJsonCreateSring(&tinfo->sTableName, name, tinfo->sTableNameLen);
  }

  uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id);
  return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}

// Parse a timestamp given as a JSON object {"value": <number>, "type": <unit>}
// and write it to *tsVal scaled to nanoseconds.
//
// Accepted units (case-insensitive): "s", "ms", "us", "ns".
// A value of 0 means "now" (taosGetTimestampNs) and is returned before the
// unit is inspected. Negative values are rejected.
//
// The overflow test is done on the scaled double BEFORE the int64
// multiplication, so the integer product is never computed when it would
// overflow (signed overflow is undefined behaviour).
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON for a malformed object
// or unknown unit, TSDB_CODE_INVALID_TIMESTAMP for a negative or too large
// value.
static int32_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int64_t *tsVal) {
  int32_t size = cJSON_GetArraySize(root);
  if (size != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (!cJSON_IsNumber(value)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  double timeDouble = value->valuedouble;
  if (smlDoubleToInt64OverFlow(timeDouble)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
    return TSDB_CODE_INVALID_TIMESTAMP;
  }

  if (timeDouble == 0) {
    *tsVal = taosGetTimestampNs();
    return TSDB_CODE_SUCCESS;
  }

  if (timeDouble < 0) {
    return TSDB_CODE_INVALID_TIMESTAMP;
  }

  *tsVal = timeDouble;

  // pick the multiplier that converts the given unit to nanoseconds
  int64_t factor = 1;
  size_t  typeLen = strlen(type->valuestring);
  if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
    // seconds
    factor = NANOSECOND_PER_SEC;
  } else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
    switch (type->valuestring[0]) {
      case 'm':
      case 'M':
        // milliseconds
        factor = NANOSECOND_PER_MSEC;
        break;
      case 'u':
      case 'U':
        // microseconds
        factor = NANOSECOND_PER_USEC;
        break;
      case 'n':
      case 'N':
        // already nanoseconds
        break;
      default:
        return TSDB_CODE_TSC_INVALID_JSON;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  if (factor != 1) {
    if (smlDoubleToInt64OverFlow(timeDouble * factor)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
      return TSDB_CODE_INVALID_TIMESTAMP;
    }
    *tsVal = *tsVal * factor;
  }

  return TSDB_CODE_SUCCESS;
}

// Count the decimal digits of `num` (the sign is not counted; 0 has one digit).
static uint8_t smlGetTimestampLen(int64_t num) {
  uint8_t digits = 1;
  for (num /= 10; num != 0; num /= 10) {
    digits++;
  }
  return digits;
}

// Parse the "timestamp" field of a JSON data point and append it to `cols`
// as the timestamp key/value (nanoseconds).
//
// "timestamp" may be:
//   - a number: the unit is guessed from the digit count
//     (TSDB_TIME_PRECISION_SEC_DIGITS -> seconds,
//      TSDB_TIME_PRECISION_MILLI_DIGITS -> milliseconds), 0 means "now",
//     anything else is rejected;
//   - an object: delegated to smlParseTSFromJSONObj.
//
// The overflow test on the scaled double is performed before the int64
// multiplication so the product is never computed when it would overflow.
// When `cols` is NULL nothing is allocated; when the push fails the key/value
// is released instead of being leaked.
static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
  // Timestamp must be the first KV to parse
  int64_t tsVal = 0;

  cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
  if (cJSON_IsNumber(timestamp)) {
    // timestamp value 0 indicates current system time
    double timeDouble = timestamp->valuedouble;
    if (smlDoubleToInt64OverFlow(timeDouble)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
      return TSDB_CODE_INVALID_TIMESTAMP;
    }

    if (timeDouble < 0) {
      return TSDB_CODE_INVALID_TIMESTAMP;
    }

    tsVal = (int64_t)timeDouble;
    uint8_t tsLen = smlGetTimestampLen(tsVal);

    int64_t factor = 0;  // 0: no scaling required
    if (tsLen == TSDB_TIME_PRECISION_SEC_DIGITS) {
      factor = NANOSECOND_PER_SEC;
    } else if (tsLen == TSDB_TIME_PRECISION_MILLI_DIGITS) {
      factor = NANOSECOND_PER_MSEC;
    } else if (timeDouble == 0) {
      tsVal = taosGetTimestampNs();
    } else {
      return TSDB_CODE_INVALID_TIMESTAMP;
    }

    if (factor != 0) {
      if (smlDoubleToInt64OverFlow(timeDouble * factor)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
        return TSDB_CODE_INVALID_TIMESTAMP;
      }
      tsVal = tsVal * factor;
    }
  } else if (cJSON_IsObject(timestamp)) {
    int32_t ret = smlParseTSFromJSONObj(info, timestamp, &tsVal);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " Failed to parse timestamp from JSON Obj", info->id);
      return ret;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // without a destination array there is no owner for the key/value
  if (cols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // add ts to cols
  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(1, sizeof(SSmlKv));
  if (!kv) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  kv->key = TS;
  kv->keyLen = TS_LEN;
  kv->i = tsVal;
  kv->type = TSDB_DATA_TYPE_TIMESTAMP;
  kv->length = (int16_t)tDataTypes[kv->type].bytes;
  if (taosArrayPush(cols, &kv) == NULL) {
    taosMemoryFree(kv);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1734
// Store a JSON true/false value into `pVal` as a BOOL.
// `typeStr` must be "bool" (case-insensitive); any other type name yields
// TSDB_CODE_TSC_INVALID_JSON_TYPE.
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
  if (strcasecmp(typeStr, "bool") == 0) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valueint;
    return TSDB_CODE_SUCCESS;
  }

  uError("OTD:invalid type(%s) for JSON Bool", typeStr);
  return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
wmmhello's avatar
wmmhello 已提交
1745

X
Xiaoyu Wang 已提交
1746 1747 1748
// Store a JSON number into `pVal` using the explicit type named by `typeStr`
// (case-insensitive): i8/tinyint, i16/smallint, i32/int, i64/bigint,
// f32/float, f64/double.
//
// Values outside the range of tinyint/smallint/int/float are rejected with
// TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; bigint values are clamped to
// [INT64_MIN, INT64_MAX]. An unknown type name yields
// TSDB_CODE_TSC_INVALID_JSON_TYPE.
static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
  const double num = value->valuedouble;
  int32_t      code = TSDB_CODE_SUCCESS;

  if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) {
    // tinyint
    if (IS_VALID_TINYINT(num)) {
      pVal->type = TSDB_DATA_TYPE_TINYINT;
      pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
      pVal->i = num;
    } else {
      uError("OTD:JSON value(%f) cannot fit in type(tinyint)", num);
      code = TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
  } else if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) {
    // smallint
    if (IS_VALID_SMALLINT(num)) {
      pVal->type = TSDB_DATA_TYPE_SMALLINT;
      pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
      pVal->i = num;
    } else {
      uError("OTD:JSON value(%f) cannot fit in type(smallint)", num);
      code = TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
  } else if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) {
    // int
    if (IS_VALID_INT(num)) {
      pVal->type = TSDB_DATA_TYPE_INT;
      pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
      pVal->i = num;
    } else {
      uError("OTD:JSON value(%f) cannot fit in type(int)", num);
      code = TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
  } else if (strcasecmp(typeStr, "i64") == 0 || strcasecmp(typeStr, "bigint") == 0) {
    // bigint: saturate instead of rejecting
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    if (num >= (double)INT64_MAX) {
      pVal->i = INT64_MAX;
    } else if (num <= (double)INT64_MIN) {
      pVal->i = INT64_MIN;
    } else {
      pVal->i = num;
    }
  } else if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) {
    // float
    if (IS_VALID_FLOAT(num)) {
      pVal->type = TSDB_DATA_TYPE_FLOAT;
      pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
      pVal->f = num;
    } else {
      uError("OTD:JSON value(%f) cannot fit in type(float)", num);
      code = TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
  } else if (strcasecmp(typeStr, "f64") == 0 || strcasecmp(typeStr, "double") == 0) {
    // double
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->d = num;
  } else {
    // if reach here means type is unsupported
    uError("OTD:invalid type(%s) for JSON Number", typeStr);
    code = TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }

  return code;
}
1816

X
Xiaoyu Wang 已提交
1817
// Store a JSON string into `pVal` as BINARY or NCHAR, depending on `typeStr`
// ("binary" / "nchar", case-insensitive), and attach a copy of the text.
//
// The length is validated on the full size_t value BEFORE it is narrowed to
// int16_t. Narrowing first lets an oversized string wrap to a negative or a
// small positive length and slip past the limit checks.
//
// Returns TSDB_CODE_TSC_INVALID_JSON_TYPE for any other type name,
// TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN when the string is too long, otherwise
// the result of copying the string.
static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
  if (strcasecmp(typeStr, "binary") == 0) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
  } else if (strcasecmp(typeStr, "nchar") == 0) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
  } else {
    uError("OTD:invalid type(%s) for JSON String", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }

  size_t len = strlen(value->valuestring);

  // the length is stored through an int16_t cast, so it must fit that range
  if (len > (size_t)INT16_MAX) {
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }
  if (pVal->type == TSDB_DATA_TYPE_BINARY && len > (size_t)(TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE)) {
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }
  if (pVal->type == TSDB_DATA_TYPE_NCHAR &&
      len > (size_t)((TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }

  pVal->length = (int16_t)len;
  return smlJsonCreateSring(&pVal->value, value->valuestring, pVal->length);
}

// Parse a typed JSON value of the form {"value": <v>, "type": "<name>"} into
// `kv`. The object must have exactly OTD_JSON_SUB_FIELDS_NUM members, "value"
// must exist and "type" must be a string; otherwise
// TSDB_CODE_TSC_INVALID_JSON is returned. The conversion itself is dispatched
// on the JSON type of "value"; unsupported JSON types yield
// TSDB_CODE_TSC_INVALID_JSON_TYPE.
static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) {
  if (cJSON_GetArraySize(root) != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (value == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  char *typeName = type->valuestring;
  if (value->type == cJSON_True || value->type == cJSON_False) {
    return smlConvertJSONBool(kv, typeName, value);
  }
  if (value->type == cJSON_Number) {
    return smlConvertJSONNumber(kv, typeName, value);
  }
  if (value->type == cJSON_String) {
    return smlConvertJSONString(kv, typeName, value);
  }
  return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}

1886 1887 1888 1889 1890 1891 1892 1893
// Convert one JSON value into `kv`.
//
//   true/false -> BOOL
//   number     -> DOUBLE
//   string     -> default string type (currently hard-coded to "nchar")
//   object     -> explicit {"value":..., "type":...} form, see
//                 smlParseValueFromJSONObj
//
// Any other JSON type yields TSDB_CODE_TSC_INVALID_JSON. Failures of the
// string conversion (too long, allocation failure) are propagated to the
// caller instead of being silently reported as success.
static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
  switch (root->type) {
    case cJSON_True:
    case cJSON_False: {
      kv->type = TSDB_DATA_TYPE_BOOL;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->i = root->valueint;
      break;
    }
    case cJSON_Number: {
      kv->type = TSDB_DATA_TYPE_DOUBLE;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->d = root->valuedouble;
      break;
    }
    case cJSON_String: {
      /* set default JSON type to binary/nchar according to
       * user configured parameter tsDefaultJSONStrType
       */
      char   *tsDefaultJSONStrType = "nchar";  // todo
      int32_t ret = smlConvertJSONString(kv, tsDefaultJSONStrType, root);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON String");
        return ret;
      }
      break;
    }
    case cJSON_Object: {
      int32_t ret = smlParseValueFromJSONObj(root, kv);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON Obj");
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON;
  }

  return TSDB_CODE_SUCCESS;
}

// Parse the "value" field of a JSON data point into a new key/value and
// append it to `cols`.
//
// The key/value is pushed before it is filled in, so that on a later parse
// error it is already reachable through `cols` for the caller's cleanup.
// If the push itself fails the key/value is released here and
// TSDB_CODE_OUT_OF_MEMORY is returned.
//
// Returns TSDB_CODE_TSC_INVALID_JSON when "value" is missing, otherwise the
// result of converting the value.
static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
  cJSON *metricVal = cJSON_GetObjectItem(root, "value");
  if (metricVal == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(1, sizeof(SSmlKv));
  if (!kv) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if (cols != NULL && taosArrayPush(cols, &kv) == NULL) {
    taosMemoryFree(kv);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  kv->key = VALUE;
  kv->keyLen = VALUE_LEN;
  return smlParseValueFromJSON(metricVal, kv);
}

X
Xiaoyu Wang 已提交
1946 1947
// Parse the "tags" object of a JSON data point.
//
// Every member becomes a key/value appended to `pKVs`, except the member whose
// name equals the configured tsSmlChildTableName: that one must be a JSON
// string and is copied into `childTableName` instead.
//
// childTableName is treated as a TSDB_TABLE_NAME_LEN byte buffer (it is
// cleared with that size); at most TSDB_TABLE_NAME_LEN - 1 characters are
// copied so the result is always NUL-terminated, even for over-long names.
// If appending a key/value fails it is released and TSDB_CODE_OUT_OF_MEMORY is
// returned.
//
// Returns TSDB_CODE_TSC_INVALID_JSON for a malformed "tags" field,
// TSDB_CODE_TSC_INVALID_COLUMN_LENGTH for a rejected key length,
// TSDB_CODE_TSC_DUP_NAMES for a duplicated key, or the first conversion error.
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey,
                                    SSmlMsgBuf *msg) {
  int32_t ret = TSDB_CODE_SUCCESS;

  cJSON *tags = cJSON_GetObjectItem(root, "tags");
  if (tags == NULL || tags->type != cJSON_Object) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  size_t  childTableNameLen = strlen(tsSmlChildTableName);
  int32_t tagNum = cJSON_GetArraySize(tags);
  for (int32_t i = 0; i < tagNum; ++i) {
    cJSON *tag = cJSON_GetArrayItem(tags, i);
    if (tag == NULL) {
      return TSDB_CODE_TSC_INVALID_JSON;
    }
    size_t keyLen = strlen(tag->string);
    if (IS_INVALID_COL_LEN(keyLen)) {
      uError("OTD:Tag key length is 0 or too large than 64");
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
    // check duplicate keys
    if (smlCheckDuplicateKey(tag->string, keyLen, dumplicateKey)) {
      return TSDB_CODE_TSC_DUP_NAMES;
    }

    // handle child table name
    if (childTableNameLen != 0 && strcmp(tag->string, tsSmlChildTableName) == 0) {
      if (!cJSON_IsString(tag)) {
        uError("OTD:ID must be JSON string");
        return TSDB_CODE_TSC_INVALID_JSON;
      }
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      // leave the last byte untouched so the name stays NUL-terminated
      strncpy(childTableName, tag->valuestring, TSDB_TABLE_NAME_LEN - 1);
      continue;
    }

    // add kv to SSmlKv
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(1, sizeof(SSmlKv));
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
    if (pKVs != NULL && taosArrayPush(pKVs, &kv) == NULL) {
      taosMemoryFree(kv);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // key
    kv->keyLen = keyLen;
    ret = smlJsonCreateSring(&kv->key, tag->string, kv->keyLen);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
    // value
    ret = smlParseValueFromJSON(tag, kv);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
  }

  return ret;
}

2004 2005
// Parse one JSON data point: metric name, timestamp, metric value and tags,
// in that order. The data point must be a JSON object with exactly
// OTD_JSON_FIELDS_NUM members. Parsing stops at the first failing step and
// its error code is returned.
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo, SArray *cols) {
  if (!cJSON_IsObject(root)) {
    uError("OTD:0x%" PRIx64 " data point needs to be JSON object", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // outmost json fields has to be exactly 4
  int32_t fieldNum = cJSON_GetArraySize(root);
  if (fieldNum != OTD_JSON_FIELDS_NUM) {
    uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, fieldNum);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  int32_t code;

  // Parse metric
  code = smlParseMetricFromJSON(info, root, tinfo);
  if (code != TSDB_CODE_SUCCESS) {
    uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
    return code;
  }
  uDebug("OTD:0x%" PRIx64 " Parse metric from JSON payload finished", info->id);

  // Parse timestamp
  code = smlParseTSFromJSON(info, root, cols);
  if (code != 0) {
    uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
    return code;
  }
  uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);

  // Parse metric value
  code = smlParseColsFromJSON(root, cols);
  if (code != 0) {
    uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
    return code;
  }
  uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id);

  // Parse tags
  code = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
  if (code != 0) {
    uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
    return code;
  }
  uDebug("OTD:0x%" PRIx64 " Parse tags from JSON payload finished", info->id);

  return TSDB_CODE_SUCCESS;
}
2053
/************* TSDB_SML_JSON_PROTOCOL function end **************/
wmmhello's avatar
wmmhello 已提交
2054

X
Xiaoyu Wang 已提交
2055
// Parse one InfluxDB line-protocol record and merge it into `info`.
//
// Steps:
//   1. split the line into measurement / tags / columns / timestamp;
//   2. parse timestamp and columns into `cols` (a fresh array when dataFormat
//      is on, the shared info->colsContainer otherwise);
//   3. find or create the child table keyed by measurement+tags and hand the
//      columns to it;
//   4. for a newly created child table, parse its tags, enforce the tag and
//      column limits and derive the child table name / uid;
//   5. update the super-table meta, or create it on first sight.
//
// Compared with the previous version, a failed smlBuildSTableMeta() is
// reported as out of memory instead of being dereferenced, and a failed
// smlBuildTableInfo() no longer leaks the parsed columns.
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) {
  SSmlLineInfo elements = {0};
  uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql);

  int ret = smlParseInfluxString(sql, &elements, &info->msgBuf);
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id);
    return ret;
  }

  SArray *cols = NULL;
  if (info->dataFormat) {  // if dataFormat, cols need new memory to save data
    cols = taosArrayInit(16, POINTER_BYTES);
    if (cols == NULL) {
      uError("SML:0x%" PRIx64 " smlParseInfluxLine failed to allocate memory", info->id);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  } else {  // if dataFormat is false, cols do not need to save data, there is another new memory to save data
    cols = info->colsContainer;
  }

  ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols);
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseTS failed", info->id);
    if (info->dataFormat) taosArrayDestroy(cols);
    return ret;
  }
  ret = smlParseCols(elements.cols, elements.colsLen, cols, NULL, false, info->dumplicateKey, &info->msgBuf);
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseCols parse cloums fields failed", info->id);
    smlDestroyCols(cols);
    if (info->dataFormat) taosArrayDestroy(cols);
    return ret;
  }

  bool            hasTable = true;
  SSmlTableInfo  *tinfo = NULL;
  SSmlTableInfo **oneTable =
      (SSmlTableInfo **)taosHashGet(info->childTables, elements.measure, elements.measureTagsLen);
  if (!oneTable) {
    tinfo = smlBuildTableInfo();
    if (!tinfo) {
      // cols has not been handed over yet: release it the same way as the
      // smlParseCols failure path above
      smlDestroyCols(cols);
      if (info->dataFormat) taosArrayDestroy(cols);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES);
    oneTable = &tinfo;
    hasTable = false;
  }

  ret = smlDealCols(*oneTable, info->dataFormat, cols);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  if (!hasTable) {
    ret = smlParseCols(elements.tags, elements.tagsLen, (*oneTable)->tags, (*oneTable)->childTableName, true,
                       info->dumplicateKey, &info->msgBuf);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseCols parse tag fields failed", info->id);
      return ret;
    }

    if (taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_TAGS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
      return TSDB_CODE_PAR_INVALID_TAGS_NUM;
    }

    if (taosArrayGetSize(cols) + taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_COLUMNS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
      return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
    }

    (*oneTable)->sTableName = elements.measure;
    (*oneTable)->sTableNameLen = elements.measureLen;
    if (strlen((*oneTable)->childTableName) == 0) {
      // no explicit child table name: derive one (and the uid) from the tags
      RandTableName rName = {(*oneTable)->tags, (*oneTable)->sTableName, (uint8_t)(*oneTable)->sTableNameLen,
                             (*oneTable)->childTableName, 0};

      buildChildTableName(&rName);
      (*oneTable)->uid = rName.uid;
    } else {
      // explicit name: the uid is taken from the first 8 bytes of the name
      (*oneTable)->uid = *(uint64_t *)((*oneTable)->childTableName);
    }
  }

  SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements.measure, elements.measureLen);
  if (tableMeta) {  // update meta
    ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
    if (!hasTable && ret == TSDB_CODE_SUCCESS) {
      ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
    }
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
      return ret;
    }
  } else {
    SSmlSTableMeta *meta = smlBuildSTableMeta();
    if (meta == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
    smlInsertMeta(meta->colHash, meta->cols, cols);
    taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
  }

  if (!info->dataFormat) {
    taosArrayClear(info->colsContainer);
  }
  taosHashClear(info->dumplicateKey);
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2164 2165
/**
 * Parse one OpenTSDB data point and merge it into the per-request table maps.
 *
 * The point is parsed with smlParseTelnetString() or smlParseJSONString(),
 * depending on info->protocol. The resulting column array is appended to the
 * matching entry of info->childTables (created on first sight of the child
 * table), and the schema stored in info->superTables is created or updated.
 *
 * @param info  schemaless handle that owns the childTables/superTables hashes.
 * @param data  `const char *` text line for TSDB_SML_TELNET_PROTOCOL, or a
 *              `cJSON *` node for TSDB_SML_JSON_PROTOCOL.
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_TSC_OUT_OF_MEMORY when an
 *         allocation fails; TSDB_CODE_PAR_INVALID_TAGS_NUM when the tag count
 *         is outside (0, TSDB_MAX_TAGS]; otherwise the error returned by the
 *         parser or by smlUpdateMeta().
 */
static int32_t smlParseTelnetLine(SSmlHandle *info, void *data) {
  int            ret = TSDB_CODE_SUCCESS;
  SSmlTableInfo *tinfo = smlBuildTableInfo();
  if (!tinfo) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SArray *cols = taosArrayInit(16, POINTER_BYTES);
  if (cols == NULL) {
    uError("SML:0x%" PRIx64 " smlParseTelnetLine failed to allocate memory", info->id);
    // tinfo is not registered anywhere yet, so it must be released here.
    smlDestroyTableInfo(info, tinfo);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
    ret = smlParseTelnetString(info, (const char *)data, tinfo, cols);
  } else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
  } else {
    ASSERT(0);
  }
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseTelnetLine failed", info->id);
    smlDestroyTableInfo(info, tinfo);
    smlDestroyCols(cols);
    taosArrayDestroy(cols);
    return ret;
  }

  if (taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL);
    smlDestroyTableInfo(info, tinfo);
    smlDestroyCols(cols);
    taosArrayDestroy(cols);
    return TSDB_CODE_PAR_INVALID_TAGS_NUM;
  }
  taosHashClear(info->dumplicateKey);

  if (strlen(tinfo->childTableName) == 0) {
    // No explicit child table name: derive name and uid from super table + tags.
    RandTableName rName = {tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, tinfo->childTableName, 0};
    buildChildTableName(&rName);
    tinfo->uid = rName.uid;
  } else {
    // Simple uid: the first 8 bytes of the name. memcpy avoids the unaligned,
    // aliasing-violating read that a direct uint64_t pointer cast performs.
    uint64_t nameUid = 0;
    memcpy(&nameUid, tinfo->childTableName, sizeof(nameUid));
    tinfo->uid = nameUid;
  }

  bool            hasTable = true;
  size_t          childNameLen = strlen(tinfo->childTableName);
  SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, childNameLen);
  if (!oneTable) {
    // First row for this child table: the hash keeps the tinfo pointer from now on.
    taosHashPut(info->childTables, tinfo->childTableName, childNameLen, &tinfo, POINTER_BYTES);
    oneTable = &tinfo;
    hasTable = false;
  } else {
    // Child table already known: only the parsed columns are kept.
    smlDestroyTableInfo(info, tinfo);
  }

  // From here on the column array belongs to the child table entry.
  taosArrayPush((*oneTable)->cols, &cols);
  SSmlSTableMeta **tableMeta =
      (SSmlSTableMeta **)taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen);
  if (tableMeta) {  // update meta
    ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
    if (!hasTable && ret == TSDB_CODE_SUCCESS) {
      // Tags are merged only when the child table is seen for the first time.
      ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
    }
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
      return ret;
    }
  } else {
    SSmlSTableMeta *meta = smlBuildSTableMeta();
    if (meta == NULL) {
      // NOTE(review): assumes smlBuildSTableMeta() reports allocation failure
      // by returning NULL, like smlBuildTableInfo() above.
      uError("SML:0x%" PRIx64 " smlParseTelnetLine failed to allocate memory", info->id);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
    smlInsertMeta(meta->colHash, meta->cols, cols);
    taosHashPut(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen, &meta, POINTER_BYTES);
  }

  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
2241

X
Xiaoyu Wang 已提交
2242
/**
 * Parse an OpenTSDB JSON payload and hand every data point it contains to
 * smlParseTelnetLine().
 *
 * The payload root must be either a JSON object (a single data point) or a
 * JSON array whose items are the data points; any other root is rejected.
 *
 * @param info     schemaless handle that receives the parsed points.
 * @param payload  JSON text; NULL is rejected.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON for a missing,
 *         unparsable or wrongly shaped payload, or the first error reported
 *         by smlParseTelnetLine().
 */
static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
  if (payload == NULL) {
    uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *root = cJSON_Parse(payload);
  if (root == NULL) {
    uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    pointCount = 0;
  const bool singlePoint = cJSON_IsObject(root);

  // multiple data points must be sent in JSON array
  if (singlePoint) {
    pointCount = 1;
  } else if (cJSON_IsArray(root)) {
    pointCount = cJSON_GetArraySize(root);
  } else {
    uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
    code = TSDB_CODE_TSC_INVALID_JSON;
  }

  // Stops at the first point that fails; never runs when the root was rejected.
  for (int32_t idx = 0; code == TSDB_CODE_SUCCESS && idx < pointCount; ++idx) {
    cJSON *point = singlePoint ? root : cJSON_GetArrayItem(root, idx);
    code = smlParseTelnetLine(info, point);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
    }
  }

  cJSON_Delete(root);
  return code;
}
2280

X
Xiaoyu Wang 已提交
2281
/**
 * Bind the parsed rows of every child table and launch the asynchronous insert.
 *
 * For each entry of info->childTables the function resolves the target vgroup
 * through the catalog, records it in info->pVgHash, and binds the table's rows
 * with smlBindData() using the matching super-table schema. Once all tables are
 * bound it builds the output with smlBuildOutput() and starts the request with
 * launchAsyncQuery().
 *
 * @param info  schemaless handle holding the parsed tables and the request.
 * @return TSDB_CODE_SUCCESS when the async query has been launched, otherwise
 *         the error from catalogGetTableHashVgroup(), smlBindData() or
 *         smlBuildOutput().
 */
static int32_t smlInsertData(SSmlHandle *info) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
  while (oneTable) {
    SSmlTableInfo *tableData = *oneTable;

    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
    strcpy(pName.dbname, info->pRequest->pDb);
    // tname was zero-initialised above, so the copied name stays NUL-terminated.
    memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));

    SRequestConnInfo conn = {0};
    conn.pTrans = info->taos->pAppInfo->pTransporter;
    conn.requestId = info->pRequest->requestId;
    conn.requestObjRefId = info->pRequest->self;
    conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

    SVgroupInfo vg;
    code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
      // Leaving the loop before the iteration is exhausted: hand the current
      // iterator back to the hash table instead of abandoning it.
      taosHashCancelIterate(info->childTables, oneTable);
      return code;
    }
    taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));

    SSmlSTableMeta **pMeta =
        (SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
    ASSERT(NULL != pMeta && NULL != *pMeta);

    // use tablemeta of stable to save vgid and uid of child table
    (*pMeta)->tableMeta->vgId = vg.vgId;
    (*pMeta)->tableMeta->uid = tableData->uid;  // one table merge data block together according uid

    code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat,
                       (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
                       info->msgBuf.buf, info->msgBuf.len);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
      // Same early-exit rule as above: release the iterator before returning.
      taosHashCancelIterate(info->childTables, oneTable);
      return code;
    }
    oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
  }

  code = smlBuildOutput(info->exec, info->pVgHash);
  if (code != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
    return code;
  }
  // Marks the end of the bind phase and the start of the RPC phase.
  info->cost.insertRpcTime = taosGetTimestampUs();

  // The request completes asynchronously through info->pRequest->body.queryFp.
  launchAsyncQuery(info->pRequest, info->pQuery, NULL);
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2338 2339 2340 2341 2342 2343 2344 2345 2346
/**
 * Log the counters and per-phase timings collected in info->cost.
 *
 * Every phase cost is the difference between two consecutive timestamps
 * stored in info->cost (parse -> schema -> bind -> rpc -> end); the total is
 * endTime - parseTime.
 *
 * @param info  schemaless handle whose statistics are printed.
 */
static void smlPrintStatisticInfo(SSmlHandle *info) {
  int64_t parseCost = info->cost.schemaTime - info->cost.parseTime;
  int64_t schemaCost = info->cost.insertBindTime - info->cost.schemaTime;
  int64_t bindCost = info->cost.insertRpcTime - info->cost.insertBindTime;
  int64_t rpcCost = info->cost.endTime - info->cost.insertRpcTime;
  int64_t totalCost = info->cost.endTime - info->cost.parseTime;

  // The run of blanks before "parse cost" is part of the emitted message.
  uError("SML:0x%" PRIx64
         " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d "
         "        parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64
         ",total cost:%" PRId64,
         info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
         info->cost.numOfCreateSTables, parseCost, schemaCost, bindCost, rpcCost, totalCost);
}

X
Xiaoyu Wang 已提交
2349
/**
 * Parse the caller's input according to info->protocol.
 *
 * For TSDB_SML_JSON_PROTOCOL only the first element of `lines` is used and is
 * parsed as one JSON document. For the line and telnet protocols each of the
 * `numLines` entries is parsed in order, stopping at the first failure.
 *
 * @param info      schemaless handle that receives the parsed data.
 * @param lines     input strings.
 * @param numLines  number of entries in `lines` (not used for JSON).
 * @return TSDB_CODE_SUCCESS or the first parser error.
 */
static int32_t smlParseLine(SSmlHandle *info, char *lines[], int numLines) {
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    int32_t jsonCode = smlParseJSON(info, *lines);
    if (jsonCode != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, *lines);
    }
    return jsonCode;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t idx = 0; idx < numLines; ++idx) {
    switch (info->protocol) {
      case TSDB_SML_LINE_PROTOCOL:
        code = smlParseInfluxLine(info, lines[idx]);
        break;
      case TSDB_SML_TELNET_PROTOCOL:
        code = smlParseTelnetLine(info, lines[idx]);
        break;
      default:
        ASSERT(0);
        break;
    }

    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, idx, lines[idx]);
      return code;
    }
  }
  return code;
}

X
Xiaoyu Wang 已提交
2376
/**
 * Run one batch through the whole pipeline: parse the lines, reconcile the
 * database schema, then bind and launch the insert.
 *
 * Timestamps for each phase are stored in info->cost as the phases start.
 * Schema reconciliation is retried while it fails, up to
 * (number of super tables * MAX_RETRY_TIMES) additional attempts.
 *
 * @param info      schemaless handle for this batch.
 * @param lines     input strings of the batch.
 * @param numLines  number of entries in `lines`.
 * @return 0 on success, otherwise the error of the first failing phase.
 */
static int smlProcess(SSmlHandle *info, char *lines[], int numLines) {
  info->cost.parseTime = taosGetTimestampUs();

  int32_t code = smlParseLine(info, lines, numLines);
  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(code));
    return code;
  }

  info->cost.lineNum = numLines;
  info->cost.numOfSTables = taosHashGetSize(info->superTables);
  info->cost.numOfCTables = taosHashGetSize(info->childTables);

  info->cost.schemaTime = taosGetTimestampUs();

  int32_t attempts = 0;
  for (;;) {
    code = smlModifyDBSchemas(info);
    if (code == 0) {
      break;
    }
    // The retry budget is re-read on every failure, as the table count may change.
    if (!(attempts++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES)) {
      break;
    }
  }

  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code));
    return code;
  }

  info->cost.insertBindTime = taosGetTimestampUs();
  code = smlInsertData(info);
  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlInsertData error : %s", info->id, tstrerror(code));
  }

  return code;
}

X
Xiaoyu Wang 已提交
2414
/**
 * Report whether the current database accepts schemaless writes.
 *
 * The check is currently disabled: the function accepts every database and
 * always returns TSDB_CODE_SUCCESS. The disabled implementation fetched the
 * database configuration through catalogGetDBCfg() and returned
 * TSDB_CODE_SML_INVALID_DB_CONF when its `schemaless` flag was not set.
 *
 * @param taos     connection object (unused while the check is disabled).
 * @param request  request object (unused while the check is disabled).
 * @return TSDB_CODE_SUCCESS.
 */
static int32_t isSchemalessDb(STscObj *taos, SRequestObj *request) {
  // Both parameters are kept for the disabled catalog lookup.
  (void)taos;
  (void)request;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2446
static void smlInsertCallback(void *param, void *res, int32_t code) {
wmmhello's avatar
wmmhello 已提交
2447
  SRequestObj *pRequest = (SRequestObj *)res;
X
Xiaoyu Wang 已提交
2448
  SSmlHandle  *info = (SSmlHandle *)param;
wmmhello's avatar
wmmhello 已提交
2449
  int32_t rows = taos_affected_rows(pRequest);
wmmhello's avatar
wmmhello 已提交
2450

X
Xiaoyu Wang 已提交
2451
  uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
wmmhello's avatar
wmmhello 已提交
2452
  // lock
wmmhello's avatar
wmmhello 已提交
2453
  taosThreadSpinLock(&info->params->lock);
X
Xiaoyu Wang 已提交
2454
  if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2455
    info->params->request->code = code;
2456 2457 2458
    info->params->request->body.resInfo.numOfRows += rows;
  }else{
    info->params->request->body.resInfo.numOfRows += info->affectedRows;
wmmhello's avatar
wmmhello 已提交
2459
  }
wmmhello's avatar
wmmhello 已提交
2460
  taosThreadSpinUnlock(&info->params->lock);
wmmhello's avatar
wmmhello 已提交
2461 2462
  // unlock

wmmhello's avatar
wmmhello 已提交
2463
  uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
wmmhello's avatar
wmmhello 已提交
2464
  Params *pParam = info->params;
X
Xiaoyu Wang 已提交
2465
  bool    isLast = info->isLast;
wmmhello's avatar
wmmhello 已提交
2466 2467 2468
  info->cost.endTime = taosGetTimestampUs();
  info->cost.code = code;
  smlPrintStatisticInfo(info);
wmmhello's avatar
wmmhello 已提交
2469 2470
  smlDestroyInfo(info);

X
Xiaoyu Wang 已提交
2471
  if (isLast) {
wmmhello's avatar
wmmhello 已提交
2472
    tsem_post(&pParam->sem);
wmmhello's avatar
wmmhello 已提交
2473 2474 2475
  }
}

wmmhello's avatar
wmmhello 已提交
2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496
/**
 * taos_schemaless_insert() parse and insert data points into database according to
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
 * @return return zero for successful insertion. Otherwise return none-zero error code of
 *         failure reason.
 *
 */

wmmhello's avatar
wmmhello 已提交
2497
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
D
dapan1121 已提交
2498
  if (NULL == taos) {
2499 2500 2501
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }
D
dapan1121 已提交
2502

2503
  SRequestObj* request = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
wmmhello's avatar
wmmhello 已提交
2504
  if(!request){
2505
    uError("SML:taos_schemaless_insert error request is null");
2506
    return NULL;
wmmhello's avatar
wmmhello 已提交
2507 2508
  }

wmmhello's avatar
wmmhello 已提交
2509
  int    batchs = 0;
2510 2511
  STscObj* pTscObj = request->pTscObj;

2512
  pTscObj->schemalessType = 1;
wmmhello's avatar
wmmhello 已提交
2513
  SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
wmmhello's avatar
wmmhello 已提交
2514

wmmhello's avatar
wmmhello 已提交
2515 2516
  Params params;
  params.request = request;
wmmhello's avatar
wmmhello 已提交
2517 2518 2519
  tsem_init(&params.sem, 0, 0);
  taosThreadSpinInit(&(params.lock), 0);

X
Xiaoyu Wang 已提交
2520
  if (request->pDb == NULL) {
wmmhello's avatar
wmmhello 已提交
2521
    request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
wmmhello's avatar
wmmhello 已提交
2522
    smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
wmmhello's avatar
wmmhello 已提交
2523 2524 2525
    goto end;
  }

2526
  if(isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS){
wmmhello's avatar
wmmhello 已提交
2527
    request->code = TSDB_CODE_SML_INVALID_DB_CONF;
wmmhello's avatar
wmmhello 已提交
2528
    smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
wmmhello's avatar
wmmhello 已提交
2529 2530 2531
    goto end;
  }

2532
  if (!lines) {
2533
    request->code = TSDB_CODE_SML_INVALID_DATA;
wmmhello's avatar
wmmhello 已提交
2534
    smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
2535 2536 2537
    goto end;
  }

X
Xiaoyu Wang 已提交
2538
  if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
2539
    request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
wmmhello's avatar
wmmhello 已提交
2540
    smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
2541
    goto end;
wmmhello's avatar
wmmhello 已提交
2542 2543
  }

X
Xiaoyu Wang 已提交
2544 2545
  if (protocol == TSDB_SML_LINE_PROTOCOL &&
      (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
2546
    request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
wmmhello's avatar
wmmhello 已提交
2547
    smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
2548 2549 2550
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
2551 2552 2553 2554 2555 2556 2557 2558
  if(protocol == TSDB_SML_JSON_PROTOCOL){
    numLines = 1;
  }else if(numLines <= 0){
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
2559 2560
  batchs = ceil(((double)numLines) / LINE_BATCH);
  for (int i = 0; i < batchs; ++i) {
2561
    SRequestObj* req = (SRequestObj*)createRequest(pTscObj->id, TSDB_SQL_INSERT);
wmmhello's avatar
wmmhello 已提交
2562 2563 2564 2565 2566
    if(!req){
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error request is null");
      goto end;
    }
2567
    SSmlHandle* info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision);
wmmhello's avatar
wmmhello 已提交
2568 2569 2570 2571 2572 2573
    if(!info){
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error SSmlHandle is null");
      goto end;
    }

wmmhello's avatar
wmmhello 已提交
2574 2575
    int32_t perBatch = LINE_BATCH;

X
Xiaoyu Wang 已提交
2576
    if (numLines > perBatch) {
wmmhello's avatar
wmmhello 已提交
2577 2578
      numLines -= perBatch;
      info->isLast = false;
X
Xiaoyu Wang 已提交
2579
    } else {
wmmhello's avatar
wmmhello 已提交
2580 2581 2582 2583 2584
      perBatch = numLines;
      numLines = 0;
      info->isLast = true;
    }

wmmhello's avatar
wmmhello 已提交
2585
    info->params = &params;
wmmhello's avatar
wmmhello 已提交
2586 2587
    info->affectedRows = perBatch;
    info->pRequest->body.queryFp = smlInsertCallback;
X
Xiaoyu Wang 已提交
2588
    info->pRequest->body.param = info;
wmmhello's avatar
wmmhello 已提交
2589
    int32_t code = smlProcess(info, lines, perBatch);
wmmhello's avatar
wmmhello 已提交
2590
    lines += perBatch;
X
Xiaoyu Wang 已提交
2591
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2592 2593 2594 2595
      info->pRequest->body.queryFp(info, req, code);
    }
  }
  tsem_wait(&params.sem);
2596 2597

end:
wmmhello's avatar
wmmhello 已提交
2598 2599
  taosThreadSpinDestroy(&params.lock);
  tsem_destroy(&params.sem);
2600
//  ((STscObj *)taos)->schemalessType = 0;
2601
  pTscObj->schemalessType = 1;
2602
  uDebug("resultend:%s", request->msgBuf);
wmmhello's avatar
wmmhello 已提交
2603
  return (TAOS_RES*)request;
wmmhello's avatar
wmmhello 已提交
2604
}