clientSml.c 98.2 KB
Newer Older
wmmhello's avatar
wmmhello 已提交
1 2 3 4 5
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

X
Xiaoyu Wang 已提交
6 7 8 9 10
#include "cJSON.h"
#include "catalog.h"
#include "clientInt.h"
#include "osSemaphore.h"
#include "osThread.h"
wmmhello's avatar
wmmhello 已提交
11 12
#include "query.h"
#include "taos.h"
wmmhello's avatar
wmmhello 已提交
13
#include "taoserror.h"
X
Xiaoyu Wang 已提交
14
#include "tcommon.h"
wmmhello's avatar
wmmhello 已提交
15
#include "tdef.h"
X
Xiaoyu Wang 已提交
16
#include "tglobal.h"
wmmhello's avatar
wmmhello 已提交
17 18
#include "tlog.h"
#include "tmsg.h"
X
Xiaoyu Wang 已提交
19
#include "tname.h"
wmmhello's avatar
wmmhello 已提交
20 21
#include "ttime.h"
#include "ttypes.h"
wmmhello's avatar
wmmhello 已提交
22

23 24 25 26 27 28 29 30 31 32 33 34 35
// Branch-prediction hints. On GCC (>= 3), Intel (>= 800) and clang, expect() maps to
// __builtin_expect; on any other compiler it degrades to the bare expression.
#if (defined(__GNUC__) && (__GNUC__ >= 3)) || (defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 800)) || defined(__clang__)
#  define expect(expr,value)    (__builtin_expect ((expr),(value)) )
#else
#  define expect(expr,value)    (expr)
#endif

// likely()/unlikely() first normalise the expression to 0/1 (`!= 0`) and then apply the
// hint. The #ifndef guards keep any definition that is already visible at this point.
#ifndef likely
#define likely(expr)     expect((expr) != 0, 1)
#endif
#ifndef unlikely
#define unlikely(expr)   expect((expr) != 0, 0)
#endif

wmmhello's avatar
wmmhello 已提交
36
//=================================================================================================
wmmhello's avatar
wmmhello 已提交
37 38 39 40 41 42 43

// Characters the escape-test macros in this file compare against.
#define SPACE ' '
#define COMMA ','
#define EQUAL '='
#define QUOTE '"'
#define SLASH '\\'  // the escape character: despite the name this is a backslash, not '/'

dengyihao's avatar
dengyihao 已提交
44 45 46 47 48 49
// Advance `sql` past consecutive SPACE characters without moving beyond `sqlEnd`.
// `sql` must be a modifiable pointer lvalue. The expansion is a complete `while`
// statement, so it works with or without a trailing semicolon at the call site.
// Arguments are parenthesized so that expression arguments expand safely.
#define JUMP_SPACE(sql, sqlEnd)  \
  while ((sql) < (sqlEnd)) {     \
    if (*(sql) == SPACE)         \
      (sql)++;                   \
    else                         \
      break;                     \
  }
wmmhello's avatar
wmmhello 已提交
51
// Escape-aware character tests. Every macro looks at *(sql) together with the byte in
// front of it, so `sql` must point at least one byte past the start of its buffer.
// IS_SLASH_X: X preceded by the escape character; IS_X: X that is not escaped.

// comma ,
#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH)
#define IS_COMMA(sql)       (*(sql) == COMMA && *((sql)-1) != SLASH)
// space
#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql)-1) == SLASH)
#define IS_SPACE(sql)       (*(sql) == SPACE && *((sql)-1) != SLASH)
// equal =
#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) == SLASH)
#define IS_EQUAL(sql)       (*(sql) == EQUAL && *((sql)-1) != SLASH)
// quote "
#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) == SLASH)
#define IS_QUOTE(sql)       (*(sql) == QUOTE && *((sql)-1) != SLASH)
// SLASH
#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql)-1) == SLASH)

// True when *(sql) is any character that was escaped by the preceding SLASH.
#define IS_SLASH_LETTER(sql) \
  (IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql))

// Shift `len` bytes starting at `sql` one position to the left, overwriting *(sql - 1).
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), (len)))

// Remove escape characters from key[0 .. keyLen) in place. `keyLen` must be a modifiable
// lvalue; it is decremented once per removed escape. Bytes beyond the new keyLen are
// left stale and no terminator is written.
// NOTE(review): because `i` is stepped back after a removal, an escaped SLASH can itself
// act as the escape for the following character (SLASH SLASH COMMA collapses to a single
// COMMA) - confirm this is intended.
#define PROCESS_SLASH(key, keyLen)               \
  for (int i = 1; i < (keyLen); ++i) {           \
    if (IS_SLASH_LETTER((key) + i)) {            \
      MOVE_FORWARD_ONE((key) + i, (keyLen) - i); \
      i--;                                       \
      (keyLen)--;                                \
    }                                            \
  }
wmmhello's avatar
wmmhello 已提交
79

80 81 82
// Length guards: a length is invalid when it is not positive or when it reaches the
// corresponding TSDB name-length limit.
#define IS_INVALID_COL_LEN(len)   ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)

#define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM     4

// Reserved key names and their lengths (terminating NUL not counted).
#define TS        "_ts"
#define TS_LEN    3
#define VALUE     "_value"
#define VALUE_LEN 6

#define BINARY_ADD_LEN 2  // "binary": the 2 extra characters are the surrounding quotes
#define NCHAR_ADD_LEN  3  // L"nchar": the 3 extra characters are the L prefix and the quotes

#define MAX_RETRY_TIMES 5
wmmhello's avatar
wmmhello 已提交
95 96 97 98
//=================================================================================================
// Local alias for TSDB_SML_PROTOCOL_TYPE.
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;

// Schema change required for a parsed key/value relative to the stored super-table schema.
typedef enum {
  SCHEMA_ACTION_NULL,                // no change needed
  SCHEMA_ACTION_CREATE_STABLE,       // the super table has to be created
  SCHEMA_ACTION_ADD_COLUMN,          // key is unknown: add it as a column
  SCHEMA_ACTION_ADD_TAG,             // key is unknown: add it as a tag
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,  // existing var-length column is too short
  SCHEMA_ACTION_CHANGE_TAG_SIZE,     // existing var-length tag is too short
} ESchemaAction;

typedef struct {
X
Xiaoyu Wang 已提交
108 109 110 111
  const char *measure;
  const char *tags;
  const char *cols;
  const char *timestamp;
wmmhello's avatar
wmmhello 已提交
112 113 114 115 116 117

  int32_t measureLen;
  int32_t measureTagsLen;
  int32_t tagsLen;
  int32_t colsLen;
  int32_t timestampLen;
118 119

  SArray *colArray;
wmmhello's avatar
wmmhello 已提交
120 121 122
} SSmlLineInfo;

typedef struct {
X
Xiaoyu Wang 已提交
123 124 125 126
  const char *sTableName;  // super table name
  int32_t     sTableNameLen;
  char        childTableName[TSDB_TABLE_NAME_LEN];
  uint64_t    uid;
wmmhello's avatar
wmmhello 已提交
127

X
Xiaoyu Wang 已提交
128
  SArray *tags;
wmmhello's avatar
wmmhello 已提交
129

130
  // elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
X
Xiaoyu Wang 已提交
131
  SArray *cols;
132
  STableDataCxt *tableDataCtx;
wmmhello's avatar
wmmhello 已提交
133 134 135
} SSmlTableInfo;

typedef struct {
X
Xiaoyu Wang 已提交
136 137
  SArray   *tags;     // save the origin order to create table
  SHashObj *tagHash;  // elements are <key, index in tags>
138

X
Xiaoyu Wang 已提交
139 140
  SArray   *cols;
  SHashObj *colHash;
141

wmmhello's avatar
wmmhello 已提交
142 143 144 145
  STableMeta *tableMeta;
} SSmlSTableMeta;

// Caller-provided buffer for a human-readable error message.
typedef struct {
  int32_t len;  // capacity of buf in bytes
  char   *buf;  // may be NULL, in which case no message is written
} SSmlMsgBuf;

150 151 152 153 154 155 156
// Result code, counters and timing fields recorded for one schemaless request.
typedef struct {
  int32_t code;
  int32_t lineNum;

  int32_t numOfSTables;
  int32_t numOfCTables;
  int32_t numOfCreateSTables;
  int32_t numOfAlterColSTables;
  int32_t numOfAlterTagSTables;

  int64_t parseTime;
  int64_t schemaTime;
  int64_t insertBindTime;
  int64_t insertRpcTime;
  int64_t endTime;
} SSmlCostInfo;

X
Xiaoyu Wang 已提交
167 168 169
typedef struct {
  SRequestObj     *request;
  tsem_t           sem;
170 171
  int32_t          cnt;
  int32_t          total;
wmmhello's avatar
wmmhello 已提交
172 173 174
  TdThreadSpinlock lock;
} Params;

wmmhello's avatar
wmmhello 已提交
175
typedef struct {
X
Xiaoyu Wang 已提交
176 177 178 179 180
  int64_t id;
  Params *params;

  SMLProtocolType protocol;
  int8_t          precision;
181
  bool            reRun;
182 183 184
  bool            dataFormat;  // true means that the name and order of keys in each line are the same(only for influx protocol)
  bool            isRawLine;
  int32_t         ttl;
X
Xiaoyu Wang 已提交
185 186 187 188 189 190 191 192 193 194 195 196 197 198

  SHashObj *childTables;
  SHashObj *superTables;
  SHashObj *pVgHash;

  STscObj     *taos;
  SCatalog    *pCatalog;
  SRequestObj *pRequest;
  SQuery      *pQuery;

  SSmlCostInfo cost;
  int32_t      affectedRows;
  SSmlMsgBuf   msgBuf;
  SHashObj    *dumplicateKey;  // for dumplicate key
wmmhello's avatar
wmmhello 已提交
199 200

  cJSON       *root;  // for parse json
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
  SArray      *lines; // element is SSmlLineInfo

  //
  SHashObj    *superTableTagKeyStr;
  SHashObj    *superTableColKeyStr;
  void        *currentLineTagKeys;
  void        *preLineTagKeys;
  void        *currentLineColKeys;
  void        *preLineColKeys;

  SArray      *preLineTagKV;
  SArray      *preLineColKV;

  SSmlLineInfo preLine;
  STableMeta  *currSTableMeta;
  STableDataCxt *currTableDataCtx;
wmmhello's avatar
wmmhello 已提交
217
} SSmlHandle;
wmmhello's avatar
wmmhello 已提交
218 219
//=================================================================================================

wmmhello's avatar
wmmhello 已提交
220
//=================================================================================================
221
static volatile int64_t linesSmlHandleId = 0;

/**
 * Produce the next SML handle id.
 *
 * The file-level counter is advanced with atomic_add_fetch_64(); a result of 0 is
 * skipped, so the returned id is never 0.
 */
static int64_t smlGenId(void) {
  int64_t id;

  do {
    id = atomic_add_fetch_64(&linesSmlHandleId, 1);
  } while (id == 0);

  return id;
}

232
/**
 * Report whether `num` lies at or beyond the int64 limits once those limits are
 * converted to double.
 *
 * Both bounds are inclusive: (double)INT64_MAX and (double)INT64_MIN themselves are
 * reported as overflow. NaN compares false on both sides and yields false.
 */
static inline bool smlDoubleToInt64OverFlow(double num) {
  return num >= (double)INT64_MAX || num <= (double)INT64_MIN;
}

/**
 * Test-and-record helper for duplicate detection.
 *
 * @return true if `key` is already present in `pHash`; otherwise the key is inserted
 *         (with its own first byte as a placeholder value) and false is returned.
 */
static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashObj *pHash) {
  bool seen = (taosHashGet(pHash, key, keyLen) != NULL);
  if (!seen) {
    taosHashPut(pHash, key, keyLen, key, 1);
  }
  return seen;
}

X
Xiaoyu Wang 已提交
246
/**
 * Compose "msg1:msg2" into pBuf->buf (truncated to fit) and return the invalid-data code.
 *
 * @param pBuf  destination; nothing is written when pBuf->buf is NULL or pBuf->len <= 0
 * @param msg1  leading text, may be NULL
 * @param msg2  detail text, may be NULL; only appended when more than 2 bytes remain
 * @return always TSDB_CODE_SML_INVALID_DATA
 */
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
  if (pBuf->buf && pBuf->len > 0) {
    memset(pBuf->buf, 0, pBuf->len);
    // strncat appends up to n characters AND a terminating NUL, so the limit must leave
    // one byte for the terminator (passing the full length could write len + 1 bytes).
    if (msg1) strncat(pBuf->buf, msg1, pBuf->len - 1);
    int32_t left = pBuf->len - (int32_t)strlen(pBuf->buf);
    if (left > 2 && msg2) {
      strncat(pBuf->buf, ":", left - 1);
      strncat(pBuf->buf, msg2, left - 2);
    }
  }
  return TSDB_CODE_SML_INVALID_DATA;
}

X
Xiaoyu Wang 已提交
259
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
260
                                       ESchemaAction *action, SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
261
  uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
262 263
  if (index) {
    if (colField[*index].type != kv->type) {
X
Xiaoyu Wang 已提交
264 265
      uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id,
             kv->key, colField[*index].type, kv->type);
266 267 268
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

X
Xiaoyu Wang 已提交
269 270 271 272
    if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR &&
         (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
        (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
         ((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
273
      if (isTag) {
wmmhello's avatar
wmmhello 已提交
274
        *action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
275
      } else {
wmmhello's avatar
wmmhello 已提交
276
        *action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
277 278 279 280
      }
    }
  } else {
    if (isTag) {
wmmhello's avatar
wmmhello 已提交
281
      *action = SCHEMA_ACTION_ADD_TAG;
282
    } else {
wmmhello's avatar
wmmhello 已提交
283
      *action = SCHEMA_ACTION_ADD_COLUMN;
284 285
    }
  }
wmmhello's avatar
wmmhello 已提交
286 287 288
  return 0;
}

wmmhello's avatar
wmmhello 已提交
289
static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
wmmhello's avatar
wmmhello 已提交
290
  int32_t result = 1;
X
Xiaoyu Wang 已提交
291
  while (result <= length) {
wmmhello's avatar
wmmhello 已提交
292 293
    result *= 2;
  }
294
  if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
wmmhello's avatar
wmmhello 已提交
295
    result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
296
  } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
wmmhello's avatar
wmmhello 已提交
297 298
    result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  }
wmmhello's avatar
wmmhello 已提交
299

300
  if (type == TSDB_DATA_TYPE_NCHAR) {
301
    result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
302
  } else if (type == TSDB_DATA_TYPE_BINARY) {
303
    result = result + VARSTR_HEADER_SIZE;
wmmhello's avatar
wmmhello 已提交
304
  }
305
  return result;
wmmhello's avatar
wmmhello 已提交
306 307
}

X
Xiaoyu Wang 已提交
308
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
309
                                      ESchemaAction *action, bool isTag) {
310 311
  int32_t code = TSDB_CODE_SUCCESS;
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
312
    if (j == 0 && !isTag) continue;
313
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
314
    code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
X
Xiaoyu Wang 已提交
315
    if (code != TSDB_CODE_SUCCESS) {
316 317 318 319 320 321
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

322
/**
 * Verify that every key in `cols` exists by name in schema[0 .. length).
 *
 * For columns (isTag == false) element 0 of `cols` is not checked.
 *
 * @return 0 when all keys are present, -1 as soon as one is missing
 */
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
  SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  for (int32_t i = 0; i < length; i++) {
    // only key presence is consulted below; the stored value is never read back
    taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
  }

  int32_t       code = 0;
  const int32_t total = (int32_t)taosArrayGetSize(cols);
  for (int32_t j = isTag ? 0 : 1; j < total; j++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
    if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
      code = -1;
      break;
    }
  }

  taosHashCleanup(hashTmp);
  return code;
}

345
static int32_t getBytes(uint8_t type, int32_t length) {
346 347 348 349 350 351 352
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
    return smlFindNearestPowerOf2(length, type);
  } else {
    return tDataTypes[type].bytes;
  }
}

353 354
/**
 * Apply the schema changes required by `cols` to the SField array `results`.
 *
 * Unknown keys are appended as new fields; existing var-length fields that are too
 * short get their `bytes` enlarged in place.
 *
 * @param schemaField/schemaHash  stored schema and its key -> index hash (both NULL when
 *                                the table does not exist yet)
 * @param results    SField array being built; for tags it is indexed relative to numOfCols
 * @param numOfCols  number of columns in the stored schema (offset of the first tag)
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                  SArray *results, int32_t numOfCols, bool isTag) {
  const int32_t total = (int32_t)taosArrayGetSize(cols);
  for (int32_t j = 0; j < total; ++j) {
    SSmlKv       *kv = (SSmlKv *)taosArrayGet(cols, j);
    ESchemaAction action = SCHEMA_ACTION_NULL;
    // a type mismatch leaves `action` at SCHEMA_ACTION_NULL, so such an element is skipped
    smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);

    if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) {
      SField field = {0};
      field.type = kv->type;
      field.bytes = getBytes(kv->type, kv->length);
      // never copy more than the name buffer holds; the zero-init keeps it NUL-terminated
      size_t nameLen = (size_t)kv->keyLen;
      if (nameLen >= sizeof(field.name)) nameLen = sizeof(field.name) - 1;
      memcpy(field.name, kv->key, nameLen);
      taosArrayPush(results, &field);
    } else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
      uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
      if (index == NULL) continue;
      uint16_t newIndex = *index;
      if (isTag) newIndex -= numOfCols;  // tags follow the columns in the stored schema
      SField *field = (SField *)taosArrayGet(results, newIndex);
      if (field != NULL) {
        field->bytes = getBytes(kv->type, kv->length);
      }
    }
  }
  return TSDB_CODE_SUCCESS;
}

376 377 378 379 380
/**
 * Send a create-super-table request (TDMT_MND_CREATE_STB) that creates the table or
 * carries its altered column/tag list.
 *
 * @param pName       full name of the super table
 * @param pColumns    SField array of columns; released by this function on every path
 * @param pTags       SField array of tags; released by this function on every path
 * @param pTableMeta  current meta; read for versions and uid unless action is CREATE_STABLE
 * @param action      decides which schema version is bumped
 * @return TSDB_CODE_SUCCESS or the error code of the failing step / of the request
 */
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray *pTags, STableMeta *pTableMeta,
                              ESchemaAction action) {
  SRequestObj   *pRequest = NULL;
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SCmdMsgInfo    pCmdMsg = {0};

  // hand the arrays to pReq first so that tFreeSMCreateStbReq() at `end` runs with them
  // attached on every exit path
  pReq.numOfColumns = taosArrayGetSize(pColumns);
  pReq.pColumns = pColumns;
  pReq.numOfTags = taosArrayGetSize(pTags);
  pReq.pTags = pTags;

  code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  if (action == SCHEMA_ACTION_CREATE_STABLE) {
    pReq.colVer = 1;
    pReq.tagVer = 1;
    pReq.suid = 0;
    pReq.source = TD_REQ_FROM_APP;
  } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
    pReq.colVer = pTableMeta->sversion;
    pReq.tagVer = pTableMeta->tversion + 1;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  } else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) {
    pReq.colVer = pTableMeta->sversion + 1;
    pReq.tagVer = pTableMeta->tversion;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  }

  // no tag supplied: add one NCHAR tag named by tsSmlTagName
  if (pReq.numOfTags == 0) {
    pReq.numOfTags = 1;
    SField field = {0};
    field.type = TSDB_DATA_TYPE_NCHAR;
    field.bytes = 1;
    strcpy(field.name, tsSmlTagName);
    taosArrayPush(pReq.pTags, &field);
  }

  pReq.commentLen = -1;
  pReq.igExists = true;
  tNameExtractFullName(pName, pReq.name);

  pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // first pass with a NULL buffer only measures the serialized size
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  if (pCmdMsg.msgLen < 0) {
    // NOTE(review): a negative size is treated like the allocation failure it would
    // otherwise cause below - confirm the preferred error code.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery;
  memset(&pQuery, 0, sizeof(pQuery));
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // the cached meta is now stale
    catalogRemoveTableMeta(info->pCatalog, pName);
  }
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  return code;
}

X
Xiaoyu Wang 已提交
463
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
464 465 466
  int32_t     code = 0;
  SHashObj   *hashTmp = NULL;
  STableMeta *pTableMeta = NULL;
wmmhello's avatar
wmmhello 已提交
467

468
  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
wmmhello's avatar
wmmhello 已提交
469
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
wmmhello's avatar
wmmhello 已提交
470

D
dapan1121 已提交
471 472 473 474 475
  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
X
Xiaoyu Wang 已提交
476 477

  SSmlSTableMeta **tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
wmmhello's avatar
wmmhello 已提交
478
  while (tableMetaSml) {
X
Xiaoyu Wang 已提交
479 480
    SSmlSTableMeta *sTableData = *tableMetaSml;
    bool            needCheckMeta = false;  // for multi thread
wmmhello's avatar
wmmhello 已提交
481

wmmhello's avatar
wmmhello 已提交
482
    size_t superTableLen = 0;
X
Xiaoyu Wang 已提交
483
    void  *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
484
    memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
wmmhello's avatar
wmmhello 已提交
485
    memcpy(pName.tname, superTable, superTableLen);
wmmhello's avatar
wmmhello 已提交
486

D
dapan1121 已提交
487
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
wmmhello's avatar
wmmhello 已提交
488

489
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
490 491
      SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
      SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
wmmhello's avatar
wmmhello 已提交
492 493 494 495
      smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
      smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);

      code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
496
      if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
497
        uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
498
        goto end;
wmmhello's avatar
wmmhello 已提交
499
      }
500
      info->cost.numOfCreateSTables++;
wmmhello's avatar
wmmhello 已提交
501 502 503 504 505 506 507
      taosMemoryFreeClear(pTableMeta);

      code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
        goto end;
      }
X
Xiaoyu Wang 已提交
508
    } else if (code == TSDB_CODE_SUCCESS) {
509 510
      hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
                             HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
511 512
      for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
           i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
513 514
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
wmmhello's avatar
wmmhello 已提交
515

516 517
      ESchemaAction action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
518
      if (code != TSDB_CODE_SUCCESS) {
519
        goto end;
520
      }
521 522 523 524 525
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
wmmhello's avatar
wmmhello 已提交
526 527 528 529 530 531

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
532
          if (i < pTableMeta->tableInfo.numOfColumns) {
wmmhello's avatar
wmmhello 已提交
533
            taosArrayPush(pColumns, &field);
534
          } else {
wmmhello's avatar
wmmhello 已提交
535 536 537
            taosArrayPush(pTags, &field);
          }
        }
538 539
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
                           pTableMeta->tableInfo.numOfColumns, true);
wmmhello's avatar
wmmhello 已提交
540 541

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
542
        if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
543
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
544 545 546
          goto end;
        }

547
        info->cost.numOfAlterTagSTables++;
wmmhello's avatar
wmmhello 已提交
548 549 550 551 552 553 554 555 556
        taosMemoryFreeClear(pTableMeta);
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
557
      }
558 559

      taosHashClear(hashTmp);
wmmhello's avatar
wmmhello 已提交
560
      for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
561 562
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
563 564
      action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
565
      if (code != TSDB_CODE_SUCCESS) {
566
        goto end;
wmmhello's avatar
wmmhello 已提交
567
      }
568 569 570 571 572
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
wmmhello's avatar
wmmhello 已提交
573 574 575 576 577 578

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
579
          if (i < pTableMeta->tableInfo.numOfColumns) {
wmmhello's avatar
wmmhello 已提交
580
            taosArrayPush(pColumns, &field);
581
          } else {
wmmhello's avatar
wmmhello 已提交
582 583 584 585
            taosArrayPush(pTags, &field);
          }
        }

586 587
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
                           pTableMeta->tableInfo.numOfColumns, false);
wmmhello's avatar
wmmhello 已提交
588 589

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
590
        if (code != TSDB_CODE_SUCCESS) {
591
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
592 593
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
594

595
        info->cost.numOfAlterColSTables++;
wmmhello's avatar
wmmhello 已提交
596
        taosMemoryFreeClear(pTableMeta);
wmmhello's avatar
wmmhello 已提交
597 598 599 600
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
601 602 603 604 605
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
606
      }
wmmhello's avatar
wmmhello 已提交
607

608
      needCheckMeta = true;
wmmhello's avatar
wmmhello 已提交
609 610
      taosHashCleanup(hashTmp);
      hashTmp = NULL;
wmmhello's avatar
wmmhello 已提交
611
    } else {
X
Xiaoyu Wang 已提交
612
      uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
613
      goto end;
wmmhello's avatar
wmmhello 已提交
614
    }
615

X
Xiaoyu Wang 已提交
616 617
    if (needCheckMeta) {
      code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
618
                          sTableData->tags, true);
619
      if (code != TSDB_CODE_SUCCESS) {
620
        uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, pName.tname);
621 622
        goto end;
      }
623
      code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
624
      if (code != TSDB_CODE_SUCCESS) {
625
        uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, pName.tname);
626 627 628 629
        goto end;
      }
    }

630
    sTableData->tableMeta = pTableMeta;
wmmhello's avatar
wmmhello 已提交
631

X
Xiaoyu Wang 已提交
632
    tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, tableMetaSml);
wmmhello's avatar
wmmhello 已提交
633 634
  }
  return 0;
635

636
  end:
wmmhello's avatar
wmmhello 已提交
637 638
  taosHashCleanup(hashTmp);
  taosMemoryFreeClear(pTableMeta);
wmmhello's avatar
wmmhello 已提交
639
//  catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
640
  return code;
wmmhello's avatar
wmmhello 已提交
641 642
}

643
/******************************* parse basic type function **********************/
X
Xiaoyu Wang 已提交
644
/**
 * Parse kvVal->value (kvVal->length bytes) as a number with an optional type suffix.
 *
 * Suffixes: none or f64 -> DOUBLE, f32 -> FLOAT, i or i64 -> BIGINT, u or u64 -> UBIGINT,
 * i32/u32/i16/u16/i8/u8 -> the matching integer type. Multi-character suffixes are
 * matched case-insensitively; the single-letter forms `i` and `u` are lower-case only.
 *
 * On success kvVal->type and the matching value member are set.
 *
 * @param msg  receives a description when the value is rejected
 * @return true when parsed and in range, false otherwise
 */
static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
  const char *pVal = kvVal->value;
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (pVal == endptr) {
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
    return false;
  }

  // number of bytes after the numeric part, i.e. the length of the type suffix
  int32_t left = len - (endptr - pVal);
  if (left == 0 || (left == 3 && strncasecmp(endptr, "f64", left) == 0)) {
    kvVal->type = TSDB_DATA_TYPE_DOUBLE;
    kvVal->d = result;
  } else if ((left == 3 && strncasecmp(endptr, "f32", left) == 0)) {
    if (!IS_VALID_FLOAT(result)) {
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_FLOAT;
    kvVal->f = (float)result;
  } else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)) {
    if (smlDoubleToInt64OverFlow(result)) {
      // the double is at or beyond the int64 limits: re-parse as an integer and let the
      // conversion report a true overflow through errno
      errno = 0;
      int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
      if (errno == ERANGE) {
        smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_BIGINT;
      kvVal->i = tmp;
      return true;
    }
    kvVal->type = TSDB_DATA_TYPE_BIGINT;
    kvVal->i = (int64_t)result;
  } else if ((left == 1 && *endptr == 'u') || (left == 3 && strncasecmp(endptr, "u64", left) == 0)) {
    if (result >= (double)UINT64_MAX || result < 0) {
      errno = 0;
      uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
      if (errno == ERANGE || result < 0) {
        smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_UBIGINT;
      kvVal->u = tmp;
      return true;
    }
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;
    kvVal->u = result;
  } else if (left == 3 && strncasecmp(endptr, "i32", left) == 0) {
    if (!IS_VALID_INT(result)) {
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_INT;
    kvVal->i = result;
  } else if (left == 3 && strncasecmp(endptr, "u32", left) == 0) {
    if (!IS_VALID_UINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UINT;
    kvVal->u = result;
  } else if (left == 3 && strncasecmp(endptr, "i16", left) == 0) {
    if (!IS_VALID_SMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "small int out of range[-32768,32767]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_SMALLINT;
    kvVal->i = result;
  } else if (left == 3 && strncasecmp(endptr, "u16", left) == 0) {
    if (!IS_VALID_USMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned small int out of range[0,65535]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_USMALLINT;
    kvVal->u = result;
  } else if (left == 2 && strncasecmp(endptr, "i8", left) == 0) {
    if (!IS_VALID_TINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_TINYINT;
    kvVal->i = result;
  } else if (left == 2 && strncasecmp(endptr, "u8", left) == 0) {
    if (!IS_VALID_UTINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UTINYINT;
    kvVal->u = result;
  } else {
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
    return false;
  }
  return true;
}

wmmhello's avatar
wmmhello 已提交
742
/**
 * Try to read the raw text of a key/value pair as a boolean literal.
 *
 * Accepted spellings: "t"/"T", "f"/"F", and case-insensitive "true"/"false".
 * On a match kvVal->i is set to TSDB_TRUE or TSDB_FALSE; nothing else in
 * kvVal is modified here.
 *
 * @param kvVal  pair whose value/length fields hold the raw text
 * @return true if the text is a boolean literal, false otherwise
 */
static bool smlParseBool(SSmlKv *kvVal) {
  const char *text = kvVal->value;
  int32_t     textLen = kvVal->length;

  switch (textLen) {
    case 1:
      // single-letter shorthand
      if (text[0] == 't' || text[0] == 'T') {
        kvVal->i = TSDB_TRUE;
        return true;
      }
      if (text[0] == 'f' || text[0] == 'F') {
        kvVal->i = TSDB_FALSE;
        return true;
      }
      return false;
    case 4:
      if (strncasecmp(text, "true", textLen) == 0) {
        kvVal->i = TSDB_TRUE;
        return true;
      }
      return false;
    case 5:
      if (strncasecmp(text, "false", textLen) == 0) {
        kvVal->i = TSDB_FALSE;
        return true;
      }
      return false;
    default:
      return false;
  }
}

wmmhello's avatar
wmmhello 已提交
766
/**
 * Check whether a raw value is written as a binary literal: "abc".
 *
 * @param pVal  raw value text (not required to be NUL-terminated)
 * @param len   number of bytes in pVal
 * @return true if the text is at least two bytes long and both its first and
 *         last byte are a double quote
 */
static bool smlIsBinary(const char *pVal, uint16_t len) {
  // the shortest possible literal is the empty string ""
  return len >= 2 && pVal[0] == '"' && pVal[len - 1] == '"';
}

wmmhello's avatar
wmmhello 已提交
777
/**
 * Check whether a raw value is written as an nchar literal: L"abc".
 *
 * @param pVal  raw value text (not required to be NUL-terminated)
 * @param len   number of bytes in pVal
 * @return true if the text is at least three bytes long, starts with
 *         l" or L" and ends with a double quote
 */
static bool smlIsNchar(const char *pVal, uint16_t len) {
  // the shortest possible literal is L""
  if (len < 3) {
    return false;
  }
  bool hasPrefix = (pVal[0] == 'l' || pVal[0] == 'L') && pVal[1] == '"';
  return hasPrefix && pVal[len - 1] == '"';
}
787
/******************************* parse basic type function end **********************/
wmmhello's avatar
wmmhello 已提交
788

789
/******************************* time function **********************/
790
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
X
Xiaoyu Wang 已提交
791
  char   *endPtr = NULL;
wafwerar's avatar
wafwerar 已提交
792
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
X
Xiaoyu Wang 已提交
793
  if (value + len != endPtr) {
794
    return -1;
wmmhello's avatar
wmmhello 已提交
795
  }
796
  double ts = tsInt64;
797 798
  switch (type) {
    case TSDB_TIME_PRECISION_HOURS:
wmmhello's avatar
wmmhello 已提交
799 800
      ts *= NANOSECOND_PER_HOUR;
      tsInt64 *= NANOSECOND_PER_HOUR;
801 802
      break;
    case TSDB_TIME_PRECISION_MINUTES:
wmmhello's avatar
wmmhello 已提交
803 804
      ts *= NANOSECOND_PER_MINUTE;
      tsInt64 *= NANOSECOND_PER_MINUTE;
805 806
      break;
    case TSDB_TIME_PRECISION_SECONDS:
wmmhello's avatar
wmmhello 已提交
807 808
      ts *= NANOSECOND_PER_SEC;
      tsInt64 *= NANOSECOND_PER_SEC;
809 810
      break;
    case TSDB_TIME_PRECISION_MILLI:
wmmhello's avatar
wmmhello 已提交
811 812
      ts *= NANOSECOND_PER_MSEC;
      tsInt64 *= NANOSECOND_PER_MSEC;
813 814
      break;
    case TSDB_TIME_PRECISION_MICRO:
wmmhello's avatar
wmmhello 已提交
815 816
      ts *= NANOSECOND_PER_USEC;
      tsInt64 *= NANOSECOND_PER_USEC;
817 818 819 820 821
      break;
    case TSDB_TIME_PRECISION_NANO:
      break;
    default:
      ASSERT(0);
wmmhello's avatar
wmmhello 已提交
822
  }
X
Xiaoyu Wang 已提交
823
  if (ts >= (double)INT64_MAX || ts < 0) {
824
    return -1;
wmmhello's avatar
wmmhello 已提交
825 826
  }

827
  return tsInt64;
828 829 830 831 832 833
}

static int8_t smlGetTsTypeByLen(int32_t len) {
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
    return TSDB_TIME_PRECISION_SECONDS;
  } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
834
    return TSDB_TIME_PRECISION_MILLI;
835 836
  } else {
    return -1;
wmmhello's avatar
wmmhello 已提交
837
  }
838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856
}

/**
 * Map a TSDB_SML_TIMESTAMP_* precision code to the matching
 * TSDB_TIME_PRECISION_* constant.
 *
 * An unconfigured precision is treated as nanoseconds.
 *
 * @param precision  one of the TSDB_SML_TIMESTAMP_* codes
 * @return the corresponding TSDB_TIME_PRECISION_* value, or -1 for a code
 *         that is not listed below
 */
static int8_t smlGetTsTypeByPrecision(int8_t precision) {
  int8_t tsType = -1;

  switch (precision) {
    case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
    case TSDB_SML_TIMESTAMP_NANO_SECONDS:
      tsType = TSDB_TIME_PRECISION_NANO;
      break;
    case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
      tsType = TSDB_TIME_PRECISION_MICRO;
      break;
    case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
      tsType = TSDB_TIME_PRECISION_MILLI;
      break;
    case TSDB_SML_TIMESTAMP_SECONDS:
      tsType = TSDB_TIME_PRECISION_SECONDS;
      break;
    case TSDB_SML_TIMESTAMP_MINUTES:
      tsType = TSDB_TIME_PRECISION_MINUTES;
      break;
    case TSDB_SML_TIMESTAMP_HOURS:
      tsType = TSDB_TIME_PRECISION_HOURS;
      break;
    default:
      break;
  }

  return tsType;
}

X
Xiaoyu Wang 已提交
860 861
/**
 * Convert the timestamp field of a line-protocol record to nanoseconds.
 *
 * An empty field or the single character "0" yields the current time from
 * taosGetTimestampNs(). Otherwise the text is scaled according to
 * info->precision.
 *
 * @param info  handle providing the configured precision and the message
 *              buffer that receives error details
 * @param data  timestamp text
 * @param len   length of the timestamp text
 * @return nanosecond timestamp, or -1 on an invalid precision or timestamp
 */
static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) {
  bool useCurrentTime = (len == 0) || (len == 1 && data[0] == '0');
  if (useCurrentTime) {
    return taosGetTimestampNs();
  }

  int8_t unit = smlGetTsTypeByPrecision(info->precision);
  if (unit == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL);
    return -1;
  }

  int64_t nanos = smlGetTimeValue(data, len, unit);
  if (nanos == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
  }
  // nanos is -1 exactly when the conversion failed
  return nanos;
}

X
Xiaoyu Wang 已提交
879 880
/**
 * Convert the timestamp field of a telnet-protocol record to nanoseconds.
 *
 * The single character "0" yields the current time from
 * taosGetTimestampNs(). Otherwise the unit is derived from the number of
 * digits via smlGetTsTypeByLen().
 *
 * @param info  handle whose message buffer receives error details
 * @param data  timestamp text; must not be NULL
 * @param len   length of the timestamp text
 * @return nanosecond timestamp, or -1 when data is NULL, the digit count is
 *         not a supported precision, or the value is invalid
 */
static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
  if (data == NULL) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
    return -1;
  }

  bool useCurrentTime = (len == 1 && data[0] == '0');
  if (useCurrentTime) {
    return taosGetTimestampNs();
  }

  int8_t unit = smlGetTsTypeByLen(len);
  if (unit == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf,
                           "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
    return -1;
  }

  int64_t nanos = smlGetTimeValue(data, len, unit);
  if (nanos == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
  }
  // nanos is -1 exactly when the conversion failed
  return nanos;
}

X
Xiaoyu Wang 已提交
901
/**
 * Parse the timestamp of the current record and store it as column 0.
 *
 * The text is parsed according to info->protocol. In data-format mode the
 * value is converted from nanoseconds to the precision of the current super
 * table and bound through smlBuildCol(); otherwise the key/value pair is
 * written into slot 0 of `cols`.
 *
 * @param info  parsing handle
 * @param data  timestamp text
 * @param len   length of the timestamp text
 * @param cols  column array of the current line (used when
 *              info->dataFormat is false)
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_TIMESTAMP when the parsed
 *         value is not positive
 */
static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArray *cols) {
  int64_t ts = 0;
  if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
    ts = smlParseOpenTsdbTime(info, data, len);
  } else if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
    ts = smlParseInfluxTime(info, data, len);
  } else {
    ASSERT(0);
  }
  uDebug("SML:0x%" PRIx64 " smlParseTS:%" PRId64, info->id, ts);

  if (ts <= 0) {
    uError("SML:0x%" PRIx64 " smlParseTS error:%" PRId64, info->id, ts);
    return TSDB_CODE_INVALID_TIMESTAMP;
  }

  // describe the timestamp as an ordinary key/value pair
  SSmlKv tsKv = {0};
  tsKv.key = TS;
  tsKv.keyLen = TS_LEN;
  tsKv.i = ts;
  tsKv.type = TSDB_DATA_TYPE_TIMESTAMP;
  tsKv.length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;

  if (!info->dataFormat) {
    // slot 0 of the column array is reserved for the timestamp
    taosArraySet(cols, 0, &tsKv);
    return TSDB_CODE_SUCCESS;
  }

  tsKv.i = convertTimePrecision(tsKv.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
  smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &tsKv, 0);
  return TSDB_CODE_SUCCESS;
}
/******************************* time function end **********************/

/******************************* Sml struct related function **********************/
/**
 * Allocate and initialise the bookkeeping record for one child table.
 *
 * @param numRows     initial capacity of the per-row column array
 * @param measure     super table (measurement) name; the pointer is stored,
 *                    the text is not copied
 * @param measureLen  length of `measure`
 * @return a new SSmlTableInfo, or NULL if any allocation fails (everything
 *         allocated up to that point is released)
 */
static SSmlTableInfo *smlBuildTableInfo(int numRows, const char* measure, int32_t measureLen) {
  SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
  if (!tag) {
    return NULL;
  }

  tag->sTableName = measure;
  tag->sTableNameLen = measureLen;

  tag->cols = taosArrayInit(numRows, POINTER_BYTES);
  if (tag->cols == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
  }

  tag->tags = taosArrayInit(16, sizeof(SSmlKv));
  if (tag->tags == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
  }
  return tag;

cleanup:
  // cols may already exist when the tags allocation is the one that failed
  if (tag->cols != NULL) {
    taosArrayDestroy(tag->cols);
  }
  taosMemoryFree(tag);
  return NULL;
}

/**
 * Run every key in `tags` through smlCheckDuplicateKey() against the shared
 * hash `dumplicateKey`.
 *
 * @param dumplicateKey  hash handed to smlCheckDuplicateKey() for each key
 * @param tags           array of SSmlKv to check
 * @param msg            message buffer that receives the offending key
 * @return TSDB_CODE_TSC_DUP_NAMES on the first key reported as duplicate,
 *         TSDB_CODE_SUCCESS otherwise
 */
static int32_t smlCheckDupUnit(SHashObj *dumplicateKey, SArray *tags, SSmlMsgBuf *msg){
  int idx = 0;
  while (idx < taosArrayGetSize(tags)) {
    SSmlKv *kv = taosArrayGet(tags, idx);
    bool    isDup = smlCheckDuplicateKey(kv->key, kv->keyLen, dumplicateKey);
    if (isDup) {
      smlBuildInvalidDataMsg(msg, "dumplicate key", kv->key);
      return TSDB_CODE_TSC_DUP_NAMES;
    }
    idx++;
  }
  return TSDB_CODE_SUCCESS;
}

/**
 * Verify that no key appears twice across the columns and tags of a record.
 *
 * @param cols  array of SSmlKv holding the columns
 * @param tags  array of SSmlKv holding the tags
 * @param msg   message buffer that receives the offending key
 * @return TSDB_CODE_SUCCESS when all keys are distinct,
 *         TSDB_CODE_TSC_DUP_NAMES on a duplicate, or
 *         TSDB_CODE_OUT_OF_MEMORY if the temporary hash cannot be created
 */
static int32_t smlJudgeDupColName(SArray *cols, SArray *tags, SSmlMsgBuf *msg) {
  SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (dumplicateKey == NULL) {
    // without the hash the duplicate check cannot be performed at all
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // columns first, then tags, sharing one hash so cross duplicates are caught
  int32_t ret = smlCheckDupUnit(dumplicateKey, cols, msg);
  if (ret == TSDB_CODE_SUCCESS) {
    ret = smlCheckDupUnit(dumplicateKey, tags, msg);
  }

  taosHashCleanup(dumplicateKey);
  return ret;
}

/**
 * Take the child table name from the tag whose key equals the configured
 * tsSmlChildTableName, if such a tag exists.
 *
 * @param tags            array of SSmlKv holding the tags of the record
 * @param childTableName  output buffer of TSDB_TABLE_NAME_LEN bytes; it is
 *                        left untouched when tsSmlChildTableName is empty or
 *                        no tag matches, otherwise it receives the tag value
 *                        truncated to TSDB_TABLE_NAME_LEN - 1 bytes and
 *                        always NUL-terminated
 * @return TSDB_CODE_SUCCESS
 */
static int32_t smlParseTableName(SArray *tags, char *childTableName) {
  size_t childTableNameLen = strlen(tsSmlChildTableName);
  if (childTableNameLen == 0) return TSDB_CODE_SUCCESS;

  for (int i = 0; i < taosArrayGetSize(tags); i++) {
    SSmlKv *tag = taosArrayGet(tags, i);
    // handle child table name
    if (childTableNameLen == tag->keyLen && strncmp(tag->key, tsSmlChildTableName, tag->keyLen) == 0) {
      // Keep one byte for the terminator: callers run strlen() on this buffer,
      // so a value of TSDB_TABLE_NAME_LEN bytes or more must be truncated.
      int32_t copyLen = tag->length;
      if (copyLen > TSDB_TABLE_NAME_LEN - 1) {
        copyLen = TSDB_TABLE_NAME_LEN - 1;
      }
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, tag->value, copyLen);
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058
/**
 * Determine the child table name and uid for a table record.
 *
 * If a tag supplies the name (see smlParseTableName) the uid is taken from
 * the first bytes of that name; otherwise buildChildTableName() derives both
 * name and uid from the tags and the super table name.
 *
 * @param oneTable  table record; childTableName and uid are filled in
 * @return TSDB_CODE_SUCCESS
 */
static int32_t smlSetCTableName(SSmlTableInfo *oneTable){
  smlParseTableName(oneTable->tags, oneTable->childTableName);

  if (strlen(oneTable->childTableName) == 0) {
    RandTableName rName = {oneTable->tags, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen,
                           oneTable->childTableName, 0};

    buildChildTableName(&rName);
    oneTable->uid = rName.uid;
  } else {
    // Read the leading bytes of the name as the uid. memcpy yields the same
    // value as the former *(uint64_t *) cast without the strict-aliasing and
    // alignment problems of dereferencing a char buffer as uint64_t.
    uint64_t uid = 0;
    memcpy(&uid, oneTable->childTableName, sizeof(uid));
    oneTable->uid = uid;
  }
  return TSDB_CODE_SUCCESS;
}

/**
 * Allocate the per-super-table bookkeeping record.
 *
 * The tag/column hash tables are only created when isDataFormat is false;
 * the tag and column arrays are always created.
 *
 * @param isDataFormat  whether the caller is running in data-format mode
 * @return a new SSmlSTableMeta, or NULL if any allocation fails (everything
 *         allocated up to that point is released)
 */
static SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
  SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
  if (!meta) {
    return NULL;
  }

  if(unlikely(!isDataFormat)){
    meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->tagHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }

    meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->colHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }
  }

  meta->tags = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->cols = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
  return meta;

cleanup:
  // meta was zero-initialised, so members not reached yet are still NULL;
  // release whatever was created before the failing step
  if (meta->tagHash != NULL) {
    taosHashCleanup(meta->tagHash);
  }
  if (meta->colHash != NULL) {
    taosHashCleanup(meta->colHash);
  }
  if (meta->tags != NULL) {
    taosArrayDestroy(meta->tags);
  }
  taosMemoryFree(meta);
  return NULL;
}

/**
 * Append every key/value pair of `cols` to `metaArray` and, when a hash is
 * supplied, record each key's position in it.
 *
 * @param metaHash   optional hash mapping key -> 16-bit index into `cols`;
 *                   may be NULL
 * @param metaArray  destination array of SSmlKv
 * @param cols       source array of SSmlKv
 */
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols){
  int16_t pos = 0;
  while (pos < taosArrayGetSize(cols)) {
    SSmlKv *entry = (SSmlKv *)taosArrayGet(cols, pos);
    taosArrayPush(metaArray, entry);
    if(unlikely(metaHash != NULL)) {
      // the stored value is the 16-bit position itself
      taosHashPut(metaHash, entry->key, entry->keyLen, &pos, SHORT_BYTES);
    }
    ++pos;
  }
}

1066
/**
 * Decide whether the key list of the current line matches the key list seen
 * earlier for the same measurement.
 *
 * The key lists are var-data buffers (length header followed by the
 * concatenated "key=" strings). When the measurement equals the previous
 * line's, the current keys are compared with preLineKeys. Otherwise they are
 * compared with the copy cached in superTableKeyStr, or cached there if the
 * measurement has not been seen yet. On a match the current keys are copied
 * into preLineKeys.
 *
 * @param superTableKeyStr  hash: measurement name -> pointer to cached keys
 * @param preLineKeys       key list of the previous line (updated on success)
 * @param currentLineKeys   key list of the current line
 * @param currElements      parsed elements of the current line
 * @param preElements       parsed elements of the previous line
 * @param len               requested size of the cache buffer; the buffer is
 *                          never smaller than the current key list
 * @return true when the key lists match (or nothing was cached yet), false
 *         when they differ
 */
bool smlFormatJudge(SHashObj* superTableKeyStr, void* preLineKeys, void* currentLineKeys,
                    SSmlLineInfo *currElements, SSmlLineInfo *preElements, int32_t len){
  // same measure
  if(preElements->measureLen == currElements->measureLen
     && memcmp(preElements->measure, currElements->measure, currElements->measureLen) == 0){
    if(varDataTLen(preLineKeys) != varDataTLen(currentLineKeys)
       || memcmp(preLineKeys, currentLineKeys, varDataTLen(preLineKeys)) != 0){
      return false;
    }
  }else{  // diff measure
    // The hash stores a POINTER to the cached key buffer, so the lookup
    // returns the address of that pointer, not the buffer itself.
    void **cached = (void **)taosHashGet(superTableKeyStr, currElements->measure, currElements->measureLen);
    if(unlikely(cached == NULL)){
      size_t allocLen = varDataTLen(currentLineKeys);
      if (len > 0 && (size_t)len > allocLen) {
        allocLen = (size_t)len;
      }
      void *keyCopy = taosMemoryMalloc(allocLen);
      if (keyCopy != NULL) {
        varDataCopy(keyCopy, currentLineKeys);
        taosHashPut(superTableKeyStr, currElements->measure, currElements->measureLen, &keyCopy, POINTER_BYTES);
      }
      // on allocation failure the keys are simply not cached this time
    }else{
      void *keyStr = *cached;
      // a difference in either length or content means the format changed
      if(varDataTLen(keyStr) != varDataTLen(currentLineKeys)
         || memcmp(keyStr, currentLineKeys, varDataTLen(currentLineKeys)) != 0){
        return false;
      }
    }
  }

  varDataCopy(preLineKeys, currentLineKeys);

  return true;
}

/**
 * Look up the metadata of a super table through the catalog.
 *
 * @param info        handle providing the connection, request and catalog
 * @param measure     super table name (not required to be NUL-terminated)
 * @param measureLen  length of `measure`
 * @return whatever catalogGetSTableMeta() stored in the output pointer; NULL
 *         if it stored nothing
 */
static STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen){
  // fully qualified table name: account, database, table
  SName tableName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(tableName.dbname, info->pRequest->pDb, sizeof(tableName.dbname));
  memset(tableName.tname, 0, TSDB_TABLE_NAME_LEN);
  memcpy(tableName.tname, measure, measureLen);

  // connection details for the catalog request
  SRequestConnInfo connInfo = {0};
  connInfo.pTrans = info->taos->pAppInfo->pTransporter;
  connInfo.requestId = info->pRequest->requestId;
  connInfo.requestObjRefId = info->pRequest->self;
  connInfo.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

  STableMeta *result = NULL;
  catalogGetSTableMeta(info->pCatalog, &connInfo, &tableName, &result);
  return result;
}

/**
 * Release a super-table record created by smlBuildSTableMeta() together with
 * the hash tables, arrays and table metadata it refers to.
 *
 * @param meta  record to destroy
 */
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
  // containers first
  taosArrayDestroy(meta->cols);
  taosArrayDestroy(meta->tags);
  taosHashCleanup(meta->colHash);
  taosHashCleanup(meta->tagHash);

  // then the catalog metadata and the record itself
  taosMemoryFree(meta->tableMeta);
  taosMemoryFree(meta);
}
/******************************* Sml struct related function end **********************/

wmmhello's avatar
wmmhello 已提交
1121
/**
 * Determine the type of a column value from its textual form and normalise
 * the key/value pair accordingly.
 *
 * The forms are tried in this order: binary ("..."), nchar (L"..."),
 * boolean, number. For string types the quoting is stripped by adjusting
 * value/length; for fixed-size types length becomes the byte size of the
 * detected type.
 *
 * @param pVal  key/value pair; value/length hold the raw text on entry
 * @param msg   message buffer passed to the number parser
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN when a
 *         string exceeds the maximum length, or TSDB_CODE_TSC_INVALID_VALUE
 *         when no form matches
 */
static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
  int32_t code = TSDB_CODE_TSC_INVALID_VALUE;

  if (smlIsBinary(pVal->value, pVal->length)) {
    // binary
    pVal->type = TSDB_DATA_TYPE_BINARY;
    pVal->length -= BINARY_ADD_LEN;
    if (pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
      code = TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    } else {
      pVal->value += (BINARY_ADD_LEN - 1);
      code = TSDB_CODE_SUCCESS;
    }
  } else if (smlIsNchar(pVal->value, pVal->length)) {
    // nchar
    pVal->type = TSDB_DATA_TYPE_NCHAR;
    pVal->length -= NCHAR_ADD_LEN;
    if (pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
      code = TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    } else {
      pVal->value += (NCHAR_ADD_LEN - 1);
      code = TSDB_CODE_SUCCESS;
    }
  } else if (smlParseBool(pVal)) {
    // bool
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    code = TSDB_CODE_SUCCESS;
  } else if (smlParseNumber(pVal, msg)) {
    // number: the parser has already set pVal->type
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    code = TSDB_CODE_SUCCESS;
  }

  return code;
}

1158 1159 1160 1161 1162 1163
static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd,
                          SSmlLineInfo* currElement, bool isTag){
  bool isSameMeasure = false;
  bool isSameCTable = false;
  int   cnt = 0;
  void *keyStr = NULL;
1164
//  bool isPreLineKVNULL = false;
1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220
  SArray *preLineKV = NULL;
  bool    isSuperKVInit = false;
  SArray *superKV = NULL;
  if(info->dataFormat){
    if(currElement->measureLen == info->preLine.measureLen
       && memcmp(currElement->measure, info->preLine.measure, currElement->measureLen) == 0){
      isSameMeasure = true;
    }

    if(!isSameMeasure){
      SSmlSTableMeta *sMeta = NULL;
      SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
      if(tableMeta == NULL){
        SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
        STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
        meta->tableMeta = pTableMeta;
        if(pTableMeta == NULL){
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &meta, POINTER_BYTES);
        sMeta = meta;
      }else{
        sMeta = *tableMeta;
      }
      info->currSTableMeta = sMeta->tableMeta;

      if(isTag){
        superKV = sMeta->tags;
      }else{
        superKV = sMeta->cols;
      }
      if(unlikely(taosArrayGetSize(superKV) == 0)){
        isSuperKVInit = true;
      }
    }

    if(currElement->measureTagsLen == info->preLine.measureTagsLen
       && memcmp(currElement->measure, info->preLine.measure, currElement->measureTagsLen) == 0){
      isSameCTable = true;
      if(isTag) return TSDB_CODE_SUCCESS;
    }else if(!isTag){
      SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen);
      if (unlikely(oneTable == NULL)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      info->currTableDataCtx = (*oneTable)->tableDataCtx;
    }

    if(isTag){
      // prepare for judging if tag or col is the same for each line
      if(unlikely(info->currentLineTagKeys == NULL)){   // sml todo size need remalloc
        info->currentLineTagKeys = taosMemoryMalloc(sqlEnd - *sql);
      }
1221 1222 1223 1224
      if(info->preLineTagKeys == NULL){
        info->preLineTagKeys = taosMemoryMalloc(sqlEnd - *sql);
      }
      keyStr = info->currentLineTagKeys;
1225 1226 1227

      if(info->preLineTagKV == NULL){
        info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
1228
//        isPreLineKVNULL = true;
1229 1230 1231 1232 1233 1234
      }
      preLineKV = info->preLineTagKV;
    }else{
      if(unlikely(info->currentLineColKeys == NULL)){   // sml todo size need remalloc
        info->currentLineColKeys = taosMemoryMalloc(sqlEnd - *sql);
      }
1235 1236 1237 1238 1239

      if(info->preLineColKeys == NULL){
        info->preLineColKeys = taosMemoryMalloc(sqlEnd - *sql);
      }
      keyStr = info->currentLineColKeys;
1240 1241 1242

      if(info->preLineColKV == NULL){
        info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv));
1243
//        isPreLineKVNULL = true;
1244 1245 1246 1247 1248 1249 1250 1251
      }
      preLineKV = info->preLineColKV;
    }

    if(!isSameMeasure){
      taosArraySetSize(preLineKV, 0);
    }
    varDataLen(keyStr) = 0; // clear keys
1252 1253
  }else{
    preLineKV = taosArrayInit(8, sizeof(SSmlKv));
1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282
  }

  while (*sql < sqlEnd) {
    if (IS_SPACE(*sql)) {
      break;
    }

    // parse key
    const char *key = *sql;
    int32_t     keyLen = 0;
    while (*sql < sqlEnd) {

      if (IS_COMMA(*sql)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if (IS_EQUAL(*sql)) {
        keyLen = *sql - key;
        (*sql)++;
        break;
      }
      (*sql)++;
    }

    if (IS_INVALID_COL_LEN(keyLen)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

1283 1284 1285 1286
    if(info->dataFormat){
      memcpy(keyStr + varDataTLen(keyStr), key, keyLen + 1); // use = symbol
      varDataLen(keyStr) += keyLen + 1;
    }
1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298

    // parse value
    const char *value = *sql;
    int32_t     valueLen = 0;
    bool        isInQuote = false;
    while (*sql < sqlEnd) {
      // parse value
      if (!isTag && IS_QUOTE(*sql)) {
        isInQuote = !isInQuote;
        (*sql)++;
        continue;
      }
1299 1300 1301
      if (!isInQuote && IS_SPACE(*sql)) {
        break;
      }
1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346
      if (!isInQuote && IS_COMMA(*sql)) {
        break;
      }
      if (!isInQuote && IS_EQUAL(*sql)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      (*sql)++;
    }
    valueLen = *sql - value;

    if (isInQuote) {
      smlBuildInvalidDataMsg(&info->msgBuf, "only one quote", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if (valueLen == 0) {
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    PROCESS_SLASH(key, keyLen)
    PROCESS_SLASH(value, valueLen)

    SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen};
    if (!isTag) {
      int32_t ret = smlParseValue(&kv, &info->msgBuf);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
    } else {
      if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
      }
      kv.type = TSDB_DATA_TYPE_NCHAR;
    }

    if(info->dataFormat){
      if(!isTag && cnt + 1 > info->currSTableMeta->tableInfo.numOfColumns){
        smlBuildInvalidDataMsg(&info->msgBuf, "col more than meta", NULL);
        return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
      }
      if(isTag && cnt + 1 > info->currSTableMeta->tableInfo.numOfTags){
        smlBuildInvalidDataMsg(&info->msgBuf, "tag more than meta", NULL);
        return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
      }
      // bind data
1347 1348 1349 1350 1351 1352
      if(!isTag){
        int ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, cnt + 1);
        if (ret != TSDB_CODE_SUCCESS) {
          smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
          return ret;
        }
1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412
      }

      do {
        if(isSameMeasure){
          if(cnt >= taosArrayGetSize(preLineKV)) {
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
          SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
          if(!isTag && kv.type != preKV->type){
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }

          if(IS_VAR_DATA_TYPE(kv.type) && kv.length > preKV->length){
            preKV->length = kv.length;
            SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
            if(tableMeta == NULL){
              smlBuildInvalidDataMsg(&info->msgBuf, "measure should has inside", value);
              return TSDB_CODE_SML_INVALID_DATA;
            }

            if(isTag){
              superKV = (*tableMeta)->tags;
            }else{
              superKV = (*tableMeta)->cols;
            }
            SSmlKv *oldKV = taosArrayGet(superKV, cnt);
            oldKV->length = kv.length;
          }
        }else{
          if(isSuperKVInit){
            taosArrayPush(superKV, &kv);
          }else{
            if(cnt >= taosArrayGetSize(superKV)) {
              info->dataFormat = false;
              info->reRun      = true;
              return TSDB_CODE_SUCCESS;
            }
            SSmlKv *preKV = taosArrayGet(superKV, cnt);
            if(!isTag && kv.type != preKV->type){
              info->dataFormat = false;
              info->reRun      = true;
              return TSDB_CODE_SUCCESS;
            }

            if(IS_VAR_DATA_TYPE(kv.type)){
              if(kv.length > preKV->length) {
                preKV->length = kv.length;
              }else{
                kv.length = preKV->length;
              }
            }
          }
          taosArrayPush(preLineKV, &kv);
        }
        break;
      }while(0);
1413 1414
    }else{
      taosArrayPush(preLineKV, &kv);
1415 1416 1417 1418 1419 1420 1421 1422 1423
    }

    if(!info->dataFormat && !isTag){
      if(currElement->colArray == NULL){
        currElement->colArray = taosArrayInit(16, sizeof(SSmlKv));
        taosArraySetSize(currElement->colArray, 1);
      }
      taosArrayPush(currElement->colArray, &kv);   //reserve for timestamp
    }
1424 1425 1426 1427 1428
    cnt++;
    if(IS_SPACE(*sql)){
      break;
    }
    (*sql)++;
1429 1430
  }

1431
  if(isTag && cnt > TSDB_MAX_TAGS){
1432 1433 1434 1435 1436 1437
    smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
    return TSDB_CODE_PAR_INVALID_TAGS_NUM;
  }

  if(info->dataFormat){
    if(isTag){
1438
      info->dataFormat = smlFormatJudge(info->superTableTagKeyStr, info->preLineTagKeys,
1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461
                                        info->currentLineTagKeys, currElement, &info->preLine, sqlEnd - currElement->tags);
      if(!info->dataFormat) {
        info->reRun = true;
        return TSDB_CODE_SUCCESS;
      }
      if(!isSameCTable){
        if(taosArrayGetSize(preLineKV) > TSDB_MAX_TAGS){
          smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
          return TSDB_CODE_PAR_INVALID_TAGS_NUM;
        }

        void* oneTable = taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen);
        if (unlikely(oneTable == NULL)) {
          SSmlTableInfo *tinfo = smlBuildTableInfo(1, currElement->measure, currElement->measureLen);
          if (!tinfo) {
            return TSDB_CODE_OUT_OF_MEMORY;
          }
          for(int i = 0; i < taosArrayGetSize(preLineKV); i++){
            taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
          }
          smlSetCTableName(tinfo);
          info->currSTableMeta->uid = tinfo->uid;
          tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
1462 1463 1464 1465
          if(tinfo->tableDataCtx == NULL){
            smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
            return TSDB_CODE_SML_INVALID_DATA;
          }
1466 1467 1468 1469
          taosHashPut(info->childTables, currElement->measure, currElement->measureTagsLen, &tinfo, POINTER_BYTES);
        }
      }
    }else{
1470
      info->dataFormat = smlFormatJudge(info->superTableColKeyStr, info->preLineColKeys,
1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489
                                        info->currentLineColKeys, currElement, &info->preLine, sqlEnd - currElement->cols);
      if(!info->dataFormat) {
        info->reRun = true;
        return TSDB_CODE_SUCCESS;
      }
    }
  }else{
    void* oneTable = taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen);
    if (unlikely(oneTable == NULL)) {
      SSmlTableInfo *tinfo = smlBuildTableInfo(info->affectedRows / 2, currElement->measure, currElement->measureLen);
      if (!tinfo) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      for(int i = 0; i < taosArrayGetSize(preLineKV); i++){
        taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
      }
      smlSetCTableName(tinfo);
      taosHashPut(info->childTables, currElement->measure, currElement->measureTagsLen, &tinfo, POINTER_BYTES);
    }
1490
    taosArrayDestroy(preLineKV);      // smltodo
1491 1492 1493 1494 1495 1496
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Split one line-protocol record into measurement, tags, columns and
 * timestamp, and parse each part.
 *
 * Layout: measurement[,tag=value...] col=value[,col=value...] [timestamp]
 *
 * @param info      parsing handle
 * @param sql       start of the line
 * @param sqlEnd    end of the line
 * @param elements  receives pointers/lengths of the individual parts
 * @return TSDB_CODE_SUCCESS (also when info->reRun was set by the key/value
 *         parser and the line must be parsed again), or an error code
 */
static int32_t smlParseInfluxString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlLineInfo *elements) {
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
  JUMP_SPACE(sql, sqlEnd)
  if (*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA;
  elements->measure = sql;

  // parse measure
  while (sql < sqlEnd) {
    if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
      // the escape is removed in place, so the line gets one byte shorter
      MOVE_FORWARD_ONE(sql, sqlEnd - sql);
      sqlEnd--;
      continue;
    }
    if (IS_COMMA(sql)) {
      break;
    }

    if (IS_SPACE(sql)) {
      break;
    }
    sql++;
  }
  elements->measureLen = sql - elements->measure;
  if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  // to get measureTagsLen before
  const char* tmp = sql;
  while (tmp < sqlEnd){
    if (IS_SPACE(tmp)) {
      break;
    }
    tmp++;
  }
  elements->measureTagsLen = tmp - elements->measure;

  // parse tag
  if (*sql == SPACE) {
    elements->tagsLen = 0;
  } else {
    if (*sql == COMMA) sql++;
    elements->tags = sql;

    int ret = smlParseKv(info, &sql, sqlEnd, elements, true);
    if(ret != TSDB_CODE_SUCCESS){
      return ret;
    }
    // continue right after "measurement,tags", wherever the tag parser stopped
    sql = elements->measure + elements->measureTagsLen;

    if(info->reRun){
      return TSDB_CODE_SUCCESS;
    }

    elements->tagsLen = sql - elements->tags;
  }

  // parse cols
  JUMP_SPACE(sql, sqlEnd)
  elements->cols = sql;

  int ret = smlParseKv(info, &sql, sqlEnd, elements, false);
  if(ret != TSDB_CODE_SUCCESS){
    return ret;
  }

  if(info->reRun){
    return TSDB_CODE_SUCCESS;
  }

  elements->colsLen = sql - elements->cols;
  if (elements->colsLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "cols is empty", NULL);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  // parse timestamp
  JUMP_SPACE(sql, sqlEnd)
  elements->timestamp = sql;
  while (sql < sqlEnd) {
    // isspace() requires a value representable as unsigned char (or EOF);
    // passing a negative plain char is undefined behaviour
    if (isspace((unsigned char)*sql)) {
      break;
    }
    sql++;
  }
  elements->timestampLen = sql - elements->timestamp;

  ret = smlParseTS(info, elements->timestamp, elements->timestampLen, elements->colArray);
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseTS failed", info->id);
    return ret;
  }

  if(info->dataFormat){
    // finish the bound row and remember this line for the next comparison
    smlBuildRow(info->currTableDataCtx);
    info->preLine = *elements;
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1599 1600
static void smlParseTelnetElement(const char **sql, const char *sqlEnd, const char **data, int32_t *len) {
  while (*sql < sqlEnd) {
X
Xiaoyu Wang 已提交
1601
    if (**sql != SPACE && !(*data)) {
1602
      *data = *sql;
X
Xiaoyu Wang 已提交
1603
    } else if (**sql == SPACE && *data) {
1604 1605 1606 1607 1608 1609 1610
      *len = *sql - *data;
      break;
    }
    (*sql)++;
  }
}

dengyihao's avatar
dengyihao 已提交
1611 1612 1613
// Parse the "<tagk>=<tagv>[ <tagk>=<tagv>...]" tail of a telnet line.
//   data/sqlEnd    : [data, sqlEnd) is the text to parse
//   cols           : receives one SSmlKv (type NCHAR) per tag; key/value point into the input text
//   childTableName : buffer of TSDB_TABLE_NAME_LEN bytes, filled when a tag key equals tsSmlChildTableName
//   dumplicateKey  : hash used by smlCheckDuplicateKey to reject repeated keys
//   msg            : receives a description of the first error
// Returns TSDB_CODE_SUCCESS or an error code.
static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *cols, char *childTableName,
                                  SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
  if (!cols) return TSDB_CODE_OUT_OF_MEMORY;
  const char *sql = data;
  size_t      childTableNameLen = strlen(tsSmlChildTableName);
  while (sql < sqlEnd) {
    JUMP_SPACE(sql, sqlEnd)
    // JUMP_SPACE may stop exactly at sqlEnd; never dereference that position
    if (sql >= sqlEnd || *sql == '\0') break;

    const char *key = sql;
    int32_t     keyLen = 0;

    // parse key
    while (sql < sqlEnd) {
      if (*sql == SPACE) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if (*sql == EQUAL) {
        keyLen = sql - key;
        sql++;
        break;
      }
      sql++;
    }

    // keyLen stays 0 when no '=' was found before sqlEnd
    if (IS_INVALID_COL_LEN(keyLen)) {
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
    if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
      return TSDB_CODE_TSC_DUP_NAMES;
    }

    // parse value
    const char *value = sql;
    int32_t     valueLen = 0;
    while (sql < sqlEnd) {
      if (*sql == SPACE) {
        break;
      }
      if (*sql == EQUAL) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
    }
    valueLen = sql - value;

    if (valueLen == 0) {
      smlBuildInvalidDataMsg(msg, "invalid value", value);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    // handle child table name: the key must match the configured name exactly,
    // not merely be a prefix of it
    if (childTableNameLen != 0 && (size_t)keyLen == childTableNameLen &&
        strncmp(key, tsSmlChildTableName, keyLen) == 0) {
      // copy at most TSDB_TABLE_NAME_LEN - 1 bytes so the zeroed buffer stays NUL-terminated
      size_t copyLen = (valueLen < TSDB_TABLE_NAME_LEN) ? (size_t)valueLen : (size_t)(TSDB_TABLE_NAME_LEN - 1);
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, value, copyLen);
      continue;
    }

    if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }

    // add kv to the array by value, matching how smlParseTagsFromJSON fills the same kind of
    // array and how smlUpdateMeta/smlPushCols read it; a heap-allocated SSmlKv* pushed here
    // would never be freed and would be misread as an SSmlKv
    SSmlKv kv = {.key = key, .keyLen = keyLen, .type = TSDB_DATA_TYPE_NCHAR, .value = value, .length = valueLen};
    if (taosArrayPush(cols, &kv) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1692

1693
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
dengyihao's avatar
dengyihao 已提交
1694 1695
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlTableInfo *tinfo,
                                    SArray *cols) {
X
Xiaoyu Wang 已提交
1696
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
1697 1698

  // parse metric
wmmhello's avatar
wmmhello 已提交
1699
  smlParseTelnetElement(&sql, sqlEnd, &tinfo->sTableName, &tinfo->sTableNameLen);
1700
  if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
1701
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
wmmhello's avatar
wmmhello 已提交
1702
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
1703 1704 1705 1706
  }

  // parse timestamp
  const char *timestamp = NULL;
X
Xiaoyu Wang 已提交
1707
  int32_t     tLen = 0;
wmmhello's avatar
wmmhello 已提交
1708
  smlParseTelnetElement(&sql, sqlEnd, &timestamp, &tLen);
1709 1710 1711 1712 1713 1714 1715 1716
  if (!timestamp || tLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  int32_t ret = smlParseTS(info, timestamp, tLen, cols);
  if (ret != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
wmmhello's avatar
wmmhello 已提交
1717
    return ret;
1718 1719 1720 1721
  }

  // parse value
  const char *value = NULL;
1722 1723 1724 1725 1726 1727
  int32_t     valueLen = 0;
  smlParseTelnetElement(&sql, sqlEnd, &value, &valueLen);
  if (!value || valueLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }
1728

1729 1730 1731 1732 1733 1734 1735 1736 1737 1738
  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
  if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
  taosArrayPush(cols, &kv);
  kv->key = VALUE;
  kv->keyLen = VALUE_LEN;
  kv->value = value;
  kv->length = valueLen;
  if ((ret = smlParseValue(kv, &info->msgBuf)) != TSDB_CODE_SUCCESS) {
    return ret;
  }
1739

1740 1741 1742 1743 1744
  // parse tags
  ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
  if (ret != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
    return ret;
wmmhello's avatar
wmmhello 已提交
1745
  }
wmmhello's avatar
wmmhello 已提交
1746

wmmhello's avatar
wmmhello 已提交
1747 1748 1749
  return TSDB_CODE_SUCCESS;
}

1750
// Merge the key/values of one row (`cols`) into a super table's schema.
//   metaHash  : key name -> int16_t index into metaArray
//   metaArray : array of SSmlKv describing every known key
//   isTag     : for tags only the recorded length is widened; types are not compared
//   msg       : receives a description when a column changes type
// Returns TSDB_CODE_SML_NOT_SAME_TYPE on a column type conflict, else TSDB_CODE_SUCCESS.
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg) {
  for (size_t i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);

    int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);
    if (index) {
      SSmlKv *value = (SSmlKv *)taosArrayGet(metaArray, *index);
      if (isTag) {
        if (kv->length > value->length) {
          value->length = kv->length;
        }
        continue;
      }
      if (kv->type != value->type) {
        smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
        return TSDB_CODE_SML_NOT_SAME_TYPE;
      }

      if (IS_VAR_DATA_TYPE(kv->type) && (kv->length > value->length)) {  // update string len, if bigger
        value->length = kv->length;
      }
    } else {
      size_t tmp = taosArrayGetSize(metaArray);
      ASSERT(tmp <= INT16_MAX);
      int16_t size = tmp;
      // metaArray is read above as an array of SSmlKv, so copy the element itself;
      // pushing &kv would copy bytes starting at the local pointer variable instead
      taosArrayPush(metaArray, kv);
      taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1783
// Release an SSmlTableInfo: clean up every hash stored in tag->cols, destroy the
// cols and tags arrays, then free the struct itself. `info` is not used.
static void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
  size_t idx = 0;
  while (idx < taosArrayGetSize(tag->cols)) {
    // each element of tag->cols is a pointer to a hash object
    taosHashCleanup((SHashObj *)taosArrayGetP(tag->cols, idx));
    idx++;
  }

  taosArrayDestroy(tag->cols);
  taosArrayDestroy(tag->tags);
  taosMemoryFree(tag);
}

1794
// Build a hash (key name -> pointer to the SSmlKv element inside `cols`) for one row
// and append that hash to `colsArray`.
// Returns TSDB_CODE_OUT_OF_MEMORY if the hash cannot be created or appended.
static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
  SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (!kvHash) {
    uError("SML:smlDealCols failed to allocate memory");
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
    // the stored value is the pointer itself, so it refers to storage owned by `cols`
    taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
  }

  if (taosArrayPush(colsArray, &kvHash) == NULL) {
    // nobody else references the hash yet; release it instead of leaking it
    taosHashCleanup(kvHash);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1809 1810
// Tear down an SSmlHandle and everything it references: the query, the child/super
// table hashes and their entries, the vgroup hash, the request, the cached key strings,
// the per-line column arrays, the cJSON root, and finally the handle itself.
// A NULL handle is ignored.
static void smlDestroyInfo(SSmlHandle *info) {
  if (info == NULL) {
    return;
  }
  qDestroyQuery(info->pQuery);

  // destroy info->childTables (each entry holds a pointer to an SSmlTableInfo)
  for (void **iter = (void **)taosHashIterate(info->childTables, NULL); iter != NULL;
       iter = (void **)taosHashIterate(info->childTables, iter)) {
    smlDestroyTableInfo(info, (SSmlTableInfo *)(*iter));
  }
  taosHashCleanup(info->childTables);

  // destroy info->superTables (each entry holds a pointer to an SSmlSTableMeta)
  for (void **iter = (void **)taosHashIterate(info->superTables, NULL); iter != NULL;
       iter = (void **)taosHashIterate(info->superTables, iter)) {
    smlDestroySTableMeta((SSmlSTableMeta *)(*iter));
  }
  taosHashCleanup(info->superTables);

  // destroy info->pVgHash
  taosHashCleanup(info->pVgHash);
  destroyRequest(info->pRequest);

  // the key-string hashes store heap pointers that must be freed one by one
  for (void **iter = (void **)taosHashIterate(info->superTableTagKeyStr, NULL); iter != NULL;
       iter = (void **)taosHashIterate(info->superTableTagKeyStr, iter)) {
    taosMemoryFree(*iter);
  }
  taosHashCleanup(info->superTableTagKeyStr);

  for (void **iter = (void **)taosHashIterate(info->superTableColKeyStr, NULL); iter != NULL;
       iter = (void **)taosHashIterate(info->superTableColKeyStr, iter)) {
    taosMemoryFree(*iter);
  }
  taosHashCleanup(info->superTableColKeyStr);

  taosMemoryFree(info->currentLineTagKeys);
  taosMemoryFree(info->preLineTagKeys);
  taosMemoryFree(info->currentLineColKeys);
  taosMemoryFree(info->preLineColKeys);
  taosArrayDestroy(info->preLineTagKV);
  taosArrayDestroy(info->preLineColKV);

  // every line slot owns its own column array
  int lineIdx = 0;
  while (lineIdx < taosArrayGetSize(info->lines)) {
    SSmlLineInfo *line = (SSmlLineInfo *)taosArrayGet(info->lines, lineIdx);
    taosArrayDestroy(line->colArray);
    lineIdx++;
  }
  taosArrayDestroy(info->lines);
  cJSON_Delete(info->root);
  taosMemoryFreeClear(info);
}

1862
// Allocate and initialise an SSmlHandle.
//   pTscObj   : optional connection; when given, the catalog handle for its cluster is fetched
//   request   : optional request; when given, its msgBuf becomes the handle's error buffer
//   protocol  : schemaless protocol of the incoming data
//   precision : timestamp precision of the incoming data
//   perBatch  : number of line slots pre-sized in info->lines (also stored in affectedRows)
// Returns the new handle, or NULL on failure; on failure everything already attached to the
// handle is released through smlDestroyInfo.
static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLProtocolType protocol, int8_t precision, int32_t perBatch) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
  if (NULL == info) {
    return NULL;
  }
  info->id = smlGenId();

  info->pQuery = smlInitHandle();

  if (pTscObj) {
    info->taos = pTscObj;
    code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
      goto cleanup;
    }
  }

  info->precision = precision;
  info->protocol = protocol;
  info->dataFormat = true;
  info->affectedRows = perBatch;

  if (request) {
    info->pRequest = request;
    info->msgBuf.buf = info->pRequest->msgBuf;
    info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
  }

  info->lines = taosArrayInit(perBatch, sizeof(SSmlLineInfo));
  if (NULL == info->lines) {
    // must be checked before taosArraySetSize, which would otherwise be handed a NULL array
    uError("SML:0x%" PRIx64 " create info failed", info->id);
    goto cleanup;
  }
  taosArraySetSize(info->lines, perBatch);
  info->superTableTagKeyStr = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->superTableColKeyStr = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);

  info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);

  if (NULL == info->pQuery || NULL == info->childTables || NULL == info->superTables || NULL == info->superTableTagKeyStr
      || NULL == info->superTableColKeyStr || NULL == info->pVgHash) {
    uError("SML:0x%" PRIx64 " create info failed", info->id);
    goto cleanup;
  }

  return info;

cleanup:
  smlDestroyInfo(info);
  return NULL;
}

/************* TSDB_SML_JSON_PROTOCOL function start **************/
// Read the "metric" string of a JSON data point into tinfo->sTableName/sTableNameLen.
// The name is not copied: tinfo->sTableName points at the cJSON node's string.
// Returns TSDB_CODE_TSC_INVALID_JSON when "metric" is not a string and
// TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH when IS_INVALID_TABLE_LEN rejects its length.
static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo) {
  cJSON *metric = cJSON_GetObjectItem(root, "metric");
  if (cJSON_IsString(metric)) {
    // the length is recorded before it is validated
    tinfo->sTableNameLen = strlen(metric->valuestring);
    if (!IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
      tinfo->sTableName = metric->valuestring;
      return TSDB_CODE_SUCCESS;
    }

    uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  return TSDB_CODE_TSC_INVALID_JSON;
}

// Parse a timestamp given as a JSON object {"value": <number>, "type": "<unit>"} and
// store it in *tsVal as nanoseconds.
//   unit "s"            : seconds
//   unit "ms"/"us"/"ns" : milli/micro/nanoseconds
//   unit letters are accepted in either case
// A value of 0 yields taosGetTimestampNs(); negative or out-of-range values yield
// TSDB_CODE_INVALID_TIMESTAMP; a malformed object or unknown unit yields
// TSDB_CODE_TSC_INVALID_JSON. *tsVal is only written on success.
static int32_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int64_t *tsVal) {
  int32_t size = cJSON_GetArraySize(root);
  if (size != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (!cJSON_IsNumber(value)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  double timeDouble = value->valuedouble;
  if (smlDoubleToInt64OverFlow(timeDouble)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
    return TSDB_CODE_INVALID_TIMESTAMP;
  }

  if (timeDouble == 0) {
    *tsVal = taosGetTimestampNs();
    return TSDB_CODE_SUCCESS;
  }

  if (timeDouble < 0) {
    return TSDB_CODE_INVALID_TIMESTAMP;
  }

  // translate the unit string into a nanoseconds-per-unit factor
  const char *unit = type->valuestring;
  size_t      typeLen = strlen(unit);
  int64_t     nsPerUnit = 0;
  if (typeLen == 1 && (unit[0] == 's' || unit[0] == 'S')) {
    // seconds
    nsPerUnit = NANOSECOND_PER_SEC;
  } else if (typeLen == 2 && (unit[1] == 's' || unit[1] == 'S')) {
    switch (unit[0]) {
      case 'm':
      case 'M':
        // milliseconds
        nsPerUnit = NANOSECOND_PER_MSEC;
        break;
      case 'u':
      case 'U':
        // microseconds
        nsPerUnit = NANOSECOND_PER_USEC;
        break;
      case 'n':
      case 'N':
        // already nanoseconds
        nsPerUnit = 1;
        break;
      default:
        return TSDB_CODE_TSC_INVALID_JSON;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // Range-check the scaled value in floating point BEFORE the int64 multiplication;
  // multiplying first would be signed-overflow undefined behaviour for large inputs.
  if (nsPerUnit != 1 && smlDoubleToInt64OverFlow(timeDouble * nsPerUnit)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
    return TSDB_CODE_INVALID_TIMESTAMP;
  }

  *tsVal = (int64_t)timeDouble * nsPerUnit;
  return TSDB_CODE_SUCCESS;
}

// Count the decimal digits of num (the sign is ignored; 0 has one digit).
static uint8_t smlGetTimestampLen(int64_t num) {
  uint8_t digits = 1;  // every number has at least one digit
  for (num /= 10; num != 0; num /= 10) {
    digits++;
  }
  return digits;
}

// Parse the "timestamp" field of a JSON data point and append it to `cols` as a
// TIMESTAMP kv in nanoseconds.
//   number : the unit is inferred from the digit count (TSDB_TIME_PRECISION_SEC_DIGITS ->
//            seconds, TSDB_TIME_PRECISION_MILLI_DIGITS -> milliseconds); 0 means "now";
//            anything else is rejected
//   object : delegated to smlParseTSFromJSONObj
// Returns TSDB_CODE_SUCCESS or an error code.
static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
  // Timestamp must be the first KV to parse
  int64_t tsVal = 0;

  cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
  if (cJSON_IsNumber(timestamp)) {
    // timestamp value 0 indicates current system time
    double timeDouble = timestamp->valuedouble;
    if (smlDoubleToInt64OverFlow(timeDouble)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
      return TSDB_CODE_INVALID_TIMESTAMP;
    }

    if (timeDouble < 0) {
      return TSDB_CODE_INVALID_TIMESTAMP;
    }

    uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
    if (tsLen == TSDB_TIME_PRECISION_SEC_DIGITS || tsLen == TSDB_TIME_PRECISION_MILLI_DIGITS) {
      int64_t nsPerUnit = (tsLen == TSDB_TIME_PRECISION_SEC_DIGITS) ? NANOSECOND_PER_SEC : NANOSECOND_PER_MSEC;
      // Range-check in floating point BEFORE the int64 multiplication; multiplying first
      // would be signed-overflow undefined behaviour for large inputs.
      if (smlDoubleToInt64OverFlow(timeDouble * nsPerUnit)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
        return TSDB_CODE_INVALID_TIMESTAMP;
      }
      tsVal = (int64_t)timeDouble * nsPerUnit;
    } else if (timeDouble == 0) {
      tsVal = taosGetTimestampNs();
    } else {
      return TSDB_CODE_INVALID_TIMESTAMP;
    }
  } else if (cJSON_IsObject(timestamp)) {
    int32_t ret = smlParseTSFromJSONObj(info, timestamp, &tsVal);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " Failed to parse timestamp from JSON Obj", info->id);
      return ret;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // add ts to cols
  SSmlKv kv = {.key = TS, .keyLen = TS_LEN, .i = tsVal,
               .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};

  if (taosArrayPush(cols, &kv) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2071
// Fill pVal from a JSON true/false node. typeStr must be "bool" (case-insensitive),
// otherwise TSDB_CODE_TSC_INVALID_JSON_TYPE is returned and pVal is left untouched.
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
  if (strcasecmp(typeStr, "bool") == 0) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
    pVal->i = value->valueint;
    return TSDB_CODE_SUCCESS;
  }

  uError("OTD:invalid type(%s) for JSON Bool", typeStr);
  return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
wmmhello's avatar
wmmhello 已提交
2082

X
Xiaoyu Wang 已提交
2083 2084 2085
// Fill pVal from a JSON number according to the requested type name (case-insensitive):
//   i8/tinyint, i16/smallint, i32/int : range-checked, out of range is an error
//   i64/bigint                        : clamped to [INT64_MIN, INT64_MAX]
//   f32/float                         : range-checked
//   f64/double                        : stored as is
// Returns TSDB_CODE_TSC_VALUE_OUT_OF_RANGE for a value that does not fit and
// TSDB_CODE_TSC_INVALID_JSON_TYPE for an unknown type name.
static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
  const double num = value->valuedouble;

  if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) {
    // tinyint
    if (!IS_VALID_TINYINT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(tinyint)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_TINYINT;
    pVal->i = num;
  } else if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) {
    // smallint
    if (!IS_VALID_SMALLINT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(smallint)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_SMALLINT;
    pVal->i = num;
  } else if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) {
    // int
    if (!IS_VALID_INT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(int)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_INT;
    pVal->i = num;
  } else if (strcasecmp(typeStr, "i64") == 0 || strcasecmp(typeStr, "bigint") == 0) {
    // bigint: saturate instead of failing
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    if (num >= (double)INT64_MAX) {
      pVal->i = INT64_MAX;
    } else if (num <= (double)INT64_MIN) {
      pVal->i = INT64_MIN;
    } else {
      pVal->i = num;
    }
  } else if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) {
    // float
    if (!IS_VALID_FLOAT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(float)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->f = num;
  } else if (strcasecmp(typeStr, "f64") == 0 || strcasecmp(typeStr, "double") == 0) {
    // double
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->d = num;
  } else {
    // if reach here means type is unsupported
    uError("OTD:invalid type(%s) for JSON Number", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }

  // every accepted type has a fixed width taken from the type table
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  return TSDB_CODE_SUCCESS;
}
2153

X
Xiaoyu Wang 已提交
2154
// Fill pVal from a JSON string. typeStr selects "binary" or "nchar" (case-insensitive).
// The string is not copied: pVal->value points at the cJSON node's valuestring.
// Returns TSDB_CODE_TSC_INVALID_JSON_TYPE for any other type name and
// TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN when the string exceeds the limit for its type.
static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
  if (strcasecmp(typeStr, "binary") == 0) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
  } else if (strcasecmp(typeStr, "nchar") == 0) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
  } else {
    uError("OTD:invalid type(%s) for JSON String", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }

  // Validate the full-width length first: narrowing to int16_t before the comparison would
  // let a very long string wrap to a small or negative value and slip past the limits.
  size_t len = strlen(value->valuestring);

  if (pVal->type == TSDB_DATA_TYPE_BINARY && len > (size_t)(TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE)) {
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }
  if (pVal->type == TSDB_DATA_TYPE_NCHAR &&
      len > (size_t)((TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }

  pVal->length = (int16_t)len;
  pVal->value = value->valuestring;
  return TSDB_CODE_SUCCESS;
}

// Parse a typed value object {"value": ..., "type": "<name>"} into kv.
// The object must have exactly OTD_JSON_SUB_FIELDS_NUM members, "type" must be a string,
// and "value" must be a bool, number or string; the matching smlConvertJSON* helper
// decides whether the type name is acceptable for that kind of value.
static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) {
  if (cJSON_GetArraySize(root) != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (!value) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // each helper returns TSDB_CODE_SUCCESS or the error to report
  switch (value->type) {
    case cJSON_True:
    case cJSON_False:
      return smlConvertJSONBool(kv, type->valuestring, value);
    case cJSON_Number:
      return smlConvertJSONNumber(kv, type->valuestring, value);
    case cJSON_String:
      return smlConvertJSONString(kv, type->valuestring, value);
    default:
      return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }
}

2225 2226 2227 2228 2229 2230 2231 2232
// Convert a JSON value node into kv.
//   true/false : BOOL
//   number     : DOUBLE
//   string     : converted with the default string type ("nchar")
//   object     : explicit {"value", "type"} pair, see smlParseValueFromJSONObj
// Any other node kind yields TSDB_CODE_TSC_INVALID_JSON.
static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
  switch (root->type) {
    case cJSON_True:
    case cJSON_False: {
      kv->type = TSDB_DATA_TYPE_BOOL;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->i = root->valueint;
      break;
    }
    case cJSON_Number: {
      kv->type = TSDB_DATA_TYPE_DOUBLE;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->d = root->valuedouble;
      break;
    }
    case cJSON_String: {
      /* set default JSON type to binary/nchar according to
       * user configured parameter tsDefaultJSONStrType
       */

      char *tsDefaultJSONStrType = "nchar";  // todo
      // propagate conversion failures (e.g. an over-long string); ignoring them would hand
      // the caller a kv whose value pointer was never set
      int32_t ret = smlConvertJSONString(kv, tsDefaultJSONStrType, root);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON String");
        return ret;
      }
      break;
    }
    case cJSON_Object: {
      int32_t ret = smlParseValueFromJSONObj(root, kv);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON Obj");
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON;
  }

  return TSDB_CODE_SUCCESS;
}

// Parse the "value" member of a JSON data point and append it to `cols` as the kv
// keyed VALUE. Returns TSDB_CODE_OUT_OF_MEMORY when cols is NULL,
// TSDB_CODE_TSC_INVALID_JSON when "value" is missing, or the value-parsing error.
static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
  if (cols == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  cJSON *metricVal = cJSON_GetObjectItem(root, "value");
  if (!metricVal) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  SSmlKv  valueKv = {.key = VALUE, .keyLen = VALUE_LEN};
  int32_t code = smlParseValueFromJSON(metricVal, &valueKv);
  if (code == TSDB_CODE_SUCCESS) {
    // the kv is stored by value
    taosArrayPush(cols, &valueKv);
  }
  return code;
}

X
Xiaoyu Wang 已提交
2281 2282
// Parse the "tags" object of a JSON data point.
//   pKVs           : receives one SSmlKv per tag (stored by value; keys point into the cJSON tree)
//   childTableName : buffer of TSDB_TABLE_NAME_LEN bytes, filled when a tag key equals tsSmlChildTableName
//   dumplicateKey  : hash used by smlCheckDuplicateKey to reject repeated keys
//   msg            : not written by this function
// Returns TSDB_CODE_SUCCESS or an error code.
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey,
                                    SSmlMsgBuf *msg) {
  if (!pKVs) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  cJSON *tags = cJSON_GetObjectItem(root, "tags");
  if (tags == NULL || tags->type != cJSON_Object) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  size_t childTableNameLen = strlen(tsSmlChildTableName);
  // Walk the member list directly: cJSON_GetArrayItem(tags, i) rescans from the head on
  // every call, which makes an index loop quadratic in the number of tags.
  for (cJSON *tag = tags->child; tag != NULL; tag = tag->next) {
    if (tag->string == NULL) {
      return TSDB_CODE_TSC_INVALID_JSON;
    }
    size_t keyLen = strlen(tag->string);
    if (IS_INVALID_COL_LEN(keyLen)) {
      uError("OTD:Tag key length is 0 or too large than 64");
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
    // check duplicate keys
    if (smlCheckDuplicateKey(tag->string, keyLen, dumplicateKey)) {
      return TSDB_CODE_TSC_DUP_NAMES;
    }

    // handle child table name
    if (childTableNameLen != 0 && strcmp(tag->string, tsSmlChildTableName) == 0) {
      if (!cJSON_IsString(tag)) {
        uError("OTD:ID must be JSON string");
        return TSDB_CODE_TSC_INVALID_JSON;
      }
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      tstrncpy(childTableName, tag->valuestring, TSDB_TABLE_NAME_LEN);
      continue;
    }

    // add kv to SSmlKv
    SSmlKv kv = {.key = tag->string, .keyLen = keyLen};
    // value
    int32_t ret = smlParseValueFromJSON(tag, &kv);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
    if (taosArrayPush(pKVs, &kv) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2333 2334
/**
 * Parse one OpenTSDB JSON data point into table info and column values.
 *
 * The data point must be a JSON object with exactly OTD_JSON_FIELDS_NUM
 * top-level fields. The metric, timestamp, metric value and tags are then
 * parsed in that order; the first failing step aborts the parse.
 *
 * @param info   schemaless handle (supplies the id used in logs, the
 *               duplicate-key hash and the message buffer)
 * @param root   JSON node of the data point
 * @param tinfo  table info that receives the metric, tags and child table name
 * @param cols   array that receives the timestamp and metric value
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON for a malformed data
 *         point, or the error code returned by the failing sub-parser
 */
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo, SArray *cols) {
  if (!cJSON_IsObject(root)) {
    uError("OTD:0x%" PRIx64 " data point needs to be JSON object", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // the outermost JSON object must carry exactly OTD_JSON_FIELDS_NUM fields
  const int32_t fieldNum = cJSON_GetArraySize(root);
  if (fieldNum != OTD_JSON_FIELDS_NUM) {
    uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, fieldNum);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  int32_t code;

  // metric
  code = smlParseMetricFromJSON(info, root, tinfo);
  if (code != TSDB_CODE_SUCCESS) {
    uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
    return code;
  }
  uDebug("OTD:0x%" PRIx64 " Parse metric from JSON payload finished", info->id);

  // timestamp
  code = smlParseTSFromJSON(info, root, cols);
  if (code != TSDB_CODE_SUCCESS) {
    uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
    return code;
  }
  uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);

  // metric value
  code = smlParseColsFromJSON(root, cols);
  if (code != TSDB_CODE_SUCCESS) {
    uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
    return code;
  }
  uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id);

  // tags (may also yield the child table name)
  code = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
  if (code != TSDB_CODE_SUCCESS) {
    uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
    return code;
  }
  uDebug("OTD:0x%" PRIx64 " Parse tags from JSON payload finished", info->id);

  return TSDB_CODE_SUCCESS;
}
2382
/************* TSDB_SML_JSON_PROTOCOL function end **************/
2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395
/**
 * Post-process all parsed lines when the handle is not in data-format mode.
 *
 * For each entry of info->lines this looks up the child table registered
 * under the line's measure+tags key, pushes the line's columns into that
 * table via smlPushCols(), and then either updates the existing super-table
 * meta (smlUpdateMeta) or builds and registers a new one.
 *
 * @param info schemaless handle
 * @return TSDB_CODE_SUCCESS on success (or immediately when info->dataFormat
 *         is set); TSDB_CODE_SML_INVALID_DATA when a line's child table is
 *         missing; TSDB_CODE_PAR_TOO_MANY_COLUMNS when columns + tags exceed
 *         TSDB_MAX_COLUMNS; TSDB_CODE_OUT_OF_MEMORY when a new super-table
 *         meta cannot be built; otherwise the failing helper's error code
 */
static int32_t smlParseLineBottom(SSmlHandle *info) {
  if (info->dataFormat) return TSDB_CODE_SUCCESS;

  // info->lines is not modified in this loop, so read its size once
  const int32_t lineNum = (int32_t)taosArrayGetSize(info->lines);
  for (int32_t i = 0; i < lineNum; i++) {
    SSmlLineInfo   *elements = taosArrayGet(info->lines, i);
    SSmlTableInfo **oneTable =
        (SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen);
    if (oneTable == NULL) {
      uError("SML:0x%" PRIx64 "get oneTable failed, line num:%d", info->id, i);
      smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    SSmlTableInfo *tinfo = *oneTable;

    if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
      return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
    }

    int ret = smlPushCols(tinfo->cols, elements->colArray);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    SSmlSTableMeta **tableMeta =
        (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
    if (tableMeta) {  // update meta
      ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf);
      if (ret == TSDB_CODE_SUCCESS) {
        ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf);
      }
      if (ret != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
        return ret;
      }
    } else {
      ret = smlJudgeDupColName(elements->colArray, tinfo->tags, &info->msgBuf);
      if (ret != TSDB_CODE_SUCCESS) {
        // name the helper that actually failed
        uError("SML:0x%" PRIx64 " smlJudgeDupColName failed", info->id);
        return ret;
      }

      SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
      if (meta == NULL) {
        // presumably an allocation failure inside smlBuildSTableMeta; do not dereference
        uError("SML:0x%" PRIx64 " smlBuildSTableMeta failed", info->id);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
      smlInsertMeta(meta->colHash, meta->cols, elements->colArray);
      taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
    }
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
2436
/**
 * Parse one data point (OpenTSDB telnet line or JSON node) and register it.
 *
 * A fresh table info and column array are built, filled by the
 * protocol-specific parser, and then merged into info->childTables and
 * info->superTables.
 *
 * @param info schemaless handle; info->protocol selects the parser
 * @param data telnet protocol: start of the line text;
 *             JSON protocol: the cJSON node of the data point
 * @param len  length of the telnet line; not used for the JSON protocol
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure,
 *         TSDB_CODE_PAR_INVALID_TAGS_NUM when the tag count is outside
 *         [1, TSDB_MAX_TAGS], or the parser's / smlUpdateMeta's error code
 */
static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) {
  int            ret = TSDB_CODE_SUCCESS;
  SSmlTableInfo *tinfo = smlBuildTableInfo(1, "", 0);
  if (!tinfo) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SArray *cols = taosArrayInit(16, POINTER_BYTES);
  if (cols == NULL) {
    uError("SML:0x%" PRIx64 " smlParseTelnetLine failed to allocate memory", info->id);
    smlDestroyTableInfo(info, tinfo);  // tinfo is still owned here; release it before bailing out
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
    ret = smlParseTelnetString(info, (const char *)data, (char *)data + len, tinfo, cols);
  } else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
  } else {
    ASSERT(0);
  }
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseTelnetLine failed", info->id);
    smlDestroyTableInfo(info, tinfo);
    taosArrayDestroy(cols);
    return ret;
  }

  if (taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL);
    smlDestroyTableInfo(info, tinfo);
    taosArrayDestroy(cols);
    return TSDB_CODE_PAR_INVALID_TAGS_NUM;
  }
  taosHashClear(info->dumplicateKey);

  if (strlen(tinfo->childTableName) == 0) {
    RandTableName rName = {tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, tinfo->childTableName, 0};
    buildChildTableName(&rName);
    tinfo->uid = rName.uid;
  } else {
    // generate uid by name simple: take the first 8 bytes of the name.
    // memcpy reads the same bytes as the former pointer cast but has no
    // alignment or aliasing requirements on the char buffer.
    uint64_t uid = 0;
    memcpy(&uid, tinfo->childTableName, sizeof(uid));
    tinfo->uid = uid;
  }

  const size_t    childNameLen = strlen(tinfo->childTableName);
  bool            hasTable = true;
  SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, childNameLen);
  if (!oneTable) {
    // first data point of this child table: the hash keeps the tinfo pointer
    taosHashPut(info->childTables, tinfo->childTableName, childNameLen, &tinfo, POINTER_BYTES);
    oneTable = &tinfo;
    hasTable = false;
  } else {
    // child table already known: the freshly built tinfo is redundant
    smlDestroyTableInfo(info, tinfo);
  }

  taosArrayPush((*oneTable)->cols, &cols);
  SSmlSTableMeta **tableMeta =
      (SSmlSTableMeta **)taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen);
  if (tableMeta) {  // update meta
    ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, false, &info->msgBuf);
    if (!hasTable && ret == TSDB_CODE_SUCCESS) {
      ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, true, &info->msgBuf);
    }
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
      return ret;
    }
  } else {
    SSmlSTableMeta *meta = smlBuildSTableMeta(false);
    if (meta == NULL) {
      // presumably an allocation failure inside smlBuildSTableMeta; do not dereference
      uError("SML:0x%" PRIx64 " smlBuildSTableMeta failed", info->id);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
    smlInsertMeta(meta->colHash, meta->cols, cols);
    taosHashPut(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen, &meta, POINTER_BYTES);
  }

  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
2511

X
Xiaoyu Wang 已提交
2512
/**
 * Parse an OpenTSDB JSON payload and feed every data point to
 * smlParseTelnetLine().
 *
 * The payload may be a single JSON object (one data point) or a JSON array
 * of data points. The parsed tree is stored in info->root; this function
 * does not release it.
 *
 * @param info    schemaless handle
 * @param payload NUL-terminated JSON text
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON when the payload is
 *         NULL, unparsable or neither object nor array, or the error code of
 *         the first data point that fails to parse
 */
static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
  int32_t ret = TSDB_CODE_SUCCESS;

  if (payload == NULL) {
    uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  info->root = cJSON_Parse(payload);
  if (info->root == NULL) {
    uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // a lone object is exactly one data point
  if (cJSON_IsObject(info->root)) {
    ret = smlParseTelnetLine(info, info->root, -1);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
    }
    return ret;
  }

  // multiple data points must be sent in JSON array
  if (!cJSON_IsArray(info->root)) {
    uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // Walk the array's child list directly. cJSON arrays are linked lists, so
  // indexing with cJSON_GetArrayItem() inside a loop is quadratic in the
  // number of data points; following ->next visits the same items in the
  // same order in linear time.
  for (cJSON *dataPoint = info->root->child; dataPoint != NULL; dataPoint = dataPoint->next) {
    ret = smlParseTelnetLine(info, dataPoint, -1);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
      return ret;
    }
  }

  return ret;
}
2549

X
Xiaoyu Wang 已提交
2550
/**
 * Bind the data of every child table and launch the asynchronous insert.
 *
 * For each entry of info->childTables the target vgroup is resolved through
 * the catalog and stored in info->pVgHash, then the table's rows are bound
 * with smlBindData(). After all tables are bound, smlBuildOutput() is called
 * and the request is handed to launchAsyncQuery().
 *
 * @param info schemaless handle
 * @return TSDB_CODE_SUCCESS once the async query has been launched,
 *         TSDB_CODE_OUT_OF_MEMORY if the callback wrapper cannot be
 *         allocated, or the error code of the failing catalog/bind/build step
 */
static int32_t smlInsertData(SSmlHandle *info) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
  while (oneTable) {
    SSmlTableInfo *tableData = *oneTable;

    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
    tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
    memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));

    SRequestConnInfo conn = {0};
    conn.pTrans = info->taos->pAppInfo->pTransporter;
    conn.requestId = info->pRequest->requestId;
    conn.requestObjRefId = info->pRequest->self;
    conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

    SVgroupInfo vg;
    code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
      // leaving the loop early: the in-progress iteration must be cancelled
      taosHashCancelIterate(info->childTables, oneTable);
      return code;
    }
    taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));

    SSmlSTableMeta **pMeta =
        (SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
    ASSERT(NULL != pMeta && NULL != *pMeta);

    // use tablemeta of stable to save vgid and uid of child table
    (*pMeta)->tableMeta->vgId = vg.vgId;
    (*pMeta)->tableMeta->uid = tableData->uid;  // one table merge data block together according uid

    code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols,
                       (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
                       info->ttl, info->msgBuf.buf, info->msgBuf.len);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
      // leaving the loop early: the in-progress iteration must be cancelled
      taosHashCancelIterate(info->childTables, oneTable);
      return code;
    }
    oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
  }

  code = smlBuildOutput(info->pQuery, info->pVgHash);
  if (code != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
    return code;
  }
  info->cost.insertRpcTime = taosGetTimestampUs();

  // launchQueryImpl(info->pRequest, info->pQuery, false, NULL);
  //  info->affectedRows = taos_affected_rows(info->pRequest);
  //  return info->pRequest->code;

  SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
  atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);

  SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
  if (pWrapper == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pWrapper->pRequest = info->pRequest;
  launchAsyncQuery(info->pRequest, info->pQuery, NULL, pWrapper);
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2616 2617
/**
 * Log the per-request statistics collected in info->cost: result code, line
 * and table counters, and the time spent in each stage (differences between
 * consecutive timestamps stored in info->cost).
 *
 * @param info schemaless handle whose cost record is printed
 */
static void smlPrintStatisticInfo(SSmlHandle *info) {
  // stage durations, each the difference between two recorded timestamps
  const int64_t parseCost = info->cost.schemaTime - info->cost.parseTime;
  const int64_t schemaCost = info->cost.insertBindTime - info->cost.schemaTime;
  const int64_t bindCost = info->cost.insertRpcTime - info->cost.insertBindTime;
  const int64_t rpcCost = info->cost.endTime - info->cost.insertRpcTime;
  const int64_t totalCost = info->cost.endTime - info->cost.parseTime;

  uError("SML:0x%" PRIx64
             " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
        parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
             "",
         info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
         info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables,
         parseCost, schemaCost, bindCost, rpcCost, totalCost);
}

dengyihao's avatar
dengyihao 已提交
2628
/**
 * Parse the input of one batch according to info->protocol.
 *
 * JSON protocol: the whole payload (first element of `lines`, or `rawLine`)
 * is handed to smlParseJSON(). Line / telnet protocol: each of the
 * `numLines` lines is parsed in turn. Input comes either from the `lines`
 * array (NUL-terminated strings) or from the raw buffer
 * [rawLine, rawLineEnd), where lines are separated by '\n'.
 *
 * When a parser sets info->reRun, the child/super table state collected so
 * far is discarded and parsing restarts from the first line.
 *
 * @param info       schemaless handle
 * @param lines      array of NUL-terminated lines, or NULL when raw input is used
 * @param rawLine    start of the raw buffer, or NULL when `lines` is used
 * @param rawLineEnd one past the last byte of the raw buffer
 * @param numLines   number of lines to parse (unused for the JSON protocol)
 * @return TSDB_CODE_SUCCESS or the error code of the first failing parser
 */
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    if (lines) {
      code = smlParseJSON(info, *lines);
    } else if (rawLine) {
      code = smlParseJSON(info, rawLine);
    }
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines ? *lines : rawLine);
    }
    return code;
  }

  // remember where the raw buffer starts so a re-run can rewind to it
  char *const rawStart = rawLine;

  int32_t i = 0;
  while (i < numLines) {
    char *tmp = NULL;
    int   len = 0;
    if (lines) {
      tmp = lines[i];
      len = strlen(tmp);
    } else if (rawLine) {
      tmp = rawLine;
      while (rawLine < rawLineEnd) {
        if (*(rawLine++) == '\n') {
          break;
        }
        len++;
      }
      // `len > 0` keeps tmp[0] inside the buffer when the cursor already sits
      // at rawLineEnd; a line that starts with '#' always has len >= 1.
      if (info->protocol == TSDB_SML_LINE_PROTOCOL && len > 0 && tmp[0] == '#') {  // this line is comment
        i++;
        continue;
      }
    }

    // raw lines are '\n'-delimited, not NUL-terminated, so never print them with a bare %s
    uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len,
           (lines ? tmp : "rawdata"));

    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
      code = smlParseInfluxString(info, tmp, tmp + len, taosArrayGet(info->lines, i));
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
      code = smlParseTelnetLine(info, tmp, len);
    } else {
      ASSERT(0);
    }
    if (code != TSDB_CODE_SUCCESS) {
      // bound the output to this line; identical to %s for NUL-terminated `lines` input
      uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %.*s", info->id, i, len, tmp);
      return code;
    }
    if (info->reRun) {
      // restart from the first line: rewind both the index and the raw cursor
      i = 0;
      rawLine = rawStart;
      info->reRun = false;

      // clear info->childTables
      void **p1 = (void **)taosHashIterate(info->childTables, NULL);
      while (p1) {
        smlDestroyTableInfo(info, (SSmlTableInfo *)(*p1));
        p1 = (void **)taosHashIterate(info->childTables, p1);
      }
      taosHashClear(info->childTables);

      // clear info->superTables
      p1 = (void **)taosHashIterate(info->superTables, NULL);
      while (p1) {
        smlDestroySTableMeta((SSmlSTableMeta *)(*p1));
        p1 = (void **)taosHashIterate(info->superTables, p1);
      }
      taosHashClear(info->superTables);
      continue;
    }
    i++;
  }

  return code;
}

dengyihao's avatar
dengyihao 已提交
2703
/**
 * Run the full schemaless pipeline for one batch: parse the lines, finish
 * the per-line bookkeeping, bring the database schema in line with the
 * parsed data (retried on failure) and launch the insert.
 *
 * Timestamps for each stage are recorded in info->cost along the way.
 *
 * @param info       schemaless handle of this batch
 * @param lines      array of input lines, or NULL for raw input
 * @param rawLine    start of the raw input buffer, or NULL
 * @param rawLineEnd end of the raw input buffer
 * @param numLines   number of lines in this batch
 * @return 0 on success, otherwise the error code of the failing stage
 */
static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  int32_t attempts = 0;
  int32_t code;

  info->cost.parseTime = taosGetTimestampUs();

  code = smlParseLine(info, lines, rawLine, rawLineEnd, numLines);
  if (0 != code) {
    uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(code));
    return code;
  }

  code = smlParseLineBottom(info);
  if (0 != code) {
    uError("SML:0x%" PRIx64 " smlParseLineBottom error : %s", info->id, tstrerror(code));
    return code;
  }

  info->cost.lineNum = numLines;
  info->cost.numOfSTables = taosHashGetSize(info->superTables);
  info->cost.numOfCTables = taosHashGetSize(info->childTables);

  info->cost.schemaTime = taosGetTimestampUs();

  // keep retrying the schema change until it succeeds or the retry budget
  // (MAX_RETRY_TIMES per super table) is used up
  for (;;) {
    code = smlModifyDBSchemas(info);
    if (0 == code) {
      break;
    }
    if (!(attempts++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES)) {
      break;
    }
  }

  if (0 != code) {
    uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code));
    return code;
  }

  info->cost.insertBindTime = taosGetTimestampUs();
  code = smlInsertData(info);
  if (0 != code) {
    uError("SML:0x%" PRIx64 " smlInsertData error : %s", info->id, tstrerror(code));
  }

  return code;
}

X
Xiaoyu Wang 已提交
2747
/**
 * Check whether the current database accepts schemaless writes.
 *
 * The check is currently disabled: the function always reports success and
 * uses neither argument. The former catalog-based implementation is kept
 * below, commented out.
 *
 * @param taos    connection object (unused)
 * @param request request object (unused)
 * @return TSDB_CODE_SUCCESS, unconditionally
 */
static int32_t isSchemalessDb(STscObj *taos, SRequestObj *request) {
  (void)taos;
  (void)request;

  //  SCatalog *catalog = NULL;
  //  int32_t   code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, &catalog);
  //  if (code != TSDB_CODE_SUCCESS) {
  //    uError("SML get catalog error %d", code);
  //    return code;
  //  }
  //
  //  SName name;
  //  tNameSetDbName(&name, taos->acctId, taos->db, strlen(taos->db));
  //  char dbFname[TSDB_DB_FNAME_LEN] = {0};
  //  tNameGetFullDbName(&name, dbFname);
  //  SDbCfgInfo pInfo = {0};
  //
  //  SRequestConnInfo conn = {0};
  //  conn.pTrans = taos->pAppInfo->pTransporter;
  //  conn.requestId = request->requestId;
  //  conn.requestObjRefId = request->self;
  //  conn.mgmtEps = getEpSet_s(&taos->pAppInfo->mgmtEp);
  //
  //  code = catalogGetDBCfg(catalog, &conn, dbFname, &pInfo);
  //  if (code != TSDB_CODE_SUCCESS) {
  //    return code;
  //  }
  //  taosArrayDestroy(pInfo.pRetensions);
  //
  //  if (!pInfo.schemaless) {
  //    return TSDB_CODE_SML_INVALID_DB_CONF;
  //  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2779
static void smlInsertCallback(void *param, void *res, int32_t code) {
wmmhello's avatar
wmmhello 已提交
2780
  SRequestObj *pRequest = (SRequestObj *)res;
X
Xiaoyu Wang 已提交
2781
  SSmlHandle  *info = (SSmlHandle *)param;
2782
  int32_t      rows = taos_affected_rows(pRequest);
wmmhello's avatar
wmmhello 已提交
2783

X
Xiaoyu Wang 已提交
2784
  uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
2785
  Params *pParam = info->params;
wmmhello's avatar
wmmhello 已提交
2786
  // lock
2787 2788
  taosThreadSpinLock(&pParam->lock);
  pParam->cnt++;
X
Xiaoyu Wang 已提交
2789
  if (code != TSDB_CODE_SUCCESS) {
2790 2791
    pParam->request->code = code;
    pParam->request->body.resInfo.numOfRows += rows;
2792
  } else {
2793 2794
    pParam->request->body.resInfo.numOfRows += info->affectedRows;
  }
2795 2796 2797
  // unlock
  taosThreadSpinUnlock(&pParam->lock);

2798 2799
  if (pParam->cnt == pParam->total) {
    tsem_post(&pParam->sem);
wmmhello's avatar
wmmhello 已提交
2800
  }
wmmhello's avatar
wmmhello 已提交
2801
  uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
wmmhello's avatar
wmmhello 已提交
2802 2803 2804
  info->cost.endTime = taosGetTimestampUs();
  info->cost.code = code;
  smlPrintStatisticInfo(info);
wmmhello's avatar
wmmhello 已提交
2805 2806 2807
  smlDestroyInfo(info);
}

dengyihao's avatar
dengyihao 已提交
2808
/**
 * Validate the arguments, split the input into batches of at most
 * tsSmlBatchSize lines, process every batch with smlProcess() and wait until
 * all batches have reported through smlInsertCallback().
 *
 * Exactly one of `lines` / `rawLine` is expected to be non-NULL.
 *
 * @param request    user-visible request; receives the error code, error
 *                   message and accumulated row count, and is returned
 * @param lines      array of NUL-terminated lines, or NULL for raw input
 * @param rawLine    start of the raw '\n'-separated buffer, or NULL
 * @param rawLineEnd one past the last byte of the raw buffer
 * @param numLines   number of lines; forced to 1 for the JSON protocol
 * @param protocol   TSDB_SML_LINE_PROTOCOL .. TSDB_SML_JSON_PROTOCOL
 * @param precision  timestamp precision; range-checked for the line protocol only
 * @param ttl        stored in each batch handle's ttl field
 * @return `request`, cast to TAOS_RES *
 */
TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd,
                                       int numLines, int protocol, int precision, int32_t ttl) {
  int      batchs = 0;
  int      launched = 0;     // batches whose callback will be (or has been) invoked
  bool     aborted = false;  // set when the batch loop stops before all batches were started
  STscObj *pTscObj = request->pTscObj;

  pTscObj->schemalessType = 1;
  SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};

  Params params = {0};
  params.request = request;
  tsem_init(&params.sem, 0, 0);
  taosThreadSpinInit(&(params.lock), 0);

  if (request->pDb == NULL) {
    request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
    goto end;
  }

  if (isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS) {
    request->code = TSDB_CODE_SML_INVALID_DB_CONF;
    smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
    goto end;
  }

  if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
    request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
    smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
    goto end;
  }

  if (protocol == TSDB_SML_LINE_PROTOCOL &&
      (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
    request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
    smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
    goto end;
  }

  if (protocol == TSDB_SML_JSON_PROTOCOL) {
    // the whole JSON payload is handled as a single "line"
    numLines = 1;
  } else if (numLines <= 0) {
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
    goto end;
  }

  batchs = ceil(((double)numLines) / tsSmlBatchSize);
  params.total = batchs;
  for (int i = 0; i < batchs; ++i) {
    SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT, 0);
    if (!req) {
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error request is null");
      aborted = true;
      break;
    }

    // lines handled by this batch; the last batch takes whatever is left
    int32_t perBatch = tsSmlBatchSize;

    if (numLines > perBatch) {
      numLines -= perBatch;
    } else {
      perBatch = numLines;
      numLines = 0;
    }

    SSmlHandle *info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision, perBatch);
    if (!info) {
      // NOTE(review): `req` is not released on this path - confirm who owns it
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error SSmlHandle is null");
      aborted = true;
      break;
    }

    // raw mode is the one where the input arrives through rawLine
    info->isRawLine = (rawLine != NULL);
    info->ttl       = ttl;

    info->params = &params;
    info->pRequest->body.queryFp = smlInsertCallback;
    info->pRequest->body.param = info;
    // from here on the callback runs once for this batch: asynchronously
    // after a successful smlProcess(), or synchronously below on failure
    launched++;
    int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch);
    if (lines) {
      lines += perBatch;
    }
    if (rawLine) {
      // advance the raw cursor past the lines consumed by this batch
      int num = 0;
      while (rawLine < rawLineEnd) {
        if (*(rawLine++) == '\n') {
          num++;
        }
        if (num == perBatch) {
          break;
        }
      }
    }
    if (code != TSDB_CODE_SUCCESS) {
      info->pRequest->body.queryFp(info, req, code);
    }
  }

  if (!aborted) {
    tsem_wait(&params.sem);
  } else if (launched > 0) {
    // Some batches were started before the failure. Their callbacks still
    // reference `params`, which lives on this stack frame, so wait for them
    // before tearing it down. Shrink the expected total to what was really
    // launched so the last outstanding callback posts the semaphore.
    bool pending = false;
    taosThreadSpinLock(&params.lock);
    params.total = launched;
    pending = (params.cnt < launched);
    taosThreadSpinUnlock(&params.lock);
    if (pending) {
      tsem_wait(&params.sem);
    }
  }

end:
  taosThreadSpinDestroy(&params.lock);
  tsem_destroy(&params.sem);
  //  ((STscObj *)taos)->schemalessType = 0;
  pTscObj->schemalessType = 1;
  uDebug("resultend:%s", request->msgBuf);
  return (TAOS_RES *)request;
}
wmmhello's avatar
wmmhello 已提交
2915 2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931

/**
 * taos_schemaless_insert() parse and insert data points into database according to
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
dengyihao's avatar
dengyihao 已提交
2932
 * @return TAOS_RES
wmmhello's avatar
wmmhello 已提交
2933 2934
 */

2935 2936
TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                                int32_t ttl, int64_t reqid) {
wmmhello's avatar
wmmhello 已提交
2937 2938 2939 2940 2941
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

2942
  SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
dengyihao's avatar
dengyihao 已提交
2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954
  if (!request) {
    uError("SML:taos_schemaless_insert error request is null");
    return NULL;
  }

  if (!lines) {
    SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
    return (TAOS_RES *)request;
  }

2955
  return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision, ttl);
dengyihao's avatar
dengyihao 已提交
2956 2957
}

2958 2959 2960
/**
 * Schemaless insert with the default table TTL and request id 0.
 * See taos_schemaless_insert_ttl_with_reqid() for the parameters.
 */
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  const int64_t reqid = 0;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
2961

2962 2963 2964
/**
 * Schemaless insert with a caller-supplied TTL and request id 0.
 * See taos_schemaless_insert_ttl_with_reqid() for the parameters.
 */
TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                     int32_t ttl) {
  const int64_t reqid = 0;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
2965

2966 2967
/**
 * Schemaless insert of an array of lines with a caller-supplied request id
 * and the default table TTL (TSDB_DEFAULT_TABLE_TTL).
 *
 * All other arguments are forwarded unchanged to
 * taos_schemaless_insert_ttl_with_reqid(), whose result is returned.
 */
TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                            int64_t reqid) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}

2970
/**
 * Schemaless insert from a single contiguous buffer of newline-separated lines.
 *
 * The buffer is scanned once to count its lines before being handed to
 * taos_schemaless_insert_inner(). A line ends at '\n' or at the last byte of
 * the buffer.
 *
 * @param taos      connection handle; if NULL, terrno is set to
 *                  TSDB_CODE_TSC_DISCONNECTED and NULL is returned
 * @param lines     start of the buffer; NULL is reported as invalid data
 * @param len       number of bytes in the buffer; values <= 0 are reported as
 *                  invalid data
 * @param totalRows optional output: number of lines counted, excluding lines
 *                  whose first byte is '#' when protocol is
 *                  TSDB_SML_LINE_PROTOCOL. May be NULL. It is written only
 *                  when the buffer is actually scanned, not on the early
 *                  error returns.
 * @param protocol  forwarded to taos_schemaless_insert_inner()
 * @param precision forwarded to taos_schemaless_insert_inner()
 * @param ttl       forwarded to taos_schemaless_insert_inner()
 * @param reqid     request id passed to createRequest()
 * @return NULL if taos is NULL or the request cannot be created; otherwise
 *         the request as a TAOS_RES (with code TSDB_CODE_SML_INVALID_DATA
 *         for a NULL/empty buffer) or the result of
 *         taos_schemaless_insert_inner()
 */
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                                    int precision, int32_t ttl, int64_t reqid) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
  if (!request) {
    uError("SML:taos_schemaless_insert error request is null");
    return NULL;
  }

  if (!lines || len <= 0) {
    SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
    return (TAOS_RES *)request;
  }

  int         numLines = 0;
  int32_t     rows = 0;
  const char *lineStart = lines;  // first byte of the line currently being scanned
  for (int i = 0; i < len; i++) {
    // The final byte closes the last line even when no trailing '\n' is present.
    if (lines[i] == '\n' || i == len - 1) {
      numLines++;
      if (lineStart[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL) {  // ignore comment
        rows++;
      }
      lineStart = lines + i + 1;
    }
  }

  // Count into a local and publish once, so a NULL totalRows is tolerated instead of being dereferenced.
  if (totalRows != NULL) {
    *totalRows = rows;
  }

  return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision, ttl);
}

3005 3006 3007 3008 3009 3010
/**
 * Raw-buffer schemaless insert with a caller-supplied request id and the
 * default table TTL (TSDB_DEFAULT_TABLE_TTL).
 *
 * All other arguments are forwarded unchanged to
 * taos_schemaless_insert_raw_ttl_with_reqid(), whose result is returned.
 */
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                                int precision, int64_t reqid) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}
/**
 * Raw-buffer schemaless insert with a caller-supplied TTL and a request id
 * of 0.
 *
 * All other arguments are forwarded unchanged to
 * taos_schemaless_insert_raw_ttl_with_reqid(), whose result is returned.
 */
TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                         int precision, int32_t ttl) {
  const int64_t reqid = 0;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
3011

3012 3013
/**
 * Raw-buffer schemaless insert using the default table TTL
 * (TSDB_DEFAULT_TABLE_TTL) and a request id of 0.
 *
 * All other arguments are forwarded unchanged to
 * taos_schemaless_insert_raw_ttl_with_reqid(), whose result is returned.
 */
TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                     int precision) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  const int64_t reqid = 0;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, reqid);
}