tscParseLineProtocol.c 53.2 KB
Newer Older
S
shenglian zhou 已提交
1 2 3 4
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
5

S
shenglian zhou 已提交
6 7 8 9 10 11 12 13
#include "os.h"
#include "osString.h"
#include "ttype.h"
#include "tmd5.h"
#include "tstrbuild.h"
#include "tname.h"
#include "hash.h"
#include "tskiplist.h"
14

S
shenglian zhou 已提交
15
#include "tscUtil.h"
16 17
#include "tsclient.h"
#include "tscLog.h"
S
shenglian zhou 已提交
18

19
#include "taos.h"
20

S
shenglian zhou 已提交
21 22 23 24 25 26
// Schema of one super table, either assembled from the incoming data points
// or loaded from the database table meta.
typedef struct {
  char      sTableName[TSDB_TABLE_NAME_LEN];  // super table name, NUL-terminated
  SHashObj* tagHash;    // tag name -> index (size_t) into `tags`
  SHashObj* fieldHash;  // column name -> index (size_t) into `fields`
  SArray*   tags;       // SArray<SSchema>
  SArray*   fields;     // SArray<SSchema>
  uint8_t   precision;  // timestamp precision taken from the table meta
} SSmlSTableSchema;

S
shenglian zhou 已提交
30 31 32 33 34
// One key/value pair (a tag or a field) of a line-protocol data point.
typedef struct {
  char*   key;     // tag or column name, NUL-terminated
  uint8_t type;    // TSDB_DATA_TYPE_* of the value
  int16_t length;  // number of bytes stored in `value`
  char*   value;   // raw value buffer of `length` bytes

  //===================================
  // index of this key in the tags/fields array of the super table schema;
  // filled in by buildSmlKvSchema()
  size_t  fieldSchemaIdx;
} TAOS_SML_KV;

// One parsed line-protocol data point: a super table name plus its tags and fields.
typedef struct {
  char* stableName;       // super table the point belongs to

  char* childTableName;   // child table name; generated from the tags when NULL
  TAOS_SML_KV* tags;      // array of `tagNum` tags
  int tagNum;

  // first kv must be timestamp
  TAOS_SML_KV* fields;    // array of `fieldNum` fields
  int fieldNum;

  //================================
  // index of the point's super table in the stable schema array;
  // filled in by buildDataPointSchemas()
  size_t schemaIdx;
} TAOS_SML_DATA_POINT;

55 56 57 58 59 60 61 62
// Kinds of timestamp a data point may carry.
// NOTE(review): not referenced in the visible part of this file; presumably it
// selects the unit used when the timestamp of a line is parsed - confirm.
typedef enum {
  SML_TIME_STAMP_NOW,
  SML_TIME_STAMP_SECONDS,
  SML_TIME_STAMP_MILLI_SECONDS,
  SML_TIME_STAMP_MICRO_SECONDS,
  SML_TIME_STAMP_NANO_SECONDS
} SMLTimeStampType;

63 64
//=================================================================================================

S
shenglian zhou 已提交
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
// qsort() comparator for TAOS_SML_KV entries. Keys are compared
// case-insensitively over their common prefix; when that prefix is equal the
// shorter key sorts first.
int compareSmlColKv(const void* p1, const void* p2) {
  const TAOS_SML_KV* lhs = (const TAOS_SML_KV*)p1;
  const TAOS_SML_KV* rhs = (const TAOS_SML_KV*)p2;

  int lhsLen = (int)strlen(lhs->key);
  int rhsLen = (int)strlen(rhs->key);
  int commonLen = (lhsLen < rhsLen) ? lhsLen : rhsLen;

  int diff = strncasecmp(lhs->key, rhs->key, commonLen);
  return (diff != 0) ? diff : (lhsLen - rhsLen);
}

// Kinds of DDL statement that applySchemaAction() can issue to bring the
// database schema in line with the schema of the incoming points.
typedef enum {
  SCHEMA_ACTION_CREATE_STABLE,       // "create stable ..."
  SCHEMA_ACTION_ADD_COLUMN,          // "alter stable ... add column ..."
  SCHEMA_ACTION_ADD_TAG,             // "alter stable ... add tag ..."
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,  // "alter stable ... modify column ..."
  SCHEMA_ACTION_CHANGE_TAG_SIZE,     // "alter stable ... modify tag ..."
} ESchemaAction;

// Payload of SCHEMA_ACTION_CREATE_STABLE. The arrays are borrowed from the
// point schema (see reconcileDBSchemas); this struct does not own them.
typedef struct {
  char sTableName[TSDB_TABLE_NAME_LEN];
  SArray* tags; //SArray<SSchema>
  SArray* fields; //SArray<SSchema>
} SCreateSTableActionInfo;

// Payload of the four "alter stable" actions: the super table to alter and the
// tag/column to add or resize. `field` points into the point schema and is not
// owned by this struct.
typedef struct {
  char sTableName[TSDB_TABLE_NAME_LEN];
  SSchema* field;
} SAlterSTableActionInfo;

// A single schema change. `action` selects which union member is valid:
// createSTable for SCHEMA_ACTION_CREATE_STABLE, alterSTable for all others.
typedef struct {
  ESchemaAction action;
  union {
    SCreateSTableActionInfo createSTable;
    SAlterSTableActionInfo alterSTable;
  };
} SSchemaAction;

S
shenglian zhou 已提交
105
// Computes the number of bytes a column must have to store the value of `kv`.
//
// Fixed-size types use the size from tDataTypes. BINARY needs the value length
// plus the var-string header. NCHAR values are converted to UCS-4 to find the
// length of the converted text, plus the header.
//
// Returns 0 and sets *bytes on success, TSDB_CODE_TSC_INVALID_VALUE when the
// NCHAR conversion fails, TSDB_CODE_TSC_OUT_OF_MEMORY when the scratch buffer
// cannot be allocated.
static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) {
  if (!IS_VAR_DATA_TYPE(kv->type)) {
    *bytes = tDataTypes[kv->type].bytes;
    return 0;
  }

  if (kv->type == TSDB_DATA_TYPE_NCHAR) {
    // scratch buffer, only needed to learn the converted length
    char* ucs = malloc(kv->length * TSDB_NCHAR_SIZE + 1);
    if (ucs == NULL) {
      tscError("out of memory when converting nchar value of key %s", kv->key);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    int32_t bytesNeeded = 0;
    bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded);
    free(ucs);
    if (!succ) {
      tscError("convert nchar string to UCS4_LE failed:%s", kv->value);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }
    *bytes = bytesNeeded + VARSTR_HEADER_SIZE;
  } else if (kv->type == TSDB_DATA_TYPE_BINARY) {
    *bytes = kv->length + VARSTR_HEADER_SIZE;
  }
  return 0;
}

S
shenglian zhou 已提交
127
// Merges one key/value pair into a schema under construction.
//
// `hash` maps a key to its index in `array` (SArray<SSchema>). A key seen
// before must keep its type, and its size grows to the largest value seen. A
// new key is appended to `array` and registered in `hash`. In both cases the
// index is stored in smlKv->fieldSchemaIdx.
//
// Returns 0 on success, TSDB_CODE_TSC_INVALID_VALUE on a type mismatch or a
// key that does not fit into SSchema.name, or the error of
// getFieldBytesFromSmlKv().
static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array) {
  size_t  keyLen = strlen(smlKv->key);
  size_t  fieldIdx = 0;
  int32_t bytes = 0;
  int32_t code = 0;

  size_t* pFieldIdx = taosHashGet(hash, smlKv->key, keyLen);
  if (pFieldIdx) {
    fieldIdx = *pFieldIdx;
    SSchema* pField = taosArrayGet(array, fieldIdx);

    if (pField->type != smlKv->type) {
      tscError("type mismatch. key %s, type %d. type before %d", smlKv->key, smlKv->type, pField->type);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    code = getFieldBytesFromSmlKv(smlKv, &bytes);
    if (code != 0) {
      return code;
    }
    pField->bytes = MAX(pField->bytes, bytes);
  } else {
    SSchema field = {0};
    // the key is copied into a fixed-size array, so reject keys that do not fit
    if (keyLen >= sizeof(field.name)) {
      tscError("key is too long. key %s, length %zu, max length %zu", smlKv->key, keyLen, sizeof(field.name) - 1);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }
    memcpy(field.name, smlKv->key, keyLen + 1);
    field.type = smlKv->type;

    code = getFieldBytesFromSmlKv(smlKv, &bytes);
    if (code != 0) {
      return code;
    }
    field.bytes = bytes;

    if (taosArrayPush(array, &field) == NULL) {
      tscError("out of memory when adding key %s to schema", smlKv->key);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    fieldIdx = taosArrayGetSize(array) - 1;
    taosHashPut(hash, field.name, keyLen, &fieldIdx, sizeof(fieldIdx));
  }

  smlKv->fieldSchemaIdx = fieldIdx;

  return 0;
}

S
shenglian zhou 已提交
172
// Derives the super table schemas implied by a batch of data points.
//
// For every distinct super table name a SSmlSTableSchema is appended to
// `stableSchemas`, and each tag/field of each point is merged into it with
// buildSmlKvSchema(). point->schemaIdx is set to the index of the point's
// schema in `stableSchemas`.
//
// The name->index hashes are only needed while building. They are released
// before returning, on success and on failure, and the hash pointers in the
// schemas are reset to NULL. The tags/fields arrays stay with `stableSchemas`.
//
// Returns 0 on success or the first error encountered.
static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas) {
  int32_t code = 0;
  size_t  numStables = 0;
  SHashObj* sname2shema = taosHashInit(32,
                                       taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

  for (int i = 0; i < numPoint; ++i) {
    TAOS_SML_DATA_POINT* point = &points[i];
    size_t stableNameLen = strlen(point->stableName);
    size_t stableIdx = 0;
    SSmlSTableSchema* pStableSchema = NULL;

    size_t* pStableIdx = taosHashGet(sname2shema, point->stableName, stableNameLen);
    if (pStableIdx) {
      stableIdx = *pStableIdx;
      pStableSchema = taosArrayGet(stableSchemas, stableIdx);
    } else {
      SSmlSTableSchema schema = {0};
      // the name is copied into a fixed-size array, so reject names that do not fit
      if (stableNameLen >= sizeof(schema.sTableName)) {
        tscError("super table name is too long. point no.: %d, length: %zu", i, stableNameLen);
        code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
        goto cleanup;
      }
      memcpy(schema.sTableName, point->stableName, stableNameLen + 1);
      schema.fields = taosArrayInit(64, sizeof(SSchema));
      schema.tags = taosArrayInit(8, sizeof(SSchema));
      schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
      schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

      pStableSchema = taosArrayPush(stableSchemas, &schema);
      if (pStableSchema == NULL) {
        // the schema never reached the array, so release it here
        taosHashCleanup(schema.tagHash);
        taosHashCleanup(schema.fieldHash);
        taosArrayDestroy(schema.tags);
        taosArrayDestroy(schema.fields);
        tscError("out of memory when adding schema of super table %s", point->stableName);
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto cleanup;
      }
      stableIdx = taosArrayGetSize(stableSchemas) - 1;
      taosHashPut(sname2shema, schema.sTableName, stableNameLen, &stableIdx, sizeof(size_t));
    }

    for (int j = 0; j < point->tagNum; ++j) {
      TAOS_SML_KV* tagKv = point->tags + j;
      code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags);
      if (code != 0) {
        tscError("build data point schema failed. point no.: %d, tag key: %s", i, tagKv->key);
        goto cleanup;
      }
    }

    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* fieldKv = point->fields + j;
      code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields);
      if (code != 0) {
        tscError("build data point schema failed. point no.: %d, field key: %s", i, fieldKv->key);
        goto cleanup;
      }
    }

    point->schemaIdx = stableIdx;
  }

cleanup:
  numStables = taosArrayGetSize(stableSchemas);
  for (size_t i = 0; i < numStables; ++i) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
    taosHashCleanup(schema->tagHash);
    taosHashCleanup(schema->fieldHash);
    // do not leave pointers to freed hashes behind in the schema
    schema->tagHash = NULL;
    schema->fieldHash = NULL;
  }
  taosHashCleanup(sname2shema);

  if (code != 0) {
    return code;
  }

  tscDebug("build point schema succeed. num of super table: %zu", numStables);
  for (size_t i = 0; i < numStables; ++i) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
    tscDebug("\ttable name: %s, tags number: %zu, fields number: %zu", schema->sTableName,
             taosArrayGetSize(schema->tags), taosArrayGetSize(schema->fields));
  }

  return 0;
}

239
static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[],
S
shenglian zhou 已提交
240
                                       SSchemaAction* action, bool* actionNeeded) {
241 242 243 244
  size_t* pDbIndex = taosHashGet(dbAttrHash, pointColField->name, strlen(pointColField->name));
  if (pDbIndex) {
    SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex);
    assert(strcasecmp(dbAttr->name, pointColField->name) == 0);
S
shenglian zhou 已提交
245
    if (pointColField->type != dbAttr->type) {
S
Shenglian Zhou 已提交
246 247 248
      tscError("point type and db type mismatch. key: %s. point type: %d, db type: %d", pointColField->name,
               pointColField->type, dbAttr->type);
      return TSDB_CODE_TSC_INVALID_VALUE;
S
shenglian zhou 已提交
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
    }

    if (IS_VAR_DATA_TYPE(pointColField->type) && (pointColField->bytes > dbAttr->bytes)) {
      if (isTag) {
        action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
      } else {
        action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
      }
      memset(&action->alterSTable, 0,  sizeof(SAlterSTableActionInfo));
      memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN);
      action->alterSTable.field = pointColField;
      *actionNeeded = true;
    }
  } else {
    if (isTag) {
      action->action = SCHEMA_ACTION_ADD_TAG;
    } else {
      action->action = SCHEMA_ACTION_ADD_COLUMN;
    }
    memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo));
    memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN);
    action->alterSTable.field = pointColField;
    *actionNeeded = true;
  }
S
Shenglian Zhou 已提交
273
  tscDebug("generate schema action. action needed: %d, action: %d", *actionNeeded, action->action);
S
shenglian zhou 已提交
274 275 276
  return 0;
}

S
shenglian zhou 已提交
277
// Writes the SQL description of a column ("name type" or "name type(len)")
// into `buf`.
//
// For BINARY/NCHAR the printed length excludes the var-string header; for
// NCHAR it is additionally divided by TSDB_NCHAR_SIZE.
//
// *outBytes receives the snprintf() result, i.e. the length the description
// would have had with unlimited space. A value >= bufSize means the text in
// `buf` was truncated. Always returns 0.
static int32_t buildColumnDescription(SSchema* field,
                               char* buf, int32_t bufSize, int32_t* outBytes) {
  uint8_t type = field->type;

  if (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) {
    *outBytes = snprintf(buf, bufSize, "%s %s",
                         field->name, tDataTypes[type].name);
    return 0;
  }

  int32_t bytes = field->bytes - VARSTR_HEADER_SIZE;
  if (type == TSDB_DATA_TYPE_NCHAR) {
    bytes = bytes/TSDB_NCHAR_SIZE;
  }
  *outBytes = snprintf(buf, bufSize,"%s %s(%d)",
                       field->name,tDataTypes[field->type].name, bytes);
  return 0;
}

S
shenglian zhou 已提交
298 299

static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) {
S
shenglian zhou 已提交
300 301 302 303 304
  int32_t code = 0;
  int32_t capacity = TSDB_MAX_BINARY_LEN;
  int32_t outBytes = 0;
  char *result = (char *)calloc(1, capacity);

S
Shenglian Zhou 已提交
305
  tscDebug("apply schema action: %d", action->action);
S
shenglian zhou 已提交
306 307 308 309 310 311
  switch (action->action) {
    case SCHEMA_ACTION_ADD_COLUMN: {
      int n = sprintf(result, "alter stable %s add column ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
Shenglian Zhou 已提交
312
      taos_free_result(res);
S
shenglian zhou 已提交
313 314 315 316 317 318 319 320
      break;
    }
    case SCHEMA_ACTION_ADD_TAG: {
      int n = sprintf(result, "alter stable %s add tag ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field,
                             result+n, capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
Shenglian Zhou 已提交
321
      taos_free_result(res);
S
shenglian zhou 已提交
322 323 324 325 326 327 328 329
      break;
    }
    case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
      int n = sprintf(result, "alter stable %s modify column ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n,
                             capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
Shenglian Zhou 已提交
330
      taos_free_result(res);
331
      break;
S
shenglian zhou 已提交
332 333 334 335 336 337 338
    }
    case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
      int n = sprintf(result, "alter stable %s modify tag ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n,
                             capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
Shenglian Zhou 已提交
339
      taos_free_result(res);
S
shenglian zhou 已提交
340 341 342 343 344
      break;
    }
    case SCHEMA_ACTION_CREATE_STABLE: {
      int n = sprintf(result, "create stable %s (", action->createSTable.sTableName);
      char* pos = result + n; int freeBytes = capacity - n;
S
shenglian zhou 已提交
345
      size_t numCols = taosArrayGetSize(action->createSTable.fields);
S
shenglian zhou 已提交
346 347 348 349 350 351 352
      for (int32_t i = 0; i < numCols; ++i) {
        SSchema* field = taosArrayGet(action->createSTable.fields, i);
        buildColumnDescription(field, pos, freeBytes, &outBytes);
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
      --pos; ++freeBytes;
353

S
shenglian zhou 已提交
354 355
      outBytes = snprintf(pos, freeBytes, ") tags (");
      pos += outBytes; freeBytes -= outBytes;
356

S
shenglian zhou 已提交
357
      size_t numTags = taosArrayGetSize(action->createSTable.tags);
S
shenglian zhou 已提交
358 359 360 361 362 363 364 365 366 367
      for (int32_t i = 0; i < numTags; ++i) {
        SSchema* field = taosArrayGet(action->createSTable.tags, i);
        buildColumnDescription(field, pos, freeBytes, &outBytes);
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
      pos--; ++freeBytes;
      outBytes = snprintf(pos, freeBytes, ")");
      TAOS_RES* res = taos_query(taos, result);
      code = taos_errno(res);
S
Shenglian Zhou 已提交
368
      taos_free_result(res);
S
shenglian zhou 已提交
369 370
      break;
    }
S
shenglian zhou 已提交
371

S
shenglian zhou 已提交
372 373 374
    default:
      break;
  }
S
Shenglian Zhou 已提交
375

S
shenglian zhou 已提交
376
  free(result);
S
Shenglian Zhou 已提交
377 378 379
  if (code != 0) {
    tscError("apply schema action failure. %s", tstrerror(code));
  }
S
shenglian zhou 已提交
380 381 382
  return code;
}

383
// Releases the hashes and arrays held by `schema` and resets the pointers so
// the struct does not keep references to freed memory. The struct itself is
// not freed. Always returns 0.
static int32_t destroySmlSTableSchema(SSmlSTableSchema* schema) {
  taosHashCleanup(schema->tagHash);
  taosHashCleanup(schema->fieldHash);
  taosArrayDestroy(schema->tags);
  taosArrayDestroy(schema->fields);

  schema->tagHash = NULL;
  schema->fieldHash = NULL;
  schema->tags = NULL;
  schema->fields = NULL;
  return 0;
}

// Loads the schema of super table `tableName` into `schema`.
//
// A "describe <table>" query is issued first; its error code is returned
// unchanged when it fails. The table meta is then cloned from tscTableMetaInfo
// under the table's full name and converted into the tags/fields arrays and
// name->index hashes of `schema`.
//
// On success (return 0) the caller owns the arrays and hashes in `schema` and
// releases them with destroySmlSTableSchema(). On failure `schema` is left
// untouched.
int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
  int32_t code = 0;

  STscObj *pObj = (STscObj *)taos;
  if (pObj == NULL || pObj->signature != pObj) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return TSDB_CODE_TSC_DISCONNECTED;
  }

  tscDebug("load table schema. super table name: %s", tableName);

  char sql[256];
  snprintf(sql, 256, "describe %s", tableName);
  TAOS_RES* res = taos_query(taos, sql);
  code = taos_errno(res);
  if (code != 0) {
    tscError("describe table failure. %s", taos_errstr(res));
    taos_free_result(res);
    return code;
  }
  taos_free_result(res);

  // temporary sql object, only needed to resolve the full table name
  SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
  if (pSql == NULL) {
    tscError("out of memory when loading table meta of %s", tableName);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  pSql->pTscObj = taos;
  pSql->signature = pSql;
  pSql->fp = NULL;

  SStrToken tableToken = {.z=tableName, .n=(uint32_t)strlen(tableName), .type=TK_ID};
  tGetToken(tableName, &tableToken.type);
  // Check if the table name available or not
  if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
    // pSql is only calloc()'ed here and no payload buffer is allocated in this
    // function, so the error is logged instead of printed into the payload
    tscError("table name is invalid: %s", tableName);
    tscFreeSqlObj(pSql);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  SName sname = {0};
  code = tscSetTableFullName(&sname, &tableToken, pSql);
  tscFreeSqlObj(pSql);
  pSql = NULL;
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  char  fullTableName[TSDB_TABLE_FNAME_LEN] = {0};
  if (tNameExtractFullName(&sname, fullTableName) != 0) {
    tscError("can not extract full name of table %s", tableName);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  uint32_t size = tscGetTableMetaMaxSize();
  STableMeta* tableMeta = calloc(1, size);
  if (tableMeta == NULL) {
    tscError("out of memory when loading table meta of %s", tableName);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  // NOTE(review): the lookup result is not checked; a missing entry leaves
  // tableMeta zeroed, which yields an empty schema below - confirm intended.
  taosHashGetClone(tscTableMetaInfo, fullTableName, strlen(fullTableName), NULL, tableMeta, -1);

  schema->tags = taosArrayInit(8, sizeof(SSchema));
  schema->fields = taosArrayInit(64, sizeof(SSchema));
  schema->tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
  schema->fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

  tstrncpy(schema->sTableName, tableName, sizeof(schema->sTableName));
  schema->precision = tableMeta->tableInfo.precision;

  // columns come first in tableMeta->schema ...
  for (int i=0; i<tableMeta->tableInfo.numOfColumns; ++i) {
    SSchema field = {0};
    tstrncpy(field.name, tableMeta->schema[i].name, sizeof(field.name));
    field.type = tableMeta->schema[i].type;
    field.bytes = tableMeta->schema[i].bytes;
    taosArrayPush(schema->fields, &field);
    size_t fieldIndex = taosArrayGetSize(schema->fields) - 1;
    taosHashPut(schema->fieldHash, field.name, strlen(field.name), &fieldIndex, sizeof(fieldIndex));
  }

  // ... followed by the tags
  for (int i=0; i<tableMeta->tableInfo.numOfTags; ++i) {
    int j = i + tableMeta->tableInfo.numOfColumns;
    SSchema field = {0};
    tstrncpy(field.name, tableMeta->schema[j].name, sizeof(field.name));
    field.type = tableMeta->schema[j].type;
    field.bytes = tableMeta->schema[j].bytes;
    taosArrayPush(schema->tags, &field);
    size_t tagIndex = taosArrayGetSize(schema->tags) - 1;
    taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex));
  }
  tscDebug("load table meta succeed. %s, columns number: %d, tag number: %d, precision: %d",
           tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision);
  free(tableMeta); tableMeta = NULL;
  return code;
}

// Brings the database schema in line with the schemas of the incoming points.
//
// For each super table in `stableSchemas`:
//   - if it does not exist yet it is created from the point schema;
//   - otherwise each tag and column of the point schema is compared with the
//     database and the needed add/resize actions are applied. The name of the
//     first (timestamp) column of the point schema is replaced by the name the
//     database uses.
// In both cases the precision of the database table is copied into the point
// schema.
//
// Failures of applySchemaAction() stay non-fatal, as before; it logs them
// itself. Failing to load the table meta, or a type conflict reported by
// generateSchemaAction(), aborts the reconciliation and is returned.
static int32_t reconcileDBSchemas(TAOS* taos, SArray* stableSchemas) {
  int32_t code = 0;
  size_t numStable = taosArrayGetSize(stableSchemas);
  for (size_t i = 0; i < numStable; ++i) {
    SSmlSTableSchema* pointSchema = taosArrayGet(stableSchemas, i);
    SSmlSTableSchema  dbSchema;
    memset(&dbSchema, 0, sizeof(SSmlSTableSchema));

    code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
    if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
      SSchemaAction schemaAction = {0};
      schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
      memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
      memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN);
      schemaAction.createSTable.tags = pointSchema->tags;
      schemaAction.createSTable.fields = pointSchema->fields;
      applySchemaAction(taos, &schemaAction);

      // reloading the meta tells whether the table exists now
      code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
      if (code != 0) {
        tscError("reconcile point schema failed. can not create %s", pointSchema->sTableName);
        return code;
      }
      pointSchema->precision = dbSchema.precision;
      destroySmlSTableSchema(&dbSchema);
    } else if (code == TSDB_CODE_SUCCESS) {
      size_t pointTagSize = taosArrayGetSize(pointSchema->tags);
      size_t pointFieldSize = taosArrayGetSize(pointSchema->fields);

      SHashObj* dbTagHash = dbSchema.tagHash;
      SHashObj* dbFieldHash = dbSchema.fieldHash;

      for (size_t j = 0; j < pointTagSize; ++j) {
        SSchema* pointTag = taosArrayGet(pointSchema->tags, j);
        SSchemaAction schemaAction = {0};
        bool actionNeeded = false;
        code = generateSchemaAction(pointTag, dbTagHash, dbSchema.tags, true, pointSchema->sTableName, &schemaAction, &actionNeeded);
        if (code != 0) {
          destroySmlSTableSchema(&dbSchema);
          return code;
        }
        if (actionNeeded) {
          applySchemaAction(taos, &schemaAction);
        }
      }

      // the first column is the timestamp: use the column name of the database
      if (pointFieldSize > 0 && taosArrayGetSize(dbSchema.fields) > 0) {
        SSchema* pointColTs = taosArrayGet(pointSchema->fields, 0);
        SSchema* dbColTs = taosArrayGet(dbSchema.fields, 0);
        memcpy(pointColTs->name, dbColTs->name, TSDB_COL_NAME_LEN);
      }

      for (size_t j = 1; j < pointFieldSize; ++j) {
        SSchema* pointCol = taosArrayGet(pointSchema->fields, j);
        SSchemaAction schemaAction = {0};
        bool actionNeeded = false;
        code = generateSchemaAction(pointCol, dbFieldHash, dbSchema.fields,false, pointSchema->sTableName, &schemaAction, &actionNeeded);
        if (code != 0) {
          destroySmlSTableSchema(&dbSchema);
          return code;
        }
        if (actionNeeded) {
          applySchemaAction(taos, &schemaAction);
        }
      }

      pointSchema->precision = dbSchema.precision;

      destroySmlSTableSchema(&dbSchema);
    } else {
      tscError("load table meta error: %s", tstrerror(code));
      return code;
    }
  }
  return 0;
}

// Generates the child table name of a data point.
//
// The tags are sorted by key (in place) so the result does not depend on the
// order in which they were written. The string
// "<stable>,<key>=<value>,<key>=<value>..." is hashed with MD5 and the name
// becomes "t_" followed by the 32 hex digits of the digest.
//
// On entry *tableNameLen is the size of `tableName`; on return it holds the
// snprintf() result, i.e. the length of the generated name. Always returns 0.
static int32_t getChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) {
  qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);

  SStringBuilder sb; memset(&sb, 0, sizeof(sb));
  taosStringBuilderAppendString(&sb, point->stableName);
  for (int j = 0; j < point->tagNum; ++j) {
    TAOS_SML_KV* tagKv = point->tags + j;
    taosStringBuilderAppendChar(&sb, ',');
    taosStringBuilderAppendString(&sb, tagKv->key);
    taosStringBuilderAppendChar(&sb, '=');
    taosStringBuilderAppend(&sb, tagKv->value, tagKv->length);
  }

  size_t len = 0;
  char* keyJoined = taosStringBuilderGetResult(&sb, &len);

  MD5_CTX context;
  MD5Init(&context);
  MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
  MD5Final(&context);

  *tableNameLen = snprintf(tableName, *tableNameLen,
                           "t_%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0],
                           context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6],
                           context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11],
                           context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
  taosStringBuilderDestroy(&sb);
  tscDebug("child table name: %s", tableName);
  return 0;
}

S
shenglian zhou 已提交
571
// Creates child table `cTableName` of super table `sTableName` unless it
// already exists, through a prepared statement of the form
//   create table if not exists <child> using <stable>(<tag names>) tags (?,?,...)
//
// `tagsSchema` is an SArray<SSchema> naming the tags; `tagsBind` holds one
// TAOS_BIND per tag in the same order.
//
// The statement handle is closed on every path. Returns 0 on success, the
// error of the failing statement call, or TSDB_CODE_TSC_INVALID_VALUE when the
// sql text does not fit in the buffer.
static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, SArray* tagsSchema, SArray* tagsBind) {
  size_t numTags = taosArrayGetSize(tagsSchema);
  char sql[TSDB_MAX_BINARY_LEN] = {0};
  int32_t capacity = (int32_t)sizeof(sql);
  int32_t n = 0;

  // `used` is the length written so far; `fits` turns false on truncation
  int32_t used = snprintf(sql, capacity, "create table if not exists %s using %s(", cTableName, sTableName);
  bool fits = (used >= 0 && used < capacity);

  for (size_t i = 0; fits && i < numTags; ++i) {
    SSchema* tagSchema = taosArrayGet(tagsSchema, i);
    n = snprintf(sql + used, capacity - used, "%s%s", (i > 0) ? "," : "", tagSchema->name);
    fits = (n >= 0 && n < capacity - used);
    if (fits) used += n;
  }
  if (fits) {
    n = snprintf(sql + used, capacity - used, ") tags (");
    fits = (n >= 0 && n < capacity - used);
    if (fits) used += n;
  }
  for (size_t i = 0; fits && i < numTags; ++i) {
    n = snprintf(sql + used, capacity - used, "%s", (i > 0) ? ",?" : "?");
    fits = (n >= 0 && n < capacity - used);
    if (fits) used += n;
  }
  if (fits) {
    n = snprintf(sql + used, capacity - used, ")");
    fits = (n >= 0 && n < capacity - used);
    if (fits) used += n;
  }
  if (!fits) {
    tscError("create table %s: sql statement exceeds %d bytes", cTableName, capacity);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  tscDebug("create table : %s", sql);

  TAOS_STMT* stmt = taos_stmt_init(taos);
  if (stmt == NULL) {
    tscError("create table %s: can not init statement", cTableName);
    return (terrno != 0) ? terrno : TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t code = taos_stmt_prepare(stmt, sql, (unsigned long)used);
  if (code == 0) {
    code = taos_stmt_bind_param(stmt, TARRAY_GET_START(tagsBind));
  }
  if (code == 0) {
    code = taos_stmt_execute(stmt);
  }
  if (code != 0) {
    tscError("%s", taos_stmt_errstr(stmt));
    taos_stmt_close(stmt);
    return code;
  }

  // stmt must not be used after taos_stmt_close(), so report the code only
  code = taos_stmt_close(stmt);
  if (code != 0) {
    tscError("close statement failed. %s", tstrerror(code));
  }
  return code;
}

S
shenglian zhou 已提交
621
// Inserts a batch of rows into child table `cTableName` through a prepared
// statement of the form
//   insert into ? (<column names>) values (?,?,...)
//
// `colsSchema` is an SArray<SSchema> naming the columns; `rowsBind` holds one
// TAOS_BIND array pointer per row. When execution fails with
// TSDB_CODE_TDB_TABLE_RECONFIGURE the whole batch is bound and executed again,
// up to TSDB_MAX_REPLICA more times.
//
// The statement handle is closed on every path. Returns 0 on success, the
// error of the failing statement call, or TSDB_CODE_TSC_INVALID_VALUE when the
// sql text does not fit in the buffer.
static int32_t insertChildTableBatch(TAOS* taos,  char* cTableName, SArray* colsSchema, SArray* rowsBind) {
  size_t numCols = taosArrayGetSize(colsSchema);
  char sql[TSDB_MAX_BINARY_LEN];
  int32_t capacity = (int32_t)sizeof(sql);
  int32_t n = 0;

  // `used` is the length written so far; `fits` turns false on truncation
  int32_t used = snprintf(sql, capacity, "insert into ? (");
  bool fits = (used >= 0 && used < capacity);

  for (size_t i = 0; fits && i < numCols; ++i) {
    SSchema* colSchema = taosArrayGet(colsSchema, i);
    n = snprintf(sql + used, capacity - used, "%s%s", (i > 0) ? "," : "", colSchema->name);
    fits = (n >= 0 && n < capacity - used);
    if (fits) used += n;
  }
  if (fits) {
    n = snprintf(sql + used, capacity - used, ") values (");
    fits = (n >= 0 && n < capacity - used);
    if (fits) used += n;
  }
  for (size_t i = 0; fits && i < numCols; ++i) {
    n = snprintf(sql + used, capacity - used, "%s", (i > 0) ? ",?" : "?");
    fits = (n >= 0 && n < capacity - used);
    if (fits) used += n;
  }
  if (fits) {
    n = snprintf(sql + used, capacity - used, ")");
    fits = (n >= 0 && n < capacity - used);
    if (fits) used += n;
  }
  if (!fits) {
    tscError("insert into %s: sql statement exceeds %d bytes", cTableName, capacity);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  tscDebug("insert rows %zu into child table %s. ", taosArrayGetSize(rowsBind), cTableName);

  int32_t code = 0;
  int32_t attempt = 0;

  TAOS_STMT* stmt = taos_stmt_init(taos);
  if (stmt == NULL) {
    tscError("insert into %s: can not init statement", cTableName);
    return (terrno != 0) ? terrno : TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  code = taos_stmt_prepare(stmt, sql, (unsigned long)used);
  if (code != 0) {
    goto finish;
  }

  do {
    code = taos_stmt_set_tbname(stmt, cTableName);
    if (code != 0) {
      goto finish;
    }

    size_t rows = taosArrayGetSize(rowsBind);
    for (size_t i = 0; i < rows; ++i) {
      TAOS_BIND* colsBinds = taosArrayGetP(rowsBind, i);
      code = taos_stmt_bind_param(stmt, colsBinds);
      if (code != 0) {
        goto finish;
      }
      code = taos_stmt_add_batch(stmt);
      if (code != 0) {
        goto finish;
      }
    }

    code = taos_stmt_execute(stmt);
    if (code != 0) {
      tscError("%s", taos_stmt_errstr(stmt));
    }
  } while (code == TSDB_CODE_TDB_TABLE_RECONFIGURE && attempt++ < TSDB_MAX_REPLICA);

finish:
  // the error text must be read before the statement is closed
  if (code != 0) {
    tscError("%s", taos_stmt_errstr(stmt));
  }
  taos_stmt_close(stmt);

  return code;
}

690 691
static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints,
                                             SHashObj* cname2points, SArray* stableSchemas) {
S
shenglian zhou 已提交
692 693 694 695
  for (int32_t i = 0; i < numPoints; ++i) {
    TAOS_SML_DATA_POINT * point = points + i;
    if (!point->childTableName) {
      char childTableName[TSDB_TABLE_NAME_LEN];
696
      int32_t tableNameLen = TSDB_TABLE_NAME_LEN;
S
shenglian zhou 已提交
697 698 699 700 701
      getChildTableName(point, childTableName, &tableNameLen);
      point->childTableName = calloc(1, tableNameLen+1);
      strncpy(point->childTableName, childTableName, tableNameLen);
      point->childTableName[tableNameLen] = '\0';
    }
S
shenglian zhou 已提交
702

703 704
    SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx);

S
shenglian zhou 已提交
705 706 707 708
    for (int j = 0; j < point->tagNum; ++j) {
      TAOS_SML_KV* kv =  point->tags + j;
      if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
        int64_t ts = *(int64_t*)(kv->value);
709
        ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
S
shenglian zhou 已提交
710 711 712 713 714 715 716 717
        *(int64_t*)(kv->value) = ts;
      }
    }

    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* kv =  point->fields + j;
      if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
        int64_t ts = *(int64_t*)(kv->value);
718
        ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
S
shenglian zhou 已提交
719 720 721 722
        *(int64_t*)(kv->value) = ts;
      }
    }

S
shenglian zhou 已提交
723 724 725 726 727 728 729 730
    SArray* cTablePoints = NULL;
    SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, strlen(point->childTableName));
    if (pCTablePoints) {
      cTablePoints = *pCTablePoints;
    } else {
      cTablePoints = taosArrayInit(64, sizeof(point));
      taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES);
    }
731
    taosArrayPush(cTablePoints, &point);
732 733
  }

S
shenglian zhou 已提交
734 735 736
  return 0;
}

737 738 739
/*
 * Inserts all points that were grouped under one child table.
 *
 * The tag binds are built from the first point of the group, one column-bind
 * row is built per point, then the child table is created if necessary and
 * the rows are inserted as one batch. Schema slots for which a point carries
 * no value are bound as NULL through `isNullBind`.
 *
 * Returns the code of creatChildTableIfNotExists()/insertChildTableBatch().
 * The caller keeps ownership of `cTablePoints`.
 */
static int32_t insertChildTablePoints(TAOS* taos, SArray* cTablePoints, SArray* stableSchemas, int* isNullBind) {
  TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
  SSmlSTableSchema* sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
  size_t numTags = taosArrayGetSize(sTableSchema->tags);
  size_t numCols = taosArrayGetSize(sTableSchema->fields);

  SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
  taosArraySetSize(tagBinds, numTags);
  for (size_t j = 0; j < numTags; ++j) {
    TAOS_BIND* bind = taosArrayGet(tagBinds, j);
    // Clear the slot explicitly: every slot's `length` is passed to free() below,
    // including the ones no tag of this point fills in.
    memset(bind, 0, sizeof(*bind));
    bind->is_null = isNullBind;
  }
  for (int j = 0; j < point->tagNum; ++j) {
    TAOS_SML_KV* kv = point->tags + j;
    TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
    bind->buffer_type = kv->type;
    bind->length = malloc(sizeof(*bind->length));
    *bind->length = kv->length;
    bind->buffer = kv->value;
    bind->is_null = NULL;
  }

  size_t rows = taosArrayGetSize(cTablePoints);
  SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);

  for (size_t i = 0; i < rows; ++i) {
    point = taosArrayGetP(cTablePoints, i);

    // calloc leaves `length` NULL for columns this point does not provide.
    TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
    for (size_t j = 0; j < numCols; ++j) {
      colBinds[j].is_null = isNullBind;
    }
    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* kv = point->fields + j;
      TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
      bind->buffer_type = kv->type;
      bind->length = malloc(sizeof(*bind->length));
      *bind->length = kv->length;
      bind->buffer = kv->value;
      bind->is_null = NULL;
    }
    taosArrayPush(rowsBind, &colBinds);
  }

  // `point` is now the last point of the group; all points of a group share
  // the same child table name.
  int32_t code = creatChildTableIfNotExists(taos, point->childTableName, point->stableName, sTableSchema->tags, tagBinds);
  if (code == 0) {
    code = insertChildTableBatch(taos, point->childTableName, sTableSchema->fields, rowsBind);
    if (code != 0) {
      tscError("insert into child table %s failed. error %s", point->childTableName, tstrerror(code));
    }
  } else {
    tscError("Create Child Table %s failed, error %s", point->childTableName, tstrerror(code));
  }

  for (size_t i = 0; i < taosArrayGetSize(tagBinds); ++i) {
    TAOS_BIND* bind = taosArrayGet(tagBinds, i);
    free(bind->length);
  }
  taosArrayDestroy(tagBinds);
  for (size_t i = 0; i < rows; ++i) {
    TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
    for (size_t j = 0; j < numCols; ++j) {
      free(colBinds[j].length);
    }
    free(colBinds);
  }
  taosArrayDestroy(rowsBind);

  return code;
}

/*
 * Groups `points` by child table name and inserts them table by table.
 *
 * Processing stops at the first child table whose creation or insertion
 * fails; that code is returned. The iteration still visits every remaining
 * entry of the hash so that the per-table point arrays are always released.
 */
static int32_t insertPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) {
  int32_t code = TSDB_CODE_SUCCESS;

  SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
                                        true, false);
  arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas);

  int isNullColBind = TSDB_TRUE;
  SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
  while (pCTablePoints) {
    SArray* cTablePoints = *pCTablePoints;

    // After the first failure no further table is touched, but its array is still freed.
    if (code == 0) {
      code = insertChildTablePoints(taos, cTablePoints, stableSchemas, &isNullColBind);
    }
    taosArrayDestroy(cTablePoints);

    pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
  }

  taosHashCleanup(cname2points);
  return code;
}
827

S
shenglian zhou 已提交
828
/*
 * Entry point for inserting parsed line-protocol points.
 *
 * Runs three stages in order and stops at the first one that fails:
 * build the per-super-table schemas from the points, reconcile them with the
 * database, insert the points. The schema array and the tag/field arrays of
 * each schema are released before returning, whatever the outcome.
 *
 * Returns 0 on success, otherwise the code of the failing stage.
 */
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
  tscDebug("taos_sml_insert. number of points: %d", numPoint);

  SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>

  int32_t code = buildDataPointSchemas(points, numPoint, stableSchemas);
  if (code != 0) {
    tscError("error building data point schemas : %s", tstrerror(code));
  } else {
    code = reconcileDBSchemas(taos, stableSchemas);
    if (code != 0) {
      tscError("error change db schema : %s", tstrerror(code));
    } else {
      code = insertPoints(taos, points, numPoint, stableSchemas);
      if (code != 0) {
        tscError("error insert points : %s", tstrerror(code));
      }
    }
  }

  // NOTE(review): only the `fields` and `tags` arrays of a schema are released
  // here; confirm that tagHash/fieldHash are cleaned up elsewhere.
  size_t numSchemas = taosArrayGetSize(stableSchemas);
  for (size_t idx = 0; idx < numSchemas; ++idx) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, idx);
    taosArrayDestroy(schema->fields);
    taosArrayDestroy(schema->tags);
  }
  taosArrayDestroy(stableSchemas);
  return code;
}
S
shenglian zhou 已提交
860

861 862
//=========================================================================

863 864 865 866 867
/*        Field                          Escape characters
    1: measurement                        Comma,Space
    2: tag_key, tag_value, field_key  Comma,Equal Sign,Space
    3: field_value                    Double quote,Backslash
*/
868
/*
 * If *pos points at a backslash that escapes a character which is special for
 * `field`, advances *pos onto the escaped character; otherwise leaves *pos
 * where it is.
 *
 *   field 1 (measurement):                      ','  ' '
 *   field 2 (tag key, tag value, field key):    ','  ' '  '='
 *   field 3 (field value):                      '"'  '\'
 *
 * Any other `field` value never advances.
 */
static void escapeSpecialCharacter(uint8_t field, const char **pos) {
  const char *p = *pos;
  if (*p != '\\') {
    return;
  }

  const char next = p[1];
  int escaped = 0;
  if (field == 1) {
    escaped = (next == ',' || next == ' ');
  } else if (field == 2) {
    escaped = (next == ',' || next == ' ' || next == '=');
  } else if (field == 3) {
    escaped = (next == '"' || next == '\\');
  }

  if (escaped) {
    *pos = p + 1;
  }
}
910

911
/*
 * Returns true when `str` is an optional '+'/'-' sign followed by one or more
 * decimal digits and nothing else.
 *
 * An empty string or a lone sign is rejected.
 */
static bool isValidInteger(char *str) {
  // <ctype.h> functions require values in the unsigned char range.
  const unsigned char *c = (const unsigned char *)str;

  if (*c == '+' || *c == '-') {
    c++;
  }
  // At least one digit is mandatory.
  if (!isdigit(*c)) {
    return false;
  }
  while (isdigit(*c)) {
    c++;
  }
  return *c == '\0';
}
925

926
/*
 * Returns true when `str` is a decimal floating point literal of the form
 *
 *   [+|-] ( digits [ '.' digits ] | '.' digits ) [ (e|E) [+|-] digits ]
 *
 * A '.' must be followed by at least one digit, the mantissa must contain at
 * least one digit, and an exponent sign is only accepted directly after the
 * 'e'/'E'. Inputs such as "+", ".", "..5" or "1e5-3" are rejected.
 */
static bool isValidFloat(char *str) {
  // <ctype.h> functions require values in the unsigned char range.
  const unsigned char *c = (const unsigned char *)str;
  bool hasDigits = false;

  if (*c == '+' || *c == '-') {
    c++;
  }

  // integer part
  while (isdigit(*c)) {
    hasDigits = true;
    c++;
  }

  // fraction
  if (*c == '.') {
    c++;
    if (!isdigit(*c)) {
      return false;
    }
    while (isdigit(*c)) {
      c++;
    }
    hasDigits = true;
  }

  if (!hasDigits) {
    return false;
  }

  // exponent
  if (*c == 'e' || *c == 'E') {
    c++;
    if (*c == '+' || *c == '-') {
      c++;
    }
    if (!isdigit(*c)) {
      return false;
    }
    while (isdigit(*c)) {
      c++;
    }
  }

  return *c == '\0';
}
981

982
// True when the text starting at pVal[len - 2] is exactly "i8" and at least
// one character precedes that suffix.
static bool isTinyInt(char *pVal, uint16_t len) {
  return len > 2 && strcmp(pVal + (len - 2), "i8") == 0;
}
992

993
// True when the text starting at pVal[len - 2] is exactly "u8", at least one
// character precedes that suffix and the value does not start with '-'.
static bool isTinyUint(char *pVal, uint16_t len) {
  return len > 2 && pVal[0] != '-' && strcmp(pVal + (len - 2), "u8") == 0;
}

1007
// True when the text starting at pVal[len - 3] is exactly "i16" and at least
// one character precedes that suffix.
static bool isSmallInt(char *pVal, uint16_t len) {
  return len > 3 && strcmp(pVal + (len - 3), "i16") == 0;
}

1018
// True when the text starting at pVal[len - 3] is exactly "u16", at least one
// character precedes that suffix and the value does not start with '-'.
static bool isSmallUint(char *pVal, uint16_t len) {
  return len > 3 && pVal[0] != '-' && strcmp(pVal + (len - 3), "u16") == 0;
}

1032
// True when the text starting at pVal[len - 3] is exactly "i32" and at least
// one character precedes that suffix.
static bool isInt(char *pVal, uint16_t len) {
  return len > 3 && strcmp(pVal + (len - 3), "i32") == 0;
}

1043
// True when the text starting at pVal[len - 3] is exactly "u32", at least one
// character precedes that suffix and the value does not start with '-'.
static bool isUint(char *pVal, uint16_t len) {
  return len > 3 && pVal[0] != '-' && strcmp(pVal + (len - 3), "u32") == 0;
}

1057
// True when the text starting at pVal[len - 3] is exactly "i64" and at least
// one character precedes that suffix.
static bool isBigInt(char *pVal, uint16_t len) {
  return len > 3 && strcmp(pVal + (len - 3), "i64") == 0;
}

1068
// True when the text starting at pVal[len - 3] is exactly "u64", at least one
// character precedes that suffix and the value does not start with '-'.
static bool isBigUint(char *pVal, uint16_t len) {
  return len > 3 && pVal[0] != '-' && strcmp(pVal + (len - 3), "u64") == 0;
}

1082
// True when the text starting at pVal[len - 3] is exactly "f32" and at least
// one character precedes that suffix.
static bool isFloat(char *pVal, uint16_t len) {
  return len > 3 && strcmp(pVal + (len - 3), "f32") == 0;
}

1093
// True when the text starting at pVal[len - 3] is exactly "f64" and at least
// one character precedes that suffix.
static bool isDouble(char *pVal, uint16_t len) {
  return len > 3 && strcmp(pVal + (len - 3), "f64") == 0;
}

1104
/*
 * Recognises a boolean literal and stores its value in *bVal.
 *
 * Accepted spellings: a single 't'/'T' or 'f'/'F' (len 1), "true"/"True"/"TRUE"
 * (len 4) and "false"/"False"/"FALSE" (len 5). *bVal is written only when the
 * function returns true.
 */
static bool isBool(char *pVal, uint16_t len, bool *bVal) {
  switch (len) {
    case 1: {
      const char ch = pVal[0];
      if (ch == 't' || ch == 'T') {
        *bVal = true;
        return true;
      }
      if (ch == 'f' || ch == 'F') {
        *bVal = false;
        return true;
      }
      return false;
    }
    case 4:
      if (strcmp(pVal, "true") == 0 || strcmp(pVal, "True") == 0 || strcmp(pVal, "TRUE") == 0) {
        *bVal = true;
        return true;
      }
      return false;
    case 5:
      if (strcmp(pVal, "false") == 0 || strcmp(pVal, "False") == 0 || strcmp(pVal, "FALSE") == 0) {
        *bVal = false;
        return true;
      }
      return false;
    default:
      return false;
  }
}

1140
// A binary value is written as "abc": at least two characters, the first and
// the last of which are double quotes.
static bool isBinary(char *pVal, uint16_t len) {
  return len >= 2 && pVal[0] == '"' && pVal[len - 1] == '"';
}
1152

1153
// An nchar value is written as L"abc": at least three characters, starting
// with L" and ending with a double quote.
static bool isNchar(char *pVal, uint16_t len) {
  return len >= 3 && pVal[0] == 'L' && pVal[1] == '"' && pVal[len - 1] == '"';
}

1165
/*
 * Classifies a timestamp token by its unit suffix and stores the result in
 * *tsType.
 *
 *   empty token or "0"          -> SML_TIME_STAMP_NOW
 *   last two characters digits  -> SML_TIME_STAMP_MICRO_SECONDS (no suffix)
 *   "...ms" / "...us" / "...ns" -> milli / micro / nano seconds
 *   "<digit>s"                  -> SML_TIME_STAMP_SECONDS
 *
 * Only the trailing characters are inspected here. Returns false for any
 * other shape; *tsType is written on every path that returns true. `pVal` is
 * not dereferenced when `len` is 0.
 */
static bool isTimeStamp(char *pVal, uint16_t len, SMLTimeStampType *tsType) {
  if (len == 0) {
    // No timestamp given: report "now" explicitly so the caller never reads
    // an unset type.
    *tsType = SML_TIME_STAMP_NOW;
    return true;
  }
  if ((len == 1) && pVal[0] == '0') {
    *tsType = SML_TIME_STAMP_NOW;
    return true;
  }
  if (len < 2) {
    return false;
  }

  // <ctype.h> functions require values in the unsigned char range.
  const unsigned char last = (unsigned char)pVal[len - 1];
  const unsigned char prev = (unsigned char)pVal[len - 2];

  //No appendix use usec as default
  if (isdigit(last) && isdigit(prev)) {
    *tsType = SML_TIME_STAMP_MICRO_SECONDS;
    return true;
  }
  if (last != 's') {
    return false;
  }

  switch (prev) {
    case 'm':
      *tsType = SML_TIME_STAMP_MILLI_SECONDS;
      return true;
    case 'u':
      *tsType = SML_TIME_STAMP_MICRO_SECONDS;
      return true;
    case 'n':
      *tsType = SML_TIME_STAMP_NANO_SECONDS;
      return true;
    default:
      if (isdigit(prev)) {
        *tsType = SML_TIME_STAMP_SECONDS;
        return true;
      }
      return false;
  }
}
1207

1208
//len does not include '\0' from value.
1209 1210
// Records `type` and its storage size, taken from tDataTypes, on pVal.
static void smlSetFixedType(TAOS_SML_KV *pVal, uint8_t type) {
  pVal->type = type;
  pVal->length = (int16_t)tDataTypes[type].bytes;
}

// Cuts the `suffixLen`-character type suffix off `value` (in place) and checks
// that what remains is a valid float (isFloating) or integer literal.
static bool smlStripNumericSuffix(char *value, uint16_t len, uint16_t suffixLen, bool isFloating) {
  value[len - suffixLen] = '\0';
  return isFloating ? isValidFloat(value) : isValidInteger(value);
}

// Allocates pVal->value and copies pVal->length bytes from `src` into it.
// Returns false when the buffer cannot be allocated.
static bool smlStoreValue(TAOS_SML_KV *pVal, const void *src) {
  pVal->value = calloc(pVal->length, 1);
  if (pVal->value == NULL) {
    // calloc(0, 1) may legitimately return NULL, e.g. for the empty binary "".
    return pVal->length == 0;
  }
  memcpy(pVal->value, src, pVal->length);
  return true;
}

/*
 * Determines the data type of a raw value token from its suffix or quoting
 * and stores type, length and a freshly allocated copy of the converted value
 * in pVal.
 *
 *   <int>i8/u8/i16/u16/i32/u32/i64/u64  -> (unsigned) integer types
 *   <num>f32 / <num>f64                 -> float / double
 *   "text" / L"text"                    -> binary / nchar (quotes stripped)
 *   t, T, true, ... / f, F, false, ...  -> bool
 *   plain number without suffix         -> float
 *
 * `len` does not include the terminating '\0' of `value`. `value` is modified
 * in place when a numeric suffix is cut off. pVal->type and pVal->length may
 * already be set when false is returned for a malformed number.
 *
 * Returns true on success, false when the token matches no type, the number
 * is malformed or the value buffer cannot be allocated.
 */
static bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
                                uint16_t len) {
  if (len <= 0) {
    return false;
  }

  //integer number
  if (isTinyInt(value, len)) {
    smlSetFixedType(pVal, TSDB_DATA_TYPE_TINYINT);
    if (!smlStripNumericSuffix(value, len, 2, false)) {
      return false;
    }
    int8_t val = (int8_t)strtoll(value, NULL, 10);
    return smlStoreValue(pVal, &val);
  }
  if (isTinyUint(value, len)) {
    smlSetFixedType(pVal, TSDB_DATA_TYPE_UTINYINT);
    if (!smlStripNumericSuffix(value, len, 2, false)) {
      return false;
    }
    uint8_t val = (uint8_t)strtoul(value, NULL, 10);
    return smlStoreValue(pVal, &val);
  }
  if (isSmallInt(value, len)) {
    smlSetFixedType(pVal, TSDB_DATA_TYPE_SMALLINT);
    if (!smlStripNumericSuffix(value, len, 3, false)) {
      return false;
    }
    int16_t val = (int16_t)strtoll(value, NULL, 10);
    return smlStoreValue(pVal, &val);
  }
  if (isSmallUint(value, len)) {
    smlSetFixedType(pVal, TSDB_DATA_TYPE_USMALLINT);
    if (!smlStripNumericSuffix(value, len, 3, false)) {
      return false;
    }
    uint16_t val = (uint16_t)strtoul(value, NULL, 10);
    return smlStoreValue(pVal, &val);
  }
  if (isInt(value, len)) {
    smlSetFixedType(pVal, TSDB_DATA_TYPE_INT);
    if (!smlStripNumericSuffix(value, len, 3, false)) {
      return false;
    }
    int32_t val = (int32_t)strtoll(value, NULL, 10);
    return smlStoreValue(pVal, &val);
  }
  if (isUint(value, len)) {
    smlSetFixedType(pVal, TSDB_DATA_TYPE_UINT);
    if (!smlStripNumericSuffix(value, len, 3, false)) {
      return false;
    }
    uint32_t val = (uint32_t)strtoul(value, NULL, 10);
    return smlStoreValue(pVal, &val);
  }
  if (isBigInt(value, len)) {
    smlSetFixedType(pVal, TSDB_DATA_TYPE_BIGINT);
    if (!smlStripNumericSuffix(value, len, 3, false)) {
      return false;
    }
    int64_t val = (int64_t)strtoll(value, NULL, 10);
    return smlStoreValue(pVal, &val);
  }
  if (isBigUint(value, len)) {
    smlSetFixedType(pVal, TSDB_DATA_TYPE_UBIGINT);
    if (!smlStripNumericSuffix(value, len, 3, false)) {
      return false;
    }
    // strtoull, not strtoul: unsigned long is only 32 bits wide on some platforms.
    uint64_t val = (uint64_t)strtoull(value, NULL, 10);
    return smlStoreValue(pVal, &val);
  }

  //floating number
  if (isFloat(value, len)) {
    smlSetFixedType(pVal, TSDB_DATA_TYPE_FLOAT);
    if (!smlStripNumericSuffix(value, len, 3, true)) {
      return false;
    }
    float val = (float)strtold(value, NULL);
    return smlStoreValue(pVal, &val);
  }
  if (isDouble(value, len)) {
    smlSetFixedType(pVal, TSDB_DATA_TYPE_DOUBLE);
    if (!smlStripNumericSuffix(value, len, 3, true)) {
      return false;
    }
    double val = (double)strtold(value, NULL);
    return smlStoreValue(pVal, &val);
  }

  //binary
  if (isBinary(value, len)) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
    pVal->length = len - 2;
    //copy after "
    return smlStoreValue(pVal, value + 1);
  }
  //nchar
  if (isNchar(value, len)) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
    pVal->length = len - 3;
    //copy after L"
    return smlStoreValue(pVal, value + 2);
  }

  //bool
  bool bVal;
  if (isBool(value, len, &bVal)) {
    smlSetFixedType(pVal, TSDB_DATA_TYPE_BOOL);
    return smlStoreValue(pVal, &bVal);
  }

  //Handle default(no appendix) as float
  if (isValidInteger(value) || isValidFloat(value)) {
    smlSetFixedType(pVal, TSDB_DATA_TYPE_FLOAT);
    float val = (float)strtold(value, NULL);
    return smlStoreValue(pVal, &val);
  }
  return false;
}
1376

1377 1378
/*
 * Converts a timestamp token into nanoseconds.
 *
 * `type` is the unit reported by isTimeStamp(). A token ending in a digit is
 * treated as microseconds unless `type` is SML_TIME_STAMP_NOW; an empty token
 * (len == 0) or SML_TIME_STAMP_NOW yields the current time from
 * taosGetTimestampNs(). `value` is not dereferenced when `len` is 0.
 *
 * Returns TSDB_CODE_SUCCESS and stores the result in *ts, or
 * TSDB_CODE_TSC_LINE_SYNTAX_ERROR when a non-digit appears in front of the
 * last two characters, when a seconds value does not fit into int64_t
 * nanoseconds, or when `type` is not a known unit.
 */
static int32_t getTimeStampValue(char *value, uint16_t len,
                                 SMLTimeStampType type, int64_t *ts) {
  // Everything in front of the last two characters must be a digit; the last
  // two characters have already been inspected by isTimeStamp().
  if (len >= 2) {
    for (int i = 0; i < len - 2; ++i) {
      if (!isdigit((unsigned char)value[i])) {
        return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
      }
    }
  }

  if (len == 0) {
    //no timestamp given
    type = SML_TIME_STAMP_NOW;
  } else {
    //No appendix
    if (isdigit((unsigned char)value[len - 1]) && type != SML_TIME_STAMP_NOW) {
      type = SML_TIME_STAMP_MICRO_SECONDS;
    }
    *ts = (int64_t)strtoll(value, NULL, 10);
  }

  switch (type) {
    case SML_TIME_STAMP_NOW: {
      *ts = taosGetTimestampNs();
      break;
    }
    case SML_TIME_STAMP_SECONDS: {
      // Scale in integer arithmetic; reject values whose nanosecond form overflows.
      const int64_t nsPerSecond = 1000000000LL;
      if (*ts > INT64_MAX / nsPerSecond) {
        return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
      }
      *ts *= nsPerSecond;
      break;
    }
    case SML_TIME_STAMP_MILLI_SECONDS: {
      *ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_NANO);
      break;
    }
    case SML_TIME_STAMP_MICRO_SECONDS: {
      *ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
      break;
    }
    case SML_TIME_STAMP_NANO_SECONDS: {
      // already in nanoseconds
      break;
    }
    default: {
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    }
  }
  return TSDB_CODE_SUCCESS;
}

1424 1425
/*
 * Parses a timestamp token and stores it in pVal as a
 * TSDB_DATA_TYPE_TIMESTAMP value expressed in nanoseconds.
 *
 * pVal->value is allocated here. Returns TSDB_CODE_SUCCESS, or the error
 * reported by isTimeStamp()/getTimeStampValue(), in which case pVal is left
 * untouched.
 */
static int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
                                   uint16_t len) {
  // Start from a defined unit: isTimeStamp() may accept an empty token without
  // reporting one, and `type` is passed on by value below.
  SMLTimeStampType type = SML_TIME_STAMP_NOW;
  int64_t tsVal = 0;

  if (!isTimeStamp(value, len, &type)) {
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  int32_t ret = getTimeStampValue(value, len, type, &tsVal);
  if (ret) {
    return ret;
  }
  tscDebug("Timestamp after conversion:%"PRId64, tsVal);

  pVal->type = TSDB_DATA_TYPE_TIMESTAMP;
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  pVal->value = calloc(pVal->length, 1);
  memcpy(pVal->value, &tsVal, pVal->length);
  return TSDB_CODE_SUCCESS;
}

1447
/*
 * Parses the remainder of the line, starting at *index, as the timestamp and
 * returns it through *pTS as a newly allocated key/value pair whose key is
 * "_ts".
 *
 * An empty remainder is passed on as a NULL token of length 0. On failure
 * nothing is handed to the caller: *pTS is NULL and the error code of
 * convertSmlTimeStamp() is returned. *index is not advanced.
 */
static int32_t parseSmlTimeStamp(TAOS_SML_KV **pTS, const char **index) {
  const char *start = *index;
  const char key[] = "_ts";
  char *value = NULL;

  *pTS = NULL;

  size_t len = strlen(start);
  // convertSmlTimeStamp() takes a 16-bit length; reject instead of truncating.
  if (len > UINT16_MAX) {
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  *pTS = calloc(1, sizeof(TAOS_SML_KV));

  if (len > 0) {
    value = calloc(len + 1, 1);
    memcpy(value, start, len);
  }

  int32_t ret = convertSmlTimeStamp(*pTS, value, (uint16_t)len);
  free(value);
  if (ret) {
    free(*pTS);
    *pTS = NULL;  // do not leave a dangling pointer with the caller
    return ret;
  }

  (*pTS)->key = calloc(sizeof(key), 1);
  memcpy((*pTS)->key, key, sizeof(key));
  return ret;
}
1479

1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497
/*
 * Looks for a binary tag whose key is "ID" (case-insensitive) and copies its
 * value into pData->childTableName as a NUL-terminated string.
 *
 * The destination is assumed to hold TSDB_TABLE_NAME_LEN bytes, which is what
 * the caller in this file allocates; a longer tag value is truncated to
 * TSDB_TABLE_NAME_LEN - 1 characters instead of overrunning the buffer.
 *
 * Returns true when such a tag was found, false otherwise.
 */
static bool getChildTableNameFromTags(TAOS_SML_DATA_POINT *pData) {
  TAOS_SML_KV *pTags = pData->tags;
  int tagNum = pData->tagNum;
  char *childTableName = pData->childTableName;

  for (int i = 0; i < tagNum; ++i, ++pTags) {
    //use tag value as child table name if key is "ID"
    //tag value has to be binary for now
    if (strcasecmp(pTags->key, "ID") != 0 || pTags->type != TSDB_DATA_TYPE_BINARY) {
      continue;
    }

    size_t nameLen = (pTags->length > 0) ? (size_t)pTags->length : 0;
    if (nameLen > (size_t)TSDB_TABLE_NAME_LEN - 1) {
      nameLen = (size_t)TSDB_TABLE_NAME_LEN - 1;
    }
    if (nameLen > 0) {
      memcpy(childTableName, pTags->value, nameLen);
    }
    childTableName[nameLen] = '\0';
    return true;
  }
  return false;
}

/*
 * Parses a tag/field key starting at *index up to the first unescaped '='.
 *
 * Escape sequences for ',', ' ' and '=' are resolved, so the stored key holds
 * the escaped character itself. On success pKV->key is a newly allocated
 * NUL-terminated copy and *index points just behind the '='.
 *
 * Returns TSDB_CODE_TSC_LINE_SYNTAX_ERROR (leaving *index and pKV untouched)
 * when the key starts with a digit, does not fit into TSDB_COL_NAME_LEN bytes
 * including the terminator, or the input ends before an '=' is found.
 */
static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **index) {
  const char *cur = *index;
  char key[TSDB_COL_NAME_LEN];
  uint16_t len = 0;

  //key field cannot start with digit
  if (isdigit((unsigned char)*cur)) {
    tscError("Tag key cannnot start with digit\n");
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  // An escaped '=' is consumed together with its backslash below, so any '='
  // seen at the top of the loop is the unescaped key/value separator.
  while (*cur != '\0' && *cur != '=') {
    //Escape special character
    if (*cur == '\\') {
      escapeSpecialCharacter(2, &cur);
    }
    // Keep one byte free for the terminating '\0'.
    if (len >= TSDB_COL_NAME_LEN - 1) {
      tscDebug("Key field cannot exceeds 65 characters");
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    }
    key[len++] = *cur;
    cur++;
  }

  // Without a separator there is no value to parse, and stepping over the
  // terminating '\0' would leave the input buffer.
  if (*cur != '=') {
    tscError("Key field is not followed by '='");
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }
  key[len] = '\0';

  pKV->key = calloc(len + 1, 1);
  memcpy(pKV->key, key, len + 1);
  *index = cur + 1;
  return TSDB_CODE_SUCCESS;
}
1532

1533

1534 1535
/*
 * Parses the value that follows a key, starting at *index and ending at the
 * first unescaped ',' or ' ' or at the end of the input, and converts it into
 * pKV through convertSmlValueType().
 *
 * Escape sequences for ',', ' ' and '=' are resolved before conversion, the
 * same way parseSmlKey() treats keys. On success *is_last_kv tells whether
 * the value was terminated by ' ' or the end of input (no further key/value
 * pair in this section) and *index points behind the terminator, or at the
 * terminating '\0'.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_LINE_SYNTAX_ERROR for an empty,
 * over-long or unconvertible value, or TSDB_CODE_TSC_OUT_OF_MEMORY. The
 * return type is int32_t so the caller receives the real error code rather
 * than a boolean.
 */
static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **index,
                             bool *is_last_kv) {
  const char *start = *index;
  const char *cur = start;

  // Pass 1: find the terminator. Escaped delimiters are skipped together with
  // their backslash; '\0' always terminates, even directly after a backslash.
  while (*cur != '\0' && *cur != ',' && *cur != ' ') {
    if (*cur == '\\') {
      escapeSpecialCharacter(2, &cur);
    }
    cur++;
  }

  size_t rawLen = (size_t)(cur - start);
  if (rawLen == 0 || rawLen > UINT16_MAX) {
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  char *value = calloc(rawLen + 1, 1);
  if (value == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // Pass 2: copy the token with escape sequences resolved.
  uint16_t len = 0;
  for (const char *p = start; p < cur; ++p) {
    if (*p == '\\') {
      escapeSpecialCharacter(2, &p);
    }
    value[len++] = *p;
  }
  value[len] = '\0';

  bool converted = convertSmlValueType(pKV, value, len);
  free(value);
  if (!converted) {
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  //unescaped ' ' or '\0' indicates end of value
  *is_last_kv = (*cur == ' ' || *cur == '\0');
  *index = (*cur == '\0') ? cur : cur + 1;
  return TSDB_CODE_SUCCESS;
}

/*
 * Parses the measurement (super table name) at the start of a line.
 *
 * The name ends at the first unescaped ',' (tags follow; *has_tags is set
 * to 1) or ' ' (fields follow directly; *has_tags is not written). Escaped
 * ',' and ' ' become part of the name. On success pSml->stableName is a newly
 * allocated NUL-terminated string and *index points behind the delimiter.
 *
 * Returns TSDB_CODE_TSC_LINE_SYNTAX_ERROR when the name starts with a digit,
 * does not fit into TSDB_TABLE_NAME_LEN bytes including the terminator, or
 * the line ends before a delimiter; pSml->stableName is then freed and set to
 * NULL and *index is left untouched.
 */
static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **index,
                                   uint8_t *has_tags) {
  const char *cur = *index;
  uint16_t len = 0;

  pSml->stableName = calloc(TSDB_TABLE_NAME_LEN, 1);
  if (isdigit((unsigned char)*cur)) {
    tscError("Measurement field cannnot start with digit");
    goto syntax_error;
  }

  // Escaped delimiters are consumed together with their backslash below, so
  // any ',' or ' ' seen at the top of the loop is unescaped. This also avoids
  // looking at the byte in front of the line.
  while (*cur != '\0' && *cur != ',' && *cur != ' ') {
    //Comma, Space, Backslash needs to be escaped if any
    if (*cur == '\\') {
      escapeSpecialCharacter(1, &cur);
    }
    // Keep one byte free for the terminating '\0'.
    if (len >= TSDB_TABLE_NAME_LEN - 1) {
      tscError("Measurement field cannot exceeds 193 characters");
      goto syntax_error;
    }
    pSml->stableName[len++] = *cur;
    cur++;
  }

  // A line that ends after the measurement has no fields, and stepping over
  // the terminating '\0' would leave the input buffer.
  if (*cur == '\0') {
    tscError("Measurement field is not followed by tags or fields");
    goto syntax_error;
  }

  //if space detected first, meaning no tag in the input
  if (*cur == ',') {
    *has_tags = 1;
  }
  pSml->stableName[len] = '\0';
  *index = cur + 1;
  tscDebug("Stable name in measurement:%s|len:%d", pSml->stableName, len);

  return TSDB_CODE_SUCCESS;

syntax_error:
  free(pSml->stableName);
  pSml->stableName = NULL;
  return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
1612

1613 1614
/*
 * Parses a comma-separated list of key=value pairs (the tag section or the
 * field section of a line) into a newly allocated array returned via *pKVs.
 *
 * For fields (isField) slot 0 of the array is left empty for the timestamp
 * and parsed pairs start at slot 1; for tags they start at slot 0. *num_kvs is
 * incremented once per parsed pair. On success *index points behind the
 * parsed section.
 *
 * On failure the error code of parseSmlKey()/parseSmlValue(), or
 * TSDB_CODE_TSC_OUT_OF_MEMORY, is returned and *index is left untouched. The
 * array and the pairs counted in *num_kvs stay with the caller.
 */
static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
                               const char **index, bool isField) {
  const char *cur = *index;
  int32_t ret = TSDB_CODE_SUCCESS;
  bool is_last_kv = false;

  // leave space for timestamp in front of the fields
  const int32_t reserved = isField ? 1 : 0;
  int32_t capacity = isField ? 64 : 8;

  *pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
  if (*pKVs == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  TAOS_SML_KV *pkv = *pKVs + reserved;

  while (*cur != '\0') {
    ret = parseSmlKey(pkv, &cur);
    if (ret) {
      tscError("Unable to parse key field");
      return ret;
    }
    ret = parseSmlValue(pkv, &cur, &is_last_kv);
    if (ret) {
      tscError("Unable to parse value field");
      // This pair is not counted in *num_kvs, so nobody else can release its key.
      free(pkv->key);
      pkv->key = NULL;
      return ret;
    }
    *num_kvs += 1;

    if (is_last_kv) {
      break;
    }

    //reallocate additional memory for more kvs
    if ((*num_kvs + reserved + 1) > capacity) {
      int32_t newCapacity = capacity * 3 / 2;
      TAOS_SML_KV *more_kvs = realloc(*pKVs, newCapacity * sizeof(TAOS_SML_KV));
      if (!more_kvs) {
        // *pKVs is still valid and owned by the caller.
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }
      // realloc does not clear the added slots.
      memset(more_kvs + capacity, 0, (size_t)(newCapacity - capacity) * sizeof(TAOS_SML_KV));
      *pKVs = more_kvs;
      capacity = newCapacity;
    }

    //move pkv to the next free TAOS_SML_KV slot
    pkv = *pKVs + *num_kvs + reserved;
  }

  *index = cur;
  return ret;
}
1689

1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705
/*
 * Removes every tag whose key is "ID" (case-insensitive) from the data point.
 *
 * The tag array is compacted in place: kept tags keep their key/value buffers
 * and their relative order, the key and value of removed tags are freed, and
 * tagNum is updated. Slots behind the kept tags are cleared so they hold no
 * stale pointers. No memory is allocated.
 */
static void removeChildTableNameFromTags(TAOS_SML_DATA_POINT** smlData) {
  TAOS_SML_KV* tags = (*smlData)->tags;
  int tagNum = (*smlData)->tagNum;
  int numKept = 0;

  for (int32_t i = 0; i < tagNum; ++i) {
    TAOS_SML_KV* tag = tags + i;
    if (strcasecmp(tag->key, "ID") == 0) {
      free(tag->key);
      free(tag->value);
      continue;
    }
    if (numKept != i) {
      tags[numKept] = *tag;
    }
    numKept++;
  }

  if (numKept < tagNum) {
    memset(tags + numKept, 0, (size_t)(tagNum - numKept) * sizeof(TAOS_SML_KV));
  }
  (*smlData)->tagNum = numKept;
}
1715

1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728
// Install the parsed timestamp as the first field (fields[0]) of the data
// point and count it in fieldNum.
//
// Ownership of ts->key and ts->value is transferred to fields[0] instead of
// being duplicated, so no allocation can fail here. Only the wrapper struct
// `ts` itself is released; the caller must not use `ts` afterwards.
//
// smlData: address of the data point pointer; (*smlData)->fields[0] is
//          overwritten and (*smlData)->fieldNum is incremented by one.
//          NOTE(review): assumes the field parser left slot 0 reserved for
//          the timestamp -- confirm against the field parsing routine.
// ts:      heap-allocated KV holding heap-allocated key and value buffers.
static void moveTimeStampToFirstKv(TAOS_SML_DATA_POINT** smlData, TAOS_SML_KV *ts) {
  TAOS_SML_KV* tsField = (*smlData)->fields;

  tsField->length = ts->length;
  tsField->type = ts->type;
  tsField->key = ts->key;      // buffer now owned by fields[0]
  tsField->value = ts->value;  // buffer now owned by fields[0]
  (*smlData)->fieldNum = (*smlData)->fieldNum + 1;

  free(ts);
}

1731
// Parse one line-protocol record into *smlData.
//
// The line is consumed left to right in four stages: measurement, optional
// tags, fields, timestamp. The first stage that reports a non-zero code stops
// the parse and that code is returned unchanged; whatever was already stored
// in *smlData is left there for the caller to release.
//
// sql:     the text of the line to parse.
// smlData: data point to fill in.
// returns: TSDB_CODE_SUCCESS when all stages succeed, otherwise the code of
//          the failing stage.
int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData) {
  const char*  cursor = sql;
  uint8_t      hasTags = 0;
  TAOS_SML_KV* tsKv = NULL;
  int32_t      rc;

  // Stage 1: measurement
  rc = parseSmlMeasurement(smlData, &cursor, &hasTags);
  if (rc != 0) {
    tscError("Unable to parse measurement");
    return rc;
  }
  tscDebug("Parse measurement finished, has_tags:%d", hasTags);

  // Stage 2: tags (only when the measurement stage reported a tag section)
  if (hasTags) {
    rc = parseSmlKvPairs(&smlData->tags, &smlData->tagNum, &cursor, false);
    if (rc != 0) {
      tscError("Unable to parse tag");
      return rc;
    }

    // NOTE(review): the calloc result is handed to the lookup helper without
    // a NULL check -- confirm the helper tolerates a NULL childTableName.
    smlData->childTableName = calloc(TSDB_TABLE_NAME_LEN, 1);
    bool foundChildName = getChildTableNameFromTags(smlData);
    if (!foundChildName) {
      free(smlData->childTableName);
      smlData->childTableName = NULL;
      tscDebug("No child table name in tags");
    }
    removeChildTableNameFromTags(&smlData);
  }
  tscDebug("Parse tags finished, num of tags:%d", smlData->tagNum);

  // Stage 3: fields
  rc = parseSmlKvPairs(&smlData->fields, &smlData->fieldNum, &cursor, true);
  if (rc != 0) {
    tscError("Unable to parse field");
    return rc;
  }
  tscDebug("Parse fields finished, num of fields:%d", smlData->fieldNum);

  // Stage 4: timestamp, which then becomes the first field
  rc = parseSmlTimeStamp(&tsKv, &cursor);
  if (rc != 0) {
    tscError("Unable to parse timestamp");
    return rc;
  }
  moveTimeStampToFirstKv(&smlData, tsKv);
  tscDebug("Parse timestamp finished");

  return TSDB_CODE_SUCCESS;
}

1782
//=========================================================================
1783

S
shenglian zhou 已提交
1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) {
  for (int i=0; i<point->tagNum; ++i) {
    free((point->tags+i)->key);
    free((point->tags+i)->value);
  }
  free(point->tags);
  for (int i=0; i<point->fieldNum; ++i) {
    free((point->fields+i)->key);
    free((point->fields+i)->value);
  }
  free(point->fields);
  free(point->stableName);
  free(point->childTableName);
}

1799 1800
// Parse `numLines` line-protocol records and append one data point per line
// to `points`.
//
// Parsing stops at the first line that fails: the partially built point for
// that line is destroyed and TSDB_CODE_TSC_LINE_SYNTAX_ERROR is returned.
// Points pushed for earlier lines stay in `points`.
//
// lines:       array of line strings.
// numLines:    number of entries in `lines`.
// points:      array receiving TAOS_SML_DATA_POINT values (copied by push).
// failedLines: not used by this function.
// returns:     0 when every line parsed, TSDB_CODE_TSC_LINE_SYNTAX_ERROR
//              otherwise.
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) {
  int32_t lineIdx = 0;

  while (lineIdx < numLines) {
    TAOS_SML_DATA_POINT parsed = {0};
    int32_t rc = tscParseLine(lines[lineIdx], &parsed);

    if (rc != TSDB_CODE_SUCCESS) {
      tscError("data point line parse failed. line %d : %s", lineIdx, lines[lineIdx]);
      destroySmlDataPoint(&parsed);
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    }

    tscDebug("data point line parse success. line %d", lineIdx);
    taosArrayPush(points, &parsed);
    ++lineIdx;
  }

  return 0;
}

1816
// Parse `numLines` line-protocol records and insert the resulting data points
// through taos_sml_insert().
//
// Every point that was successfully parsed is destroyed before returning,
// whether or not parsing or the insert succeeded.
//
// taos:     connection handle passed through to taos_sml_insert().
// lines:    array of line strings.
// numLines: number of entries in `lines`.
// returns:  0 on success; otherwise the code reported by tscParseLines() or
//           taos_sml_insert().
int taos_insert_lines(TAOS* taos, char* lines[], int numLines) {
  int32_t code = 0;
  SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT));

  code = tscParseLines(lines, numLines, lpPoints, NULL);

  // Read size and start only after parsing, and before any branch, so the
  // cleanup loop below always sees an initialised `points` even when parsing
  // failed after some points had already been pushed.
  size_t               numPoints = taosArrayGetSize(lpPoints);
  TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);

  if (code == 0) {
    code = taos_sml_insert(taos, points, (int)numPoints);
    if (code != 0) {
      tscError("taos_sml_insert error: %s", tstrerror((code)));
    }
  }

  for (size_t i = 0; i < numPoints; ++i) {
    destroySmlDataPoint(points + i);
  }

  taosArrayDestroy(lpPoints);
  return code;
}