clientSml.c 77.9 KB
Newer Older
wmmhello's avatar
wmmhello 已提交
1 2 3 4 5 6 7
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "query.h"
#include "taos.h"
wmmhello's avatar
wmmhello 已提交
8 9 10 11 12 13 14 15
#include "taoserror.h"
#include "tdef.h"
#include "tlog.h"
#include "tmsg.h"
#include "ttime.h"
#include "ttypes.h"
#include "tcommon.h"
#include "catalog.h"
16
#include "clientInt.h"
17
#include "tname.h"
18
#include "cJSON.h"
19
#include "tglobal.h"
wmmhello's avatar
wmmhello 已提交
20 21 22
#include "osSemaphore.h"
#include "osThread.h"

wmmhello's avatar
wmmhello 已提交
23
//=================================================================================================
wmmhello's avatar
wmmhello 已提交
24 25 26 27 28 29 30

#define SPACE ' '
#define COMMA ','
#define EQUAL '='
#define QUOTE '"'
#define SLASH '\\'

wmmhello's avatar
wmmhello 已提交
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
#define JUMP_SPACE(sql) while (*sql != '\0'){if(*sql == SPACE) sql++;else break;}
// comma ,
#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql) - 1) == SLASH)
#define IS_COMMA(sql) (*(sql) == COMMA && *((sql) - 1) != SLASH)
// space
#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql) - 1) == SLASH)
#define IS_SPACE(sql) (*(sql) == SPACE && *((sql) - 1) != SLASH)
// equal =
#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql) - 1) == SLASH)
#define IS_EQUAL(sql) (*(sql) == EQUAL && *((sql) - 1) != SLASH)
// quote "
#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql) - 1) == SLASH)
#define IS_QUOTE(sql) (*(sql) == QUOTE && *((sql) - 1) != SLASH)
// SLASH
#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql) - 1) == SLASH)

#define IS_SLASH_LETTER(sql) (IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql))

#define MOVE_FORWARD_ONE(sql,len) (memmove((void*)((sql) - 1), (sql), len))

#define PROCESS_SLASH(key,keyLen)       \
for (int i = 1; i < keyLen; ++i) {      \
  if(IS_SLASH_LETTER(key+i)){           \
    MOVE_FORWARD_ONE(key+i, keyLen-i);  \
    i--;                                \
    keyLen--;                           \
  }                                     \
}

60 61 62
#define IS_INVALID_COL_LEN(len)   ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)

63 64 65
#define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM     4

wmmhello's avatar
wmmhello 已提交
66 67
#define TS              "_ts"
#define TS_LEN          3
wmmhello's avatar
wmmhello 已提交
68 69
#define VALUE           "_value"
#define VALUE_LEN       6
70 71 72

#define BINARY_ADD_LEN 2        // "binary"   2 means " "
#define NCHAR_ADD_LEN 3         // L"nchar"   3 means L" "
wmmhello's avatar
wmmhello 已提交
73 74

#define MAX_RETRY_TIMES 5
wmmhello's avatar
wmmhello 已提交
75
#define LINE_BATCH  20
wmmhello's avatar
wmmhello 已提交
76 77 78 79 80 81 82 83 84 85 86 87
//=================================================================================================
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;

typedef enum {
  SCHEMA_ACTION_CREATE_STABLE,
  SCHEMA_ACTION_ADD_COLUMN,
  SCHEMA_ACTION_ADD_TAG,
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,
  SCHEMA_ACTION_CHANGE_TAG_SIZE,
} ESchemaAction;

typedef struct {
88
  char          sTableName[TSDB_TABLE_NAME_LEN];
89 90
  SArray        *tags;
  SArray        *fields;
wmmhello's avatar
wmmhello 已提交
91 92 93
} SCreateSTableActionInfo;

typedef struct {
94
  char          sTableName[TSDB_TABLE_NAME_LEN];
95
  SSmlKv        *field;
wmmhello's avatar
wmmhello 已提交
96 97 98
} SAlterSTableActionInfo;

typedef struct {
99
  ESchemaAction             action;
wmmhello's avatar
wmmhello 已提交
100 101
  union {
    SCreateSTableActionInfo createSTable;
102
    SAlterSTableActionInfo  alterSTable;
wmmhello's avatar
wmmhello 已提交
103 104 105 106
  };
} SSchemaAction;

typedef struct {
107 108 109 110
  const char  *measure;
  const char  *tags;
  const char  *cols;
  const char  *timestamp;
wmmhello's avatar
wmmhello 已提交
111 112 113 114 115 116 117 118 119 120

  int32_t measureLen;
  int32_t measureTagsLen;
  int32_t tagsLen;
  int32_t colsLen;
  int32_t timestampLen;
} SSmlLineInfo;

typedef struct {
  const char     *sTableName;   // super table name
121
  int32_t        sTableNameLen;
wmmhello's avatar
wmmhello 已提交
122 123 124 125 126
  char           childTableName[TSDB_TABLE_NAME_LEN];
  uint64_t       uid;

  SArray         *tags;

127 128 129
  // if info->formatData is true, elements are SArray<SSmlKv*>.
  // if info->formatData is false, elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
  SArray         *cols;
wmmhello's avatar
wmmhello 已提交
130 131 132
} SSmlTableInfo;

typedef struct {
133 134 135 136
  SArray     *tags;       // save the origin order to create table
  SHashObj   *tagHash;    // elements are <key, index in tags>

  SArray     *cols;
wmmhello's avatar
wmmhello 已提交
137
  SHashObj   *colHash;
138

wmmhello's avatar
wmmhello 已提交
139 140 141 142 143 144 145 146
  STableMeta *tableMeta;
} SSmlSTableMeta;

typedef struct {
  int32_t   len;
  char      *buf;
} SSmlMsgBuf;

147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
typedef struct {
  int32_t code;
  int32_t lineNum;

  int32_t numOfSTables;
  int32_t numOfCTables;
  int32_t numOfCreateSTables;

  int64_t parseTime;
  int64_t schemaTime;
  int64_t insertBindTime;
  int64_t insertRpcTime;
  int64_t endTime;
} SSmlCostInfo;

wmmhello's avatar
wmmhello 已提交
162 163 164 165 166 167 168
typedef struct{
  SRequestObj*    request;
  SCatalog*       catalog;
  tsem_t          sem;
  TdThreadSpinlock lock;
} Params;

wmmhello's avatar
wmmhello 已提交
169
typedef struct {
170
  int64_t           id;
wmmhello's avatar
wmmhello 已提交
171
  Params            *params;
wmmhello's avatar
wmmhello 已提交
172
  bool              isLast;
wmmhello's avatar
wmmhello 已提交
173 174 175

  SMLProtocolType   protocol;
  int8_t            precision;
176
  bool              dataFormat;     // true means that the name and order of keys in each line are the same(only for influx protocol)
wmmhello's avatar
wmmhello 已提交
177 178 179 180 181 182 183 184 185 186 187

  SHashObj          *childTables;
  SHashObj          *superTables;
  SHashObj          *pVgHash;
  void              *exec;

  STscObj           *taos;
  SCatalog          *pCatalog;
  SRequestObj       *pRequest;
  SQuery            *pQuery;

188
  SSmlCostInfo      cost;
wmmhello's avatar
wmmhello 已提交
189 190
  int32_t           affectedRows;
  SSmlMsgBuf        msgBuf;
191
  SHashObj          *dumplicateKey;  // for dumplicate key
192
  SArray            *colsContainer;  // for cols parse, if dataFormat == false
wmmhello's avatar
wmmhello 已提交
193
} SSmlHandle;
wmmhello's avatar
wmmhello 已提交
194 195
//=================================================================================================

wmmhello's avatar
wmmhello 已提交
196
//=================================================================================================
197
static volatile int64_t linesSmlHandleId = 0;
wmmhello's avatar
wmmhello 已提交
198 199
static int64_t smlGenId() {
  int64_t id;
wmmhello's avatar
wmmhello 已提交
200 201 202 203 204 205 206 207

  do {
    id = atomic_add_fetch_64(&linesSmlHandleId, 1);
  } while (id == 0);

  return id;
}

208 209 210 211 212 213 214 215 216 217 218 219 220 221
static inline bool smlDoubleToInt64OverFlow(double num) {
  if(num >= (double)INT64_MAX || num <= (double)INT64_MIN) return true;
  return false;
}

static inline bool smlCheckDuplicateKey(const char *key, int32_t keyLen, SHashObj *pHash) {
  void *val = taosHashGet(pHash, key, keyLen);
  if (val) {
    return true;
  }
  taosHashPut(pHash, key, keyLen, key, 1);
  return false;
}

wmmhello's avatar
wmmhello 已提交
222
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const char *msg2) {
223
  memset(pBuf->buf, 0 , pBuf->len);
wmmhello's avatar
wmmhello 已提交
224 225 226 227 228 229
  if(msg1) strncat(pBuf->buf, msg1, pBuf->len);
  int32_t left = pBuf->len - strlen(pBuf->buf);
  if(left > 2 && msg2) {
    strncat(pBuf->buf, ":", left - 1);
    strncat(pBuf->buf, msg2, left - 2);
  }
wmmhello's avatar
wmmhello 已提交
230 231 232
  return TSDB_CODE_SML_INVALID_DATA;
}

233
static int32_t smlGenerateSchemaAction(SSchema* colField, SHashObj* colHash, SSmlKv* kv, bool isTag,
wmmhello's avatar
wmmhello 已提交
234
                                       SSchemaAction* action, bool* actionNeeded, SSmlHandle* info) {
235
  uint16_t *index = (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen);
236 237 238 239 240 241 242
  if (index) {
    if (colField[*index].type != kv->type) {
      uError("SML:0x%"PRIx64" point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id, kv->key,
             colField[*index].type, kv->type);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

243 244
    if ((colField[*index].type == TSDB_DATA_TYPE_VARCHAR && (colField[*index].bytes - VARSTR_HEADER_SIZE) < kv->length) ||
        (colField[*index].type == TSDB_DATA_TYPE_NCHAR &&((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
      if (isTag) {
        action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
      } else {
        action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
      }
      action->alterSTable.field = kv;
      *actionNeeded = true;
    }
  } else {
    if (isTag) {
      action->action = SCHEMA_ACTION_ADD_TAG;
    } else {
      action->action = SCHEMA_ACTION_ADD_COLUMN;
    }
    action->alterSTable.field = kv;
    *actionNeeded = true;
  }
  if (*actionNeeded) {
    uDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, colField->name,
             action->action);
  }
wmmhello's avatar
wmmhello 已提交
266 267 268
  return 0;
}

wmmhello's avatar
wmmhello 已提交
269 270 271 272 273 274 275 276
static int32_t smlFindNearestPowerOf2(int32_t length){
  int32_t result = 1;
  while(result <= length){
    result *= 2;
  }
  return result;
}

wmmhello's avatar
wmmhello 已提交
277
static int32_t smlBuildColumnDescription(SSmlKv* field, char* buf, int32_t bufSize, int32_t* outBytes) {
wmmhello's avatar
wmmhello 已提交
278
  uint8_t type = field->type;
wmmhello's avatar
wmmhello 已提交
279 280
  char    tname[TSDB_TABLE_NAME_LEN] = {0};
  memcpy(tname, field->key, field->keyLen);
wmmhello's avatar
wmmhello 已提交
281
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
wmmhello's avatar
wmmhello 已提交
282
    int32_t bytes = smlFindNearestPowerOf2(field->length);
283
    int out = snprintf(buf, bufSize, "`%s` %s(%d)",
284
                       tname, tDataTypes[field->type].name, bytes);
wmmhello's avatar
wmmhello 已提交
285 286
    *outBytes = out;
  } else {
287
    int out = snprintf(buf, bufSize, "`%s` %s", tname, tDataTypes[type].name);
wmmhello's avatar
wmmhello 已提交
288 289 290 291 292 293
    *outBytes = out;
  }

  return 0;
}

wmmhello's avatar
wmmhello 已提交
294
static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
wmmhello's avatar
wmmhello 已提交
295 296
  int32_t code = 0;
  int32_t outBytes = 0;
wmmhello's avatar
wmmhello 已提交
297 298
  char *result = (char *)taosMemoryCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN);
  int32_t capacity = TSDB_MAX_ALLOWED_SQL_LEN;
wmmhello's avatar
wmmhello 已提交
299 300 301 302

  uDebug("SML:0x%"PRIx64" apply schema action. action: %d", info->id, action->action);
  switch (action->action) {
    case SCHEMA_ACTION_ADD_COLUMN: {
303
      int n = sprintf(result, "alter stable `%s` add column ", action->alterSTable.sTableName);
wmmhello's avatar
wmmhello 已提交
304 305
      smlBuildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes);
      TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
wmmhello's avatar
wmmhello 已提交
306
      code = taos_errno(res);
wmmhello's avatar
wmmhello 已提交
307
      const char* errStr = taos_errstr(res);
wmmhello's avatar
wmmhello 已提交
308 309 310 311 312
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%"PRIx64" apply schema action. error: %s", info->id, errStr);
      }
      taos_free_result(res);

wmmhello's avatar
wmmhello 已提交
313
//      if (code == TSDB_CODE_MND_FIELD_ALREADY_EXIST || code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) {
wmmhello's avatar
wmmhello 已提交
314
      if (code == TSDB_CODE_MND_TAG_ALREADY_EXIST) {
wmmhello's avatar
wmmhello 已提交
315
        TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
wmmhello's avatar
wmmhello 已提交
316 317 318 319 320
        code = taos_errno(res2);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
        taos_free_result(res2);
wmmhello's avatar
wmmhello 已提交
321
        taosMsleep(500);
wmmhello's avatar
wmmhello 已提交
322 323 324 325
      }
      break;
    }
    case SCHEMA_ACTION_ADD_TAG: {
326
      int n = sprintf(result, "alter stable `%s` add tag ", action->alterSTable.sTableName);
wmmhello's avatar
wmmhello 已提交
327
      smlBuildColumnDescription(action->alterSTable.field,
wmmhello's avatar
wmmhello 已提交
328
                             result+n, capacity-n, &outBytes);
wmmhello's avatar
wmmhello 已提交
329
      TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
wmmhello's avatar
wmmhello 已提交
330
      code = taos_errno(res);
wmmhello's avatar
wmmhello 已提交
331
      const char* errStr = taos_errstr(res);
wmmhello's avatar
wmmhello 已提交
332 333 334 335 336
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
      taos_free_result(res);

wmmhello's avatar
wmmhello 已提交
337
//      if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) {
wmmhello's avatar
wmmhello 已提交
338
      if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST) {
wmmhello's avatar
wmmhello 已提交
339
        TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
wmmhello's avatar
wmmhello 已提交
340 341 342 343 344
        code = taos_errno(res2);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
        taos_free_result(res2);
wmmhello's avatar
wmmhello 已提交
345
        taosMsleep(500);
wmmhello's avatar
wmmhello 已提交
346 347 348 349
      }
      break;
    }
    case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
350
      int n = sprintf(result, "alter stable `%s` modify column ", action->alterSTable.sTableName);
wmmhello's avatar
wmmhello 已提交
351
      smlBuildColumnDescription(action->alterSTable.field, result+n,
wmmhello's avatar
wmmhello 已提交
352
                             capacity-n, &outBytes);
wmmhello's avatar
wmmhello 已提交
353
      TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
wmmhello's avatar
wmmhello 已提交
354 355 356 357 358 359
      code = taos_errno(res);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
      taos_free_result(res);

wmmhello's avatar
wmmhello 已提交
360 361
//      if (code == TSDB_CODE_MND_INVALID_COLUMN_LENGTH || code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
      if (code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
wmmhello's avatar
wmmhello 已提交
362
        TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
wmmhello's avatar
wmmhello 已提交
363 364 365 366 367
        code = taos_errno(res2);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
        taos_free_result(res2);
wmmhello's avatar
wmmhello 已提交
368
        taosMsleep(500);
wmmhello's avatar
wmmhello 已提交
369 370 371 372
      }
      break;
    }
    case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
373
      int n = sprintf(result, "alter stable `%s` modify tag ", action->alterSTable.sTableName);
wmmhello's avatar
wmmhello 已提交
374
      smlBuildColumnDescription(action->alterSTable.field, result+n,
wmmhello's avatar
wmmhello 已提交
375
                             capacity-n, &outBytes);
wmmhello's avatar
wmmhello 已提交
376
      TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
wmmhello's avatar
wmmhello 已提交
377 378 379 380 381 382
      code = taos_errno(res);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
      taos_free_result(res);

wmmhello's avatar
wmmhello 已提交
383 384
//      if (code == TSDB_CODE_MND_INVALID_TAG_LENGTH || code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
      if (code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
wmmhello's avatar
wmmhello 已提交
385
        TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
wmmhello's avatar
wmmhello 已提交
386 387 388 389 390
        code = taos_errno(res2);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
        taos_free_result(res2);
wmmhello's avatar
wmmhello 已提交
391
        taosMsleep(500);
wmmhello's avatar
wmmhello 已提交
392 393 394 395
      }
      break;
    }
    case SCHEMA_ACTION_CREATE_STABLE: {
396
      int n = sprintf(result, "create stable `%s` (", action->createSTable.sTableName);
wmmhello's avatar
wmmhello 已提交
397
      char* pos = result + n; int freeBytes = capacity - n;
wmmhello's avatar
wmmhello 已提交
398

399
      SArray *cols = action->createSTable.fields;
wmmhello's avatar
wmmhello 已提交
400 401

      for(int i = 0; i < taosArrayGetSize(cols); i++){
wmmhello's avatar
wmmhello 已提交
402
        SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
403
        smlBuildColumnDescription(kv, pos, freeBytes, &outBytes);
wmmhello's avatar
wmmhello 已提交
404 405 406
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
wmmhello's avatar
wmmhello 已提交
407

wmmhello's avatar
wmmhello 已提交
408 409 410 411 412
      --pos; ++freeBytes;

      outBytes = snprintf(pos, freeBytes, ") tags (");
      pos += outBytes; freeBytes -= outBytes;

413 414
      cols = action->createSTable.tags;
      for(int i = 0; i < taosArrayGetSize(cols); i++){
wmmhello's avatar
wmmhello 已提交
415
        SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
416
        smlBuildColumnDescription(kv, pos, freeBytes, &outBytes);
wmmhello's avatar
wmmhello 已提交
417 418 419
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
420 421
      if(taosArrayGetSize(cols) == 0){
        outBytes = snprintf(pos, freeBytes,"`%s` %s(%d)",
wmmhello's avatar
wmmhello 已提交
422
                            tsSmlTagName, tDataTypes[TSDB_DATA_TYPE_NCHAR].name, 1);
423 424 425
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
wmmhello's avatar
wmmhello 已提交
426 427
      pos--; ++freeBytes;
      outBytes = snprintf(pos, freeBytes, ")");
wmmhello's avatar
wmmhello 已提交
428
      TAOS_RES* res = taos_query(info->taos, result);
wmmhello's avatar
wmmhello 已提交
429 430 431 432 433 434
      code = taos_errno(res);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
      taos_free_result(res);

wmmhello's avatar
wmmhello 已提交
435
      if (code == TSDB_CODE_MND_STB_ALREADY_EXIST) {
wmmhello's avatar
wmmhello 已提交
436
        TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
wmmhello's avatar
wmmhello 已提交
437 438 439 440 441
        code = taos_errno(res2);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
        taos_free_result(res2);
wmmhello's avatar
wmmhello 已提交
442
        taosMsleep(500);
wmmhello's avatar
wmmhello 已提交
443 444 445 446 447 448 449 450
      }
      break;
    }

    default:
      break;
  }

wmmhello's avatar
wmmhello 已提交
451
  taosMemoryFreeClear(result);
wmmhello's avatar
wmmhello 已提交
452 453 454 455 456 457
  if (code != 0) {
    uError("SML:0x%"PRIx64 " apply schema action failure. %s", info->id, tstrerror(code));
  }
  return code;
}

458 459 460
static int32_t smlProcessSchemaAction(SSmlHandle* info, SSchema* schemaField, SHashObj* schemaHash, SArray *cols, SSchemaAction* action, bool isTag){
  int32_t code = TSDB_CODE_SUCCESS;
  for (int j = 0; j < taosArrayGetSize(cols); ++j) {
461
    SSmlKv* kv = (SSmlKv*)taosArrayGetP(cols, j);
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476
    bool actionNeeded = false;
    code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, &actionNeeded, info);
    if(code != TSDB_CODE_SUCCESS){
      return code;
    }
    if (actionNeeded) {
      code = smlApplySchemaAction(info, action);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
477
static int32_t smlModifyDBSchemas(SSmlHandle* info) {
wmmhello's avatar
wmmhello 已提交
478
  int32_t code = 0;
479 480
  SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
  strcpy(pName.dbname, info->pRequest->pDb);
wmmhello's avatar
wmmhello 已提交
481

D
dapan1121 已提交
482 483 484 485 486
  SRequestConnInfo conn = {.pTrans = info->taos->pAppInfo->pTransporter, 
                           .requestId = info->pRequest->requestId,
                           .requestObjRefId = info->pRequest->self,
                           .mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp)};

wmmhello's avatar
wmmhello 已提交
487
  SSmlSTableMeta** tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, NULL);
wmmhello's avatar
wmmhello 已提交
488
  while (tableMetaSml) {
489
    SSmlSTableMeta* sTableData = *tableMetaSml;
wmmhello's avatar
wmmhello 已提交
490 491
    STableMeta *pTableMeta = NULL;

wmmhello's avatar
wmmhello 已提交
492
    size_t superTableLen = 0;
wmmhello's avatar
wmmhello 已提交
493
    void *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
494
    memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
wmmhello's avatar
wmmhello 已提交
495
    memcpy(pName.tname, superTable, superTableLen);
wmmhello's avatar
wmmhello 已提交
496

D
dapan1121 已提交
497
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
wmmhello's avatar
wmmhello 已提交
498

D
dapan1121 已提交
499
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_INVALID_STB) {
500 501 502
      SSchemaAction schemaAction;
      schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
      memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
wmmhello's avatar
wmmhello 已提交
503
      memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
504 505
      schemaAction.createSTable.tags = sTableData->tags;
      schemaAction.createSTable.fields = sTableData->cols;
wmmhello's avatar
wmmhello 已提交
506
      code = smlApplySchemaAction(info, &schemaAction);
507
      if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
508
        uError("SML:0x%"PRIx64" smlApplySchemaAction failed. can not create %s", info->id, schemaAction.createSTable.sTableName);
509
        goto end;
wmmhello's avatar
wmmhello 已提交
510
      }
511 512
      info->cost.numOfCreateSTables++;
    }else if (code == TSDB_CODE_SUCCESS) {
513
      SHashObj *hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
514 515 516
      for(uint16_t i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++){
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
wmmhello's avatar
wmmhello 已提交
517

518 519
      SSchemaAction schemaAction;
      memset(&schemaAction, 0, sizeof(SSchemaAction));
520
      memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
521 522 523
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &schemaAction, true);
      if (code != TSDB_CODE_SUCCESS) {
        taosHashCleanup(hashTmp);
524
        goto end;
525 526 527 528 529 530 531 532 533
      }

      taosHashClear(hashTmp);
      for(uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++){
        taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
      }
      code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &schemaAction, false);
      taosHashCleanup(hashTmp);
      if (code != TSDB_CODE_SUCCESS) {
534
        goto end;
wmmhello's avatar
wmmhello 已提交
535
      }
wmmhello's avatar
wmmhello 已提交
536

D
dapan1121 已提交
537
      code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
wmmhello's avatar
wmmhello 已提交
538
      if (code != TSDB_CODE_SUCCESS) {
539
        goto end;
wmmhello's avatar
wmmhello 已提交
540
      }
wmmhello's avatar
wmmhello 已提交
541 542
    } else {
      uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
543
      goto end;
wmmhello's avatar
wmmhello 已提交
544
    }
545 546
    if(pTableMeta) taosMemoryFree(pTableMeta);

D
dapan1121 已提交
547
    code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
548 549
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, (char*)superTable);
550
      goto end;
551
    }
552
    sTableData->tableMeta = pTableMeta;
wmmhello's avatar
wmmhello 已提交
553

wmmhello's avatar
wmmhello 已提交
554
    tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, tableMetaSml);
wmmhello's avatar
wmmhello 已提交
555 556
  }
  return 0;
557 558

end:
D
dapan1121 已提交
559
  catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
560
  return code;
wmmhello's avatar
wmmhello 已提交
561 562
}

563
static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg){
wmmhello's avatar
wmmhello 已提交
564
  const char *pVal = kvVal->value;
wmmhello's avatar
wmmhello 已提交
565
  int32_t len = kvVal->length;
566
  char *endptr = NULL;
wafwerar's avatar
wafwerar 已提交
567
  double result = taosStr2Double(pVal, &endptr);
568 569
  if(pVal == endptr){
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
wmmhello's avatar
wmmhello 已提交
570 571 572
    return false;
  }

573 574 575 576 577 578 579 580
  int32_t left = len - (endptr - pVal);
  if(left == 0 || (left == 3 && strncasecmp(endptr, "f64", left) == 0)){
    kvVal->type = TSDB_DATA_TYPE_DOUBLE;
    kvVal->d = result;
  }else if ((left == 3 && strncasecmp(endptr, "f32", left) == 0)){
    if(!IS_VALID_FLOAT(result)){
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
581
    }
582 583 584
    kvVal->type = TSDB_DATA_TYPE_FLOAT;
    kvVal->f = (float)result;
  }else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)){
wmmhello's avatar
wmmhello 已提交
585 586 587 588 589 590 591 592 593 594
    if(smlDoubleToInt64OverFlow(result)){
      errno = 0;
      int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
      if(errno == ERANGE){
        smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_BIGINT;
      kvVal->i = tmp;
      return true;
wmmhello's avatar
wmmhello 已提交
595
    }
596
    kvVal->type = TSDB_DATA_TYPE_BIGINT;
wmmhello's avatar
wmmhello 已提交
597
    kvVal->i = (int64_t)result;
598
  }else if ((left == 3 && strncasecmp(endptr, "u64", left) == 0)){
wmmhello's avatar
wmmhello 已提交
599 600 601 602 603 604 605 606 607 608
    if(result >= (double)UINT64_MAX || result < 0){
      errno = 0;
      uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
      if(errno == ERANGE || result < 0){
        smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
        return false;
      }
      kvVal->type = TSDB_DATA_TYPE_UBIGINT;
      kvVal->u = tmp;
      return true;
609
    }
610
    kvVal->type = TSDB_DATA_TYPE_UBIGINT;
wmmhello's avatar
wmmhello 已提交
611
    kvVal->u = result;
612 613 614 615
  }else if (left == 3 && strncasecmp(endptr, "i32", left) == 0){
    if(!IS_VALID_INT(result)){
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
616
    }
617 618 619 620 621 622
    kvVal->type = TSDB_DATA_TYPE_INT;
    kvVal->i = result;
  }else if (left == 3 && strncasecmp(endptr, "u32", left) == 0){
    if(!IS_VALID_UINT(result)){
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
623
    }
624 625 626 627 628 629
    kvVal->type = TSDB_DATA_TYPE_UINT;
    kvVal->u = result;
  }else if (left == 3 && strncasecmp(endptr, "i16", left) == 0){
    if(!IS_VALID_SMALLINT(result)){
      smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
630
    }
631 632 633 634 635 636
    kvVal->type = TSDB_DATA_TYPE_SMALLINT;
    kvVal->i = result;
  }else if (left == 3 && strncasecmp(endptr, "u16", left) == 0){
    if(!IS_VALID_USMALLINT(result)){
      smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
637
    }
638 639 640 641 642 643
    kvVal->type = TSDB_DATA_TYPE_USMALLINT;
    kvVal->u = result;
  }else if (left == 2 && strncasecmp(endptr, "i8", left) == 0){
    if(!IS_VALID_TINYINT(result)){
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
644
    }
645 646 647 648 649 650
    kvVal->type = TSDB_DATA_TYPE_TINYINT;
    kvVal->i = result;
  }else if (left == 2 && strncasecmp(endptr, "u8", left) == 0){
    if(!IS_VALID_UTINYINT(result)){
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
      return false;
wmmhello's avatar
wmmhello 已提交
651
    }
652 653 654 655
    kvVal->type = TSDB_DATA_TYPE_UTINYINT;
    kvVal->u = result;
  }else{
    smlBuildInvalidDataMsg(msg, "invalid data", pVal);
wmmhello's avatar
wmmhello 已提交
656 657
    return false;
  }
658
  return true;
wmmhello's avatar
wmmhello 已提交
659 660
}

wmmhello's avatar
wmmhello 已提交
661
static bool smlParseBool(SSmlKv *kvVal) {
wmmhello's avatar
wmmhello 已提交
662
  const char *pVal = kvVal->value;
wmmhello's avatar
wmmhello 已提交
663
  int32_t len = kvVal->length;
664
  if ((len == 1) && (pVal[0] == 't' || pVal[0] == 'T')) {
wmmhello's avatar
wmmhello 已提交
665
    kvVal->i = true;
wmmhello's avatar
wmmhello 已提交
666 667 668
    return true;
  }

669
  if ((len == 1) && (pVal[0] == 'f' || pVal[0] == 'F')) {
wmmhello's avatar
wmmhello 已提交
670
    kvVal->i = false;
wmmhello's avatar
wmmhello 已提交
671 672 673
    return true;
  }

wmmhello's avatar
wmmhello 已提交
674
  if((len == 4) && !strncasecmp(pVal, "true", len)) {
wmmhello's avatar
wmmhello 已提交
675
    kvVal->i = true;
wmmhello's avatar
wmmhello 已提交
676 677
    return true;
  }
wmmhello's avatar
wmmhello 已提交
678
  if((len == 5) && !strncasecmp(pVal, "false", len)) {
wmmhello's avatar
wmmhello 已提交
679
    kvVal->i = false;
wmmhello's avatar
wmmhello 已提交
680 681 682 683 684
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
685
static bool smlIsBinary(const char *pVal, uint16_t len) {
wmmhello's avatar
wmmhello 已提交
686 687 688 689 690 691 692 693 694 695
  //binary: "abc"
  if (len < 2) {
    return false;
  }
  if (pVal[0] == '"' && pVal[len - 1] == '"') {
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
696
static bool smlIsNchar(const char *pVal, uint16_t len) {
wmmhello's avatar
wmmhello 已提交
697 698 699 700 701 702 703 704 705 706
  //nchar: L"abc"
  if (len < 3) {
    return false;
  }
  if ((pVal[0] == 'l' || pVal[0] == 'L')&& pVal[1] == '"' && pVal[len - 1] == '"') {
    return true;
  }
  return false;
}

707 708
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
  char *endPtr = NULL;
wafwerar's avatar
wafwerar 已提交
709
  int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
710 711
  if(value + len != endPtr){
    return -1;
wmmhello's avatar
wmmhello 已提交
712
  }
713
  double ts = tsInt64;
714 715
  switch (type) {
    case TSDB_TIME_PRECISION_HOURS:
wmmhello's avatar
wmmhello 已提交
716 717
      ts *= NANOSECOND_PER_HOUR;
      tsInt64 *= NANOSECOND_PER_HOUR;
718 719
      break;
    case TSDB_TIME_PRECISION_MINUTES:
wmmhello's avatar
wmmhello 已提交
720 721
      ts *= NANOSECOND_PER_MINUTE;
      tsInt64 *= NANOSECOND_PER_MINUTE;
722 723
      break;
    case TSDB_TIME_PRECISION_SECONDS:
wmmhello's avatar
wmmhello 已提交
724 725
      ts *= NANOSECOND_PER_SEC;
      tsInt64 *= NANOSECOND_PER_SEC;
726 727
      break;
    case TSDB_TIME_PRECISION_MILLI:
wmmhello's avatar
wmmhello 已提交
728 729
      ts *= NANOSECOND_PER_MSEC;
      tsInt64 *= NANOSECOND_PER_MSEC;
730 731
      break;
    case TSDB_TIME_PRECISION_MICRO:
wmmhello's avatar
wmmhello 已提交
732 733
      ts *= NANOSECOND_PER_USEC;
      tsInt64 *= NANOSECOND_PER_USEC;
734 735 736 737 738
      break;
    case TSDB_TIME_PRECISION_NANO:
      break;
    default:
      ASSERT(0);
wmmhello's avatar
wmmhello 已提交
739
  }
wmmhello's avatar
wmmhello 已提交
740
  if(ts >= (double)INT64_MAX || ts < 0){
741
    return -1;
wmmhello's avatar
wmmhello 已提交
742 743
  }

744
  return tsInt64;
745 746 747 748 749 750
}

static int8_t smlGetTsTypeByLen(int32_t len) {
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
    return TSDB_TIME_PRECISION_SECONDS;
  } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
751
    return TSDB_TIME_PRECISION_MILLI;
752 753
  } else {
    return -1;
wmmhello's avatar
wmmhello 已提交
754
  }
755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773
}

static int8_t smlGetTsTypeByPrecision(int8_t precision) {
  switch (precision) {
    case TSDB_SML_TIMESTAMP_HOURS:
      return TSDB_TIME_PRECISION_HOURS;
    case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
      return TSDB_TIME_PRECISION_MILLI;
    case TSDB_SML_TIMESTAMP_NANO_SECONDS:
    case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
      return TSDB_TIME_PRECISION_NANO;
    case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
      return TSDB_TIME_PRECISION_MICRO;
    case TSDB_SML_TIMESTAMP_SECONDS:
      return TSDB_TIME_PRECISION_SECONDS;
    case TSDB_SML_TIMESTAMP_MINUTES:
      return TSDB_TIME_PRECISION_MINUTES;
    default:
      return -1;
wmmhello's avatar
wmmhello 已提交
774
  }
775 776 777
}

static int64_t smlParseInfluxTime(SSmlHandle* info, const char* data, int32_t len){
778 779
  if(len == 0 || (len == 1 && data[0] == '0')){
    return taosGetTimestampNs();
wmmhello's avatar
wmmhello 已提交
780 781
  }

782 783 784 785
  int8_t tsType = smlGetTsTypeByPrecision(info->precision);
  if (tsType == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL);
    return -1;
wmmhello's avatar
wmmhello 已提交
786
  }
787 788 789 790 791

  int64_t ts = smlGetTimeValue(data, len, tsType);
  if(ts == -1){
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
wmmhello's avatar
wmmhello 已提交
792
  }
793 794 795 796 797 798 799
  return ts;
}

static int64_t smlParseOpenTsdbTime(SSmlHandle* info, const char* data, int32_t len){
  if(!data){
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
    return -1;
wmmhello's avatar
wmmhello 已提交
800
  }
801 802 803
  if(len == 1 && data[0] == '0'){
    return taosGetTimestampNs();
  }
804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820
  int8_t tsType = smlGetTsTypeByLen(len);
  if (tsType == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
    return -1;
  }
  int64_t ts = smlGetTimeValue(data, len, tsType);
  if(ts == -1){
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
  }
  return ts;
}

static int32_t smlParseTS(SSmlHandle* info, const char* data, int32_t len, SArray *cols){
  int64_t ts = 0;
  if(info->protocol == TSDB_SML_LINE_PROTOCOL){
    ts = smlParseInfluxTime(info, data, len);
821
  }else if(info->protocol == TSDB_SML_TELNET_PROTOCOL){
822
    ts = smlParseOpenTsdbTime(info, data, len);
823 824
  }else{
    ASSERT(0);
825
  }
826

827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845
  if(ts == -1)  return TSDB_CODE_TSC_INVALID_TIME_STAMP;

  // add ts to
  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
  if(!kv){
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  kv->key = TS;
  kv->keyLen = TS_LEN;
  kv->i = ts;
  kv->type = TSDB_DATA_TYPE_TIMESTAMP;
  kv->length = (int16_t)tDataTypes[kv->type].bytes;
  if(cols) taosArrayPush(cols, &kv);
  return TSDB_CODE_SUCCESS;
}

static bool smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
  //binary
wmmhello's avatar
wmmhello 已提交
846
  if (smlIsBinary(pVal->value, pVal->length)) {
847
    pVal->type = TSDB_DATA_TYPE_BINARY;
wmmhello's avatar
wmmhello 已提交
848
    pVal->length -= BINARY_ADD_LEN;
849 850 851 852
    pVal->value += (BINARY_ADD_LEN - 1);
    return true;
  }
  //nchar
wmmhello's avatar
wmmhello 已提交
853
  if (smlIsNchar(pVal->value, pVal->length)) {
854
    pVal->type = TSDB_DATA_TYPE_NCHAR;
wmmhello's avatar
wmmhello 已提交
855
    pVal->length -= NCHAR_ADD_LEN;
856 857 858 859 860 861 862 863
    pVal->value += (NCHAR_ADD_LEN - 1);
    return true;
  }

  //bool
  if (smlParseBool(pVal)) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
wmmhello's avatar
wmmhello 已提交
864 865
    return true;
  }
866 867
  //number
  if (smlParseNumber(pVal, msg)) {
wmmhello's avatar
wmmhello 已提交
868 869 870 871 872 873 874
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    return true;
  }

  return false;
}

875
static int32_t smlParseInfluxString(const char* sql, SSmlLineInfo *elements, SSmlMsgBuf *msg){
wmmhello's avatar
wmmhello 已提交
876
  if(!sql) return TSDB_CODE_SML_INVALID_DATA;
wmmhello's avatar
wmmhello 已提交
877 878 879
  JUMP_SPACE(sql)
  if(*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA;
  elements->measure = sql;
wmmhello's avatar
wmmhello 已提交
880

wmmhello's avatar
wmmhello 已提交
881
  // parse measure
wmmhello's avatar
wmmhello 已提交
882
  while (*sql != '\0') {
wmmhello's avatar
wmmhello 已提交
883
    if((sql != elements->measure) && IS_SLASH_LETTER(sql)){
wmmhello's avatar
wmmhello 已提交
884
      MOVE_FORWARD_ONE(sql,strlen(sql) + 1);
wmmhello's avatar
wmmhello 已提交
885 886
      continue;
    }
wmmhello's avatar
wmmhello 已提交
887
    if(IS_COMMA(sql)){
wmmhello's avatar
wmmhello 已提交
888 889 890
      break;
    }

wmmhello's avatar
wmmhello 已提交
891 892 893
    if(IS_SPACE(sql)){
      break;
    }
wmmhello's avatar
wmmhello 已提交
894 895
    sql++;
  }
wmmhello's avatar
wmmhello 已提交
896
  elements->measureLen = sql - elements->measure;
897 898
  if(IS_INVALID_TABLE_LEN(elements->measureLen)) {
    smlBuildInvalidDataMsg(msg, "measure is empty or too large than 192", NULL);
wmmhello's avatar
wmmhello 已提交
899 900
    return TSDB_CODE_SML_INVALID_DATA;
  }
wmmhello's avatar
wmmhello 已提交
901

wmmhello's avatar
wmmhello 已提交
902 903 904 905 906 907 908 909 910 911 912
  // parse tag
  if(*sql == SPACE){
    elements->tagsLen = 0;
  }else{
    if(*sql == COMMA) sql++;
    elements->tags = sql;
    while (*sql != '\0') {
      if(IS_SPACE(sql)){
        break;
      }
      sql++;
wmmhello's avatar
wmmhello 已提交
913
    }
wmmhello's avatar
wmmhello 已提交
914
    elements->tagsLen = sql - elements->tags;
915
  }
wmmhello's avatar
wmmhello 已提交
916
  elements->measureTagsLen = sql - elements->measure;
wmmhello's avatar
wmmhello 已提交
917

wmmhello's avatar
wmmhello 已提交
918 919 920
  // parse cols
  JUMP_SPACE(sql)
  elements->cols = sql;
wmmhello's avatar
wmmhello 已提交
921
  bool isInQuote = false;
wmmhello's avatar
wmmhello 已提交
922
  while (*sql != '\0') {
wmmhello's avatar
wmmhello 已提交
923
    if(IS_QUOTE(sql)){
wmmhello's avatar
wmmhello 已提交
924 925
      isInQuote = !isInQuote;
    }
wmmhello's avatar
wmmhello 已提交
926
    if(!isInQuote && IS_SPACE(sql)){
wmmhello's avatar
wmmhello 已提交
927 928 929 930
      break;
    }
    sql++;
  }
931 932 933 934
  if(isInQuote){
    smlBuildInvalidDataMsg(msg, "only one quote", elements->cols);
    return TSDB_CODE_SML_INVALID_DATA;
  }
wmmhello's avatar
wmmhello 已提交
935
  elements->colsLen = sql - elements->cols;
wmmhello's avatar
wmmhello 已提交
936 937 938 939
  if(elements->colsLen == 0) {
    smlBuildInvalidDataMsg(msg, "cols is empty", NULL);
    return TSDB_CODE_SML_INVALID_DATA;
  }
wmmhello's avatar
wmmhello 已提交
940

wmmhello's avatar
wmmhello 已提交
941 942 943
  // parse timestamp
  JUMP_SPACE(sql)
  elements->timestamp = sql;
wmmhello's avatar
wmmhello 已提交
944
  while (*sql != '\0') {
wmmhello's avatar
wmmhello 已提交
945
    if(*sql == SPACE){
wmmhello's avatar
wmmhello 已提交
946 947 948 949
      break;
    }
    sql++;
  }
wmmhello's avatar
wmmhello 已提交
950
  elements->timestampLen = sql - elements->timestamp;
wmmhello's avatar
wmmhello 已提交
951 952 953 954

  return TSDB_CODE_SUCCESS;
}

955 956 957 958 959 960 961 962 963 964 965 966
static void smlParseTelnetElement(const char **sql, const char **data, int32_t *len){
  while (**sql != '\0') {
    if(**sql != SPACE && !(*data)) {
      *data = *sql;
    }else if (**sql == SPACE && *data) {
      *len = *sql - *data;
      break;
    }
    (*sql)++;
  }
}

967
static int32_t smlParseTelnetTags(const char* data, SArray *cols, char *childTableName, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
wmmhello's avatar
wmmhello 已提交
968
  const char *sql = data;
969
  size_t childTableNameLen = strlen(tsSmlChildTableName);
wmmhello's avatar
wmmhello 已提交
970 971
  while(*sql != '\0'){
    JUMP_SPACE(sql)
wmmhello's avatar
wmmhello 已提交
972 973
    if(*sql == '\0') break;

wmmhello's avatar
wmmhello 已提交
974
    const char *key = sql;
975
    int32_t keyLen = 0;
wmmhello's avatar
wmmhello 已提交
976 977 978 979 980 981 982 983 984 985

    // parse key
    while(*sql != '\0'){
      if(*sql == SPACE) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if(*sql == EQUAL) {
        keyLen = sql - key;
        sql++;
986 987
        break;
      }
wmmhello's avatar
wmmhello 已提交
988
      sql++;
989
    }
wmmhello's avatar
wmmhello 已提交
990

991
    if(IS_INVALID_COL_LEN(keyLen)){
992 993 994 995 996 997 998 999 1000
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
      return TSDB_CODE_SML_INVALID_DATA;
    }
    if(smlCheckDuplicateKey(key, keyLen, dumplicateKey)){
      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
      return TSDB_CODE_TSC_DUP_TAG_NAMES;
    }

    // parse value
wmmhello's avatar
wmmhello 已提交
1001 1002 1003 1004 1005
    const char *value = sql;
    int32_t valueLen = 0;
    while(*sql != '\0') {
      // parse value
      if (*sql == SPACE) {
1006 1007
        break;
      }
wmmhello's avatar
wmmhello 已提交
1008 1009 1010 1011 1012
      if (*sql == EQUAL) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
1013
    }
wmmhello's avatar
wmmhello 已提交
1014
    valueLen = sql - value;
wmmhello's avatar
wmmhello 已提交
1015

1016 1017 1018 1019
    if(valueLen == 0){
      smlBuildInvalidDataMsg(msg, "invalid value", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
wmmhello's avatar
wmmhello 已提交
1020

1021 1022 1023 1024 1025 1026 1027
    //handle child table name
    if(childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0){
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN));
      continue;
    }

1028 1029 1030 1031 1032 1033
    // add kv to SSmlKv
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
    if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
    kv->key = key;
    kv->keyLen = keyLen;
    kv->value = value;
wmmhello's avatar
wmmhello 已提交
1034
    kv->length = valueLen;
wmmhello's avatar
wmmhello 已提交
1035
    kv->type = TSDB_DATA_TYPE_NCHAR;
1036 1037 1038 1039 1040 1041

    if(cols) taosArrayPush(cols, &kv);
  }

  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1042

1043 1044 1045 1046 1047 1048
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTableInfo *tinfo, SArray *cols){
  if(!sql) return TSDB_CODE_SML_INVALID_DATA;

  // parse metric
  smlParseTelnetElement(&sql, &tinfo->sTableName, &tinfo->sTableNameLen);
1049
  if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  // parse timestamp
  const char *timestamp = NULL;
  int32_t tLen = 0;
  smlParseTelnetElement(&sql, &timestamp, &tLen);
  if (!timestamp || tLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  int32_t ret = smlParseTS(info, timestamp, tLen, cols);
  if (ret != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  // parse value
  const char *value = NULL;
  int32_t valueLen = 0;
  smlParseTelnetElement(&sql, &value, &valueLen);
  if (!value || valueLen == 0) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
  if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
  taosArrayPush(cols, &kv);
  kv->key = VALUE;
  kv->keyLen = VALUE_LEN;
  kv->value = value;
wmmhello's avatar
wmmhello 已提交
1084
  kv->length = valueLen;
wmmhello's avatar
wmmhello 已提交
1085
  if(!smlParseValue(kv, &info->msgBuf)){
1086 1087 1088 1089
    return TSDB_CODE_SML_INVALID_DATA;
  }

  // parse tags
1090
  ret = smlParseTelnetTags(sql, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
1091 1092 1093 1094 1095 1096 1097 1098
  if (ret != TSDB_CODE_SUCCESS) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }

  return TSDB_CODE_SUCCESS;
}

1099
static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, char *childTableName, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
1100
  if(len == 0){
wmmhello's avatar
wmmhello 已提交
1101
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1102 1103
  }

1104
  size_t childTableNameLen = strlen(tsSmlChildTableName);
wmmhello's avatar
wmmhello 已提交
1105 1106 1107
  const char *sql = data;
  while(sql < data + len){
    const char *key = sql;
wmmhello's avatar
wmmhello 已提交
1108
    int32_t keyLen = 0;
wmmhello's avatar
wmmhello 已提交
1109 1110 1111 1112 1113 1114 1115 1116 1117 1118

    while(sql < data + len){
      // parse key
      if(IS_COMMA(sql)) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      if(IS_EQUAL(sql)) {
        keyLen = sql - key;
        sql++;
wmmhello's avatar
wmmhello 已提交
1119 1120
        break;
      }
wmmhello's avatar
wmmhello 已提交
1121
      sql++;
wmmhello's avatar
wmmhello 已提交
1122
    }
wmmhello's avatar
wmmhello 已提交
1123

1124
    if(IS_INVALID_COL_LEN(keyLen)){
wmmhello's avatar
wmmhello 已提交
1125
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
wmmhello's avatar
wmmhello 已提交
1126 1127
      return TSDB_CODE_SML_INVALID_DATA;
    }
1128
    if(smlCheckDuplicateKey(key, keyLen, dumplicateKey)){
1129
      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
1130
      return TSDB_CODE_TSC_DUP_TAG_NAMES;
1131 1132
    }

wmmhello's avatar
wmmhello 已提交
1133
    // parse value
wmmhello's avatar
wmmhello 已提交
1134 1135
    const char *value = sql;
    int32_t valueLen = 0;
wmmhello's avatar
wmmhello 已提交
1136
    bool isInQuote = false;
wmmhello's avatar
wmmhello 已提交
1137 1138 1139
    while(sql < data + len) {
      // parse value
      if(!isTag && IS_QUOTE(sql)){
wmmhello's avatar
wmmhello 已提交
1140
        isInQuote = !isInQuote;
wmmhello's avatar
wmmhello 已提交
1141 1142
        sql++;
        continue;
wmmhello's avatar
wmmhello 已提交
1143
      }
wmmhello's avatar
wmmhello 已提交
1144
      if (!isInQuote && IS_COMMA(sql)) {
wmmhello's avatar
wmmhello 已提交
1145 1146
        break;
      }
wmmhello's avatar
wmmhello 已提交
1147 1148 1149 1150 1151
      if (!isInQuote && IS_EQUAL(sql)) {
        smlBuildInvalidDataMsg(msg, "invalid data", sql);
        return TSDB_CODE_SML_INVALID_DATA;
      }
      sql++;
wmmhello's avatar
wmmhello 已提交
1152
    }
wmmhello's avatar
wmmhello 已提交
1153 1154 1155 1156
    valueLen = sql - value;
    sql++;

    if(isInQuote){
wmmhello's avatar
wmmhello 已提交
1157 1158 1159
      smlBuildInvalidDataMsg(msg, "only one quote", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
wmmhello's avatar
wmmhello 已提交
1160
    if(valueLen == 0){
wmmhello's avatar
wmmhello 已提交
1161
      smlBuildInvalidDataMsg(msg, "invalid value", value);
wmmhello's avatar
wmmhello 已提交
1162 1163
      return TSDB_CODE_SML_INVALID_DATA;
    }
wmmhello's avatar
wmmhello 已提交
1164 1165
    PROCESS_SLASH(key, keyLen)
    PROCESS_SLASH(value, valueLen)
wmmhello's avatar
wmmhello 已提交
1166

1167
    //handle child table name
1168
    if(childTableName && childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0){
1169 1170 1171 1172 1173
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN));
      continue;
    }

wmmhello's avatar
wmmhello 已提交
1174
    // add kv to SSmlKv
wmmhello's avatar
wmmhello 已提交
1175
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
1176 1177 1178
    if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
    if(cols) taosArrayPush(cols, &kv);

wmmhello's avatar
wmmhello 已提交
1179 1180 1181
    kv->key = key;
    kv->keyLen = keyLen;
    kv->value = value;
wmmhello's avatar
wmmhello 已提交
1182
    kv->length = valueLen;
wmmhello's avatar
wmmhello 已提交
1183 1184
    if(isTag){
      kv->type = TSDB_DATA_TYPE_NCHAR;
wmmhello's avatar
wmmhello 已提交
1185
    }else{
wmmhello's avatar
wmmhello 已提交
1186
      if(!smlParseValue(kv, msg)){
wmmhello's avatar
wmmhello 已提交
1187 1188
        return TSDB_CODE_SML_INVALID_DATA;
      }
wmmhello's avatar
wmmhello 已提交
1189 1190
    }
  }
wmmhello's avatar
wmmhello 已提交
1191

wmmhello's avatar
wmmhello 已提交
1192 1193 1194
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1195
static bool smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, SSmlMsgBuf *msg){
1196
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {  //jump timestamp
wmmhello's avatar
wmmhello 已提交
1197
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
wmmhello's avatar
wmmhello 已提交
1198

wmmhello's avatar
wmmhello 已提交
1199 1200 1201 1202 1203 1204 1205 1206 1207 1208
    int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);
    if(index){
      SSmlKv **value = (SSmlKv **)taosArrayGet(metaArray, *index);
      if(kv->type != (*value)->type){
        smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
        return false;
      }else{
        if(IS_VAR_DATA_TYPE(kv->type)){     // update string len, if bigger
          if(kv->length > (*value)->length){
            *value = kv;
1209 1210 1211
          }
        }
      }
wmmhello's avatar
wmmhello 已提交
1212 1213 1214 1215 1216 1217
    }else{
      size_t tmp = taosArrayGetSize(metaArray);
      ASSERT(tmp <= INT16_MAX);
      int16_t size = tmp;
      taosArrayPush(metaArray, &kv);
      taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
1218
    }
wmmhello's avatar
wmmhello 已提交
1219
  }
wmmhello's avatar
wmmhello 已提交
1220

1221
  return true;
wmmhello's avatar
wmmhello 已提交
1222 1223
}

wmmhello's avatar
wmmhello 已提交
1224 1225 1226 1227 1228
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols){
  for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
    taosArrayPush(metaArray, &kv);
    taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
wmmhello's avatar
wmmhello 已提交
1229 1230 1231
  }
}

1232 1233 1234 1235
static SSmlTableInfo* smlBuildTableInfo(){
  SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
  if(!tag){
    return NULL;
wmmhello's avatar
wmmhello 已提交
1236
  }
1237 1238 1239 1240 1241

  tag->cols = taosArrayInit(16, POINTER_BYTES);
  if (tag->cols == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
wmmhello's avatar
wmmhello 已提交
1242
  }
1243 1244 1245 1246 1247

  tag->tags = taosArrayInit(16, POINTER_BYTES);
  if (tag->tags == NULL) {
    uError("SML:smlBuildTableInfo failed to allocate memory");
    goto cleanup;
wmmhello's avatar
wmmhello 已提交
1248
  }
1249 1250 1251 1252 1253
  return tag;

cleanup:
  taosMemoryFree(tag);
  return NULL;
wmmhello's avatar
wmmhello 已提交
1254 1255
}

wmmhello's avatar
wmmhello 已提交
1256 1257
static void smlDestroyTableInfo(SSmlHandle* info, SSmlTableInfo *tag){
  if(info->dataFormat){
1258 1259 1260
    for(size_t i = 0; i < taosArrayGetSize(tag->cols); i++){
      SArray *kvArray = (SArray *)taosArrayGetP(tag->cols, i);
      for (int j = 0; j < taosArrayGetSize(kvArray); ++j) {
wmmhello's avatar
wmmhello 已提交
1261 1262 1263 1264 1265
        SSmlKv *p = (SSmlKv *)taosArrayGetP(kvArray, j);
        if(info->protocol == TSDB_SML_JSON_PROTOCOL &&
            (p->type == TSDB_DATA_TYPE_NCHAR || p->type == TSDB_DATA_TYPE_BINARY)){
          taosMemoryFree((void*)p->value);
        }
1266 1267 1268 1269
        taosMemoryFree(p);
      }
      taosArrayDestroy(kvArray);
    }
wmmhello's avatar
wmmhello 已提交
1270
  }else{
1271 1272 1273 1274 1275 1276 1277 1278 1279 1280
    for(size_t i = 0; i < taosArrayGetSize(tag->cols); i++){
      SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
      void** p1 = (void**)taosHashIterate(kvHash, NULL);
      while (p1) {
        taosMemoryFree(*p1);
        p1 = (void**)taosHashIterate(kvHash, p1);
      }
      taosHashCleanup(kvHash);
    }
  }
wmmhello's avatar
wmmhello 已提交
1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293
  for(size_t i = 0; i < taosArrayGetSize(tag->tags); i++){
    SSmlKv *p = (SSmlKv *)taosArrayGetP(tag->tags, i);
    if(info->protocol == TSDB_SML_JSON_PROTOCOL){
      taosMemoryFree((void*)p->key);
      if(p->type == TSDB_DATA_TYPE_NCHAR || p->type == TSDB_DATA_TYPE_BINARY){
        taosMemoryFree((void*)p->value);
      }
    }
    taosMemoryFree(p);
  }
  if(info->protocol == TSDB_SML_JSON_PROTOCOL && tag->sTableName){
    taosMemoryFree((void*)tag->sTableName);
  }
1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311
  taosArrayDestroy(tag->cols);
  taosArrayDestroy(tag->tags);
  taosMemoryFree(tag);
}

static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){
  if(dataFormat){
    taosArrayPush(oneTable->cols, &cols);
    return TSDB_CODE_SUCCESS;
  }

  SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if(!kvHash){
    uError("SML:smlDealCols failed to allocate memory");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  for(size_t i = 0; i < taosArrayGetSize(cols); i++){
    SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
wmmhello's avatar
wmmhello 已提交
1312
    taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329
  }
  taosArrayPush(oneTable->cols, &kvHash);

  return TSDB_CODE_SUCCESS;
}

static SSmlSTableMeta* smlBuildSTableMeta(){
  SSmlSTableMeta* meta = (SSmlSTableMeta*)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
  if(!meta){
    return NULL;
  }
  meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (meta->tagHash == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

wmmhello's avatar
wmmhello 已提交
1330 1331
  meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (meta->colHash == NULL) {
1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->tags = taosArrayInit(32, POINTER_BYTES);
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->cols = taosArrayInit(32, POINTER_BYTES);
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
  return meta;

cleanup:
  taosMemoryFree(meta);
  return NULL;
}

static void smlDestroySTableMeta(SSmlSTableMeta *meta){
  taosHashCleanup(meta->tagHash);
wmmhello's avatar
wmmhello 已提交
1356
  taosHashCleanup(meta->colHash);
1357 1358 1359
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);
  taosMemoryFree(meta->tableMeta);
wmmhello's avatar
wmmhello 已提交
1360
  taosMemoryFree(meta);
1361 1362 1363 1364 1365
}

static void smlDestroyCols(SArray *cols) {
  if (!cols) return;
  for (int i = 0; i < taosArrayGetSize(cols); ++i) {
wmmhello's avatar
wmmhello 已提交
1366
    void *kv = taosArrayGetP(cols, i);
1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378
    taosMemoryFree(kv);
  }
}

static void smlDestroyInfo(SSmlHandle* info){
  if(!info) return;
  qDestroyQuery(info->pQuery);
  smlDestroyHandle(info->exec);

  // destroy info->childTables
  void** p1 = (void**)taosHashIterate(info->childTables, NULL);
  while (p1) {
wmmhello's avatar
wmmhello 已提交
1379
    smlDestroyTableInfo(info, (SSmlTableInfo*)(*p1));
1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394
    p1 = (void**)taosHashIterate(info->childTables, p1);
  }
  taosHashCleanup(info->childTables);

  // destroy info->superTables
  p1 = (void**)taosHashIterate(info->superTables, NULL);
  while (p1) {
    smlDestroySTableMeta((SSmlSTableMeta*)(*p1));
    p1 = (void**)taosHashIterate(info->superTables, p1);
  }
  taosHashCleanup(info->superTables);

  // destroy info->pVgHash
  taosHashCleanup(info->pVgHash);
  taosHashCleanup(info->dumplicateKey);
wmmhello's avatar
wmmhello 已提交
1395 1396 1397
  if(!info->dataFormat){
    taosArrayDestroy(info->colsContainer);
  }
wmmhello's avatar
wmmhello 已提交
1398
  destroyRequest(info->pRequest);
1399 1400 1401
  taosMemoryFreeClear(info);
}

1402
static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision){
1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428
  int32_t code = TSDB_CODE_SUCCESS;
  SSmlHandle* info = (SSmlHandle*)taosMemoryCalloc(1, sizeof(SSmlHandle));
  if (NULL == info) {
    return NULL;
  }
  info->id          = smlGenId();

  info->pQuery      = (SQuery *)taosMemoryCalloc(1, sizeof(SQuery));
  if (NULL == info->pQuery) {
    uError("SML:0x%"PRIx64" create info->pQuery error", info->id);
    goto cleanup;
  }
  info->pQuery->execMode      = QUERY_EXEC_MODE_SCHEDULE;
  info->pQuery->haveResultSet = false;
  info->pQuery->msgType       = TDMT_VND_SUBMIT;
  info->pQuery->pRoot         = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
  if(NULL == info->pQuery->pRoot){
    uError("SML:0x%"PRIx64" create info->pQuery->pRoot error", info->id);
    goto cleanup;
  }
  ((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;

  info->taos        = (STscObj *)taos;

  info->precision   = precision;
  info->protocol    = protocol;
1429 1430 1431 1432 1433
  if(protocol == TSDB_SML_LINE_PROTOCOL){
    info->dataFormat = tsSmlDataFormat;
  }else{
    info->dataFormat = true;
  }
1434 1435 1436 1437 1438 1439 1440 1441 1442 1443
  info->pRequest    = request;
  info->msgBuf.buf  = info->pRequest->msgBuf;
  info->msgBuf.len  = ERROR_MSG_BUF_DEFAULT_SIZE;

  info->exec        = smlInitHandle(info->pQuery);
  info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->pVgHash     = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);

  info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
1444
  if(!info->dataFormat){
1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465
    info->colsContainer = taosArrayInit(32, POINTER_BYTES);
    if(NULL == info->colsContainer){
      uError("SML:0x%"PRIx64" create info failed", info->id);
      goto cleanup;
    }
  }
  if(NULL == info->exec || NULL == info->childTables
      || NULL == info->superTables || NULL == info->pVgHash
      || NULL == info->dumplicateKey){
    uError("SML:0x%"PRIx64" create info failed", info->id);
    goto cleanup;
  }

  return info;
cleanup:
  smlDestroyInfo(info);
  return NULL;
}

/************* TSDB_SML_JSON_PROTOCOL function start **************/
static int32_t smlJsonCreateSring(const char **output, char *input, int32_t inputLen){
1466
  *output = (const char *)taosMemoryMalloc(inputLen);
1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481
  if (*output == NULL){
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  memcpy((void*)(*output), input, inputLen);
  return TSDB_CODE_SUCCESS;
}

static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo) {
  cJSON *metric = cJSON_GetObjectItem(root, "metric");
  if (!cJSON_IsString(metric)) {
    return  TSDB_CODE_TSC_INVALID_JSON;
  }

  tinfo->sTableNameLen = strlen(metric->valuestring);
1482 1483
  if (IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
    uError("OTD:0x%"PRIx64" Metric lenght is 0 or large than 192", info->id);
1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  return smlJsonCreateSring(&tinfo->sTableName, metric->valuestring, tinfo->sTableNameLen);
}

static int32_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int64_t *tsVal) {
  int32_t size = cJSON_GetArraySize(root);
  if (size != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (!cJSON_IsNumber(value)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  double timeDouble = value->valuedouble;
  if(smlDoubleToInt64OverFlow(timeDouble)){
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
    return TSDB_CODE_TSC_INVALID_TIME_STAMP;
  }
  if(timeDouble <= 0){
    return TSDB_CODE_TSC_INVALID_TIME_STAMP;
  }

  size_t typeLen = strlen(type->valuestring);
wmmhello's avatar
wmmhello 已提交
1516
  if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
1517 1518 1519 1520 1521 1522 1523
    //seconds
    timeDouble = timeDouble * 1e9;
    if(smlDoubleToInt64OverFlow(timeDouble)){
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
      return TSDB_CODE_TSC_INVALID_TIME_STAMP;
    }
    *tsVal = timeDouble;
wmmhello's avatar
wmmhello 已提交
1524
  } else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
1525 1526
    switch (type->valuestring[0]) {
      case 'm':
wmmhello's avatar
wmmhello 已提交
1527
      case 'M':
1528 1529 1530 1531 1532 1533 1534 1535 1536
        //milliseconds
        timeDouble = timeDouble * 1e6;
        if(smlDoubleToInt64OverFlow(timeDouble)){
          smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
          return TSDB_CODE_TSC_INVALID_TIME_STAMP;
        }
        *tsVal = timeDouble;
        break;
      case 'u':
wmmhello's avatar
wmmhello 已提交
1537
      case 'U':
1538 1539 1540 1541 1542 1543 1544 1545 1546
        //microseconds
        timeDouble = timeDouble * 1e3;
        if(smlDoubleToInt64OverFlow(timeDouble)){
          smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
          return TSDB_CODE_TSC_INVALID_TIME_STAMP;
        }
        *tsVal = timeDouble;
        break;
      case 'n':
wmmhello's avatar
wmmhello 已提交
1547
      case 'N':
1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581
        //nanoseconds
        *tsVal = timeDouble;
        break;
      default:
        return TSDB_CODE_TSC_INVALID_JSON;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  return TSDB_CODE_SUCCESS;
}

static uint8_t smlGetTimestampLen(int64_t num) {
  uint8_t len = 0;
  while ((num /= 10) != 0) {
    len++;
  }
  len++;
  return len;
}

static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
  //Timestamp must be the first KV to parse
  int64_t tsVal = 0;

  cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
  if (cJSON_IsNumber(timestamp)) {
    //timestamp value 0 indicates current system time
    double timeDouble = timestamp->valuedouble;
    if(smlDoubleToInt64OverFlow(timeDouble)){
      smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
      return TSDB_CODE_TSC_INVALID_TIME_STAMP;
    }
wmmhello's avatar
wmmhello 已提交
1582 1583

    if(timeDouble < 0){
1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600
      return TSDB_CODE_TSC_INVALID_TIME_STAMP;
    }
    uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
    if (tsLen == TSDB_TIME_PRECISION_SEC_DIGITS) {
      timeDouble = timeDouble * 1e9;
      if(smlDoubleToInt64OverFlow(timeDouble)){
        smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
        return TSDB_CODE_TSC_INVALID_TIME_STAMP;
      }
      tsVal = timeDouble;
    } else if (tsLen == TSDB_TIME_PRECISION_MILLI_DIGITS) {
      timeDouble = timeDouble * 1e6;
      if(smlDoubleToInt64OverFlow(timeDouble)){
        smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
        return TSDB_CODE_TSC_INVALID_TIME_STAMP;
      }
      tsVal = timeDouble;
wmmhello's avatar
wmmhello 已提交
1601 1602 1603
    } else if(timeDouble == 0){
      tsVal = taosGetTimestampNs();
    }else {
1604 1605 1606 1607 1608 1609 1610 1611 1612 1613
      return TSDB_CODE_TSC_INVALID_TIME_STAMP;
    }
  } else if (cJSON_IsObject(timestamp)) {
    int32_t ret = smlParseTSFromJSONObj(info, timestamp, &tsVal);
    if (ret != TSDB_CODE_SUCCESS) {
      uError("SML:0x%"PRIx64" Failed to parse timestamp from JSON Obj", info->id);
      return ret;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
1614 1615
  }

wmmhello's avatar
wmmhello 已提交
1616
  // add ts to
wmmhello's avatar
wmmhello 已提交
1617
  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
wmmhello's avatar
wmmhello 已提交
1618 1619 1620 1621
  if(!kv){
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  kv->key = TS;
1622 1623
  kv->keyLen = TS_LEN;
  kv->i = tsVal;
wmmhello's avatar
wmmhello 已提交
1624
  kv->type = TSDB_DATA_TYPE_TIMESTAMP;
wmmhello's avatar
wmmhello 已提交
1625
  kv->length = (int16_t)tDataTypes[kv->type].bytes;
wmmhello's avatar
wmmhello 已提交
1626
  if(cols) taosArrayPush(cols, &kv);
wmmhello's avatar
wmmhello 已提交
1627
  return TSDB_CODE_SUCCESS;
1628

wmmhello's avatar
wmmhello 已提交
1629 1630
}

1631 1632 1633 1634 1635 1636 1637 1638
static int32_t smlConvertJSONBool(SSmlKv *pVal, char* typeStr, cJSON *value) {
  if (strcasecmp(typeStr, "bool") != 0) {
    uError("OTD:invalid type(%s) for JSON Bool", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }
  pVal->type = TSDB_DATA_TYPE_BOOL;
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  pVal->i = value->valueint;
wmmhello's avatar
wmmhello 已提交
1639

1640 1641
  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
1642

1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684
static int32_t smlConvertJSONNumber(SSmlKv *pVal, char* typeStr, cJSON *value) {
  //tinyint
  if (strcasecmp(typeStr, "i8") == 0 ||
      strcasecmp(typeStr, "tinyint") == 0) {
    if (!IS_VALID_TINYINT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(tinyint)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_TINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
  //smallint
  if (strcasecmp(typeStr, "i16") == 0 ||
      strcasecmp(typeStr, "smallint") == 0) {
    if (!IS_VALID_SMALLINT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(smallint)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_SMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
  //int
  if (strcasecmp(typeStr, "i32") == 0 ||
      strcasecmp(typeStr, "int") == 0) {
    if (!IS_VALID_INT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(int)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    pVal->type = TSDB_DATA_TYPE_INT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->i = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
  //bigint
  if (strcasecmp(typeStr, "i64") == 0 ||
      strcasecmp(typeStr, "bigint") == 0) {
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
1685 1686 1687 1688 1689 1690
    if(value->valuedouble >= (double)INT64_MAX){
      pVal->i = INT64_MAX;
    }else if(value->valuedouble <= (double)INT64_MIN){
      pVal->i = INT64_MIN;
    }else{
      pVal->i = value->valuedouble;
1691 1692 1693 1694 1695 1696 1697 1698 1699
    }
    return TSDB_CODE_SUCCESS;
  }
  //float
  if (strcasecmp(typeStr, "f32") == 0 ||
      strcasecmp(typeStr, "float") == 0) {
    if (!IS_VALID_FLOAT(value->valuedouble)) {
      uError("OTD:JSON value(%f) cannot fit in type(float)", value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
wmmhello's avatar
wmmhello 已提交
1700
    }
1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->f = value->valuedouble;
    return TSDB_CODE_SUCCESS;
  }
  //double
  if (strcasecmp(typeStr, "f64") == 0 ||
      strcasecmp(typeStr, "double") == 0) {
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->d = value->valuedouble;
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1713 1714
  }

1715 1716 1717 1718
  //if reach here means type is unsupported
  uError("OTD:invalid type(%s) for JSON Number", typeStr);
  return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
1719

1720 1721 1722 1723 1724 1725 1726 1727 1728 1729
static int32_t smlConvertJSONString(SSmlKv *pVal, char* typeStr, cJSON *value) {
  if (strcasecmp(typeStr, "binary") == 0) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
  } else if (strcasecmp(typeStr, "nchar") == 0) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
  } else {
    uError("OTD:invalid type(%s) for JSON String", typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }
  pVal->length = (int16_t)strlen(value->valuestring);
wmmhello's avatar
wmmhello 已提交
1730
  return smlJsonCreateSring(&pVal->value, value->valuestring, pVal->length);
1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756
}

static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) {
  int32_t ret = TSDB_CODE_SUCCESS;
  int32_t size = cJSON_GetArraySize(root);

  if (size != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (value == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  switch (value->type) {
    case cJSON_True:
    case cJSON_False: {
      ret = smlConvertJSONBool(kv, type->valuestring, value);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
wmmhello's avatar
wmmhello 已提交
1757
      }
1758
      break;
wmmhello's avatar
wmmhello 已提交
1759
    }
1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775
    case cJSON_Number: {
      ret = smlConvertJSONNumber(kv, type->valuestring, value);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
      break;
    }
    case cJSON_String: {
      ret = smlConvertJSONString(kv, type->valuestring, value);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON_TYPE;
wmmhello's avatar
wmmhello 已提交
1776
  }
1777 1778

  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1779 1780
}

1781 1782 1783 1784 1785 1786 1787 1788
static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
  switch (root->type) {
    case cJSON_True:
    case cJSON_False: {
      kv->type = TSDB_DATA_TYPE_BOOL;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->i = root->valueint;
      break;
wmmhello's avatar
wmmhello 已提交
1789
    }
1790 1791 1792 1793 1794 1795 1796 1797 1798 1799
    case cJSON_Number: {
      kv->type = TSDB_DATA_TYPE_DOUBLE;
      kv->length = (int16_t)tDataTypes[kv->type].bytes;
      kv->d = root->valuedouble;
      break;
    }
    case cJSON_String: {
      /* set default JSON type to binary/nchar according to
       * user configured parameter tsDefaultJSONStrType
       */
wmmhello's avatar
wmmhello 已提交
1800

1801
      char *tsDefaultJSONStrType = "nchar";   //todo
1802 1803
      smlConvertJSONString(kv, tsDefaultJSONStrType, root);
      break;
wmmhello's avatar
wmmhello 已提交
1804
    }
1805 1806 1807 1808 1809 1810 1811 1812 1813 1814
    case cJSON_Object: {
      int32_t ret = smlParseValueFromJSONObj(root, kv);
      if (ret != TSDB_CODE_SUCCESS) {
        uError("OTD:Failed to parse value from JSON Obj");
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
1815
  }
1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838

  return TSDB_CODE_SUCCESS;
}

static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
  cJSON *metricVal = cJSON_GetObjectItem(root, "value");
  if (metricVal == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
  if(!kv){
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if(cols) taosArrayPush(cols, &kv);

  kv->key = VALUE;
  kv->keyLen = VALUE_LEN;
  int32_t ret = smlParseValueFromJSON(metricVal, kv);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1839 1840
}

1841
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
1842 1843 1844 1845 1846 1847
  int32_t ret = TSDB_CODE_SUCCESS;

  cJSON *tags = cJSON_GetObjectItem(root, "tags");
  if (tags == NULL || tags->type != cJSON_Object) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }
wmmhello's avatar
wmmhello 已提交
1848

1849
  size_t childTableNameLen = strlen(tsSmlChildTableName);
1850 1851 1852 1853 1854
  int32_t tagNum = cJSON_GetArraySize(tags);
  for (int32_t i = 0; i < tagNum; ++i) {
    cJSON *tag = cJSON_GetArrayItem(tags, i);
    if (tag == NULL) {
      return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
1855
    }
1856 1857 1858 1859 1860
    size_t keyLen = strlen(tag->string);
    if (IS_INVALID_COL_LEN(keyLen)) {
      uError("OTD:Tag key length is 0 or too large than 64");
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
1861
    //check duplicate keys
1862
    if (smlCheckDuplicateKey(tag->string, keyLen, dumplicateKey)) {
1863
      return TSDB_CODE_TSC_DUP_TAG_NAMES;
wmmhello's avatar
wmmhello 已提交
1864 1865
    }

1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876
    //handle child table name
    if(childTableNameLen != 0 && strcmp(tag->string, tsSmlChildTableName) == 0){
      if (!cJSON_IsString(tag)) {
        uError("OTD:ID must be JSON string");
        return TSDB_CODE_TSC_INVALID_JSON;
      }
      memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
      strncpy(childTableName, tag->valuestring, TSDB_TABLE_NAME_LEN);
      continue;
    }

1877 1878 1879 1880 1881 1882
    // add kv to SSmlKv
    SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
    if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
    if(pKVs) taosArrayPush(pKVs, &kv);

    //key
1883
    kv->keyLen = keyLen;
1884 1885 1886 1887 1888 1889 1890 1891 1892
    ret = smlJsonCreateSring(&kv->key, tag->string, kv->keyLen);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
    //value
    ret = smlParseValueFromJSON(tag, kv);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
wmmhello's avatar
wmmhello 已提交
1893 1894
  }

1895
  return ret;
wmmhello's avatar
wmmhello 已提交
1896 1897 1898

}

1899 1900
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo, SArray *cols) {
  int32_t ret = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1901

1902 1903 1904
  if (!cJSON_IsObject(root)) {
    uError("OTD:0x%"PRIx64" data point needs to be JSON object", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
1905
  }
1906

1907 1908 1909 1910 1911
  int32_t size = cJSON_GetArraySize(root);
  //outmost json fields has to be exactly 4
  if (size != OTD_JSON_FIELDS_NUM) {
    uError("OTD:0x%"PRIx64" Invalid number of JSON fields in data point %d", info->id, size);
    return TSDB_CODE_TSC_INVALID_JSON;
wmmhello's avatar
wmmhello 已提交
1912
  }
1913 1914 1915 1916 1917 1918

  //Parse metric
  ret = smlParseMetricFromJSON(info, root, tinfo);
  if (ret != TSDB_CODE_SUCCESS) {
    uError("OTD:0x%"PRIx64" Unable to parse metric from JSON payload", info->id);
    return ret;
wmmhello's avatar
wmmhello 已提交
1919
  }
1920
  uDebug("OTD:0x%"PRIx64" Parse metric from JSON payload finished", info->id);
wmmhello's avatar
wmmhello 已提交
1921

1922 1923 1924 1925 1926
  //Parse timestamp
  ret = smlParseTSFromJSON(info, root, cols);
  if (ret) {
    uError("OTD:0x%"PRIx64" Unable to parse timestamp from JSON payload", info->id);
    return ret;
wmmhello's avatar
wmmhello 已提交
1927
  }
1928
  uDebug("OTD:0x%"PRIx64" Parse timestamp from JSON payload finished", info->id);
1929

1930 1931 1932 1933 1934
  //Parse metric value
  ret = smlParseColsFromJSON(root, cols);
  if (ret) {
    uError("OTD:0x%"PRIx64" Unable to parse metric value from JSON payload", info->id);
    return ret;
1935
  }
1936
  uDebug("OTD:0x%"PRIx64" Parse metric value from JSON payload finished", info->id);
1937

1938
  //Parse tags
1939
  ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
1940 1941 1942
  if (ret) {
    uError("OTD:0x%"PRIx64" Unable to parse tags from JSON payload", info->id);
    return ret;
1943
  }
1944
  uDebug("OTD:0x%"PRIx64" Parse tags from JSON payload finished", info->id);
wmmhello's avatar
wmmhello 已提交
1945

1946
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1947
}
1948
/************* TSDB_SML_JSON_PROTOCOL function end **************/
wmmhello's avatar
wmmhello 已提交
1949 1950


1951 1952

static int32_t smlParseInfluxLine(SSmlHandle* info, const char* sql) {
wmmhello's avatar
wmmhello 已提交
1953
  SSmlLineInfo elements = {0};
1954
  int ret = smlParseInfluxString(sql, &elements, &info->msgBuf);
wmmhello's avatar
wmmhello 已提交
1955
  if(ret != TSDB_CODE_SUCCESS){
1956
    uError("SML:0x%"PRIx64" smlParseInfluxLine failed", info->id);
wmmhello's avatar
wmmhello 已提交
1957 1958 1959
    return ret;
  }

1960 1961 1962 1963
  SArray *cols = NULL;
  if(info->dataFormat){   // if dataFormat, cols need new memory to save data
    cols = taosArrayInit(16, POINTER_BYTES);
    if (cols == NULL) {
1964
      uError("SML:0x%"PRIx64" smlParseInfluxLine failed to allocate memory", info->id);
1965 1966 1967 1968
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }else{      // if dataFormat is false, cols do not need to save data, there is another new memory to save data
    cols = info->colsContainer;
wmmhello's avatar
wmmhello 已提交
1969
  }
wmmhello's avatar
wmmhello 已提交
1970

wmmhello's avatar
wmmhello 已提交
1971
  ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols);
wmmhello's avatar
wmmhello 已提交
1972
  if(ret != TSDB_CODE_SUCCESS){
wmmhello's avatar
wmmhello 已提交
1973
    uError("SML:0x%"PRIx64" smlParseTS failed", info->id);
1974
    if(info->dataFormat) taosArrayDestroy(cols);
wmmhello's avatar
wmmhello 已提交
1975 1976
    return ret;
  }
1977
  ret = smlParseCols(elements.cols, elements.colsLen, cols, NULL, false, info->dumplicateKey, &info->msgBuf);
wmmhello's avatar
wmmhello 已提交
1978
  if(ret != TSDB_CODE_SUCCESS){
wmmhello's avatar
wmmhello 已提交
1979
    uError("SML:0x%"PRIx64" smlParseCols parse cloums fields failed", info->id);
1980 1981
    smlDestroyCols(cols);
    if(info->dataFormat) taosArrayDestroy(cols);
wmmhello's avatar
wmmhello 已提交
1982 1983
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
1984
  if(taosArrayGetSize(cols) > TSDB_MAX_COLUMNS){
wmmhello's avatar
wmmhello 已提交
1985
    smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
wmmhello's avatar
wmmhello 已提交
1986 1987
    return TSDB_CODE_SML_INVALID_DATA;
  }
wmmhello's avatar
wmmhello 已提交
1988

1989
  bool hasTable = true;
1990
  SSmlTableInfo *tinfo = NULL;
wmmhello's avatar
wmmhello 已提交
1991
  SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, elements.measure, elements.measureTagsLen);
1992
  if(!oneTable){
1993
    tinfo = smlBuildTableInfo();
1994
    if(!tinfo){
wmmhello's avatar
wmmhello 已提交
1995 1996
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
1997
    taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES);
1998
    oneTable = &tinfo;
1999 2000 2001 2002 2003 2004 2005
    hasTable = false;
  }

  ret = smlDealCols(*oneTable, info->dataFormat, cols);
  if(ret != TSDB_CODE_SUCCESS){
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
2006

2007
  if(!hasTable){
2008
    ret = smlParseCols(elements.tags, elements.tagsLen, (*oneTable)->tags, (*oneTable)->childTableName, true, info->dumplicateKey, &info->msgBuf);
wmmhello's avatar
wmmhello 已提交
2009
    if(ret != TSDB_CODE_SUCCESS){
wmmhello's avatar
wmmhello 已提交
2010
      uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id);
wmmhello's avatar
wmmhello 已提交
2011 2012 2013
      return ret;
    }

2014
    if(taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_TAGS){
wmmhello's avatar
wmmhello 已提交
2015
      smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
wmmhello's avatar
wmmhello 已提交
2016 2017 2018
      return TSDB_CODE_SML_INVALID_DATA;
    }

2019 2020
    (*oneTable)->sTableName = elements.measure;
    (*oneTable)->sTableNameLen = elements.measureLen;
2021 2022 2023 2024 2025 2026 2027 2028 2029
    if(strlen((*oneTable)->childTableName) == 0){
      RandTableName rName = { (*oneTable)->tags, (*oneTable)->sTableName, (uint8_t)(*oneTable)->sTableNameLen,
                             (*oneTable)->childTableName, 0 };

      buildChildTableName(&rName);
      (*oneTable)->uid = rName.uid;
    }else{
      (*oneTable)->uid = *(uint64_t*)((*oneTable)->childTableName);
    }
wmmhello's avatar
wmmhello 已提交
2030

2031
  }
wmmhello's avatar
wmmhello 已提交
2032

2033 2034
  SSmlSTableMeta** tableMeta = (SSmlSTableMeta**)taosHashGet(info->superTables, elements.measure, elements.measureLen);
  if(tableMeta){  // update meta
wmmhello's avatar
wmmhello 已提交
2035 2036 2037 2038
    ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
    if(!hasTable && ret){
      ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
    }
2039 2040 2041
    if(!ret){
      uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id);
      return TSDB_CODE_SML_INVALID_DATA;
wmmhello's avatar
wmmhello 已提交
2042
    }
2043 2044
  }else{
    SSmlSTableMeta *meta = smlBuildSTableMeta();
wmmhello's avatar
wmmhello 已提交
2045 2046
    smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
    smlInsertMeta(meta->colHash, meta->cols, cols);
2047
    taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
wmmhello's avatar
wmmhello 已提交
2048
  }
2049 2050 2051 2052 2053

  if(!info->dataFormat){
    taosArrayClear(info->colsContainer);
  }
  taosHashClear(info->dumplicateKey);
wmmhello's avatar
wmmhello 已提交
2054 2055 2056
  return TSDB_CODE_SUCCESS;
}

2057 2058 2059 2060 2061
static int32_t smlParseTelnetLine(SSmlHandle* info, void *data) {
  int ret = TSDB_CODE_SUCCESS;
  SSmlTableInfo *tinfo = smlBuildTableInfo();
  if(!tinfo){
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
2062 2063
  }

2064 2065 2066 2067
  SArray *cols = taosArrayInit(16, POINTER_BYTES);
  if (cols == NULL) {
    uError("SML:0x%"PRIx64" smlParseTelnetLine failed to allocate memory", info->id);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
2068 2069
  }

2070
  if(info->protocol == TSDB_SML_TELNET_PROTOCOL){
2071
    ret = smlParseTelnetString(info, (const char*)data, tinfo, cols);
2072
  }else if(info->protocol == TSDB_SML_JSON_PROTOCOL){
2073
    ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
2074 2075 2076 2077
  }else{
    ASSERT(0);
  }
  if(ret != TSDB_CODE_SUCCESS){
2078
    uError("SML:0x%"PRIx64" smlParseTelnetLine failed", info->id);
wmmhello's avatar
wmmhello 已提交
2079
    smlDestroyTableInfo(info, tinfo);
wmmhello's avatar
wmmhello 已提交
2080
    smlDestroyCols(cols);
2081 2082
    taosArrayDestroy(cols);
    return ret;
wmmhello's avatar
wmmhello 已提交
2083
  }
wmmhello's avatar
wmmhello 已提交
2084

2085 2086
  if(taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS){
    smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL);
wmmhello's avatar
wmmhello 已提交
2087 2088 2089
    smlDestroyTableInfo(info, tinfo);
    smlDestroyCols(cols);
    taosArrayDestroy(cols);
2090
    return TSDB_CODE_SML_INVALID_DATA;
wmmhello's avatar
wmmhello 已提交
2091
  }
2092 2093
  taosHashClear(info->dumplicateKey);

2094 2095 2096 2097 2098 2099 2100 2101 2102
  if(strlen(tinfo->childTableName) == 0){
    RandTableName rName = { tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen,
                           tinfo->childTableName, 0 };
    buildChildTableName(&rName);
    tinfo->uid = rName.uid;
  }else{
    tinfo->uid = *(uint64_t*)(tinfo->childTableName);  // generate uid by name simple
  }

2103 2104

  bool hasTable = true;
2105
  SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName));
2106 2107
  if(!oneTable) {
    taosHashPut(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName), &tinfo, POINTER_BYTES);
2108
    oneTable = &tinfo;
2109 2110
    hasTable = false;
  }else{
wmmhello's avatar
wmmhello 已提交
2111
    smlDestroyTableInfo(info, tinfo);
wmmhello's avatar
wmmhello 已提交
2112
  }
wmmhello's avatar
wmmhello 已提交
2113

2114
  taosArrayPush((*oneTable)->cols, &cols);
2115
  SSmlSTableMeta** tableMeta = (SSmlSTableMeta** )taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen);
2116
  if(tableMeta){  // update meta
wmmhello's avatar
wmmhello 已提交
2117 2118 2119 2120
    ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
    if(!hasTable && ret){
      ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
    }
2121 2122 2123 2124 2125 2126
    if(!ret){
      uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id);
      return TSDB_CODE_SML_INVALID_DATA;
    }
  }else{
    SSmlSTableMeta *meta = smlBuildSTableMeta();
wmmhello's avatar
wmmhello 已提交
2127 2128
    smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
    smlInsertMeta(meta->colHash, meta->cols, cols);
2129
    taosHashPut(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen, &meta, POINTER_BYTES);
wmmhello's avatar
wmmhello 已提交
2130 2131
  }

2132 2133
  return TSDB_CODE_SUCCESS;
}
wmmhello's avatar
wmmhello 已提交
2134

2135 2136 2137
static int32_t smlParseJSON(SSmlHandle *info, char* payload) {
  int32_t payloadNum = 0;
  int32_t ret = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2138

2139 2140 2141
  if (payload == NULL) {
    uError("SML:0x%"PRIx64" empty JSON Payload", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
2142
  }
2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157

  cJSON *root = cJSON_Parse(payload);
  if (root == NULL) {
    uError("SML:0x%"PRIx64" parse json failed:%s", info->id, payload);
    return TSDB_CODE_TSC_INVALID_JSON;
  }
  //multiple data points must be sent in JSON array
  if (cJSON_IsObject(root)) {
    payloadNum = 1;
  } else if (cJSON_IsArray(root)) {
    payloadNum = cJSON_GetArraySize(root);
  } else {
    uError("SML:0x%"PRIx64" Invalid JSON Payload", info->id);
    ret = TSDB_CODE_TSC_INVALID_JSON;
    goto end;
wmmhello's avatar
wmmhello 已提交
2158
  }
wmmhello's avatar
wmmhello 已提交
2159

2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171
  for (int32_t i = 0; i < payloadNum; ++i) {
    cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(root)) ? root : cJSON_GetArrayItem(root, i);
    ret = smlParseTelnetLine(info, dataPoint);
    if(ret != TSDB_CODE_SUCCESS){
      uError("SML:0x%"PRIx64" Invalid JSON Payload", info->id);
      goto end;
    }
  }

end:
  cJSON_Delete(root);
  return ret;
wmmhello's avatar
wmmhello 已提交
2172
}
2173

wmmhello's avatar
wmmhello 已提交
2174
static int32_t smlInsertData(SSmlHandle* info) {
wmmhello's avatar
wmmhello 已提交
2175 2176
  int32_t code = TSDB_CODE_SUCCESS;

wmmhello's avatar
wmmhello 已提交
2177
  SSmlTableInfo** oneTable = (SSmlTableInfo**)taosHashIterate(info->childTables, NULL);
wmmhello's avatar
wmmhello 已提交
2178 2179 2180 2181 2182 2183
  while (oneTable) {
    SSmlTableInfo* tableData = *oneTable;

    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
    strcpy(pName.dbname, info->pRequest->pDb);
    memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
D
dapan1121 已提交
2184 2185 2186 2187
    SRequestConnInfo conn = {.pTrans = info->taos->pAppInfo->pTransporter, 
                             .requestId = info->pRequest->requestId,
                             .requestObjRefId = info->pRequest->self,
                             .mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp)};
wmmhello's avatar
wmmhello 已提交
2188
    SVgroupInfo vg;
D
dapan1121 已提交
2189
    code = catalogGetTableHashVgroup(info->pCatalog, &conn, &pName, &vg);
2190
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
2191 2192 2193 2194 2195
      uError("SML:0x%"PRIx64" catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
      return code;
    }
    taosHashPut(info->pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));

wmmhello's avatar
wmmhello 已提交
2196
    SSmlSTableMeta** pMeta = (SSmlSTableMeta** )taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
wmmhello's avatar
wmmhello 已提交
2197 2198
    ASSERT (NULL != pMeta && NULL != *pMeta);

2199
    // use tablemeta of stable to save vgid and uid of child table
wmmhello's avatar
wmmhello 已提交
2200 2201 2202
    (*pMeta)->tableMeta->vgId = vg.vgId;
    (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid

2203 2204
    code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat,
                       (*pMeta)->tableMeta, tableData->childTableName, info->msgBuf.buf, info->msgBuf.len);
wmmhello's avatar
wmmhello 已提交
2205 2206 2207
    if(code != TSDB_CODE_SUCCESS){
      return code;
    }
wmmhello's avatar
wmmhello 已提交
2208
    oneTable = (SSmlTableInfo**)taosHashIterate(info->childTables, oneTable);
wmmhello's avatar
wmmhello 已提交
2209
  }
wmmhello's avatar
wmmhello 已提交
2210

2211 2212 2213 2214 2215
  code = smlBuildOutput(info->exec, info->pVgHash);
  if (code != TSDB_CODE_SUCCESS) {
    uError("SML:0x%"PRIx64" smlBuildOutput failed", info->id);
    return code;
  }
2216 2217
  info->cost.insertRpcTime = taosGetTimestampUs();

wmmhello's avatar
wmmhello 已提交
2218 2219 2220
  //launchQueryImpl(info->pRequest, info->pQuery, false, NULL);
//  info->affectedRows = taos_affected_rows(info->pRequest);
//  return info->pRequest->code;
wmmhello's avatar
wmmhello 已提交
2221

wmmhello's avatar
wmmhello 已提交
2222 2223
  launchAsyncQuery(info->pRequest, info->pQuery);
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
2224 2225
}

2226
static void smlPrintStatisticInfo(SSmlHandle *info){
2227
  uError("SML:0x%"PRIx64" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d \
wmmhello's avatar
wmmhello 已提交
2228
        parse cost:%"PRId64",schema cost:%"PRId64",bind cost:%"PRId64",rpc cost:%"PRId64",total cost:%"PRId64"", info->id, info->cost.code,
2229 2230 2231 2232 2233 2234
         info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, info->cost.numOfCreateSTables,
         info->cost.schemaTime-info->cost.parseTime, info->cost.insertBindTime-info->cost.schemaTime,
         info->cost.insertRpcTime-info->cost.insertBindTime, info->cost.endTime-info->cost.insertRpcTime,
         info->cost.endTime-info->cost.parseTime);
}

2235
static int32_t smlParseLine(SSmlHandle *info, char* lines[], int numLines){
wmmhello's avatar
wmmhello 已提交
2236
  int32_t code = TSDB_CODE_SUCCESS;
2237 2238 2239 2240 2241 2242
  if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
    code = smlParseJSON(info, *lines);
    if (code != TSDB_CODE_SUCCESS) {
      uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, *lines);
      return code;
    }
wmmhello's avatar
wmmhello 已提交
2243
    return code;
wmmhello's avatar
wmmhello 已提交
2244
  }
wmmhello's avatar
wmmhello 已提交
2245

wmmhello's avatar
wmmhello 已提交
2246
  for (int32_t i = 0; i < numLines; ++i) {
2247 2248 2249 2250 2251 2252 2253
    if(info->protocol == TSDB_SML_LINE_PROTOCOL){
      code = smlParseInfluxLine(info, lines[i]);
    }else if(info->protocol == TSDB_SML_TELNET_PROTOCOL){
      code = smlParseTelnetLine(info, lines[i]);
    }else{
      ASSERT(0);
    }
wmmhello's avatar
wmmhello 已提交
2254
    if (code != TSDB_CODE_SUCCESS) {
2255 2256
      uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, lines[i]);
      return code;
wmmhello's avatar
wmmhello 已提交
2257 2258
    }
  }
2259 2260 2261 2262 2263
  return code;
}

static int smlProcess(SSmlHandle *info, char* lines[], int numLines) {
  int32_t code = TSDB_CODE_SUCCESS;
2264 2265
  int32_t retryNum = 0;

2266 2267 2268 2269 2270 2271 2272
  info->cost.parseTime = taosGetTimestampUs();

  code = smlParseLine(info, lines, numLines);
  if (code != 0) {
    uError("SML:0x%"PRIx64" smlParseLine error : %s", info->id, tstrerror(code));
    goto cleanup;
  }
wmmhello's avatar
wmmhello 已提交
2273

2274 2275 2276 2277 2278
  info->cost.lineNum = numLines;
  info->cost.numOfSTables = taosHashGetSize(info->superTables);
  info->cost.numOfCTables = taosHashGetSize(info->childTables);

  info->cost.schemaTime = taosGetTimestampUs();
2279 2280 2281 2282

  do{
    code = smlModifyDBSchemas(info);
    if (code == 0) break;
wmmhello's avatar
wmmhello 已提交
2283
  } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
2284

wmmhello's avatar
wmmhello 已提交
2285 2286 2287 2288
  if (code != 0) {
    uError("SML:0x%"PRIx64" smlModifyDBSchemas error : %s", info->id, tstrerror(code));
    goto cleanup;
  }
wmmhello's avatar
wmmhello 已提交
2289

2290
  info->cost.insertBindTime = taosGetTimestampUs();
wmmhello's avatar
wmmhello 已提交
2291 2292 2293
  code = smlInsertData(info);
  if (code != 0) {
    uError("SML:0x%"PRIx64" smlInsertData error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
2294 2295
    goto cleanup;
  }
2296
  info->cost.endTime = taosGetTimestampUs();
wmmhello's avatar
wmmhello 已提交
2297 2298

cleanup:
2299
  info->cost.code = code;
2300
  smlPrintStatisticInfo(info);
wmmhello's avatar
wmmhello 已提交
2301 2302 2303
  return code;
}

D
dapan1121 已提交
2304
static int32_t isSchemalessDb(STscObj *taos, SRequestObj* request, SCatalog *catalog){
wmmhello's avatar
wmmhello 已提交
2305
  SName          name;
wmmhello's avatar
wmmhello 已提交
2306
  tNameSetDbName(&name, taos->acctId, taos->db, strlen(taos->db));
wmmhello's avatar
wmmhello 已提交
2307 2308 2309
  char dbFname[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(&name, dbFname);
  SDbCfgInfo pInfo = {0};
D
dapan1121 已提交
2310 2311 2312 2313
  SRequestConnInfo conn = {.pTrans = taos->pAppInfo->pTransporter, 
                           .requestId = request->requestId,
                           .requestObjRefId = request->self,
                           .mgmtEps = getEpSet_s(&taos->pAppInfo->mgmtEp)};
wmmhello's avatar
wmmhello 已提交
2314

D
dapan1121 已提交
2315
  int32_t code = catalogGetDBCfg(catalog, &conn, dbFname, &pInfo);
wmmhello's avatar
wmmhello 已提交
2316 2317 2318
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
D
dapan1121 已提交
2319 2320
  taosArrayDestroy(pInfo.pRetensions);
  
wmmhello's avatar
wmmhello 已提交
2321 2322 2323 2324 2325 2326
  if (!pInfo.schemaless){
    return TSDB_CODE_SML_INVALID_DB_CONF;
  }
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
2327 2328 2329 2330 2331 2332
static void smlInsertCallback(void* param, void* res, int32_t code) {
  SRequestObj *pRequest = (SRequestObj *)res;
  SSmlHandle* info = (SSmlHandle *)param;

  // lock
  if(code != TSDB_CODE_SUCCESS){
wmmhello's avatar
wmmhello 已提交
2333 2334 2335
    taosThreadSpinLock(&info->params->lock);
    info->params->request->code = code;
    taosThreadSpinUnlock(&info->params->lock);
wmmhello's avatar
wmmhello 已提交
2336 2337 2338
  }
  // unlock

wmmhello's avatar
wmmhello 已提交
2339
  printf("SML:0x%" PRIx64 " insert finished, code: %d, total: %d\n", info->id, code, info->affectedRows);
wmmhello's avatar
wmmhello 已提交
2340
  Params *pParam = info->params;
wmmhello's avatar
wmmhello 已提交
2341 2342 2343 2344
  bool isLast = info->isLast;
  smlDestroyInfo(info);

  if(isLast){
wmmhello's avatar
wmmhello 已提交
2345
    tsem_post(&pParam->sem);
wmmhello's avatar
wmmhello 已提交
2346 2347 2348
  }
}

wmmhello's avatar
wmmhello 已提交
2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369
/**
 * taos_schemaless_insert() parse and insert data points into database according to
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
 * @return return zero for successful insertion. Otherwise return none-zero error code of
 *         failure reason.
 *
 */

wmmhello's avatar
wmmhello 已提交
2370
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
2371
  SRequestObj* request = (SRequestObj*)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
wmmhello's avatar
wmmhello 已提交
2372
  if(!request){
2373
    uError("SML:taos_schemaless_insert error request is null");
2374
    return NULL;
wmmhello's avatar
wmmhello 已提交
2375 2376
  }

wmmhello's avatar
wmmhello 已提交
2377
  ((STscObj *)taos)->schemalessType = 1;
wmmhello's avatar
wmmhello 已提交
2378
  SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
wmmhello's avatar
wmmhello 已提交
2379

wmmhello's avatar
wmmhello 已提交
2380
  int cnt = ceil(((double)numLines)/LINE_BATCH);
wmmhello's avatar
wmmhello 已提交
2381 2382
  Params params;
  params.request = request;
wmmhello's avatar
wmmhello 已提交
2383 2384 2385 2386 2387 2388 2389 2390
  tsem_init(&params.sem, 0, 0);
  taosThreadSpinInit(&(params.lock), 0);

  int32_t code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, &params.catalog);
  if(code != TSDB_CODE_SUCCESS){
    uError("SML get catalog error %d", code);
    request->code = code;
    goto end;
wmmhello's avatar
wmmhello 已提交
2391 2392
  }

wmmhello's avatar
wmmhello 已提交
2393 2394
  if(request->pDb == NULL){
    request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
wmmhello's avatar
wmmhello 已提交
2395
    smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
wmmhello's avatar
wmmhello 已提交
2396 2397 2398
    goto end;
  }

D
dapan1121 已提交
2399
  if(isSchemalessDb(((STscObj *)taos), request, params.catalog) != TSDB_CODE_SUCCESS){
wmmhello's avatar
wmmhello 已提交
2400
    request->code = TSDB_CODE_SML_INVALID_DB_CONF;
wmmhello's avatar
wmmhello 已提交
2401
    smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
wmmhello's avatar
wmmhello 已提交
2402 2403 2404
    goto end;
  }

2405
  if (!lines) {
2406
    request->code = TSDB_CODE_SML_INVALID_DATA;
wmmhello's avatar
wmmhello 已提交
2407
    smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
2408 2409 2410 2411 2412
    goto end;
  }

  if(protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL){
    request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
wmmhello's avatar
wmmhello 已提交
2413
    smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
2414
    goto end;
wmmhello's avatar
wmmhello 已提交
2415 2416
  }

2417
  if(protocol == TSDB_SML_LINE_PROTOCOL && (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)){
2418
    request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
wmmhello's avatar
wmmhello 已提交
2419
    smlBuildInvalidDataMsg(&msg, "precision invalidate for line protocol", NULL);
2420 2421 2422
    goto end;
  }

wmmhello's avatar
wmmhello 已提交
2423
  for (int i = 0; i < cnt; ++i) {
wmmhello's avatar
wmmhello 已提交
2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436
    SRequestObj* req = (SRequestObj*)createRequest((STscObj *)taos, TSDB_SQL_INSERT);
    if(!req){
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error request is null");
      goto end;
    }
    SSmlHandle* info = smlBuildSmlInfo(taos, req, (SMLProtocolType)protocol, precision);
    if(!info){
      request->code = TSDB_CODE_OUT_OF_MEMORY;
      uError("SML:taos_schemaless_insert error SSmlHandle is null");
      goto end;
    }

wmmhello's avatar
wmmhello 已提交
2437 2438 2439
    int32_t perBatch = LINE_BATCH;

    if(numLines > perBatch){
wmmhello's avatar
wmmhello 已提交
2440 2441 2442 2443 2444 2445 2446 2447
      numLines -= perBatch;
      info->isLast = false;
    }else{
      perBatch = numLines;
      numLines = 0;
      info->isLast = true;
    }

wmmhello's avatar
wmmhello 已提交
2448
    info->params = &params;
wmmhello's avatar
wmmhello 已提交
2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459
    info->pCatalog = params.catalog;
    info->affectedRows = perBatch;
    info->pRequest->body.queryFp = smlInsertCallback;
    info->pRequest->body.param   = info;
    code = smlProcess(info, lines, perBatch);
    lines += perBatch;
    if (code != TSDB_CODE_SUCCESS){
      info->pRequest->body.queryFp(info, req, code);
    }
  }
  tsem_wait(&params.sem);
2460 2461

end:
wmmhello's avatar
wmmhello 已提交
2462 2463 2464 2465
  taosThreadSpinDestroy(&params.lock);
  tsem_destroy(&params.sem);
  ((STscObj *)taos)->schemalessType = 0;
  uDebug("result:%s", request->msgBuf);
wmmhello's avatar
wmmhello 已提交
2466
  return (TAOS_RES*)request;
wmmhello's avatar
wmmhello 已提交
2467
}
2468