clientSml.c 87.7 KB
Newer Older
wmmhello's avatar
wmmhello 已提交
1 2 3 4 5
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

X
Xiaoyu Wang 已提交
6 7 8 9 10
#include "cJSON.h"
#include "catalog.h"
#include "clientInt.h"
#include "osSemaphore.h"
#include "osThread.h"
wmmhello's avatar
wmmhello 已提交
11 12
#include "query.h"
#include "taos.h"
wmmhello's avatar
wmmhello 已提交
13
#include "taoserror.h"
X
Xiaoyu Wang 已提交
14
#include "tcommon.h"
wmmhello's avatar
wmmhello 已提交
15
#include "tdef.h"
X
Xiaoyu Wang 已提交
16
#include "tglobal.h"
wmmhello's avatar
wmmhello 已提交
17 18
#include "tlog.h"
#include "tmsg.h"
X
Xiaoyu Wang 已提交
19
#include "tname.h"
wmmhello's avatar
wmmhello 已提交
20 21
#include "ttime.h"
#include "ttypes.h"
wmmhello's avatar
wmmhello 已提交
22

wmmhello's avatar
wmmhello 已提交
23
//=================================================================================================
wmmhello's avatar
wmmhello 已提交
24 25 26 27 28 29 30

#define SPACE ' '
#define COMMA ','
#define EQUAL '='
#define QUOTE '"'
#define SLASH '\\'

dengyihao's avatar
dengyihao 已提交
31 32 33 34 35 36
#define JUMP_SPACE(sql, sqlEnd) \
  while (sql < sqlEnd) {        \
    if (*sql == SPACE)          \
      sql++;                    \
    else                        \
      break;                    \
X
Xiaoyu Wang 已提交
37
  }
wmmhello's avatar
wmmhello 已提交
38
// comma ,
X
Xiaoyu Wang 已提交
39 40
#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH)
#define IS_COMMA(sql)       (*(sql) == COMMA && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
41
// space
X
Xiaoyu Wang 已提交
42 43
#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql)-1) == SLASH)
#define IS_SPACE(sql)       (*(sql) == SPACE && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
44
// equal =
X
Xiaoyu Wang 已提交
45 46
#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) == SLASH)
#define IS_EQUAL(sql)       (*(sql) == EQUAL && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
47
// quote "
X
Xiaoyu Wang 已提交
48 49
#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) == SLASH)
#define IS_QUOTE(sql)       (*(sql) == QUOTE && *((sql)-1) != SLASH)
wmmhello's avatar
wmmhello 已提交
50
// SLASH
X
Xiaoyu Wang 已提交
51
#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql)-1) == SLASH)
wmmhello's avatar
wmmhello 已提交
52

X
Xiaoyu Wang 已提交
53 54
#define IS_SLASH_LETTER(sql) \
  (IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql))
wmmhello's avatar
wmmhello 已提交
55

X
Xiaoyu Wang 已提交
56
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len))
wmmhello's avatar
wmmhello 已提交
57

X
Xiaoyu Wang 已提交
58 59 60 61 62 63 64 65
#define PROCESS_SLASH(key, keyLen)           \
  for (int i = 1; i < keyLen; ++i) {         \
    if (IS_SLASH_LETTER(key + i)) {          \
      MOVE_FORWARD_ONE(key + i, keyLen - i); \
      i--;                                   \
      keyLen--;                              \
    }                                        \
  }
wmmhello's avatar
wmmhello 已提交
66

67 68 69
#define IS_INVALID_COL_LEN(len)   ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)

70 71 72
#define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM     4

X
Xiaoyu Wang 已提交
73 74 75 76
#define TS        "_ts"
#define TS_LEN    3
#define VALUE     "_value"
#define VALUE_LEN 6
77

X
Xiaoyu Wang 已提交
78 79
#define BINARY_ADD_LEN 2  // "binary"   2 means " "
#define NCHAR_ADD_LEN  3  // L"nchar"   3 means L" "
wmmhello's avatar
wmmhello 已提交
80 81

#define MAX_RETRY_TIMES 5
wmmhello's avatar
wmmhello 已提交
82 83 84 85
//=================================================================================================
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;

typedef enum {
86
  SCHEMA_ACTION_NULL,
wmmhello's avatar
wmmhello 已提交
87 88 89 90 91
  SCHEMA_ACTION_CREATE_STABLE,
  SCHEMA_ACTION_ADD_COLUMN,
  SCHEMA_ACTION_ADD_TAG,
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,
  SCHEMA_ACTION_CHANGE_TAG_SIZE,
wmmhello's avatar
wmmhello 已提交
92 93 94
} ESchemaAction;

typedef struct {
X
Xiaoyu Wang 已提交
95 96 97 98
  const char *measure;
  const char *tags;
  const char *cols;
  const char *timestamp;
wmmhello's avatar
wmmhello 已提交
99 100 101 102 103 104 105 106 107

  int32_t measureLen;
  int32_t measureTagsLen;
  int32_t tagsLen;
  int32_t colsLen;
  int32_t timestampLen;
} SSmlLineInfo;

typedef struct {
X
Xiaoyu Wang 已提交
108 109 110 111
  const char *sTableName;  // super table name
  int32_t     sTableNameLen;
  char        childTableName[TSDB_TABLE_NAME_LEN];
  uint64_t    uid;
wmmhello's avatar
wmmhello 已提交
112

X
Xiaoyu Wang 已提交
113
  SArray *tags;
wmmhello's avatar
wmmhello 已提交
114

115 116
  // if info->formatData is true, elements are SArray<SSmlKv*>.
  // if info->formatData is false, elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
X
Xiaoyu Wang 已提交
117
  SArray *cols;
wmmhello's avatar
wmmhello 已提交
118 119 120
} SSmlTableInfo;

typedef struct {
X
Xiaoyu Wang 已提交
121 122
  SArray   *tags;     // save the origin order to create table
  SHashObj *tagHash;  // elements are <key, index in tags>
123

X
Xiaoyu Wang 已提交
124 125
  SArray   *cols;
  SHashObj *colHash;
126

wmmhello's avatar
wmmhello 已提交
127 128 129 130
  STableMeta *tableMeta;
} SSmlSTableMeta;

typedef struct {
X
Xiaoyu Wang 已提交
131 132
  int32_t len;
  char   *buf;
wmmhello's avatar
wmmhello 已提交
133 134
} SSmlMsgBuf;

135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
typedef struct {
  int32_t code;
  int32_t lineNum;

  int32_t numOfSTables;
  int32_t numOfCTables;
  int32_t numOfCreateSTables;

  int64_t parseTime;
  int64_t schemaTime;
  int64_t insertBindTime;
  int64_t insertRpcTime;
  int64_t endTime;
} SSmlCostInfo;

X
Xiaoyu Wang 已提交
150 151 152
typedef struct {
  SRequestObj     *request;
  tsem_t           sem;
153 154
  int32_t          cnt;
  int32_t          total;
wmmhello's avatar
wmmhello 已提交
155 156 157
  TdThreadSpinlock lock;
} Params;

wmmhello's avatar
wmmhello 已提交
158
typedef struct {
X
Xiaoyu Wang 已提交
159 160 161 162 163
  int64_t id;
  Params *params;

  SMLProtocolType protocol;
  int8_t          precision;
164 165 166
  bool            dataFormat;  // true means that the name and order of keys in each line are the same(only for influx protocol)
  bool            isRawLine;
  int32_t         ttl;
X
Xiaoyu Wang 已提交
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182

  SHashObj *childTables;
  SHashObj *superTables;
  SHashObj *pVgHash;
  void     *exec;

  STscObj     *taos;
  SCatalog    *pCatalog;
  SRequestObj *pRequest;
  SQuery      *pQuery;

  SSmlCostInfo cost;
  int32_t      affectedRows;
  SSmlMsgBuf   msgBuf;
  SHashObj    *dumplicateKey;  // for dumplicate key
  SArray      *colsContainer;  // for cols parse, if dataFormat == false
wmmhello's avatar
wmmhello 已提交
183 184

  cJSON       *root;  // for parse json
wmmhello's avatar
wmmhello 已提交
185
} SSmlHandle;
wmmhello's avatar
wmmhello 已提交
186 187
//=================================================================================================

wmmhello's avatar
wmmhello 已提交
188
//=================================================================================================
189
static volatile int64_t linesSmlHandleId = 0;
X
Xiaoyu Wang 已提交
190 191
static int64_t          smlGenId() {
           int64_t id;
wmmhello's avatar
wmmhello 已提交
192

X
Xiaoyu Wang 已提交
193 194
           do {
             id = atomic_add_fetch_64(&linesSmlHandleId, 1);
wmmhello's avatar
wmmhello 已提交
195 196
  } while (id == 0);

X
Xiaoyu Wang 已提交
197
           return id;
wmmhello's avatar
wmmhello 已提交
198 199
}

200
static inline bool smlDoubleToInt64OverFlow(double num) {
X
Xiaoyu Wang 已提交
201
  if (num >= (double)INT64_MAX || num <= (double)INT64_MIN) return true;
202 203 204 205 206 207 208 209 210 211 212 213
  return false;
}

static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashObj *pHash) {
  void *val = taosHashGet(pHash, key, keyLen);
  if (val) {
    return true;
  }
  taosHashPut(pHash, key, keyLen, key, 1);
  return false;
}

X
Xiaoyu Wang 已提交
214
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2) {
215
  if (pBuf->buf) {
216 217 218 219 220 221 222
    memset(pBuf->buf, 0, pBuf->len);
    if (msg1) strncat(pBuf->buf, msg1, pBuf->len);
    int32_t left = pBuf->len - strlen(pBuf->buf);
    if (left > 2 && msg2) {
      strncat(pBuf->buf, ":", left - 1);
      strncat(pBuf->buf, msg2, left - 2);
    }
wmmhello's avatar
wmmhello 已提交
223
  }
wmmhello's avatar
wmmhello 已提交
224 225 226
  return TSDB_CODE_SML_INVALID_DATA;
}

X
Xiaoyu Wang 已提交
227
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
228
                                       ESchemaAction *action, SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
229
  uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
230 231
  if (index) {
    if (colField[*index].type != kv->type) {
X
Xiaoyu Wang 已提交
232 233
      uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id,
             kv->key, colField[*index].type, kv->type);
234 235 236
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

X
Xiaoyu Wang 已提交
237 238 239 240
    if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR &&
         (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
        (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
         ((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
241
      if (isTag) {
wmmhello's avatar
wmmhello 已提交
242
        *action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
243
      } else {
wmmhello's avatar
wmmhello 已提交
244
        *action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
245 246 247 248
      }
    }
  } else {
    if (isTag) {
wmmhello's avatar
wmmhello 已提交
249
      *action = SCHEMA_ACTION_ADD_TAG;
250
    } else {
wmmhello's avatar
wmmhello 已提交
251
      *action = SCHEMA_ACTION_ADD_COLUMN;
252 253
    }
  }
wmmhello's avatar
wmmhello 已提交
254 255 256
  return 0;
}

wmmhello's avatar
wmmhello 已提交
257
static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
wmmhello's avatar
wmmhello 已提交
258
  int32_t result = 1;
X
Xiaoyu Wang 已提交
259
  while (result <= length) {
wmmhello's avatar
wmmhello 已提交
260 261
    result *= 2;
  }
262
  if (type == TSDB_DATA_TYPE_BINARY && result > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
wmmhello's avatar
wmmhello 已提交
263
    result = TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE;
264
  } else if (type == TSDB_DATA_TYPE_NCHAR && result > (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
wmmhello's avatar
wmmhello 已提交
265 266
    result = (TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  }
wmmhello's avatar
wmmhello 已提交
267

268
  if (type == TSDB_DATA_TYPE_NCHAR) {
269
    result = result * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
270
  } else if (type == TSDB_DATA_TYPE_BINARY) {
271
    result = result + VARSTR_HEADER_SIZE;
wmmhello's avatar
wmmhello 已提交
272
  }
273
  return result;
wmmhello's avatar
wmmhello 已提交
274 275
}

X
Xiaoyu Wang 已提交
276
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
277
                                      ESchemaAction *action, bool isTag) {
278 279
  int32_t code = TSDB_CODE_SUCCESS;
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
280
    if (j == 0 && !isTag) continue;
X
Xiaoyu Wang 已提交
281
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j);
282
    code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
X
Xiaoyu Wang 已提交
283
    if (code != TSDB_CODE_SUCCESS) {
284 285 286 287 288 289
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

290
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
291
  SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
292 293
  int32_t   i = 0;
  for (; i < length; i++) {
294 295 296
    taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
  }

297
  if (isTag) {
298 299 300 301 302
    i = 0;
  } else {
    i = 1;
  }
  for (; i < taosArrayGetSize(cols); i++) {
X
Xiaoyu Wang 已提交
303 304
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
    if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
wmmhello's avatar
wmmhello 已提交
305
      taosHashCleanup(hashTmp);
306 307 308
      return -1;
    }
  }
309
  taosHashCleanup(hashTmp);
310 311 312
  return 0;
}

313
static int32_t getBytes(uint8_t type, int32_t length) {
314 315 316 317 318 319 320
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
    return smlFindNearestPowerOf2(length, type);
  } else {
    return tDataTypes[type].bytes;
  }
}

321 322
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
                                  SArray *results, int32_t numOfCols, bool isTag) {
wmmhello's avatar
wmmhello 已提交
323
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
324
    SSmlKv       *kv = (SSmlKv *)taosArrayGetP(cols, j);
wmmhello's avatar
wmmhello 已提交
325 326
    ESchemaAction action = SCHEMA_ACTION_NULL;
    smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
327
    if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) {
wmmhello's avatar
wmmhello 已提交
328 329 330 331 332
      SField field = {0};
      field.type = kv->type;
      field.bytes = getBytes(kv->type, kv->length);
      memcpy(field.name, kv->key, kv->keyLen);
      taosArrayPush(results, &field);
333
    } else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
wmmhello's avatar
wmmhello 已提交
334
      uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
335 336
      uint16_t  newIndex = *index;
      if (isTag) newIndex -= numOfCols;
wmmhello's avatar
wmmhello 已提交
337 338 339 340 341 342 343
      SField *field = (SField *)taosArrayGet(results, newIndex);
      field->bytes = getBytes(kv->type, kv->length);
    }
  }
  return TSDB_CODE_SUCCESS;
}

344 345 346 347 348
// static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
//                               int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SArray *pTags, STableMeta *pTableMeta,
                              ESchemaAction action) {
  SRequestObj   *pRequest = NULL;
349 350 351 352
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SCmdMsgInfo    pCmdMsg = {0};

wmmhello's avatar
wmmhello 已提交
353 354 355 356 357 358
  // put front for free
  pReq.numOfColumns = taosArrayGetSize(pColumns);
  pReq.pColumns = pColumns;
  pReq.numOfTags = taosArrayGetSize(pTags);
  pReq.pTags = pTags;

dengyihao's avatar
dengyihao 已提交
359
  code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest, 0);
360 361 362 363
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
364
  pRequest->syncQuery = true;
365 366 367 368 369
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

370
  if (action == SCHEMA_ACTION_CREATE_STABLE) {
wmmhello's avatar
wmmhello 已提交
371 372 373 374
    pReq.colVer = 1;
    pReq.tagVer = 1;
    pReq.suid = 0;
    pReq.source = TD_REQ_FROM_APP;
375
  } else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
wmmhello's avatar
wmmhello 已提交
376 377 378 379
    pReq.colVer = pTableMeta->sversion;
    pReq.tagVer = pTableMeta->tversion + 1;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
380
  } else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE) {
wmmhello's avatar
wmmhello 已提交
381 382 383 384 385 386
    pReq.colVer = pTableMeta->sversion + 1;
    pReq.tagVer = pTableMeta->tversion;
    pReq.suid = pTableMeta->uid;
    pReq.source = TD_REQ_FROM_TAOX;
  }

387
  if (pReq.numOfTags == 0) {
wmmhello's avatar
wmmhello 已提交
388 389 390 391 392 393 394 395
    pReq.numOfTags = 1;
    SField field = {0};
    field.type = TSDB_DATA_TYPE_NCHAR;
    field.bytes = 1;
    strcpy(field.name, tsSmlTagName);
    taosArrayPush(pReq.pTags, &field);
  }

396 397 398 399 400 401 402 403 404 405 406 407 408 409
  pReq.commentLen = -1;
  pReq.igExists = true;
  tNameExtractFullName(pName, pReq.name);

  pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  if (NULL == pCmdMsg.pMsg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);

D
dapan1121 已提交
410 411
  SQuery pQuery;
  memset(&pQuery, 0, sizeof(pQuery));
412 413 414 415 416 417 418
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);

419
  if (pRequest->code == TSDB_CODE_SUCCESS) {
420 421 422 423 424 425 426 427 428 429 430
    catalogRemoveTableMeta(info->pCatalog, pName);
  }
  code = pRequest->code;
  taosMemoryFree(pCmdMsg.pMsg);

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  return code;
}

X
Xiaoyu Wang 已提交
431
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
432 433 434
  int32_t     code = 0;
  SHashObj   *hashTmp = NULL;
  STableMeta *pTableMeta = NULL;
wmmhello's avatar
wmmhello 已提交
435

436
  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
wmmhello's avatar
wmmhello 已提交
437
  tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
wmmhello's avatar
wmmhello 已提交
438

D
dapan1121 已提交
439 440 441 442 443
  SRequestConnInfo conn = {0};
  conn.pTrans = info->taos->pAppInfo->pTransporter;
  conn.requestId = info->pRequest->requestId;
  conn.requestObjRefId = info->pRequest->self;
  conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
X
Xiaoyu Wang 已提交
444 445

  SSmlSTableMeta **tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
wmmhello's avatar
wmmhello 已提交
446
  while (tableMetaSml) {
X
Xiaoyu Wang 已提交
447 448
    SSmlSTableMeta *sTableData = *tableMetaSml;
    bool            needCheckMeta = false;  // for multi thread
wmmhello's avatar
wmmhello 已提交
449

wmmhello's avatar
wmmhello 已提交
450
    size_t superTableLen = 0;
X
Xiaoyu Wang 已提交
451
    void  *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
452
    memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
wmmhello's avatar
wmmhello 已提交
453
    memcpy(pName.tname, superTable, superTableLen);
wmmhello's avatar
wmmhello 已提交
454

D
dapan1121 已提交
455
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
wmmhello's avatar
wmmhello 已提交
456

457
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
458 459
      SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
      SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
wmmhello's avatar
wmmhello 已提交
460 461 462 463
      smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
      smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);

      code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
464
      if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
465
        uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
466
        goto end;
wmmhello's avatar
wmmhello 已提交
467
      }
468
      info->cost.numOfCreateSTables++;
wmmhello's avatar
wmmhello 已提交
469 470 471 472 473 474 475
      taosMemoryFreeClear(pTableMeta);

      code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
        goto end;
      }
X
Xiaoyu Wang 已提交
476
    } else if (code == TSDB_CODE_SUCCESS) {
477 478
      hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
                             HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
479 480
      for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
           i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
481 482
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
wmmhello's avatar
wmmhello 已提交
483

484 485
      ESchemaAction action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
486
      if (code != TSDB_CODE_SUCCESS) {
487
        goto end;
488
      }
489 490 491 492 493
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
wmmhello's avatar
wmmhello 已提交
494 495 496 497 498 499

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
500
          if (i < pTableMeta->tableInfo.numOfColumns) {
wmmhello's avatar
wmmhello 已提交
501
            taosArrayPush(pColumns, &field);
502
          } else {
wmmhello's avatar
wmmhello 已提交
503 504 505
            taosArrayPush(pTags, &field);
          }
        }
506 507
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
                           pTableMeta->tableInfo.numOfColumns, true);
wmmhello's avatar
wmmhello 已提交
508 509

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
510
        if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
511
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
512 513 514
          goto end;
        }

wmmhello's avatar
wmmhello 已提交
515 516 517 518 519 520 521 522 523
        taosMemoryFreeClear(pTableMeta);
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
524
      }
525 526

      taosHashClear(hashTmp);
wmmhello's avatar
wmmhello 已提交
527
      for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
528 529
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
530 531
      action = SCHEMA_ACTION_NULL;
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
532
      if (code != TSDB_CODE_SUCCESS) {
533
        goto end;
wmmhello's avatar
wmmhello 已提交
534
      }
535 536 537 538 539
      if (action != SCHEMA_ACTION_NULL) {
        SArray *pColumns =
            taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
        SArray *pTags =
            taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
wmmhello's avatar
wmmhello 已提交
540 541 542 543 544 545

        for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
          SField field = {0};
          field.type = pTableMeta->schema[i].type;
          field.bytes = pTableMeta->schema[i].bytes;
          strcpy(field.name, pTableMeta->schema[i].name);
546
          if (i < pTableMeta->tableInfo.numOfColumns) {
wmmhello's avatar
wmmhello 已提交
547
            taosArrayPush(pColumns, &field);
548
          } else {
wmmhello's avatar
wmmhello 已提交
549 550 551 552
            taosArrayPush(pTags, &field);
          }
        }

553 554
        smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
                           pTableMeta->tableInfo.numOfColumns, false);
wmmhello's avatar
wmmhello 已提交
555 556

        code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
557
        if (code != TSDB_CODE_SUCCESS) {
558
          uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
559 560
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
561

wmmhello's avatar
wmmhello 已提交
562
        taosMemoryFreeClear(pTableMeta);
wmmhello's avatar
wmmhello 已提交
563 564 565 566
        code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
567 568 569 570 571
        code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
572
      }
wmmhello's avatar
wmmhello 已提交
573

574
      needCheckMeta = true;
wmmhello's avatar
wmmhello 已提交
575 576
      taosHashCleanup(hashTmp);
      hashTmp = NULL;
wmmhello's avatar
wmmhello 已提交
577
    } else {
X
Xiaoyu Wang 已提交
578
      uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
579
      goto end;
wmmhello's avatar
wmmhello 已提交
580
    }
581

X
Xiaoyu Wang 已提交
582 583
    if (needCheckMeta) {
      code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
584
                          sTableData->tags, true);
585
      if (code != TSDB_CODE_SUCCESS) {
586
        uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, pName.tname);
587 588
        goto end;
      }
589
      code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
590
      if (code != TSDB_CODE_SUCCESS) {
591
        uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, pName.tname);
592 593 594 595
        goto end;
      }
    }

596
    sTableData->tableMeta = pTableMeta;
wmmhello's avatar
wmmhello 已提交
597

X
Xiaoyu Wang 已提交
598
    tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, tableMetaSml);
wmmhello's avatar
wmmhello 已提交
599 600
  }
  return 0;
601 602

end:
wmmhello's avatar
wmmhello 已提交
603 604
  taosHashCleanup(hashTmp);
  taosMemoryFreeClear(pTableMeta);
wmmhello's avatar
wmmhello 已提交
605
//  catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
606
  return code;
wmmhello's avatar
wmmhello 已提交
607 608
}

X
Xiaoyu Wang 已提交
609
static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
610
  const char *pVal = kvVal->value;
X
Xiaoyu Wang 已提交
611 612 613 614
  int32_t     len = kvVal->length;
  char       *endptr = NULL;
  double      result = taosStr2Double(pVal, &endptr);
  if (pVal == endptr) {
615
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
wmmhello's avatar
wmmhello 已提交
616 617 618
    return false;
  }

619
  int32_t left = len - (endptr - pVal);
X
Xiaoyu Wang 已提交
620
  if (left == 0 || (left == 3 && strncasecmp(endptr, "f64", left) == 0)) {
621 622
    kvVal->type = TSDB_DATA_TYPE_DOUBLE;
    kvVal->d = result;
X
Xiaoyu Wang 已提交
623 624
  } else if ((left == 3 && strncasecmp(endptr, "f32", left) == 0)) {
    if (!IS_VALID_FLOAT(result)) {
625 626
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
627
    }
628 629
    kvVal->type = TSDB_DATA_TYPE_FLOAT;
    kvVal->f = (float)result;
X
Xiaoyu Wang 已提交
630 631
  } else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)) {
    if (smlDoubleToInt64OverFlow(result)) {
wmmhello's avatar
wmmhello 已提交
632 633
      errno = 0;
      int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
X
Xiaoyu Wang 已提交
634
      if (errno == ERANGE) {
wmmhello's avatar
wmmhello 已提交
635 636 637 638 639 640
        smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_BIGINT;
      kvVal->i = tmp;
      return true;
wmmhello's avatar
wmmhello 已提交
641
    }
642
    kvVal->type = TSDB_DATA_TYPE_BIGINT;
wmmhello's avatar
wmmhello 已提交
643
    kvVal->i = (int64_t)result;
wmmhello's avatar
wmmhello 已提交
644
  } else if ((left == 1 && *endptr == 'u') || (left == 3 && strncasecmp(endptr, "u64", left) == 0)) {
X
Xiaoyu Wang 已提交
645
    if (result >= (double)UINT64_MAX || result < 0) {
wmmhello's avatar
wmmhello 已提交
646 647
      errno = 0;
      uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
X
Xiaoyu Wang 已提交
648
      if (errno == ERANGE || result < 0) {
wmmhello's avatar
wmmhello 已提交
649 650 651 652 653 654
        smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_UBIGINT;
      kvVal->u = tmp;
      return true;
655
    }
656
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;
wmmhello's avatar
wmmhello 已提交
657
    kvVal->u = result;
X
Xiaoyu Wang 已提交
658 659
  } else if (left == 3 && strncasecmp(endptr, "i32", left) == 0) {
    if (!IS_VALID_INT(result)) {
660 661
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
662
    }
663 664
    kvVal->type = TSDB_DATA_TYPE_INT;
    kvVal->i = result;
X
Xiaoyu Wang 已提交
665 666
  } else if (left == 3 && strncasecmp(endptr, "u32", left) == 0) {
    if (!IS_VALID_UINT(result)) {
667 668
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
669
    }
670 671
    kvVal->type = TSDB_DATA_TYPE_UINT;
    kvVal->u = result;
X
Xiaoyu Wang 已提交
672 673
  } else if (left == 3 && strncasecmp(endptr, "i16", left) == 0) {
    if (!IS_VALID_SMALLINT(result)) {
674 675
      smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
676
    }
677 678
    kvVal->type = TSDB_DATA_TYPE_SMALLINT;
    kvVal->i = result;
X
Xiaoyu Wang 已提交
679 680
  } else if (left == 3 && strncasecmp(endptr, "u16", left) == 0) {
    if (!IS_VALID_USMALLINT(result)) {
681 682
      smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
683
    }
684 685
    kvVal->type = TSDB_DATA_TYPE_USMALLINT;
    kvVal->u = result;
X
Xiaoyu Wang 已提交
686 687
  } else if (left == 2 && strncasecmp(endptr, "i8", left) == 0) {
    if (!IS_VALID_TINYINT(result)) {
688 689
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
690
    }
691 692
    kvVal->type = TSDB_DATA_TYPE_TINYINT;
    kvVal->i = result;
X
Xiaoyu Wang 已提交
693 694
  } else if (left == 2 && strncasecmp(endptr, "u8", left) == 0) {
    if (!IS_VALID_UTINYINT(result)) {
695 696
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
697
    }
698 699
    kvVal->type = TSDB_DATA_TYPE_UTINYINT;
    kvVal->u = result;
X
Xiaoyu Wang 已提交
700
  } else {
701
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
wmmhello's avatar
wmmhello 已提交
702 703
    return false;
  }
704
  return true;
wmmhello's avatar
wmmhello 已提交
705 706
}

wmmhello's avatar
wmmhello 已提交
707
static bool smlParseBool(SSmlKv *kvVal) {
wmmhello's avatar
wmmhello 已提交
708
  const char *pVal = kvVal->value;
X
Xiaoyu Wang 已提交
709
  int32_t     len = kvVal->length;
710
  if ((len == 1) && (pVal[0] == 't' || pVal[0] == 'T')) {
wmmhello's avatar
wmmhello 已提交
711
    kvVal->i = true;
wmmhello's avatar
wmmhello 已提交
712 713 714
    return true;
  }

715
  if ((len == 1) && (pVal[0] == 'f' || pVal[0] == 'F')) {
wmmhello's avatar
wmmhello 已提交
716
    kvVal->i = false;
wmmhello's avatar
wmmhello 已提交
717 718 719
    return true;
  }

X
Xiaoyu Wang 已提交
720
  if ((len == 4) && !strncasecmp(pVal, "true", len)) {
wmmhello's avatar
wmmhello 已提交
721
    kvVal->i = true;
wmmhello's avatar
wmmhello 已提交
722 723
    return true;
  }
X
Xiaoyu Wang 已提交
724
  if ((len == 5) && !strncasecmp(pVal, "false", len)) {
wmmhello's avatar
wmmhello 已提交
725
    kvVal->i = false;
wmmhello's avatar
wmmhello 已提交
726 727 728 729 730
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
731
static bool smlIsBinary(const char *pVal, uint16_t len) {
X
Xiaoyu Wang 已提交
732
  // binary: "abc"
wmmhello's avatar
wmmhello 已提交
733 734 735 736 737 738 739 740 741
  if (len < 2) {
    return false;
  }
  if (pVal[0] == '"' && pVal[len - 1] == '"') {
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
742
static bool smlIsNchar(const char *pVal, uint16_t len) {
X
Xiaoyu Wang 已提交
743
  // nchar: L"abc"
wmmhello's avatar
wmmhello 已提交
744 745 746
  if (len < 3) {
    return false;
  }
X
Xiaoyu Wang 已提交
747
  if ((pVal[0] == 'l' || pVal[0] == 'L') && pVal[1] == '"' && pVal[len - 1] == '"') {
wmmhello's avatar
wmmhello 已提交
748 749 750 751 752
    return true;
  }
  return false;
}

753
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
X
Xiaoyu Wang 已提交
754
  char   *endPtr = NULL;
wafwerar's avatar
wafwerar 已提交
755
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
X
Xiaoyu Wang 已提交
756
  if (value + len != endPtr) {
757
    return -1;
wmmhello's avatar
wmmhello 已提交
758
  }
759
  double ts = tsInt64;
760 761
  switch (type) {
    case TSDB_TIME_PRECISION_HOURS:
wmmhello's avatar
wmmhello 已提交
762 763
      ts *= NANOSECOND_PER_HOUR;
      tsInt64 *= NANOSECOND_PER_HOUR;
764 765
      break;
    case TSDB_TIME_PRECISION_MINUTES:
wmmhello's avatar
wmmhello 已提交
766 767
      ts *= NANOSECOND_PER_MINUTE;
      tsInt64 *= NANOSECOND_PER_MINUTE;
768 769
      break;
    case TSDB_TIME_PRECISION_SECONDS:
wmmhello's avatar
wmmhello 已提交
770 771
      ts *= NANOSECOND_PER_SEC;
      tsInt64 *= NANOSECOND_PER_SEC;
772 773
      break;
    case TSDB_TIME_PRECISION_MILLI:
wmmhello's avatar
wmmhello 已提交
774 775
      ts *= NANOSECOND_PER_MSEC;
      tsInt64 *= NANOSECOND_PER_MSEC;
776 777
      break;
    case TSDB_TIME_PRECISION_MICRO:
wmmhello's avatar
wmmhello 已提交
778 779
      ts *= NANOSECOND_PER_USEC;
      tsInt64 *= NANOSECOND_PER_USEC;
780 781 782 783 784
      break;
    case TSDB_TIME_PRECISION_NANO:
      break;
    default:
      ASSERT(0);
wmmhello's avatar
wmmhello 已提交
785
  }
X
Xiaoyu Wang 已提交
786
  if (ts >= (double)INT64_MAX || ts < 0) {
787
    return -1;
wmmhello's avatar
wmmhello 已提交
788 789
  }

790
  return tsInt64;
791 792 793 794 795 796
}

static int8_t smlGetTsTypeByLen(int32_t len) {
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
    return TSDB_TIME_PRECISION_SECONDS;
  } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
797
    return TSDB_TIME_PRECISION_MILLI;
798 799
  } else {
    return -1;
wmmhello's avatar
wmmhello 已提交
800
  }
801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819
}

static int8_t smlGetTsTypeByPrecision(int8_t precision) {
  switch (precision) {
    case TSDB_SML_TIMESTAMP_HOURS:
      return TSDB_TIME_PRECISION_HOURS;
    case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
      return TSDB_TIME_PRECISION_MILLI;
    case TSDB_SML_TIMESTAMP_NANO_SECONDS:
    case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
      return TSDB_TIME_PRECISION_NANO;
    case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
      return TSDB_TIME_PRECISION_MICRO;
    case TSDB_SML_TIMESTAMP_SECONDS:
      return TSDB_TIME_PRECISION_SECONDS;
    case TSDB_SML_TIMESTAMP_MINUTES:
      return TSDB_TIME_PRECISION_MINUTES;
    default:
      return -1;
wmmhello's avatar
wmmhello 已提交
820
  }
821 822
}

X
Xiaoyu Wang 已提交
823
static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) {
wmmhello's avatar
wmmhello 已提交
824
  void *tmp = taosMemoryCalloc(1, len + 1);
wmmhello's avatar
wmmhello 已提交
825
  memcpy(tmp, data, len);
wmmhello's avatar
wmmhello 已提交
826
  uDebug("SML:0x%" PRIx64 " smlParseInfluxTime tslen:%d, ts:%s", info->id, len, (char*)tmp);
wmmhello's avatar
wmmhello 已提交
827 828
  taosMemoryFree(tmp);

X
Xiaoyu Wang 已提交
829
  if (len == 0 || (len == 1 && data[0] == '0')) {
830
    return taosGetTimestampNs();
wmmhello's avatar
wmmhello 已提交
831 832
  }

833 834 835 836
  int8_t tsType = smlGetTsTypeByPrecision(info->precision);
  if (tsType == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL);
    return -1;
wmmhello's avatar
wmmhello 已提交
837
  }
838 839

  int64_t ts = smlGetTimeValue(data, len, tsType);
X
Xiaoyu Wang 已提交
840
  if (ts == -1) {
841 842
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
wmmhello's avatar
wmmhello 已提交
843
  }
844 845 846
  return ts;
}

X
Xiaoyu Wang 已提交
847 848
static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
  if (!data) {
849 850
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
    return -1;
wmmhello's avatar
wmmhello 已提交
851
  }
X
Xiaoyu Wang 已提交
852
  if (len == 1 && data[0] == '0') {
853 854
    return taosGetTimestampNs();
  }
855 856
  int8_t tsType = smlGetTsTypeByLen(len);
  if (tsType == -1) {
X
Xiaoyu Wang 已提交
857 858
    smlBuildInvalidDataMsg(&info->msgBuf,
                           "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
859 860 861
    return -1;
  }
  int64_t ts = smlGetTimeValue(data, len, tsType);
X
Xiaoyu Wang 已提交
862
  if (ts == -1) {
863 864 865 866 867 868
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
  }
  return ts;
}

X
Xiaoyu Wang 已提交
869
static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArray *cols) {
870
  int64_t ts = 0;
X
Xiaoyu Wang 已提交
871
  if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
872
    //    uError("SML:data:%s,len:%d", data, len);
873
    ts = smlParseInfluxTime(info, data, len);
X
Xiaoyu Wang 已提交
874
  } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
875
    ts = smlParseOpenTsdbTime(info, data, len);
X
Xiaoyu Wang 已提交
876
  } else {
877
    ASSERT(0);
878
  }
wmmhello's avatar
wmmhello 已提交
879
  uDebug("SML:0x%" PRIx64 " smlParseTS:%" PRId64, info->id, ts);
880

wmmhello's avatar
wmmhello 已提交
881
  if (ts == -1) return TSDB_CODE_INVALID_TIMESTAMP;
882 883 884

  // add ts to
  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
885
  if (!kv) {
886 887 888 889 890 891 892 893
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  kv->key = TS;
  kv->keyLen = TS_LEN;
  kv->i = ts;
  kv->type = TSDB_DATA_TYPE_TIMESTAMP;
  kv->length = (int16_t)tDataTypes[kv->type].bytes;
wmmhello's avatar
wmmhello 已提交
894 895
  taosArrayPush(cols, &kv);

896 897 898
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
899
static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
X
Xiaoyu Wang 已提交
900
  // binary
wmmhello's avatar
wmmhello 已提交
901
  if (smlIsBinary(pVal->value, pVal->length)) {
902
    pVal->type = TSDB_DATA_TYPE_BINARY;
wmmhello's avatar
wmmhello 已提交
903
    pVal->length -= BINARY_ADD_LEN;
904
    if (pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
wmmhello's avatar
wmmhello 已提交
905 906
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
907
    pVal->value += (BINARY_ADD_LEN - 1);
wmmhello's avatar
wmmhello 已提交
908
    return TSDB_CODE_SUCCESS;
909
  }
X
Xiaoyu Wang 已提交
910
  // nchar
wmmhello's avatar
wmmhello 已提交
911
  if (smlIsNchar(pVal->value, pVal->length)) {
912
    pVal->type = TSDB_DATA_TYPE_NCHAR;
wmmhello's avatar
wmmhello 已提交
913
    pVal->length -= NCHAR_ADD_LEN;
914
    if (pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
wmmhello's avatar
wmmhello 已提交
915 916
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }
917
    pVal->value += (NCHAR_ADD_LEN - 1);
wmmhello's avatar
wmmhello 已提交
918
    return TSDB_CODE_SUCCESS;
919 920
  }

X
Xiaoyu Wang 已提交
921
  // bool
922 923 924
  if (smlParseBool(pVal)) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
wmmhello's avatar
wmmhello 已提交
925
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
926
  }
X
Xiaoyu Wang 已提交
927
  // number
928
  if (smlParseNumber(pVal, msg)) {
wmmhello's avatar
wmmhello 已提交
929
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
wmmhello's avatar
wmmhello 已提交
930
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
931 932
  }

wmmhello's avatar
wmmhello 已提交
933
  return TSDB_CODE_TSC_INVALID_VALUE;
wmmhello's avatar
wmmhello 已提交
934 935
}

wmmhello's avatar
wmmhello 已提交
936
static int32_t smlParseInfluxString(const char *sql, const char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
X
Xiaoyu Wang 已提交
937
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
wmmhello's avatar
wmmhello 已提交
938
  JUMP_SPACE(sql, sqlEnd)
X
Xiaoyu Wang 已提交
939
  if (*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA;
wmmhello's avatar
wmmhello 已提交
940
  elements->measure = sql;
wmmhello's avatar
wmmhello 已提交
941

wmmhello's avatar
wmmhello 已提交
942
  // parse measure
wmmhello's avatar
wmmhello 已提交
943
  while (sql < sqlEnd) {
X
Xiaoyu Wang 已提交
944
    if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
wmmhello's avatar
wmmhello 已提交
945 946
      MOVE_FORWARD_ONE(sql, sqlEnd - sql);
      sqlEnd--;
wmmhello's avatar
wmmhello 已提交
947 948
      continue;
    }
X
Xiaoyu Wang 已提交
949
    if (IS_COMMA(sql)) {
wmmhello's avatar
wmmhello 已提交
950 951 952
      break;
    }

X
Xiaoyu Wang 已提交
953
    if (IS_SPACE(sql)) {
wmmhello's avatar
wmmhello 已提交
954 955
      break;
    }
wmmhello's avatar
wmmhello 已提交
956 957
    sql++;
  }
wmmhello's avatar
wmmhello 已提交
958
  elements->measureLen = sql - elements->measure;
X
Xiaoyu Wang 已提交
959
  if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
960
    smlBuildInvalidDataMsg(msg, "measure is empty or too large than 192", NULL);
wmmhello's avatar
wmmhello 已提交
961
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
wmmhello's avatar
wmmhello 已提交
962
  }
wmmhello's avatar
wmmhello 已提交
963

wmmhello's avatar
wmmhello 已提交
964
  // parse tag
X
Xiaoyu Wang 已提交
965
  if (*sql == SPACE) {
wmmhello's avatar
wmmhello 已提交
966
    elements->tagsLen = 0;
X
Xiaoyu Wang 已提交
967 968
  } else {
    if (*sql == COMMA) sql++;
wmmhello's avatar
wmmhello 已提交
969
    elements->tags = sql;
wmmhello's avatar
wmmhello 已提交
970
    while (sql < sqlEnd) {
X
Xiaoyu Wang 已提交
971
      if (IS_SPACE(sql)) {
wmmhello's avatar
wmmhello 已提交
972 973 974
        break;
      }
      sql++;
wmmhello's avatar
wmmhello 已提交
975
    }
wmmhello's avatar
wmmhello 已提交
976
    elements->tagsLen = sql - elements->tags;
977
  }
wmmhello's avatar
wmmhello 已提交
978
  elements->measureTagsLen = sql - elements->measure;
wmmhello's avatar
wmmhello 已提交
979

wmmhello's avatar
wmmhello 已提交
980
  // parse cols
wmmhello's avatar
wmmhello 已提交
981
  JUMP_SPACE(sql, sqlEnd)
wmmhello's avatar
wmmhello 已提交
982
  elements->cols = sql;
wmmhello's avatar
wmmhello 已提交
983
  bool isInQuote = false;
wmmhello's avatar
wmmhello 已提交
984
  while (sql < sqlEnd) {
X
Xiaoyu Wang 已提交
985
    if (IS_QUOTE(sql)) {
wmmhello's avatar
wmmhello 已提交
986 987
      isInQuote = !isInQuote;
    }
X
Xiaoyu Wang 已提交
988
    if (!isInQuote && IS_SPACE(sql)) {
wmmhello's avatar
wmmhello 已提交
989 990 991 992
      break;
    }
    sql++;
  }
X
Xiaoyu Wang 已提交
993
  if (isInQuote) {
994 995 996
    smlBuildInvalidDataMsg(msg, "only one quote", elements->cols);
    return TSDB_CODE_SML_INVALID_DATA;
  }
wmmhello's avatar
wmmhello 已提交
997
  elements->colsLen = sql - elements->cols;
X
Xiaoyu Wang 已提交
998
  if (elements->colsLen == 0) {
wmmhello's avatar
wmmhello 已提交
999 1000 1001
    smlBuildInvalidDataMsg(msg, "cols is empty", NULL);
    return TSDB_CODE_SML_INVALID_DATA;
  }
wmmhello's avatar
wmmhello 已提交
1002

wmmhello's avatar
wmmhello 已提交
1003
  // parse timestamp
wmmhello's avatar
wmmhello 已提交
1004
  JUMP_SPACE(sql, sqlEnd)
wmmhello's avatar
wmmhello 已提交
1005
  elements->timestamp = sql;
wmmhello's avatar
wmmhello 已提交
1006 1007
  while (sql < sqlEnd) {
    if (isspace(*sql)) {
wmmhello's avatar
wmmhello 已提交
1008 1009 1010 1011
      break;
    }
    sql++;
  }
wmmhello's avatar
wmmhello 已提交
1012
  elements->timestampLen = sql - elements->timestamp;
wmmhello's avatar
wmmhello 已提交
1013 1014 1015 1016

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1017 1018
static void smlParseTelnetElement(const char **sql, const char *sqlEnd, const char **data, int32_t *len) {
  while (*sql < sqlEnd) {
X
Xiaoyu Wang 已提交
1019
    if (**sql != SPACE && !(*data)) {
1020
      *data = *sql;
X
Xiaoyu Wang 已提交
1021
    } else if (**sql == SPACE && *data) {
1022 1023 1024 1025 1026 1027 1028
      *len = *sql - *data;
      break;
    }
    (*sql)++;
  }
}

dengyihao's avatar
dengyihao 已提交
1029 1030 1031
static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *cols, char *childTableName,
                                  SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
  if (!cols) return TSDB_CODE_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
1032
  const char *sql = data;
X
Xiaoyu Wang 已提交
1033
  size_t      childTableNameLen = strlen(tsSmlChildTableName);
wmmhello's avatar
wmmhello 已提交
1034 1035
  while (sql < sqlEnd) {
    JUMP_SPACE(sql, sqlEnd)
X
Xiaoyu Wang 已提交
1036
    if (*sql == '\0') break;
wmmhello's avatar
wmmhello 已提交
1037

wmmhello's avatar
wmmhello 已提交
1038
    const char *key = sql;
X
Xiaoyu Wang 已提交
1039
    int32_t     keyLen = 0;
wmmhello's avatar
wmmhello 已提交
1040 1041

    // parse key
wmmhello's avatar
wmmhello 已提交
1042
    while (sql < sqlEnd) {
X
Xiaoyu Wang 已提交
1043
      if (*sql == SPACE) {
wmmhello's avatar
wmmhello 已提交
1044 1045 1046
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
X
Xiaoyu Wang 已提交
1047
      if (*sql == EQUAL) {
wmmhello's avatar
wmmhello 已提交
1048 1049
        keyLen = sql - key;
        sql++;
1050 1051
        break;
      }
wmmhello's avatar
wmmhello 已提交
1052
      sql++;
1053
    }
wmmhello's avatar
wmmhello 已提交
1054

X
Xiaoyu Wang 已提交
1055
    if (IS_INVALID_COL_LEN(keyLen)) {
1056
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
wmmhello's avatar
wmmhello 已提交
1057
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
1058
    }
X
Xiaoyu Wang 已提交
1059
    if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
1060
      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
wmmhello's avatar
wmmhello 已提交
1061
      return TSDB_CODE_TSC_DUP_NAMES;
1062 1063 1064
    }

    // parse value
wmmhello's avatar
wmmhello 已提交
1065
    const char *value = sql;
X
Xiaoyu Wang 已提交
1066
    int32_t     valueLen = 0;
wmmhello's avatar
wmmhello 已提交
1067
    while (sql < sqlEnd) {
wmmhello's avatar
wmmhello 已提交
1068 1069
      // parse value
      if (*sql == SPACE) {
1070 1071
        break;
      }
wmmhello's avatar
wmmhello 已提交
1072 1073 1074 1075 1076
      if (*sql == EQUAL) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
1077
    }
wmmhello's avatar
wmmhello 已提交
1078
    valueLen = sql - value;
wmmhello's avatar
wmmhello 已提交
1079

X
Xiaoyu Wang 已提交
1080
    if (valueLen == 0) {
1081
      smlBuildInvalidDataMsg(msg, "invalid value", value);
wmmhello's avatar
wmmhello 已提交
1082
      return TSDB_CODE_TSC_INVALID_VALUE;
1083
    }
wmmhello's avatar
wmmhello 已提交
1084

X
Xiaoyu Wang 已提交
1085 1086
    // handle child table name
    if (childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0) {
1087 1088 1089 1090 1091
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN));
      continue;
    }

1092
    if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
wmmhello's avatar
wmmhello 已提交
1093 1094 1095
      return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
    }

1096 1097
    // add kv to SSmlKv
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1098
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
1099 1100 1101
    kv->key = key;
    kv->keyLen = keyLen;
    kv->value = value;
wmmhello's avatar
wmmhello 已提交
1102
    kv->length = valueLen;
wmmhello's avatar
wmmhello 已提交
1103
    kv->type = TSDB_DATA_TYPE_NCHAR;
1104

wmmhello's avatar
wmmhello 已提交
1105
    taosArrayPush(cols, &kv);
1106 1107 1108 1109
  }

  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1110

1111
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
dengyihao's avatar
dengyihao 已提交
1112 1113
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlTableInfo *tinfo,
                                    SArray *cols) {
X
Xiaoyu Wang 已提交
1114
  if (!sql) return TSDB_CODE_SML_INVALID_DATA;
1115 1116

  // parse metric
wmmhello's avatar
wmmhello 已提交
1117
  smlParseTelnetElement(&sql, sqlEnd, &tinfo->sTableName, &tinfo->sTableNameLen);
1118
  if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
1119
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
wmmhello's avatar
wmmhello 已提交
1120
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
1121 1122 1123 1124
  }

  // parse timestamp
  const char *timestamp = NULL;
X
Xiaoyu Wang 已提交
1125
  int32_t     tLen = 0;
wmmhello's avatar
wmmhello 已提交
1126
  smlParseTelnetElement(&sql, sqlEnd, &timestamp, &tLen);
1127 1128 1129 1130 1131 1132 1133 1134
  if (!timestamp || tLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  int32_t ret = smlParseTS(info, timestamp, tLen, cols);
  if (ret != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
wmmhello's avatar
wmmhello 已提交
1135
    return ret;
1136 1137 1138 1139
  }

  // parse value
  const char *value = NULL;
X
Xiaoyu Wang 已提交
1140
  int32_t     valueLen = 0;
wmmhello's avatar
wmmhello 已提交
1141
  smlParseTelnetElement(&sql, sqlEnd, &value, &valueLen);
1142 1143
  if (!value || valueLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
wmmhello's avatar
wmmhello 已提交
1144
    return TSDB_CODE_TSC_INVALID_VALUE;
1145 1146 1147
  }

  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1148
  if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
1149 1150 1151 1152
  taosArrayPush(cols, &kv);
  kv->key = VALUE;
  kv->keyLen = VALUE_LEN;
  kv->value = value;
wmmhello's avatar
wmmhello 已提交
1153
  kv->length = valueLen;
wmmhello's avatar
wmmhello 已提交
1154 1155
  if ((ret = smlParseValue(kv, &info->msgBuf)) != TSDB_CODE_SUCCESS) {
    return ret;
1156 1157 1158
  }

  // parse tags
wmmhello's avatar
wmmhello 已提交
1159
  ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
1160 1161
  if (ret != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
wmmhello's avatar
wmmhello 已提交
1162
    return ret;
1163 1164 1165 1166 1167
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1168 1169 1170
static int32_t smlParseCols(const char *data, int32_t len, SArray *cols, char *childTableName, bool isTag,
                            SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
  if (len == 0) {
wmmhello's avatar
wmmhello 已提交
1171
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1172 1173
  }

X
Xiaoyu Wang 已提交
1174
  size_t      childTableNameLen = strlen(tsSmlChildTableName);
wmmhello's avatar
wmmhello 已提交
1175
  const char *sql = data;
X
Xiaoyu Wang 已提交
1176
  while (sql < data + len) {
wmmhello's avatar
wmmhello 已提交
1177
    const char *key = sql;
X
Xiaoyu Wang 已提交
1178
    int32_t     keyLen = 0;
wmmhello's avatar
wmmhello 已提交
1179

X
Xiaoyu Wang 已提交
1180
    while (sql < data + len) {
wmmhello's avatar
wmmhello 已提交
1181
      // parse key
X
Xiaoyu Wang 已提交
1182
      if (IS_COMMA(sql)) {
wmmhello's avatar
wmmhello 已提交
1183 1184 1185
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
X
Xiaoyu Wang 已提交
1186
      if (IS_EQUAL(sql)) {
wmmhello's avatar
wmmhello 已提交
1187 1188
        keyLen = sql - key;
        sql++;
wmmhello's avatar
wmmhello 已提交
1189 1190
        break;
      }
wmmhello's avatar
wmmhello 已提交
1191
      sql++;
wmmhello's avatar
wmmhello 已提交
1192
    }
wmmhello's avatar
wmmhello 已提交
1193

X
Xiaoyu Wang 已提交
1194
    if (IS_INVALID_COL_LEN(keyLen)) {
wmmhello's avatar
wmmhello 已提交
1195
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
wmmhello's avatar
wmmhello 已提交
1196
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
wmmhello's avatar
wmmhello 已提交
1197
    }
X
Xiaoyu Wang 已提交
1198
    if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
1199
      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
wmmhello's avatar
wmmhello 已提交
1200
      return TSDB_CODE_TSC_DUP_NAMES;
1201 1202
    }

wmmhello's avatar
wmmhello 已提交
1203
    // parse value
wmmhello's avatar
wmmhello 已提交
1204
    const char *value = sql;
X
Xiaoyu Wang 已提交
1205 1206 1207
    int32_t     valueLen = 0;
    bool        isInQuote = false;
    while (sql < data + len) {
wmmhello's avatar
wmmhello 已提交
1208
      // parse value
X
Xiaoyu Wang 已提交
1209
      if (!isTag && IS_QUOTE(sql)) {
wmmhello's avatar
wmmhello 已提交
1210
        isInQuote = !isInQuote;
wmmhello's avatar
wmmhello 已提交
1211 1212
        sql++;
        continue;
wmmhello's avatar
wmmhello 已提交
1213
      }
wmmhello's avatar
wmmhello 已提交
1214
      if (!isInQuote && IS_COMMA(sql)) {
wmmhello's avatar
wmmhello 已提交
1215 1216
        break;
      }
wmmhello's avatar
wmmhello 已提交
1217 1218 1219 1220 1221
      if (!isInQuote && IS_EQUAL(sql)) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
wmmhello's avatar
wmmhello 已提交
1222
    }
wmmhello's avatar
wmmhello 已提交
1223 1224 1225
    valueLen = sql - value;
    sql++;

X
Xiaoyu Wang 已提交
1226
    if (isInQuote) {
wmmhello's avatar
wmmhello 已提交
1227 1228 1229
      smlBuildInvalidDataMsg(msg, "only one quote", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
X
Xiaoyu Wang 已提交
1230
    if (valueLen == 0) {
wmmhello's avatar
wmmhello 已提交
1231
      smlBuildInvalidDataMsg(msg, "invalid value", value);
wmmhello's avatar
wmmhello 已提交
1232 1233
      return TSDB_CODE_SML_INVALID_DATA;
    }
wmmhello's avatar
wmmhello 已提交
1234 1235
    PROCESS_SLASH(key, keyLen)
    PROCESS_SLASH(value, valueLen)
wmmhello's avatar
wmmhello 已提交
1236

X
Xiaoyu Wang 已提交
1237 1238
    // handle child table name
    if (childTableName && childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0) {
1239 1240 1241 1242 1243
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN));
      continue;
    }

wmmhello's avatar
wmmhello 已提交
1244
    // add kv to SSmlKv
wmmhello's avatar
wmmhello 已提交
1245
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1246 1247
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
    if (cols) taosArrayPush(cols, &kv);
1248

wmmhello's avatar
wmmhello 已提交
1249 1250 1251
    kv->key = key;
    kv->keyLen = keyLen;
    kv->value = value;
wmmhello's avatar
wmmhello 已提交
1252
    kv->length = valueLen;
X
Xiaoyu Wang 已提交
1253
    if (isTag) {
1254
      if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
1255 1256
        return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
      }
wmmhello's avatar
wmmhello 已提交
1257
      kv->type = TSDB_DATA_TYPE_NCHAR;
X
Xiaoyu Wang 已提交
1258
    } else {
wmmhello's avatar
wmmhello 已提交
1259 1260 1261
      int32_t ret = smlParseValue(kv, msg);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
wmmhello's avatar
wmmhello 已提交
1262
      }
wmmhello's avatar
wmmhello 已提交
1263 1264
    }
  }
wmmhello's avatar
wmmhello 已提交
1265

wmmhello's avatar
wmmhello 已提交
1266 1267 1268
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1269 1270
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, SSmlMsgBuf *msg) {
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
wmmhello's avatar
wmmhello 已提交
1271
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
wmmhello's avatar
wmmhello 已提交
1272

wmmhello's avatar
wmmhello 已提交
1273
    int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);
X
Xiaoyu Wang 已提交
1274
    if (index) {
wmmhello's avatar
wmmhello 已提交
1275
      SSmlKv **value = (SSmlKv **)taosArrayGet(metaArray, *index);
X
Xiaoyu Wang 已提交
1276
      if (kv->type != (*value)->type) {
wmmhello's avatar
wmmhello 已提交
1277
        smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
wmmhello's avatar
wmmhello 已提交
1278
        return TSDB_CODE_SML_NOT_SAME_TYPE;
X
Xiaoyu Wang 已提交
1279 1280 1281
      } else {
        if (IS_VAR_DATA_TYPE(kv->type)) {  // update string len, if bigger
          if (kv->length > (*value)->length) {
wmmhello's avatar
wmmhello 已提交
1282
            *value = kv;
1283 1284 1285
          }
        }
      }
X
Xiaoyu Wang 已提交
1286
    } else {
wmmhello's avatar
wmmhello 已提交
1287 1288 1289 1290 1291
      size_t tmp = taosArrayGetSize(metaArray);
      ASSERT(tmp <= INT16_MAX);
      int16_t size = tmp;
      taosArrayPush(metaArray, &kv);
      taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
1292
    }
wmmhello's avatar
wmmhello 已提交
1293
  }
wmmhello's avatar
wmmhello 已提交
1294

wmmhello's avatar
wmmhello 已提交
1295
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1296 1297
}

X
Xiaoyu Wang 已提交
1298
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols) {
wmmhello's avatar
wmmhello 已提交
1299 1300 1301 1302
  for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
    taosArrayPush(metaArray, &kv);
    taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
wmmhello's avatar
wmmhello 已提交
1303 1304 1305
  }
}

X
Xiaoyu Wang 已提交
1306
static SSmlTableInfo *smlBuildTableInfo() {
1307
  SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
X
Xiaoyu Wang 已提交
1308
  if (!tag) {
1309
    return NULL;
wmmhello's avatar
wmmhello 已提交
1310
  }
1311 1312 1313 1314 1315

  tag->cols = taosArrayInit(16, POINTER_BYTES);
  if (tag->cols == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
wmmhello's avatar
wmmhello 已提交
1316
  }
1317 1318 1319 1320 1321

  tag->tags = taosArrayInit(16, POINTER_BYTES);
  if (tag->tags == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
wmmhello's avatar
wmmhello 已提交
1322
  }
1323 1324 1325 1326 1327
  return tag;

cleanup:
  taosMemoryFree(tag);
  return NULL;
wmmhello's avatar
wmmhello 已提交
1328 1329
}

X
Xiaoyu Wang 已提交
1330 1331 1332
static void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
  if (info->dataFormat) {
    for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
1333 1334
      SArray *kvArray = (SArray *)taosArrayGetP(tag->cols, i);
      for (int j = 0; j < taosArrayGetSize(kvArray); ++j) {
wmmhello's avatar
wmmhello 已提交
1335
        SSmlKv *p = (SSmlKv *)taosArrayGetP(kvArray, j);
1336 1337 1338 1339
        taosMemoryFree(p);
      }
      taosArrayDestroy(kvArray);
    }
X
Xiaoyu Wang 已提交
1340 1341
  } else {
    for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
1342
      SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
X
Xiaoyu Wang 已提交
1343
      void    **p1 = (void **)taosHashIterate(kvHash, NULL);
1344 1345
      while (p1) {
        taosMemoryFree(*p1);
X
Xiaoyu Wang 已提交
1346
        p1 = (void **)taosHashIterate(kvHash, p1);
1347 1348 1349 1350
      }
      taosHashCleanup(kvHash);
    }
  }
X
Xiaoyu Wang 已提交
1351
  for (size_t i = 0; i < taosArrayGetSize(tag->tags); i++) {
wmmhello's avatar
wmmhello 已提交
1352 1353 1354
    SSmlKv *p = (SSmlKv *)taosArrayGetP(tag->tags, i);
    taosMemoryFree(p);
  }
1355 1356 1357 1358 1359
  taosArrayDestroy(tag->cols);
  taosArrayDestroy(tag->tags);
  taosMemoryFree(tag);
}

X
Xiaoyu Wang 已提交
1360
static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) {
1361 1362
  SArray *s1 = *(SArray **)key1;
  SArray *s2 = *(SArray **)key2;
1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375
  SSmlKv *kv1 = (SSmlKv *)taosArrayGetP(s1, 0);
  SSmlKv *kv2 = (SSmlKv *)taosArrayGetP(s2, 0);
  ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
  ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
  if (kv1->i < kv2->i) {
    return -1;
  } else if (kv1->i > kv2->i) {
    return 1;
  } else {
    return 0;
  }
}

X
Xiaoyu Wang 已提交
1376
static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
1377 1378
  SHashObj *s1 = *(SHashObj **)key1;
  SHashObj *s2 = *(SHashObj **)key2;
dengyihao's avatar
dengyihao 已提交
1379 1380 1381
  SSmlKv  **kv1pp = (SSmlKv **)taosHashGet(s1, TS, TS_LEN);
  SSmlKv  **kv2pp = (SSmlKv **)taosHashGet(s2, TS, TS_LEN);
  if (!kv1pp || !kv2pp) {
wmmhello's avatar
wmmhello 已提交
1382 1383 1384
    uError("smlKvTimeHashCompare kv is null");
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
1385 1386 1387
  SSmlKv *kv1 = *kv1pp;
  SSmlKv *kv2 = *kv2pp;
  if (!kv1 || kv1->type != TSDB_DATA_TYPE_TIMESTAMP) {
wmmhello's avatar
wmmhello 已提交
1388 1389 1390
    uError("smlKvTimeHashCompare kv1");
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
1391
  if (!kv2 || kv2->type != TSDB_DATA_TYPE_TIMESTAMP) {
wmmhello's avatar
wmmhello 已提交
1392 1393 1394
    uError("smlKvTimeHashCompare kv2");
    return -1;
  }
1395 1396 1397 1398 1399 1400 1401 1402 1403
  if (kv1->i < kv2->i) {
    return -1;
  } else if (kv1->i > kv2->i) {
    return 1;
  } else {
    return 0;
  }
}

1404 1405
static int32_t smlDealCols(SSmlTableInfo *oneTable, bool dataFormat, SArray *cols) {
  if (dataFormat) {
1406
    void *p = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GT);
1407
    if (p == NULL) {
1408
      taosArrayPush(oneTable->cols, &cols);
1409
    } else {
1410
      taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols);
1411
    }
1412 1413 1414 1415
    return TSDB_CODE_SUCCESS;
  }

  SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
1416
  if (!kvHash) {
1417 1418 1419
    uError("SML:smlDealCols failed to allocate memory");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
1420
  for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
1421
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
wmmhello's avatar
wmmhello 已提交
1422
    taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
1423 1424
  }

1425
  void *p = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GT);
1426
  if (p == NULL) {
1427
    taosArrayPush(oneTable->cols, &kvHash);
1428
  } else {
1429
    taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &kvHash);
1430
  }
1431 1432 1433
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1434 1435 1436
static SSmlSTableMeta *smlBuildSTableMeta() {
  SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
  if (!meta) {
1437 1438 1439 1440 1441 1442 1443 1444
    return NULL;
  }
  meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (meta->tagHash == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

wmmhello's avatar
wmmhello 已提交
1445 1446
  meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (meta->colHash == NULL) {
1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->tags = taosArrayInit(32, POINTER_BYTES);
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->cols = taosArrayInit(32, POINTER_BYTES);
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
  return meta;

cleanup:
  taosMemoryFree(meta);
  return NULL;
}

X
Xiaoyu Wang 已提交
1469
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
1470
  taosHashCleanup(meta->tagHash);
wmmhello's avatar
wmmhello 已提交
1471
  taosHashCleanup(meta->colHash);
1472 1473 1474
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);
  taosMemoryFree(meta->tableMeta);
wmmhello's avatar
wmmhello 已提交
1475
  taosMemoryFree(meta);
1476 1477
}

wmmhello's avatar
wmmhello 已提交
1478
static void smlDestroyCols(SArray *cols) {
1479 1480
  if (!cols) return;
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
wmmhello's avatar
wmmhello 已提交
1481
    void *kv = taosArrayGetP(cols, i);
1482 1483 1484 1485
    taosMemoryFree(kv);
  }
}

X
Xiaoyu Wang 已提交
1486 1487
static void smlDestroyInfo(SSmlHandle *info) {
  if (!info) return;
1488 1489 1490 1491
  qDestroyQuery(info->pQuery);
  smlDestroyHandle(info->exec);

  // destroy info->childTables
X
Xiaoyu Wang 已提交
1492
  void **p1 = (void **)taosHashIterate(info->childTables, NULL);
1493
  while (p1) {
X
Xiaoyu Wang 已提交
1494 1495
    smlDestroyTableInfo(info, (SSmlTableInfo *)(*p1));
    p1 = (void **)taosHashIterate(info->childTables, p1);
1496 1497 1498 1499
  }
  taosHashCleanup(info->childTables);

  // destroy info->superTables
X
Xiaoyu Wang 已提交
1500
  p1 = (void **)taosHashIterate(info->superTables, NULL);
1501
  while (p1) {
X
Xiaoyu Wang 已提交
1502 1503
    smlDestroySTableMeta((SSmlSTableMeta *)(*p1));
    p1 = (void **)taosHashIterate(info->superTables, p1);
1504 1505 1506 1507 1508 1509
  }
  taosHashCleanup(info->superTables);

  // destroy info->pVgHash
  taosHashCleanup(info->pVgHash);
  taosHashCleanup(info->dumplicateKey);
X
Xiaoyu Wang 已提交
1510
  if (!info->dataFormat) {
wmmhello's avatar
wmmhello 已提交
1511 1512
    taosArrayDestroy(info->colsContainer);
  }
wmmhello's avatar
wmmhello 已提交
1513
  destroyRequest(info->pRequest);
wmmhello's avatar
wmmhello 已提交
1514 1515

  cJSON_Delete(info->root);
1516 1517 1518
  taosMemoryFreeClear(info);
}

1519 1520 1521
static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLProtocolType protocol, int8_t precision) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
1522 1523 1524
  if (NULL == info) {
    return NULL;
  }
X
Xiaoyu Wang 已提交
1525
  info->id = smlGenId();
1526

1527
  info->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY);
1528
  if (NULL == info->pQuery) {
X
Xiaoyu Wang 已提交
1529
    uError("SML:0x%" PRIx64 " create info->pQuery error", info->id);
1530 1531
    goto cleanup;
  }
X
Xiaoyu Wang 已提交
1532
  info->pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
1533
  info->pQuery->haveResultSet = false;
X
Xiaoyu Wang 已提交
1534 1535 1536 1537
  info->pQuery->msgType = TDMT_VND_SUBMIT;
  info->pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
  if (NULL == info->pQuery->pRoot) {
    uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id);
1538 1539 1540
    goto cleanup;
  }

1541 1542
  if (pTscObj) {
    info->taos = pTscObj;
1543 1544 1545 1546 1547
    code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
      goto cleanup;
    }
wmmhello's avatar
wmmhello 已提交
1548
  }
1549

X
Xiaoyu Wang 已提交
1550 1551 1552
  info->precision = precision;
  info->protocol = protocol;
  if (protocol == TSDB_SML_LINE_PROTOCOL) {
1553
    info->dataFormat = tsSmlDataFormat;
X
Xiaoyu Wang 已提交
1554
  } else {
1555 1556
    info->dataFormat = true;
  }
1557

1558
  if (request) {
1559 1560 1561
    info->pRequest = request;
    info->msgBuf.buf = info->pRequest->msgBuf;
    info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
1562
    info->pRequest->stmtType = info->pQuery->pRoot->type;
1563
  }
1564

X
Xiaoyu Wang 已提交
1565
  info->exec = smlInitHandle(info->pQuery);
1566 1567
  info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
1568
  info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
1569 1570

  info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
X
Xiaoyu Wang 已提交
1571
  if (!info->dataFormat) {
1572
    info->colsContainer = taosArrayInit(32, POINTER_BYTES);
X
Xiaoyu Wang 已提交
1573 1574
    if (NULL == info->colsContainer) {
      uError("SML:0x%" PRIx64 " create info failed", info->id);
1575 1576 1577
      goto cleanup;
    }
  }
X
Xiaoyu Wang 已提交
1578 1579 1580
  if (NULL == info->exec || NULL == info->childTables || NULL == info->superTables || NULL == info->pVgHash ||
      NULL == info->dumplicateKey) {
    uError("SML:0x%" PRIx64 " create info failed", info->id);
1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593
    goto cleanup;
  }

  return info;
cleanup:
  smlDestroyInfo(info);
  return NULL;
}

/************* TSDB_SML_JSON_PROTOCOL function start **************/
static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo) {
  cJSON *metric = cJSON_GetObjectItem(root, "metric");
  if (!cJSON_IsString(metric)) {
X
Xiaoyu Wang 已提交
1594
    return TSDB_CODE_TSC_INVALID_JSON;
1595 1596 1597
  }

  tinfo->sTableNameLen = strlen(metric->valuestring);
1598
  if (IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
X
Xiaoyu Wang 已提交
1599
    uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id);
1600 1601 1602
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

wmmhello's avatar
wmmhello 已提交
1603 1604
  tinfo->sTableName = metric->valuestring;
  return TSDB_CODE_SUCCESS;
1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623
}

static int32_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int64_t *tsVal) {
  int32_t size = cJSON_GetArraySize(root);
  if (size != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (!cJSON_IsNumber(value)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  double timeDouble = value->valuedouble;
X
Xiaoyu Wang 已提交
1624
  if (smlDoubleToInt64OverFlow(timeDouble)) {
1625
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1626
    return TSDB_CODE_INVALID_TIMESTAMP;
1627
  }
wmmhello's avatar
wmmhello 已提交
1628 1629 1630 1631 1632 1633 1634 1635

  if (timeDouble == 0) {
    *tsVal = taosGetTimestampNs();
    return TSDB_CODE_SUCCESS;
  }

  if (timeDouble < 0) {
    return TSDB_CODE_INVALID_TIMESTAMP;
1636 1637
  }

1638
  *tsVal = timeDouble;
1639
  size_t typeLen = strlen(type->valuestring);
wmmhello's avatar
wmmhello 已提交
1640
  if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
X
Xiaoyu Wang 已提交
1641
    // seconds
1642 1643
    *tsVal = *tsVal * NANOSECOND_PER_SEC;
    timeDouble = timeDouble * NANOSECOND_PER_SEC;
X
Xiaoyu Wang 已提交
1644
    if (smlDoubleToInt64OverFlow(timeDouble)) {
1645
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1646
      return TSDB_CODE_INVALID_TIMESTAMP;
1647
    }
wmmhello's avatar
wmmhello 已提交
1648
  } else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
1649 1650
    switch (type->valuestring[0]) {
      case 'm':
wmmhello's avatar
wmmhello 已提交
1651
      case 'M':
X
Xiaoyu Wang 已提交
1652
        // milliseconds
1653 1654
        *tsVal = *tsVal * NANOSECOND_PER_MSEC;
        timeDouble = timeDouble * NANOSECOND_PER_MSEC;
X
Xiaoyu Wang 已提交
1655
        if (smlDoubleToInt64OverFlow(timeDouble)) {
1656
          smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1657
          return TSDB_CODE_INVALID_TIMESTAMP;
1658 1659 1660
        }
        break;
      case 'u':
wmmhello's avatar
wmmhello 已提交
1661
      case 'U':
X
Xiaoyu Wang 已提交
1662
        // microseconds
1663 1664
        *tsVal = *tsVal * NANOSECOND_PER_USEC;
        timeDouble = timeDouble * NANOSECOND_PER_USEC;
X
Xiaoyu Wang 已提交
1665
        if (smlDoubleToInt64OverFlow(timeDouble)) {
1666
          smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1667
          return TSDB_CODE_INVALID_TIMESTAMP;
1668 1669 1670
        }
        break;
      case 'n':
wmmhello's avatar
wmmhello 已提交
1671
      case 'N':
1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692
        break;
      default:
        return TSDB_CODE_TSC_INVALID_JSON;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  return TSDB_CODE_SUCCESS;
}

static uint8_t smlGetTimestampLen(int64_t num) {
  uint8_t len = 0;
  while ((num /= 10) != 0) {
    len++;
  }
  len++;
  return len;
}

static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
X
Xiaoyu Wang 已提交
1693
  // Timestamp must be the first KV to parse
1694 1695 1696 1697
  int64_t tsVal = 0;

  cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
  if (cJSON_IsNumber(timestamp)) {
X
Xiaoyu Wang 已提交
1698
    // timestamp value 0 indicates current system time
1699
    double timeDouble = timestamp->valuedouble;
X
Xiaoyu Wang 已提交
1700
    if (smlDoubleToInt64OverFlow(timeDouble)) {
1701
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1702
      return TSDB_CODE_INVALID_TIMESTAMP;
1703
    }
wmmhello's avatar
wmmhello 已提交
1704

X
Xiaoyu Wang 已提交
1705
    if (timeDouble < 0) {
wmmhello's avatar
wmmhello 已提交
1706
      return TSDB_CODE_INVALID_TIMESTAMP;
1707
    }
1708

1709
    uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
1710
    tsVal = (int64_t)timeDouble;
1711
    if (tsLen == TSDB_TIME_PRECISION_SEC_DIGITS) {
1712 1713
      tsVal = tsVal * NANOSECOND_PER_SEC;
      timeDouble = timeDouble * NANOSECOND_PER_SEC;
X
Xiaoyu Wang 已提交
1714
      if (smlDoubleToInt64OverFlow(timeDouble)) {
1715
        smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1716
        return TSDB_CODE_INVALID_TIMESTAMP;
1717 1718
      }
    } else if (tsLen == TSDB_TIME_PRECISION_MILLI_DIGITS) {
1719 1720
      tsVal = tsVal * NANOSECOND_PER_MSEC;
      timeDouble = timeDouble * NANOSECOND_PER_MSEC;
X
Xiaoyu Wang 已提交
1721
      if (smlDoubleToInt64OverFlow(timeDouble)) {
1722
        smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
wmmhello's avatar
wmmhello 已提交
1723
        return TSDB_CODE_INVALID_TIMESTAMP;
1724
      }
X
Xiaoyu Wang 已提交
1725
    } else if (timeDouble == 0) {
wmmhello's avatar
wmmhello 已提交
1726
      tsVal = taosGetTimestampNs();
X
Xiaoyu Wang 已提交
1727
    } else {
wmmhello's avatar
wmmhello 已提交
1728
      return TSDB_CODE_INVALID_TIMESTAMP;
1729 1730 1731 1732
    }
  } else if (cJSON_IsObject(timestamp)) {
    int32_t ret = smlParseTSFromJSONObj(info, timestamp, &tsVal);
    if (ret != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
1733
      uError("SML:0x%" PRIx64 " Failed to parse timestamp from JSON Obj", info->id);
1734 1735 1736 1737
      return ret;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
1738 1739
  }

wmmhello's avatar
wmmhello 已提交
1740
  // add ts to
wmmhello's avatar
wmmhello 已提交
1741
  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1742
  if (!kv) {
wmmhello's avatar
wmmhello 已提交
1743 1744 1745
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  kv->key = TS;
1746 1747
  kv->keyLen = TS_LEN;
  kv->i = tsVal;
wmmhello's avatar
wmmhello 已提交
1748
  kv->type = TSDB_DATA_TYPE_TIMESTAMP;
wmmhello's avatar
wmmhello 已提交
1749
  kv->length = (int16_t)tDataTypes[kv->type].bytes;
wmmhello's avatar
wmmhello 已提交
1750
  taosArrayPush(cols, &kv);
wmmhello's avatar
wmmhello 已提交
1751 1752 1753
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1754
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
1755 1756 1757 1758 1759 1760 1761
  if (strcasecmp(typeStr, "bool") != 0) {
    uError("OTD:invalid type(%s) for JSON Bool", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }
  pVal->type = TSDB_DATA_TYPE_BOOL;
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  pVal->i = value->valueint;
wmmhello's avatar
wmmhello 已提交
1762

1763 1764
  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1765

X
Xiaoyu Wang 已提交
1766 1767 1768
static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
  // tinyint
  if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) {
1769 1770 1771 1772 1773 1774 1775 1776 1777
    if (!IS_VALID_TINYINT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(tinyint)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_TINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1778 1779
  // smallint
  if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) {
1780 1781 1782 1783 1784 1785 1786 1787 1788
    if (!IS_VALID_SMALLINT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(smallint)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_SMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1789 1790
  // int
  if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) {
1791 1792 1793 1794 1795 1796 1797 1798 1799
    if (!IS_VALID_INT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(int)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_INT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1800 1801
  // bigint
  if (strcasecmp(typeStr, "i64") == 0 || strcasecmp(typeStr, "bigint") == 0) {
1802 1803
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
X
Xiaoyu Wang 已提交
1804
    if (value->valuedouble >= (double)INT64_MAX) {
1805
      pVal->i = INT64_MAX;
X
Xiaoyu Wang 已提交
1806
    } else if (value->valuedouble <= (double)INT64_MIN) {
1807
      pVal->i = INT64_MIN;
X
Xiaoyu Wang 已提交
1808
    } else {
1809
      pVal->i = value->valuedouble;
1810 1811 1812
    }
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1813 1814
  // float
  if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) {
1815 1816 1817
    if (!IS_VALID_FLOAT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(float)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
wmmhello's avatar
wmmhello 已提交
1818
    }
1819 1820 1821 1822 1823
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->f = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1824 1825
  // double
  if (strcasecmp(typeStr, "f64") == 0 || strcasecmp(typeStr, "double") == 0) {
1826 1827 1828 1829
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->d = value->valuedouble;
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1830 1831
  }

X
Xiaoyu Wang 已提交
1832
  // if reach here means type is unsupported
1833 1834 1835
  uError("OTD:invalid type(%s) for JSON Number", typeStr);
  return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
1836

X
Xiaoyu Wang 已提交
1837
static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
1838 1839 1840 1841 1842 1843 1844 1845 1846
  if (strcasecmp(typeStr, "binary") == 0) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
  } else if (strcasecmp(typeStr, "nchar") == 0) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
  } else {
    uError("OTD:invalid type(%s) for JSON String", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }
  pVal->length = (int16_t)strlen(value->valuestring);
wmmhello's avatar
wmmhello 已提交
1847

1848
  if (pVal->type == TSDB_DATA_TYPE_BINARY && pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
wmmhello's avatar
wmmhello 已提交
1849 1850
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }
1851 1852
  if (pVal->type == TSDB_DATA_TYPE_NCHAR &&
      pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
wmmhello's avatar
wmmhello 已提交
1853 1854 1855
    return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
  }

wmmhello's avatar
wmmhello 已提交
1856 1857
  pVal->value = value->valuestring;
  return TSDB_CODE_SUCCESS;
1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883
}

static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) {
  int32_t ret = TSDB_CODE_SUCCESS;
  int32_t size = cJSON_GetArraySize(root);

  if (size != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (value == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  switch (value->type) {
    case cJSON_True:
    case cJSON_False: {
      ret = smlConvertJSONBool(kv, type->valuestring, value);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
wmmhello's avatar
wmmhello 已提交
1884
      }
1885
      break;
wmmhello's avatar
wmmhello 已提交
1886
    }
1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902
    case cJSON_Number: {
      ret = smlConvertJSONNumber(kv, type->valuestring, value);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
      break;
    }
    case cJSON_String: {
      ret = smlConvertJSONString(kv, type->valuestring, value);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON_TYPE;
wmmhello's avatar
wmmhello 已提交
1903
  }
1904 1905

  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1906 1907
}

1908 1909 1910 1911 1912 1913 1914 1915
static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
  switch (root->type) {
    case cJSON_True:
    case cJSON_False: {
      kv->type = TSDB_DATA_TYPE_BOOL;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->i = root->valueint;
      break;
wmmhello's avatar
wmmhello 已提交
1916
    }
1917 1918 1919 1920 1921 1922 1923 1924 1925 1926
    case cJSON_Number: {
      kv->type = TSDB_DATA_TYPE_DOUBLE;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->d = root->valuedouble;
      break;
    }
    case cJSON_String: {
      /* set default JSON type to binary/nchar according to
       * user configured parameter tsDefaultJSONStrType
       */
wmmhello's avatar
wmmhello 已提交
1927

X
Xiaoyu Wang 已提交
1928
      char *tsDefaultJSONStrType = "nchar";  // todo
1929 1930
      smlConvertJSONString(kv, tsDefaultJSONStrType, root);
      break;
wmmhello's avatar
wmmhello 已提交
1931
    }
1932 1933 1934 1935 1936 1937 1938 1939 1940 1941
    case cJSON_Object: {
      int32_t ret = smlParseValueFromJSONObj(root, kv);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON Obj");
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
1942
  }
1943 1944 1945 1946 1947

  return TSDB_CODE_SUCCESS;
}

static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
dengyihao's avatar
dengyihao 已提交
1948
  if (!cols) return TSDB_CODE_OUT_OF_MEMORY;
1949 1950 1951 1952 1953 1954
  cJSON *metricVal = cJSON_GetObjectItem(root, "value");
  if (metricVal == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
1955
  if (!kv) {
1956 1957
    return TSDB_CODE_OUT_OF_MEMORY;
  }
wmmhello's avatar
wmmhello 已提交
1958
  taosArrayPush(cols, &kv);
1959 1960 1961 1962 1963 1964 1965 1966

  kv->key = VALUE;
  kv->keyLen = VALUE_LEN;
  int32_t ret = smlParseValueFromJSON(metricVal, kv);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1967 1968
}

X
Xiaoyu Wang 已提交
1969 1970
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey,
                                    SSmlMsgBuf *msg) {
1971
  int32_t ret = TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1972
  if (!pKVs) {
wmmhello's avatar
wmmhello 已提交
1973 1974
    return TSDB_CODE_OUT_OF_MEMORY;
  }
1975 1976 1977 1978
  cJSON *tags = cJSON_GetObjectItem(root, "tags");
  if (tags == NULL || tags->type != cJSON_Object) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }
wmmhello's avatar
wmmhello 已提交
1979

X
Xiaoyu Wang 已提交
1980
  size_t  childTableNameLen = strlen(tsSmlChildTableName);
1981 1982 1983 1984 1985
  int32_t tagNum = cJSON_GetArraySize(tags);
  for (int32_t i = 0; i < tagNum; ++i) {
    cJSON *tag = cJSON_GetArrayItem(tags, i);
    if (tag == NULL) {
      return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
1986
    }
1987 1988 1989 1990 1991
    size_t keyLen = strlen(tag->string);
    if (IS_INVALID_COL_LEN(keyLen)) {
      uError("OTD:Tag key length is 0 or too large than 64");
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
X
Xiaoyu Wang 已提交
1992
    // check duplicate keys
1993
    if (smlCheckDuplicateKey(tag->string, keyLen, dumplicateKey)) {
wmmhello's avatar
wmmhello 已提交
1994
      return TSDB_CODE_TSC_DUP_NAMES;
wmmhello's avatar
wmmhello 已提交
1995 1996
    }

X
Xiaoyu Wang 已提交
1997 1998
    // handle child table name
    if (childTableNameLen != 0 && strcmp(tag->string, tsSmlChildTableName) == 0) {
1999 2000 2001 2002 2003
      if (!cJSON_IsString(tag)) {
        uError("OTD:ID must be JSON string");
        return TSDB_CODE_TSC_INVALID_JSON;
      }
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
wmmhello's avatar
wmmhello 已提交
2004
      tstrncpy(childTableName, tag->valuestring, TSDB_TABLE_NAME_LEN);
2005 2006 2007
      continue;
    }

2008 2009
    // add kv to SSmlKv
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
X
Xiaoyu Wang 已提交
2010
    if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
2011
    taosArrayPush(pKVs, &kv);
2012

X
Xiaoyu Wang 已提交
2013
    // key
2014
    kv->keyLen = keyLen;
wmmhello's avatar
wmmhello 已提交
2015 2016
    kv->key = tag->string;

X
Xiaoyu Wang 已提交
2017
    // value
2018 2019 2020 2021
    ret = smlParseValueFromJSON(tag, kv);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
wmmhello's avatar
wmmhello 已提交
2022 2023
  }

2024
  return ret;
wmmhello's avatar
wmmhello 已提交
2025 2026
}

2027 2028
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo, SArray *cols) {
  int32_t ret = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2029

2030
  if (!cJSON_IsObject(root)) {
X
Xiaoyu Wang 已提交
2031
    uError("OTD:0x%" PRIx64 " data point needs to be JSON object", info->id);
2032
    return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
2033
  }
2034

2035
  int32_t size = cJSON_GetArraySize(root);
X
Xiaoyu Wang 已提交
2036
  // outmost json fields has to be exactly 4
2037
  if (size != OTD_JSON_FIELDS_NUM) {
X
Xiaoyu Wang 已提交
2038
    uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
2039
    return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
2040
  }
2041

X
Xiaoyu Wang 已提交
2042
  // Parse metric
2043 2044
  ret = smlParseMetricFromJSON(info, root, tinfo);
  if (ret != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2045
    uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
2046
    return ret;
wmmhello's avatar
wmmhello 已提交
2047
  }
X
Xiaoyu Wang 已提交
2048
  uDebug("OTD:0x%" PRIx64 " Parse metric from JSON payload finished", info->id);
wmmhello's avatar
wmmhello 已提交
2049

X
Xiaoyu Wang 已提交
2050
  // Parse timestamp
2051 2052
  ret = smlParseTSFromJSON(info, root, cols);
  if (ret) {
X
Xiaoyu Wang 已提交
2053
    uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
2054
    return ret;
wmmhello's avatar
wmmhello 已提交
2055
  }
X
Xiaoyu Wang 已提交
2056
  uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);
2057

X
Xiaoyu Wang 已提交
2058
  // Parse metric value
2059 2060
  ret = smlParseColsFromJSON(root, cols);
  if (ret) {
X
Xiaoyu Wang 已提交
2061
    uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
2062
    return ret;
2063
  }
X
Xiaoyu Wang 已提交
2064
  uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id);
2065

X
Xiaoyu Wang 已提交
2066
  // Parse tags
2067
  ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
2068
  if (ret) {
X
Xiaoyu Wang 已提交
2069
    uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
2070
    return ret;
2071
  }
X
Xiaoyu Wang 已提交
2072
  uDebug("OTD:0x%" PRIx64 " Parse tags from JSON payload finished", info->id);
wmmhello's avatar
wmmhello 已提交
2073

2074
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2075
}
2076
/************* TSDB_SML_JSON_PROTOCOL function end **************/
wmmhello's avatar
wmmhello 已提交
2077

wmmhello's avatar
wmmhello 已提交
2078
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) {
wmmhello's avatar
wmmhello 已提交
2079
  SSmlLineInfo elements = {0};
wmmhello's avatar
wmmhello 已提交
2080
  void *tmp = taosMemoryCalloc(1, len + 1);
wmmhello's avatar
wmmhello 已提交
2081
  memcpy(tmp, sql, len);
wmmhello's avatar
wmmhello 已提交
2082
  uDebug("SML:0x%" PRIx64 " smlParseInfluxLine raw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? (char*)tmp : sql));
wmmhello's avatar
wmmhello 已提交
2083
  taosMemoryFree(tmp);
2084

wmmhello's avatar
wmmhello 已提交
2085
  int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf);
X
Xiaoyu Wang 已提交
2086 2087
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id);
wmmhello's avatar
wmmhello 已提交
2088 2089 2090
    return ret;
  }

2091
  SArray *cols = NULL;
X
Xiaoyu Wang 已提交
2092
  if (info->dataFormat) {  // if dataFormat, cols need new memory to save data
2093 2094
    cols = taosArrayInit(16, POINTER_BYTES);
    if (cols == NULL) {
X
Xiaoyu Wang 已提交
2095
      uError("SML:0x%" PRIx64 " smlParseInfluxLine failed to allocate memory", info->id);
2096 2097
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
2098
  } else {  // if dataFormat is false, cols do not need to save data, there is another new memory to save data
2099
    cols = info->colsContainer;
wmmhello's avatar
wmmhello 已提交
2100
  }
wmmhello's avatar
wmmhello 已提交
2101

wmmhello's avatar
wmmhello 已提交
2102
  ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols);
X
Xiaoyu Wang 已提交
2103 2104 2105
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseTS failed", info->id);
    if (info->dataFormat) taosArrayDestroy(cols);
wmmhello's avatar
wmmhello 已提交
2106 2107
    return ret;
  }
2108
  ret = smlParseCols(elements.cols, elements.colsLen, cols, NULL, false, info->dumplicateKey, &info->msgBuf);
X
Xiaoyu Wang 已提交
2109 2110
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseCols parse cloums fields failed", info->id);
wmmhello's avatar
wmmhello 已提交
2111
    smlDestroyCols(cols);
X
Xiaoyu Wang 已提交
2112
    if (info->dataFormat) taosArrayDestroy(cols);
wmmhello's avatar
wmmhello 已提交
2113 2114 2115
    return ret;
  }

X
Xiaoyu Wang 已提交
2116 2117 2118 2119 2120
  bool            hasTable = true;
  SSmlTableInfo  *tinfo = NULL;
  SSmlTableInfo **oneTable =
      (SSmlTableInfo **)taosHashGet(info->childTables, elements.measure, elements.measureTagsLen);
  if (!oneTable) {
2121
    tinfo = smlBuildTableInfo();
X
Xiaoyu Wang 已提交
2122
    if (!tinfo) {
wmmhello's avatar
wmmhello 已提交
2123
      smlDestroyCols(cols);
wmmhello's avatar
wmmhello 已提交
2124
      if (info->dataFormat) taosArrayDestroy(cols);
wmmhello's avatar
wmmhello 已提交
2125 2126
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
2127
    taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES);
2128
    oneTable = &tinfo;
2129 2130 2131 2132
    hasTable = false;
  }

  ret = smlDealCols(*oneTable, info->dataFormat, cols);
X
Xiaoyu Wang 已提交
2133
  if (ret != TSDB_CODE_SUCCESS) {
2134 2135
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
2136

X
Xiaoyu Wang 已提交
2137 2138 2139 2140 2141
  if (!hasTable) {
    ret = smlParseCols(elements.tags, elements.tagsLen, (*oneTable)->tags, (*oneTable)->childTableName, true,
                       info->dumplicateKey, &info->msgBuf);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseCols parse tag fields failed", info->id);
wmmhello's avatar
wmmhello 已提交
2142 2143 2144
      return ret;
    }

X
Xiaoyu Wang 已提交
2145
    if (taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_TAGS) {
wmmhello's avatar
wmmhello 已提交
2146
      smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
wmmhello's avatar
wmmhello 已提交
2147
      return TSDB_CODE_PAR_INVALID_TAGS_NUM;
wmmhello's avatar
wmmhello 已提交
2148 2149
    }

2150 2151 2152 2153 2154
    if (taosArrayGetSize(cols) + taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_COLUMNS) {
      smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
      return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
    }

2155 2156
    (*oneTable)->sTableName = elements.measure;
    (*oneTable)->sTableNameLen = elements.measureLen;
X
Xiaoyu Wang 已提交
2157 2158 2159
    if (strlen((*oneTable)->childTableName) == 0) {
      RandTableName rName = {(*oneTable)->tags, (*oneTable)->sTableName, (uint8_t)(*oneTable)->sTableNameLen,
                             (*oneTable)->childTableName, 0};
2160 2161 2162

      buildChildTableName(&rName);
      (*oneTable)->uid = rName.uid;
X
Xiaoyu Wang 已提交
2163 2164
    } else {
      (*oneTable)->uid = *(uint64_t *)((*oneTable)->childTableName);
2165
    }
2166
  }
wmmhello's avatar
wmmhello 已提交
2167

X
Xiaoyu Wang 已提交
2168 2169
  SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements.measure, elements.measureLen);
  if (tableMeta) {  // update meta
wmmhello's avatar
wmmhello 已提交
2170
    ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
wmmhello's avatar
wmmhello 已提交
2171
    if (!hasTable && ret == TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2172 2173
      ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
    }
wmmhello's avatar
wmmhello 已提交
2174
    if (ret != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2175
      uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
wmmhello's avatar
wmmhello 已提交
2176
      return ret;
wmmhello's avatar
wmmhello 已提交
2177
    }
X
Xiaoyu Wang 已提交
2178
  } else {
2179
    SSmlSTableMeta *meta = smlBuildSTableMeta();
wmmhello's avatar
wmmhello 已提交
2180 2181
    smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
    smlInsertMeta(meta->colHash, meta->cols, cols);
2182
    taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
wmmhello's avatar
wmmhello 已提交
2183
  }
2184

X
Xiaoyu Wang 已提交
2185
  if (!info->dataFormat) {
2186 2187 2188
    taosArrayClear(info->colsContainer);
  }
  taosHashClear(info->dumplicateKey);
wmmhello's avatar
wmmhello 已提交
2189 2190 2191
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
2192
static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) {
X
Xiaoyu Wang 已提交
2193
  int            ret = TSDB_CODE_SUCCESS;
2194
  SSmlTableInfo *tinfo = smlBuildTableInfo();
X
Xiaoyu Wang 已提交
2195
  if (!tinfo) {
2196
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
2197 2198
  }

2199 2200
  SArray *cols = taosArrayInit(16, POINTER_BYTES);
  if (cols == NULL) {
X
Xiaoyu Wang 已提交
2201
    uError("SML:0x%" PRIx64 " smlParseTelnetLine failed to allocate memory", info->id);
2202
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
2203 2204
  }

X
Xiaoyu Wang 已提交
2205
  if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
dengyihao's avatar
dengyihao 已提交
2206
    ret = smlParseTelnetString(info, (const char *)data, (char *)data + len, tinfo, cols);
X
Xiaoyu Wang 已提交
2207
  } else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
2208
    ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
X
Xiaoyu Wang 已提交
2209
  } else {
2210 2211
    ASSERT(0);
  }
X
Xiaoyu Wang 已提交
2212 2213
  if (ret != TSDB_CODE_SUCCESS) {
    uError("SML:0x%" PRIx64 " smlParseTelnetLine failed", info->id);
wmmhello's avatar
wmmhello 已提交
2214
    smlDestroyTableInfo(info, tinfo);
wmmhello's avatar
wmmhello 已提交
2215
    smlDestroyCols(cols);
2216 2217
    taosArrayDestroy(cols);
    return ret;
wmmhello's avatar
wmmhello 已提交
2218
  }
wmmhello's avatar
wmmhello 已提交
2219

X
Xiaoyu Wang 已提交
2220
  if (taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
2221
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL);
wmmhello's avatar
wmmhello 已提交
2222
    smlDestroyTableInfo(info, tinfo);
wmmhello's avatar
wmmhello 已提交
2223
    smlDestroyCols(cols);
wmmhello's avatar
wmmhello 已提交
2224
    taosArrayDestroy(cols);
wmmhello's avatar
wmmhello 已提交
2225
    return TSDB_CODE_PAR_INVALID_TAGS_NUM;
wmmhello's avatar
wmmhello 已提交
2226
  }
2227 2228
  taosHashClear(info->dumplicateKey);

X
Xiaoyu Wang 已提交
2229 2230
  if (strlen(tinfo->childTableName) == 0) {
    RandTableName rName = {tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, tinfo->childTableName, 0};
2231 2232
    buildChildTableName(&rName);
    tinfo->uid = rName.uid;
X
Xiaoyu Wang 已提交
2233 2234
  } else {
    tinfo->uid = *(uint64_t *)(tinfo->childTableName);  // generate uid by name simple
2235 2236
  }

X
Xiaoyu Wang 已提交
2237 2238 2239 2240
  bool            hasTable = true;
  SSmlTableInfo **oneTable =
      (SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName));
  if (!oneTable) {
2241
    taosHashPut(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName), &tinfo, POINTER_BYTES);
2242
    oneTable = &tinfo;
2243
    hasTable = false;
X
Xiaoyu Wang 已提交
2244
  } else {
wmmhello's avatar
wmmhello 已提交
2245
    smlDestroyTableInfo(info, tinfo);
wmmhello's avatar
wmmhello 已提交
2246
  }
wmmhello's avatar
wmmhello 已提交
2247

2248
  taosArrayPush((*oneTable)->cols, &cols);
X
Xiaoyu Wang 已提交
2249 2250 2251
  SSmlSTableMeta **tableMeta =
      (SSmlSTableMeta **)taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen);
  if (tableMeta) {  // update meta
wmmhello's avatar
wmmhello 已提交
2252
    ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
wmmhello's avatar
wmmhello 已提交
2253
    if (!hasTable && ret == TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2254 2255
      ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
    }
wmmhello's avatar
wmmhello 已提交
2256
    if (ret != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2257
      uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
wmmhello's avatar
wmmhello 已提交
2258
      return ret;
2259
    }
X
Xiaoyu Wang 已提交
2260
  } else {
2261
    SSmlSTableMeta *meta = smlBuildSTableMeta();
wmmhello's avatar
wmmhello 已提交
2262 2263
    smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
    smlInsertMeta(meta->colHash, meta->cols, cols);
2264
    taosHashPut(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen, &meta, POINTER_BYTES);
wmmhello's avatar
wmmhello 已提交
2265 2266
  }

2267 2268
  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
2269

X
Xiaoyu Wang 已提交
2270
static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
2271 2272
  int32_t payloadNum = 0;
  int32_t ret = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2273

2274
  if (payload == NULL) {
X
Xiaoyu Wang 已提交
2275
    uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
2276
    return TSDB_CODE_TSC_INVALID_JSON;
2277
  }
2278

wmmhello's avatar
wmmhello 已提交
2279 2280
  info->root = cJSON_Parse(payload);
  if (info->root == NULL) {
X
Xiaoyu Wang 已提交
2281
    uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
2282 2283
    return TSDB_CODE_TSC_INVALID_JSON;
  }
X
Xiaoyu Wang 已提交
2284
  // multiple data points must be sent in JSON array
wmmhello's avatar
wmmhello 已提交
2285
  if (cJSON_IsObject(info->root)) {
2286
    payloadNum = 1;
wmmhello's avatar
wmmhello 已提交
2287 2288
  } else if (cJSON_IsArray(info->root)) {
    payloadNum = cJSON_GetArraySize(info->root);
2289
  } else {
X
Xiaoyu Wang 已提交
2290
    uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
2291 2292
    ret = TSDB_CODE_TSC_INVALID_JSON;
    goto end;
wmmhello's avatar
wmmhello 已提交
2293
  }
wmmhello's avatar
wmmhello 已提交
2294

2295
  for (int32_t i = 0; i < payloadNum; ++i) {
wmmhello's avatar
wmmhello 已提交
2296
    cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : cJSON_GetArrayItem(info->root, i);
wmmhello's avatar
wmmhello 已提交
2297
    ret = smlParseTelnetLine(info, dataPoint, -1);
X
Xiaoyu Wang 已提交
2298 2299
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
2300 2301 2302 2303 2304 2305
      goto end;
    }
  }

end:
  return ret;
wmmhello's avatar
wmmhello 已提交
2306
}
2307

X
Xiaoyu Wang 已提交
2308
static int32_t smlInsertData(SSmlHandle *info) {
wmmhello's avatar
wmmhello 已提交
2309 2310
  int32_t code = TSDB_CODE_SUCCESS;

X
Xiaoyu Wang 已提交
2311
  SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
wmmhello's avatar
wmmhello 已提交
2312
  while (oneTable) {
X
Xiaoyu Wang 已提交
2313
    SSmlTableInfo *tableData = *oneTable;
wmmhello's avatar
wmmhello 已提交
2314 2315

    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
wmmhello's avatar
wmmhello 已提交
2316
    tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
wmmhello's avatar
wmmhello 已提交
2317
    memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
D
dapan1121 已提交
2318 2319 2320 2321 2322 2323

    SRequestConnInfo conn = {0};
    conn.pTrans = info->taos->pAppInfo->pTransporter;
    conn.requestId = info->pRequest->requestId;
    conn.requestObjRefId = info->pRequest->self;
    conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
X
Xiaoyu Wang 已提交
2324

wmmhello's avatar
wmmhello 已提交
2325
    SVgroupInfo vg;
D
dapan1121 已提交
2326
    code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
2327
    if (code != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2328
      uError("SML:0x%" PRIx64 " catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
wmmhello's avatar
wmmhello 已提交
2329 2330
      return code;
    }
X
Xiaoyu Wang 已提交
2331
    taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
wmmhello's avatar
wmmhello 已提交
2332

X
Xiaoyu Wang 已提交
2333 2334 2335
    SSmlSTableMeta **pMeta =
        (SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
    ASSERT(NULL != pMeta && NULL != *pMeta);
wmmhello's avatar
wmmhello 已提交
2336

2337
    // use tablemeta of stable to save vgid and uid of child table
wmmhello's avatar
wmmhello 已提交
2338
    (*pMeta)->tableMeta->vgId = vg.vgId;
X
Xiaoyu Wang 已提交
2339
    (*pMeta)->tableMeta->uid = tableData->uid;  // one table merge data block together according uid
wmmhello's avatar
wmmhello 已提交
2340

2341
    code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat,
2342
                       (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
2343
                       info->ttl, info->msgBuf.buf, info->msgBuf.len);
X
Xiaoyu Wang 已提交
2344 2345
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
wmmhello's avatar
wmmhello 已提交
2346 2347
      return code;
    }
X
Xiaoyu Wang 已提交
2348
    oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
wmmhello's avatar
wmmhello 已提交
2349
  }
wmmhello's avatar
wmmhello 已提交
2350

2351 2352
  code = smlBuildOutput(info->exec, info->pVgHash);
  if (code != TSDB_CODE_SUCCESS) {
X
Xiaoyu Wang 已提交
2353
    uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
2354 2355
    return code;
  }
2356 2357
  info->cost.insertRpcTime = taosGetTimestampUs();

X
Xiaoyu Wang 已提交
2358 2359 2360
  // launchQueryImpl(info->pRequest, info->pQuery, false, NULL);
  //  info->affectedRows = taos_affected_rows(info->pRequest);
  //  return info->pRequest->code;
wmmhello's avatar
wmmhello 已提交
2361

2362 2363 2364
  SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
  atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);

2365 2366 2367 2368 2369 2370
  SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
  if (pWrapper == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pWrapper->pRequest = info->pRequest;
  launchAsyncQuery(info->pRequest, info->pQuery, NULL, pWrapper);
wmmhello's avatar
wmmhello 已提交
2371
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2372 2373
}

X
Xiaoyu Wang 已提交
2374 2375 2376 2377 2378 2379 2380 2381 2382
static void smlPrintStatisticInfo(SSmlHandle *info) {
  uError("SML:0x%" PRIx64
         " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d \
        parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
         "",
         info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
         info->cost.numOfCreateSTables, info->cost.schemaTime - info->cost.parseTime,
         info->cost.insertBindTime - info->cost.schemaTime, info->cost.insertRpcTime - info->cost.insertBindTime,
         info->cost.endTime - info->cost.insertRpcTime, info->cost.endTime - info->cost.parseTime);
2383 2384
}

dengyihao's avatar
dengyihao 已提交
2385
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
wmmhello's avatar
wmmhello 已提交
2386
  int32_t code = TSDB_CODE_SUCCESS;
2387
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
dengyihao's avatar
dengyihao 已提交
2388
    if (lines) {
wmmhello's avatar
wmmhello 已提交
2389
      code = smlParseJSON(info, *lines);
dengyihao's avatar
dengyihao 已提交
2390
    } else if (rawLine) {
wmmhello's avatar
wmmhello 已提交
2391 2392
      code = smlParseJSON(info, rawLine);
    }
2393
    if (code != TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
2394
      uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines ? *lines : rawLine);
2395 2396
      return code;
    }
wmmhello's avatar
wmmhello 已提交
2397
    return code;
wmmhello's avatar
wmmhello 已提交
2398
  }
wmmhello's avatar
wmmhello 已提交
2399

wmmhello's avatar
wmmhello 已提交
2400
  for (int32_t i = 0; i < numLines; ++i) {
wmmhello's avatar
wmmhello 已提交
2401
    char *tmp = NULL;
dengyihao's avatar
dengyihao 已提交
2402 2403
    int   len = 0;
    if (lines) {
wmmhello's avatar
wmmhello 已提交
2404 2405
      tmp = lines[i];
      len = strlen(tmp);
dengyihao's avatar
dengyihao 已提交
2406
    } else if (rawLine) {
wmmhello's avatar
wmmhello 已提交
2407
      tmp = rawLine;
dengyihao's avatar
dengyihao 已提交
2408 2409
      while (rawLine < rawLineEnd) {
        if (*(rawLine++) == '\n') {
wmmhello's avatar
wmmhello 已提交
2410 2411 2412 2413
          break;
        }
        len++;
      }
dengyihao's avatar
dengyihao 已提交
2414
      if (info->protocol == TSDB_SML_LINE_PROTOCOL && tmp[0] == '#') {  // this line is comment
wmmhello's avatar
wmmhello 已提交
2415 2416
        continue;
      }
wmmhello's avatar
wmmhello 已提交
2417 2418
    }

X
Xiaoyu Wang 已提交
2419
    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
wmmhello's avatar
wmmhello 已提交
2420
      code = smlParseInfluxLine(info, tmp, len);
X
Xiaoyu Wang 已提交
2421
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
wmmhello's avatar
wmmhello 已提交
2422
      code = smlParseTelnetLine(info, tmp, len);
X
Xiaoyu Wang 已提交
2423
    } else {
2424 2425
      ASSERT(0);
    }
wmmhello's avatar
wmmhello 已提交
2426
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2427
      uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, tmp);
2428
      return code;
wmmhello's avatar
wmmhello 已提交
2429 2430
    }
  }
2431 2432 2433
  return code;
}

dengyihao's avatar
dengyihao 已提交
2434
static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
2435
  int32_t code = TSDB_CODE_SUCCESS;
2436 2437
  int32_t retryNum = 0;

2438 2439
  info->cost.parseTime = taosGetTimestampUs();

wmmhello's avatar
wmmhello 已提交
2440
  code = smlParseLine(info, lines, rawLine, rawLineEnd, numLines);
2441
  if (code != 0) {
X
Xiaoyu Wang 已提交
2442
    uError("SML:0x%" PRIx64 " smlParseLine error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
2443
    return code;
2444
  }
wmmhello's avatar
wmmhello 已提交
2445

2446 2447 2448 2449 2450
  info->cost.lineNum = numLines;
  info->cost.numOfSTables = taosHashGetSize(info->superTables);
  info->cost.numOfCTables = taosHashGetSize(info->childTables);

  info->cost.schemaTime = taosGetTimestampUs();
2451

X
Xiaoyu Wang 已提交
2452
  do {
2453 2454
    code = smlModifyDBSchemas(info);
    if (code == 0) break;
wmmhello's avatar
wmmhello 已提交
2455
  } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
2456

wmmhello's avatar
wmmhello 已提交
2457
  if (code != 0) {
X
Xiaoyu Wang 已提交
2458
    uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
2459
    return code;
wmmhello's avatar
wmmhello 已提交
2460
  }
wmmhello's avatar
wmmhello 已提交
2461

2462
  info->cost.insertBindTime = taosGetTimestampUs();
wmmhello's avatar
wmmhello 已提交
2463 2464
  code = smlInsertData(info);
  if (code != 0) {
X
Xiaoyu Wang 已提交
2465
    uError("SML:0x%" PRIx64 " smlInsertData error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
2466
    return code;
wmmhello's avatar
wmmhello 已提交
2467 2468 2469 2470 2471
  }

  return code;
}

X
Xiaoyu Wang 已提交
2472
static int32_t isSchemalessDb(STscObj *taos, SRequestObj *request) {
2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500
  //  SCatalog *catalog = NULL;
  //  int32_t   code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, &catalog);
  //  if (code != TSDB_CODE_SUCCESS) {
  //    uError("SML get catalog error %d", code);
  //    return code;
  //  }
  //
  //  SName name;
  //  tNameSetDbName(&name, taos->acctId, taos->db, strlen(taos->db));
  //  char dbFname[TSDB_DB_FNAME_LEN] = {0};
  //  tNameGetFullDbName(&name, dbFname);
  //  SDbCfgInfo pInfo = {0};
  //
  //  SRequestConnInfo conn = {0};
  //  conn.pTrans = taos->pAppInfo->pTransporter;
  //  conn.requestId = request->requestId;
  //  conn.requestObjRefId = request->self;
  //  conn.mgmtEps = getEpSet_s(&taos->pAppInfo->mgmtEp);
  //
  //  code = catalogGetDBCfg(catalog, &conn, dbFname, &pInfo);
  //  if (code != TSDB_CODE_SUCCESS) {
  //    return code;
  //  }
  //  taosArrayDestroy(pInfo.pRetensions);
  //
  //  if (!pInfo.schemaless) {
  //    return TSDB_CODE_SML_INVALID_DB_CONF;
  //  }
wmmhello's avatar
wmmhello 已提交
2501 2502 2503
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
2504
static void smlInsertCallback(void *param, void *res, int32_t code) {
wmmhello's avatar
wmmhello 已提交
2505
  SRequestObj *pRequest = (SRequestObj *)res;
X
Xiaoyu Wang 已提交
2506
  SSmlHandle  *info = (SSmlHandle *)param;
2507
  int32_t      rows = taos_affected_rows(pRequest);
wmmhello's avatar
wmmhello 已提交
2508

X
Xiaoyu Wang 已提交
2509
  uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
2510
  Params *pParam = info->params;
wmmhello's avatar
wmmhello 已提交
2511
  // lock
2512 2513
  taosThreadSpinLock(&pParam->lock);
  pParam->cnt++;
X
Xiaoyu Wang 已提交
2514
  if (code != TSDB_CODE_SUCCESS) {
2515 2516
    pParam->request->code = code;
    pParam->request->body.resInfo.numOfRows += rows;
2517
  } else {
2518 2519
    pParam->request->body.resInfo.numOfRows += info->affectedRows;
  }
2520 2521 2522
  // unlock
  taosThreadSpinUnlock(&pParam->lock);

2523 2524
  if (pParam->cnt == pParam->total) {
    tsem_post(&pParam->sem);
wmmhello's avatar
wmmhello 已提交
2525
  }
wmmhello's avatar
wmmhello 已提交
2526
  uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
wmmhello's avatar
wmmhello 已提交
2527 2528 2529
  info->cost.endTime = taosGetTimestampUs();
  info->cost.code = code;
  smlPrintStatisticInfo(info);
wmmhello's avatar
wmmhello 已提交
2530 2531 2532
  smlDestroyInfo(info);
}

dengyihao's avatar
dengyihao 已提交
2533
TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd,
2534
                                       int numLines, int protocol, int precision, int32_t ttl) {
2535 2536
  int      batchs = 0;
  STscObj *pTscObj = request->pTscObj;
2537

2538
  pTscObj->schemalessType = 1;
wmmhello's avatar
wmmhello 已提交
2539
  SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
wmmhello's avatar
wmmhello 已提交
2540

2541
  Params params = {0};
wmmhello's avatar
wmmhello 已提交
2542
  params.request = request;
wmmhello's avatar
wmmhello 已提交
2543 2544 2545
  tsem_init(&params.sem, 0, 0);
  taosThreadSpinInit(&(params.lock), 0);

X
Xiaoyu Wang 已提交
2546
  if (request->pDb == NULL) {
wmmhello's avatar
wmmhello 已提交
2547
    request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
wmmhello's avatar
wmmhello 已提交
2548
    smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
wmmhello's avatar
wmmhello 已提交
2549 2550 2551
    goto end;
  }

2552
  if (isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2553
    request->code = TSDB_CODE_SML_INVALID_DB_CONF;
wmmhello's avatar
wmmhello 已提交
2554
    smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
wmmhello's avatar
wmmhello 已提交
2555 2556 2557
    goto end;
  }

X
Xiaoyu Wang 已提交
2558
  if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
2559
    request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
wmmhello's avatar
wmmhello 已提交
2560
    smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
2561
    goto end;
wmmhello's avatar
wmmhello 已提交
2562 2563
  }

X
Xiaoyu Wang 已提交
2564 2565
  if (protocol == TSDB_SML_LINE_PROTOCOL &&
      (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)) {
2566
    request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
wmmhello's avatar
wmmhello 已提交
2567
    smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
2568 2569 2570
    goto end;
  }

2571
  if (protocol == TSDB_SML_JSON_PROTOCOL) {
wmmhello's avatar
wmmhello 已提交
2572
    numLines = 1;
2573
  } else if (numLines <= 0) {
wmmhello's avatar
wmmhello 已提交
2574 2575 2576 2577 2578
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "line num is invalid", NULL);
    goto end;
  }

2579
  batchs = ceil(((double)numLines) / tsSmlBatchSize);
2580
  params.total = batchs;
wmmhello's avatar
wmmhello 已提交
2581
  for (int i = 0; i < batchs; ++i) {
dengyihao's avatar
dengyihao 已提交
2582
    SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT, 0);
2583
    if (!req) {
wmmhello's avatar
wmmhello 已提交
2584 2585 2586 2587
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error request is null");
      goto end;
    }
2588 2589
    SSmlHandle *info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision);
    if (!info) {
wmmhello's avatar
wmmhello 已提交
2590 2591 2592 2593 2594
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error SSmlHandle is null");
      goto end;
    }

wmmhello's avatar
wmmhello 已提交
2595
    info->isRawLine = (rawLine == NULL);
2596
    info->ttl       = ttl;
wmmhello's avatar
wmmhello 已提交
2597

2598
    int32_t perBatch = tsSmlBatchSize;
wmmhello's avatar
wmmhello 已提交
2599

X
Xiaoyu Wang 已提交
2600
    if (numLines > perBatch) {
wmmhello's avatar
wmmhello 已提交
2601
      numLines -= perBatch;
X
Xiaoyu Wang 已提交
2602
    } else {
wmmhello's avatar
wmmhello 已提交
2603 2604 2605 2606
      perBatch = numLines;
      numLines = 0;
    }

wmmhello's avatar
wmmhello 已提交
2607
    info->params = &params;
wmmhello's avatar
wmmhello 已提交
2608 2609
    info->affectedRows = perBatch;
    info->pRequest->body.queryFp = smlInsertCallback;
X
Xiaoyu Wang 已提交
2610
    info->pRequest->body.param = info;
wmmhello's avatar
wmmhello 已提交
2611
    int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch);
dengyihao's avatar
dengyihao 已提交
2612
    if (lines) {
wmmhello's avatar
wmmhello 已提交
2613 2614
      lines += perBatch;
    }
dengyihao's avatar
dengyihao 已提交
2615
    if (rawLine) {
wmmhello's avatar
wmmhello 已提交
2616
      int num = 0;
dengyihao's avatar
dengyihao 已提交
2617 2618
      while (rawLine < rawLineEnd) {
        if (*(rawLine++) == '\n') {
wmmhello's avatar
wmmhello 已提交
2619 2620
          num++;
        }
dengyihao's avatar
dengyihao 已提交
2621
        if (num == perBatch) {
wmmhello's avatar
wmmhello 已提交
2622 2623 2624 2625
          break;
        }
      }
    }
X
Xiaoyu Wang 已提交
2626
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2627 2628 2629 2630
      info->pRequest->body.queryFp(info, req, code);
    }
  }
  tsem_wait(&params.sem);
2631

dengyihao's avatar
dengyihao 已提交
2632
end:
wmmhello's avatar
wmmhello 已提交
2633 2634
  taosThreadSpinDestroy(&params.lock);
  tsem_destroy(&params.sem);
2635
  //  ((STscObj *)taos)->schemalessType = 0;
2636
  pTscObj->schemalessType = 1;
2637
  uDebug("resultend:%s", request->msgBuf);
2638
  return (TAOS_RES *)request;
wmmhello's avatar
wmmhello 已提交
2639
}
wmmhello's avatar
wmmhello 已提交
2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656

/**
 * taos_schemaless_insert() parse and insert data points into database according to
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
dengyihao's avatar
dengyihao 已提交
2657
 * @return TAOS_RES
wmmhello's avatar
wmmhello 已提交
2658 2659
 */

2660 2661
TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
                                                int32_t ttl, int64_t reqid) {
wmmhello's avatar
wmmhello 已提交
2662 2663 2664 2665 2666
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

2667
  SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
dengyihao's avatar
dengyihao 已提交
2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679
  if (!request) {
    uError("SML:taos_schemaless_insert error request is null");
    return NULL;
  }

  if (!lines) {
    SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
    return (TAOS_RES *)request;
  }

2680
  return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision, ttl);
dengyihao's avatar
dengyihao 已提交
2681 2682
}

2683 2684 2685
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, TSDB_DEFAULT_TABLE_TTL, 0);
}
wmmhello's avatar
wmmhello 已提交
2686

2687 2688 2689
TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int32_t ttl) {
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, ttl, 0);
}
wmmhello's avatar
wmmhello 已提交
2690

2691 2692
TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision, int64_t reqid) {
  return taos_schemaless_insert_ttl_with_reqid(taos, lines, numLines, protocol, precision, TSDB_DEFAULT_TABLE_TTL, reqid);
wmmhello's avatar
wmmhello 已提交
2693 2694
}

2695 2696
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
                                                int precision, int32_t ttl, int64_t reqid) {
dengyihao's avatar
dengyihao 已提交
2697 2698 2699 2700 2701
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

2702
  SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
dengyihao's avatar
dengyihao 已提交
2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726
  if (!request) {
    uError("SML:taos_schemaless_insert error request is null");
    return NULL;
  }

  if (!lines || len <= 0) {
    SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
    request->code = TSDB_CODE_SML_INVALID_DATA;
    smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
    return (TAOS_RES *)request;
  }

  int numLines = 0;
  *totalRows = 0;
  char *tmp = lines;
  for (int i = 0; i < len; i++) {
    if (lines[i] == '\n' || i == len - 1) {
      numLines++;
      if (tmp[0] != '#' || protocol != TSDB_SML_LINE_PROTOCOL) {  // ignore comment
        (*totalRows)++;
      }
      tmp = lines + i + 1;
    }
  }
2727
  return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision, ttl);
dengyihao's avatar
dengyihao 已提交
2728 2729
}

2730 2731 2732 2733 2734 2735
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int64_t reqid) {
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, TSDB_DEFAULT_TABLE_TTL, reqid);
}
TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int32_t ttl) {
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, ttl, 0);
}
wmmhello's avatar
wmmhello 已提交
2736

2737 2738
TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision) {
  return taos_schemaless_insert_raw_ttl_with_reqid(taos, lines, len, totalRows, protocol, precision, TSDB_DEFAULT_TABLE_TTL, 0);
wmmhello's avatar
wmmhello 已提交
2739
}