// clientSml.c
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

X
Xiaoyu Wang 已提交
6 7 8 9 10
#include "cJSON.h"
#include "catalog.h"
#include "clientInt.h"
#include "osSemaphore.h"
#include "osThread.h"
wmmhello's avatar
wmmhello 已提交
11 12
#include "query.h"
#include "taos.h"
wmmhello's avatar
wmmhello 已提交
13
#include "taoserror.h"
X
Xiaoyu Wang 已提交
14
#include "tcommon.h"
wmmhello's avatar
wmmhello 已提交
15
#include "tdef.h"
X
Xiaoyu Wang 已提交
16
#include "tglobal.h"
wmmhello's avatar
wmmhello 已提交
17 18
#include "tlog.h"
#include "tmsg.h"
X
Xiaoyu Wang 已提交
19
#include "tname.h"
wmmhello's avatar
wmmhello 已提交
20 21
#include "ttime.h"
#include "ttypes.h"
wmmhello's avatar
wmmhello 已提交
22

23 24 25 26 27 28 29 30 31 32 33 34 35
// Branch-prediction hints. With GCC (>= 3), the Intel compiler (>= 800) or clang, expect()
// maps to __builtin_expect; with any other compiler it evaluates to the bare expression.
#if (defined(__GNUC__) && (__GNUC__ >= 3)) || (defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 800)) || defined(__clang__)
#  define expect(expr,value)    (__builtin_expect ((expr),(value)) )
#else
#  define expect(expr,value)    (expr)
#endif

// likely()/unlikely() first normalize the expression to 0 or 1 and then pass it to expect().
// Each is defined here only if it has not already been defined.
#ifndef likely
#define likely(expr)     expect((expr) != 0, 1)
#endif
#ifndef unlikely
#define unlikely(expr)   expect((expr) != 0, 0)
#endif

wmmhello's avatar
wmmhello 已提交
36
//=================================================================================================
wmmhello's avatar
wmmhello 已提交
37 38 39 40 41 42 43

// Delimiter and escape characters referenced by the parsing macros in this file.
#define SPACE ' '
#define COMMA ','
#define EQUAL '='
#define QUOTE '"'
#define SLASH '\\'  // a single backslash character

dengyihao's avatar
dengyihao 已提交
44 45 46 47 48 49
// Advance `sql` past consecutive SPACE characters without moving beyond `sqlEnd`.
// Expands to a bare `while` statement, so a call site does not need a trailing semicolon.
#define JUMP_SPACE(sql, sqlEnd)           \
  while (sql < sqlEnd && *sql == SPACE) { \
    sql++;                                \
  }
wmmhello's avatar
wmmhello 已提交
51
// Escape predicates. Each macro takes a pointer to the character under test and also reads
// the character immediately before it, so the pointer must not address the first byte of
// the buffer. IS_SLASH_x is true for an escaped x (preceded by a backslash); IS_x is true
// for an unescaped x.
// comma ,
#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH)
#define IS_COMMA(sql)       (*(sql) == COMMA && *((sql)-1) != SLASH)
// space
#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql)-1) == SLASH)
#define IS_SPACE(sql)       (*(sql) == SPACE && *((sql)-1) != SLASH)
// equal =
#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) == SLASH)
#define IS_EQUAL(sql)       (*(sql) == EQUAL && *((sql)-1) != SLASH)
// quote "
#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) == SLASH)
#define IS_QUOTE(sql)       (*(sql) == QUOTE && *((sql)-1) != SLASH)
// SLASH
#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql)-1) == SLASH)

// True when *sql is one of: comma, space, '=', '"', backslash, and is preceded by a backslash.
#define IS_SLASH_LETTER(sql) \
  (IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql))

// Overwrite the byte at (sql)-1 by shifting `len` bytes starting at `sql` one position left.
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len))

// Remove escaping backslashes from key[0..keyLen) in place and decrement keyLen once per
// removed backslash. `keyLen` must be a modifiable integer lvalue; the macro expands to a
// bare `for` statement, so a call site does not need a trailing semicolon.
//
// After an escape pair is collapsed, the unescaped character sits at index i-1 and the scan
// continues at i+1. The index is deliberately NOT stepped back: doing so would compare the
// just-unescaped character with its successor and treat a literal backslash as the start of
// another escape (four backslashes would collapse to one instead of two).
#define PROCESS_SLASH(key, keyLen)           \
  for (int i = 1; i < keyLen; ++i) {         \
    if (IS_SLASH_LETTER(key + i)) {          \
      MOVE_FORWARD_ONE(key + i, keyLen - i); \
      keyLen--;                              \
    }                                        \
  }
wmmhello's avatar
wmmhello 已提交
79

80 81 82
// Length validators: a length is invalid when it is <= 0 or >= the corresponding
// TSDB_*_NAME_LEN limit.
#define IS_INVALID_COL_LEN(len)   ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)

83 84 85
// Field counts used by the JSON parsing code (usage is outside this part of the file).
// NOTE(review): "OTD" presumably stands for OpenTSDB - confirm against the JSON parser.
#define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM     4

X
Xiaoyu Wang 已提交
86 87 88 89
// Fixed key names and their lengths in characters (excluding the terminating NUL).
#define TS        "_ts"
#define TS_LEN    3
#define VALUE     "_value"
#define VALUE_LEN 6
90

X
Xiaoyu Wang 已提交
91 92
// Number of decoration characters that surround a quoted string literal.
#define BINARY_ADD_LEN 2  // "binary": 2 accounts for the two double quotes
#define NCHAR_ADD_LEN  3  // L"nchar": 3 accounts for the L prefix plus the two double quotes
wmmhello's avatar
wmmhello 已提交
93 94

// Retry limit; the code that consumes it is outside this part of the file.
#define MAX_RETRY_TIMES 5
wmmhello's avatar
wmmhello 已提交
95 96 97 98
//=================================================================================================
// Local alias for the public TSDB_SML_PROTOCOL_TYPE type.
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;

// Schema change required to make a super table accept an incoming key/value.
typedef enum {
  SCHEMA_ACTION_NULL = 0,                // no change required
  SCHEMA_ACTION_CREATE_STABLE = 1,       // the super table has to be created
  SCHEMA_ACTION_ADD_COLUMN = 2,          // a column is missing
  SCHEMA_ACTION_ADD_TAG = 3,             // a tag is missing
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE = 4,  // an existing column is too narrow
  SCHEMA_ACTION_CHANGE_TAG_SIZE = 5,     // an existing tag is too narrow
} ESchemaAction;

typedef struct {
X
Xiaoyu Wang 已提交
108 109 110 111
  const char *measure;
  const char *tags;
  const char *cols;
  const char *timestamp;
wmmhello's avatar
wmmhello 已提交
112 113 114 115 116 117

  int32_t measureLen;
  int32_t measureTagsLen;
  int32_t tagsLen;
  int32_t colsLen;
  int32_t timestampLen;
118 119

  SArray *colArray;
wmmhello's avatar
wmmhello 已提交
120 121 122
} SSmlLineInfo;

typedef struct {
X
Xiaoyu Wang 已提交
123 124 125 126
  const char *sTableName;  // super table name
  int32_t     sTableNameLen;
  char        childTableName[TSDB_TABLE_NAME_LEN];
  uint64_t    uid;
wmmhello's avatar
wmmhello 已提交
127

X
Xiaoyu Wang 已提交
128
  SArray *tags;
wmmhello's avatar
wmmhello 已提交
129

130
  // elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
X
Xiaoyu Wang 已提交
131
  SArray *cols;
132
  STableDataCxt *tableDataCtx;
wmmhello's avatar
wmmhello 已提交
133 134 135
} SSmlTableInfo;

typedef struct {
X
Xiaoyu Wang 已提交
136 137
  SArray   *tags;     // save the origin order to create table
  SHashObj *tagHash;  // elements are <key, index in tags>
138

X
Xiaoyu Wang 已提交
139 140
  SArray   *cols;
  SHashObj *colHash;
141

wmmhello's avatar
wmmhello 已提交
142 143 144 145
  STableMeta *tableMeta;
} SSmlSTableMeta;

// Caller-provided buffer that receives a human-readable error message.
typedef struct {
  int32_t len;  // capacity of `buf` in bytes
  char   *buf;  // message storage; may be NULL, in which case no message is written
} SSmlMsgBuf;

150 151 152 153 154 155 156
// Counters and timestamps collected while processing one request.
typedef struct {
  int32_t code;     // result code
  int32_t lineNum;  // number of lines

  // table counters
  int32_t numOfSTables;
  int32_t numOfCTables;
  int32_t numOfCreateSTables;
  int32_t numOfAlterColSTables;
  int32_t numOfAlterTagSTables;

  // time marks (units are not visible in this part of the file)
  int64_t parseTime;
  int64_t schemaTime;
  int64_t insertBindTime;
  int64_t insertRpcTime;
  int64_t endTime;
} SSmlCostInfo;

X
Xiaoyu Wang 已提交
167 168 169
typedef struct {
  SRequestObj     *request;
  tsem_t           sem;
170 171
  int32_t          cnt;
  int32_t          total;
wmmhello's avatar
wmmhello 已提交
172 173 174
  TdThreadSpinlock lock;
} Params;

wmmhello's avatar
wmmhello 已提交
175
typedef struct {
X
Xiaoyu Wang 已提交
176 177 178 179 180
  int64_t id;
  Params *params;

  SMLProtocolType protocol;
  int8_t          precision;
181
  bool            reRun;
182 183 184
  bool            dataFormat;  // true means that the name and order of keys in each line are the same(only for influx protocol)
  bool            isRawLine;
  int32_t         ttl;
X
Xiaoyu Wang 已提交
185 186 187 188 189 190 191 192 193 194 195 196 197 198

  SHashObj *childTables;
  SHashObj *superTables;
  SHashObj *pVgHash;

  STscObj     *taos;
  SCatalog    *pCatalog;
  SRequestObj *pRequest;
  SQuery      *pQuery;

  SSmlCostInfo cost;
  int32_t      affectedRows;
  SSmlMsgBuf   msgBuf;
  SHashObj    *dumplicateKey;  // for dumplicate key
wmmhello's avatar
wmmhello 已提交
199 200

  cJSON       *root;  // for parse json
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
  SArray      *lines; // element is SSmlLineInfo

  //
  SHashObj    *superTableTagKeyStr;
  SHashObj    *superTableColKeyStr;
  void        *currentLineTagKeys;
  void        *preLineTagKeys;
  void        *currentLineColKeys;
  void        *preLineColKeys;

  SArray      *preLineTagKV;
  SArray      *preLineColKV;

  SSmlLineInfo preLine;
  STableMeta  *currSTableMeta;
  STableDataCxt *currTableDataCtx;
wmmhello's avatar
wmmhello 已提交
217
} SSmlHandle;
wmmhello's avatar
wmmhello 已提交
218 219
//=================================================================================================

wmmhello's avatar
wmmhello 已提交
220
//=================================================================================================
221
// Counter backing smlGenId(); only modified through atomic_add_fetch_64.
static volatile int64_t linesSmlHandleId = 0;

// Return the next value of the counter, skipping 0 so that a returned id is never zero.
static int64_t smlGenId() {
  for (;;) {
    int64_t next = atomic_add_fetch_64(&linesSmlHandleId, 1);
    if (next != 0) {
      return next;
    }
  }
}

232
// Report whether `num` lies at or beyond the int64_t limits once those limits are converted
// to double. Both bounds are inclusive, so a value equal to (double)INT64_MIN is reported too.
static inline bool smlDoubleToInt64OverFlow(double num) {
  const double upper = (double)INT64_MAX;
  const double lower = (double)INT64_MIN;
  return num >= upper || num <= lower;
}

// Return true when `key` is already present in `pHash`. Otherwise record the key (storing
// one byte of the key itself as the value) and return false.
static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashObj *pHash) {
  if (taosHashGet(pHash, key, keyLen) != NULL) {
    return true;
  }
  taosHashPut(pHash, key, keyLen, key, 1);
  return false;
}

X
Xiaoyu Wang 已提交
246
/**
 * Write "<msg1>:<msg2>" into the message buffer and return the invalid-data error code.
 *
 * @param pBuf  destination; nothing is written when pBuf->buf is NULL or pBuf->len <= 0
 * @param msg1  first part of the message, may be NULL
 * @param msg2  second part, may be NULL; it (and the ':' separator) is only appended when
 *              more than two bytes remain after msg1
 * @return always TSDB_CODE_SML_INVALID_DATA
 *
 * The result is always NUL-terminated and truncated to fit into pBuf->len bytes.
 */
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
  if (pBuf->buf && pBuf->len > 0) {
    memset(pBuf->buf, 0, pBuf->len);
    // strncat copies up to n characters AND then appends a NUL, so the limit must be one
    // less than the space available or a long msg1 would write one byte past the buffer
    if (msg1) strncat(pBuf->buf, msg1, pBuf->len - 1);
    int32_t left = pBuf->len - (int32_t)strlen(pBuf->buf);  // free bytes, including the one for the NUL
    if (left > 2 && msg2) {
      strncat(pBuf->buf, ":", left - 1);
      strncat(pBuf->buf, msg2, left - 2);
    }
  }
  return TSDB_CODE_SML_INVALID_DATA;
}

X
Xiaoyu Wang 已提交
259
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
260
                                       ESchemaAction *action, SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
261
  uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
262 263
  if (index) {
    if (colField[*index].type != kv->type) {
X
Xiaoyu Wang 已提交
264 265
      uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id,
             kv->key, colField[*index].type, kv->type);
266 267 268
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

X
Xiaoyu Wang 已提交
269 270 271 272
    if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR &&
         (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
        (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
         ((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
273
      if (isTag) {
wmmhello's avatar
wmmhello 已提交
274
        *action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
275
      } else {
wmmhello's avatar
wmmhello 已提交
276
        *action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
277 278 279 280
      }
    }
  } else {
    if (isTag) {
wmmhello's avatar
wmmhello 已提交
281
      *action = SCHEMA_ACTION_ADD_TAG;
282
    } else {
wmmhello's avatar
wmmhello 已提交
283
      *action = SCHEMA_ACTION_ADD_COLUMN;
284 285
    }
  }
wmmhello's avatar
wmmhello 已提交
286 287 288
  return 0;
}

wmmhello's avatar
wmmhello 已提交
289
static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
wmmhello's avatar
wmmhello 已提交
290
  int32_t result = 1;
X
Xiaoyu Wang 已提交
291
  while (result <= length) {
wmmhello's avatar
wmmhello 已提交
292 293
    result *= 2;
  }
294
  if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
wmmhello's avatar
wmmhello 已提交
295
    result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
296
  } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
wmmhello's avatar
wmmhello 已提交
297 298
    result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  }
wmmhello's avatar
wmmhello 已提交
299

300
  if (type == TSDB_DATA_TYPE_NCHAR) {
301
    result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
302
  } else if (type == TSDB_DATA_TYPE_BINARY) {
303
    result = result + VARSTR_HEADER_SIZE;
wmmhello's avatar
wmmhello 已提交
304
  }
305
  return result;
wmmhello's avatar
wmmhello 已提交
306 307
}

X
Xiaoyu Wang 已提交
308
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
309
                                      ESchemaAction *action, bool isTag) {
310 311
  int32_t code = TSDB_CODE_SUCCESS;
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
312
    if (j == 0 && !isTag) continue;
313
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
314
    code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
X
Xiaoyu Wang 已提交
315
    if (code != TSDB_CODE_SUCCESS) {
316 317 318 319 320 321
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

322
// Verify that every key in `cols` names one of the `length` fields in `schema`.
// For columns (isTag == false) the first entry of `cols` is not checked.
// Returns 0 when all keys are present and -1 as soon as one is missing.
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
  SHashObj *names = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  for (int32_t pos = 0; pos < length; pos++) {
    taosHashPut(names, schema[pos].name, strlen(schema[pos].name), &pos, SHORT_BYTES);
  }

  int32_t rc = 0;
  for (int32_t k = isTag ? 0 : 1; k < taosArrayGetSize(cols); k++) {
    SSmlKv *entry = (SSmlKv *)taosArrayGet(cols, k);
    if (taosHashGet(names, entry->key, entry->keyLen) == NULL) {
      rc = -1;
      break;
    }
  }

  taosHashCleanup(names);
  return rc;
}

345
static int32_t getBytes(uint8_t type, int32_t length) {
346 347 348 349 350 351 352
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
    return smlFindNearestPowerOf2(length, type);
  } else {
    return tDataTypes[type].bytes;
  }
}

353 354
/**
 * Apply the schema changes required by `cols` to the field list `results`.
 *
 * For every key/value: a missing key appends a new SField to `results`; an existing
 * variable-length field that is too short has its `bytes` enlarged in place.
 *
 * @param schemaField  existing schema (may be NULL together with schemaHash for a new table)
 * @param schemaHash   key -> uint16_t index into schemaField
 * @param cols         SSmlKv entries to apply
 * @param results      SField array being built; for the resize case it must already contain
 *                     the existing fields in schema order
 * @param numOfCols    subtracted from the schema index when isTag is true, to translate it
 *                     into an index into `results`
 * @return TSDB_CODE_SUCCESS, the error reported by smlGenerateSchemaAction, or
 *         TSDB_CODE_SML_INVALID_DATA when a key does not fit into SField.name or a field
 *         to be resized cannot be located
 */
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                  SArray *results, int32_t numOfCols, bool isTag) {
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
    SSmlKv       *kv = (SSmlKv *)taosArrayGet(cols, j);
    ESchemaAction action = SCHEMA_ACTION_NULL;
    int32_t       code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
    if (code != TSDB_CODE_SUCCESS) {
      // a type mismatch must not be silently skipped
      return code;
    }
    if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) {
      SField field = {0};
      // keep room for the terminating NUL already provided by the zero initialization
      if ((size_t)kv->keyLen >= sizeof(field.name)) {
        return TSDB_CODE_SML_INVALID_DATA;
      }
      field.type = kv->type;
      field.bytes = getBytes(kv->type, kv->length);
      memcpy(field.name, kv->key, kv->keyLen);
      taosArrayPush(results, &field);
    } else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
      uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
      if (index == NULL) {
        return TSDB_CODE_SML_INVALID_DATA;
      }
      uint16_t newIndex = *index;
      if (isTag) newIndex -= numOfCols;
      SField *field = (SField *)taosArrayGet(results, newIndex);
      if (field == NULL) {
        return TSDB_CODE_SML_INVALID_DATA;
      }
      field->bytes = getBytes(kv->type, kv->length);
    }
  }
  return TSDB_CODE_SUCCESS;
}

376 377 378 379 380
// static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
//                               int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){
/**
 * Send a TDMT_MND_CREATE_STB request that creates or alters the super table `pName`.
 *
 * @param info        handle supplying the connection (info->taos) and catalog
 * @param pName       full name of the super table
 * @param pColumns    SField array with the complete column list
 * @param pTags       SField array with the complete tag list; when empty, one NCHAR tag
 *                    named tsSmlTagName is added
 * @param pTableMeta  current table meta; read for every action except CREATE_STABLE
 * @param action      selects how colVer/tagVer/suid/source are filled in
 * @return TSDB_CODE_SUCCESS or an error code
 *
 * pColumns and pTags are attached to the request before anything can fail, so that the
 * tFreeSMCreateStbReq call at `end` is reached with them on every path.
 * NOTE(review): this presumably means ownership of both arrays passes to this function -
 * confirm against tFreeSMCreateStbReq.
 */
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray *pTags, STableMeta *pTableMeta,
                              ESchemaAction action) {
  SRequestObj   *pRequest = NULL;
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SCmdMsgInfo    pCmdMsg = {0};

  // put front for free
  pReq.numOfColumns = taosArrayGetSize(pColumns);
  pReq.pColumns = pColumns;
  pReq.numOfTags = taosArrayGetSize(pTags);
  pReq.pTags = pTags;

  code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  if (action == SCHEMA_ACTION_CREATE_STABLE) {
    pReq.colVer = 1;
    pReq.tagVer = 1;
    pReq.suid = 0;
    pReq.source = TD_REQ_FROM_APP;
  } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
    // a tag change bumps only the tag version
    pReq.colVer = pTableMeta->sversion;
    pReq.tagVer = pTableMeta->tversion + 1;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  } else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) {
    // a column change bumps only the column version
    pReq.colVer = pTableMeta->sversion + 1;
    pReq.tagVer = pTableMeta->tversion;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  }

  if (pReq.numOfTags == 0) {
    pReq.numOfTags = 1;
    SField field = {0};
    field.type = TSDB_DATA_TYPE_NCHAR;
    field.bytes = 1;
    // tsSmlTagName is a configurable string: bound the copy to the destination size
    tstrncpy(field.name, tsSmlTagName, sizeof(field.name));
    taosArrayPush(pReq.pTags, &field);
  }

  pReq.commentLen = -1;
  pReq.igExists = true;
  tNameExtractFullName(pName, pReq.name);

  pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // first pass with a NULL buffer yields the required size
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  if (pCmdMsg.msgLen <= 0) {
    // never hand a non-positive size to the allocator
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

  SQuery pQuery;
  memset(&pQuery, 0, sizeof(pQuery));
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // drop the cached meta so that the next lookup sees the changed schema
    catalogRemoveTableMeta(info->pCatalog, pName);
  }
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  return code;
}

X
Xiaoyu Wang 已提交
463
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
464 465 466
  int32_t     code = 0;
  SHashObj   *hashTmp = NULL;
  STableMeta *pTableMeta = NULL;
wmmhello's avatar
wmmhello 已提交
467

468
  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
wmmhello's avatar
wmmhello 已提交
469
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
wmmhello's avatar
wmmhello 已提交
470

D
dapan1121 已提交
471 472 473 474 475
  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
X
Xiaoyu Wang 已提交
476 477

  SSmlSTableMeta **tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
wmmhello's avatar
wmmhello 已提交
478
  while (tableMetaSml) {
X
Xiaoyu Wang 已提交
479 480
    SSmlSTableMeta *sTableData = *tableMetaSml;
    bool            needCheckMeta = false;  // for multi thread
wmmhello's avatar
wmmhello 已提交
481

wmmhello's avatar
wmmhello 已提交
482
    size_t superTableLen = 0;
X
Xiaoyu Wang 已提交
483
    void  *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
484
    memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
wmmhello's avatar
wmmhello 已提交
485
    memcpy(pName.tname, superTable, superTableLen);
wmmhello's avatar
wmmhello 已提交
486

D
dapan1121 已提交
487
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
wmmhello's avatar
wmmhello 已提交
488

489
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
490 491
      SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
      SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
wmmhello's avatar
wmmhello 已提交
492 493 494 495
      smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
      smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);

      code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
496
      if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
497
        uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
498
        goto end;
wmmhello's avatar
wmmhello 已提交
499
      }
500
      info->cost.numOfCreateSTables++;
wmmhello's avatar
wmmhello 已提交
501 502 503 504 505 506 507
      taosMemoryFreeClear(pTableMeta);

      code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
        goto end;
      }
X
Xiaoyu Wang 已提交
508
    } else if (code == TSDB_CODE_SUCCESS) {
509 510
      hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
                             HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
511 512
      for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
           i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
513 514
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
wmmhello's avatar
wmmhello 已提交
515

516 517
      ESchemaAction action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
518
      if (code != TSDB_CODE_SUCCESS) {
519
        goto end;
520
      }
521 522 523 524 525
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
wmmhello's avatar
wmmhello 已提交
526 527 528 529 530 531

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
532
          if (i < pTableMeta->tableInfo.numOfColumns) {
wmmhello's avatar
wmmhello 已提交
533
            taosArrayPush(pColumns, &field);
534
          } else {
wmmhello's avatar
wmmhello 已提交
535 536 537
            taosArrayPush(pTags, &field);
          }
        }
538 539
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
                           pTableMeta->tableInfo.numOfColumns, true);
wmmhello's avatar
wmmhello 已提交
540 541

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
542
        if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
543
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
544 545 546
          goto end;
        }

547
        info->cost.numOfAlterTagSTables++;
wmmhello's avatar
wmmhello 已提交
548 549 550 551 552 553 554 555 556
        taosMemoryFreeClear(pTableMeta);
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
557
      }
558 559

      taosHashClear(hashTmp);
wmmhello's avatar
wmmhello 已提交
560
      for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
561 562
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
563 564
      action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
565
      if (code != TSDB_CODE_SUCCESS) {
566
        goto end;
wmmhello's avatar
wmmhello 已提交
567
      }
568 569 570 571 572
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
wmmhello's avatar
wmmhello 已提交
573 574 575 576 577 578

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
579
          if (i < pTableMeta->tableInfo.numOfColumns) {
wmmhello's avatar
wmmhello 已提交
580
            taosArrayPush(pColumns, &field);
581
          } else {
wmmhello's avatar
wmmhello 已提交
582 583 584 585
            taosArrayPush(pTags, &field);
          }
        }

586 587
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
                           pTableMeta->tableInfo.numOfColumns, false);
wmmhello's avatar
wmmhello 已提交
588 589

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
590
        if (code != TSDB_CODE_SUCCESS) {
591
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
592 593
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
594

595
        info->cost.numOfAlterColSTables++;
wmmhello's avatar
wmmhello 已提交
596
        taosMemoryFreeClear(pTableMeta);
wmmhello's avatar
wmmhello 已提交
597 598 599 600
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
601 602 603 604 605
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
606
      }
wmmhello's avatar
wmmhello 已提交
607

608
      needCheckMeta = true;
wmmhello's avatar
wmmhello 已提交
609 610
      taosHashCleanup(hashTmp);
      hashTmp = NULL;
wmmhello's avatar
wmmhello 已提交
611
    } else {
X
Xiaoyu Wang 已提交
612
      uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
613
      goto end;
wmmhello's avatar
wmmhello 已提交
614
    }
615

X
Xiaoyu Wang 已提交
616 617
    if (needCheckMeta) {
      code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
618
                          sTableData->tags, true);
619
      if (code != TSDB_CODE_SUCCESS) {
620
        uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, pName.tname);
621 622
        goto end;
      }
623
      code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
624
      if (code != TSDB_CODE_SUCCESS) {
625
        uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, pName.tname);
626 627 628 629
        goto end;
      }
    }

630
    sTableData->tableMeta = pTableMeta;
wmmhello's avatar
wmmhello 已提交
631

X
Xiaoyu Wang 已提交
632
    tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, tableMetaSml);
wmmhello's avatar
wmmhello 已提交
633 634
  }
  return 0;
635

636
  end:
wmmhello's avatar
wmmhello 已提交
637 638
  taosHashCleanup(hashTmp);
  taosMemoryFreeClear(pTableMeta);
wmmhello's avatar
wmmhello 已提交
639
//  catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
640
  return code;
wmmhello's avatar
wmmhello 已提交
641 642
}

643
/******************************* parse basic type function **********************/
X
Xiaoyu Wang 已提交
644
/**
 * Parse kvVal->value (kvVal->length characters) as a number with an optional type suffix
 * and store the type and value in kvVal.
 *
 * Suffixes (matched case-insensitively unless noted):
 *   none / f64 -> DOUBLE (kvVal->d)      f32 -> FLOAT (kvVal->f)
 *   i (lower case only) / i64 -> BIGINT  u (lower case only) / u64 -> UBIGINT
 *   i32, i16, i8 -> INT, SMALLINT, TINYINT (kvVal->i)
 *   u32, u16, u8 -> UINT, USMALLINT, UTINYINT (kvVal->u)
 *
 * For the 64-bit integer types a plain decimal integer literal is converted with the
 * integer parser so that no precision is lost above 2^53; a literal with a fraction or an
 * exponent is converted from its double value after a range check.
 *
 * @param kvVal  in: value/length; out: type and the matching value member
 * @param msg    receives a description when parsing fails
 * @return true on success, false on an unparsable value, unknown suffix or out-of-range number
 */
static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
  const char *pVal = kvVal->value;
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (pVal == endptr) {
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
    return false;
  }

  // number of characters after the numeric part, i.e. the length of the suffix
  int32_t left = len - (endptr - pVal);
  if (left == 0 || (left == 3 && strncasecmp(endptr, "f64", left) == 0)) {
    kvVal->type = TSDB_DATA_TYPE_DOUBLE;
    kvVal->d = result;
  } else if ((left == 3 && strncasecmp(endptr, "f32", left) == 0)) {
    if (!IS_VALID_FLOAT(result)) {
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_FLOAT;
    kvVal->f = (float)result;
  } else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)) {
    char *intEnd = NULL;
    errno = 0;
    int64_t tmp = taosStr2Int64(pVal, &intEnd, 10);
    if (intEnd == endptr) {
      // both parsers consumed the same characters: a plain integer literal, use the exact value
      if (errno == ERANGE) {
        smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
        return false;
      }
      kvVal->i = tmp;
    } else {
      // fraction/exponent present: only the double value is meaningful.
      // (result != result) is true only for NaN, which cannot be converted to an integer.
      if (result != result || result >= (double)INT64_MAX || result < (double)INT64_MIN) {
        smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
        return false;
      }
      kvVal->i = (int64_t)result;
    }
    kvVal->type = TSDB_DATA_TYPE_BIGINT;
  } else if ((left == 1 && *endptr == 'u') || (left == 3 && strncasecmp(endptr, "u64", left) == 0)) {
    if (result != result || result < 0) {
      smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
      return false;
    }
    char *intEnd = NULL;
    errno = 0;
    uint64_t tmp = taosStr2UInt64(pVal, &intEnd, 10);
    if (intEnd == endptr) {
      // plain integer literal: use the exact value
      if (errno == ERANGE) {
        smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
        return false;
      }
      kvVal->u = tmp;
    } else {
      if (result >= (double)UINT64_MAX) {
        smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
        return false;
      }
      kvVal->u = (uint64_t)result;
    }
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;
  } else if (left == 3 && strncasecmp(endptr, "i32", left) == 0) {
    if (!IS_VALID_INT(result)) {
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_INT;
    kvVal->i = result;
  } else if (left == 3 && strncasecmp(endptr, "u32", left) == 0) {
    if (!IS_VALID_UINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UINT;
    kvVal->u = result;
  } else if (left == 3 && strncasecmp(endptr, "i16", left) == 0) {
    if (!IS_VALID_SMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "small int out of range[-32768,32767]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_SMALLINT;
    kvVal->i = result;
  } else if (left == 3 && strncasecmp(endptr, "u16", left) == 0) {
    if (!IS_VALID_USMALLINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned small int out of range[0,65535]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_USMALLINT;
    kvVal->u = result;
  } else if (left == 2 && strncasecmp(endptr, "i8", left) == 0) {
    if (!IS_VALID_TINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_TINYINT;
    kvVal->i = result;
  } else if (left == 2 && strncasecmp(endptr, "u8", left) == 0) {
    if (!IS_VALID_UTINYINT(result)) {
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
      return false;
    }
    kvVal->type = TSDB_DATA_TYPE_UTINYINT;
    kvVal->u = result;
  } else {
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
    return false;
  }
  return true;
}

wmmhello's avatar
wmmhello 已提交
742
/**
 * Try to interpret kvVal->value (kvVal->length bytes) as a boolean literal.
 *
 * Accepted spellings: "t"/"T", "f"/"F", and case-insensitive "true"/"false".
 * On a match the result is stored in kvVal->i as TSDB_TRUE or TSDB_FALSE.
 * kvVal->type is not touched here.
 *
 * @return true if the value was recognised as a boolean, false otherwise.
 */
static bool smlParseBool(SSmlKv *kvVal) {
  const char *pVal = kvVal->value;
  int32_t     len = kvVal->length;

  if (len == 1) {
    if (pVal[0] == 't' || pVal[0] == 'T') {
      kvVal->i = TSDB_TRUE;
      return true;
    }
    if (pVal[0] == 'f' || pVal[0] == 'F') {
      kvVal->i = TSDB_FALSE;
      return true;
    }
    return false;
  }

  if (len == 4 && strncasecmp(pVal, "true", len) == 0) {
    kvVal->i = TSDB_TRUE;
    return true;
  }
  if (len == 5 && strncasecmp(pVal, "false", len) == 0) {
    kvVal->i = TSDB_FALSE;
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
766
/**
 * Tell whether the value looks like a binary literal, i.e. "abc".
 *
 * @param pVal start of the value (need not be NUL-terminated)
 * @param len  number of bytes in the value
 * @return true when the value is at least two bytes long and both its first
 *         and last byte are a double quote.
 */
static bool smlIsBinary(const char *pVal, uint16_t len) {
  // binary: "abc"
  return len >= 2 && pVal[0] == '"' && pVal[len - 1] == '"';
}

wmmhello's avatar
wmmhello 已提交
777
/**
 * Tell whether the value looks like an nchar literal, i.e. L"abc".
 *
 * @param pVal start of the value (need not be NUL-terminated)
 * @param len  number of bytes in the value
 * @return true when the value is at least three bytes long, starts with
 *         'L' or 'l' followed by a double quote, and ends with a double quote.
 */
static bool smlIsNchar(const char *pVal, uint16_t len) {
  // nchar: L"abc"
  return len >= 3 && (pVal[0] == 'l' || pVal[0] == 'L') && pVal[1] == '"' && pVal[len - 1] == '"';
}
787
/******************************* parse basic type function end **********************/
wmmhello's avatar
wmmhello 已提交
788

789
/******************************* time function **********************/
790
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
X
Xiaoyu Wang 已提交
791
  char   *endPtr = NULL;
wafwerar's avatar
wafwerar 已提交
792
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
X
Xiaoyu Wang 已提交
793
  if (value + len != endPtr) {
794
    return -1;
wmmhello's avatar
wmmhello 已提交
795
  }
796
  double ts = tsInt64;
797 798
  switch (type) {
    case TSDB_TIME_PRECISION_HOURS:
wmmhello's avatar
wmmhello 已提交
799 800
      ts *= NANOSECOND_PER_HOUR;
      tsInt64 *= NANOSECOND_PER_HOUR;
801 802
      break;
    case TSDB_TIME_PRECISION_MINUTES:
wmmhello's avatar
wmmhello 已提交
803 804
      ts *= NANOSECOND_PER_MINUTE;
      tsInt64 *= NANOSECOND_PER_MINUTE;
805 806
      break;
    case TSDB_TIME_PRECISION_SECONDS:
wmmhello's avatar
wmmhello 已提交
807 808
      ts *= NANOSECOND_PER_SEC;
      tsInt64 *= NANOSECOND_PER_SEC;
809 810
      break;
    case TSDB_TIME_PRECISION_MILLI:
wmmhello's avatar
wmmhello 已提交
811 812
      ts *= NANOSECOND_PER_MSEC;
      tsInt64 *= NANOSECOND_PER_MSEC;
813 814
      break;
    case TSDB_TIME_PRECISION_MICRO:
wmmhello's avatar
wmmhello 已提交
815 816
      ts *= NANOSECOND_PER_USEC;
      tsInt64 *= NANOSECOND_PER_USEC;
817 818 819 820 821
      break;
    case TSDB_TIME_PRECISION_NANO:
      break;
    default:
      ASSERT(0);
wmmhello's avatar
wmmhello 已提交
822
  }
X
Xiaoyu Wang 已提交
823
  if (ts >= (double)INT64_MAX || ts < 0) {
824
    return -1;
wmmhello's avatar
wmmhello 已提交
825 826
  }

827
  return tsInt64;
828 829 830 831 832 833
}

static int8_t smlGetTsTypeByLen(int32_t len) {
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
    return TSDB_TIME_PRECISION_SECONDS;
  } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
834
    return TSDB_TIME_PRECISION_MILLI;
835 836
  } else {
    return -1;
wmmhello's avatar
wmmhello 已提交
837
  }
838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856
}

/**
 * Map a TSDB_SML_TIMESTAMP_* setting onto the matching TSDB_TIME_PRECISION_*
 * value. TSDB_SML_TIMESTAMP_NOT_CONFIGURED is treated as nanoseconds.
 *
 * @return the precision constant, or -1 for an unrecognised setting.
 */
static int8_t smlGetTsTypeByPrecision(int8_t precision) {
  switch (precision) {
    case TSDB_SML_TIMESTAMP_HOURS:
      return TSDB_TIME_PRECISION_HOURS;
    case TSDB_SML_TIMESTAMP_MINUTES:
      return TSDB_TIME_PRECISION_MINUTES;
    case TSDB_SML_TIMESTAMP_SECONDS:
      return TSDB_TIME_PRECISION_SECONDS;
    case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
      return TSDB_TIME_PRECISION_MILLI;
    case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
      return TSDB_TIME_PRECISION_MICRO;
    case TSDB_SML_TIMESTAMP_NANO_SECONDS:
    case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
      return TSDB_TIME_PRECISION_NANO;
    default:
      return -1;
  }
}

X
Xiaoyu Wang 已提交
860 861
/**
 * Parse the timestamp field of a line-protocol record.
 *
 * An empty field or the single character '0' yields the current time from
 * taosGetTimestampNs(). Otherwise the text is scaled according to
 * info->precision.
 *
 * @return the timestamp in nanoseconds, or -1 after recording a message in
 *         info->msgBuf when the precision or the value is invalid.
 */
static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) {
  if (len == 0 || (len == 1 && data[0] == '0')) {
    return taosGetTimestampNs();
  }

  int8_t tsType = smlGetTsTypeByPrecision(info->precision);
  if (tsType == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL);
    return -1;
  }

  int64_t ts = smlGetTimeValue(data, len, tsType);
  if (ts == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
  }
  return ts;
}

X
Xiaoyu Wang 已提交
879 880
/**
 * Parse the timestamp field of a telnet-protocol record.
 *
 * The single character '0' yields the current time from taosGetTimestampNs().
 * Otherwise the precision is inferred from the digit count via
 * smlGetTsTypeByLen().
 *
 * @return the timestamp in nanoseconds, or -1 after recording a message in
 *         info->msgBuf when data is NULL, the digit count is unsupported, or
 *         the value is invalid.
 */
static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
  if (!data) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
    return -1;
  }
  if (len == 1 && data[0] == '0') {
    return taosGetTimestampNs();
  }

  int8_t tsType = smlGetTsTypeByLen(len);
  if (tsType == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf,
                           "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
    return -1;
  }

  int64_t ts = smlGetTimeValue(data, len, tsType);
  if (ts == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
  }
  return ts;
}

X
Xiaoyu Wang 已提交
901
/**
 * Parse the timestamp of the current record and store it as column 0.
 *
 * The parser is chosen from info->protocol. In data-format mode the value is
 * converted to the super table's precision and bound with smlBuildCol();
 * otherwise it is written into slot 0 of `cols`.
 *
 * @param cols column array of the current line; only used when
 *             info->dataFormat is false
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_TIMESTAMP when the parsed
 *         value is not positive, or the error returned by smlBuildCol().
 */
static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArray *cols) {
  int64_t ts = 0;
  if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
    ts = smlParseInfluxTime(info, data, len);
  } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
    ts = smlParseOpenTsdbTime(info, data, len);
  } else {
    ASSERT(0);
  }
  uDebug("SML:0x%" PRIx64 " smlParseTS:%" PRId64, info->id, ts);

  if (ts <= 0) {
    uError("SML:0x%" PRIx64 " smlParseTS error:%" PRId64, info->id, ts);
    return TSDB_CODE_INVALID_TIMESTAMP;
  }

  // add ts as the first column
  SSmlKv kv = { .key = TS, .keyLen = TS_LEN, .i = ts, .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
  if (info->dataFormat) {
    kv.i = convertTimePrecision(kv.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
    // Do not drop a bind failure on the floor: report it like smlParseKv does.
    int32_t code = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
    if (code != TSDB_CODE_SUCCESS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
      return code;
    }
  } else {
    taosArraySet(cols, 0, &kv);
  }

  return TSDB_CODE_SUCCESS;
}
/******************************* time function end **********************/

/******************************* Sml struct related function **********************/
/**
 * Allocate and initialise an SSmlTableInfo for one child table.
 *
 * @param numRows    initial capacity of the `cols` array (pointer-sized elements)
 * @param measure    super table name; the pointer is stored, not copied
 * @param measureLen length of `measure`
 * @return the new object, or NULL when any allocation fails (nothing is
 *         leaked in that case).
 */
static SSmlTableInfo *smlBuildTableInfo(int numRows, const char* measure, int32_t measureLen) {
  SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
  if (!tag) {
    return NULL;
  }

  tag->sTableName = measure;
  tag->sTableNameLen = measureLen;

  tag->cols = taosArrayInit(numRows, POINTER_BYTES);
  if (tag->cols == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
  }

  tag->tags = taosArrayInit(16, sizeof(SSmlKv));
  if (tag->tags == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
  }
  return tag;

cleanup:
  // `tag` was zero-initialised, so members that were never created are NULL;
  // release whatever was created before the failure.
  taosArrayDestroy(tag->tags);
  taosArrayDestroy(tag->cols);
  taosMemoryFree(tag);
  return NULL;
}

/**
 * Check every key/value in `tags` against the set tracked by `dumplicateKey`
 * using smlCheckDuplicateKey().
 *
 * @return TSDB_CODE_TSC_DUP_NAMES (with a message recorded in `msg`) at the
 *         first key reported as a duplicate, otherwise TSDB_CODE_SUCCESS.
 */
static int32_t smlCheckDupUnit(SHashObj *dumplicateKey, SArray *tags, SSmlMsgBuf *msg){
  int idx = 0;
  while (idx < taosArrayGetSize(tags)) {
    SSmlKv *item = taosArrayGet(tags, idx);
    idx++;
    if (!smlCheckDuplicateKey(item->key, item->keyLen, dumplicateKey)) {
      continue;
    }
    smlBuildInvalidDataMsg(msg, "dumplicate key", item->key);
    return TSDB_CODE_TSC_DUP_NAMES;
  }
  return TSDB_CODE_SUCCESS;
}

/**
 * Verify that no key appears twice across the columns and tags of one record.
 *
 * A temporary hash is created for the check and released before returning.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_DUP_NAMES when a duplicate is
 *         found, or TSDB_CODE_OUT_OF_MEMORY when the hash cannot be created.
 */
static int32_t smlJudgeDupColName(SArray *cols, SArray *tags, SSmlMsgBuf *msg) {
  SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (dumplicateKey == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t ret = smlCheckDupUnit(dumplicateKey, cols, msg);
  if (ret == TSDB_CODE_SUCCESS) {
    ret = smlCheckDupUnit(dumplicateKey, tags, msg);
  }

  taosHashCleanup(dumplicateKey);
  return ret;
}

/**
 * If the configured child-table-name tag (tsSmlChildTableName) is present in
 * `tags`, copy its value into `childTableName`.
 *
 * @param tags           array of SSmlKv tags of the current record
 * @param childTableName destination buffer of TSDB_TABLE_NAME_LEN bytes; it is
 *                       zero-filled before the copy and always NUL-terminated
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t smlParseTableName(SArray *tags, char *childTableName) {
  size_t childTableNameLen = strlen(tsSmlChildTableName);
  if (childTableNameLen == 0) return TSDB_CODE_SUCCESS;

  int tagNum = (int)taosArrayGetSize(tags);
  for (int i = 0; i < tagNum; i++) {
    SSmlKv *tag = taosArrayGet(tags, i);
    // handle child table name
    if (childTableNameLen == tag->keyLen && strncmp(tag->key, tsSmlChildTableName, tag->keyLen) == 0) {
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      // Copy at most TSDB_TABLE_NAME_LEN - 1 bytes so the zero-filled buffer
      // keeps its terminating NUL even when the tag value is too long.
      strncpy(childTableName, tag->value, (tag->length < TSDB_TABLE_NAME_LEN - 1 ? tag->length : TSDB_TABLE_NAME_LEN - 1));
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}

1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125
/**
 * Decide the child table name and uid for `oneTable`.
 *
 * The name is first taken from the tags via smlParseTableName(). When that
 * leaves the name empty, buildChildTableName() generates the name and
 * supplies the uid. Otherwise the uid is taken from the leading bytes of the
 * name buffer.
 *
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t smlSetCTableName(SSmlTableInfo *oneTable){
  smlParseTableName(oneTable->tags, oneTable->childTableName);

  if (strlen(oneTable->childTableName) == 0) {
    RandTableName rName = {oneTable->tags, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen,
                           oneTable->childTableName, 0};

    buildChildTableName(&rName);
    oneTable->uid = rName.uid;
  } else {
    // Read the leading bytes with memcpy rather than through a casted
    // uint64_t pointer: the char buffer has no alignment guarantee and the
    // cast violates strict aliasing.
    uint64_t uid = 0;
    memcpy(&uid, oneTable->childTableName, sizeof(uid));
    oneTable->uid = uid;
  }
  return TSDB_CODE_SUCCESS;
}

/**
 * Allocate and initialise an SSmlSTableMeta.
 *
 * The tag/column hashes are only created when `isDataFormat` is false; the
 * tag/column arrays (elements of type SSmlKv) are always created.
 *
 * @return the new object, or NULL when any allocation fails (everything
 *         created so far is released in that case).
 */
static SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
  SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
  if (!meta) {
    return NULL;
  }

  if(unlikely(!isDataFormat)){
    meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->tagHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }

    meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
    if (meta->colHash == NULL) {
      uError("SML:smlBuildSTableMeta failed to allocate memory");
      goto cleanup;
    }
  }

  meta->tags = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->cols = taosArrayInit(32, sizeof(SSmlKv));
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
  return meta;

cleanup:
  // `meta` was zero-initialised, so members not yet created are NULL. The
  // same NULL-tolerant calls are used by smlDestroySTableMeta(), which sees
  // NULL hashes whenever the meta was built in data-format mode.
  taosHashCleanup(meta->tagHash);
  taosHashCleanup(meta->colHash);
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);
  taosMemoryFree(meta);
  return NULL;
}

/**
 * Append every SSmlKv of `cols` to `metaArray` and, when `metaHash` is not
 * NULL, record key -> index (int16_t) in it.
 *
 * NOTE(review): metaArray is assumed to hold SSmlKv elements by value, as the
 * arrays created in smlBuildSTableMeta() do — confirm against the callers.
 */
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols){
  for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
    // Push the element itself. Passing &kv would copy sizeof(SSmlKv) bytes
    // starting at the address of this local pointer.
    taosArrayPush(metaArray, kv);
    if(unlikely(metaHash != NULL)) {
      taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
    }
  }
}

/**
 * Decide whether the key list of the current line matches what was seen
 * before, so that the fast "data format" path may continue to be used.
 *
 * Key lists are var-data buffers (length header followed by bytes).
 *
 * - First call (*preLineKeys == NULL): a copy of the current keys is stored
 *   in *preLineKeys and the line is accepted.
 * - Same measurement as the previous line: the current keys must equal
 *   *preLineKeys.
 * - Different measurement: the keys are compared with the copy remembered
 *   for that measurement in `superTableKeyStr`; a copy is stored there the
 *   first time the measurement is seen.
 *
 * @param len size in bytes to allocate for a stored copy of the keys
 * @return true when the keys match (or were just recorded); false on a
 *         mismatch or when a copy could not be allocated, in which case the
 *         caller falls back to the non-data-format path.
 */
bool smlFormatJudge(SHashObj* superTableKeyStr, void** preLineKeys, void* currentLineKeys,
                    SSmlLineInfo *currElements, SSmlLineInfo *preElements, int32_t len){
  if(*preLineKeys == NULL){
    *preLineKeys = taosMemoryMalloc(len);
    if(*preLineKeys == NULL){
      return false;
    }
    varDataCopy(*preLineKeys, currentLineKeys);
    return true;
  }

  // same measure
  if(preElements->measureLen == currElements->measureLen
     && memcmp(preElements->measure, currElements->measure, currElements->measureLen) == 0){
    // Compare the buffer that preLineKeys points to, not the pointer slot.
    return varDataTLen(*preLineKeys) == varDataTLen(currentLineKeys)
           && memcmp(*preLineKeys, currentLineKeys, varDataTLen(currentLineKeys)) == 0;
  }

  // diff measure: the hash stores a pointer to the key buffer (POINTER_BYTES),
  // so the lookup yields the address of that stored pointer.
  void **ppKeyStr = (void **)taosHashGet(superTableKeyStr, currElements->measure, currElements->measureLen);
  if(unlikely(ppKeyStr == NULL)){
    void *keyStr = taosMemoryMalloc(len);
    if(keyStr == NULL){
      return false;
    }
    varDataCopy(keyStr, currentLineKeys);
    taosHashPut(superTableKeyStr, currElements->measure, currElements->measureLen, &keyStr, POINTER_BYTES);
    return true;
  }

  void *keyStr = *ppKeyStr;
  // A difference in either the length or the bytes is a mismatch.
  return varDataTLen(keyStr) == varDataTLen(currentLineKeys)
         && memcmp(keyStr, currentLineKeys, varDataTLen(currentLineKeys)) == 0;
}

/**
 * Ask the catalog for the meta of super table `measure` in the request's
 * database.
 *
 * @param measure    table name bytes (not NUL-terminated)
 * @param measureLen number of bytes in `measure`
 * @return whatever catalogGetSTableMeta() stored in the output pointer; it
 *         stays NULL if the call did not set it. The call's return code is
 *         not inspected.
 */
static STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen){
  // fully qualified table name: account / database / table
  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
  memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
  memcpy(pName.tname, measure, measureLen);

  // connection parameters for the catalog request
  SRequestConnInfo conn = {.pTrans = info->taos->pAppInfo->pTransporter,
                           .requestId = info->pRequest->requestId,
                           .requestObjRefId = info->pRequest->self,
                           .mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp)};

  STableMeta *pTableMeta = NULL;
  catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
  return pTableMeta;
}

/**
 * Release an SSmlSTableMeta together with its hashes, arrays and table meta.
 * Passing NULL is a no-op, matching the convention of the free-style helpers
 * it calls.
 */
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
  if (meta == NULL) {
    return;
  }
  taosHashCleanup(meta->tagHash);
  taosHashCleanup(meta->colHash);
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);
  taosMemoryFree(meta->tableMeta);
  taosMemoryFree(meta);
}
/******************************* Sml struct related function end **********************/

wmmhello's avatar
wmmhello 已提交
1126
/**
 * Determine the type of a column value and normalise pVal accordingly.
 *
 * The checks run in this order: binary literal ("..."), nchar literal
 * (L"..."), boolean, number. For the two string forms pVal->value and
 * pVal->length are adjusted by BINARY_ADD_LEN / NCHAR_ADD_LEN; for bool and
 * numbers pVal->length becomes the byte width of the detected type.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN when a
 *         string value exceeds the maximum length, or
 *         TSDB_CODE_TSC_INVALID_VALUE when no form matches.
 */
static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
  // binary
  if (smlIsBinary(pVal->value, pVal->length)) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
    pVal->length -= BINARY_ADD_LEN;
    if (pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
    pVal->value += (BINARY_ADD_LEN - 1);
    return TSDB_CODE_SUCCESS;
  }

  // nchar
  if (smlIsNchar(pVal->value, pVal->length)) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
    pVal->length -= NCHAR_ADD_LEN;
    if (pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
    pVal->value += (NCHAR_ADD_LEN - 1);
    return TSDB_CODE_SUCCESS;
  }

  // bool
  if (smlParseBool(pVal)) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    return TSDB_CODE_SUCCESS;
  }

  // number: smlParseNumber sets pVal->type on success
  if (smlParseNumber(pVal, msg)) {
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_TSC_INVALID_VALUE;
}

1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481
/**
 * Parse a comma separated list of key=value pairs (the tag section when
 * isTag is true, the column section otherwise) starting at *sql.
 *
 * Parsing stops at sqlEnd or at the first position for which IS_SPACE() is
 * true; *sql is advanced past what was consumed. Column values are typed with
 * smlParseValue(); tag values are not parsed and are always typed as NCHAR.
 *
 * When info->dataFormat is set, each pair is bound with smlBuildCol() and
 * compared against the previous line / the super table's recorded keys. On a
 * mismatch the function sets info->dataFormat = false and info->reRun = true
 * and returns TSDB_CODE_SUCCESS, leaving it to the caller to check
 * info->reRun. When info->dataFormat is not set and isTag is false, the pairs
 * are appended to currElement->colArray (slot 0 is reserved for the
 * timestamp).
 *
 * @return TSDB_CODE_SUCCESS or an error code; for most (not all) errors a
 *         message is recorded in info->msgBuf.
 */
static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd,
                          SSmlLineInfo* currElement, bool isTag){
  bool isSameMeasure = false;
  bool isSameCTable = false;
  int   cnt = 0;
  void *keyStr = NULL;
  bool isPreLineKVNULL = false;
  SArray *preLineKV = NULL;
  bool    isSuperKVInit = false;
  SArray *superKV = NULL;
  if(info->dataFormat){
    if(currElement->measureLen == info->preLine.measureLen
       && memcmp(currElement->measure, info->preLine.measure, currElement->measureLen) == 0){
      isSameMeasure = true;
    }

    if(!isSameMeasure){
      // measurement changed: look up (or create and cache) its super table meta
      SSmlSTableMeta *sMeta = NULL;
      SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
      if(tableMeta == NULL){
        // NOTE(review): `meta` is dereferenced below without a NULL check, and
        // it is neither freed nor stored when pTableMeta is NULL — looks like a
        // possible NULL dereference and leak; confirm.
        SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
        STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
        meta->tableMeta = pTableMeta;
        if(pTableMeta == NULL){
          // no meta available for this measurement: ask the caller to re-run
          // without the data-format fast path
          info->dataFormat = false;
          info->reRun      = true;
          return TSDB_CODE_SUCCESS;
        }
        taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &meta, POINTER_BYTES);
        sMeta = meta;
      }else{
        sMeta = *tableMeta;
      }
      info->currSTableMeta = sMeta->tableMeta;

      if(isTag){
        superKV = sMeta->tags;
      }else{
        superKV = sMeta->cols;
      }
      if(unlikely(taosArrayGetSize(superKV) == 0)){
        isSuperKVInit = true;
      }
    }

    // NOTE(review): this compares measureTagsLen bytes starting at `measure`,
    // i.e. it presumably relies on measure and tags being contiguous in the
    // line buffer — confirm.
    if(currElement->measureTagsLen == info->preLine.measureTagsLen
       && memcmp(currElement->measure, info->preLine.measure, currElement->measureTagsLen) == 0){
      isSameCTable = true;
      if(isTag) return TSDB_CODE_SUCCESS;
    }else if(!isTag){
      SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen);
      if (unlikely(oneTable == NULL)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      info->currTableDataCtx = (*oneTable)->tableDataCtx;
    }

    if(isTag){
      // prepare for judging if tag or col is the same for each line
      if(unlikely(info->currentLineTagKeys == NULL)){   // sml todo size need remalloc
        info->currentLineTagKeys = taosMemoryMalloc(sqlEnd - *sql);
      }
      // NOTE(review): keyStr is pointed at preLineTagKeys although the buffer
      // allocated just above is currentLineTagKeys — confirm which is intended.
      keyStr = info->preLineTagKeys;

      if(info->preLineTagKV == NULL){
        info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
        isPreLineKVNULL = true;
      }
      preLineKV = info->preLineTagKV;
    }else{
      if(unlikely(info->currentLineColKeys == NULL)){   // sml todo size need remalloc
        info->currentLineColKeys = taosMemoryMalloc(sqlEnd - *sql);
      }
      // NOTE(review): same question as for the tag branch (preLineColKeys vs
      // currentLineColKeys) — confirm.
      keyStr = info->preLineColKeys;

      if(info->preLineColKV == NULL){
        info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv));
        isPreLineKVNULL = true;
      }
      preLineKV = info->preLineColKV;
    }

    if(!isSameMeasure){
      taosArraySetSize(preLineKV, 0);
    }
    varDataLen(keyStr) = 0; // clear keys
  }

  while (*sql < sqlEnd) {
    if (IS_SPACE(*sql)) {
      break;
    }

    // parse key
    const char *key = *sql;
    int32_t     keyLen = 0;
    while (*sql < sqlEnd) {

      if (IS_COMMA(*sql)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if (IS_EQUAL(*sql)) {
        keyLen = *sql - key;
        (*sql)++;
        break;
      }
      (*sql)++;
    }

    if (IS_INVALID_COL_LEN(keyLen)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

    // NOTE(review): keyStr is only assigned inside the info->dataFormat branch
    // above, yet it is written through unconditionally here — confirm this
    // cannot run with keyStr == NULL.
    memcpy(keyStr + varDataTLen(keyStr), key, keyLen + 1); // use = symbol
    varDataLen(keyStr) += keyLen + 1;

    // parse value
    const char *value = *sql;
    int32_t     valueLen = 0;
    bool        isInQuote = false;
    while (*sql < sqlEnd) {
      // parse value
      // quotes only toggle quoting for column values, not for tag values
      if (!isTag && IS_QUOTE(*sql)) {
        isInQuote = !isInQuote;
        (*sql)++;
        continue;
      }
      if (!isInQuote && IS_COMMA(*sql)) {
        break;
      }
      if (!isInQuote && IS_EQUAL(*sql)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      (*sql)++;
    }
    valueLen = *sql - value;

    if (isInQuote) {
      smlBuildInvalidDataMsg(&info->msgBuf, "only one quote", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if (valueLen == 0) {
      smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    PROCESS_SLASH(key, keyLen)
    PROCESS_SLASH(value, valueLen)

    // step over the separator that ended the value
    (*sql)++;

    SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen};
    if (!isTag) {
      int32_t ret = smlParseValue(&kv, &info->msgBuf);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
    } else {
      if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
      }
      kv.type = TSDB_DATA_TYPE_NCHAR;
    }

    if(info->dataFormat){
      if(!isTag && cnt + 1 > info->currSTableMeta->tableInfo.numOfColumns){
        smlBuildInvalidDataMsg(&info->msgBuf, "col more than meta", NULL);
        return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
      }
      if(isTag && cnt + 1 > info->currSTableMeta->tableInfo.numOfTags){
        smlBuildInvalidDataMsg(&info->msgBuf, "tag more than meta", NULL);
        return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
      }
      // bind data
      // index cnt + 1 is passed; smlParseTS binds the timestamp at index 0
      int ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, cnt + 1);
      if (ret != TSDB_CODE_SUCCESS) {
        smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
        return ret;
      }

      // single-pass do/while used so that `break` can leave the comparison early
      do {
        if(isPreLineKVNULL){
          // NOTE(review): this early break skips the cnt++ below, so cnt stays
          // unchanged for every pair of this call — confirm that is intended.
          taosArrayPush(preLineKV, &kv);
          break;
        }

        if(isSameMeasure){
          // compare position by position with the previous line
          if(cnt >= taosArrayGetSize(preLineKV)) {
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }
          SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
          if(!isTag && kv.type != preKV->type){
            info->dataFormat = false;
            info->reRun      = true;
            return TSDB_CODE_SUCCESS;
          }

          // a longer variable-length value widens the recorded length in both
          // the previous-line entry and the super table entry
          if(IS_VAR_DATA_TYPE(kv.type) && kv.length > preKV->length){
            preKV->length = kv.length;
            SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
            if(tableMeta == NULL){
              smlBuildInvalidDataMsg(&info->msgBuf, "measure should has inside", value);
              return TSDB_CODE_SML_INVALID_DATA;
            }

            if(isTag){
              superKV = (*tableMeta)->tags;
            }else{
              superKV = (*tableMeta)->cols;
            }
            SSmlKv *oldKV = taosArrayGet(superKV, cnt);
            oldKV->length = kv.length;
          }
        }else{
          if(isSuperKVInit){
            // first line seen for this super table: record the pair as-is
            taosArrayPush(superKV, &kv);
          }else{
            // compare position by position with the super table's recorded pairs
            if(cnt >= taosArrayGetSize(superKV)) {
              info->dataFormat = false;
              info->reRun      = true;
              return TSDB_CODE_SUCCESS;
            }
            SSmlKv *preKV = taosArrayGet(superKV, cnt);
            if(!isTag && kv.type != preKV->type){
              info->dataFormat = false;
              info->reRun      = true;
              return TSDB_CODE_SUCCESS;
            }

            if(IS_VAR_DATA_TYPE(kv.type)){
              if(kv.length > preKV->length) {
                preKV->length = kv.length;
              }else{
                kv.length = preKV->length;
              }
            }
          }
          taosArrayPush(preLineKV, &kv);
        }
        cnt++;
        break;
      }while(0);
    }

    if(!info->dataFormat && !isTag){
      if(currElement->colArray == NULL){
        currElement->colArray = taosArrayInit(16, sizeof(SSmlKv));
        // slot 0 is reserved for the timestamp, filled in later by smlParseTS
        taosArraySetSize(currElement->colArray, 1);
      }
      taosArrayPush(currElement->colArray, &kv);   //reserve for timestamp
    }
  }

  // NOTE(review): preLineKV is only assigned in the info->dataFormat branch at
  // the top; in the other mode it is still NULL here and in the tag-copy loop
  // of the final else branch — confirm the array helpers accept NULL and that
  // skipping the tags there is intended.
  if(isTag && taosArrayGetSize(preLineKV) > TSDB_MAX_TAGS){
    smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
    return TSDB_CODE_PAR_INVALID_TAGS_NUM;
  }

  if(info->dataFormat){
    if(isTag){
      info->dataFormat = smlFormatJudge(info->superTableTagKeyStr, &info->preLineTagKeys,
                                        info->currentLineTagKeys, currElement, &info->preLine, sqlEnd - currElement->tags);
      if(!info->dataFormat) {
        info->reRun = true;
        return TSDB_CODE_SUCCESS;
      }
      if(!isSameCTable){
        if(taosArrayGetSize(preLineKV) > TSDB_MAX_TAGS){
          smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
          return TSDB_CODE_PAR_INVALID_TAGS_NUM;
        }

        // first time this measure+tags combination is seen: create the child
        // table entry and its data context
        void* oneTable = taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen);
        if (unlikely(oneTable == NULL)) {
          SSmlTableInfo *tinfo = smlBuildTableInfo(1, currElement->measure, currElement->measureLen);
          if (!tinfo) {
            return TSDB_CODE_OUT_OF_MEMORY;
          }
          for(int i = 0; i < taosArrayGetSize(preLineKV); i++){
            taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
          }
          smlSetCTableName(tinfo);
          info->currSTableMeta->uid = tinfo->uid;
          tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
          taosHashPut(info->childTables, currElement->measure, currElement->measureTagsLen, &tinfo, POINTER_BYTES);
        }
      }
    }else{
      info->dataFormat = smlFormatJudge(info->superTableColKeyStr, &info->preLineColKeys,
                                        info->currentLineColKeys, currElement, &info->preLine, sqlEnd - currElement->cols);
      if(!info->dataFormat) {
        info->reRun = true;
        return TSDB_CODE_SUCCESS;
      }
    }
  }else{
    void* oneTable = taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen);
    if (unlikely(oneTable == NULL)) {
      SSmlTableInfo *tinfo = smlBuildTableInfo(info->affectedRows / 2, currElement->measure, currElement->measureLen);
      if (!tinfo) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      for(int i = 0; i < taosArrayGetSize(preLineKV); i++){
        taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
      }
      smlSetCTableName(tinfo);
      taosHashPut(info->childTables, currElement->measure, currElement->measureTagsLen, &tinfo, POINTER_BYTES);
    }
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Split one line-protocol record into measurement, tags, columns and
 * timestamp, parsing each section as it goes.
 *
 * The start and length of every section are recorded in `elements`. Tags and
 * columns are parsed with smlParseKv(), the timestamp with smlParseTS(). If a
 * smlParseKv() call sets info->reRun, the function returns TSDB_CODE_SUCCESS
 * immediately and the caller is expected to check that flag. In data-format
 * mode a successfully parsed line is finished with smlBuildRow() and
 * remembered in info->preLine.
 *
 * @param sql    start of the record
 * @param sqlEnd one past the last byte of the record; adjusted locally when
 *               the measurement is compacted by MOVE_FORWARD_ONE
 * @return TSDB_CODE_SUCCESS or an error code.
 */
static int32_t smlParseInfluxString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlLineInfo *elements) {
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
  JUMP_SPACE(sql, sqlEnd)
  if (*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA;
  elements->measure = sql;

  // parse measure
  while (sql < sqlEnd) {
    if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
      MOVE_FORWARD_ONE(sql, sqlEnd - sql);
      sqlEnd--;
      continue;
    }
    if (IS_COMMA(sql)) {
      break;
    }

    if (IS_SPACE(sql)) {
      break;
    }
    sql++;
  }
  elements->measureLen = sql - elements->measure;
  if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  // to get measureTagsLen before
  const char* tmp = sql;
  while (tmp < sqlEnd){
    if (IS_SPACE(tmp)) {
      break;
    }
    tmp++;
  }
  elements->measureTagsLen = tmp - elements->measure;

  // parse tag
  if (*sql == SPACE) {
    elements->tagsLen = 0;
  } else {
    if (*sql == COMMA) sql++;
    elements->tags = sql;

    // tinfo != NULL means child table has never occur before
    int ret = smlParseKv(info, &sql, sqlEnd, elements, true);
    if(ret != TSDB_CODE_SUCCESS){
      return ret;
    }
    // continue from the end of the measure+tags span regardless of how far
    // smlParseKv advanced
    sql = elements->measure + elements->measureTagsLen;

    if(info->reRun){
      return TSDB_CODE_SUCCESS;
    }

    elements->tagsLen = sql - elements->tags;
  }

  // parse cols
  JUMP_SPACE(sql, sqlEnd)
  elements->cols = sql;

  int ret = smlParseKv(info, &sql, sqlEnd, elements, false);
  if(ret != TSDB_CODE_SUCCESS){
    return ret;
  }

  if(info->reRun){
    return TSDB_CODE_SUCCESS;
  }

  elements->colsLen = sql - elements->cols;
  if (elements->colsLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "cols is empty", NULL);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  // parse timestamp
  JUMP_SPACE(sql, sqlEnd)
  elements->timestamp = sql;
  while (sql < sqlEnd) {
    // <ctype.h> functions require a value representable as unsigned char
    if (isspace((unsigned char)*sql)) {
      break;
    }
    sql++;
  }
  elements->timestampLen = sql - elements->timestamp;

  ret = smlParseTS(info, elements->timestamp, elements->timestampLen, elements->colArray);
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseTS failed", info->id);
    return ret;
  }

  if(info->dataFormat){
    smlBuildRow(info->currTableDataCtx);
    info->preLine = *elements;
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1584 1585
/*
 * Scan forward from *sql for the next space-delimited token.
 *
 * sql    - in/out cursor; on return it rests on the space that ended the
 *          token, or equals sqlEnd when the input was exhausted
 * sqlEnd - upper bound of the scan (never dereferenced)
 * data   - in/out; must be NULL on entry for a new token, receives the
 *          address of the token's first non-space byte
 * len    - out; written only when a terminating space is found, so a token
 *          that runs up to sqlEnd leaves *len untouched
 */
static void smlParseTelnetElement(const char **sql, const char *sqlEnd, const char **data, int32_t *len) {
  for (; *sql < sqlEnd; ++(*sql)) {
    const int atSpace = (**sql == SPACE);

    if (*data == NULL) {
      // still skipping leading spaces; latch the first non-space byte
      if (!atSpace) {
        *data = *sql;
      }
      continue;
    }

    if (atSpace) {
      // token finished: report its length and leave the cursor on the space
      *len = *sql - *data;
      return;
    }
  }
}

dengyihao's avatar
dengyihao 已提交
1596 1597 1598
/*
 * Parse the "<tagk>=<tagv>[ <tagk>=<tagv> ...]" section of a telnet line.
 *
 * data           - first byte of the tag section
 * sqlEnd         - end of the line; bytes at or past it are never read
 * cols           - receives one heap-allocated SSmlKv* per tag; key/value
 *                  point into the input line (nothing is copied)
 * childTableName - buffer of TSDB_TABLE_NAME_LEN bytes; overwritten when a
 *                  tag whose key equals tsSmlChildTableName is found
 * dumplicateKey  - passed to smlCheckDuplicateKey() for every key
 * msg            - buffer that receives a description of the failure
 *
 * Returns TSDB_CODE_SUCCESS, or an error code on a malformed key/value, a
 * duplicate key, an over-long value or an allocation failure.
 */
static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *cols, char *childTableName,
                                  SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
  if (!cols) return TSDB_CODE_OUT_OF_MEMORY;
  const char *sql = data;
  size_t      childTableNameLen = strlen(tsSmlChildTableName);
  while (sql < sqlEnd) {
    JUMP_SPACE(sql, sqlEnd)
    // JUMP_SPACE may stop exactly at sqlEnd; test the bound before reading *sql
    if (sql >= sqlEnd || *sql == '\0') break;

    const char *key = sql;
    int32_t     keyLen = 0;

    // parse key: everything up to '='; a space inside a key is an error
    while (sql < sqlEnd) {
      if (*sql == SPACE) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if (*sql == EQUAL) {
        keyLen = sql - key;
        sql++;
        break;
      }
      sql++;
    }

    // keyLen stays 0 when no '=' was found before sqlEnd
    if (IS_INVALID_COL_LEN(keyLen)) {
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
    if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
      return TSDB_CODE_TSC_DUP_NAMES;
    }

    // parse value: everything up to the next space; a second '=' is an error
    const char *value = sql;
    int32_t     valueLen = 0;
    while (sql < sqlEnd) {
      if (*sql == SPACE) {
        break;
      }
      if (*sql == EQUAL) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
    }
    valueLen = sql - value;

    if (valueLen == 0) {
      smlBuildInvalidDataMsg(msg, "invalid value", value);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    // handle child table name: the key must match the configured name exactly.
    // Comparing only keyLen bytes would also accept any prefix of it.
    if (childTableNameLen != 0 && (size_t)keyLen == childTableNameLen &&
        strncmp(key, tsSmlChildTableName, keyLen) == 0) {
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      // copy at most TSDB_TABLE_NAME_LEN - 1 bytes so the zeroed buffer keeps its terminator
      strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN - 1));
      continue;
    }

    if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }

    // add kv to SSmlKv; telnet tag values are always stored as NCHAR
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(1, sizeof(SSmlKv));
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
    kv->key = key;
    kv->keyLen = keyLen;
    kv->value = value;
    kv->length = valueLen;
    kv->type = TSDB_DATA_TYPE_NCHAR;

    taosArrayPush(cols, &kv);
  }

  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1677

1678
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
dengyihao's avatar
dengyihao 已提交
1679 1680
/*
 * Parse one telnet line. The four sections are consumed strictly in order:
 * metric, timestamp, value, tags.
 *
 * info   - handle; supplies the message buffer and the duplicate-key hash
 * sql    - start of the line (NULL is rejected)
 * sqlEnd - end of the line
 * tinfo  - receives the metric as sTableName/sTableNameLen and the parsed tags
 * cols   - receives the timestamp (via smlParseTS) and the "value" column
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the first section that failed.
 */
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlTableInfo *tinfo,
                                    SArray *cols) {
  if (sql == NULL) {
    return TSDB_CODE_SML_INVALID_DATA;
  }

  // section 1: metric
  smlParseTelnetElement(&sql, sqlEnd, &tinfo->sTableName, &tinfo->sTableNameLen);
  if (tinfo->sTableName == NULL || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  // section 2: timestamp
  const char *tsStr = NULL;
  int32_t     tsStrLen = 0;
  smlParseTelnetElement(&sql, sqlEnd, &tsStr, &tsStrLen);
  if (tsStr == NULL || tsStrLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  int32_t code = smlParseTS(info, tsStr, tsStrLen, cols);
  if (code != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
    return code;
  }

  // section 3: value
  const char *valStr = NULL;
  int32_t     valStrLen = 0;
  smlParseTelnetElement(&sql, sqlEnd, &valStr, &valStrLen);
  if (valStr == NULL || valStrLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  SSmlKv *valueKv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
  if (valueKv == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  // the kv is appended to cols before it is filled in and type-checked
  taosArrayPush(cols, &valueKv);
  valueKv->key = VALUE;
  valueKv->keyLen = VALUE_LEN;
  valueKv->value = valStr;
  valueKv->length = valStrLen;
  code = smlParseValue(valueKv, &info->msgBuf);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // section 4: tags
  code = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
  if (code != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

1735
/*
 * Merge the key/value descriptors of one line into a super-table schema.
 *
 * metaHash  - maps a key to the int16_t position of its entry in metaArray
 * metaArray - schema entries, read here as SSmlKv elements
 * cols      - SSmlKv elements parsed from the current line
 * isTag     - true for tags: only the recorded length is widened and the
 *             type is not compared
 * msg       - receives a description when a column type conflicts
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_SML_NOT_SAME_TYPE when a column
 * appears with a type different from the one already recorded.
 */
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg) {
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);

    int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);
    if (index) {
      // key already known: widen the recorded length and, for columns, verify the type
      SSmlKv *value = (SSmlKv *)taosArrayGet(metaArray, *index);
      if (isTag) {
        if (kv->length > value->length) {
          value->length = kv->length;
        }
        continue;
      }
      if (kv->type != value->type) {
        smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
        return TSDB_CODE_SML_NOT_SAME_TYPE;
      }

      if (IS_VAR_DATA_TYPE(kv->type) && (kv->length > value->length)) {  // update string len, if bigger
        value->length = kv->length;
      }
    } else {
      // new key: append it and remember its position
      size_t tmp = taosArrayGetSize(metaArray);
      ASSERT(tmp <= INT16_MAX);
      int16_t size = tmp;
      // Pass the SSmlKv itself, not the address of the local pointer: the
      // branch above reads metaArray elements back as SSmlKv structs, so the
      // element that is stored has to be the struct content.
      taosArrayPush(metaArray, kv);
      taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1768
/*
 * Release a child-table descriptor: every hash stored in tag->cols, the two
 * arrays themselves and finally the descriptor. `info` is not used.
 */
static void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
  size_t idx = 0;
  while (idx < taosArrayGetSize(tag->cols)) {
    // each element of cols is a pointer to a per-row hash
    taosHashCleanup((SHashObj *)taosArrayGetP(tag->cols, idx));
    ++idx;
  }

  taosArrayDestroy(tag->cols);
  taosArrayDestroy(tag->tags);
  taosMemoryFree(tag);
}

1779
/*
 * Build a key -> SSmlKv* hash for the columns of one row and append the
 * hash pointer to colsArray.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the hash cannot be created,
 * TSDB_CODE_SUCCESS otherwise.
 */
static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
  SHashObj *rowHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (rowHash == NULL) {
    uError("SML:smlDealCols failed to allocate memory");
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  size_t pos = 0;
  while (pos < taosArrayGetSize(cols)) {
    SSmlKv *item = (SSmlKv *)taosArrayGet(cols, pos);
    // the hash stores the pointer to the element, not a copy of it
    taosHashPut(rowHash, item->key, item->keyLen, &item, POINTER_BYTES);
    ++pos;
  }

  taosArrayPush(colsArray, &rowHash);
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1794 1795
/*
 * Tear down a schemaless handle and everything reachable from it, including
 * the request object stored in info->pRequest. A NULL handle is ignored.
 */
static void smlDestroyInfo(SSmlHandle *info) {
  if (info == NULL) {
    return;
  }
  qDestroyQuery(info->pQuery);

  // child tables: free every descriptor, then the hash that indexed them
  for (void **it = (void **)taosHashIterate(info->childTables, NULL); it != NULL;
       it = (void **)taosHashIterate(info->childTables, it)) {
    smlDestroyTableInfo(info, (SSmlTableInfo *)(*it));
  }
  taosHashCleanup(info->childTables);

  // super tables: same pattern
  for (void **it = (void **)taosHashIterate(info->superTables, NULL); it != NULL;
       it = (void **)taosHashIterate(info->superTables, it)) {
    smlDestroySTableMeta((SSmlSTableMeta *)(*it));
  }
  taosHashCleanup(info->superTables);

  // vgroup hash and the request
  taosHashCleanup(info->pVgHash);
  destroyRequest(info->pRequest);

  // per-line key caches
  taosHashCleanup(info->superTableTagKeyStr);
  taosHashCleanup(info->superTableColKeyStr);
  taosMemoryFree(info->currentLineTagKeys);
  taosMemoryFree(info->preLineTagKeys);
  taosMemoryFree(info->currentLineColKeys);
  taosMemoryFree(info->preLineColKeys);
  taosArrayDestroy(info->preLineTagKV);
  taosArrayDestroy(info->preLineColKV);

  // every parsed line owns a column array
  for (int i = 0; i < taosArrayGetSize(info->lines); i++) {
    SSmlLineInfo *line = (SSmlLineInfo *)taosArrayGet(info->lines, i);
    taosArrayDestroy(line->colArray);
  }
  taosArrayDestroy(info->lines);

  cJSON_Delete(info->root);
  taosMemoryFreeClear(info);
}

1835
/*
 * Allocate and initialise a schemaless handle.
 *
 * pTscObj   - connection; when non-NULL its catalog handle is looked up
 * request   - request object; when non-NULL its msgBuf becomes the handle's
 *             error message buffer
 * protocol  - line protocol of the incoming data
 * precision - timestamp precision of the incoming data
 * perBatch  - initial capacity of info->lines; also stored in affectedRows
 *
 * Returns the new handle, or NULL when the catalog lookup or any allocation
 * fails. NOTE: the failure path runs smlDestroyInfo(), which also calls
 * destroyRequest() on `request` if it has already been attached.
 */
static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLProtocolType protocol, int8_t precision, int32_t perBatch) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
  if (NULL == info) {
    return NULL;
  }
  info->id = smlGenId();

  info->pQuery = smlInitHandle();

  if (pTscObj) {
    info->taos = pTscObj;
    code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
      goto cleanup;
    }
  }

  info->precision = precision;
  info->protocol = protocol;
  info->dataFormat = true;
  info->affectedRows = perBatch;

  if (request) {
    info->pRequest = request;
    info->msgBuf.buf = info->pRequest->msgBuf;
    info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
  }

  info->lines = taosArrayInit(perBatch, sizeof(SSmlLineInfo));
  info->superTableTagKeyStr = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->superTableColKeyStr = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);

  info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);

  // every container allocated above must exist, including info->lines
  if (NULL == info->pQuery || NULL == info->lines || NULL == info->childTables || NULL == info->superTables ||
      NULL == info->superTableTagKeyStr || NULL == info->superTableColKeyStr || NULL == info->pVgHash) {
    uError("SML:0x%" PRIx64 " create info failed", info->id);
    goto cleanup;
  }

  return info;
  cleanup:
  smlDestroyInfo(info);
  return NULL;
}

/************* TSDB_SML_JSON_PROTOCOL function start **************/
/*
 * Read the "metric" string of a JSON data point into tinfo->sTableName.
 * The name is not copied; it points into the cJSON tree.
 *
 * Returns TSDB_CODE_TSC_INVALID_JSON when "metric" is missing or not a string,
 * TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH when its length is rejected by
 * IS_INVALID_TABLE_LEN, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo) {
  cJSON *metricItem = cJSON_GetObjectItem(root, "metric");
  if (cJSON_IsString(metricItem)) {
    // the length is recorded even when it turns out to be invalid
    tinfo->sTableNameLen = strlen(metricItem->valuestring);
    if (!IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
      tinfo->sTableName = metricItem->valuestring;
      return TSDB_CODE_SUCCESS;
    }

    uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  return TSDB_CODE_TSC_INVALID_JSON;
}

/*
 * Parse a timestamp given as a JSON object {"value": <number>, "type": <unit>}
 * and convert it to nanoseconds.
 *
 * Accepted units (case-insensitive): "s", "ms", "us", "ns".
 * A value of 0 means "now" and yields taosGetTimestampNs().
 *
 * info  - handle; its message buffer receives the overflow description
 * root  - the timestamp object; must have exactly OTD_JSON_SUB_FIELDS_NUM members
 * tsVal - out; the timestamp in nanoseconds
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON for a malformed object
 * or unknown unit, or TSDB_CODE_INVALID_TIMESTAMP for negative or overflowing
 * values.
 */
static int32_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int64_t *tsVal) {
  int32_t size = cJSON_GetArraySize(root);
  if (size != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (!cJSON_IsNumber(value)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  double timeDouble = value->valuedouble;
  if (smlDoubleToInt64OverFlow(timeDouble)) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
    return TSDB_CODE_INVALID_TIMESTAMP;
  }

  if (timeDouble == 0) {
    *tsVal = taosGetTimestampNs();
    return TSDB_CODE_SUCCESS;
  }

  if (timeDouble < 0) {
    return TSDB_CODE_INVALID_TIMESTAMP;
  }

  *tsVal = timeDouble;

  // translate the unit string into a nanosecond multiplier
  int64_t nsPerUnit = 1;
  size_t  typeLen = strlen(type->valuestring);
  if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
    // seconds
    nsPerUnit = NANOSECOND_PER_SEC;
  } else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
    switch (type->valuestring[0]) {
      case 'm':
      case 'M':
        // milliseconds
        nsPerUnit = NANOSECOND_PER_MSEC;
        break;
      case 'u':
      case 'U':
        // microseconds
        nsPerUnit = NANOSECOND_PER_USEC;
        break;
      case 'n':
      case 'N':
        // nanoseconds: already in the target unit
        break;
      default:
        return TSDB_CODE_TSC_INVALID_JSON;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  if (nsPerUnit != 1) {
    // Range-check the scaled value in double arithmetic BEFORE the integer
    // multiply: signed int64 overflow is undefined behaviour, so the product
    // must not be computed until it is known to fit.
    // NOTE(review): relies on smlDoubleToInt64OverFlow rejecting everything outside the int64 range - confirm.
    if (smlDoubleToInt64OverFlow(timeDouble * nsPerUnit)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
      return TSDB_CODE_INVALID_TIMESTAMP;
    }
    *tsVal = *tsVal * nsPerUnit;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Count the decimal digits of num. The sign is not counted and 0 has one digit.
 */
static uint8_t smlGetTimestampLen(int64_t num) {
  uint8_t digits = 1;  // every number has at least one digit
  for (int64_t rest = num / 10; rest != 0; rest /= 10) {
    digits++;
  }
  return digits;
}

/*
 * Parse the "timestamp" member of a JSON data point and append it to cols as
 * the TS column, expressed in nanoseconds.
 *
 * A plain number is interpreted by its digit count:
 * TSDB_TIME_PRECISION_SEC_DIGITS digits means seconds,
 * TSDB_TIME_PRECISION_MILLI_DIGITS digits means milliseconds, and 0 means
 * "now". Any other number is rejected. An object is delegated to
 * smlParseTSFromJSONObj().
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_TIMESTAMP for negative,
 * overflowing or unrecognised numbers, TSDB_CODE_TSC_INVALID_JSON when the
 * member is neither a number nor an object, or the error of the object parser.
 */
static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
  // Timestamp must be the first KV to parse
  int64_t tsVal = 0;

  cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
  if (cJSON_IsNumber(timestamp)) {
    // timestamp value 0 indicates current system time
    double timeDouble = timestamp->valuedouble;
    if (smlDoubleToInt64OverFlow(timeDouble)) {
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
      return TSDB_CODE_INVALID_TIMESTAMP;
    }

    if (timeDouble < 0) {
      return TSDB_CODE_INVALID_TIMESTAMP;
    }

    uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
    tsVal = (int64_t)timeDouble;
    if (tsLen == TSDB_TIME_PRECISION_SEC_DIGITS) {
      // Range-check in double arithmetic first; the int64 multiply below
      // would be undefined behaviour if the product did not fit.
      if (smlDoubleToInt64OverFlow(timeDouble * NANOSECOND_PER_SEC)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
        return TSDB_CODE_INVALID_TIMESTAMP;
      }
      tsVal = tsVal * NANOSECOND_PER_SEC;
    } else if (tsLen == TSDB_TIME_PRECISION_MILLI_DIGITS) {
      // same ordering: check first, multiply afterwards
      if (smlDoubleToInt64OverFlow(timeDouble * NANOSECOND_PER_MSEC)) {
        smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
        return TSDB_CODE_INVALID_TIMESTAMP;
      }
      tsVal = tsVal * NANOSECOND_PER_MSEC;
    } else if (timeDouble == 0) {
      tsVal = taosGetTimestampNs();
    } else {
      return TSDB_CODE_INVALID_TIMESTAMP;
    }
  } else if (cJSON_IsObject(timestamp)) {
    int32_t ret = smlParseTSFromJSONObj(info, timestamp, &tsVal);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " Failed to parse timestamp from JSON Obj", info->id);
      return ret;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // add the timestamp column to cols (the array stores a copy of kv)
  SSmlKv kv = {.key = TS, .keyLen = TS_LEN, .i = tsVal,
               .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};

  taosArrayPush(cols, &kv);
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2043
/*
 * Store a JSON true/false as a BOOL kv. The declared type string must be
 * "bool" (case-insensitive); anything else yields
 * TSDB_CODE_TSC_INVALID_JSON_TYPE.
 */
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
  const int declaredBool = (strcasecmp(typeStr, "bool") == 0);
  if (!declaredBool) {
    uError("OTD:invalid type(%s) for JSON Bool", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }

  pVal->type = TSDB_DATA_TYPE_BOOL;
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  pVal->i = value->valueint;
  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
2054

X
Xiaoyu Wang 已提交
2055 2056 2057
/*
 * Store a JSON number in pVal using the type named by typeStr.
 *
 * Accepted names (case-insensitive): i8/tinyint, i16/smallint, i32/int,
 * i64/bigint, f32/float, f64/double. tinyint, smallint, int and float are
 * range-checked; bigint is clamped to [INT64_MIN, INT64_MAX]; double is
 * stored unchanged.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_VALUE_OUT_OF_RANGE when the value
 * does not fit, or TSDB_CODE_TSC_INVALID_JSON_TYPE for an unknown type name.
 */
static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
  const double num = value->valuedouble;

  // tinyint
  if (!strcasecmp(typeStr, "i8") || !strcasecmp(typeStr, "tinyint")) {
    if (!IS_VALID_TINYINT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(tinyint)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_TINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = num;
    return TSDB_CODE_SUCCESS;
  }

  // smallint
  if (!strcasecmp(typeStr, "i16") || !strcasecmp(typeStr, "smallint")) {
    if (!IS_VALID_SMALLINT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(smallint)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_SMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = num;
    return TSDB_CODE_SUCCESS;
  }

  // int
  if (!strcasecmp(typeStr, "i32") || !strcasecmp(typeStr, "int")) {
    if (!IS_VALID_INT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(int)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_INT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = num;
    return TSDB_CODE_SUCCESS;
  }

  // bigint: out-of-range values are clamped instead of rejected
  if (!strcasecmp(typeStr, "i64") || !strcasecmp(typeStr, "bigint")) {
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    if (num >= (double)INT64_MAX) {
      pVal->i = INT64_MAX;
    } else if (num <= (double)INT64_MIN) {
      pVal->i = INT64_MIN;
    } else {
      pVal->i = num;
    }
    return TSDB_CODE_SUCCESS;
  }

  // float
  if (!strcasecmp(typeStr, "f32") || !strcasecmp(typeStr, "float")) {
    if (!IS_VALID_FLOAT(num)) {
      uError("OTD:JSON value(%f) cannot fit in type(float)", num);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->f = num;
    return TSDB_CODE_SUCCESS;
  }

  // double
  if (!strcasecmp(typeStr, "f64") || !strcasecmp(typeStr, "double")) {
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->d = num;
    return TSDB_CODE_SUCCESS;
  }

  // none of the known type names matched
  uError("OTD:invalid type(%s) for JSON Number", typeStr);
  return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
2125

X
Xiaoyu Wang 已提交
2126
/*
 * Store a JSON string in pVal as BINARY or NCHAR, as named by typeStr
 * ("binary" / "nchar", case-insensitive). The string is not copied;
 * pVal->value points into the cJSON node.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON_TYPE for any other
 * type name, or TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN when the string exceeds
 * the limit of the chosen type.
 */
static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
  if (strcasecmp(typeStr, "binary") == 0) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
  } else if (strcasecmp(typeStr, "nchar") == 0) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
  } else {
    uError("OTD:invalid type(%s) for JSON String", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }

  // Keep the untruncated length for validation. Checking the int16_t-cast
  // value would let over-long strings wrap around (possibly to a negative
  // number) and slip past the limits below.
  const size_t len = strlen(value->valuestring);
  pVal->length = (int16_t)len;

  if (pVal->type == TSDB_DATA_TYPE_BINARY && len > (size_t)(TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE)) {
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }
  if (pVal->type == TSDB_DATA_TYPE_NCHAR &&
      len > (size_t)((TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }

  pVal->value = value->valuestring;
  return TSDB_CODE_SUCCESS;
}

/*
 * Parse a typed value given as a JSON object {"value": ..., "type": <string>}.
 * The object must have exactly OTD_JSON_SUB_FIELDS_NUM members. Conversion is
 * dispatched on the JSON type of "value" (bool, number or string).
 *
 * Returns the converter's result, TSDB_CODE_TSC_INVALID_JSON for a malformed
 * object, or TSDB_CODE_TSC_INVALID_JSON_TYPE for an unsupported value type.
 */
static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) {
  if (cJSON_GetArraySize(root) != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *valueItem = cJSON_GetObjectItem(root, "value");
  if (valueItem == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *typeItem = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(typeItem)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // each converter already returns TSDB_CODE_SUCCESS or the error to report
  switch (valueItem->type) {
    case cJSON_True:
    case cJSON_False:
      return smlConvertJSONBool(kv, typeItem->valuestring, valueItem);
    case cJSON_Number:
      return smlConvertJSONNumber(kv, typeItem->valuestring, valueItem);
    case cJSON_String:
      return smlConvertJSONString(kv, typeItem->valuestring, valueItem);
    default:
      return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }
}

2197 2198 2199 2200 2201 2202 2203 2204
/*
 * Convert a JSON value into kv, inferring the type from the JSON node:
 *   true/false -> BOOL
 *   number     -> DOUBLE
 *   string     -> the default string type (currently hard-coded to "nchar")
 *   object     -> explicit {"value", "type"} pair, see smlParseValueFromJSONObj()
 *
 * Returns TSDB_CODE_SUCCESS, the error of the string/object conversion, or
 * TSDB_CODE_TSC_INVALID_JSON for any other node type.
 */
static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
  switch (root->type) {
    case cJSON_True:
    case cJSON_False: {
      kv->type = TSDB_DATA_TYPE_BOOL;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->i = root->valueint;
      break;
    }
    case cJSON_Number: {
      kv->type = TSDB_DATA_TYPE_DOUBLE;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->d = root->valuedouble;
      break;
    }
    case cJSON_String: {
      /* The default JSON string type is meant to come from a user
       * configured parameter; for now it is fixed to "nchar".
       */
      char *tsDefaultJSONStrType = "nchar";  // todo
      // propagate the failure (e.g. string too long) instead of silently
      // continuing with a kv whose value pointer was never set
      int32_t ret = smlConvertJSONString(kv, tsDefaultJSONStrType, root);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON String");
        return ret;
      }
      break;
    }
    case cJSON_Object: {
      int32_t ret = smlParseValueFromJSONObj(root, kv);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON Obj");
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Parse the "value" member of a JSON data point and append it to cols as the
 * VALUE column.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when cols is NULL, TSDB_CODE_TSC_INVALID_JSON
 * when "value" is missing, otherwise the result of smlParseValueFromJSON().
 */
static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
  if (cols == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  cJSON *valueItem = cJSON_GetObjectItem(root, "value");
  if (valueItem == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  SSmlKv  valueKv = {.key = VALUE, .keyLen = VALUE_LEN};
  int32_t code = smlParseValueFromJSON(valueItem, &valueKv);
  if (code == TSDB_CODE_SUCCESS) {
    // only a successfully converted value is added to the row
    taosArrayPush(cols, &valueKv);
  }
  return code;
}

X
Xiaoyu Wang 已提交
2253 2254
/*
 * Parse the "tags" object of a JSON data point.
 *
 * root           - the data point
 * pKVs           - receives one SSmlKv per tag (keys/values point into the cJSON tree)
 * childTableName - buffer of TSDB_TABLE_NAME_LEN bytes; filled from the tag
 *                  whose key equals tsSmlChildTableName (must be a JSON string)
 * dumplicateKey  - passed to smlCheckDuplicateKey() for every key
 * msg            - not used by this function
 *
 * Returns TSDB_CODE_SUCCESS, or an error for a missing/non-object "tags", an
 * invalid key length, a duplicate key, a non-string child table name or a
 * value that cannot be converted.
 */
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey,
                                    SSmlMsgBuf *msg) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (pKVs == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  cJSON *tagsObj = cJSON_GetObjectItem(root, "tags");
  if (tagsObj == NULL || tagsObj->type != cJSON_Object) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  const size_t  childTableNameLen = strlen(tsSmlChildTableName);
  const int32_t tagCount = cJSON_GetArraySize(tagsObj);

  int32_t pos = 0;
  while (pos < tagCount) {
    cJSON *item = cJSON_GetArrayItem(tagsObj, pos);
    ++pos;
    if (item == NULL) {
      return TSDB_CODE_TSC_INVALID_JSON;
    }

    size_t keyLen = strlen(item->string);
    if (IS_INVALID_COL_LEN(keyLen)) {
      uError("OTD:Tag key length is 0 or too large than 64");
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

    // a key may appear only once per data point
    if (smlCheckDuplicateKey(item->string, keyLen, dumplicateKey)) {
      return TSDB_CODE_TSC_DUP_NAMES;
    }

    // the configured child-table-name tag is consumed here and not stored as a tag
    if (childTableNameLen != 0 && strcmp(item->string, tsSmlChildTableName) == 0) {
      if (!cJSON_IsString(item)) {
        uError("OTD:ID must be JSON string");
        return TSDB_CODE_TSC_INVALID_JSON;
      }
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      tstrncpy(childTableName, item->valuestring, TSDB_TABLE_NAME_LEN);
      continue;
    }

    // ordinary tag: convert the value and append the kv
    SSmlKv tagKv = {.key = item->string, .keyLen = keyLen};
    code = smlParseValueFromJSON(item, &tagKv);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
    taosArrayPush(pKVs, &tagKv);
  }

  return code;
}

2305 2306
/**
 * Parse one OpenTSDB JSON data point.
 *
 * The data point is validated (it must be a JSON object with exactly
 * OTD_JSON_FIELDS_NUM members) and then handed to four stage parsers in a
 * fixed order: metric, timestamp, metric value, tags. The first stage that
 * fails aborts the parse and its error code is returned unchanged.
 *
 * @param info   schemaless handle; supplies the id used in log lines, the
 *               duplicate-key hash and the message buffer
 * @param root   JSON node of a single data point
 * @param tinfo  table info passed to the metric and tag parsers
 * @param cols   column array passed to the timestamp and value parsers
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON for a malformed data
 *         point, or the error code of the failing stage
 */
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo, SArray *cols) {
  if (!cJSON_IsObject(root)) {
    uError("OTD:0x%" PRIx64 " data point needs to be JSON object", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // the outermost object must carry exactly OTD_JSON_FIELDS_NUM members
  const int32_t fieldNum = cJSON_GetArraySize(root);
  if (fieldNum != OTD_JSON_FIELDS_NUM) {
    uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, fieldNum);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // stage 1: metric
  int32_t status = smlParseMetricFromJSON(info, root, tinfo);
  if (status != TSDB_CODE_SUCCESS) {
    uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
    return status;
  }
  uDebug("OTD:0x%" PRIx64 " Parse metric from JSON payload finished", info->id);

  // stage 2: timestamp
  status = smlParseTSFromJSON(info, root, cols);
  if (status) {
    uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
    return status;
  }
  uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);

  // stage 3: metric value
  status = smlParseColsFromJSON(root, cols);
  if (status) {
    uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
    return status;
  }
  uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id);

  // stage 4: tags (this stage also picks up the child table name)
  status = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
  if (status) {
    uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
    return status;
  }
  uDebug("OTD:0x%" PRIx64 " Parse tags from JSON payload finished", info->id);

  return TSDB_CODE_SUCCESS;
}
2354
/************* TSDB_SML_JSON_PROTOCOL function end **************/
2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367
/**
 * Second pass over the parsed lines, used only when the handle is NOT in
 * data-format mode (in data-format mode this is a no-op).
 *
 * For every entry of info->lines the function:
 *   1. looks up the child table recorded for the line (keyed by
 *      measure/measureTagsLen) and fails if it is missing,
 *   2. rejects the line if columns + tags exceed TSDB_MAX_COLUMNS,
 *   3. appends the line's columns to the child table,
 *   4. merges the line's columns/tags into the super table meta, creating
 *      that meta (keyed by measure/measureLen) on first sight.
 *
 * @param info schemaless handle holding lines, childTables and superTables
 * @return TSDB_CODE_SUCCESS, or the first error encountered
 */
static int32_t smlParseLineBottom(SSmlHandle *info) {
  if (info->dataFormat) return TSDB_CODE_SUCCESS;

  for (int32_t i = 0; i < taosArrayGetSize(info->lines); i++) {
    SSmlLineInfo   *elements = taosArrayGet(info->lines, i);
    SSmlTableInfo **oneTable =
        (SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen);
    if (oneTable == NULL) {
      uError("SML:0x%" PRIx64 "get oneTable failed, line num:%d", info->id, i);
      smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    SSmlTableInfo *tinfo = *oneTable;

    if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
      return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
    }

    int ret = smlPushCols(tinfo->cols, elements->colArray);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    SSmlSTableMeta **tableMeta =
        (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
    if (tableMeta) {  // super table already seen in this request: merge into its meta
      ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf);
      if (ret == TSDB_CODE_SUCCESS) {
        ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf);
      }
      if (ret != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
        return ret;
      }
    } else {  // first line for this super table: build a fresh meta
      ret = smlJudgeDupColName(elements->colArray, tinfo->tags, &info->msgBuf);
      if (ret != TSDB_CODE_SUCCESS) {
        // the message names the function that actually failed
        uError("SML:0x%" PRIx64 " smlJudgeDupColName failed", info->id);
        return ret;
      }

      SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
      if (meta == NULL) {
        // never dereference a failed allocation
        uError("SML:0x%" PRIx64 " smlBuildSTableMeta failed", info->id);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
      smlInsertMeta(meta->colHash, meta->cols, elements->colArray);
      taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
    }
  }

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
2408
/**
 * Parse one OpenTSDB data point (telnet text line or JSON node) and merge the
 * result into the handle's child-table and super-table maps.
 *
 * @param info handle; info->protocol selects the parser
 * @param data for TSDB_SML_TELNET_PROTOCOL the start of the text line,
 *             for TSDB_SML_JSON_PROTOCOL a cJSON node
 * @param len  length of the telnet line in bytes; only used by the telnet
 *             parser
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY,
 *         TSDB_CODE_PAR_INVALID_TAGS_NUM, or the parser / meta-update error
 *
 * Ownership: on success `cols` is appended to the child table's column list.
 * The temporary table info is either stored in info->childTables (first line
 * for that child table) or destroyed (table already known).
 */
static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) {
  int            ret = TSDB_CODE_SUCCESS;
  SSmlTableInfo *tinfo = smlBuildTableInfo(1, "", 0);
  if (!tinfo) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SArray *cols = taosArrayInit(16, POINTER_BYTES);
  if (cols == NULL) {
    uError("SML:0x%" PRIx64 " smlParseTelnetLine failed to allocate memory", info->id);
    smlDestroyTableInfo(info, tinfo);  // do not leak the table info built above
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
    ret = smlParseTelnetString(info, (const char *)data, (char *)data + len, tinfo, cols);
  } else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
  } else {
    ASSERT(0);
  }
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseTelnetLine failed", info->id);
    smlDestroyTableInfo(info, tinfo);
    taosArrayDestroy(cols);
    return ret;
  }

  if (taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL);
    smlDestroyTableInfo(info, tinfo);
    taosArrayDestroy(cols);
    return TSDB_CODE_PAR_INVALID_TAGS_NUM;
  }
  taosHashClear(info->dumplicateKey);

  if (strlen(tinfo->childTableName) == 0) {
    // no explicit child table name: derive name and uid from tags + super table name
    RandTableName rName = {tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, tinfo->childTableName, 0};
    buildChildTableName(&rName);
    tinfo->uid = rName.uid;
  } else {
    // generate uid by name simple: the first 8 bytes of the name. Copy them
    // with memcpy so the char buffer is never accessed through a uint64_t
    // lvalue (alignment / effective-type safe, same resulting value).
    uint64_t uid = 0;
    memcpy(&uid, tinfo->childTableName, sizeof(uid));
    tinfo->uid = uid;
  }

  // the name may have been filled in just above, so measure it only now
  const size_t childNameLen = strlen(tinfo->childTableName);

  bool            hasTable = true;
  SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, childNameLen);
  if (!oneTable) {
    taosHashPut(info->childTables, tinfo->childTableName, childNameLen, &tinfo, POINTER_BYTES);
    oneTable = &tinfo;
    hasTable = false;
  } else {
    smlDestroyTableInfo(info, tinfo);
  }

  taosArrayPush((*oneTable)->cols, &cols);
  SSmlSTableMeta **tableMeta =
      (SSmlSTableMeta **)taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen);
  if (tableMeta) {  // update meta
    ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, false, &info->msgBuf);
    if (!hasTable && ret == TSDB_CODE_SUCCESS) {
      // tags only need merging the first time a child table shows up
      ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, true, &info->msgBuf);
    }
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
      return ret;
    }
  } else {
    SSmlSTableMeta *meta = smlBuildSTableMeta(false);
    if (meta == NULL) {
      // never dereference a failed allocation
      uError("SML:0x%" PRIx64 " smlBuildSTableMeta failed", info->id);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
    smlInsertMeta(meta->colHash, meta->cols, cols);
    taosHashPut(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen, &meta, POINTER_BYTES);
  }

  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
2483

X
Xiaoyu Wang 已提交
2484
/**
 * Parse an OpenTSDB JSON payload.
 *
 * The payload is either a single JSON object (one data point) or a JSON array
 * of data points. The parsed tree is stored in info->root and every data
 * point is forwarded to smlParseTelnetLine().
 *
 * @param info    schemaless handle; receives the parsed tree in info->root
 * @param payload NUL-terminated JSON text
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON for a missing,
 *         unparsable or non-object/non-array payload, or the error of the
 *         first data point that fails to parse
 */
static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
  if (payload == NULL) {
    uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  info->root = cJSON_Parse(payload);
  if (info->root == NULL) {
    uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // multiple data points must be sent in JSON array
  const bool singlePoint = cJSON_IsObject(info->root);
  int32_t    pointNum = 0;
  if (singlePoint) {
    pointNum = 1;
  } else if (cJSON_IsArray(info->root)) {
    pointNum = cJSON_GetArraySize(info->root);
  } else {
    uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  for (int32_t idx = 0; idx < pointNum; ++idx) {
    cJSON        *dataPoint = singlePoint ? info->root : cJSON_GetArrayItem(info->root, idx);
    const int32_t status = smlParseTelnetLine(info, dataPoint, -1);
    if (status != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
      return status;
    }
  }

  return TSDB_CODE_SUCCESS;
}
2521

X
Xiaoyu Wang 已提交
2522
/**
 * Bind the parsed rows of every child table into the query and launch the
 * insert asynchronously.
 *
 * For each entry of info->childTables the function resolves the vgroup of the
 * child table through the catalog, records it in info->pVgHash, stores the
 * vgroup id and uid on the super table's table meta and calls smlBindData().
 * After all tables are bound it builds the output and starts the async query.
 *
 * @param info schemaless handle
 * @return TSDB_CODE_SUCCESS once the async query has been launched, or the
 *         first error encountered (nothing is launched in that case)
 */
static int32_t smlInsertData(SSmlHandle *info) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
  while (oneTable) {
    SSmlTableInfo *tableData = *oneTable;

    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
    tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
    // tname was zero-initialised above, so copying without the terminator is fine
    memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));

    SRequestConnInfo conn = {0};
    conn.pTrans = info->taos->pAppInfo->pTransporter;
    conn.requestId = info->pRequest->requestId;
    conn.requestObjRefId = info->pRequest->self;
    conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);

    SVgroupInfo vg;
    code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
      // leaving the loop early: release the iterator before returning
      taosHashCancelIterate(info->childTables, oneTable);
      return code;
    }
    taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));

    SSmlSTableMeta **pMeta =
        (SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
    ASSERT(NULL != pMeta && NULL != *pMeta);

    // use tablemeta of stable to save vgid and uid of child table
    (*pMeta)->tableMeta->vgId = vg.vgId;
    (*pMeta)->tableMeta->uid = tableData->uid;  // one table merge data block together according uid

    code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols,
                       (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
                       info->ttl, info->msgBuf.buf, info->msgBuf.len);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
      // leaving the loop early: release the iterator before returning
      taosHashCancelIterate(info->childTables, oneTable);
      return code;
    }
    oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
  }

  code = smlBuildOutput(info->pQuery, info->pVgHash);
  if (code != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
    return code;
  }
  info->cost.insertRpcTime = taosGetTimestampUs();

  SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
  atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);

  SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
  if (pWrapper == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pWrapper->pRequest = info->pRequest;
  // NOTE(review): pWrapper is handed to launchAsyncQuery and not freed here -- confirm it takes ownership
  launchAsyncQuery(info->pRequest, info->pQuery, NULL, pWrapper);
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2588 2589
/**
 * Log the per-request statistics collected in info->cost: result code, line
 * and table counters, and the time spent in each phase. Each phase cost is
 * the difference between two consecutive timestamps stored in info->cost.
 *
 * @param info schemaless handle whose cost record is printed
 */
static void smlPrintStatisticInfo(SSmlHandle *info) {
  uError("SML:0x%" PRIx64
             " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
        parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64,
         info->id,
         // counters
         info->cost.code,
         info->cost.lineNum,
         info->cost.numOfSTables,
         info->cost.numOfCTables,
         info->cost.numOfCreateSTables,
         info->cost.numOfAlterTagSTables,
         info->cost.numOfAlterColSTables,
         // phase durations, in the order the phases run
         info->cost.schemaTime - info->cost.parseTime,          // parse
         info->cost.insertBindTime - info->cost.schemaTime,     // schema
         info->cost.insertRpcTime - info->cost.insertBindTime,  // bind
         info->cost.endTime - info->cost.insertRpcTime,         // rpc
         info->cost.endTime - info->cost.parseTime);            // total
}

dengyihao's avatar
dengyihao 已提交
2600
/**
 * Parse the input of one batch.
 *
 * For TSDB_SML_JSON_PROTOCOL the whole payload (first element of `lines`, or
 * `rawLine`) is handed to smlParseJSON(). For the line-based protocols the
 * input is walked line by line: either lines[0..numLines) or, in raw mode,
 * '\n'-separated lines between rawLine and rawLineEnd.
 *
 * If a line parser sets info->reRun, everything collected so far
 * (info->childTables, info->superTables) is discarded and parsing restarts
 * from the FIRST line of the batch.
 *
 * @param info       schemaless handle
 * @param lines      array of NUL-terminated lines, or NULL in raw mode
 * @param rawLine    start of the raw buffer, or NULL when `lines` is used
 * @param rawLineEnd one past the last byte of the raw buffer
 * @param numLines   number of lines in this batch
 * @return TSDB_CODE_SUCCESS or the first parse error
 */
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    if (lines) {
      code = smlParseJSON(info, *lines);
    } else if (rawLine) {
      code = smlParseJSON(info, rawLine);
    }
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines ? *lines : rawLine);
    }
    return code;
  }

  char *const rawStart = rawLine;  // rewind point for a re-run in raw mode

  for (int32_t i = 0; i < numLines; ++i) {
    char *tmp = NULL;
    int   len = 0;
    if (lines) {
      tmp = lines[i];
      len = strlen(tmp);
    } else if (rawLine) {
      // raw mode: the line runs up to (not including) the next '\n' or the buffer end
      tmp = rawLine;
      while (rawLine < rawLineEnd) {
        if (*(rawLine++) == '\n') {
          break;
        }
        len++;
      }
      if (info->protocol == TSDB_SML_LINE_PROTOCOL && tmp[0] == '#') {  // this line is comment
        continue;
      }
    }

    uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? "rawdata" : tmp));

    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
      code = smlParseInfluxString(info, tmp, tmp + len, taosArrayGet(info->lines, i));
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
      code = smlParseTelnetLine(info, tmp, len);
    } else {
      ASSERT(0);
    }
    if (code != TSDB_CODE_SUCCESS) {
      // raw lines are not NUL-terminated at the line end, so bound the print by len
      uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %.*s", info->id, i, len, tmp);
      return code;
    }
    if (info->reRun) {
      info->reRun = false;
      // clear info->childTables
      void **p1 = (void **)taosHashIterate(info->childTables, NULL);
      while (p1) {
        smlDestroyTableInfo(info, (SSmlTableInfo *)(*p1));
        p1 = (void **)taosHashIterate(info->childTables, p1);
      }
      taosHashClear(info->childTables);

      // clear info->superTables
      p1 = (void **)taosHashIterate(info->superTables, NULL);
      while (p1) {
        smlDestroySTableMeta((SSmlSTableMeta *)(*p1));
        p1 = (void **)taosHashIterate(info->superTables, p1);
      }
      taosHashClear(info->superTables);

      // restart from the first line: the loop's ++i turns -1 into 0, and the
      // raw cursor has to go back to the start of the batch as well
      i = -1;
      rawLine = rawStart;
    }
  }

  return code;
}

dengyihao's avatar
dengyihao 已提交
2671
/**
 * Run one batch through the schemaless pipeline: parse the lines, finish the
 * non-data-format bookkeeping, bring the database schema in line with the
 * parsed data (with retries) and launch the insert. A timestamp is recorded
 * in info->cost at the start of each phase.
 *
 * @param info       schemaless handle of this batch
 * @param lines      array of lines, or NULL in raw mode
 * @param rawLine    start of the raw buffer, or NULL
 * @param rawLineEnd one past the end of the raw buffer
 * @param numLines   number of lines in this batch
 * @return 0 on success, otherwise the error code of the failing phase
 */
static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
  // phase 1: parse
  info->cost.parseTime = taosGetTimestampUs();

  int32_t code = smlParseLine(info, lines, rawLine, rawLineEnd, numLines);
  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(code));
    return code;
  }

  code = smlParseLineBottom(info);
  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlParseLineBottom error : %s", info->id, tstrerror(code));
    return code;
  }

  info->cost.lineNum = numLines;
  info->cost.numOfSTables = taosHashGetSize(info->superTables);
  info->cost.numOfCTables = taosHashGetSize(info->childTables);

  // phase 2: schema; retried up to MAX_RETRY_TIMES per super table
  info->cost.schemaTime = taosGetTimestampUs();

  int32_t retryNum = 0;
  for (;;) {
    code = smlModifyDBSchemas(info);
    if (code == 0) break;
    if (retryNum++ >= taosHashGetSize(info->superTables) * MAX_RETRY_TIMES) break;
  }
  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code));
    return code;
  }

  // phase 3: bind + launch
  info->cost.insertBindTime = taosGetTimestampUs();
  code = smlInsertData(info);
  if (code != 0) {
    uError("SML:0x%" PRIx64 " smlInsertData error : %s", info->id, tstrerror(code));
  }

  return code;
}

X
Xiaoyu Wang 已提交
2715
/**
 * Report whether the current database accepts schemaless writes.
 *
 * The check is currently disabled: the function always returns
 * TSDB_CODE_SUCCESS. The disabled implementation fetched the database
 * configuration through the catalog (catalogGetHandle + catalogGetDBCfg on
 * the full db name built from taos->acctId / taos->db) and returned
 * TSDB_CODE_SML_INVALID_DB_CONF when the configuration's schemaless flag was
 * off.
 *
 * @param taos    connection object (unused while the check is disabled)
 * @param request request object (unused while the check is disabled)
 * @return TSDB_CODE_SUCCESS
 */
static int32_t isSchemalessDb(STscObj *taos, SRequestObj *request) {
  (void)taos;
  (void)request;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2747
static void smlInsertCallback(void *param, void *res, int32_t code) {
wmmhello's avatar
wmmhello 已提交
2748
  SRequestObj *pRequest = (SRequestObj *)res;
X
Xiaoyu Wang 已提交
2749
  SSmlHandle  *info = (SSmlHandle *)param;
2750
  int32_t      rows = taos_affected_rows(pRequest);
wmmhello's avatar
wmmhello 已提交
2751

X
Xiaoyu Wang 已提交
2752
  uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
2753
  Params *pParam = info->params;
wmmhello's avatar
wmmhello 已提交
2754
  // lock
2755 2756
  taosThreadSpinLock(&pParam->lock);
  pParam->cnt++;
X
Xiaoyu Wang 已提交
2757
  if (code != TSDB_CODE_SUCCESS) {
2758 2759
    pParam->request->code = code;
    pParam->request->body.resInfo.numOfRows += rows;
2760
  } else {
2761 2762
    pParam->request->body.resInfo.numOfRows += info->affectedRows;
  }
2763 2764 2765
  // unlock
  taosThreadSpinUnlock(&pParam->lock);

2766 2767
  if (pParam->cnt == pParam->total) {
    tsem_post(&pParam->sem);
wmmhello's avatar
wmmhello 已提交
2768
  }
wmmhello's avatar
wmmhello 已提交
2769
  uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
wmmhello's avatar
wmmhello 已提交
2770 2771 2772
  info->cost.endTime = taosGetTimestampUs();
  info->cost.code = code;
  smlPrintStatisticInfo(info);
wmmhello's avatar
wmmhello 已提交
2773 2774 2775
  smlDestroyInfo(info);
}

dengyihao's avatar
dengyihao 已提交
2776
/**
 * Validate the arguments, split the input into batches of at most
 * tsSmlBatchSize lines, process every batch and wait until all of them have
 * completed.
 *
 * Each batch gets its own request and SSmlHandle. Completion is reported
 * through smlInsertCallback, which updates the stack-resident `params` record
 * and posts params.sem after the last batch. This function therefore must not
 * return while a batch is still in flight.
 *
 * @param request    caller-visible request; receives the error code / message
 *                   and is returned as the result
 * @param lines      array of lines, or NULL in raw mode
 * @param rawLine    start of the raw buffer, or NULL when `lines` is used
 * @param rawLineEnd one past the end of the raw buffer
 * @param numLines   number of lines; forced to 1 for the JSON protocol
 * @param protocol   TSDB_SML_LINE_PROTOCOL .. TSDB_SML_JSON_PROTOCOL
 * @param precision  timestamp precision; range-checked for the line protocol only
 * @param ttl        stored in every batch handle
 * @return `request`, cast to TAOS_RES*
 */
TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd,
                                       int numLines, int protocol, int precision, int32_t ttl) {
  int      batchs = 0;
  int      launched = 0;     // batches whose completion callback has run or will run
  bool     aborted = false;  // a batch could not be set up, remaining batches are skipped
  STscObj *pTscObj = request->pTscObj;

  pTscObj->schemalessType = 1;
  SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};

  Params params = {0};
  params.request = request;
  tsem_init(&params.sem, 0, 0);
  taosThreadSpinInit(&(params.lock), 0);

  if (request->pDb == NULL) {
    request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
    goto end;
  }

  if (isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS) {
    request->code = TSDB_CODE_SML_INVALID_DB_CONF;
    smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
    goto end;
  }

  if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
    request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
    smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
    goto end;
  }

  if (protocol == TSDB_SML_LINE_PROTOCOL &&
      (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
    request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
    smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
    goto end;
  }

  if (protocol == TSDB_SML_JSON_PROTOCOL) {
    numLines = 1;  // a JSON payload is always handled as a single batch
  } else if (numLines <= 0) {
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
    goto end;
  }

  batchs = ceil(((double)numLines) / tsSmlBatchSize);
  params.total = batchs;
  for (int i = 0; i < batchs; ++i) {
    SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT, 0);
    if (!req) {
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error request is null");
      aborted = true;
      break;
    }

    // size of this batch; the last batch takes whatever is left
    int32_t perBatch = tsSmlBatchSize;
    if (numLines > perBatch) {
      numLines -= perBatch;
    } else {
      perBatch = numLines;
      numLines = 0;
    }

    SSmlHandle *info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision, perBatch);
    if (!info) {
      // NOTE(review): `req` is not released on this path -- confirm who owns it
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error SSmlHandle is null");
      aborted = true;
      break;
    }

    info->isRawLine = (rawLine != NULL);  // raw mode is the one that HAS a raw buffer
    info->ttl = ttl;

    info->params = &params;
    info->pRequest->body.queryFp = smlInsertCallback;
    info->pRequest->body.param = info;

    // from here on exactly one callback fires for this batch: asynchronously
    // when smlProcess succeeds, or synchronously below when it fails
    launched++;
    int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch);
    if (lines) {
      lines += perBatch;
    }
    if (rawLine) {
      // advance the raw cursor past the perBatch lines just consumed
      int num = 0;
      while (rawLine < rawLineEnd) {
        if (*(rawLine++) == '\n') {
          num++;
        }
        if (num == perBatch) {
          break;
        }
      }
    }
    if (code != TSDB_CODE_SUCCESS) {
      info->pRequest->body.queryFp(info, req, code);
    }
  }

  if (aborted) {
    // Fewer batches than planned were started. `params` lives on this stack
    // frame, so the batches already started must finish before it goes away.
    // Shrink the expected total under the lock and wait only if some started
    // batch has not reported yet.
    bool pending = false;
    taosThreadSpinLock(&params.lock);
    params.total = launched;
    pending = (params.cnt < launched);
    taosThreadSpinUnlock(&params.lock);
    if (pending) {
      tsem_wait(&params.sem);
    }
  } else {
    tsem_wait(&params.sem);
  }

  end:
  taosThreadSpinDestroy(&params.lock);
  tsem_destroy(&params.sem);
  // NOTE(review): schemalessType is left at 1, not reset -- confirm this is intended
  pTscObj->schemalessType = 1;
  uDebug("resultend:%s", request->msgBuf);
  return (TAOS_RES *)request;
}
wmmhello's avatar
wmmhello 已提交
2883 2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899

/**
 * taos_schemaless_insert() parse and insert data points into database according to
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
dengyihao's avatar
dengyihao 已提交
2900
 * @return TAOS_RES
wmmhello's avatar
wmmhello 已提交
2901 2902
 */

2903 2904
TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                                int32_t ttl, int64_t reqid) {
wmmhello's avatar
wmmhello 已提交
2905 2906 2907 2908 2909
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

2910
  SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
dengyihao's avatar
dengyihao 已提交
2911 2912 2913 2914 2915 2916 2917 2918 2919 2920 2921 2922
  if (!request) {
    uError("SML:taos_schemaless_insert error request is null");
    return NULL;
  }

  if (!lines) {
    SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
    return (TAOS_RES *)request;
  }

2923
  return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision, ttl);
dengyihao's avatar
dengyihao 已提交
2924 2925
}

2926 2927 2928
/**
 * Schemaless insert with the default table ttl (TSDB_DEFAULT_TABLE_TTL) and
 * request id 0. See taos_schemaless_insert_ttl_with_reqid() for the
 * parameters.
 */
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
  const int32_t ttl = TSDB_DEFAULT_TABLE_TTL;
  const int64_t reqid = 0;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
2929

2930 2931 2932
/**
 * Schemaless insert with a caller-supplied ttl and request id 0. See
 * taos_schemaless_insert_ttl_with_reqid() for the parameters.
 */
TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int32_t ttl) {
  const int64_t reqid = 0;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, reqid);
}
wmmhello's avatar
wmmhello 已提交
2933

2934 2935
// Line-array insert with a caller-supplied request id; the table TTL is TSDB_DEFAULT_TABLE_TTL.
TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                            int64_t reqid) {
  const int32_t defaultTtl = TSDB_DEFAULT_TABLE_TTL;
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, defaultTtl, reqid);
}

2938
/**
 * Schemaless insert from one contiguous text buffer, with an explicit table TTL and request id.
 *
 * The buffer is scanned once to count lines: a line ends at every '\n', and the last byte of the
 * buffer always ends a line whether or not it is a newline.
 *
 * @param taos      connection handle. If NULL, terrno is set to TSDB_CODE_TSC_DISCONNECTED and
 *                  NULL is returned.
 * @param lines     start of the buffer holding len bytes of input.
 * @param len       number of bytes in lines; must be > 0.
 * @param totalRows optional out-parameter (may be NULL). Receives the number of lines counted as
 *                  rows: every line, except that with TSDB_SML_LINE_PROTOCOL a line whose first
 *                  byte is '#' is treated as a comment and not counted. Set to 0 when the input is
 *                  rejected as invalid; left untouched when this function returns NULL.
 * @param protocol  forwarded to taos_schemaless_insert_inner(); also selects comment handling above.
 * @param precision forwarded to taos_schemaless_insert_inner().
 * @param ttl       forwarded to taos_schemaless_insert_inner().
 * @param reqid     forwarded to createRequest().
 * @return NULL if taos is NULL or the request could not be created. Otherwise a result object:
 *         the request itself carrying TSDB_CODE_SML_INVALID_DATA when lines is NULL or len <= 0,
 *         else whatever taos_schemaless_insert_inner() returns.
 */
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                                    int precision, int32_t ttl, int64_t reqid) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
  if (!request) {
    uError("SML:taos_schemaless_insert error request is null");
    return NULL;
  }

  // From here on a result object is always returned, so give the out-parameter a defined value
  // up front. It is optional: callers that do not need the count may pass NULL.
  if (totalRows != NULL) {
    *totalRows = 0;
  }

  if (!lines || len <= 0) {
    SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
    return (TAOS_RES *)request;
  }

  int         numLines = 0;
  int32_t     rowCount = 0;
  const char *lineStart = lines;  // first byte of the line currently being scanned
  for (int i = 0; i < len; i++) {
    if (lines[i] == '\n' || i == len - 1) {
      numLines++;
      if (lineStart[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL) {  // ignore comment
        rowCount++;
      }
      // Only dereferenced when a later line end is found, so it never reads past the buffer.
      lineStart = lines + i + 1;
    }
  }

  if (totalRows != NULL) {
    *totalRows = rowCount;
  }

  return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision, ttl);
}

2973 2974 2975 2976 2977 2978
// Raw-buffer insert with a caller-supplied request id; the table TTL is TSDB_DEFAULT_TABLE_TTL.
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                                int precision, int64_t reqid) {
  const int32_t defaultTtl = TSDB_DEFAULT_TABLE_TTL;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, defaultTtl,
                                                   reqid);
}
// Raw-buffer insert with a caller-supplied table TTL; the request id is fixed to 0.
TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                         int precision, int32_t ttl) {
  const int64_t noReqId = 0;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, noReqId);
}
wmmhello's avatar
wmmhello 已提交
2979

2980 2981
// Raw-buffer insert using TSDB_DEFAULT_TABLE_TTL and request id 0.
TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                     int precision) {
  const int32_t defaultTtl = TSDB_DEFAULT_TABLE_TTL;
  const int64_t noReqId = 0;
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, defaultTtl,
                                                   noReqId);
}