clientSml.c 82.5 KB
Newer Older
wmmhello's avatar
wmmhello 已提交
1 2 3 4 5
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

X
Xiaoyu Wang 已提交
6 7 8 9 10
#include "cJSON.h"
#include "catalog.h"
#include "clientInt.h"
#include "osSemaphore.h"
#include "osThread.h"
wmmhello's avatar
wmmhello 已提交
11 12
#include "query.h"
#include "taos.h"
wmmhello's avatar
wmmhello 已提交
13
#include "taoserror.h"
X
Xiaoyu Wang 已提交
14
#include "tcommon.h"
wmmhello's avatar
wmmhello 已提交
15
#include "tdef.h"
X
Xiaoyu Wang 已提交
16
#include "tglobal.h"
wmmhello's avatar
wmmhello 已提交
17 18
#include "tlog.h"
#include "tmsg.h"
X
Xiaoyu Wang 已提交
19
#include "tname.h"
wmmhello's avatar
wmmhello 已提交
20 21
#include "ttime.h"
#include "ttypes.h"
wmmhello's avatar
wmmhello 已提交
22

wmmhello's avatar
wmmhello 已提交
23
//=================================================================================================
wmmhello's avatar
wmmhello 已提交
24 25 26 27 28 29 30

// Delimiter and escape characters of the line protocol, tested by the IS_* macros in this file.
#define SPACE ' '
#define COMMA ','
#define EQUAL '='
#define QUOTE '"'
#define SLASH '\\'  // a single backslash character (the escape prefix)

X
Xiaoyu Wang 已提交
31 32 33 34 35 36 37
// Advance `sql` past consecutive SPACE characters; stops at the first other character or at '\0'.
// NOTE: expands to a bare `while` statement (not do { } while (0)) and modifies `sql` in place,
// so `sql` must be a modifiable pointer lvalue.
#define JUMP_SPACE(sql)  \
  while (*sql != '\0') { \
    if (*sql == SPACE)   \
      sql++;             \
    else                 \
      break;             \
  }
wmmhello's avatar
wmmhello 已提交
38
// comma ,
X
Xiaoyu Wang 已提交
39 40
// IS_SLASH_COMMA: *sql is a comma preceded by a backslash (escaped comma).
// IS_COMMA:       *sql is a comma NOT preceded by a backslash (a real separator).
// Both read *((sql)-1), so `sql` must not point at the first byte of its buffer.
#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH)
#define IS_COMMA(sql)       (*(sql) == COMMA && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
41
// space
X
Xiaoyu Wang 已提交
42 43
// IS_SLASH_SPACE: *sql is a space preceded by a backslash (escaped space).
// IS_SPACE:       *sql is a space NOT preceded by a backslash.
// Both read *((sql)-1), so `sql` must not point at the first byte of its buffer.
#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql)-1) == SLASH)
#define IS_SPACE(sql)       (*(sql) == SPACE && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
44
// equal =
X
Xiaoyu Wang 已提交
45 46
// IS_SLASH_EQUAL: *sql is '=' preceded by a backslash (escaped equal sign).
// IS_EQUAL:       *sql is '=' NOT preceded by a backslash.
// Both read *((sql)-1), so `sql` must not point at the first byte of its buffer.
#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) == SLASH)
#define IS_EQUAL(sql)       (*(sql) == EQUAL && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
47
// quote "
X
Xiaoyu Wang 已提交
48 49
// IS_SLASH_QUOTE: *sql is '"' preceded by a backslash (escaped quote).
// IS_QUOTE:       *sql is '"' NOT preceded by a backslash.
// Both read *((sql)-1), so `sql` must not point at the first byte of its buffer.
#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) == SLASH)
#define IS_QUOTE(sql)       (*(sql) == QUOTE && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
50
// SLASH
X
Xiaoyu Wang 已提交
51
// True when *sql is a backslash that is itself preceded by a backslash; reads *((sql)-1).
#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql)-1) == SLASH)
wmmhello's avatar
wmmhello 已提交
52

X
Xiaoyu Wang 已提交
53 54
// True when *sql is any escapable character (comma, space, '=', '"' or backslash)
// that is immediately preceded by a backslash.
#define IS_SLASH_LETTER(sql) \
  (IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql))
wmmhello's avatar
wmmhello 已提交
55

X
Xiaoyu Wang 已提交
56
// Shift `len` bytes starting at `sql` one byte toward lower addresses, overwriting the byte at sql[-1].
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len))
wmmhello's avatar
wmmhello 已提交
57

X
Xiaoyu Wang 已提交
58 59 60 61 62 63 64 65
// Remove escaping backslashes from key[0..keyLen) in place.
// Whenever key[i] is an escapable character preceded by a backslash (IS_SLASH_LETTER), the tail
// starting at key[i] is shifted left by one byte (dropping the backslash) and keyLen is decremented;
// i is stepped back so the same position is examined again after the shift.
// The scan starts at i = 1 because the IS_* macros read the previous byte.
// NOTE: expands to a bare `for` statement that declares `i`; `keyLen` must be a modifiable lvalue.
#define PROCESS_SLASH(key, keyLen)           \
  for (int i = 1; i < keyLen; ++i) {         \
    if (IS_SLASH_LETTER(key + i)) {          \
      MOVE_FORWARD_ONE(key + i, keyLen - i); \
      i--;                                   \
      keyLen--;                              \
    }                                        \
  }
wmmhello's avatar
wmmhello 已提交
66

67 68 69
// A name length is invalid when it is non-positive or does not fit below the respective
// TSDB_*_NAME_LEN limit (the limit itself is rejected).
#define IS_INVALID_COL_LEN(len)   ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)

70 71 72
// NOTE(review): presumably the expected field counts of an OpenTSDB JSON record and of its
// nested objects — the users of these constants are not visible here; confirm against the JSON parser.
#define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM     4

X
Xiaoyu Wang 已提交
73 74 75 76
// Fixed key names and their string lengths (TS_LEN == strlen("_ts"), VALUE_LEN == strlen("_value")).
#define TS        "_ts"
#define TS_LEN    3
#define VALUE     "_value"
#define VALUE_LEN 6
77

X
Xiaoyu Wang 已提交
78 79
// Number of decoration characters surrounding a string literal in the input.
#define BINARY_ADD_LEN 2  // "binary": the two double quotes
#define NCHAR_ADD_LEN  3  // L"nchar": the L prefix plus the two double quotes
wmmhello's avatar
wmmhello 已提交
80 81

// NOTE(review): presumably the retry limit for a failing SML operation — its user is not visible here.
#define MAX_RETRY_TIMES 5
X
Xiaoyu Wang 已提交
82
// NOTE(review): presumably the number of lines handled per batch — its user is not visible here.
#define LINE_BATCH      20000
wmmhello's avatar
wmmhello 已提交
83 84 85 86
//=================================================================================================
// Local alias for the public TSDB_SML_PROTOCOL_TYPE enumeration.
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;

// Schema change that has to be applied to a super table so that the parsed data fits into it.
typedef enum {
  SCHEMA_ACTION_NULL,                // nothing to change
  SCHEMA_ACTION_CREATE_STABLE,       // the super table does not exist yet
  SCHEMA_ACTION_ADD_COLUMN,          // a column key is not part of the current schema
  SCHEMA_ACTION_ADD_TAG,             // a tag key is not part of the current schema
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,  // a variable-length column is too short for the value
  SCHEMA_ACTION_CHANGE_TAG_SIZE,     // a variable-length tag is too short for the value
} ESchemaAction;

// Section pointers and lengths describing one input line.
// NOTE(review): the pointers presumably reference the caller's raw line buffer rather than
// owned copies — the parser that fills this struct is not visible here; confirm.
typedef struct {
  const char *measure;
  const char *tags;
  const char *cols;
  const char *timestamp;

  int32_t measureLen;
  int32_t measureTagsLen;
  int32_t tagsLen;
  int32_t colsLen;
  int32_t timestampLen;
} SSmlLineInfo;

typedef struct {
X
Xiaoyu Wang 已提交
109 110 111 112
  const char *sTableName;  // super table name
  int32_t     sTableNameLen;
  char        childTableName[TSDB_TABLE_NAME_LEN];
  uint64_t    uid;
wmmhello's avatar
wmmhello 已提交
113

X
Xiaoyu Wang 已提交
114
  SArray *tags;
wmmhello's avatar
wmmhello 已提交
115

116 117
  // if info->formatData is true, elements are SArray<SSmlKv*>.
  // if info->formatData is false, elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
X
Xiaoyu Wang 已提交
118
  SArray *cols;
wmmhello's avatar
wmmhello 已提交
119 120 121
} SSmlTableInfo;

typedef struct {
X
Xiaoyu Wang 已提交
122 123
  SArray   *tags;     // save the origin order to create table
  SHashObj *tagHash;  // elements are <key, index in tags>
124

X
Xiaoyu Wang 已提交
125 126
  SArray   *cols;
  SHashObj *colHash;
127

wmmhello's avatar
wmmhello 已提交
128 129 130 131
  STableMeta *tableMeta;
} SSmlSTableMeta;

// Caller-provided buffer for a human readable error message (see smlBuildInvalidDataMsg).
typedef struct {
  int32_t len;  // capacity of buf in bytes
  char   *buf;  // may be NULL, in which case no message is written
} SSmlMsgBuf;

136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
// Bookkeeping for one schemaless request: result code, counters and time fields.
// NOTE(review): the unit and exact meaning of the *Time fields are not visible here; confirm.
typedef struct {
  int32_t code;
  int32_t lineNum;

  int32_t numOfSTables;
  int32_t numOfCTables;
  int32_t numOfCreateSTables;  // incremented by smlModifyDBSchemas for every super table it creates

  int64_t parseTime;
  int64_t schemaTime;
  int64_t insertBindTime;
  int64_t insertRpcTime;
  int64_t endTime;
} SSmlCostInfo;

X
Xiaoyu Wang 已提交
151 152 153
typedef struct {
  SRequestObj     *request;
  tsem_t           sem;
wmmhello's avatar
wmmhello 已提交
154 155 156
  TdThreadSpinlock lock;
} Params;

wmmhello's avatar
wmmhello 已提交
157
typedef struct {
X
Xiaoyu Wang 已提交
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
  int64_t id;
  Params *params;
  bool    isLast;

  SMLProtocolType protocol;
  int8_t          precision;
  bool dataFormat;  // true means that the name and order of keys in each line are the same(only for influx protocol)

  SHashObj *childTables;
  SHashObj *superTables;
  SHashObj *pVgHash;
  void     *exec;

  STscObj     *taos;
  SCatalog    *pCatalog;
  SRequestObj *pRequest;
  SQuery      *pQuery;

  SSmlCostInfo cost;
  int32_t      affectedRows;
  SSmlMsgBuf   msgBuf;
  SHashObj    *dumplicateKey;  // for dumplicate key
  SArray      *colsContainer;  // for cols parse, if dataFormat == false
wmmhello's avatar
wmmhello 已提交
181
} SSmlHandle;
wmmhello's avatar
wmmhello 已提交
182 183
//=================================================================================================

wmmhello's avatar
wmmhello 已提交
184
//=================================================================================================
185
// Source counter for SML handle ids; advanced with atomic_add_fetch_64 in smlGenId.
static volatile int64_t linesSmlHandleId = 0;
X
Xiaoyu Wang 已提交
186 187
static int64_t          smlGenId() {
           int64_t id;
wmmhello's avatar
wmmhello 已提交
188

X
Xiaoyu Wang 已提交
189 190
           do {
             id = atomic_add_fetch_64(&linesSmlHandleId, 1);
wmmhello's avatar
wmmhello 已提交
191 192
  } while (id == 0);

X
Xiaoyu Wang 已提交
193
           return id;
wmmhello's avatar
wmmhello 已提交
194 195
}

196
/*
 * Report whether `num` lies on or outside the bounds (double)INT64_MIN / (double)INT64_MAX.
 * Both bounds themselves count as overflow. A NaN compares false on both sides and
 * therefore yields false.
 */
static inline bool smlDoubleToInt64OverFlow(double num) {
  const double upperBound = (double)INT64_MAX;
  const double lowerBound = (double)INT64_MIN;
  return (num >= upperBound) || (num <= lowerBound);
}

/*
 * Record `key` in `pHash` and report whether it had been recorded before.
 *
 * @return true if the key is already present (duplicate); false if it was absent,
 *         in which case it is inserted. The stored value is a 1-byte placeholder
 *         copied from the key itself.
 */
static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashObj *pHash) {
  if (taosHashGet(pHash, key, keyLen) != NULL) {
    return true;
  }

  taosHashPut(pHash, key, keyLen, key, 1);
  return false;
}

X
Xiaoyu Wang 已提交
210
/*
 * Write "<msg1>:<msg2>" into pBuf->buf, truncated to the buffer capacity.
 *
 * @param pBuf  destination; nothing is written when pBuf->buf is NULL or pBuf->len <= 0
 * @param msg1  leading text, may be NULL
 * @param msg2  detail text, may be NULL; only appended when more than two bytes are left
 * @return always TSDB_CODE_SML_INVALID_DATA, so callers can `return smlBuildInvalidDataMsg(...)`
 */
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
  if (pBuf->buf && pBuf->len > 0) {
    memset(pBuf->buf, 0, pBuf->len);
    // strncat appends up to n bytes AND a terminating NUL, so one byte must stay reserved
    if (msg1) strncat(pBuf->buf, msg1, pBuf->len - 1);
    int32_t left = pBuf->len - (int32_t)strlen(pBuf->buf);
    if (left > 2 && msg2) {
      strncat(pBuf->buf, ":", left - 1);
      // after the ':' there are left-1 bytes free, one of which is needed for the NUL
      strncat(pBuf->buf, msg2, left - 2);
    }
  }
  return TSDB_CODE_SML_INVALID_DATA;
}

X
Xiaoyu Wang 已提交
223
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
224
                                       ESchemaAction *action, SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
225
  uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
226 227
  if (index) {
    if (colField[*index].type != kv->type) {
X
Xiaoyu Wang 已提交
228 229
      uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id,
             kv->key, colField[*index].type, kv->type);
230 231 232
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

X
Xiaoyu Wang 已提交
233 234 235 236
    if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR &&
         (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
        (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
         ((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
237
      if (isTag) {
wmmhello's avatar
wmmhello 已提交
238
        *action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
239
      } else {
wmmhello's avatar
wmmhello 已提交
240
        *action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
241 242 243 244
      }
    }
  } else {
    if (isTag) {
wmmhello's avatar
wmmhello 已提交
245
      *action = SCHEMA_ACTION_ADD_TAG;
246
    } else {
wmmhello's avatar
wmmhello 已提交
247
      *action = SCHEMA_ACTION_ADD_COLUMN;
248 249
    }
  }
wmmhello's avatar
wmmhello 已提交
250 251 252
  return 0;
}

wmmhello's avatar
wmmhello 已提交
253
/*
 * Compute the byte size to declare for a variable-length field holding `length` characters.
 *
 * The character capacity is the smallest power of two strictly greater than `length`,
 * clamped to the maximum allowed for the type. For NCHAR the capacity is multiplied by
 * TSDB_NCHAR_SIZE; for BINARY and NCHAR VARSTR_HEADER_SIZE is added. For any other type
 * the bare power of two is returned.
 */
static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
  int32_t result = 1;
  // stop doubling before `result * 2` could overflow int32_t (only reachable for length >= 2^30)
  while (result <= length && result <= INT32_MAX / 2) {
    result *= 2;
  }
  if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE){
    result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
    result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  }

  if (type == TSDB_DATA_TYPE_NCHAR){
    result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
  }else if (type == TSDB_DATA_TYPE_BINARY){
    result = result + VARSTR_HEADER_SIZE;
  }
  return result;
}

X
Xiaoyu Wang 已提交
272
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
273
                                      ESchemaAction *action, bool isTag) {
274 275
  int32_t code = TSDB_CODE_SUCCESS;
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
276
    if(j == 0 && !isTag) continue;
X
Xiaoyu Wang 已提交
277
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j);
278
    code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
X
Xiaoyu Wang 已提交
279
    if (code != TSDB_CODE_SUCCESS) {
280 281 282 283 284 285
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

286
/*
 * Verify that every key in `cols` exists in the given schema slice.
 *
 * @param schema  first schema entry to consider
 * @param length  number of schema entries
 * @param cols    parsed key/values; for columns (isTag == false) index 0 is not checked
 * @return 0 when all keys are present, -1 as soon as one key is missing
 */
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
  int32_t   code = 0;
  SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  int32_t   i = 0;
  for (; i < length; i++) {
    // only the presence of the name matters below; the stored value is never read
    taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
  }

  i = isTag ? 0 : 1;
  for (; i < taosArrayGetSize(cols); i++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
    if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
      code = -1;
      break;  // fall through to the cleanup so the temporary hash is not leaked
    }
  }

  taosHashCleanup(hashTmp);
  return code;
}

308 309 310 311 312 313 314 315
/*
 * Byte size of a field of the given type.
 * BINARY and NCHAR are sized from `length` via smlFindNearestPowerOf2; every other type
 * uses the fixed size from the tDataTypes table.
 */
static int32_t getBytes(uint8_t type, int32_t length){
  const bool isVarLen = (type == TSDB_DATA_TYPE_BINARY) || (type == TSDB_DATA_TYPE_NCHAR);
  if (!isVarLen) {
    return tDataTypes[type].bytes;
  }
  return smlFindNearestPowerOf2(length, type);
}

wmmhello's avatar
wmmhello 已提交
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337
/*
 * Apply the schema changes required by `cols` to the SField list `results`.
 *
 * For every entry of `cols` the needed action is determined with smlGenerateSchemaAction
 * (its return code is not inspected here):
 *   - ADD_COLUMN / ADD_TAG: a new SField is appended to `results`;
 *   - CHANGE_*_SIZE: the `bytes` of the existing SField is updated. Its position in `results`
 *     is the schema index from `schemaHash`, reduced by `numOfCols` for tags.
 *
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, SArray* results, int32_t numOfCols, bool isTag) {
  for (int pos = 0; pos < taosArrayGetSize(cols); ++pos) {
    SSmlKv       *kv = (SSmlKv *)taosArrayGetP(cols, pos);
    ESchemaAction action = SCHEMA_ACTION_NULL;
    smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);

    switch (action) {
      case SCHEMA_ACTION_ADD_COLUMN:
      case SCHEMA_ACTION_ADD_TAG: {
        SField added = {0};
        added.type = kv->type;
        added.bytes = getBytes(kv->type, kv->length);
        memcpy(added.name, kv->key, kv->keyLen);
        taosArrayPush(results, &added);
        break;
      }
      case SCHEMA_ACTION_CHANGE_COLUMN_SIZE:
      case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
        uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
        uint16_t  slot = *index;
        if (isTag) slot -= numOfCols;
        SField *existing = (SField *)taosArrayGet(results, slot);
        existing->bytes = getBytes(kv->type, kv->length);
        break;
      }
      default:
        break;
    }
  }
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
338 339
//static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
//                              int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){
wmmhello's avatar
wmmhello 已提交
340
static int32_t  smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray* pColumns, SArray* pTags,
wmmhello's avatar
wmmhello 已提交
341 342
                               STableMeta *pTableMeta, ESchemaAction action){

343 344 345 346 347
  SRequestObj*   pRequest = NULL;
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SCmdMsgInfo    pCmdMsg = {0};

wmmhello's avatar
wmmhello 已提交
348 349 350 351 352 353
  // put front for free
  pReq.numOfColumns = taosArrayGetSize(pColumns);
  pReq.pColumns = pColumns;
  pReq.numOfTags = taosArrayGetSize(pTags);
  pReq.pTags = pTags;

354 355 356 357 358
  code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
359
  pRequest->syncQuery = true;
360 361 362 363 364
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
365
  if (action == SCHEMA_ACTION_CREATE_STABLE){
wmmhello's avatar
wmmhello 已提交
366 367 368 369
    pReq.colVer = 1;
    pReq.tagVer = 1;
    pReq.suid = 0;
    pReq.source = TD_REQ_FROM_APP;
wmmhello's avatar
wmmhello 已提交
370
  } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE){
wmmhello's avatar
wmmhello 已提交
371 372 373 374
    pReq.colVer = pTableMeta->sversion;
    pReq.tagVer = pTableMeta->tversion + 1;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
wmmhello's avatar
wmmhello 已提交
375
  } else if (action == SCHEMA_ACTION_ADD_COLUMN  || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE){
wmmhello's avatar
wmmhello 已提交
376 377 378 379 380 381
    pReq.colVer = pTableMeta->sversion + 1;
    pReq.tagVer = pTableMeta->tversion;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  }

wmmhello's avatar
wmmhello 已提交
382 383 384 385 386 387 388 389 390
  if (pReq.numOfTags == 0){
    pReq.numOfTags = 1;
    SField field = {0};
    field.type = TSDB_DATA_TYPE_NCHAR;
    field.bytes = 1;
    strcpy(field.name, tsSmlTagName);
    taosArrayPush(pReq.pTags, &field);
  }

391 392 393 394 395 396 397 398 399 400 401 402 403 404
  pReq.commentLen = -1;
  pReq.igExists = true;
  tNameExtractFullName(pName, pReq.name);

  pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

D
dapan1121 已提交
405 406
  SQuery pQuery;
  memset(&pQuery, 0, sizeof(pQuery));
407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  if(pRequest->code == TSDB_CODE_SUCCESS){
    catalogRemoveTableMeta(info->pCatalog, pName);
  }
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  return code;
}

X
Xiaoyu Wang 已提交
426
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
427 428 429 430
  int32_t         code = 0;
  SHashObj       *hashTmp = NULL;
  STableMeta     *pTableMeta = NULL;

X
Xiaoyu Wang 已提交
431
  SName   pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
432
  strcpy(pName.dbname, info->pRequest->pDb);
wmmhello's avatar
wmmhello 已提交
433

D
dapan1121 已提交
434 435 436 437 438
  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
X
Xiaoyu Wang 已提交
439 440

  SSmlSTableMeta **tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
wmmhello's avatar
wmmhello 已提交
441
  while (tableMetaSml) {
X
Xiaoyu Wang 已提交
442 443
    SSmlSTableMeta *sTableData = *tableMetaSml;
    bool            needCheckMeta = false;  // for multi thread
wmmhello's avatar
wmmhello 已提交
444

wmmhello's avatar
wmmhello 已提交
445
    size_t superTableLen = 0;
X
Xiaoyu Wang 已提交
446
    void  *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
447
    memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
wmmhello's avatar
wmmhello 已提交
448
    memcpy(pName.tname, superTable, superTableLen);
wmmhello's avatar
wmmhello 已提交
449

D
dapan1121 已提交
450
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
wmmhello's avatar
wmmhello 已提交
451

452
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
wmmhello's avatar
wmmhello 已提交
453 454 455 456 457 458
      SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
      SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
      smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
      smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);

      code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
459
      if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
460
        uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
461
        goto end;
wmmhello's avatar
wmmhello 已提交
462
      }
463
      info->cost.numOfCreateSTables++;
X
Xiaoyu Wang 已提交
464
    } else if (code == TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
465
      hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags,
X
Xiaoyu Wang 已提交
466 467 468
                                       taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
      for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
           i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
469 470
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
wmmhello's avatar
wmmhello 已提交
471

472 473
      ESchemaAction action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
474
      if (code != TSDB_CODE_SUCCESS) {
475
        goto end;
476
      }
wmmhello's avatar
wmmhello 已提交
477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494
      if (action != SCHEMA_ACTION_NULL){
        SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if(i < pTableMeta->tableInfo.numOfColumns){
            taosArrayPush(pColumns, &field);
          }else{
            taosArrayPush(pTags, &field);
          }
        }
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags, pTableMeta->tableInfo.numOfColumns, true);

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
495
        if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
496
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
497 498 499 500
          goto end;
        }
      }

wmmhello's avatar
wmmhello 已提交
501
      taosMemoryFreeClear(pTableMeta);
502 503 504 505
      code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
wmmhello's avatar
wmmhello 已提交
506 507 508 509
      code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
510 511

      taosHashClear(hashTmp);
wmmhello's avatar
wmmhello 已提交
512
      for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
513 514
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
515 516
      action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
517
      if (code != TSDB_CODE_SUCCESS) {
518
        goto end;
wmmhello's avatar
wmmhello 已提交
519
      }
wmmhello's avatar
wmmhello 已提交
520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538
      if (action != SCHEMA_ACTION_NULL){
        SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
          if(i < pTableMeta->tableInfo.numOfColumns){
            taosArrayPush(pColumns, &field);
          }else{
            taosArrayPush(pTags, &field);
          }
        }

        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns, pTableMeta->tableInfo.numOfColumns, false);

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
539 540 541 542 543
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, superTable);
          goto end;
        }
      }
wmmhello's avatar
wmmhello 已提交
544

D
dapan1121 已提交
545
      code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
wmmhello's avatar
wmmhello 已提交
546
      if (code != TSDB_CODE_SUCCESS) {
547
        goto end;
wmmhello's avatar
wmmhello 已提交
548
      }
549
      needCheckMeta = true;
wmmhello's avatar
wmmhello 已提交
550
    } else {
X
Xiaoyu Wang 已提交
551
      uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
552
      goto end;
wmmhello's avatar
wmmhello 已提交
553
    }
wmmhello's avatar
wmmhello 已提交
554
    taosMemoryFreeClear(pTableMeta);
555

D
dapan1121 已提交
556
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
557
    if (code != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
558
      uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, (char *)superTable);
559
      goto end;
560
    }
561

X
Xiaoyu Wang 已提交
562 563
    if (needCheckMeta) {
      code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
564
                          sTableData->tags, true);
565
      if (code != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
566
        uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, (char *)superTable);
567 568
        goto end;
      }
569
      code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
570
      if (code != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
571
        uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, (char *)superTable);
572 573 574 575
        goto end;
      }
    }

576
    sTableData->tableMeta = pTableMeta;
wmmhello's avatar
wmmhello 已提交
577

X
Xiaoyu Wang 已提交
578
    tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, tableMetaSml);
wmmhello's avatar
wmmhello 已提交
579
    taosHashCleanup(hashTmp);
wmmhello's avatar
wmmhello 已提交
580 581
  }
  return 0;
582 583

end:
wmmhello's avatar
wmmhello 已提交
584 585
  taosHashCleanup(hashTmp);
  taosMemoryFreeClear(pTableMeta);
D
dapan1121 已提交
586
  catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
587
  return code;
wmmhello's avatar
wmmhello 已提交
588 589
}

X
Xiaoyu Wang 已提交
590
/*
 * Parse kvVal->value (kvVal->length bytes) as a number with an optional type suffix and
 * store the typed result in kvVal.
 *
 * Suffix (case-insensitive unless noted)  -> type
 *   none, f64                             -> DOUBLE
 *   f32                                   -> FLOAT
 *   i (lower case only), i64              -> BIGINT
 *   u (lower case only), u64              -> UBIGINT
 *   i32 / u32 / i16 / u16 / i8 / u8       -> INT / UINT / SMALLINT / USMALLINT / TINYINT / UTINYINT
 *
 * The text is first parsed as a double. For the 64-bit integer types the text is parsed
 * again as an integer only when the double lies outside the range check, so that values
 * at the range boundary are decided by the integer parser.
 *
 * @param kvVal  in: value/length; out: type and one of d/f/i/u
 * @param msg    receives a description when the value is rejected
 * @return true when the value was parsed, false when it is malformed or out of range
 */
static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
  const char *pVal = kvVal->value;
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (pVal == endptr) {
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
    return false;
  }

  // bytes remaining after the numeric prefix, i.e. the length of the type suffix
  int32_t left = len - (endptr - pVal);
  if (left == 0 || (left == 3 && strncasecmp(endptr, "f64", left) == 0)) {
    kvVal->type = TSDB_DATA_TYPE_DOUBLE;
    kvVal->d = result;
  } else if ((left == 3 && strncasecmp(endptr, "f32", left) == 0)) {
    if (!IS_VALID_FLOAT(result)) {
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_FLOAT;
    kvVal->f = (float)result;
  } else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)) {
    if (smlDoubleToInt64OverFlow(result)) {
      // the double is at or beyond the int64 bounds: let the integer parser decide
      errno = 0;
      int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
      if (errno == ERANGE) {
        smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_BIGINT;
      kvVal->i = tmp;
      return true;
    }
    kvVal->type = TSDB_DATA_TYPE_BIGINT;
    kvVal->i = (int64_t)result;
  } else if ((left == 1 && *endptr == 'u') || (left == 3 && strncasecmp(endptr, "u64", left) == 0)) {
    if (result >= (double)UINT64_MAX || result < 0) {
      errno = 0;
      uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
      if (errno == ERANGE || result < 0) {
        smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_UBIGINT;
      kvVal->u = tmp;
      return true;
    }
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;
    kvVal->u = result;
  } else if (left == 3 && strncasecmp(endptr, "i32", left) == 0) {
    if (!IS_VALID_INT(result)) {
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_INT;
    kvVal->i = result;
  } else if (left == 3 && strncasecmp(endptr, "u32", left) == 0) {
    if (!IS_VALID_UINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UINT;
    kvVal->u = result;
  } else if (left == 3 && strncasecmp(endptr, "i16", left) == 0) {
    if (!IS_VALID_SMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "small int out of range[-32768,32767]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_SMALLINT;
    kvVal->i = result;
  } else if (left == 3 && strncasecmp(endptr, "u16", left) == 0) {
    if (!IS_VALID_USMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned small int out of range[0,65535]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_USMALLINT;
    kvVal->u = result;
  } else if (left == 2 && strncasecmp(endptr, "i8", left) == 0) {
    if (!IS_VALID_TINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_TINYINT;
    kvVal->i = result;
  } else if (left == 2 && strncasecmp(endptr, "u8", left) == 0) {
    if (!IS_VALID_UTINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UTINYINT;
    kvVal->u = result;
  } else {
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
    return false;
  }
  return true;
}

wmmhello's avatar
wmmhello 已提交
688
/*
 * Interpret kvVal->value (kvVal->length bytes) as a boolean literal.
 *
 * Accepted spellings: a single 't'/'T' or 'f'/'F', and the case-insensitive words
 * "true" / "false". On a match the result is stored in kvVal->i.
 *
 * @return true when the text is a boolean literal, false otherwise (kvVal->i untouched)
 */
static bool smlParseBool(SSmlKv *kvVal) {
  const char *text = kvVal->value;
  int32_t     textLen = kvVal->length;

  switch (textLen) {
    case 1:
      if (text[0] == 't' || text[0] == 'T') {
        kvVal->i = true;
        return true;
      }
      if (text[0] == 'f' || text[0] == 'F') {
        kvVal->i = false;
        return true;
      }
      return false;
    case 4:
      if (strncasecmp(text, "true", textLen) == 0) {
        kvVal->i = true;
        return true;
      }
      return false;
    case 5:
      if (strncasecmp(text, "false", textLen) == 0) {
        kvVal->i = false;
        return true;
      }
      return false;
    default:
      return false;
  }
}

wmmhello's avatar
wmmhello 已提交
712
// binary: "abc"
// Returns true when the len bytes at pVal start and end with a double quote.
// Anything shorter than two bytes cannot hold both quotes and is rejected.
static bool smlIsBinary(const char *pVal, uint16_t len) {
  return len >= 2 && pVal[0] == '"' && pVal[len - 1] == '"';
}

wmmhello's avatar
wmmhello 已提交
723
// nchar: L"abc"
// Returns true when the len bytes at pVal are an 'l' or 'L' prefix followed by a double-quoted body,
// i.e. the second and the last byte are both '"'. Inputs shorter than three bytes are rejected.
static bool smlIsNchar(const char *pVal, uint16_t len) {
  if (len < 3) {
    return false;
  }
  const bool hasPrefix = (pVal[0] == 'l' || pVal[0] == 'L');
  return hasPrefix && pVal[1] == '"' && pVal[len - 1] == '"';
}

734
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
X
Xiaoyu Wang 已提交
735
  char   *endPtr = NULL;
wafwerar's avatar
wafwerar 已提交
736
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
X
Xiaoyu Wang 已提交
737
  if (value + len != endPtr) {
738
    return -1;
wmmhello's avatar
wmmhello 已提交
739
  }
740
  double ts = tsInt64;
741 742
  switch (type) {
    case TSDB_TIME_PRECISION_HOURS:
wmmhello's avatar
wmmhello 已提交
743 744
      ts *= NANOSECOND_PER_HOUR;
      tsInt64 *= NANOSECOND_PER_HOUR;
745 746
      break;
    case TSDB_TIME_PRECISION_MINUTES:
wmmhello's avatar
wmmhello 已提交
747 748
      ts *= NANOSECOND_PER_MINUTE;
      tsInt64 *= NANOSECOND_PER_MINUTE;
749 750
      break;
    case TSDB_TIME_PRECISION_SECONDS:
wmmhello's avatar
wmmhello 已提交
751 752
      ts *= NANOSECOND_PER_SEC;
      tsInt64 *= NANOSECOND_PER_SEC;
753 754
      break;
    case TSDB_TIME_PRECISION_MILLI:
wmmhello's avatar
wmmhello 已提交
755 756
      ts *= NANOSECOND_PER_MSEC;
      tsInt64 *= NANOSECOND_PER_MSEC;
757 758
      break;
    case TSDB_TIME_PRECISION_MICRO:
wmmhello's avatar
wmmhello 已提交
759 760
      ts *= NANOSECOND_PER_USEC;
      tsInt64 *= NANOSECOND_PER_USEC;
761 762 763 764 765
      break;
    case TSDB_TIME_PRECISION_NANO:
      break;
    default:
      ASSERT(0);
wmmhello's avatar
wmmhello 已提交
766
  }
X
Xiaoyu Wang 已提交
767
  if (ts >= (double)INT64_MAX || ts < 0) {
768
    return -1;
wmmhello's avatar
wmmhello 已提交
769 770
  }

771
  return tsInt64;
772 773 774 775 776 777
}

static int8_t smlGetTsTypeByLen(int32_t len) {
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
    return TSDB_TIME_PRECISION_SECONDS;
  } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
778
    return TSDB_TIME_PRECISION_MILLI;
779 780
  } else {
    return -1;
wmmhello's avatar
wmmhello 已提交
781
  }
782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800
}

// Translates a TSDB_SML_TIMESTAMP_* precision code into the matching TSDB_TIME_PRECISION_* code.
// TSDB_SML_TIMESTAMP_NOT_CONFIGURED is treated like nanoseconds; unknown codes yield -1.
static int8_t smlGetTsTypeByPrecision(int8_t precision) {
  int8_t tsType = -1;
  switch (precision) {
    case TSDB_SML_TIMESTAMP_HOURS:
      tsType = TSDB_TIME_PRECISION_HOURS;
      break;
    case TSDB_SML_TIMESTAMP_MINUTES:
      tsType = TSDB_TIME_PRECISION_MINUTES;
      break;
    case TSDB_SML_TIMESTAMP_SECONDS:
      tsType = TSDB_TIME_PRECISION_SECONDS;
      break;
    case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
      tsType = TSDB_TIME_PRECISION_MILLI;
      break;
    case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
      tsType = TSDB_TIME_PRECISION_MICRO;
      break;
    case TSDB_SML_TIMESTAMP_NANO_SECONDS:
    case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
      tsType = TSDB_TIME_PRECISION_NANO;
      break;
    default:
      break;
  }
  return tsType;
}

X
Xiaoyu Wang 已提交
804 805
// Resolves the timestamp field of a line-protocol record.
// An empty field or the single character '0' yields the current time from taosGetTimestampNs().
// Otherwise the text is converted with the unit derived from info->precision.
// Returns the timestamp, or -1 after recording a message in info->msgBuf when the precision or the
// text is invalid.
static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) {
  const bool useNow = (len == 0) || (len == 1 && data[0] == '0');
  if (useNow) {
    return taosGetTimestampNs();
  }

  int8_t unit = smlGetTsTypeByPrecision(info->precision);
  if (unit == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL);
    return -1;
  }

  int64_t ns = smlGetTimeValue(data, len, unit);
  if (ns == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
  }
  return ns;
}

X
Xiaoyu Wang 已提交
823 824
// Resolves the timestamp field of a telnet-protocol record.
// A NULL field is an error; the single character '0' yields the current time from taosGetTimestampNs().
// Otherwise the unit is chosen from the text length (see smlGetTsTypeByLen) and the text is converted.
// Returns the timestamp, or -1 after recording a message in info->msgBuf.
static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
  if (data == NULL) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
    return -1;
  }
  if (len == 1 && data[0] == '0') {
    return taosGetTimestampNs();
  }

  int8_t unit = smlGetTsTypeByLen(len);
  if (unit == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf,
                           "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
    return -1;
  }

  int64_t ns = smlGetTimeValue(data, len, unit);
  if (ns == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
  }
  return ns;
}

X
Xiaoyu Wang 已提交
845
// Parses the timestamp text according to info->protocol (line or telnet protocol) and, when cols is
// given, appends a newly allocated timestamp SSmlKv (key TS) to it.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_TIMESTAMP when the text cannot be parsed, or
// TSDB_CODE_OUT_OF_MEMORY when the entry cannot be allocated or stored.
// The entry is only allocated when there is an array to own it, and it is released again if the push
// fails, so it can no longer leak.
static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArray *cols) {
  int64_t ts = 0;
  if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
    ts = smlParseInfluxTime(info, data, len);
  } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
    ts = smlParseOpenTsdbTime(info, data, len);
  } else {
    ASSERT(0);
  }

  if (ts == -1) return TSDB_CODE_INVALID_TIMESTAMP;

  // nobody to hand the entry to: the timestamp has been validated, nothing else to do
  if (!cols) return TSDB_CODE_SUCCESS;

  // add ts to cols
  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(1, sizeof(SSmlKv));
  if (!kv) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  kv->key = TS;
  kv->keyLen = TS_LEN;
  kv->i = ts;
  kv->type = TSDB_DATA_TYPE_TIMESTAMP;
  kv->length = (int16_t)tDataTypes[kv->type].bytes;

  if (taosArrayPush(cols, &kv) == NULL) {
    taosMemoryFree(kv);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
873
// Determines the data type of the raw value text in pVal and rewrites pVal accordingly.
// The forms are tried in this order: binary ("..."), nchar (L"..."), bool, number.
//   - binary/nchar: type is set, length is reduced by the quoting overhead and value is advanced past the
//     opening quote; an over-long body yields TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN (value is not advanced
//     in that case).
//   - bool/number: length becomes the storage size of the detected type.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_INVALID_VALUE when no form matches.
static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
  // binary
  if (smlIsBinary(pVal->value, pVal->length)) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
    pVal->length -= BINARY_ADD_LEN;
    if (pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
    pVal->value += (BINARY_ADD_LEN - 1);
    return TSDB_CODE_SUCCESS;
  }

  // nchar
  if (smlIsNchar(pVal->value, pVal->length)) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
    pVal->length -= NCHAR_ADD_LEN;
    if (pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
    pVal->value += (NCHAR_ADD_LEN - 1);
    return TSDB_CODE_SUCCESS;
  }

  // bool first, then number; smlParseNumber sets pVal->type itself
  bool recognized = smlParseBool(pVal);
  if (recognized) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
  } else {
    recognized = smlParseNumber(pVal, msg);
  }
  if (!recognized) {
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
910 911
// Splits one line-protocol record into its four sections and records start pointer and length of each
// in elements: measure, tags, cols and timestamp. No section content is interpreted here.
// Separators are recognized through the IS_COMMA / IS_SPACE / IS_QUOTE macros, which ignore a character
// preceded by a backslash. Inside the measure an escaped character is handled by MOVE_FORWARD_ONE
// (defined elsewhere in this file; NOTE(review): presumably it removes the backslash in place, which
// would mean the buffer behind sql is modified -- confirm).
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_SML_INVALID_DATA (NULL input, leading comma, unbalanced quote,
// empty cols) or TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH (measure length rejected).
static int32_t smlParseInfluxString(const char *sql, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
  if (sql == NULL) return TSDB_CODE_SML_INVALID_DATA;
  JUMP_SPACE(sql)
  if (*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA;
  elements->measure = sql;

  // parse measure
  while (*sql != '\0') {
    if (sql != elements->measure && IS_SLASH_LETTER(sql)) {
      MOVE_FORWARD_ONE(sql, strlen(sql) + 1);
      continue;
    }
    if (IS_COMMA(sql) || IS_SPACE(sql)) {
      break;
    }
    sql++;
  }
  elements->measureLen = sql - elements->measure;
  if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
    smlBuildInvalidDataMsg(msg, "measure is empty or too large than 192", NULL);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  // parse tag
  if (*sql == SPACE) {
    // measure is directly followed by the cols section: no tags
    elements->tagsLen = 0;
  } else {
    if (*sql == COMMA) sql++;
    elements->tags = sql;
    while (*sql != '\0' && !IS_SPACE(sql)) {
      sql++;
    }
    elements->tagsLen = sql - elements->tags;
  }
  elements->measureTagsLen = sql - elements->measure;

  // parse cols
  JUMP_SPACE(sql)
  elements->cols = sql;
  bool quoted = false;
  for (; *sql != '\0'; sql++) {
    if (IS_QUOTE(sql)) {
      quoted = !quoted;
    }
    // a space only ends the section outside of a quoted value
    if (!quoted && IS_SPACE(sql)) {
      break;
    }
  }
  if (quoted) {
    smlBuildInvalidDataMsg(msg, "only one quote", elements->cols);
    return TSDB_CODE_SML_INVALID_DATA;
  }
  elements->colsLen = sql - elements->cols;
  if (elements->colsLen == 0) {
    smlBuildInvalidDataMsg(msg, "cols is empty", NULL);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  // parse timestamp
  JUMP_SPACE(sql)
  elements->timestamp = sql;
  while (*sql != '\0' && *sql != SPACE) {
    sql++;
  }
  elements->timestampLen = sql - elements->timestamp;

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
990
static void smlParseTelnetElement(const char **sql, const char **data, int32_t *len) {
991
  while (**sql != '\0') {
X
Xiaoyu Wang 已提交
992
    if (**sql != SPACE && !(*data)) {
993
      *data = *sql;
X
Xiaoyu Wang 已提交
994
    } else if (**sql == SPACE && *data) {
995 996 997 998 999 1000 1001
      *len = *sql - *data;
      break;
    }
    (*sql)++;
  }
}

X
Xiaoyu Wang 已提交
1002 1003
// Parses the space-separated "key=value" tag list of a telnet-protocol record.
// Every tag becomes a newly allocated NCHAR SSmlKv appended to cols (key/value point into data, nothing
// is copied). A tag whose key is exactly tsSmlChildTableName is not stored as a tag; its value is copied
// into childTableName instead (only when childTableName is given).
// When cols is NULL the tags are validated only.
// Returns TSDB_CODE_SUCCESS or the error code of the first problem found.
//
// Fixes over the previous version:
//   - the child-table key must match tsSmlChildTableName in full, not merely be a prefix of it;
//   - the copied child table name is always NUL-terminated (at most TSDB_TABLE_NAME_LEN - 1 bytes);
//   - the SSmlKv is no longer leaked when cols is NULL or the push fails.
static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTableName, SHashObj *dumplicateKey,
                                  SSmlMsgBuf *msg) {
  const char *sql = data;
  size_t      childTableNameLen = strlen(tsSmlChildTableName);
  while (*sql != '\0') {
    JUMP_SPACE(sql)
    if (*sql == '\0') break;

    const char *key = sql;
    int32_t     keyLen = 0;

    // parse key
    while (*sql != '\0') {
      if (*sql == SPACE) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if (*sql == EQUAL) {
        keyLen = sql - key;
        sql++;
        break;
      }
      sql++;
    }

    if (IS_INVALID_COL_LEN(keyLen)) {
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
    if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
      return TSDB_CODE_TSC_DUP_NAMES;
    }

    // parse value
    const char *value = sql;
    while (*sql != '\0') {
      if (*sql == SPACE) {
        break;
      }
      if (*sql == EQUAL) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
    }
    int32_t valueLen = sql - value;

    if (valueLen == 0) {
      smlBuildInvalidDataMsg(msg, "invalid value", value);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    // handle child table name: the key has to be the configured name exactly
    if (childTableName && childTableNameLen != 0 && (size_t)keyLen == childTableNameLen &&
        strncmp(key, tsSmlChildTableName, keyLen) == 0) {
      int32_t copyLen = (valueLen < TSDB_TABLE_NAME_LEN) ? valueLen : (TSDB_TABLE_NAME_LEN - 1);
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      memcpy(childTableName, value, copyLen);  // the memset above provides the terminator
      continue;
    }

    if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }

    // validation only: there is no array that could own the entry
    if (!cols) continue;

    // add kv to SSmlKv
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(1, sizeof(SSmlKv));
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
    kv->key = key;
    kv->keyLen = keyLen;
    kv->value = value;
    kv->length = valueLen;
    kv->type = TSDB_DATA_TYPE_NCHAR;

    if (taosArrayPush(cols, &kv) == NULL) {
      taosMemoryFree(kv);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1082

1083
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
X
Xiaoyu Wang 已提交
1084 1085
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, SSmlTableInfo *tinfo, SArray *cols) {
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
1086 1087 1088

  // parse metric
  smlParseTelnetElement(&sql, &tinfo->sTableName, &tinfo->sTableNameLen);
1089
  if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
1090
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
wmmhello's avatar
wmmhello 已提交
1091
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
1092 1093 1094 1095
  }

  // parse timestamp
  const char *timestamp = NULL;
X
Xiaoyu Wang 已提交
1096
  int32_t     tLen = 0;
1097 1098 1099 1100 1101 1102 1103 1104 1105
  smlParseTelnetElement(&sql, &timestamp, &tLen);
  if (!timestamp || tLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  int32_t ret = smlParseTS(info, timestamp, tLen, cols);
  if (ret != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
wmmhello's avatar
wmmhello 已提交
1106
    return ret;
1107 1108 1109 1110
  }

  // parse value
  const char *value = NULL;
X
Xiaoyu Wang 已提交
1111
  int32_t     valueLen = 0;
1112 1113 1114
  smlParseTelnetElement(&sql, &value, &valueLen);
  if (!value || valueLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
wmmhello's avatar
wmmhello 已提交
1115
    return TSDB_CODE_TSC_INVALID_VALUE;
1116 1117 1118
  }

  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1119
  if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
1120 1121 1122 1123
  taosArrayPush(cols, &kv);
  kv->key = VALUE;
  kv->keyLen = VALUE_LEN;
  kv->value = value;
wmmhello's avatar
wmmhello 已提交
1124
  kv->length = valueLen;
wmmhello's avatar
wmmhello 已提交
1125 1126
  if ((ret = smlParseValue(kv, &info->msgBuf)) != TSDB_CODE_SUCCESS) {
    return ret;
1127 1128 1129
  }

  // parse tags
1130
  ret = smlParseTelnetTags(sql, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
1131 1132
  if (ret != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
wmmhello's avatar
wmmhello 已提交
1133
    return ret;
1134 1135 1136 1137 1138
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1139 1140 1141
// Parses the comma-separated "key=value" list in data[0, len) (the tags or the cols section of a
// line-protocol record). Separators preceded by a backslash are ignored (IS_COMMA / IS_EQUAL / IS_QUOTE);
// for cols (isTag == false) commas and '=' inside a quoted value are ignored as well.
// Every pair becomes a newly allocated SSmlKv appended to cols:
//   - isTag == true : typed as NCHAR after a length check,
//   - isTag == false: typed by smlParseValue.
// A pair whose key is exactly tsSmlChildTableName is not stored; its value is copied into childTableName
// (only when childTableName is given).
// When cols is NULL the pairs are validated only.
// Returns TSDB_CODE_SUCCESS or the error code of the first problem found. When an error is reported after
// the entry was pushed, the entry stays in cols and is owned by it.
//
// Fixes over the previous version:
//   - the child-table key must match tsSmlChildTableName in full, not merely be a prefix of it;
//   - the copied child table name is always NUL-terminated (at most TSDB_TABLE_NAME_LEN - 1 bytes);
//   - the SSmlKv is no longer leaked when cols is NULL or the push fails.
static int32_t smlParseCols(const char *data, int32_t len, SArray *cols, char *childTableName, bool isTag,
                            SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
  if (len == 0) {
    return TSDB_CODE_SUCCESS;
  }

  size_t      childTableNameLen = strlen(tsSmlChildTableName);
  const char *sql = data;
  while (sql < data + len) {
    const char *key = sql;
    int32_t     keyLen = 0;

    // parse key
    while (sql < data + len) {
      if (IS_COMMA(sql)) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if (IS_EQUAL(sql)) {
        keyLen = sql - key;
        sql++;
        break;
      }
      sql++;
    }

    if (IS_INVALID_COL_LEN(keyLen)) {
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
    if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
      return TSDB_CODE_TSC_DUP_NAMES;
    }

    // parse value
    const char *value = sql;
    int32_t     valueLen = 0;
    bool        isInQuote = false;
    while (sql < data + len) {
      if (!isTag && IS_QUOTE(sql)) {
        isInQuote = !isInQuote;
        sql++;
        continue;
      }
      if (!isInQuote && IS_COMMA(sql)) {
        break;
      }
      if (!isInQuote && IS_EQUAL(sql)) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
    }
    valueLen = sql - value;
    sql++;  // step over the separating comma

    if (isInQuote) {
      smlBuildInvalidDataMsg(msg, "only one quote", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if (valueLen == 0) {
      smlBuildInvalidDataMsg(msg, "invalid value", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    PROCESS_SLASH(key, keyLen)
    PROCESS_SLASH(value, valueLen)

    // handle child table name: the key has to be the configured name exactly
    if (childTableName && childTableNameLen != 0 && (size_t)keyLen == childTableNameLen &&
        strncmp(key, tsSmlChildTableName, keyLen) == 0) {
      int32_t copyLen = (valueLen < TSDB_TABLE_NAME_LEN) ? valueLen : (TSDB_TABLE_NAME_LEN - 1);
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      memcpy(childTableName, value, copyLen);  // the memset above provides the terminator
      continue;
    }

    // add kv to SSmlKv
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(1, sizeof(SSmlKv));
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;

    // cols takes ownership as soon as the push succeeded; otherwise kv is released below
    bool stored = false;
    if (cols) {
      if (taosArrayPush(cols, &kv) == NULL) {
        taosMemoryFree(kv);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      stored = true;
    }

    kv->key = key;
    kv->keyLen = keyLen;
    kv->value = value;
    kv->length = valueLen;

    int32_t ret = TSDB_CODE_SUCCESS;
    if (isTag) {
      if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        ret = TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
      } else {
        kv->type = TSDB_DATA_TYPE_NCHAR;
      }
    } else {
      ret = smlParseValue(kv, msg);
    }

    if (!stored) {
      taosMemoryFree(kv);
    }
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1240 1241
// Merges the entries of cols into the schema described by metaHash (key -> int16_t index into metaArray)
// and metaArray (SSmlKv pointers).
//   - unknown key: the entry is appended to metaArray and its index is stored in metaHash;
//   - known key with a different type: TSDB_CODE_SML_NOT_SAME_TYPE is returned;
//   - known key of a variable-length type: the stored entry is replaced when the new one is longer.
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, SSmlMsgBuf *msg) {
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv  *kv = (SSmlKv *)taosArrayGetP(cols, i);
    int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);

    if (index == NULL) {
      // first time this key is seen
      size_t tmp = taosArrayGetSize(metaArray);
      ASSERT(tmp <= INT16_MAX);
      int16_t size = tmp;
      taosArrayPush(metaArray, &kv);
      taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
      continue;
    }

    SSmlKv **value = (SSmlKv **)taosArrayGet(metaArray, *index);
    if (kv->type != (*value)->type) {
      smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
      return TSDB_CODE_SML_NOT_SAME_TYPE;
    }
    // update string len, if bigger
    if (IS_VAR_DATA_TYPE(kv->type) && kv->length > (*value)->length) {
      *value = kv;
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1269
// Appends every entry of cols to metaArray and records, under the entry's key, its position within cols
// (an int16_t) in metaHash.
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols) {
  int16_t pos = 0;
  while (pos < taosArrayGetSize(cols)) {
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, pos);
    taosArrayPush(metaArray, &kv);
    taosHashPut(metaHash, kv->key, kv->keyLen, &pos, SHORT_BYTES);
    ++pos;
  }
}

X
Xiaoyu Wang 已提交
1277
static SSmlTableInfo *smlBuildTableInfo() {
1278
  SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
X
Xiaoyu Wang 已提交
1279
  if (!tag) {
1280
    return NULL;
wmmhello's avatar
wmmhello 已提交
1281
  }
1282 1283 1284 1285 1286

  tag->cols = taosArrayInit(16, POINTER_BYTES);
  if (tag->cols == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
wmmhello's avatar
wmmhello 已提交
1287
  }
1288 1289 1290 1291 1292

  tag->tags = taosArrayInit(16, POINTER_BYTES);
  if (tag->tags == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
wmmhello's avatar
wmmhello 已提交
1293
  }
1294 1295 1296 1297 1298
  return tag;

cleanup:
  taosMemoryFree(tag);
  return NULL;
wmmhello's avatar
wmmhello 已提交
1299 1300
}

X
Xiaoyu Wang 已提交
1301 1302 1303
// Releases a SSmlTableInfo and everything reachable from it.
// Each element of tag->cols is one row: an SArray of SSmlKv* when info->dataFormat is set, otherwise a
// SHashObj whose values are SSmlKv*. For the JSON protocol additional strings are freed: the value of
// NCHAR/BINARY entries (array rows and tags), the key of every tag, and tag->sTableName.
static void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
  const bool isJson = (info->protocol == TSDB_SML_JSON_PROTOCOL);

  // rows
  for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
    if (info->dataFormat) {
      SArray *kvArray = (SArray *)taosArrayGetP(tag->cols, i);
      for (int j = 0; j < taosArrayGetSize(kvArray); ++j) {
        SSmlKv *p = (SSmlKv *)taosArrayGetP(kvArray, j);
        if (isJson && (p->type == TSDB_DATA_TYPE_NCHAR || p->type == TSDB_DATA_TYPE_BINARY)) {
          taosMemoryFree((void *)p->value);
        }
        taosMemoryFree(p);
      }
      taosArrayDestroy(kvArray);
    } else {
      SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
      for (void **it = (void **)taosHashIterate(kvHash, NULL); it != NULL;
           it = (void **)taosHashIterate(kvHash, it)) {
        taosMemoryFree(*it);
      }
      taosHashCleanup(kvHash);
    }
  }

  // tags
  for (size_t i = 0; i < taosArrayGetSize(tag->tags); i++) {
    SSmlKv *p = (SSmlKv *)taosArrayGetP(tag->tags, i);
    if (isJson) {
      taosMemoryFree((void *)p->key);
      if (p->type == TSDB_DATA_TYPE_NCHAR || p->type == TSDB_DATA_TYPE_BINARY) {
        taosMemoryFree((void *)p->value);
      }
    }
    taosMemoryFree(p);
  }

  if (isJson && tag->sTableName) {
    taosMemoryFree((void *)tag->sTableName);
  }
  taosArrayDestroy(tag->cols);
  taosArrayDestroy(tag->tags);
  taosMemoryFree(tag);
}

X
Xiaoyu Wang 已提交
1344
static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) {
1345 1346
  SArray *s1 = *(SArray **)key1;
  SArray *s2 = *(SArray **)key2;
1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359
  SSmlKv *kv1 = (SSmlKv *)taosArrayGetP(s1, 0);
  SSmlKv *kv2 = (SSmlKv *)taosArrayGetP(s2, 0);
  ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
  ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
  if (kv1->i < kv2->i) {
    return -1;
  } else if (kv1->i > kv2->i) {
    return 1;
  } else {
    return 0;
  }
}

X
Xiaoyu Wang 已提交
1360
static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
1361 1362
  SHashObj *s1 = *(SHashObj **)key1;
  SHashObj *s2 = *(SHashObj **)key2;
1363 1364
  SSmlKv *kv1 = *(SSmlKv **)taosHashGet(s1, TS, TS_LEN);
  SSmlKv *kv2 = *(SSmlKv **)taosHashGet(s2, TS, TS_LEN);
1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375
  ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
  ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
  if (kv1->i < kv2->i) {
    return -1;
  } else if (kv1->i > kv2->i) {
    return 1;
  } else {
    return 0;
  }
}

1376 1377
// Adds one parsed row to oneTable->cols at the position returned by taosArraySearch(..., TD_GT) with the
// timestamp comparators, or at the end when the search finds nothing.
//   - dataFormat == true : the row is stored as the cols array itself;
//   - dataFormat == false: the row is first converted into a hash (key -> SSmlKv*) and the hash is stored.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the hash cannot be created.
static int32_t smlDealCols(SSmlTableInfo *oneTable, bool dataFormat, SArray *cols) {
  if (dataFormat) {
    void *pos = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GT);
    if (pos != NULL) {
      taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, pos), &cols);
    } else {
      taosArrayPush(oneTable->cols, &cols);
    }
    return TSDB_CODE_SUCCESS;
  }

  SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (kvHash == NULL) {
    uError("SML:smlDealCols failed to allocate memory");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
    taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
  }

  void *pos = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GT);
  if (pos != NULL) {
    taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, pos), &kvHash);
  } else {
    taosArrayPush(oneTable->cols, &kvHash);
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1406 1407 1408
// Allocates a zeroed SSmlSTableMeta together with its tag/col hashes and tag/col arrays.
// Returns the new object, or NULL when any allocation fails. On failure every container created so far
// is released; previously only the struct itself was freed and the containers leaked.
static SSmlSTableMeta *smlBuildSTableMeta() {
  SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(1, sizeof(SSmlSTableMeta));
  if (!meta) {
    return NULL;
  }
  meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (meta->tagHash == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (meta->colHash == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->tags = taosArrayInit(32, POINTER_BYTES);
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->cols = taosArrayInit(32, POINTER_BYTES);
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
  return meta;

cleanup:
  // meta was zero-initialized, so a non-NULL member is one that was successfully created above;
  // cols is the last allocation and therefore never live here
  if (meta->tagHash != NULL) {
    taosHashCleanup(meta->tagHash);
  }
  if (meta->colHash != NULL) {
    taosHashCleanup(meta->colHash);
  }
  if (meta->tags != NULL) {
    taosArrayDestroy(meta->tags);
  }
  taosMemoryFree(meta);
  return NULL;
}

X
Xiaoyu Wang 已提交
1441
// Releases a SSmlSTableMeta: both hashes, both arrays, the attached tableMeta and the struct itself.
// The SSmlKv entries referenced by the arrays are not freed here.
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
  // hashes
  taosHashCleanup(meta->tagHash);
  taosHashCleanup(meta->colHash);
  // arrays
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);
  // owned table meta, then the container
  taosMemoryFree(meta->tableMeta);
  taosMemoryFree(meta);
}

// Frees every element stored in cols (each element is a pointer obtained via taosArrayGetP).
// The array object itself is not destroyed here. A NULL cols is ignored.
static void smlDestroyCols(SArray *cols) {
  if (cols == NULL) {
    return;
  }
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
    taosMemoryFree(taosArrayGetP(cols, i));
  }
}

X
Xiaoyu Wang 已提交
1458 1459
// Tear down an SML handle: the query, the exec handle, every child-table and
// super-table entry plus their hashes, the vgroup and duplicate-key hashes,
// the shared column container (only when dataFormat is off), the request,
// and finally the handle itself. NULL is accepted and ignored.
static void smlDestroyInfo(SSmlHandle *info) {
  if (info == NULL) {
    return;
  }

  qDestroyQuery(info->pQuery);
  smlDestroyHandle(info->exec);

  void **iter = NULL;

  // destroy info->childTables
  for (iter = (void **)taosHashIterate(info->childTables, NULL); iter != NULL;
       iter = (void **)taosHashIterate(info->childTables, iter)) {
    smlDestroyTableInfo(info, (SSmlTableInfo *)(*iter));
  }
  taosHashCleanup(info->childTables);

  // destroy info->superTables
  for (iter = (void **)taosHashIterate(info->superTables, NULL); iter != NULL;
       iter = (void **)taosHashIterate(info->superTables, iter)) {
    smlDestroySTableMeta((SSmlSTableMeta *)(*iter));
  }
  taosHashCleanup(info->superTables);

  // destroy info->pVgHash
  taosHashCleanup(info->pVgHash);
  taosHashCleanup(info->dumplicateKey);

  // colsContainer is only created when dataFormat is off
  if (!info->dataFormat) {
    taosArrayDestroy(info->colsContainer);
  }

  destroyRequest(info->pRequest);
  taosMemoryFreeClear(info);
}

1489
// Create and initialise an SML handle for one schemaless insert.
//
//   pTscObj   - connection object; when non-NULL it is stored in the handle
//               and the catalog handle for its cluster is fetched
//   request   - request object; when non-NULL its msgBuf becomes the handle's
//               error message buffer
//   protocol  - line / telnet / json protocol selector
//   precision - timestamp precision stored in the handle
//
// Returns the handle, or NULL on any failure (the partially built handle is
// released through smlDestroyInfo).
static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLProtocolType protocol, int8_t precision){
  int32_t     code = TSDB_CODE_SUCCESS;
  SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
  if (info == NULL) {
    return NULL;
  }
  info->id = smlGenId();

  // query node: a scheduled vnode submit without a result set
  info->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY);
  if (info->pQuery == NULL) {
    uError("SML:0x%" PRIx64 " create info->pQuery error", info->id);
    goto cleanup;
  }
  info->pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  info->pQuery->haveResultSet = false;
  info->pQuery->msgType = TDMT_VND_SUBMIT;

  info->pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
  if (info->pQuery->pRoot == NULL) {
    uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id);
    goto cleanup;
  }
  ((SVnodeModifOpStmt *)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;

  if (pTscObj) {
    info->taos = pTscObj;
    code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
      goto cleanup;
    }
  }

  info->precision = precision;
  info->protocol = protocol;
  // only the line protocol honours the configured data format; others force it on
  if (protocol != TSDB_SML_LINE_PROTOCOL) {
    info->dataFormat = true;
  } else {
    info->dataFormat = tsSmlDataFormat;
  }

  if (request) {
    info->pRequest = request;
    info->msgBuf.buf = info->pRequest->msgBuf;
    info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
  }

  info->exec = smlInitHandle(info->pQuery);
  info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);

  // the shared column container is only needed when dataFormat is off
  if (!info->dataFormat) {
    info->colsContainer = taosArrayInit(32, POINTER_BYTES);
    if (info->colsContainer == NULL) {
      uError("SML:0x%" PRIx64 " create info failed", info->id);
      goto cleanup;
    }
  }

  if (info->exec == NULL || info->childTables == NULL || info->superTables == NULL || info->pVgHash == NULL ||
      info->dumplicateKey == NULL) {
    uError("SML:0x%" PRIx64 " create info failed", info->id);
    goto cleanup;
  }

  return info;

cleanup:
  smlDestroyInfo(info);
  return NULL;
}

/************* TSDB_SML_JSON_PROTOCOL function start **************/
X
Xiaoyu Wang 已提交
1561
static int32_t smlJsonCreateSring(const char **output, char *input, int32_t inputLen) {
1562
  *output = (const char *)taosMemoryMalloc(inputLen);
X
Xiaoyu Wang 已提交
1563
  if (*output == NULL) {
1564 1565 1566
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
1567
  memcpy((void *)(*output), input, inputLen);
1568 1569 1570 1571 1572 1573
  return TSDB_CODE_SUCCESS;
}

// Read the "metric" member of a JSON data point and store a copy of it as the
// super table name of tinfo (sTableName / sTableNameLen).
//
// Returns TSDB_CODE_TSC_INVALID_JSON when "metric" is not a JSON string,
// TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH when its length is rejected by
// IS_INVALID_TABLE_LEN, otherwise the result of copying the name.
static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo) {
  cJSON *metric = cJSON_GetObjectItem(root, "metric");
  if (!cJSON_IsString(metric)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  char *name = metric->valuestring;
  tinfo->sTableNameLen = strlen(name);
  if (IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
    uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  return smlJsonCreateSring(&tinfo->sTableName, name, tinfo->sTableNameLen);
}

// Parse a timestamp given as a JSON object {"value": <number>, "type": <unit>}
// and store it, converted to nanoseconds, in *tsVal.
//
// Accepted units (case-insensitive): "s", "ms", "us", "ns". A value of 0 means
// "now" (current time in nanoseconds) and is accepted without looking at the
// unit.
//
// Returns TSDB_CODE_TSC_INVALID_JSON for a malformed object or unknown unit,
// TSDB_CODE_INVALID_TIMESTAMP for a negative value or one that does not fit
// into int64 (before or after conversion), TSDB_CODE_SUCCESS otherwise.
// *tsVal is only written on success.
static int32_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int64_t *tsVal) {
  int32_t size = cJSON_GetArraySize(root);
  if (size != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (!cJSON_IsNumber(value)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  double timeDouble = value->valuedouble;
  if (smlDoubleToInt64OverFlow(timeDouble)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
    return TSDB_CODE_INVALID_TIMESTAMP;
  }

  if (timeDouble == 0) {
    *tsVal = taosGetTimestampNs();
    return TSDB_CODE_SUCCESS;
  }

  if (timeDouble < 0) {
    return TSDB_CODE_INVALID_TIMESTAMP;
  }

  // Resolve the unit to a "nanoseconds per unit" factor.
  int64_t     factor = 1;
  const char *unit = type->valuestring;
  size_t      typeLen = strlen(unit);
  if (typeLen == 1 && (unit[0] == 's' || unit[0] == 'S')) {
    // seconds
    factor = NANOSECOND_PER_SEC;
  } else if (typeLen == 2 && (unit[1] == 's' || unit[1] == 'S')) {
    switch (unit[0]) {
      case 'm':
      case 'M':
        // milliseconds
        factor = NANOSECOND_PER_MSEC;
        break;
      case 'u':
      case 'U':
        // microseconds
        factor = NANOSECOND_PER_USEC;
        break;
      case 'n':
      case 'N':
        // nanoseconds, no scaling needed
        break;
      default:
        return TSDB_CODE_TSC_INVALID_JSON;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // Check the scaled value in floating point BEFORE doing the int64
  // multiplication: multiplying first would be signed overflow (undefined
  // behaviour) for exactly the inputs this check is meant to reject.
  if (factor != 1 && smlDoubleToInt64OverFlow(timeDouble * factor)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
    return TSDB_CODE_INVALID_TIMESTAMP;
  }

  *tsVal = (int64_t)timeDouble * factor;
  return TSDB_CODE_SUCCESS;
}

// Count the decimal digits of num (the sign is not counted; 0 has one digit).
static uint8_t smlGetTimestampLen(int64_t num) {
  uint8_t digits = 0;
  do {
    digits++;
    num /= 10;
  } while (num != 0);
  return digits;
}

// Parse the "timestamp" member of a JSON data point and append it to cols as a
// TIMESTAMP key/value (in nanoseconds).
//
// "timestamp" may be:
//   - a number: its digit count selects the unit (TSDB_TIME_PRECISION_SEC_DIGITS
//     -> seconds, TSDB_TIME_PRECISION_MILLI_DIGITS -> milliseconds); 0 means
//     the current time; any other digit count is rejected
//   - an object: delegated to smlParseTSFromJSONObj
//
// Returns TSDB_CODE_INVALID_TIMESTAMP / TSDB_CODE_TSC_INVALID_JSON for bad
// input, TSDB_CODE_OUT_OF_MEMORY when the kv cannot be allocated, and
// TSDB_CODE_SUCCESS otherwise.
// NOTE(review): when cols is NULL the kv is allocated but never stored - the
// visible caller always passes an array; confirm no caller passes NULL.
static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
  // Timestamp must be the first KV to parse
  int64_t tsVal = 0;

  cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
  if (cJSON_IsNumber(timestamp)) {
    // timestamp value 0 indicates current system time
    double timeDouble = timestamp->valuedouble;
    if (smlDoubleToInt64OverFlow(timeDouble)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
      return TSDB_CODE_INVALID_TIMESTAMP;
    }

    if (timeDouble < 0) {
      return TSDB_CODE_INVALID_TIMESTAMP;
    }

    uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
    tsVal = (int64_t)timeDouble;
    // In both scaled branches the range check is done on the double BEFORE the
    // int64 multiplication, so the multiplication itself can never overflow
    // (signed overflow is undefined behaviour).
    if (tsLen == TSDB_TIME_PRECISION_SEC_DIGITS) {
      if (smlDoubleToInt64OverFlow(timeDouble * NANOSECOND_PER_SEC)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
        return TSDB_CODE_INVALID_TIMESTAMP;
      }
      tsVal = tsVal * NANOSECOND_PER_SEC;
    } else if (tsLen == TSDB_TIME_PRECISION_MILLI_DIGITS) {
      if (smlDoubleToInt64OverFlow(timeDouble * NANOSECOND_PER_MSEC)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
        return TSDB_CODE_INVALID_TIMESTAMP;
      }
      tsVal = tsVal * NANOSECOND_PER_MSEC;
    } else if (timeDouble == 0) {
      tsVal = taosGetTimestampNs();
    } else {
      return TSDB_CODE_INVALID_TIMESTAMP;
    }
  } else if (cJSON_IsObject(timestamp)) {
    int32_t ret = smlParseTSFromJSONObj(info, timestamp, &tsVal);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " Failed to parse timestamp from JSON Obj", info->id);
      return ret;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // add ts to cols
  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
  if (!kv) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  kv->key = TS;
  kv->keyLen = TS_LEN;
  kv->i = tsVal;
  kv->type = TSDB_DATA_TYPE_TIMESTAMP;
  kv->length = (int16_t)tDataTypes[kv->type].bytes;
  if (cols) taosArrayPush(cols, &kv);
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1733
// Store a JSON true/false value in pVal as a BOOL. typeStr must be "bool"
// (case-insensitive); anything else yields TSDB_CODE_TSC_INVALID_JSON_TYPE.
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
  if (strcasecmp(typeStr, "bool") == 0) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valueint;
    return TSDB_CODE_SUCCESS;
  }

  uError("OTD:invalid type(%s) for JSON Bool", typeStr);
  return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
wmmhello's avatar
wmmhello 已提交
1744

X
Xiaoyu Wang 已提交
1745 1746 1747
// Store a JSON number in pVal using the numeric type named by typeStr
// (case-insensitive): i8/tinyint, i16/smallint, i32/int, i64/bigint,
// f32/float, f64/double.
//
// tinyint/smallint/int/float reject out-of-range values with
// TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; bigint clamps to INT64_MIN/INT64_MAX.
// An unknown type name yields TSDB_CODE_TSC_INVALID_JSON_TYPE.
static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
  const double num = value->valuedouble;

  // tinyint
  if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) {
    if (IS_VALID_TINYINT(num)) {
      pVal->type = TSDB_DATA_TYPE_TINYINT;
      pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
      pVal->i = num;
      return TSDB_CODE_SUCCESS;
    }
    uError("OTD:JSON value(%f) cannot fit in type(tinyint)", num);
    return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
  }

  // smallint
  if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) {
    if (IS_VALID_SMALLINT(num)) {
      pVal->type = TSDB_DATA_TYPE_SMALLINT;
      pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
      pVal->i = num;
      return TSDB_CODE_SUCCESS;
    }
    uError("OTD:JSON value(%f) cannot fit in type(smallint)", num);
    return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
  }

  // int
  if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) {
    if (IS_VALID_INT(num)) {
      pVal->type = TSDB_DATA_TYPE_INT;
      pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
      pVal->i = num;
      return TSDB_CODE_SUCCESS;
    }
    uError("OTD:JSON value(%f) cannot fit in type(int)", num);
    return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
  }

  // bigint: out-of-range values are clamped rather than rejected
  if (strcasecmp(typeStr, "i64") == 0 || strcasecmp(typeStr, "bigint") == 0) {
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    if (num >= (double)INT64_MAX) {
      pVal->i = INT64_MAX;
    } else if (num <= (double)INT64_MIN) {
      pVal->i = INT64_MIN;
    } else {
      pVal->i = num;
    }
    return TSDB_CODE_SUCCESS;
  }

  // float
  if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) {
    if (IS_VALID_FLOAT(num)) {
      pVal->type = TSDB_DATA_TYPE_FLOAT;
      pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
      pVal->f = num;
      return TSDB_CODE_SUCCESS;
    }
    uError("OTD:JSON value(%f) cannot fit in type(float)", num);
    return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
  }

  // double
  if (strcasecmp(typeStr, "f64") == 0 || strcasecmp(typeStr, "double") == 0) {
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->d = num;
    return TSDB_CODE_SUCCESS;
  }

  // if reach here means type is unsupported
  uError("OTD:invalid type(%s) for JSON Number", typeStr);
  return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
1815

X
Xiaoyu Wang 已提交
1816
// Store a JSON string in pVal as BINARY or NCHAR, selected by typeStr
// ("binary" / "nchar", case-insensitive). The string bytes are copied into a
// newly allocated buffer (pVal->value) of pVal->length bytes.
//
// Returns TSDB_CODE_TSC_INVALID_JSON_TYPE for any other type name,
// TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN when the string is longer than the
// type allows, otherwise the result of the copy.
static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
  if (strcasecmp(typeStr, "binary") == 0) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
  } else if (strcasecmp(typeStr, "nchar") == 0) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
  } else {
    uError("OTD:invalid type(%s) for JSON String", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }

  // Validate the full-width length first. Narrowing to int16 before the checks
  // lets a very long string wrap around to a small value, pass the limits and
  // be silently truncated.
  size_t len = strlen(value->valuestring);
  if (len > (size_t)INT16_MAX) {
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }
  if (pVal->type == TSDB_DATA_TYPE_BINARY && len > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }
  if (pVal->type == TSDB_DATA_TYPE_NCHAR && len > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }
  pVal->length = (int16_t)len;

  return smlJsonCreateSring(&pVal->value, value->valuestring, pVal->length);
}

// Parse a typed JSON value of the form {"value": ..., "type": "<name>"} into
// kv. The object must have exactly OTD_JSON_SUB_FIELDS_NUM members; "type"
// must be a string and "value" a bool, number or string, which is handed to
// the matching smlConvertJSON* routine.
//
// Returns TSDB_CODE_TSC_INVALID_JSON for a malformed object,
// TSDB_CODE_TSC_INVALID_JSON_TYPE for an unsupported value kind, otherwise
// whatever the conversion routine returns.
static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) {
  if (cJSON_GetArraySize(root) != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (NULL == value) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  char *typeStr = type->valuestring;
  switch (value->type) {
    case cJSON_True:
    case cJSON_False:
      return smlConvertJSONBool(kv, typeStr, value);
    case cJSON_Number:
      return smlConvertJSONNumber(kv, typeStr, value);
    case cJSON_String:
      return smlConvertJSONString(kv, typeStr, value);
    default:
      return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }
}

1885 1886 1887 1888 1889 1890 1891 1892
// Parse a JSON value into kv, inferring the column type from the JSON kind:
//   true/false -> BOOL
//   number     -> DOUBLE
//   string     -> converted through smlConvertJSONString with the default
//                 string type ("nchar")
//   object     -> typed value, see smlParseValueFromJSONObj
//
// Returns TSDB_CODE_TSC_INVALID_JSON for any other JSON kind, the error of the
// string/object conversion when it fails, TSDB_CODE_SUCCESS otherwise.
static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
  switch (root->type) {
    case cJSON_True:
    case cJSON_False: {
      kv->type = TSDB_DATA_TYPE_BOOL;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->i = root->valueint;
      break;
    }
    case cJSON_Number: {
      kv->type = TSDB_DATA_TYPE_DOUBLE;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->d = root->valuedouble;
      break;
    }
    case cJSON_String: {
      /* set default JSON type to binary/nchar according to
       * user configured parameter tsDefaultJSONStrType
       */

      char *tsDefaultJSONStrType = "nchar";  // todo
      // Propagate conversion failures (string too long, out of memory);
      // ignoring them would report success for a kv without a usable value.
      int32_t ret = smlConvertJSONString(kv, tsDefaultJSONStrType, root);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse string value from JSON");
        return ret;
      }
      break;
    }
    case cJSON_Object: {
      int32_t ret = smlParseValueFromJSONObj(root, kv);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON Obj");
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON;
  }

  return TSDB_CODE_SUCCESS;
}

// Parse the "value" member of a JSON data point into a new kv keyed VALUE and
// append it to cols (when cols is non-NULL). The kv is appended before the
// value is parsed, so it stays in cols even when parsing fails.
//
// Returns TSDB_CODE_TSC_INVALID_JSON when "value" is missing,
// TSDB_CODE_OUT_OF_MEMORY when the kv cannot be allocated, otherwise the
// result of smlParseValueFromJSON.
static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
  cJSON *metricVal = cJSON_GetObjectItem(root, "value");
  if (NULL == metricVal) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
  if (NULL == kv) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  kv->key = VALUE;
  kv->keyLen = VALUE_LEN;

  if (cols != NULL) {
    taosArrayPush(cols, &kv);
  }

  int32_t ret = smlParseValueFromJSON(metricVal, kv);
  return (ret != TSDB_CODE_SUCCESS) ? ret : TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1945 1946
// Parse the "tags" object of a JSON data point.
//
//   root           - the data point
//   pKVs           - receives one kv per tag (when non-NULL)
//   childTableName - buffer of TSDB_TABLE_NAME_LEN bytes; filled with the value
//                    of the tag whose key equals tsSmlChildTableName, if that
//                    setting is non-empty and such a tag exists
//   dumplicateKey  - hash used to detect repeated tag keys
//   msg            - unused here
//
// Returns TSDB_CODE_TSC_INVALID_JSON for a missing/non-object "tags" or a
// non-string child table name, TSDB_CODE_TSC_INVALID_COLUMN_LENGTH for a bad
// key length, TSDB_CODE_TSC_DUP_NAMES for a repeated key,
// TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error of the key copy /
// value parse; TSDB_CODE_SUCCESS otherwise.
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey,
                                    SSmlMsgBuf *msg) {
  int32_t ret = TSDB_CODE_SUCCESS;

  cJSON *tags = cJSON_GetObjectItem(root, "tags");
  if (tags == NULL || tags->type != cJSON_Object) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  size_t  childTableNameLen = strlen(tsSmlChildTableName);
  int32_t tagNum = cJSON_GetArraySize(tags);
  for (int32_t i = 0; i < tagNum; ++i) {
    cJSON *tag = cJSON_GetArrayItem(tags, i);
    if (tag == NULL) {
      return TSDB_CODE_TSC_INVALID_JSON;
    }
    size_t keyLen = strlen(tag->string);
    if (IS_INVALID_COL_LEN(keyLen)) {
      uError("OTD:Tag key length is 0 or too large than 64");
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
    // check duplicate keys
    if (smlCheckDuplicateKey(tag->string, keyLen, dumplicateKey)) {
      return TSDB_CODE_TSC_DUP_NAMES;
    }

    // handle child table name
    if (childTableNameLen != 0 && strcmp(tag->string, tsSmlChildTableName) == 0) {
      if (!cJSON_IsString(tag)) {
        uError("OTD:ID must be JSON string");
        return TSDB_CODE_TSC_INVALID_JSON;
      }
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      // Copy at most LEN - 1 bytes so the zeroed last byte always terminates
      // the name; strncpy does not add a terminator when the source fills the
      // whole buffer.
      strncpy(childTableName, tag->valuestring, TSDB_TABLE_NAME_LEN - 1);
      continue;
    }

    // add kv to SSmlKv
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
    if (pKVs) taosArrayPush(pKVs, &kv);

    // key
    kv->keyLen = keyLen;
    ret = smlJsonCreateSring(&kv->key, tag->string, kv->keyLen);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
    // value
    ret = smlParseValueFromJSON(tag, kv);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
  }

  return ret;
}

2003 2004
// Parse one JSON data point: metric name, timestamp, metric value and tags, in
// that order. root must be a JSON object with exactly OTD_JSON_FIELDS_NUM
// members. The metric name and tags go into tinfo, timestamp and value into
// cols.
//
// Returns TSDB_CODE_TSC_INVALID_JSON for a malformed data point, the error of
// the first failing step, or TSDB_CODE_SUCCESS.
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo, SArray *cols) {
  if (!cJSON_IsObject(root)) {
    uError("OTD:0x%" PRIx64 " data point needs to be JSON object", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // outmost json fields has to be exactly 4
  int32_t fieldNum = cJSON_GetArraySize(root);
  if (fieldNum != OTD_JSON_FIELDS_NUM) {
    uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, fieldNum);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  int32_t ret = TSDB_CODE_SUCCESS;

  // Parse metric
  ret = smlParseMetricFromJSON(info, root, tinfo);
  if (ret != TSDB_CODE_SUCCESS) {
    uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
    return ret;
  }
  uDebug("OTD:0x%" PRIx64 " Parse metric from JSON payload finished", info->id);

  // Parse timestamp
  ret = smlParseTSFromJSON(info, root, cols);
  if (ret) {
    uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
    return ret;
  }
  uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);

  // Parse metric value
  ret = smlParseColsFromJSON(root, cols);
  if (ret) {
    uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
    return ret;
  }
  uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id);

  // Parse tags
  ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
  if (ret) {
    uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
    return ret;
  }
  uDebug("OTD:0x%" PRIx64 " Parse tags from JSON payload finished", info->id);

  return TSDB_CODE_SUCCESS;
}
2052
/************* TSDB_SML_JSON_PROTOCOL function end **************/
wmmhello's avatar
wmmhello 已提交
2053

X
Xiaoyu Wang 已提交
2054
// Parse one InfluxDB line-protocol record and merge it into the handle.
//
// Steps: split the line into measurement/tags/fields/timestamp, parse the
// timestamp and the fields into cols, look up (or create) the child table for
// this measurement+tags, hand cols to the table, parse the tags for a new
// table and derive its name/uid, then update (or create) the super-table meta.
//
// Returns TSDB_CODE_SUCCESS, or the error code of the first failing step
// (TSDB_CODE_TSC_OUT_OF_MEMORY for allocation failures).
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) {
  SSmlLineInfo elements = {0};
  uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql);

  int ret = smlParseInfluxString(sql, &elements, &info->msgBuf);
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id);
    return ret;
  }

  SArray *cols = NULL;
  if (info->dataFormat) {  // if dataFormat, cols need new memory to save data
    cols = taosArrayInit(16, POINTER_BYTES);
    if (cols == NULL) {
      uError("SML:0x%" PRIx64 " smlParseInfluxLine failed to allocate memory", info->id);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  } else {  // if dataFormat is false, cols do not need to save data, there is another new memory to save data
    cols = info->colsContainer;
  }

  ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols);
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseTS failed", info->id);
    if (info->dataFormat) taosArrayDestroy(cols);
    return ret;
  }
  ret = smlParseCols(elements.cols, elements.colsLen, cols, NULL, false, info->dumplicateKey, &info->msgBuf);
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseCols parse cloums fields failed", info->id);
    smlDestroyCols(cols);
    if (info->dataFormat) taosArrayDestroy(cols);
    return ret;
  }

  bool            hasTable = true;
  SSmlTableInfo  *tinfo = NULL;
  SSmlTableInfo **oneTable =
      (SSmlTableInfo **)taosHashGet(info->childTables, elements.measure, elements.measureTagsLen);
  if (!oneTable) {
    tinfo = smlBuildTableInfo();
    if (!tinfo) {
      // The parsed columns have not been handed to any table yet; release them
      // the same way the smlParseCols failure path above does.
      smlDestroyCols(cols);
      if (info->dataFormat) taosArrayDestroy(cols);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES);
    oneTable = &tinfo;
    hasTable = false;
  }

  ret = smlDealCols(*oneTable, info->dataFormat, cols);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  if (!hasTable) {
    ret = smlParseCols(elements.tags, elements.tagsLen, (*oneTable)->tags, (*oneTable)->childTableName, true,
                       info->dumplicateKey, &info->msgBuf);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseCols parse tag fields failed", info->id);
      return ret;
    }

    if (taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_TAGS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
      return TSDB_CODE_PAR_INVALID_TAGS_NUM;
    }

    if (taosArrayGetSize(cols) + taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_COLUMNS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
      return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
    }

    (*oneTable)->sTableName = elements.measure;
    (*oneTable)->sTableNameLen = elements.measureLen;
    if (strlen((*oneTable)->childTableName) == 0) {
      // no explicit child table name: derive one (and the uid) from tags + super table name
      RandTableName rName = {(*oneTable)->tags, (*oneTable)->sTableName, (uint8_t)(*oneTable)->sTableNameLen,
                             (*oneTable)->childTableName, 0};

      buildChildTableName(&rName);
      (*oneTable)->uid = rName.uid;
    } else {
      (*oneTable)->uid = *(uint64_t *)((*oneTable)->childTableName);
    }
  }

  SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements.measure, elements.measureLen);
  if (tableMeta) {  // update meta
    ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
    if (!hasTable && ret == TSDB_CODE_SUCCESS) {
      ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
    }
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
      return ret;
    }
  } else {
    SSmlSTableMeta *meta = smlBuildSTableMeta();
    if (meta == NULL) {
      // smlBuildSTableMeta returns NULL on allocation failure; bail out
      // instead of dereferencing it below.
      uError("SML:0x%" PRIx64 " smlParseInfluxLine failed to allocate memory", info->id);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
    smlInsertMeta(meta->colHash, meta->cols, cols);
    taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
  }

  if (!info->dataFormat) {
    taosArrayClear(info->colsContainer);
  }
  taosHashClear(info->dumplicateKey);
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2163 2164
/**
 * Parse one data point and merge it into the handle's child-table and super-table maps.
 *
 * Despite the name this handles two protocols, selected by info->protocol:
 *   - TSDB_SML_TELNET_PROTOCOL: `data` is a `const char *` telnet line
 *   - TSDB_SML_JSON_PROTOCOL:   `data` is a `cJSON *` data-point object
 *
 * @param info  schemaless handle; its childTables/superTables/dumplicateKey/msgBuf are updated
 * @param data  the raw data point, interpreted according to info->protocol
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY on allocation failure,
 *         TSDB_CODE_PAR_INVALID_TAGS_NUM when the tag count is outside [1, TSDB_MAX_TAGS],
 *         or the error returned by the protocol parser / smlUpdateMeta.
 */
static int32_t smlParseTelnetLine(SSmlHandle *info, void *data) {
  int            ret = TSDB_CODE_SUCCESS;
  SSmlTableInfo *tinfo = smlBuildTableInfo();
  if (!tinfo) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SArray *cols = taosArrayInit(16, POINTER_BYTES);
  if (cols == NULL) {
    uError("SML:0x%" PRIx64 " smlParseTelnetLine failed to allocate memory", info->id);
    // tinfo is not yet owned by any container, so it must be released here
    smlDestroyTableInfo(info, tinfo);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
    ret = smlParseTelnetString(info, (const char *)data, tinfo, cols);
  } else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
  } else {
    ASSERT(0);
  }
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseTelnetLine failed", info->id);
    smlDestroyTableInfo(info, tinfo);
    smlDestroyCols(cols);
    taosArrayDestroy(cols);
    return ret;
  }

  if (taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL);
    smlDestroyTableInfo(info, tinfo);
    smlDestroyCols(cols);
    taosArrayDestroy(cols);
    return TSDB_CODE_PAR_INVALID_TAGS_NUM;
  }
  taosHashClear(info->dumplicateKey);

  if (strlen(tinfo->childTableName) == 0) {
    // no explicit child table name: derive one (and the uid) from the tags and super table name
    RandTableName rName = {tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, tinfo->childTableName, 0};
    buildChildTableName(&rName);
    tinfo->uid = rName.uid;
  } else {
    // generate uid by name simple: the first 8 bytes of the name buffer.
    // memcpy instead of a uint64_t* cast avoids an aliasing violation / misaligned load.
    uint64_t nameUid = 0;
    memcpy(&nameUid, tinfo->childTableName, sizeof(nameUid));
    tinfo->uid = nameUid;
  }

  // cache the key length now; tinfo may be destroyed below
  size_t childNameLen = strlen(tinfo->childTableName);

  bool            hasTable = true;
  SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, childNameLen);
  if (!oneTable) {
    // first row for this child table: the hash now references tinfo
    taosHashPut(info->childTables, tinfo->childTableName, childNameLen, &tinfo, POINTER_BYTES);
    oneTable = &tinfo;
    hasTable = false;
  } else {
    // child table already known: keep the existing entry and drop the fresh one
    smlDestroyTableInfo(info, tinfo);
  }

  // from here on cols is owned by the child table entry
  taosArrayPush((*oneTable)->cols, &cols);

  SSmlSTableMeta **tableMeta =
      (SSmlSTableMeta **)taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen);
  if (tableMeta) {  // update meta
    ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
    if (!hasTable && ret == TSDB_CODE_SUCCESS) {
      // tags only need merging the first time a child table is seen
      ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
    }
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
      return ret;
    }
  } else {
    SSmlSTableMeta *meta = smlBuildSTableMeta();
    if (meta == NULL) {
      uError("SML:0x%" PRIx64 " smlParseTelnetLine failed to allocate memory", info->id);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
    smlInsertMeta(meta->colHash, meta->cols, cols);
    taosHashPut(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen, &meta, POINTER_BYTES);
  }

  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
2240

X
Xiaoyu Wang 已提交
2241
/**
 * Parse an OpenTSDB JSON payload and feed every data point to smlParseTelnetLine().
 *
 * The payload root must be either a single JSON object (one data point) or a JSON
 * array of data points; anything else is rejected.
 *
 * @param info     schemaless handle receiving the parsed data
 * @param payload  NUL-terminated JSON text; NULL is rejected
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON for a NULL/unparsable/ill-typed
 *         payload, or the first error returned while parsing a data point.
 */
static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
  int32_t ret = TSDB_CODE_SUCCESS;

  if (payload == NULL) {
    uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *root = cJSON_Parse(payload);
  if (root == NULL) {
    uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // multiple data points must be sent in JSON array
  if (cJSON_IsObject(root)) {
    ret = smlParseTelnetLine(info, root);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
    }
  } else if (cJSON_IsArray(root)) {
    // cJSON arrays are linked lists: walk child/next once instead of calling
    // cJSON_GetArrayItem(root, i), which restarts from the head for every index.
    for (cJSON *dataPoint = root->child; dataPoint != NULL; dataPoint = dataPoint->next) {
      ret = smlParseTelnetLine(info, dataPoint);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
        break;
      }
    }
  } else {
    uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
    ret = TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON_Delete(root);
  return ret;
}
2279

X
Xiaoyu Wang 已提交
2280
/**
 * Bind the parsed rows of every child table and launch the asynchronous insert.
 *
 * For each entry of info->childTables this resolves the target vgroup through the
 * catalog, records it in info->pVgHash, and binds the table's rows with smlBindData().
 * Once all tables are bound the output is built and the request is launched with
 * launchAsyncQuery().
 *
 * @param info  schemaless handle holding the parsed tables and the request
 * @return TSDB_CODE_SUCCESS once the async query has been launched, otherwise the
 *         error from catalogGetTableHashVgroup / smlBindData / smlBuildOutput.
 */
static int32_t smlInsertData(SSmlHandle *info) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
  while (oneTable) {
    SSmlTableInfo *tableData = *oneTable;

    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
    strcpy(pName.dbname, info->pRequest->pDb);
    memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));

    SRequestConnInfo conn = {0};
    conn.pTrans = info->taos->pAppInfo->pTransporter;
    conn.requestId = info->pRequest->requestId;
    conn.requestObjRefId = info->pRequest->self;
    conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

    SVgroupInfo vg = {0};
    code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
      // leaving the loop early: the in-progress iteration must be released
      taosHashCancelIterate(info->childTables, oneTable);
      return code;
    }
    taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));

    SSmlSTableMeta **pMeta =
        (SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
    ASSERT(NULL != pMeta && NULL != *pMeta);

    // use tablemeta of stable to save vgid and uid of child table
    (*pMeta)->tableMeta->vgId = vg.vgId;
    (*pMeta)->tableMeta->uid = tableData->uid;  // one table merge data block together according uid

    code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat,
                       (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName,
                       tableData->sTableNameLen, info->msgBuf.buf, info->msgBuf.len);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
      // leaving the loop early: the in-progress iteration must be released
      taosHashCancelIterate(info->childTables, oneTable);
      return code;
    }
    oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
  }

  code = smlBuildOutput(info->exec, info->pVgHash);
  if (code != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
    return code;
  }
  info->cost.insertRpcTime = taosGetTimestampUs();

  // the result is delivered through the request's callback, not through the return value
  launchAsyncQuery(info->pRequest, info->pQuery, NULL);
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2337 2338 2339 2340 2341 2342 2343 2344 2345
/**
 * Log the per-batch counters and phase timings collected in info->cost.
 *
 * Each "cost" value is the difference between two consecutive timestamps:
 *   parse  = schemaTime     - parseTime
 *   schema = insertBindTime - schemaTime
 *   bind   = insertRpcTime  - insertBindTime
 *   rpc    = endTime        - insertRpcTime
 *   total  = endTime        - parseTime
 *
 * NOTE(review): this is emitted at error level even for successful inserts; confirm
 * whether that is intentional before lowering it.
 */
static void smlPrintStatisticInfo(SSmlHandle *info) {
  // Adjacent string literals are used instead of a backslash continuation inside the
  // literal, which embedded the source indentation into the logged message.
  uError("SML:0x%" PRIx64
         " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,"
         "parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64
         ",total cost:%" PRId64,
         info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
         info->cost.numOfCreateSTables, info->cost.schemaTime - info->cost.parseTime,
         info->cost.insertBindTime - info->cost.schemaTime, info->cost.insertRpcTime - info->cost.insertBindTime,
         info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime);
}

X
Xiaoyu Wang 已提交
2348
/**
 * Parse a batch of input according to info->protocol.
 *
 * JSON input is a single document in lines[0] and numLines is not consulted; for the
 * influx line and telnet protocols every entry of lines[0..numLines) is parsed in order
 * and parsing stops at the first failure.
 *
 * @param info      schemaless handle receiving the parsed data
 * @param lines     input lines (or the single JSON document)
 * @param numLines  number of entries in lines for the line-oriented protocols
 * @return TSDB_CODE_SUCCESS or the first parser error.
 */
static int32_t smlParseLine(SSmlHandle *info, char *lines[], int numLines) {
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    int32_t jsonCode = smlParseJSON(info, *lines);
    if (jsonCode != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, *lines);
    }
    return jsonCode;
  }

  int32_t status = TSDB_CODE_SUCCESS;
  int32_t idx = 0;
  while (idx < numLines) {
    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
      status = smlParseInfluxLine(info, lines[idx]);
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
      status = smlParseTelnetLine(info, lines[idx]);
    } else {
      // the protocol was validated by the caller; anything else is a programming error
      ASSERT(0);
    }

    if (status != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, idx, lines[idx]);
      return status;
    }
    ++idx;
  }
  return status;
}

X
Xiaoyu Wang 已提交
2375
/**
 * Run one batch through the whole schemaless pipeline: parse the lines, bring the
 * database schema in line with the parsed data (with retries), then bind and launch
 * the insert. Timestamps for each phase are stored in info->cost.
 *
 * @param info      schemaless handle for this batch
 * @param lines     input lines of the batch
 * @param numLines  number of entries in lines
 * @return 0 on success, otherwise the error code of the phase that failed.
 */
static int smlProcess(SSmlHandle *info, char *lines[], int numLines) {
  info->cost.parseTime = taosGetTimestampUs();

  int32_t rc = smlParseLine(info, lines, numLines);
  if (rc != 0) {
    uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(rc));
    return rc;
  }

  info->cost.lineNum = numLines;
  info->cost.numOfSTables = taosHashGetSize(info->superTables);
  info->cost.numOfCTables = taosHashGetSize(info->childTables);

  info->cost.schemaTime = taosGetTimestampUs();

  // retry schema modification; the retry budget scales with the number of super tables
  int32_t attempts = 0;
  for (;;) {
    rc = smlModifyDBSchemas(info);
    if (rc == 0) {
      break;
    }
    if (!(attempts++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES)) {
      break;
    }
  }
  if (rc != 0) {
    uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(rc));
    return rc;
  }

  info->cost.insertBindTime = taosGetTimestampUs();
  rc = smlInsertData(info);
  if (rc != 0) {
    uError("SML:0x%" PRIx64 " smlInsertData error : %s", info->id, tstrerror(rc));
  }
  return rc;
}

X
Xiaoyu Wang 已提交
2413
/**
 * Check whether the current database accepts schemaless writes.
 *
 * The check is currently disabled: every database is accepted and the function always
 * returns TSDB_CODE_SUCCESS. The earlier implementation (now removed from this body)
 * fetched the database configuration through the catalog (catalogGetDBCfg) and returned
 * TSDB_CODE_SML_INVALID_DB_CONF when the configuration's schemaless flag was not set.
 *
 * @param taos     connection object (unused while the check is disabled)
 * @param request  request object (unused while the check is disabled)
 * @return always TSDB_CODE_SUCCESS.
 */
static int32_t isSchemalessDb(STscObj *taos, SRequestObj *request) {
  (void)taos;
  (void)request;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2445
static void smlInsertCallback(void *param, void *res, int32_t code) {
wmmhello's avatar
wmmhello 已提交
2446
  SRequestObj *pRequest = (SRequestObj *)res;
X
Xiaoyu Wang 已提交
2447
  SSmlHandle  *info = (SSmlHandle *)param;
wmmhello's avatar
wmmhello 已提交
2448
  int32_t rows = taos_affected_rows(pRequest);
wmmhello's avatar
wmmhello 已提交
2449

X
Xiaoyu Wang 已提交
2450
  uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
wmmhello's avatar
wmmhello 已提交
2451
  // lock
wmmhello's avatar
wmmhello 已提交
2452
  taosThreadSpinLock(&info->params->lock);
X
Xiaoyu Wang 已提交
2453
  if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2454
    info->params->request->code = code;
2455 2456 2457
    info->params->request->body.resInfo.numOfRows += rows;
  }else{
    info->params->request->body.resInfo.numOfRows += info->affectedRows;
wmmhello's avatar
wmmhello 已提交
2458
  }
wmmhello's avatar
wmmhello 已提交
2459
  taosThreadSpinUnlock(&info->params->lock);
wmmhello's avatar
wmmhello 已提交
2460 2461
  // unlock

wmmhello's avatar
wmmhello 已提交
2462
  uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
wmmhello's avatar
wmmhello 已提交
2463
  Params *pParam = info->params;
X
Xiaoyu Wang 已提交
2464
  bool    isLast = info->isLast;
wmmhello's avatar
wmmhello 已提交
2465 2466 2467
  info->cost.endTime = taosGetTimestampUs();
  info->cost.code = code;
  smlPrintStatisticInfo(info);
wmmhello's avatar
wmmhello 已提交
2468 2469
  smlDestroyInfo(info);

X
Xiaoyu Wang 已提交
2470
  if (isLast) {
wmmhello's avatar
wmmhello 已提交
2471
    tsem_post(&pParam->sem);
wmmhello's avatar
wmmhello 已提交
2472 2473 2474
  }
}

wmmhello's avatar
wmmhello 已提交
2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495
/**
 * taos_schemaless_insert() parse and insert data points into database according to
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
 * @return return zero for successful insertion. Otherwise return none-zero error code of
 *         failure reason.
 *
 */

wmmhello's avatar
wmmhello 已提交
2496
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
D
dapan1121 已提交
2497
  if (NULL == taos) {
2498 2499 2500
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }
D
dapan1121 已提交
2501

2502
  SRequestObj* request = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
wmmhello's avatar
wmmhello 已提交
2503
  if(!request){
2504
    uError("SML:taos_schemaless_insert error request is null");
2505
    return NULL;
wmmhello's avatar
wmmhello 已提交
2506 2507
  }

wmmhello's avatar
wmmhello 已提交
2508
  int    batchs = 0;
2509 2510
  STscObj* pTscObj = request->pTscObj;

2511
  pTscObj->schemalessType = 1;
wmmhello's avatar
wmmhello 已提交
2512
  SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
wmmhello's avatar
wmmhello 已提交
2513

wmmhello's avatar
wmmhello 已提交
2514 2515
  Params params;
  params.request = request;
wmmhello's avatar
wmmhello 已提交
2516 2517 2518
  tsem_init(&params.sem, 0, 0);
  taosThreadSpinInit(&(params.lock), 0);

X
Xiaoyu Wang 已提交
2519
  if (request->pDb == NULL) {
wmmhello's avatar
wmmhello 已提交
2520
    request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
wmmhello's avatar
wmmhello 已提交
2521
    smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
wmmhello's avatar
wmmhello 已提交
2522 2523 2524
    goto end;
  }

2525
  if(isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS){
wmmhello's avatar
wmmhello 已提交
2526
    request->code = TSDB_CODE_SML_INVALID_DB_CONF;
wmmhello's avatar
wmmhello 已提交
2527
    smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
wmmhello's avatar
wmmhello 已提交
2528 2529 2530
    goto end;
  }

2531
  if (!lines) {
2532
    request->code = TSDB_CODE_SML_INVALID_DATA;
wmmhello's avatar
wmmhello 已提交
2533
    smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
2534 2535 2536
    goto end;
  }

X
Xiaoyu Wang 已提交
2537
  if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
2538
    request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
wmmhello's avatar
wmmhello 已提交
2539
    smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
2540
    goto end;
wmmhello's avatar
wmmhello 已提交
2541 2542
  }

X
Xiaoyu Wang 已提交
2543 2544
  if (protocol == TSDB_SML_LINE_PROTOCOL &&
      (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
2545
    request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
wmmhello's avatar
wmmhello 已提交
2546
    smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
2547 2548 2549
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
2550 2551 2552 2553 2554 2555 2556 2557
  if(protocol == TSDB_SML_JSON_PROTOCOL){
    numLines = 1;
  }else if(numLines <= 0){
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
2558 2559
  batchs = ceil(((double)numLines) / LINE_BATCH);
  for (int i = 0; i < batchs; ++i) {
2560
    SRequestObj* req = (SRequestObj*)createRequest(pTscObj->id, TSDB_SQL_INSERT);
wmmhello's avatar
wmmhello 已提交
2561 2562 2563 2564 2565
    if(!req){
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error request is null");
      goto end;
    }
2566
    SSmlHandle* info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision);
wmmhello's avatar
wmmhello 已提交
2567 2568 2569 2570 2571 2572
    if(!info){
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error SSmlHandle is null");
      goto end;
    }

wmmhello's avatar
wmmhello 已提交
2573 2574
    int32_t perBatch = LINE_BATCH;

X
Xiaoyu Wang 已提交
2575
    if (numLines > perBatch) {
wmmhello's avatar
wmmhello 已提交
2576 2577
      numLines -= perBatch;
      info->isLast = false;
X
Xiaoyu Wang 已提交
2578
    } else {
wmmhello's avatar
wmmhello 已提交
2579 2580 2581 2582 2583
      perBatch = numLines;
      numLines = 0;
      info->isLast = true;
    }

wmmhello's avatar
wmmhello 已提交
2584
    info->params = &params;
wmmhello's avatar
wmmhello 已提交
2585 2586
    info->affectedRows = perBatch;
    info->pRequest->body.queryFp = smlInsertCallback;
X
Xiaoyu Wang 已提交
2587
    info->pRequest->body.param = info;
wmmhello's avatar
wmmhello 已提交
2588
    int32_t code = smlProcess(info, lines, perBatch);
wmmhello's avatar
wmmhello 已提交
2589
    lines += perBatch;
X
Xiaoyu Wang 已提交
2590
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2591 2592 2593 2594
      info->pRequest->body.queryFp(info, req, code);
    }
  }
  tsem_wait(&params.sem);
2595 2596

end:
wmmhello's avatar
wmmhello 已提交
2597 2598
  taosThreadSpinDestroy(&params.lock);
  tsem_destroy(&params.sem);
2599
//  ((STscObj *)taos)->schemalessType = 0;
2600
  pTscObj->schemalessType = 1;
2601
  uDebug("resultend:%s", request->msgBuf);
wmmhello's avatar
wmmhello 已提交
2602
  return (TAOS_RES*)request;
wmmhello's avatar
wmmhello 已提交
2603
}