tscParseOpenTSDB.c 31.3 KB
Newer Older
1 2 3 4 5
#include <ctype.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

6
#include "cJSON.h"
7 8 9 10 11 12 13 14
#include "hash.h"
#include "taos.h"

#include "tscUtil.h"
#include "tsclient.h"
#include "tscLog.h"

#include "tscParseLine.h"
15

16 17 18
#define OTD_MAX_FIELDS_NUM 2
#define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM 4
19

20 21
/* Column names given to the timestamp field and the metric value field of every data point. */
#define OTD_TIMESTAMP_COLUMN_NAME "ts"
#define OTD_METRIC_VALUE_COLUMN_NAME "value"
22

23
/* telnet style API parser */
24 25
static uint64_t HandleId = 0;

/*
 * Produce a non-zero identifier from the file-wide HandleId counter.
 * The counter is advanced with atomic_add_fetch_64; a result of 0 (e.g. after
 * wrap-around) is skipped so callers never receive it. The insert entry points
 * below store the result in SSmlLinesInfo::id and print it in their log lines.
 */
static uint64_t genUID() {
  uint64_t uid = 0;

  while (uid == 0) {
    uid = atomic_add_fetch_64(&HandleId, 1);
  }

  return uid;
}

36 37 38 39
/*
 * Parse the metric (super table name) at the start of a telnet line.
 *
 * The metric runs up to the first space. Dots are converted to underscores and
 * every other character is lower-cased. On success pSml->stableName owns a newly
 * allocated NUL-terminated name and *index points just past the separating space.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY,
 * TSDB_CODE_TSC_LINE_SYNTAX_ERROR (leading digit, empty metric, or nothing after it)
 * or TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH (more than TSDB_TABLE_NAME_LEN - 1 chars).
 */
static int32_t parseTelnetMetric(TAOS_SML_DATA_POINT *pSml, const char **index, SSmlLinesInfo* info) {
  const char *cur = *index;
  uint16_t len = 0;

  pSml->stableName = tcalloc(TSDB_TABLE_NAME_LEN, 1);
  if (pSml->stableName == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // <ctype.h> functions require a value representable as unsigned char
  if (isdigit((unsigned char)*cur)) {
    tscError("OTD:0x%"PRIx64" Metric cannnot start with digit", info->id);
    tfree(pSml->stableName);
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  while (*cur != '\0') {
    // Check the terminator before the length so that a metric of exactly
    // TSDB_TABLE_NAME_LEN - 1 characters (the documented maximum) is accepted.
    if (*cur == ' ') {
      break;
    }

    if (len >= TSDB_TABLE_NAME_LEN - 1) {
      tscError("OTD:0x%"PRIx64" Metric cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1);
      tfree(pSml->stableName);
      return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
    }

    //convert dot to underscore for now, will be removed once dot is allowed in tbname.
    if (*cur == '.') {
      pSml->stableName[len] = '_';
    } else {
      pSml->stableName[len] = (char)tolower((unsigned char)*cur);
    }

    cur++;
    len++;
  }

  // an empty metric, or a line that ends right after the metric, is malformed
  if (len == 0 || *cur == '\0') {
    tfree(pSml->stableName);
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  pSml->stableName[len] = '\0';
  *index = cur + 1;
  tscDebug("OTD:0x%"PRIx64" Stable name in metric:%s|len:%d", info->id, pSml->stableName, len);

  return TSDB_CODE_SUCCESS;
}

/*
 * Parse the timestamp token of a telnet line into the first field slot.
 *
 * Allocates *pTS with OTD_MAX_FIELDS_NUM slots (timestamp + metric value), converts
 * the token with convertSmlTimeStamp into slot 0 and names it
 * OTD_TIMESTAMP_COLUMN_NAME. On success *num_kvs is incremented and *index points
 * past the separating space. On failure *pTS is released.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY,
 * TSDB_CODE_TSC_LINE_SYNTAX_ERROR, or the error from convertSmlTimeStamp.
 */
static int32_t parseTelnetTimeStamp(TAOS_SML_KV **pTS, int *num_kvs, const char **index, SSmlLinesInfo* info) {
  //Timestamp must be the first KV to parse
  assert(*num_kvs == 0);

  const char *start, *cur;
  int32_t ret = TSDB_CODE_SUCCESS;
  int len = 0;
  char key[] = OTD_TIMESTAMP_COLUMN_NAME;
  char *value = NULL;

  start = cur = *index;

  //allocate fields for timestamp and value
  *pTS = tcalloc(OTD_MAX_FIELDS_NUM, sizeof(TAOS_SML_KV));
  if (*pTS == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  while (*cur != '\0' && *cur != ' ') {
    cur++;
    len++;
  }

  // the timestamp must be non-empty and must be followed by the metric value
  if (len == 0 || *cur == '\0') {
    tfree(*pTS);
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  value = tcalloc(len + 1, 1);
  if (value == NULL) {
    tfree(*pTS);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(value, start, len);

  ret = convertSmlTimeStamp(*pTS, value, len, info);
  tfree(value);
  if (ret) {
    tfree(*pTS);
    return ret;
  }

  (*pTS)->key = tcalloc(sizeof(key), 1);
  if ((*pTS)->key == NULL) {
    // release whatever convertSmlTimeStamp stored in the slot; the slot was zeroed
    // by tcalloc, so value is NULL if nothing was stored
    tfree((*pTS)->value);
    tfree(*pTS);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy((*pTS)->key, key, sizeof(key));

  *num_kvs += 1;
  *index = cur + 1;

  return ret;
}

/*
 * Parse the metric value token of a telnet line into field slot 1.
 *
 * Expects *pKVs to have been allocated by parseTelnetTimeStamp (slot 0 holds the
 * timestamp). The token runs up to the next space and must be followed by at
 * least one tag. On success the slot is named OTD_METRIC_VALUE_COLUMN_NAME,
 * *num_kvs is incremented and *index points past the separating space.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY,
 * TSDB_CODE_TSC_LINE_SYNTAX_ERROR or TSDB_CODE_TSC_INVALID_VALUE.
 */
static int32_t parseTelnetMetricValue(TAOS_SML_KV **pKVs, int *num_kvs, const char **index, SSmlLinesInfo* info) {
  //skip timestamp
  TAOS_SML_KV *pVal = *pKVs + 1;
  const char *start, *cur;
  int32_t ret = TSDB_CODE_SUCCESS;
  int len = 0;
  char key[] = OTD_METRIC_VALUE_COLUMN_NAME;
  char *value = NULL;

  start = cur = *index;

  while (*cur != '\0' && *cur != ' ') {
    cur++;
    len++;
  }

  // the value must be non-empty and must be followed by the tags
  if (len == 0 || *cur == '\0') {
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  value = tcalloc(len + 1, 1);
  if (value == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(value, start, len);

  if (!convertSmlValueType(pVal, value, len, info)) {
    tscError("OTD:0x%"PRIx64" Failed to convert metric value string(%s) to any type",
            info->id, value);
    tfree(value);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }
  tfree(value);

  pVal->key = tcalloc(sizeof(key), 1);
  if (pVal->key == NULL) {
    // the slot is not counted in *num_kvs yet; release its converted value here
    tfree(pVal->value);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(pVal->key, key, sizeof(key));
  *num_kvs += 1;

  *index = cur + 1;
  return ret;
}

/*
 * Parse a tag key (the text before '=') at *index into pKV->key.
 *
 * The key may not start with a digit, may not contain a space, and may hold at most
 * TSDB_COL_NAME_LEN - 1 characters. It is registered in pHash via checkDuplicateKey
 * to detect repeated tags. On success *index points just past the '='.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_LINE_SYNTAX_ERROR,
 * TSDB_CODE_TSC_INVALID_COLUMN_LENGTH, TSDB_CODE_TSC_DUP_TAG_NAMES or
 * TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t parseTelnetTagKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash, SSmlLinesInfo* info) {
  const char *cur = *index;
  char key[TSDB_COL_NAME_LEN];
  uint16_t len = 0;

  //key field cannot start with digit
  if (isdigit((unsigned char)*cur)) {
    tscError("OTD:0x%"PRIx64" Tag key cannnot start with digit", info->id);
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  while (*cur != '\0') {
    // a space before '=' means the tag has no value
    if (*cur == ' ') {
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    }
    if (*cur == '=') {
      break;
    }
    // Checked after the delimiters so a key of exactly TSDB_COL_NAME_LEN - 1
    // characters (the documented maximum) still fits, terminator included.
    if (len >= TSDB_COL_NAME_LEN - 1) {
      tscError("OTD:0x%"PRIx64" Tag key cannot exceeds %d characters", info->id, TSDB_COL_NAME_LEN - 1);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

    key[len] = *cur;
    cur++;
    len++;
  }

  // an empty key, or one not followed by '=', is malformed
  if (len == 0 || *cur == '\0') {
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }
  key[len] = '\0';

  if (checkDuplicateKey(key, pHash, info)) {
    return TSDB_CODE_TSC_DUP_TAG_NAMES;
  }

  pKV->key = tcalloc(len + 1, 1);
  if (pKV->key == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(pKV->key, key, len + 1);

  *index = cur + 1;
  return TSDB_CODE_SUCCESS;
}


215
/*
 * Parse a tag value at *index into pKV (whose key was set by parseTelnetTagKey).
 *
 * The value ends at the next space, or at '\0' when it is the last tag of the line;
 * *is_last_kv reports which. The text is converted with convertSmlValueType.
 * On failure pKV->key is released. On success *index points at the next tag
 * (or at the terminating '\0').
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_LINE_SYNTAX_ERROR,
 * TSDB_CODE_TSC_INVALID_VALUE or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t parseTelnetTagValue(TAOS_SML_KV *pKV, const char **index,
                          bool *is_last_kv, SSmlLinesInfo* info) {
  const char *start, *cur;
  char *value = NULL;
  size_t len = 0;
  start = cur = *index;

  // whitespace or '\0' terminates a value; '\0' also ends the line
  while (*cur != ' ' && *cur != '\0') {
    cur++;
    len++;
  }
  *is_last_kv = (*cur == '\0');

  if (len == 0) {
    tfree(pKV->key);
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  // the length is handed on as uint16_t; reject longer values instead of letting
  // the count wrap around and silently truncate the value
  if (len > UINT16_MAX) {
    tscError("OTD:0x%"PRIx64" Tag value cannot exceed %d characters", info->id, UINT16_MAX);
    tfree(pKV->key);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  value = tcalloc(len + 1, 1);
  if (value == NULL) {
    tfree(pKV->key);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(value, start, len);

  if (!convertSmlValueType(pKV, value, (uint16_t)len, info)) {
    tscError("OTD:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
            info->id, value);
    //free previously allocated key field
    tfree(pKV->key);
    tfree(value);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }
  tfree(value);

  *index = (*cur == '\0') ? cur : cur + 1;
  return TSDB_CODE_SUCCESS;
}

static int32_t parseTelnetTagKvs(TAOS_SML_KV **pKVs, int *num_kvs,
256
                               const char **index,  char **childTableName,
257 258 259 260 261 262 263
                               SHashObj *pHash, SSmlLinesInfo* info) {
  const char *cur = *index;
  int32_t ret = TSDB_CODE_SUCCESS;
  TAOS_SML_KV *pkv;
  bool is_last_kv = false;

  int32_t capacity = 4;
264
  *pKVs = tcalloc(capacity, sizeof(TAOS_SML_KV));
265 266 267 268 269
  pkv = *pKVs;

  while (*cur != '\0') {
    ret = parseTelnetTagKey(pkv, &cur, pHash, info);
    if (ret) {
270
      tscError("OTD:0x%"PRIx64" Unable to parse key", info->id);
271 272 273 274
      return ret;
    }
    ret = parseTelnetTagValue(pkv, &cur, &is_last_kv, info);
    if (ret) {
275
      tscError("OTD:0x%"PRIx64" Unable to parse value", info->id);
276 277 278
      return ret;
    }
    if ((strcasecmp(pkv->key, "ID") == 0) && pkv->type == TSDB_DATA_TYPE_BINARY) {
279
      ret = isValidChildTableName(pkv->value, pkv->length, info);
280 281 282
      if (ret) {
        return ret;
      }
283 284 285
      *childTableName = malloc(pkv->length + 1);
      memcpy(*childTableName, pkv->value, pkv->length);
      (*childTableName)[pkv->length] = '\0';
286
      strntolower_s(*childTableName, *childTableName, (int32_t)pkv->length);
287 288
      tfree(pkv->key);
      tfree(pkv->value);
289 290 291 292 293 294 295 296 297 298
    } else {
      *num_kvs += 1;
    }

    if (is_last_kv) {
      break;
    }

    //reallocate addtional memory for more kvs
    if ((*num_kvs + 1) > capacity) {
299
      TAOS_SML_KV *more_kvs = NULL;
300 301
      capacity *= 3; capacity /= 2;
      more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
302 303 304 305
      if (!more_kvs) {
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }
      *pKVs = more_kvs;
306 307 308 309 310 311 312 313 314
    }

    //move pKV points to next TAOS_SML_KV block
    pkv = *pKVs + *num_kvs;
  }

  return ret;
}

315
/*
 * Parse one telnet-style line "<metric> <timestamp> <value> <tagk=tagv> ..." into smlData.
 *
 * The parsers are run in order: metric, timestamp, metric value, then tags. Tag keys
 * are checked for duplicates through a temporary hash table that is always released
 * before returning. On failure smlData may be partially filled and must be released
 * by the caller.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_OUT_OF_MEMORY, or the first parser error.
 */
static int32_t tscParseTelnetLine(const char* line, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) {
  const char* index = line;
  int32_t ret = TSDB_CODE_SUCCESS;

  //Parse metric
  ret = parseTelnetMetric(smlData, &index, info);
  if (ret) {
    tscError("OTD:0x%"PRIx64" Unable to parse metric", info->id);
    return ret;
  }
  tscDebug("OTD:0x%"PRIx64" Parse metric finished", info->id);

  //Parse timestamp
  ret = parseTelnetTimeStamp(&smlData->fields, &smlData->fieldNum, &index, info);
  if (ret) {
    tscError("OTD:0x%"PRIx64" Unable to parse timestamp", info->id);
    return ret;
  }
  tscDebug("OTD:0x%"PRIx64" Parse timestamp finished", info->id);

  //Parse value
  ret = parseTelnetMetricValue(&smlData->fields, &smlData->fieldNum, &index, info);
  if (ret) {
    tscError("OTD:0x%"PRIx64" Unable to parse metric value", info->id);
    return ret;
  }
  tscDebug("OTD:0x%"PRIx64" Parse metric value finished", info->id);

  //Parse tagKVs; without the hash table duplicate tag keys could not be detected
  SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
  if (keyHashTable == NULL) {
    tscError("OTD:0x%"PRIx64" Failed to allocate tag key hash table", info->id);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  ret = parseTelnetTagKvs(&smlData->tags, &smlData->tagNum, &index, &smlData->childTableName, keyHashTable, info);
  taosHashCleanup(keyHashTable);
  if (ret) {
    tscError("OTD:0x%"PRIx64" Unable to parse tags", info->id);
    return ret;
  }
  tscDebug("OTD:0x%"PRIx64" Parse tags finished", info->id);

  return TSDB_CODE_SUCCESS;
}

358
/*
 * Parse numLines telnet lines and append each resulting data point to `points`.
 *
 * Stops at the first line that fails to parse (or cannot be stored) and returns
 * its error code. Points already appended remain in `points` and must be released
 * by the caller. `failedLines` is currently unused (the visible caller passes NULL).
 */
static int32_t tscParseTelnetLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) {
  for (int32_t i = 0; i < numLines; ++i) {
    TAOS_SML_DATA_POINT point = {0};
    int32_t code = tscParseTelnetLine(lines[i], &point, info);
    if (code != TSDB_CODE_SUCCESS) {
      tscError("OTD:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]);
      destroySmlDataPoint(&point);
      return code;
    }
    tscDebug("OTD:0x%"PRIx64" data point line parse success. line %d", info->id, i);

    // If the point cannot be stored, the caller will never see it, so free it here.
    // NOTE(review): assumes taosArrayPush returns NULL when it fails to grow the array.
    if (taosArrayPush(points, &point) == NULL) {
      tscError("OTD:0x%"PRIx64" failed to store parsed data point. line %d", info->id, i);
      destroySmlDataPoint(&point);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Public API: parse numLines telnet-style lines and insert them through tscSmlInsert.
 *
 * numLines must be in [1, 65536] and neither `lines` nor any entry may be NULL
 * (TSDB_CODE_TSC_APP_ERROR otherwise). All parsed points are released before
 * returning, whether parsing or insertion succeeded or not.
 *
 * Returns 0 on success, or a TSDB error code.
 */
int taos_insert_telnet_lines(TAOS* taos, char* lines[], int numLines) {
  int32_t code = 0;

  SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
  if (info == NULL) {
    // no id has been generated yet, so there is nothing meaningful to log with
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  info->id = genUID();

  if (numLines <= 0 || numLines > 65536) {
    tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
    tfree(info);
    code = TSDB_CODE_TSC_APP_ERROR;
    return code;
  }

  if (lines == NULL) {
    tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines lines is NULL", info->id);
    tfree(info);
    return TSDB_CODE_TSC_APP_ERROR;
  }

  for (int i = 0; i < numLines; ++i) {
    if (lines[i] == NULL) {
      tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines line %d is NULL", info->id, i);
      tfree(info);
      code = TSDB_CODE_TSC_APP_ERROR;
      return code;
    }
  }

  SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT));
  if (lpPoints == NULL) {
    tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines failed to allocate memory", info->id);
    tfree(info);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]);
  code = tscParseTelnetLines(lines, numLines, lpPoints, NULL, info);
  if (code == 0) {
    TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
    size_t numPoints = taosArrayGetSize(lpPoints);
    code = tscSmlInsert(taos, points, (int)numPoints, info);
    if (code != 0) {
      tscError("OTD:0x%"PRIx64" taos_insert_telnet_lines error: %s", info->id, tstrerror((code)));
    }
  }

  tscDebug("OTD:0x%"PRIx64" taos_insert_telnet_lines finish inserting %d lines. code: %d", info->id, numLines, code);

  // release every parsed point, including those parsed before a failing line
  TAOS_SML_DATA_POINT* parsed = TARRAY_GET_START(lpPoints);
  size_t numParsed = taosArrayGetSize(lpPoints);
  for (size_t i = 0; i < numParsed; ++i) {
    destroySmlDataPoint(parsed + i);
  }

  taosArrayDestroy(lpPoints);
  tfree(info);
  return code;
}
431 432

/*
 * Public API: insert already-built data points through tscSmlInsert.
 * A fresh SSmlLinesInfo (with a new id from genUID) is created for this call only.
 *
 * Returns tscSmlInsert's result, or TSDB_CODE_TSC_OUT_OF_MEMORY if the per-call
 * info cannot be allocated.
 */
int taos_telnet_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
  SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
  if (info == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  info->id = genUID();

  int code = tscSmlInsert(taos, points, numPoint, info);
  tfree(info);
  return code;
}
439 440 441


/* JSON style API parser */
442
/*
 * Read the "metric" string member of a JSON data point into pSml->stableName.
 *
 * The metric must be a non-empty string of at most TSDB_TABLE_NAME_LEN - 1
 * characters and may not start with a digit. Dots are converted to underscores
 * and the result is lower-cased. The conversion works on a private copy, so the
 * JSON tree is not modified.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON,
 * TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t parseMetricFromJSON(cJSON *root, TAOS_SML_DATA_POINT* pSml, SSmlLinesInfo* info) {
  cJSON *metric = cJSON_GetObjectItem(root, "metric");
  if (!cJSON_IsString(metric)) {
    return  TSDB_CODE_TSC_INVALID_JSON;
  }

  const char *metricStr = metric->valuestring;
  size_t stableLen = strlen(metricStr);
  // an empty metric would produce an empty super table name (the telnet parser rejects it too)
  if (stableLen == 0) {
    tscError("OTD:0x%"PRIx64" Metric cannot be empty in JSON", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }
  if (stableLen > TSDB_TABLE_NAME_LEN - 1) {
    tscError("OTD:0x%"PRIx64" Metric cannot exceeds %d characters in JSON", info->id, TSDB_TABLE_NAME_LEN - 1);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  // validate before allocating; <ctype.h> needs an unsigned char value
  if (isdigit((unsigned char)metricStr[0])) {
    tscError("OTD:0x%"PRIx64" Metric cannnot start with digit in JSON", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  pSml->stableName = tcalloc(stableLen + 1, sizeof(char));
  if (pSml->stableName == NULL){
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  //convert dot to underscore for now, will be removed once dot is allowed in tbname.
  for (size_t i = 0; i < stableLen; ++i) {
    pSml->stableName[i] = (metricStr[i] == '.') ? '_' : metricStr[i];
  }
  pSml->stableName[stableLen] = '\0';
  strntolower_s(pSml->stableName, pSml->stableName, (int32_t)stableLen);

  return TSDB_CODE_SUCCESS;
}

479
/*
 * Parse a timestamp object {"value": <number>, "type": "s"|"ms"|"us"|"ns"} into
 * nanoseconds.
 *
 * The integer part of "value" is read from the original number text. A value of 0
 * means "now" (taosGetTimestampNs). The unit in "type" is matched
 * case-insensitively; the JSON string is lower-cased in place.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON (bad shape or unit) or
 * TSDB_CODE_TSC_VALUE_OUT_OF_RANGE (seconds value not representable in ns).
 */
static int32_t parseTimestampFromJSONObj(cJSON *root, int64_t *tsVal, SSmlLinesInfo* info) {
  int32_t size = cJSON_GetArraySize(root);
  if (size != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *value = cJSON_GetObjectItem(root, "value");
  if (!cJSON_IsNumber(value)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *type = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(type)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  *tsVal = strtoll(value->numberstring, NULL, 10);
  //if timestamp value is 0 use current system time
  if (*tsVal == 0) {
    *tsVal = taosGetTimestampNs();
    return TSDB_CODE_SUCCESS;
  }

  size_t typeLen = strlen(type->valuestring);
  strntolower_s(type->valuestring, type->valuestring, (int32_t)typeLen);
  if (typeLen == 1 && type->valuestring[0] == 's') {
    //seconds
    const int64_t nsPerSec = 1000000000LL;
    // Range-check before scaling: the former double multiplication converted
    // out-of-range results to int64_t, which is undefined behavior.
    if (*tsVal > INT64_MAX / nsPerSec || *tsVal < INT64_MIN / nsPerSec) {
      tscError("OTD:0x%"PRIx64" JSON timestamp(%"PRId64") in seconds cannot be converted to nanoseconds",
               info->id, *tsVal);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    *tsVal *= nsPerSec;
  } else if (typeLen == 2 && type->valuestring[1] == 's') {
    switch (type->valuestring[0]) {
      case 'm':
        //milliseconds
        *tsVal = convertTimePrecision(*tsVal, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_NANO);
        break;
      case 'u':
        //microseconds
        *tsVal = convertTimePrecision(*tsVal, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
        break;
      case 'n':
        //nanoseconds: already in the target unit
        break;
      default:
        return TSDB_CODE_TSC_INVALID_JSON;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  return TSDB_CODE_SUCCESS;
}

531
/*
 * Read the "timestamp" member of a JSON data point into a new fields array.
 *
 * "timestamp" is either a number or an object handled by parseTimestampFromJSONObj.
 * A plain number is converted from microseconds to nanoseconds, and 0 means "now".
 * Allocates *pTS with OTD_MAX_FIELDS_NUM slots (timestamp + metric value), fills
 * slot 0 and increments *num_kvs.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON,
 * TSDB_CODE_TSC_OUT_OF_MEMORY, or the error from parseTimestampFromJSONObj.
 */
static int32_t parseTimestampFromJSON(cJSON *root, TAOS_SML_KV **pTS, int *num_kvs, SSmlLinesInfo* info) {
  //Timestamp must be the first KV to parse
  assert(*num_kvs == 0);
  int64_t tsVal;
  char key[] = OTD_TIMESTAMP_COLUMN_NAME;

  cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
  if (cJSON_IsNumber(timestamp)) {
    //timestamp value 0 indicates current system time
    if (timestamp->valueint == 0) {
      tsVal = taosGetTimestampNs();
    } else {
      tsVal = strtoll(timestamp->numberstring, NULL, 10);
      tsVal = convertTimePrecision(tsVal, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
    }
  } else if (cJSON_IsObject(timestamp)) {
    int32_t ret = parseTimestampFromJSONObj(timestamp, &tsVal, info);
    if (ret != TSDB_CODE_SUCCESS) {
      tscError("OTD:0x%"PRIx64" Failed to parse timestamp from JSON Obj", info->id);
      return ret;
    }
  } else {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  //allocate fields for timestamp and value
  *pTS = tcalloc(OTD_MAX_FIELDS_NUM, sizeof(TAOS_SML_KV));
  if (*pTS == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  (*pTS)->type = TSDB_DATA_TYPE_TIMESTAMP;
  (*pTS)->length = (int16_t)tDataTypes[(*pTS)->type].bytes;
  (*pTS)->key = tcalloc(sizeof(key), 1);
  (*pTS)->value = tcalloc((*pTS)->length, 1);
  if ((*pTS)->key == NULL || (*pTS)->value == NULL) {
    // members of the zeroed slot that were not allocated are still NULL
    tfree((*pTS)->key);
    tfree((*pTS)->value);
    tfree(*pTS);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy((*pTS)->key, key, sizeof(key));
  memcpy((*pTS)->value, &tsVal, (*pTS)->length);

  *num_kvs += 1;
  return TSDB_CODE_SUCCESS;
}

573
/*
 * Store a JSON true/false as a BOOL value in pVal. typeStr must be "bool"
 * (case-insensitive).
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON_TYPE or
 * TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t convertJSONBool(TAOS_SML_KV *pVal, char* typeStr, int64_t valueInt, SSmlLinesInfo* info) {
  if (strcasecmp(typeStr, "bool") != 0) {
    tscError("OTD:0x%"PRIx64" invalid type(%s) for JSON Bool", info->id, typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }
  pVal->type = TSDB_DATA_TYPE_BOOL;
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  pVal->value = tcalloc(pVal->length, 1);
  if (pVal->value == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  *(bool *)(pVal->value) = valueInt ? true : false;

  return TSDB_CODE_SUCCESS;
}

586
/* Set pVal to a fixed-size numeric value of `type`, copying its bytes from *src. */
static int32_t setJSONNumberValue(TAOS_SML_KV *pVal, int type, const void *src) {
  pVal->type = type;
  pVal->length = (int16_t)tDataTypes[type].bytes;
  pVal->value = tcalloc(pVal->length, 1);
  if (pVal->value == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(pVal->value, src, pVal->length);
  return TSDB_CODE_SUCCESS;
}

/*
 * Store a JSON number as the column type named by typeStr (case-insensitive):
 * i8/tinyint, i16/smallint, i32/int, i64/bigint, f32/float or f64/double.
 *
 * The value is range-checked before anything is allocated, so pVal->value is left
 * untouched on error.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_VALUE_OUT_OF_RANGE,
 * TSDB_CODE_TSC_INVALID_JSON_TYPE or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t convertJSONNumber(TAOS_SML_KV *pVal, char* typeStr, cJSON *value, SSmlLinesInfo* info) {
  //tinyint
  if (strcasecmp(typeStr, "i8") == 0 ||
      strcasecmp(typeStr, "tinyint") == 0) {
    if (!IS_VALID_TINYINT(value->valueint)) {
      tscError("OTD:0x%"PRIx64" JSON value(%"PRId64") cannot fit in type(tinyint)", info->id, value->valueint);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    int8_t v = (int8_t)(value->valueint);
    return setJSONNumberValue(pVal, TSDB_DATA_TYPE_TINYINT, &v);
  }
  //smallint
  if (strcasecmp(typeStr, "i16") == 0 ||
      strcasecmp(typeStr, "smallint") == 0) {
    if (!IS_VALID_SMALLINT(value->valueint)) {
      tscError("OTD:0x%"PRIx64" JSON value(%"PRId64") cannot fit in type(smallint)", info->id, value->valueint);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    int16_t v = (int16_t)(value->valueint);
    return setJSONNumberValue(pVal, TSDB_DATA_TYPE_SMALLINT, &v);
  }
  //int
  if (strcasecmp(typeStr, "i32") == 0 ||
      strcasecmp(typeStr, "int") == 0) {
    if (!IS_VALID_INT(value->valueint)) {
      tscError("OTD:0x%"PRIx64" JSON value(%"PRId64") cannot fit in type(int)", info->id, value->valueint);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    int32_t v = (int32_t)(value->valueint);
    return setJSONNumberValue(pVal, TSDB_DATA_TYPE_INT, &v);
  }
  //bigint
  if (strcasecmp(typeStr, "i64") == 0 ||
      strcasecmp(typeStr, "bigint") == 0) {
    /* cJSON conversion of legit BIGINT may overflow,
     * use original string to do the conversion.
     * Parse and validate first so nothing is allocated (and leaked) on error.
     */
    errno = 0;
    int64_t v = (int64_t)strtoll(value->numberstring, NULL, 10);
    if (errno == ERANGE || !IS_VALID_BIGINT(v)) {
      tscError("OTD:0x%"PRIx64" JSON value(%s) cannot fit in type(bigint)", info->id, value->numberstring);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    return setJSONNumberValue(pVal, TSDB_DATA_TYPE_BIGINT, &v);
  }
  //float
  if (strcasecmp(typeStr, "f32") == 0 ||
      strcasecmp(typeStr, "float") == 0) {
    if (!IS_VALID_FLOAT(value->valuedouble)) {
      tscError("OTD:0x%"PRIx64" JSON value(%f) cannot fit in type(float)", info->id, value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    float v = (float)(value->valuedouble);
    return setJSONNumberValue(pVal, TSDB_DATA_TYPE_FLOAT, &v);
  }
  //double
  if (strcasecmp(typeStr, "f64") == 0 ||
      strcasecmp(typeStr, "double") == 0) {
    if (!IS_VALID_DOUBLE(value->valuedouble)) {
      tscError("OTD:0x%"PRIx64" JSON value(%f) cannot fit in type(double)", info->id, value->valuedouble);
      return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
    }
    double v = (double)(value->valuedouble);
    return setJSONNumberValue(pVal, TSDB_DATA_TYPE_DOUBLE, &v);
  }

  //if reach here means type is unsupported
  tscError("OTD:0x%"PRIx64" invalid type(%s) for JSON Number", info->id, typeStr);
  return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}

676
/*
 * Store a JSON string as BINARY or NCHAR, as named by typeStr (case-insensitive).
 * pVal->value receives a NUL-terminated copy and pVal->length its byte length.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON_TYPE,
 * TSDB_CODE_TSC_VALUE_OUT_OF_RANGE (string too long for the int16 length) or
 * TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t convertJSONString(TAOS_SML_KV *pVal, char* typeStr, cJSON *value, SSmlLinesInfo* info) {
  if (strcasecmp(typeStr, "binary") == 0) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
  } else if (strcasecmp(typeStr, "nchar") == 0) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
  } else {
    tscError("OTD:0x%"PRIx64" invalid type(%s) for JSON String", info->id, typeStr);
    return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }

  // The length is stored as int16_t. Casting a longer strlen would wrap to a
  // negative length, which then reaches tcalloc/memcpy.
  size_t len = strlen(value->valuestring);
  if (len > INT16_MAX) {
    tscError("OTD:0x%"PRIx64" JSON string length(%zu) exceeds %d", info->id, len, INT16_MAX);
    return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
  }
  pVal->length = (int16_t)len;
  pVal->value = tcalloc(len + 1, 1);
  if (pVal->value == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(pVal->value, value->valuestring, len);
  return TSDB_CODE_SUCCESS;
}

691
/*
 * Convert a typed JSON value object {"value": ..., "type": "<type name>"} into pVal.
 *
 * The object must have exactly OTD_JSON_SUB_FIELDS_NUM members and "type" must be a
 * string. Depending on the JSON kind of "value", the work is delegated to
 * convertJSONBool, convertJSONNumber or convertJSONString, whose result is returned.
 *
 * Returns TSDB_CODE_TSC_INVALID_JSON for a malformed object and
 * TSDB_CODE_TSC_INVALID_JSON_TYPE for an unsupported value kind.
 */
static int32_t parseValueFromJSONObj(cJSON *root, TAOS_SML_KV *pVal, SSmlLinesInfo* info) {
  if (cJSON_GetArraySize(root) != OTD_JSON_SUB_FIELDS_NUM) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *jsonValue = cJSON_GetObjectItem(root, "value");
  if (jsonValue == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  cJSON *jsonType = cJSON_GetObjectItem(root, "type");
  if (!cJSON_IsString(jsonType)) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  char *typeName = jsonType->valuestring;
  switch (jsonValue->type) {
    case cJSON_True:
    case cJSON_False:
      return convertJSONBool(pVal, typeName, jsonValue->valueint, info);
    case cJSON_Number:
      return convertJSONNumber(pVal, typeName, jsonValue, info);
    case cJSON_String:
      return convertJSONString(pVal, typeName, jsonValue, info);
    default:
      return TSDB_CODE_TSC_INVALID_JSON_TYPE;
  }
}

739
/*
 * Convert a JSON value (metric value or tag value) into pVal using default types.
 *
 * - true/false -> BOOL
 * - integer numbers -> BIGINT (parsed from the original text)
 * - other numbers -> DOUBLE
 * - strings -> BINARY or NCHAR, per the tsDefaultJSONStrType configuration
 * - objects -> explicit type via parseValueFromJSONObj
 *
 * Values are validated before allocation, so pVal->value is not allocated on a
 * range error.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON,
 * TSDB_CODE_TSC_INVALID_JSON_TYPE, TSDB_CODE_TSC_INVALID_JSON_CONFIG,
 * TSDB_CODE_TSC_VALUE_OUT_OF_RANGE or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t parseValueFromJSON(cJSON *root, TAOS_SML_KV *pVal, SSmlLinesInfo* info) {
  int type = root->type;

  switch (type) {
    case cJSON_True:
    case cJSON_False: {
      pVal->type = TSDB_DATA_TYPE_BOOL;
      pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
      pVal->value = tcalloc(pVal->length, 1);
      if (pVal->value == NULL) {
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }
      *(bool *)(pVal->value) = root->valueint ? true : false;
      break;
    }
    case cJSON_Number: {
      //convert default JSON Number type to BIGINT/DOUBLE
      if (isValidInteger(root->numberstring)) {
        /* cJSON conversion of legit BIGINT may overflow,
         * use original string to do the conversion.
         * Validate before allocating so nothing leaks on error.
         */
        errno = 0;
        int64_t val = (int64_t)strtoll(root->numberstring, NULL, 10);
        if (errno == ERANGE || !IS_VALID_BIGINT(val)) {
          tscError("OTD:0x%"PRIx64" JSON value(%s) cannot fit in type(bigint)", info->id, root->numberstring);
          return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
        }
        pVal->type = TSDB_DATA_TYPE_BIGINT;
        pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
        pVal->value = tcalloc(pVal->length, 1);
        if (pVal->value == NULL) {
          return TSDB_CODE_TSC_OUT_OF_MEMORY;
        }
        *(int64_t *)(pVal->value) = val;
      } else if (isValidFloat(root->numberstring)) {
        pVal->type = TSDB_DATA_TYPE_DOUBLE;
        pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
        pVal->value = tcalloc(pVal->length, 1);
        if (pVal->value == NULL) {
          return TSDB_CODE_TSC_OUT_OF_MEMORY;
        }
        *(double *)(pVal->value) = (double)(root->valuedouble);
      } else {
        return TSDB_CODE_TSC_INVALID_JSON_TYPE;
      }
      break;
    }
    case cJSON_String: {
      /* set default JSON type to binary/nchar according to
       * user configured parameter tsDefaultJSONStrType
       */
      if (strcasecmp(tsDefaultJSONStrType, "binary") == 0) {
        pVal->type = TSDB_DATA_TYPE_BINARY;
      } else if (strcasecmp(tsDefaultJSONStrType, "nchar") == 0) {
        pVal->type = TSDB_DATA_TYPE_NCHAR;
      } else {
        tscError("OTD:0x%"PRIx64" Invalid default JSON string type set from config %s", info->id, tsDefaultJSONStrType);
        return TSDB_CODE_TSC_INVALID_JSON_CONFIG;
      }
      // pVal->length is stored as int16_t; a longer string would wrap negative
      size_t len = strlen(root->valuestring);
      if (len > INT16_MAX) {
        tscError("OTD:0x%"PRIx64" JSON string length(%zu) exceeds %d", info->id, len, INT16_MAX);
        return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
      }
      pVal->length = (int16_t)len;
      pVal->value = tcalloc(len + 1, 1);
      if (pVal->value == NULL) {
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }
      memcpy(pVal->value, root->valuestring, len);
      break;
    }
    case cJSON_Object: {
      int32_t ret = parseValueFromJSONObj(root, pVal, info);
      if (ret != TSDB_CODE_SUCCESS) {
        tscError("OTD:0x%"PRIx64" Failed to parse value from JSON Obj", info->id);
        return ret;
      }
      break;
    }
    default:
      return TSDB_CODE_TSC_INVALID_JSON;
  }

  return TSDB_CODE_SUCCESS;
}

810
/*
 * Read the "value" member of a JSON data point into field slot 1.
 *
 * Expects *pKVs to have been allocated by parseTimestampFromJSON (slot 0 holds the
 * timestamp). On success the slot is named OTD_METRIC_VALUE_COLUMN_NAME and
 * *num_kvs is incremented.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON,
 * TSDB_CODE_TSC_OUT_OF_MEMORY, or the error from parseValueFromJSON.
 */
static int32_t parseMetricValueFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, SSmlLinesInfo* info) {
  //skip timestamp
  TAOS_SML_KV *pVal = *pKVs + 1;
  char key[] = OTD_METRIC_VALUE_COLUMN_NAME;

  cJSON *metricVal = cJSON_GetObjectItem(root, "value");
  if (metricVal == NULL) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  int32_t ret = parseValueFromJSON(metricVal, pVal, info);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  pVal->key = tcalloc(sizeof(key), 1);
  if (pVal->key == NULL) {
    // The slot is not counted in *num_kvs yet, so release its value here.
    // NOTE(review): assumes cleanup of the data point only visits counted slots.
    tfree(pVal->value);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(pVal->key, key, sizeof(key));

  *num_kvs += 1;
  return TSDB_CODE_SUCCESS;
}

833

834 835
/*
 * Read the "tags" object of a JSON data point into a newly allocated *pKVs array.
 *
 * An "ID" member (looked up with cJSON_GetObjectItem) must be a string. It becomes
 * the lower-cased *childTableName and is removed from the tags; a second ID member
 * is reported as a duplicate. At least one other tag is required. Each tag key is
 * length-checked, checked for duplicates via pHash, and its value converted with
 * parseValueFromJSON.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON,
 * TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH, TSDB_CODE_TSC_INVALID_COLUMN_LENGTH,
 * TSDB_CODE_TSC_DUP_TAG_NAMES, TSDB_CODE_TSC_OUT_OF_MEMORY, or a value
 * conversion error.
 */
static int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, char **childTableName,
                                 SHashObj *pHash, SSmlLinesInfo* info) {
  int32_t ret = TSDB_CODE_SUCCESS;

  cJSON *tags = cJSON_GetObjectItem(root, "tags");
  if (tags == NULL || tags->type != cJSON_Object) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  //only pick up the first ID value as child table name
  cJSON *id = cJSON_GetObjectItem(tags, "ID");
  if (id != NULL) {
    // valuestring is only set for string items; a numeric/bool ID would be a NULL dereference
    if (!cJSON_IsString(id)) {
      tscError("OTD:0x%"PRIx64" ID tag value must be a string in JSON", info->id);
      return TSDB_CODE_TSC_INVALID_JSON;
    }
    size_t idLen = strlen(id->valuestring);
    // isValidChildTableName takes an int16_t length; don't let the cast wrap negative
    if (idLen > INT16_MAX) {
      tscError("OTD:0x%"PRIx64" ID tag value is too long in JSON", info->id);
      return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
    }
    ret = isValidChildTableName(id->valuestring, (int16_t)idLen, info);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }
    *childTableName = tcalloc(idLen + 1, sizeof(char));
    if (*childTableName == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    memcpy(*childTableName, id->valuestring, idLen);
    strntolower_s(*childTableName, *childTableName, (int32_t)idLen);

    //check duplicate IDs
    cJSON_DeleteItemFromObject(tags, "ID");
    id = cJSON_GetObjectItem(tags, "ID");
    if (id != NULL) {
      return TSDB_CODE_TSC_DUP_TAG_NAMES;
    }
  }

  int32_t tagNum = cJSON_GetArraySize(tags);
  //at least one tag pair required
  if (tagNum <= 0) {
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  //allocate memory for tags
  *pKVs = tcalloc(tagNum, sizeof(TAOS_SML_KV));
  if (*pKVs == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  TAOS_SML_KV *pkv = *pKVs;

  for (int32_t i = 0; i < tagNum; ++i) {
    cJSON *tag = cJSON_GetArrayItem(tags, i);
    if (tag == NULL) {
      return TSDB_CODE_TSC_INVALID_JSON;
    }

    //key: validate the length before the duplicate check, as parseTelnetTagKey does
    size_t keyLen = strlen(tag->string);
    if (keyLen > TSDB_COL_NAME_LEN - 1) {
      tscError("OTD:0x%"PRIx64" Tag key cannot exceeds %d characters in JSON", info->id, TSDB_COL_NAME_LEN - 1);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }

    //check duplicate keys
    if (checkDuplicateKey(tag->string, pHash, info)) {
      return TSDB_CODE_TSC_DUP_TAG_NAMES;
    }

    pkv->key = tcalloc(keyLen + 1, sizeof(char));
    if (pkv->key == NULL) {
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    memcpy(pkv->key, tag->string, keyLen);

    //value
    ret = parseValueFromJSON(tag, pkv, info);
    if (ret != TSDB_CODE_SUCCESS) {
      // The slot is not counted in *num_kvs yet, so release what it holds. It was
      // zeroed by tcalloc, so an unset value is still NULL.
      tfree(pkv->key);
      tfree(pkv->value);
      return ret;
    }
    *num_kvs += 1;
    pkv++;
  }

  return ret;
}

904
/*
 * Parse one OpenTSDB JSON data point object into pSml.
 *
 * The object must have exactly OTD_JSON_FIELDS_NUM top-level fields. The
 * metric, timestamp, metric value and tags are parsed in that order, and
 * parsing stops at the first failure.
 *
 * root: JSON node for a single data point; must be a JSON object.
 * pSml: output data point. On failure it may be partially filled; the caller
 *       is expected to release it (tscParseMultiJSONPayload calls
 *       destroySmlDataPoint on error).
 * info: per-request context; info->id is used to correlate log lines.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_JSON for a malformed data
 * point, TSDB_CODE_TSC_OUT_OF_MEMORY if the tag-key hash table cannot be
 * allocated, or the error code of the failing sub-parser.
 */
static int32_t tscParseJSONPayload(cJSON *root, TAOS_SML_DATA_POINT* pSml, SSmlLinesInfo* info) {
  int32_t ret = TSDB_CODE_SUCCESS;

  if (!cJSON_IsObject(root)) {
    tscError("OTD:0x%"PRIx64" data point needs to be JSON object", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  int32_t size = cJSON_GetArraySize(root);
  //outmost json fields has to be exactly 4
  if (size != OTD_JSON_FIELDS_NUM) {
    tscError("OTD:0x%"PRIx64" Invalid number of JSON fields in data point %d", info->id, size);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  //Parse metric
  ret = parseMetricFromJSON(root, pSml, info);
  if (ret != TSDB_CODE_SUCCESS) {
    tscError("OTD:0x%"PRIx64" Unable to parse metric from JSON payload", info->id);
    return ret;
  }
  tscDebug("OTD:0x%"PRIx64" Parse metric from JSON payload finished", info->id);

  //Parse timestamp
  ret = parseTimestampFromJSON(root, &pSml->fields, &pSml->fieldNum, info);
  if (ret != TSDB_CODE_SUCCESS) {
    tscError("OTD:0x%"PRIx64" Unable to parse timestamp from JSON payload", info->id);
    return ret;
  }
  tscDebug("OTD:0x%"PRIx64" Parse timestamp from JSON payload finished", info->id);

  //Parse metric value
  ret = parseMetricValueFromJSON(root, &pSml->fields, &pSml->fieldNum, info);
  if (ret != TSDB_CODE_SUCCESS) {
    tscError("OTD:0x%"PRIx64" Unable to parse metric value from JSON payload", info->id);
    return ret;
  }
  tscDebug("OTD:0x%"PRIx64" Parse metric value from JSON payload finished", info->id);

  //Parse tags. The hash table is handed to parseTagsFromJSON, which passes it
  //to checkDuplicateKey so repeated tag keys are rejected.
  SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
  if (keyHashTable == NULL) {
    tscError("OTD:0x%"PRIx64" Unable to allocate tag key hash table", info->id);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  ret = parseTagsFromJSON(root, &pSml->tags, &pSml->tagNum, &pSml->childTableName, keyHashTable, info);
  // The table is only needed during tag parsing; release it on every path.
  taosHashCleanup(keyHashTable);
  if (ret != TSDB_CODE_SUCCESS) {
    tscError("OTD:0x%"PRIx64" Unable to parse tags from JSON payload", info->id);
    return ret;
  }
  tscDebug("OTD:0x%"PRIx64" Parse tags from JSON payload finished", info->id);

  return TSDB_CODE_SUCCESS;
}
956

957
/*
 * Parse an OpenTSDB JSON payload and append the resulting data points to
 * `points`.
 *
 * The payload is either a single data point object or a JSON array of data
 * point objects. An array is always indexed element by element, even when it
 * has exactly one element.
 *
 * Each successfully parsed point is pushed onto `points`, and ownership of its
 * allocations passes to the array. Points parsed before a failing element stay
 * in the array, so the caller must destroy them even when an error is returned.
 *
 * Returns TSDB_CODE_SUCCESS; TSDB_CODE_TSC_INVALID_JSON when the payload is
 * NULL, unparsable, or neither an object nor an array;
 * TSDB_CODE_TSC_OUT_OF_MEMORY when a point cannot be appended; or the error
 * returned by tscParseJSONPayload.
 */
static int32_t tscParseMultiJSONPayload(char* payload, SArray* points, SSmlLinesInfo* info) {
  int32_t ret = TSDB_CODE_SUCCESS;
  int32_t payloadNum = 0;

  if (payload == NULL) {
    tscError("OTD:0x%"PRIx64" empty JSON Payload", info->id);
    return TSDB_CODE_TSC_INVALID_JSON;
  }

  // cJSON_Parse yields NULL on malformed input; the type checks below treat
  // NULL as neither object nor array, so it is reported as invalid JSON.
  cJSON *root = cJSON_Parse(payload);
  int32_t isArray = cJSON_IsArray(root) ? 1 : 0;

  //multiple data points must be sent in JSON array
  if (cJSON_IsObject(root)) {
    payloadNum = 1;
  } else if (isArray) {
    payloadNum = cJSON_GetArraySize(root);
  } else {
    tscError("OTD:0x%"PRIx64" Invalid JSON Payload", info->id);
    ret = TSDB_CODE_TSC_INVALID_JSON;
    goto PARSE_JSON_OVER;
  }

  for (int32_t i = 0; i < payloadNum; ++i) {
    TAOS_SML_DATA_POINT point = {0};
    // Choose by container type, not by count: a one-element array must still
    // be indexed, otherwise the array node itself would be parsed as a point.
    cJSON *dataPoint = isArray ? cJSON_GetArrayItem(root, i) : root;

    ret = tscParseJSONPayload(dataPoint, &point, info);
    if (ret != TSDB_CODE_SUCCESS) {
      tscError("OTD:0x%"PRIx64" JSON data point parse failed", info->id);
      destroySmlDataPoint(&point);
      goto PARSE_JSON_OVER;
    }
    tscDebug("OTD:0x%"PRIx64" JSON data point parse success", info->id);

    // A failed push would otherwise leak the point and report success.
    if (taosArrayPush(points, &point) == NULL) {
      tscError("OTD:0x%"PRIx64" failed to store parsed JSON data point", info->id);
      destroySmlDataPoint(&point);
      ret = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto PARSE_JSON_OVER;
    }
  }

PARSE_JSON_OVER:
  cJSON_Delete(root);
  return ret;
}

/*
 * Public entry point: parse an OpenTSDB JSON payload and insert its data
 * points through tscSmlInsert.
 *
 * taos:    connection handle, passed through to tscSmlInsert.
 * payload: NUL-terminated JSON text containing one data point object or an
 *          array of them.
 *
 * Returns 0 on success; TSDB_CODE_TSC_APP_ERROR when payload is NULL;
 * TSDB_CODE_TSC_OUT_OF_MEMORY when request state cannot be allocated; or the
 * parse/insert error code otherwise. Every parsed data point is released
 * before returning, whatever the outcome.
 */
int taos_insert_json_payload(TAOS* taos, char* payload) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
  if (info == NULL) {
    // No request id exists yet, so this log line cannot carry one.
    tscError("OTD: taos_insert_json_payload failed to allocate request info");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  info->id = genUID();

  if (payload == NULL) {
    tscError("OTD:0x%"PRIx64" taos_insert_json_payload payload is NULL", info->id);
    tfree(info);
    return TSDB_CODE_TSC_APP_ERROR;
  }

  SArray* lpPoints = taosArrayInit(1, sizeof(TAOS_SML_DATA_POINT));
  if (lpPoints == NULL) {
    tscError("OTD:0x%"PRIx64" taos_insert_json_payload failed to allocate memory", info->id);
    tfree(info);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebug("OTD:0x%"PRIx64" taos_insert_json_payload begin inserting", info->id);
  code = tscParseMultiJSONPayload(payload, lpPoints, info);

  // Points parsed before a failure are still in the array and must be freed,
  // so read the array contents whether or not parsing succeeded.
  size_t numPoints = taosArrayGetSize(lpPoints);
  TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);

  if (code == TSDB_CODE_SUCCESS) {
    code = tscSmlInsert(taos, points, (int)numPoints, info);
    if (code != TSDB_CODE_SUCCESS) {
      tscError("OTD:0x%"PRIx64" taos_insert_json_payload error: %s", info->id, tstrerror((code)));
    }
  }

  tscDebug("OTD:0x%"PRIx64" taos_insert_json_payload finish inserting %d points. code: %d",
           info->id, (int)numPoints, code);

  for (size_t i = 0; i < numPoints; ++i) {
    destroySmlDataPoint(points + i);
  }
  taosArrayDestroy(lpPoints);

  tfree(info);
  return code;
}