tscParseLineProtocol.c 71.8 KB
Newer Older
S
shenglian zhou 已提交
1 2 3 4
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
5

S
shenglian zhou 已提交
6 7 8 9 10 11 12 13
#include "os.h"
#include "osString.h"
#include "ttype.h"
#include "tmd5.h"
#include "tstrbuild.h"
#include "tname.h"
#include "hash.h"
#include "tskiplist.h"
14

S
shenglian zhou 已提交
15
#include "tscUtil.h"
16 17
#include "tsclient.h"
#include "tscLog.h"
S
shenglian zhou 已提交
18

19
#include "taos.h"
20

S
shenglian zhou 已提交
21 22 23 24 25 26
typedef struct  {
  char sTableName[TSDB_TABLE_NAME_LEN];
  SHashObj* tagHash;
  SHashObj* fieldHash;
  SArray* tags; //SArray<SSchema>
  SArray* fields; //SArray<SSchema>
S
shenglian zhou 已提交
27
  uint8_t precision;
S
shenglian zhou 已提交
28 29
} SSmlSTableSchema;

S
shenglian zhou 已提交
30 31 32 33 34 35 36 37 38 39 40 41
typedef struct {
  char* key;
  uint8_t type;
  int16_t length;
  char* value;
} TAOS_SML_KV;

typedef struct {
  char* stableName;

  char* childTableName;
  TAOS_SML_KV* tags;
S
shenglian zhou 已提交
42
  int32_t tagNum;
S
shenglian zhou 已提交
43 44 45

  // first kv must be timestamp
  TAOS_SML_KV* fields;
S
shenglian zhou 已提交
46
  int32_t fieldNum;
S
shenglian zhou 已提交
47 48
} TAOS_SML_DATA_POINT;

49 50 51 52 53 54 55 56
typedef enum {
  SML_TIME_STAMP_NOW,
  SML_TIME_STAMP_SECONDS,
  SML_TIME_STAMP_MILLI_SECONDS,
  SML_TIME_STAMP_MICRO_SECONDS,
  SML_TIME_STAMP_NANO_SECONDS
} SMLTimeStampType;

S
shenglian zhou 已提交
57 58
typedef struct {
  uint64_t id;
59
  SHashObj* smlDataToSchema;
S
shenglian zhou 已提交
60
} SSmlLinesInfo;
61

62 63
//=================================================================================================

64 65 66 67 68 69 70 71 72 73 74 75
static uint64_t linesSmlHandleId = 0;

uint64_t genLinesSmlId() {
  uint64_t id;

  do {
    id = atomic_add_fetch_64(&linesSmlHandleId, 1);
  } while (id == 0);

  return id;
}

S
shenglian zhou 已提交
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
int compareSmlColKv(const void* p1, const void* p2) {
  TAOS_SML_KV* kv1 = (TAOS_SML_KV*)p1;
  TAOS_SML_KV* kv2 = (TAOS_SML_KV*)p2;
  int kvLen1 = (int)strlen(kv1->key);
  int kvLen2 = (int)strlen(kv2->key);
  int res = strncasecmp(kv1->key, kv2->key, MIN(kvLen1, kvLen2));
  if (res != 0) {
    return res;
  } else {
    return kvLen1-kvLen2;
  }
}

typedef enum {
  SCHEMA_ACTION_CREATE_STABLE,
  SCHEMA_ACTION_ADD_COLUMN,
  SCHEMA_ACTION_ADD_TAG,
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,
  SCHEMA_ACTION_CHANGE_TAG_SIZE,
} ESchemaAction;

typedef struct {
  char sTableName[TSDB_TABLE_NAME_LEN];
  SArray* tags; //SArray<SSchema>
  SArray* fields; //SArray<SSchema>
} SCreateSTableActionInfo;

typedef struct {
  char sTableName[TSDB_TABLE_NAME_LEN];
  SSchema* field;
} SAlterSTableActionInfo;

typedef struct {
  ESchemaAction action;
  union {
    SCreateSTableActionInfo createSTable;
    SAlterSTableActionInfo alterSTable;
  };
} SSchemaAction;

S
shenglian zhou 已提交
116
static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes, uint64_t id) {
S
shenglian zhou 已提交
117 118 119 120 121 122
  if (!IS_VAR_DATA_TYPE(kv->type)) {
    *bytes = tDataTypes[kv->type].bytes;
  } else {
    if (kv->type == TSDB_DATA_TYPE_NCHAR) {
      char* ucs = malloc(kv->length * TSDB_NCHAR_SIZE + 1);
      int32_t bytesNeeded = 0;
S
Shenglian Zhou 已提交
123 124 125
      bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded);
      if (!succ) {
        free(ucs);
S
shenglian zhou 已提交
126
        tscError("SML:0x%"PRIx64" convert nchar string to UCS4_LE failed:%s", id, kv->value);
S
Shenglian Zhou 已提交
127 128
        return TSDB_CODE_TSC_INVALID_VALUE;
      }
S
shenglian zhou 已提交
129 130 131 132 133 134 135 136 137
      free(ucs);
      *bytes =  bytesNeeded + VARSTR_HEADER_SIZE;
    } else if (kv->type == TSDB_DATA_TYPE_BINARY) {
      *bytes = kv->length + VARSTR_HEADER_SIZE;
    }
  }
  return 0;
}

S
shenglian zhou 已提交
138
static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
139
  SSchema* pField = NULL;
140 141
  size_t* pFieldIdx = taosHashGet(hash, smlKv->key, strlen(smlKv->key));
  size_t fieldIdx = -1;
S
Shenglian Zhou 已提交
142
  int32_t code = 0;
143 144 145
  if (pFieldIdx) {
    fieldIdx = *pFieldIdx;
    pField = taosArrayGet(array, fieldIdx);
S
shenglian zhou 已提交
146 147

    if (pField->type != smlKv->type) {
S
shenglian zhou 已提交
148
      tscError("SML:0x%"PRIx64" type mismatch. key %s, type %d. type before %d", info->id, smlKv->key, smlKv->type, pField->type);
S
Shenglian Zhou 已提交
149
      return TSDB_CODE_TSC_INVALID_VALUE;
S
shenglian zhou 已提交
150 151 152
    }

    int32_t bytes = 0;
S
shenglian zhou 已提交
153
    code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id);
S
Shenglian Zhou 已提交
154 155 156
    if (code != 0) {
      return code;
    }
S
shenglian zhou 已提交
157 158 159
    pField->bytes = MAX(pField->bytes, bytes);

  } else {
S
shenglian zhou 已提交
160
    SSchema field = {0};
S
shenglian zhou 已提交
161 162 163 164 165 166
    size_t tagKeyLen = strlen(smlKv->key);
    strncpy(field.name, smlKv->key, tagKeyLen);
    field.name[tagKeyLen] = '\0';
    field.type = smlKv->type;

    int32_t bytes = 0;
S
shenglian zhou 已提交
167
    code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id);
S
Shenglian Zhou 已提交
168 169 170
    if (code != 0) {
      return code;
    }
S
shenglian zhou 已提交
171 172 173
    field.bytes = bytes;

    pField = taosArrayPush(array, &field);
174 175
    fieldIdx = taosArrayGetSize(array) - 1;
    taosHashPut(hash, field.name, tagKeyLen, &fieldIdx, sizeof(fieldIdx));
S
shenglian zhou 已提交
176
  }
177

178 179
  uintptr_t valPointer = (uintptr_t)smlKv;
  taosHashPut(info->smlDataToSchema, &valPointer, sizeof(uintptr_t), &fieldIdx, sizeof(fieldIdx));
180

S
shenglian zhou 已提交
181 182 183
  return 0;
}

184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen,
                                       SSmlLinesInfo* info) {
  tscDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id);
  qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);

  SStringBuilder sb; memset(&sb, 0, sizeof(sb));
  char sTableName[TSDB_TABLE_NAME_LEN] = {0};
  strtolower(sTableName, point->stableName);
  taosStringBuilderAppendString(&sb, sTableName);
  for (int j = 0; j < point->tagNum; ++j) {
    taosStringBuilderAppendChar(&sb, ',');
    TAOS_SML_KV* tagKv = point->tags + j;
    char tagName[TSDB_COL_NAME_LEN] = {0};
    strtolower(tagName, tagKv->key);
    taosStringBuilderAppendString(&sb, tagName);
    taosStringBuilderAppendChar(&sb, '=');
    taosStringBuilderAppend(&sb, tagKv->value, tagKv->length);
  }
  size_t len = 0;
  char* keyJoined = taosStringBuilderGetResult(&sb, &len);
  MD5_CTX context;
  MD5Init(&context);
  MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
  MD5Final(&context);
  *tableNameLen = snprintf(tableName, *tableNameLen,
                           "t_%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0],
                           context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6],
                           context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11],
                           context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
  taosStringBuilderDestroy(&sb);
  tscDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName);
  return 0;
}

S
shenglian zhou 已提交
218
static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas, SSmlLinesInfo* info) {
S
Shenglian Zhou 已提交
219
  int32_t code = 0;
S
shenglian zhou 已提交
220 221 222 223 224 225
  SHashObj* sname2shema = taosHashInit(32,
                                       taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

  for (int i = 0; i < numPoint; ++i) {
    TAOS_SML_DATA_POINT* point = &points[i];
    size_t stableNameLen = strlen(point->stableName);
226
    size_t* pStableIdx = taosHashGet(sname2shema, point->stableName, stableNameLen);
S
shenglian zhou 已提交
227
    SSmlSTableSchema* pStableSchema = NULL;
228 229 230 231
    size_t stableIdx = -1;
    if (pStableIdx) {
      pStableSchema= taosArrayGet(stableSchemas, *pStableIdx);
      stableIdx = *pStableIdx;
S
shenglian zhou 已提交
232 233 234 235 236 237 238 239 240 241
    } else {
      SSmlSTableSchema schema;
      strncpy(schema.sTableName, point->stableName, stableNameLen);
      schema.sTableName[stableNameLen] = '\0';
      schema.fields = taosArrayInit(64, sizeof(SSchema));
      schema.tags = taosArrayInit(8, sizeof(SSchema));
      schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
      schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

      pStableSchema = taosArrayPush(stableSchemas, &schema);
242 243
      stableIdx = taosArrayGetSize(stableSchemas) - 1;
      taosHashPut(sname2shema, schema.sTableName, stableNameLen, &stableIdx, sizeof(size_t));
S
shenglian zhou 已提交
244 245 246 247
    }

    for (int j = 0; j < point->tagNum; ++j) {
      TAOS_SML_KV* tagKv = point->tags + j;
248 249 250 251 252 253 254 255 256
      if (!point->childTableName) {
        char childTableName[TSDB_TABLE_NAME_LEN];
        int32_t tableNameLen = TSDB_TABLE_NAME_LEN;
        getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info);
        point->childTableName = calloc(1, tableNameLen+1);
        strncpy(point->childTableName, childTableName, tableNameLen);
        point->childTableName[tableNameLen] = '\0';
      }

S
shenglian zhou 已提交
257
      code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags, info);
S
Shenglian Zhou 已提交
258
      if (code != 0) {
S
shenglian zhou 已提交
259
        tscError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, tag key: %s", info->id, i, tagKv->key);
S
Shenglian Zhou 已提交
260 261
        return code;
      }
S
shenglian zhou 已提交
262 263 264 265
    }

    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* fieldKv = point->fields + j;
S
shenglian zhou 已提交
266
      code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields, info);
S
Shenglian Zhou 已提交
267
      if (code != 0) {
S
shenglian zhou 已提交
268
        tscError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, tag key: %s", info->id, i, fieldKv->key);
S
Shenglian Zhou 已提交
269 270
        return code;
      }
S
shenglian zhou 已提交
271 272
    }

273 274
    uintptr_t valPointer = (uintptr_t)point;
    taosHashPut(info->smlDataToSchema, &valPointer, sizeof(uintptr_t), &stableIdx, sizeof(stableIdx));
S
shenglian zhou 已提交
275 276 277 278 279 280 281 282 283 284
  }

  size_t numStables = taosArrayGetSize(stableSchemas);
  for (int32_t i = 0; i < numStables; ++i) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
    taosHashCleanup(schema->tagHash);
    taosHashCleanup(schema->fieldHash);
  }
  taosHashCleanup(sname2shema);

S
shenglian zhou 已提交
285
  tscDebug("SML:0x%"PRIx64" build point schema succeed. num of super table: %zu", info->id, numStables);
S
Shenglian Zhou 已提交
286 287 288 289 290 291
  for (int32_t i = 0; i < numStables; ++i) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
    tscDebug("\ttable name: %s, tags number: %zu, fields number: %zu", schema->sTableName,
             taosArrayGetSize(schema->tags), taosArrayGetSize(schema->fields));
  }

S
shenglian zhou 已提交
292 293 294
  return 0;
}

295
static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[],
S
shenglian zhou 已提交
296
                                       SSchemaAction* action, bool* actionNeeded, SSmlLinesInfo* info) {
297 298 299 300
  char fieldNameLowerCase[TSDB_COL_NAME_LEN] = {0};
  strtolower(fieldNameLowerCase, pointColField->name);

  size_t* pDbIndex = taosHashGet(dbAttrHash, fieldNameLowerCase, strlen(fieldNameLowerCase));
301 302 303
  if (pDbIndex) {
    SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex);
    assert(strcasecmp(dbAttr->name, pointColField->name) == 0);
S
shenglian zhou 已提交
304
    if (pointColField->type != dbAttr->type) {
S
shenglian zhou 已提交
305
      tscError("SML:0x%"PRIx64" point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id, pointColField->name,
S
Shenglian Zhou 已提交
306 307
               pointColField->type, dbAttr->type);
      return TSDB_CODE_TSC_INVALID_VALUE;
S
shenglian zhou 已提交
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331
    }

    if (IS_VAR_DATA_TYPE(pointColField->type) && (pointColField->bytes > dbAttr->bytes)) {
      if (isTag) {
        action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
      } else {
        action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
      }
      memset(&action->alterSTable, 0,  sizeof(SAlterSTableActionInfo));
      memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN);
      action->alterSTable.field = pointColField;
      *actionNeeded = true;
    }
  } else {
    if (isTag) {
      action->action = SCHEMA_ACTION_ADD_TAG;
    } else {
      action->action = SCHEMA_ACTION_ADD_COLUMN;
    }
    memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo));
    memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN);
    action->alterSTable.field = pointColField;
    *actionNeeded = true;
  }
S
shenglian zhou 已提交
332 333 334 335
  if (*actionNeeded) {
    tscDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, fieldNameLowerCase,
             action->action);
  }
S
shenglian zhou 已提交
336 337 338
  return 0;
}

S
shenglian zhou 已提交
339
static int32_t buildColumnDescription(SSchema* field,
S
shenglian zhou 已提交
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
                               char* buf, int32_t bufSize, int32_t* outBytes) {
  uint8_t type = field->type;

  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
    int32_t bytes = field->bytes - VARSTR_HEADER_SIZE;
    if (type == TSDB_DATA_TYPE_NCHAR) {
      bytes =  bytes/TSDB_NCHAR_SIZE;
    }
    int out = snprintf(buf, bufSize,"%s %s(%d)",
                       field->name,tDataTypes[field->type].name, bytes);
    *outBytes = out;
  } else {
    int out = snprintf(buf, bufSize, "%s %s",
                       field->name, tDataTypes[type].name);
    *outBytes = out;
  }

  return 0;
}

S
shenglian zhou 已提交
360

S
shenglian zhou 已提交
361
static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
362 363
  int32_t code = 0;
  int32_t outBytes = 0;
364 365
  char *result = (char *)calloc(1, tsMaxSQLStringLen+1);
  int32_t capacity = tsMaxSQLStringLen +  1;
S
shenglian zhou 已提交
366

S
shenglian zhou 已提交
367
  tscDebug("SML:0x%"PRIx64" apply schema action. action: %d", info->id, action->action);
S
shenglian zhou 已提交
368 369 370 371 372 373
  switch (action->action) {
    case SCHEMA_ACTION_ADD_COLUMN: {
      int n = sprintf(result, "alter stable %s add column ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
shenglian zhou 已提交
374
      if (code != TSDB_CODE_SUCCESS) {
375
        tscError("SML:0x%"PRIx64" apply schema action. error: %s", info->id, taos_errstr(res));
S
shenglian zhou 已提交
376
      }
377 378
      taos_free_result(res);

S
shenglian zhou 已提交
379
      if (code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || code == TSDB_CODE_TSC_DUP_COL_NAMES) {
380 381
        TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
        code = taos_errno(res2);
382 383 384
        if (code != TSDB_CODE_SUCCESS) {
          tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
385 386
        taos_free_result(res2);
      }
S
shenglian zhou 已提交
387 388 389 390 391 392 393 394
      break;
    }
    case SCHEMA_ACTION_ADD_TAG: {
      int n = sprintf(result, "alter stable %s add tag ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field,
                             result+n, capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
shenglian zhou 已提交
395 396 397
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
398 399
      taos_free_result(res);

S
shenglian zhou 已提交
400
      if (code == TSDB_CODE_MND_TAG_ALREAY_EXIST || code == TSDB_CODE_TSC_DUP_COL_NAMES) {
401 402
        TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
        code = taos_errno(res2);
403 404 405
        if (code != TSDB_CODE_SUCCESS) {
          tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
406 407
        taos_free_result(res2);
      }
S
shenglian zhou 已提交
408 409 410 411 412 413 414 415
      break;
    }
    case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
      int n = sprintf(result, "alter stable %s modify column ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n,
                             capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
shenglian zhou 已提交
416 417 418
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
419 420
      taos_free_result(res);

421
      if (code == TSDB_CODE_MND_INVALID_COLUMN_LENGTH || code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
422 423
        TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
        code = taos_errno(res2);
424 425 426
        if (code != TSDB_CODE_SUCCESS) {
          tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
427 428
        taos_free_result(res2);
      }
429
      break;
S
shenglian zhou 已提交
430 431 432 433 434 435 436
    }
    case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
      int n = sprintf(result, "alter stable %s modify tag ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n,
                             capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
shenglian zhou 已提交
437 438 439
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
440 441
      taos_free_result(res);

442
      if (code == TSDB_CODE_MND_INVALID_TAG_LENGTH || code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
443 444
        TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
        code = taos_errno(res2);
445 446 447
        if (code != TSDB_CODE_SUCCESS) {
          tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
448 449
        taos_free_result(res2);
      }
S
shenglian zhou 已提交
450 451 452 453 454
      break;
    }
    case SCHEMA_ACTION_CREATE_STABLE: {
      int n = sprintf(result, "create stable %s (", action->createSTable.sTableName);
      char* pos = result + n; int freeBytes = capacity - n;
S
shenglian zhou 已提交
455
      size_t numCols = taosArrayGetSize(action->createSTable.fields);
S
shenglian zhou 已提交
456 457 458 459 460 461 462
      for (int32_t i = 0; i < numCols; ++i) {
        SSchema* field = taosArrayGet(action->createSTable.fields, i);
        buildColumnDescription(field, pos, freeBytes, &outBytes);
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
      --pos; ++freeBytes;
463

S
shenglian zhou 已提交
464 465
      outBytes = snprintf(pos, freeBytes, ") tags (");
      pos += outBytes; freeBytes -= outBytes;
466

S
shenglian zhou 已提交
467
      size_t numTags = taosArrayGetSize(action->createSTable.tags);
S
shenglian zhou 已提交
468 469 470 471 472 473 474 475 476 477
      for (int32_t i = 0; i < numTags; ++i) {
        SSchema* field = taosArrayGet(action->createSTable.tags, i);
        buildColumnDescription(field, pos, freeBytes, &outBytes);
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
      pos--; ++freeBytes;
      outBytes = snprintf(pos, freeBytes, ")");
      TAOS_RES* res = taos_query(taos, result);
      code = taos_errno(res);
S
shenglian zhou 已提交
478 479 480
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
481 482
      taos_free_result(res);

483 484 485
      if (code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) {
        TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
        code = taos_errno(res2);
486 487 488
        if (code != TSDB_CODE_SUCCESS) {
          tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
489 490
        taos_free_result(res2);
      }
S
shenglian zhou 已提交
491 492
      break;
    }
S
shenglian zhou 已提交
493

S
shenglian zhou 已提交
494 495 496
    default:
      break;
  }
S
Shenglian Zhou 已提交
497

S
shenglian zhou 已提交
498
  free(result);
S
Shenglian Zhou 已提交
499
  if (code != 0) {
S
shenglian zhou 已提交
500
    tscError("SML:0x%"PRIx64 " apply schema action failure. %s", info->id, tstrerror(code));
S
Shenglian Zhou 已提交
501
  }
S
shenglian zhou 已提交
502 503 504
  return code;
}

505
static int32_t destroySmlSTableSchema(SSmlSTableSchema* schema) {
S
shenglian zhou 已提交
506 507 508 509 510 511 512
  taosHashCleanup(schema->tagHash);
  taosHashCleanup(schema->fieldHash);
  taosArrayDestroy(schema->tags);
  taosArrayDestroy(schema->fields);
  return 0;
}

513
static int32_t fillDbSchema(STableMeta* tableMeta, char* tableName, SSmlSTableSchema* schema, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
514 515 516 517 518 519 520 521 522 523 524 525
  schema->tags = taosArrayInit(8, sizeof(SSchema));
  schema->fields = taosArrayInit(64, sizeof(SSchema));
  schema->tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
  schema->fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

  tstrncpy(schema->sTableName, tableName, strlen(tableName)+1);
  schema->precision = tableMeta->tableInfo.precision;
  for (int i=0; i<tableMeta->tableInfo.numOfColumns; ++i) {
    SSchema field;
    tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)+1);
    field.type = tableMeta->schema[i].type;
    field.bytes = tableMeta->schema[i].bytes;
526 527 528
    taosArrayPush(schema->fields, &field);
    size_t fieldIndex = taosArrayGetSize(schema->fields) - 1;
    taosHashPut(schema->fieldHash, field.name, strlen(field.name), &fieldIndex, sizeof(fieldIndex));
S
shenglian zhou 已提交
529 530 531 532 533 534 535 536
  }

  for (int i=0; i<tableMeta->tableInfo.numOfTags; ++i) {
    int j = i + tableMeta->tableInfo.numOfColumns;
    SSchema field;
    tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name)+1);
    field.type = tableMeta->schema[j].type;
    field.bytes = tableMeta->schema[j].bytes;
537 538 539
    taosArrayPush(schema->tags, &field);
    size_t tagIndex = taosArrayGetSize(schema->tags) - 1;
    taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex));
S
shenglian zhou 已提交
540
  }
541
  tscDebug("SML:0x%"PRIx64 " load table schema succeed. table name: %s, columns number: %d, tag number: %d, precision: %d",
S
shenglian zhou 已提交
542
           info->id, tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision);
543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582
  return TSDB_CODE_SUCCESS;
}

static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTableMeta, SSmlLinesInfo* info) {
  int32_t code = 0;
  int32_t retries = 0;
  STableMeta* tableMeta = NULL;
  while (retries++ < TSDB_MAX_REPLICA && tableMeta == NULL) {
    STscObj* pObj = (STscObj*)taos;
    if (pObj == NULL || pObj->signature != pObj) {
      terrno = TSDB_CODE_TSC_DISCONNECTED;
      return TSDB_CODE_TSC_DISCONNECTED;
    }

    tscDebug("SML:0x%" PRIx64 " retrieve table meta. super table name: %s", info->id, tableName);

    char tableNameLowerCase[TSDB_TABLE_NAME_LEN];
    strtolower(tableNameLowerCase, tableName);

    char sql[256];
    snprintf(sql, 256, "describe %s", tableNameLowerCase);
    TAOS_RES* res = taos_query(taos, sql);
    code = taos_errno(res);
    if (code != 0) {
      tscError("SML:0x%" PRIx64 " describe table failure. %s", info->id, taos_errstr(res));
      taos_free_result(res);
      return code;
    }
    taos_free_result(res);

    SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
    if (pSql == NULL) {
      tscError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno));
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      return code;
    }
    pSql->pTscObj = taos;
    pSql->signature = pSql;
    pSql->fp = NULL;

583
    registerSqlObj(pSql);
584 585 586 587 588 589
    SStrToken tableToken = {.z = tableNameLowerCase, .n = (uint32_t)strlen(tableNameLowerCase), .type = TK_ID};
    tGetToken(tableNameLowerCase, &tableToken.type);
    // Check if the table name available or not
    if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
      sprintf(pSql->cmd.payload, "table name is invalid");
590
      tscFreeRegisteredSqlObj(pSql);
591 592 593 594 595
      return code;
    }

    SName sname = {0};
    if ((code = tscSetTableFullName(&sname, &tableToken, pSql)) != TSDB_CODE_SUCCESS) {
596
      tscFreeRegisteredSqlObj(pSql);
597 598 599 600 601
      return code;
    }
    char fullTableName[TSDB_TABLE_FNAME_LEN] = {0};
    memset(fullTableName, 0, tListLen(fullTableName));
    tNameExtractFullName(&sname, fullTableName);
602
    tscFreeRegisteredSqlObj(pSql);
603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627

    size_t size = 0;
    taosHashGetCloneExt(tscTableMetaMap, fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size);
  }

  if (tableMeta != NULL) {
    *pTableMeta = tableMeta;
    return TSDB_CODE_SUCCESS;
  } else {
    tscError("SML:0x%" PRIx64 " failed to retrieve table meta. super table name: %s", info->id, tableName);
    return TSDB_CODE_TSC_NO_META_CACHED;
  }
}

static int32_t loadTableSchemaFromDB(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSmlLinesInfo* info) {
  int32_t code = 0;
  STableMeta* tableMeta = NULL;
  code = retrieveTableMeta(taos, tableName, &tableMeta, info);
  if (code == TSDB_CODE_SUCCESS) {
    assert(tableMeta != NULL);
    fillDbSchema(tableMeta, tableName, schema, info);
    free(tableMeta);
    tableMeta = NULL;
  }

S
shenglian zhou 已提交
628 629 630
  return code;
}

S
shenglian zhou 已提交
631
static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
632 633 634 635
  int32_t code = 0;
  size_t numStable = taosArrayGetSize(stableSchemas);
  for (int i = 0; i < numStable; ++i) {
    SSmlSTableSchema* pointSchema = taosArrayGet(stableSchemas, i);
S
shenglian zhou 已提交
636 637
    SSmlSTableSchema  dbSchema;
    memset(&dbSchema, 0, sizeof(SSmlSTableSchema));
S
shenglian zhou 已提交
638

639
    code = loadTableSchemaFromDB(taos, pointSchema->sTableName, &dbSchema, info);
S
shenglian zhou 已提交
640 641 642 643 644 645 646
    if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
      SSchemaAction schemaAction = {0};
      schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
      memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
      memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN);
      schemaAction.createSTable.tags = pointSchema->tags;
      schemaAction.createSTable.fields = pointSchema->fields;
S
shenglian zhou 已提交
647
      applySchemaAction(taos, &schemaAction, info);
648
      code = loadTableSchemaFromDB(taos, pointSchema->sTableName, &dbSchema, info);
S
Shenglian Zhou 已提交
649
      if (code != 0) {
S
shenglian zhou 已提交
650
        tscError("SML:0x%"PRIx64" reconcile point schema failed. can not create %s", info->id, pointSchema->sTableName);
651
        return code;
S
Shenglian Zhou 已提交
652 653 654 655
      } else {
        pointSchema->precision = dbSchema.precision;
        destroySmlSTableSchema(&dbSchema);
      }
S
shenglian zhou 已提交
656 657 658 659 660 661 662 663 664 665 666
    } else if (code == TSDB_CODE_SUCCESS) {
      size_t pointTagSize = taosArrayGetSize(pointSchema->tags);
      size_t pointFieldSize = taosArrayGetSize(pointSchema->fields);

      SHashObj* dbTagHash = dbSchema.tagHash;
      SHashObj* dbFieldHash = dbSchema.fieldHash;

      for (int j = 0; j < pointTagSize; ++j) {
        SSchema* pointTag = taosArrayGet(pointSchema->tags, j);
        SSchemaAction schemaAction = {0};
        bool actionNeeded = false;
S
shenglian zhou 已提交
667 668
        generateSchemaAction(pointTag, dbTagHash, dbSchema.tags, true, pointSchema->sTableName,
                             &schemaAction, &actionNeeded, info);
S
shenglian zhou 已提交
669
        if (actionNeeded) {
S
shenglian zhou 已提交
670
          code = applySchemaAction(taos, &schemaAction, info);
671 672 673 674
          if (code != 0) {
            destroySmlSTableSchema(&dbSchema);
            return code;
          }
S
shenglian zhou 已提交
675 676 677 678 679 680 681 682 683 684 685
        }
      }

      SSchema* pointColTs = taosArrayGet(pointSchema->fields, 0);
      SSchema* dbColTs = taosArrayGet(dbSchema.fields, 0);
      memcpy(pointColTs->name, dbColTs->name, TSDB_COL_NAME_LEN);

      for (int j = 1; j < pointFieldSize; ++j) {
        SSchema* pointCol = taosArrayGet(pointSchema->fields, j);
        SSchemaAction schemaAction = {0};
        bool actionNeeded = false;
S
shenglian zhou 已提交
686 687
        generateSchemaAction(pointCol, dbFieldHash, dbSchema.fields,false, pointSchema->sTableName,
                             &schemaAction, &actionNeeded, info);
S
shenglian zhou 已提交
688
        if (actionNeeded) {
S
shenglian zhou 已提交
689
          code = applySchemaAction(taos, &schemaAction, info);
690 691 692 693
          if (code != 0) {
            destroySmlSTableSchema(&dbSchema);
            return code;
          }
S
shenglian zhou 已提交
694 695 696 697 698
        }
      }

      pointSchema->precision = dbSchema.precision;

699
      destroySmlSTableSchema(&dbSchema);
S
shenglian zhou 已提交
700
    } else {
S
shenglian zhou 已提交
701
      tscError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
S
shenglian zhou 已提交
702 703 704 705 706 707
      return code;
    }
  }
  return 0;
}

S
shenglian zhou 已提交
708
static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, const char* tagName, TAOS_BIND* bind, SSmlLinesInfo* info) {
709 710 711 712 713 714 715 716
  char sql[512];
  sprintf(sql, "alter table %s set tag %s=?", cTableName, tagName);

  int32_t code;
  TAOS_STMT* stmt = taos_stmt_init(taos);
  code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));

  if (code != 0) {
717
    tscError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, tstrerror(code));
718
    taos_stmt_close(stmt);
719 720 721 722 723
    return code;
  }

  code = taos_stmt_bind_param(stmt, bind);
  if (code != 0) {
724
    tscError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, tstrerror(code));
725
    taos_stmt_close(stmt);
726 727 728 729 730
    return code;
  }

  code = taos_stmt_execute(stmt);
  if (code != 0) {
731
    tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s", info->id, code, tstrerror(code));
732
    taos_stmt_close(stmt);
733 734 735 736 737
    return code;
  }

  code = taos_stmt_close(stmt);
  if (code != 0) {
738
    tscError("SML:0x%"PRIx64" taos_stmt_close return %d:%s", info->id, code, tstrerror(code));
739 740 741 742 743
    return code;
  }
  return code;
}

S
shenglian zhou 已提交
744 745
static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName,
                                          SArray* tagsSchema, SArray* tagsBind, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
746
  size_t numTags = taosArrayGetSize(tagsSchema);
747
  char* sql = malloc(tsMaxSQLStringLen+1);
748 749 750 751
  if (sql == NULL) {
    tscError("malloc sql memory error");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
752
  int freeBytes = tsMaxSQLStringLen + 1;
S
shenglian zhou 已提交
753
  sprintf(sql, "create table if not exists %s using %s", cTableName, sTableName);
754

S
shenglian zhou 已提交
755 756 757 758 759 760
  snprintf(sql+strlen(sql), freeBytes-strlen(sql), "(");
  for (int i = 0; i < numTags; ++i) {
    SSchema* tagSchema = taosArrayGet(tagsSchema, i);
    snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", tagSchema->name);
  }
  snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")");
761

S
shenglian zhou 已提交
762
  snprintf(sql + strlen(sql), freeBytes-strlen(sql), " tags (");
763

S
shenglian zhou 已提交
764
  for (int i = 0; i < numTags; ++i) {
S
shenglian zhou 已提交
765
    snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
S
shenglian zhou 已提交
766
  }
S
shenglian zhou 已提交
767
  snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")");
768
  sql[strlen(sql)] = '\0';
S
shenglian zhou 已提交
769

S
shenglian zhou 已提交
770
  tscDebug("SML:0x%"PRIx64" create table : %s", info->id, sql);
S
Shenglian Zhou 已提交
771

S
shenglian zhou 已提交
772
  TAOS_STMT* stmt = taos_stmt_init(taos);
773 774 775 776
  if (stmt == NULL) {
    free(sql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
S
shenglian zhou 已提交
777
  int32_t code;
S
shenglian zhou 已提交
778
  code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
779 780
  free(sql);

S
shenglian zhou 已提交
781
  if (code != 0) {
782
    tscError("SML:0x%"PRIx64" taos_stmt_prepare returns %d:%s", info->id, code, tstrerror(code));
783
    taos_stmt_close(stmt);
S
shenglian zhou 已提交
784 785 786 787 788
    return code;
  }

  code = taos_stmt_bind_param(stmt, TARRAY_GET_START(tagsBind));
  if (code != 0) {
789
    tscError("SML:0x%"PRIx64" taos_stmt_bind_param returns %d:%s", info->id, code, tstrerror(code));
790
    taos_stmt_close(stmt);
S
shenglian zhou 已提交
791 792 793 794 795
    return code;
  }

  code = taos_stmt_execute(stmt);
  if (code != 0) {
796
    tscError("SML:0x%"PRIx64" taos_stmt_execute returns %d:%s", info->id, code, tstrerror(code));
797
    taos_stmt_close(stmt);
S
shenglian zhou 已提交
798 799
    return code;
  }
S
shenglian zhou 已提交
800

801 802
  code = taos_stmt_close(stmt);
  if (code != 0) {
803
    tscError("SML:0x%"PRIx64" taos_stmt_close return %d:%s", info->id, code, tstrerror(code));
804 805 806
    return code;
  }
  return code;
S
shenglian zhou 已提交
807 808
}

S
shenglian zhou 已提交
809
static int32_t insertChildTableBatch(TAOS* taos,  char* cTableName, SArray* colsSchema, SArray* rowsBind, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
810
  size_t numCols = taosArrayGetSize(colsSchema);
811
  char* sql = malloc(tsMaxSQLStringLen+1);
812 813 814 815 816
  if (sql == NULL) {
    tscError("malloc sql memory error");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

817
  int32_t freeBytes = tsMaxSQLStringLen + 1 ;
S
shenglian zhou 已提交
818
  sprintf(sql, "insert into ? (");
819

S
shenglian zhou 已提交
820 821
  for (int i = 0; i < numCols; ++i) {
    SSchema* colSchema = taosArrayGet(colsSchema, i);
S
shenglian zhou 已提交
822
    snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name);
S
shenglian zhou 已提交
823
  }
S
shenglian zhou 已提交
824
  snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values (");
825

S
shenglian zhou 已提交
826
  for (int i = 0; i < numCols; ++i) {
S
shenglian zhou 已提交
827
    snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
828
  }
S
shenglian zhou 已提交
829
  snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")");
830
  sql[strlen(sql)] = '\0';
831

S
shenglian zhou 已提交
832
  tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu", info->id, cTableName, taosArrayGetSize(rowsBind));
W
wpan 已提交
833

834
  int32_t code = 0;
S
shenglian zhou 已提交
835

W
wpan 已提交
836
  TAOS_STMT* stmt = taos_stmt_init(taos);
837
  if (stmt == NULL) {
838
    tfree(sql);
839 840
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
841

S
shenglian zhou 已提交
842
  code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
843
  tfree(sql);
844

W
wpan 已提交
845
  if (code != 0) {
846
    tscError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, tstrerror(code));
847
    taos_stmt_close(stmt);
W
wpan 已提交
848 849 850
    return code;
  }

851
  bool tryAgain = false;
852
  int32_t try = 0;
W
wpan 已提交
853
  do {
854
    code = taos_stmt_set_tbname(stmt, cTableName);
855
    if (code != 0) {
856
      tscError("SML:0x%"PRIx64" taos_stmt_set_tbname return %d:%s", info->id, code, tstrerror(code));
857
      taos_stmt_close(stmt);
858 859
      return code;
    }
860

861 862 863 864 865
    size_t rows = taosArrayGetSize(rowsBind);
    for (int32_t i = 0; i < rows; ++i) {
      TAOS_BIND* colsBinds = taosArrayGetP(rowsBind, i);
      code = taos_stmt_bind_param(stmt, colsBinds);
      if (code != 0) {
866
        tscError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, tstrerror(code));
867
        taos_stmt_close(stmt);
868 869 870 871
        return code;
      }
      code = taos_stmt_add_batch(stmt);
      if (code != 0) {
872
        tscError("SML:0x%"PRIx64" taos_stmt_add_batch return %d:%s", info->id, code, tstrerror(code));
873
        taos_stmt_close(stmt);
874 875 876 877 878 879
        return code;
      }
    }

    code = taos_stmt_execute(stmt);
    if (code != 0) {
880
      tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s, try:%d", info->id, code, tstrerror(code), try);
881
    }
882 883 884 885

    tryAgain = false;
    if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
        || code == TSDB_CODE_VND_INVALID_VGROUP_ID
886 887 888
        || code == TSDB_CODE_TDB_TABLE_RECONFIGURE
        || code == TSDB_CODE_APP_NOT_READY
        || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) {
889 890 891
      tryAgain = true;
    }

892 893
    if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
        code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
894 895 896
      TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
      int32_t   code2 = taos_errno(res2);
      if (code2 != TSDB_CODE_SUCCESS) {
897
        tscError("SML:0x%" PRIx64 " insert child table. reset query cache. error: %s", info->id, taos_errstr(res2));
898 899 900
      }
      taos_free_result(res2);
      if (tryAgain) {
901
        taosMsleep(50 * (2 << (try)));
902 903 904
      }
    }
  } while (tryAgain);
S
shenglian zhou 已提交
905

W
wpan 已提交
906

907
  taos_stmt_close(stmt);
S
shenglian zhou 已提交
908
  return code;
S
shenglian zhou 已提交
909 910
}

911
static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints,
S
shenglian zhou 已提交
912
                                             SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
913 914
  for (int32_t i = 0; i < numPoints; ++i) {
    TAOS_SML_DATA_POINT * point = points + i;
915 916 917 918
    uintptr_t valPointer = (uintptr_t)point;
    size_t* pSchemaIndex = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
    assert(pSchemaIndex != NULL);
    SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, *pSchemaIndex);
919

S
shenglian zhou 已提交
920 921 922 923
    for (int j = 0; j < point->tagNum; ++j) {
      TAOS_SML_KV* kv =  point->tags + j;
      if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
        int64_t ts = *(int64_t*)(kv->value);
924
        ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
S
shenglian zhou 已提交
925 926 927 928 929 930 931 932
        *(int64_t*)(kv->value) = ts;
      }
    }

    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* kv =  point->fields + j;
      if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
        int64_t ts = *(int64_t*)(kv->value);
933
        ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
S
shenglian zhou 已提交
934 935 936 937
        *(int64_t*)(kv->value) = ts;
      }
    }

S
shenglian zhou 已提交
938 939 940 941 942 943 944 945
    SArray* cTablePoints = NULL;
    SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, strlen(point->childTableName));
    if (pCTablePoints) {
      cTablePoints = *pCTablePoints;
    } else {
      cTablePoints = taosArrayInit(64, sizeof(point));
      taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES);
    }
946
    taosArrayPush(cTablePoints, &point);
947 948
  }

S
shenglian zhou 已提交
949 950 951
  return 0;
}

952
static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableName,
S
shenglian zhou 已提交
953
                                   SSmlSTableSchema* sTableSchema, SArray* cTablePoints, SSmlLinesInfo* info) {
954 955 956 957 958 959 960 961
  size_t numTags = taosArrayGetSize(sTableSchema->tags);
  size_t rows = taosArrayGetSize(cTablePoints);

  TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
  for (int i= 0; i < rows; ++i) {
    TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
    for (int j = 0; j < pDataPoint->tagNum; ++j) {
      TAOS_SML_KV* kv = pDataPoint->tags + j;
962 963 964 965
      uintptr_t valPointer = (uintptr_t)kv;
      size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
      assert(pFieldSchemaIdx != NULL);
      tagKVs[*pFieldSchemaIdx] = kv;
966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987
    }
  }

  int32_t notNullTagsIndices[TSDB_MAX_TAGS] = {0};
  int32_t numNotNullTags = 0;
  for (int32_t i = 0; i < numTags; ++i) {
    if (tagKVs[i] != NULL) {
      notNullTagsIndices[numNotNullTags] = i;
      ++numNotNullTags;
    }
  }
  
  SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
  taosArraySetSize(tagBinds, numTags);
  int isNullColBind = TSDB_TRUE;
  for (int j = 0; j < numTags; ++j) {
    TAOS_BIND* bind = taosArrayGet(tagBinds, j);
    bind->is_null = &isNullColBind;
  }
  for (int j = 0; j < numTags; ++j) {
    if (tagKVs[j] == NULL) continue;
    TAOS_SML_KV* kv =  tagKVs[j];
988 989 990 991
    uintptr_t valPointer = (uintptr_t)kv;
    size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
    assert(pFieldSchemaIdx != NULL);
    TAOS_BIND* bind = taosArrayGet(tagBinds, *pFieldSchemaIdx);
992 993 994 995 996 997 998 999
    bind->buffer_type = kv->type;
    bind->length = malloc(sizeof(uintptr_t*));
    *bind->length = kv->length;
    bind->buffer = kv->value;
    bind->is_null = NULL;
  }

  // select tag1,tag2,... from stable where tbname in (ctable)
S
shenglian zhou 已提交
1000 1001 1002
  char* sql = malloc(tsMaxSQLStringLen+1);
  int freeBytes = tsMaxSQLStringLen + 1;
  snprintf(sql, freeBytes, "select tbname, ");
1003
  for (int i = 0; i < numNotNullTags ; ++i)  {
S
shenglian zhou 已提交
1004
    snprintf(sql + strlen(sql), freeBytes-strlen(sql), "%s,", tagKVs[notNullTagsIndices[i]]->key);
1005
  }
S
shenglian zhou 已提交
1006
  snprintf(sql + strlen(sql) - 1, freeBytes - strlen(sql) + 1,
1007
           " from %s where tbname in (\'%s\')", sTableName, cTableName);
S
shenglian zhou 已提交
1008 1009
  sql[strlen(sql)] = '\0';

1010
  TAOS_RES* result = taos_query(taos, sql);
S
shenglian zhou 已提交
1011 1012
  free(sql);

1013 1014
  int32_t code = taos_errno(result);
  if (code != 0) {
S
shenglian zhou 已提交
1015
    tscError("SML:0x%"PRIx64" get child table %s tags failed. error string %s", info->id, cTableName, taos_errstr(result));
1016 1017 1018 1019 1020 1021 1022 1023 1024 1025
    goto cleanup;
  }

  // check tag value and set tag values if different
  TAOS_ROW row = taos_fetch_row(result);
  if (row != NULL) {
    int numFields = taos_field_count(result);
    TAOS_FIELD* fields = taos_fetch_fields(result);
    int* lengths = taos_fetch_lengths(result);
    for (int i = 1; i < numFields; ++i) {
S
shenglian zhou 已提交
1026
      uint8_t dbType = fields[i].type;
1027 1028 1029 1030
      int32_t length = lengths[i];
      char* val = row[i];

      TAOS_SML_KV* tagKV = tagKVs[notNullTagsIndices[i-1]];
S
shenglian zhou 已提交
1031
      if (tagKV->type != dbType) {
S
shenglian zhou 已提交
1032 1033
        tscError("SML:0x%"PRIx64" child table %s tag %s type mismatch. point type : %d, db type : %d",
                 info->id, cTableName, tagKV->key, tagKV->type, dbType);
1034 1035 1036
        return TSDB_CODE_TSC_INVALID_VALUE;
      }

S
shenglian zhou 已提交
1037 1038 1039
      assert(tagKV->value);

      if (val == NULL || length != tagKV->length || memcmp(tagKV->value, val, length) != 0) {
1040 1041 1042 1043
        uintptr_t valPointer = (uintptr_t)tagKV;
        size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
        assert(pFieldSchemaIdx != NULL);
        TAOS_BIND* bind = taosArrayGet(tagBinds, *pFieldSchemaIdx);
S
shenglian zhou 已提交
1044
        code = changeChildTableTagValue(taos, cTableName, tagKV->key, bind, info);
1045
        if (code != 0) {
S
shenglian zhou 已提交
1046
          tscError("SML:0x%"PRIx64" change child table tag failed. table name %s, tag %s", info->id, cTableName, tagKV->key);
1047 1048 1049 1050
          goto cleanup;
        }
      }
    }
S
shenglian zhou 已提交
1051
    tscDebug("SML:0x%"PRIx64" successfully applied point tags. child table: %s", info->id, cTableName);
1052
  } else {
S
shenglian zhou 已提交
1053
    code = creatChildTableIfNotExists(taos, cTableName, sTableName, sTableSchema->tags, tagBinds, info);
1054 1055 1056 1057 1058 1059
    if (code != 0) {
      goto cleanup;
    }
  }

cleanup:
S
shenglian zhou 已提交
1060
  taos_free_result(result);
1061 1062 1063 1064 1065 1066 1067 1068
  for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
    TAOS_BIND* bind = taosArrayGet(tagBinds, i);
    free(bind->length);
  }
  taosArrayDestroy(tagBinds);
  return code;
}

S
shenglian zhou 已提交
1069 1070
static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, char* cTableName,
                                     SArray* cTablePoints, SSmlLinesInfo* info) {
1071 1072
  int32_t code = TSDB_CODE_SUCCESS;

1073 1074 1075
  size_t numCols = taosArrayGetSize(sTableSchema->fields);
  size_t rows = taosArrayGetSize(cTablePoints);
  SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);
S
shenglian zhou 已提交
1076

1077 1078
  for (int i = 0; i < rows; ++i) {
    TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i);
1079

1080 1081
    TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
    if (colBinds == NULL) {
S
shenglian zhou 已提交
1082 1083
      tscError("SML:0x%"PRIx64" taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
               "num of rows: %zu, num of cols: %zu", info->id, rows, numCols);
1084 1085
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
1086

1087 1088 1089
    int isNullColBind = TSDB_TRUE;
    for (int j = 0; j < numCols; ++j) {
      TAOS_BIND* bind = colBinds + j;
1090 1091
      bind->is_null = &isNullColBind;
    }
1092 1093
    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* kv = point->fields + j;
1094 1095 1096 1097
      uintptr_t valPointer = (uintptr_t)kv;
      size_t* pFieldSchemaIdx = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
      assert(pFieldSchemaIdx != NULL);
      TAOS_BIND* bind = colBinds + *pFieldSchemaIdx;
1098
      bind->buffer_type = kv->type;
1099 1100
      bind->length = malloc(sizeof(uintptr_t*));
      *bind->length = kv->length;
1101 1102 1103
      bind->buffer = kv->value;
      bind->is_null = NULL;
    }
1104 1105
    taosArrayPush(rowsBind, &colBinds);
  }
S
shenglian zhou 已提交
1106

1107 1108 1109 1110
  code = insertChildTableBatch(taos, cTableName, sTableSchema->fields, rowsBind, info);
  if (code != 0) {
    tscError("SML:0x%"PRIx64" insert into child table %s failed. error %s", info->id, cTableName, tstrerror(code));
  }
S
shenglian zhou 已提交
1111

1112 1113 1114 1115 1116
  for (int i = 0; i < rows; ++i) {
    TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
    for (int j = 0; j < numCols; ++j) {
      TAOS_BIND* bind = colBinds + j;
      free(bind->length);
S
shenglian zhou 已提交
1117
    }
1118 1119 1120 1121 1122
    free(colBinds);
  }
  taosArrayDestroy(rowsBind);
  return code;
}
S
shenglian zhou 已提交
1123

S
shenglian zhou 已提交
1124
static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) {
1125
  int32_t code = TSDB_CODE_SUCCESS;
1126

1127
  SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
S
shenglian zhou 已提交
1128
  arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info);
1129 1130 1131 1132 1133 1134

  SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
  while (pCTablePoints) {
    SArray* cTablePoints = *pCTablePoints;

    TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
1135 1136 1137 1138
    uintptr_t valPointer = (uintptr_t)point;
    size_t* pSchemaIndex = taosHashGet(info->smlDataToSchema, &valPointer, sizeof(uintptr_t));
    assert(pSchemaIndex != NULL);
    SSmlSTableSchema*    sTableSchema = taosArrayGet(stableSchemas, *pSchemaIndex);
S
shenglian zhou 已提交
1139 1140 1141

    tscDebug("SML:0x%"PRIx64" apply child table tags. child table: %s", info->id, point->childTableName);
    code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, info);
1142 1143 1144
    if (code != 0) {
      tscError("apply child table tags failed. child table %s, error %s", point->childTableName, tstrerror(code));
      goto cleanup;
S
shenglian zhou 已提交
1145
    }
S
shenglian zhou 已提交
1146 1147 1148

    tscDebug("SML:0x%"PRIx64" apply child table points. child table: %s", info->id, point->childTableName);
    code = applyChildTableFields(taos, sTableSchema, point->childTableName, cTablePoints, info);
1149
    if (code != 0) {
1150
      tscError("SML:0x%"PRIx64" Apply child table fields failed. child table %s, error %s", info->id, point->childTableName, tstrerror(code));
1151
      goto cleanup;
1152
    }
1153

S
shenglian zhou 已提交
1154
    tscDebug("SML:0x%"PRIx64" successfully applied data points of child table %s", info->id, point->childTableName);
S
shenglian zhou 已提交
1155

S
shenglian zhou 已提交
1156
    pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
1157
  }
S
shenglian zhou 已提交
1158

1159 1160 1161 1162 1163
cleanup:
  pCTablePoints = taosHashIterate(cname2points, NULL);
  while (pCTablePoints) {
    SArray* pPoints = *pCTablePoints;
    taosArrayDestroy(pPoints);
S
shenglian zhou 已提交
1164
    pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
1165
  }
S
shenglian zhou 已提交
1166
  taosHashCleanup(cname2points);
1167
  return code;
S
shenglian zhou 已提交
1168
}
1169

1170
int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
1171
  tscDebug("SML:0x%"PRIx64" taos_sml_insert. number of points: %d", info->id, numPoint);
S
Shenglian Zhou 已提交
1172

S
shenglian zhou 已提交
1173
  int32_t code = TSDB_CODE_SUCCESS;
1174
  info->smlDataToSchema = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, false);
S
shenglian zhou 已提交
1175

S
shenglian zhou 已提交
1176
  tscDebug("SML:0x%"PRIx64" build data point schemas", info->id);
S
shenglian zhou 已提交
1177
  SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>
S
shenglian zhou 已提交
1178
  code = buildDataPointSchemas(points, numPoint, stableSchemas, info);
S
shenglian zhou 已提交
1179
  if (code != 0) {
S
shenglian zhou 已提交
1180
    tscError("SML:0x%"PRIx64" error building data point schemas : %s", info->id, tstrerror(code));
S
shenglian zhou 已提交
1181 1182 1183
    goto clean_up;
  }

S
shenglian zhou 已提交
1184 1185
  tscDebug("SML:0x%"PRIx64" modify db schemas", info->id);
  code = modifyDBSchemas(taos, stableSchemas, info);
S
shenglian zhou 已提交
1186
  if (code != 0) {
S
shenglian zhou 已提交
1187
    tscError("SML:0x%"PRIx64" error change db schema : %s", info->id, tstrerror(code));
S
shenglian zhou 已提交
1188 1189
    goto clean_up;
  }
S
shenglian zhou 已提交
1190

S
shenglian zhou 已提交
1191 1192
  tscDebug("SML:0x%"PRIx64" apply data points", info->id);
  code = applyDataPoints(taos, points, numPoint, stableSchemas, info);
S
shenglian zhou 已提交
1193
  if (code != 0) {
S
shenglian zhou 已提交
1194
    tscError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code));
S
shenglian zhou 已提交
1195 1196 1197
  }

clean_up:
S
shenglian zhou 已提交
1198 1199 1200 1201
  for (int i = 0; i < taosArrayGetSize(stableSchemas); ++i) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
    taosArrayDestroy(schema->fields);
    taosArrayDestroy(schema->tags);
1202
  }
S
shenglian zhou 已提交
1203
  taosArrayDestroy(stableSchemas);
1204 1205 1206 1207 1208 1209 1210 1211 1212
  taosHashCleanup(info->smlDataToSchema);
  return code;
}

int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
  SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo));
  info->id = genLinesSmlId();
  int code = tscSmlInsert(taos, points, numPoint, info);
  free(info);
1213 1214
  return code;
}
S
shenglian zhou 已提交
1215

1216 1217
//=========================================================================

1218 1219 1220 1221 1222
/*        Field                          Escape charaters
    1: measurement                        Comma,Space
    2: tag_key, tag_value, field_key  Comma,Equal Sign,Space
    3: field_value                    Double quote,Backslash
*/
1223
static void escapeSpecialCharacter(uint8_t field, const char **pos) {
1224 1225 1226
  const char *cur = *pos;
  if (*cur != '\\') {
    return;
1227
  }
1228 1229 1230 1231 1232 1233
  switch (field) {
    case 1:
      switch (*(cur + 1)) {
        case ',':
        case ' ':
          cur++;
1234
          break;
1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261
        default:
          break;
      }
      break;
    case 2:
      switch (*(cur + 1)) {
        case ',':
        case ' ':
        case '=':
          cur++;
          break;
        default:
          break;
      }
      break;
    case 3:
      switch (*(cur + 1)) {
        case '"':
        case '\\':
          cur++;
          break;
        default:
          break;
      }
      break;
    default:
      break;
1262
  }
1263
  *pos = cur;
1264
}
1265

1266
static bool isValidInteger(char *str) {
1267 1268
  char *c = str;
  if (*c != '+' && *c != '-' && !isdigit(*c)) {
1269 1270
    return false;
  }
1271 1272 1273 1274 1275 1276
  c++;
  while (*c != '\0') {
    if (!isdigit(*c)) {
      return false;
    }
    c++;
1277
  }
1278
  return true;
1279
}
1280

1281
static bool isValidFloat(char *str) {
1282 1283 1284 1285 1286 1287 1288
  char *c = str;
  uint8_t has_dot, has_exp, has_sign;
  has_dot = 0;
  has_exp = 0;
  has_sign = 0;

  if (*c != '+' && *c != '-' && *c != '.' && !isdigit(*c)) {
1289 1290
    return false;
  }
1291 1292
  if (*c == '.' && isdigit(*(c + 1))) {
    has_dot = 1;
1293
  }
1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334
  c++;
  while (*c != '\0') {
    if (!isdigit(*c)) {
      switch (*c) {
        case '.': {
          if (!has_dot && !has_exp && isdigit(*(c + 1))) {
            has_dot = 1;
          } else {
            return false;
          }
          break;
        }
        case 'e':
        case 'E': {
          if (!has_exp && isdigit(*(c - 1)) &&
              (isdigit(*(c + 1)) ||
               *(c + 1) == '+' ||
               *(c + 1) == '-')) {
            has_exp = 1;
          } else {
            return false;
          }
          break;
        }
        case '+':
        case '-': {
          if (!has_sign && has_exp && isdigit(*(c + 1))) {
            has_sign = 1;
          } else {
            return false;
          }
          break;
        }
        default: {
          return false;
        }
      }
    }
    c++;
  } //while
  return true;
1335
}
1336

1337
static bool isTinyInt(char *pVal, uint16_t len) {
1338 1339 1340 1341
  if (len <= 2) {
    return false;
  }
  if (!strcmp(&pVal[len - 2], "i8")) {
1342
    //printf("Type is int8(%s)\n", pVal);
1343 1344 1345 1346
    return true;
  }
  return false;
}
1347

1348
static bool isTinyUint(char *pVal, uint16_t len) {
1349 1350 1351 1352 1353
  if (len <= 2) {
    return false;
  }
  if (pVal[0] == '-') {
    return false;
1354
  }
1355
  if (!strcmp(&pVal[len - 2], "u8")) {
1356
    //printf("Type is uint8(%s)\n", pVal);
1357 1358 1359
    return true;
  }
  return false;
1360 1361
}

1362
static bool isSmallInt(char *pVal, uint16_t len) {
1363 1364 1365 1366
  if (len <= 3) {
    return false;
  }
  if (!strcmp(&pVal[len - 3], "i16")) {
1367
    //printf("Type is int16(%s)\n", pVal);
1368
    return true;
1369
  }
1370
  return false;
1371 1372
}

1373
static bool isSmallUint(char *pVal, uint16_t len) {
1374 1375
  if (len <= 3) {
    return false;
1376
  }
1377 1378 1379 1380
  if (pVal[0] == '-') {
    return false;
  }
  if (strcmp(&pVal[len - 3], "u16") == 0) {
1381
    //printf("Type is uint16(%s)\n", pVal);
1382 1383 1384
    return true;
  }
  return false;
1385 1386
}

1387
static bool isInt(char *pVal, uint16_t len) {
1388 1389
  if (len <= 3) {
    return false;
1390
  }
1391
  if (strcmp(&pVal[len - 3], "i32") == 0) {
1392
    //printf("Type is int32(%s)\n", pVal);
1393 1394 1395
    return true;
  }
  return false;
1396 1397
}

1398
static bool isUint(char *pVal, uint16_t len) {
1399 1400 1401 1402 1403 1404 1405
  if (len <= 3) {
    return false;
  }
  if (pVal[0] == '-') {
    return false;
  }
  if (strcmp(&pVal[len - 3], "u32") == 0) {
1406
    //printf("Type is uint32(%s)\n", pVal);
1407 1408 1409
    return true;
  }
  return false;
1410 1411
}

1412
static bool isBigInt(char *pVal, uint16_t len) {
1413 1414
  if (len <= 3) {
    return false;
1415
  }
1416
  if (strcmp(&pVal[len - 3], "i64") == 0) {
1417
    //printf("Type is int64(%s)\n", pVal);
1418 1419 1420
    return true;
  }
  return false;
1421 1422
}

1423
static bool isBigUint(char *pVal, uint16_t len) {
1424 1425 1426 1427 1428
  if (len <= 3) {
    return false;
  }
  if (pVal[0] == '-') {
    return false;
1429
  }
1430
  if (strcmp(&pVal[len - 3], "u64") == 0) {
1431
    //printf("Type is uint64(%s)\n", pVal);
1432 1433 1434
    return true;
  }
  return false;
1435 1436
}

1437
static bool isFloat(char *pVal, uint16_t len) {
1438 1439 1440 1441
  if (len <= 3) {
    return false;
  }
  if (strcmp(&pVal[len - 3], "f32") == 0) {
1442
    //printf("Type is float(%s)\n", pVal);
1443 1444 1445
    return true;
  }
  return false;
1446 1447
}

1448
static bool isDouble(char *pVal, uint16_t len) {
1449 1450 1451 1452
  if (len <= 3) {
    return false;
  }
  if (strcmp(&pVal[len - 3], "f64") == 0) {
1453
    //printf("Type is double(%s)\n", pVal);
1454 1455 1456 1457 1458
    return true;
  }
  return false;
}

1459
static bool isBool(char *pVal, uint16_t len, bool *bVal) {
1460 1461 1462
  if ((len == 1) &&
      (pVal[len - 1] == 't' ||
       pVal[len - 1] == 'T')) {
1463 1464
    //printf("Type is bool(%c)\n", pVal[len - 1]);
    *bVal = true;
1465
    return true;
1466
  }
1467 1468 1469 1470

  if ((len == 1) &&
      (pVal[len - 1] == 'f' ||
       pVal[len - 1] == 'F')) {
1471 1472
    //printf("Type is bool(%c)\n", pVal[len - 1]);
    *bVal = false;
1473
    return true;
1474
  }
1475 1476 1477 1478 1479

  if((len == 4) &&
     (!strcmp(&pVal[len - 4], "true") ||
      !strcmp(&pVal[len - 4], "True") ||
      !strcmp(&pVal[len - 4], "TRUE"))) {
1480 1481
    //printf("Type is bool(%s)\n", &pVal[len - 4]);
    *bVal = true;
1482 1483 1484 1485 1486 1487
    return true;
  }
  if((len == 5) &&
     (!strcmp(&pVal[len - 5], "false") ||
      !strcmp(&pVal[len - 5], "False") ||
      !strcmp(&pVal[len - 5], "FALSE"))) {
1488 1489
    //printf("Type is bool(%s)\n", &pVal[len - 5]);
    *bVal = false;
1490 1491 1492
    return true;
  }
  return false;
1493 1494
}

1495
static bool isBinary(char *pVal, uint16_t len) {
1496 1497 1498 1499 1500 1501
  //binary: "abc"
  if (len < 2) {
    return false;
  }
  //binary
  if (pVal[0] == '"' && pVal[len - 1] == '"') {
1502
    //printf("Type is binary(%s)\n", pVal);
1503 1504 1505 1506
    return true;
  }
  return false;
}
1507

1508
static bool isNchar(char *pVal, uint16_t len) {
1509 1510
  //nchar: L"abc"
  if (len < 3) {
1511 1512
    return false;
  }
1513
  if (pVal[0] == 'L' && pVal[1] == '"' && pVal[len - 1] == '"') {
1514
    //printf("Type is nchar(%s)\n", pVal);
1515
    return true;
1516
  }
1517 1518 1519
  return false;
}

1520
static bool isTimeStamp(char *pVal, uint16_t len, SMLTimeStampType *tsType) {
1521 1522 1523 1524 1525
  if (len == 0) {
    return true;
  }
  if ((len == 1) && pVal[0] == '0') {
    *tsType = SML_TIME_STAMP_NOW;
1526
    //printf("Type is timestamp(%s)\n", pVal);
1527 1528 1529 1530 1531 1532 1533 1534
    return true;
  }
  if (len < 2) {
    return false;
  }
  //No appendix use usec as default
  if (isdigit(pVal[len - 1]) && isdigit(pVal[len - 2])) {
    *tsType = SML_TIME_STAMP_MICRO_SECONDS;
1535
    //printf("Type is timestamp(%s)\n", pVal);
1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551
    return true;
  }
  if (pVal[len - 1] == 's') {
    switch (pVal[len - 2]) {
      case 'm':
        *tsType = SML_TIME_STAMP_MILLI_SECONDS;
        break;
      case 'u':
        *tsType = SML_TIME_STAMP_MICRO_SECONDS;
        break;
      case 'n':
        *tsType = SML_TIME_STAMP_NANO_SECONDS;
        break;
      default:
        if (isdigit(pVal[len - 2])) {
          *tsType = SML_TIME_STAMP_SECONDS;
1552
          break;
1553
        } else {
1554 1555 1556
          return false;
        }
    }
1557
    //printf("Type is timestamp(%s)\n", pVal);
1558 1559 1560 1561
    return true;
  }
  return false;
}
1562

1563
static bool convertStrToNumber(TAOS_SML_KV *pVal, char*str, SSmlLinesInfo* info) {
1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581
  errno = 0;
  uint8_t type = pVal->type;
  int16_t length = pVal->length;
  int64_t val_s;
  uint64_t val_u;
  double val_d;

  if (IS_FLOAT_TYPE(type)) {
    val_d = strtod(str, NULL);
  } else {
    if (IS_SIGNED_NUMERIC_TYPE(type)) {
      val_s = strtoll(str, NULL, 10);
    } else {
      val_u = strtoull(str, NULL, 10);
    }
  }

  if (errno == ERANGE) {
1582
    tscError("SML:0x%"PRIx64" Convert number(%s) out of range", info->id, str);
1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661
    return false;
  }

  switch (type) {
    case TSDB_DATA_TYPE_TINYINT:
      if (!IS_VALID_TINYINT(val_s)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(int8_t *)(pVal->value) = (int8_t)val_s;
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      if (!IS_VALID_UTINYINT(val_u)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(uint8_t *)(pVal->value) = (uint8_t)val_u;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      if (!IS_VALID_SMALLINT(val_s)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(int16_t *)(pVal->value) = (int16_t)val_s;
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      if (!IS_VALID_USMALLINT(val_u)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(uint16_t *)(pVal->value) = (uint16_t)val_u;
      break;
    case TSDB_DATA_TYPE_INT:
      if (!IS_VALID_INT(val_s)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(int32_t *)(pVal->value) = (int32_t)val_s;
      break;
    case TSDB_DATA_TYPE_UINT:
      if (!IS_VALID_UINT(val_u)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(uint32_t *)(pVal->value) = (uint32_t)val_u;
      break;
    case TSDB_DATA_TYPE_BIGINT:
      if (!IS_VALID_BIGINT(val_s)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(int64_t *)(pVal->value) = (int64_t)val_s;
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      if (!IS_VALID_UBIGINT(val_u)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(uint64_t *)(pVal->value) = (uint64_t)val_u;
      break;
    case TSDB_DATA_TYPE_FLOAT:
      if (!IS_VALID_FLOAT(val_d)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(float *)(pVal->value) = (float)val_d;
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      if (!IS_VALID_DOUBLE(val_d)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(double *)(pVal->value) = (double)val_d;
      break;
    default:
      return false;
  }
  return true;
}
1662
//len does not include '\0' from value.
1663
static bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
1664
                                uint16_t len, SSmlLinesInfo* info) {
1665 1666 1667
  if (len <= 0) {
    return false;
  }
G
Ganlin Zhao 已提交
1668

1669
  //integer number
1670
  if (isTinyInt(value, len)) {
1671 1672 1673
    pVal->type = TSDB_DATA_TYPE_TINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 2] = '\0';
1674
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1675
      return false;
1676 1677 1678
    }
    return true;
  }
1679
  if (isTinyUint(value, len)) {
1680 1681 1682
    pVal->type = TSDB_DATA_TYPE_UTINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 2] = '\0';
1683
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1684 1685 1686 1687
      return false;
    }
    return true;
  }
1688
  if (isSmallInt(value, len)) {
1689 1690 1691
    pVal->type = TSDB_DATA_TYPE_SMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1692
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1693 1694 1695 1696
      return false;
    }
    return true;
  }
1697
  if (isSmallUint(value, len)) {
1698 1699 1700
    pVal->type = TSDB_DATA_TYPE_USMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1701
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1702 1703 1704 1705
      return false;
    }
    return true;
  }
1706
  if (isInt(value, len)) {
1707 1708 1709
    pVal->type = TSDB_DATA_TYPE_INT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1710
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1711 1712 1713 1714
      return false;
    }
    return true;
  }
1715
  if (isUint(value, len)) {
1716 1717 1718
    pVal->type = TSDB_DATA_TYPE_UINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1719
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1720 1721 1722 1723
      return false;
    }
    return true;
  }
1724
  if (isBigInt(value, len)) {
1725 1726 1727
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1728
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1729 1730 1731 1732
      return false;
    }
    return true;
  }
1733
  if (isBigUint(value, len)) {
1734 1735 1736
    pVal->type = TSDB_DATA_TYPE_UBIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1737
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1738 1739 1740 1741
      return false;
    }
    return true;
  }
1742 1743 1744 1745 1746
  //floating number
  if (isFloat(value, len)) {
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1747
    if (!isValidFloat(value) || !convertStrToNumber(pVal, value, info)) {
1748 1749 1750 1751 1752 1753 1754 1755
      return false;
    }
    return true;
  }
  if (isDouble(value, len)) {
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1756
    if (!isValidFloat(value) || !convertStrToNumber(pVal, value, info)) {
1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787
      return false;
    }
    return true;
  }
  //binary
  if (isBinary(value, len)) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
    pVal->length = len - 2;
    pVal->value = calloc(pVal->length, 1);
    //copy after "
    memcpy(pVal->value, value + 1, pVal->length);
    return true;
  }
  //nchar
  if (isNchar(value, len)) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
    pVal->length = len - 3;
    pVal->value = calloc(pVal->length, 1);
    //copy after L"
    memcpy(pVal->value, value + 2, pVal->length);
    return true;
  }
  //bool
  bool bVal;
  if (isBool(value, len, &bVal)) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->value = calloc(pVal->length, 1);
    memcpy(pVal->value, &bVal, pVal->length);
    return true;
  }
G
Ganlin Zhao 已提交
1788 1789 1790 1791
  //Handle default(no appendix) as float
  if (isValidInteger(value) || isValidFloat(value)) {
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
1792
    if (!convertStrToNumber(pVal, value, info)) {
1793 1794
      return false;
    }
G
Ganlin Zhao 已提交
1795 1796
    return true;
  }
1797
  return false;
1798
}
1799

1800 1801
static int32_t getTimeStampValue(char *value, uint16_t len,
                                 SMLTimeStampType type, int64_t *ts) {
1802 1803 1804 1805 1806

  if (len >= 2) {
    for (int i = 0; i < len - 2; ++i) {
      if(!isdigit(value[i])) {
        return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
1807
      }
1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820
    }
  }
  //No appendix or no timestamp given (len = 0)
  if (len >= 1 && isdigit(value[len - 1]) && type != SML_TIME_STAMP_NOW) {
    type = SML_TIME_STAMP_MICRO_SECONDS;
  }
  if (len != 0) {
    *ts = (int64_t)strtoll(value, NULL, 10);
  } else {
    type = SML_TIME_STAMP_NOW;
  }
  switch (type) {
    case SML_TIME_STAMP_NOW: {
1821
      *ts = taosGetTimestampNs();
1822
      break;
1823 1824
    }
    case SML_TIME_STAMP_SECONDS: {
1825
      *ts = (int64_t)(*ts * 1e9);
1826
      break;
1827 1828
    }
    case SML_TIME_STAMP_MILLI_SECONDS: {
1829
      *ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_NANO);
1830
      break;
1831 1832
    }
    case SML_TIME_STAMP_MICRO_SECONDS: {
1833
      *ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
1834
      break;
1835 1836
    }
    case SML_TIME_STAMP_NANO_SECONDS: {
1837
      *ts = *ts * 1;
1838 1839 1840 1841 1842
      break;
    }
    default: {
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    }
1843
  }
1844
  return TSDB_CODE_SUCCESS;
1845 1846
}

1847
static int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
1848
                                   uint16_t len, SSmlLinesInfo* info) {
1849 1850 1851
  int32_t ret;
  SMLTimeStampType type;
  int64_t tsVal;
1852

1853
  if (!isTimeStamp(value, len, &type)) {
1854
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
1855 1856
  }

1857
  ret = getTimeStampValue(value, len, type, &tsVal);
1858 1859 1860
  if (ret) {
    return ret;
  }
1861
  tscDebug("SML:0x%"PRIx64"Timestamp after conversion:%"PRId64, info->id, tsVal);
1862 1863 1864 1865 1866 1867 1868 1869

  pVal->type = TSDB_DATA_TYPE_TIMESTAMP;
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  pVal->value = calloc(pVal->length, 1);
  memcpy(pVal->value, &tsVal, pVal->length);
  return TSDB_CODE_SUCCESS;
}

1870
static int32_t parseSmlTimeStamp(TAOS_SML_KV **pTS, const char **index, SSmlLinesInfo* info) {
1871
  const char *start, *cur;
1872 1873 1874
  int32_t ret = TSDB_CODE_SUCCESS;
  int len = 0;
  char key[] = "_ts";
1875
  char *value = NULL;
1876

1877
  start = cur = *index;
1878
  *pTS = calloc(1, sizeof(TAOS_SML_KV));
1879

1880
  while(*cur != '\0') {
1881 1882 1883 1884
    cur++;
    len++;
  }

1885
  if (len > 0) {
1886
    value = calloc(len + 1, 1);
1887 1888 1889
    memcpy(value, start, len);
  }

1890
  ret = convertSmlTimeStamp(*pTS, value, len, info);
1891
  if (ret) {
1892
    free(value);
1893 1894
    free(*pTS);
    return ret;
1895
  }
1896
  free(value);
1897

1898 1899 1900
  (*pTS)->key = calloc(sizeof(key), 1);
  memcpy((*pTS)->key, key, sizeof(key));
  return ret;
1901
}
1902

1903
static bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916
  char *val = NULL;
  char *cur = key;
  char keyLower[TSDB_COL_NAME_LEN];
  size_t keyLen = 0;
  while(*cur != '\0') {
    keyLower[keyLen] = tolower(*cur);
    keyLen++;
    cur++;
  }
  keyLower[keyLen] = '\0';

  val = taosHashGet(pHash, keyLower, keyLen);
  if (val) {
1917
    tscError("SML:0x%"PRIx64" Duplicate key detected:%s", info->id, keyLower);
1918 1919 1920 1921 1922 1923 1924 1925 1926
    return true;
  }

  uint8_t dummy_val = 0;
  taosHashPut(pHash, keyLower, strlen(key), &dummy_val, sizeof(uint8_t));

  return false;
}

1927
static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash, SSmlLinesInfo* info) {
1928
  const char *cur = *index;
1929
  char key[TSDB_COL_NAME_LEN + 1];  // +1 to avoid key[len] over write
1930 1931
  uint16_t len = 0;

G
Ganlin Zhao 已提交
1932 1933
  //key field cannot start with digit
  if (isdigit(*cur)) {
1934
    tscError("SML:0x%"PRIx64" Tag key cannnot start with digit", info->id);
1935
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
1936 1937 1938
  }
  while (*cur != '\0') {
    if (len > TSDB_COL_NAME_LEN) {
1939
      tscError("SML:0x%"PRIx64" Key field cannot exceeds 65 characters", info->id);
1940
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
1941 1942 1943 1944 1945 1946 1947
    }
    //unescaped '=' identifies a tag key
    if (*cur == '=' && *(cur - 1) != '\\') {
      break;
    }
    //Escape special character
    if (*cur == '\\') {
1948
      escapeSpecialCharacter(2, &cur);
1949
    }
1950 1951 1952 1953 1954
    key[len] = *cur;
    cur++;
    len++;
  }
  key[len] = '\0';
1955

1956
  if (checkDuplicateKey(key, pHash, info)) {
1957 1958 1959
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

1960 1961
  pKV->key = calloc(len + 1, 1);
  memcpy(pKV->key, key, len + 1);
1962
  //tscDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len);
1963
  *index = cur + 1;
1964
  return TSDB_CODE_SUCCESS;
1965
}
1966

1967

1968
static bool parseSmlValue(TAOS_SML_KV *pKV, const char **index,
1969
                          bool *is_last_kv, SSmlLinesInfo* info) {
1970 1971
  const char *start, *cur;
  char *value = NULL;
1972
  uint16_t len = 0;
1973 1974
  start = cur = *index;

1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985
  while (1) {
    // unescaped ',' or ' ' or '\0' identifies a value
    if ((*cur == ',' || *cur == ' ' || *cur == '\0') && *(cur - 1) != '\\') {
      //unescaped ' ' or '\0' indicates end of value
      *is_last_kv = (*cur == ' ' || *cur == '\0') ? true : false;
      break;
    }
    //Escape special character
    if (*cur == '\\') {
      escapeSpecialCharacter(2, &cur);
    }
1986 1987 1988
    cur++;
    len++;
  }
1989

1990 1991 1992
  value = calloc(len + 1, 1);
  memcpy(value, start, len);
  value[len] = '\0';
1993 1994 1995
  if (!convertSmlValueType(pKV, value, len, info)) {
    tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
            info->id, value);
1996 1997
    //free previous alocated key field
    free(pKV->key);
1998
    pKV->key = NULL;
1999
    free(value);
2000
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
2001
  }
2002
  free(value);
2003

2004 2005 2006 2007 2008
  *index = (*cur == '\0') ? cur : cur + 1;
  return TSDB_CODE_SUCCESS;
}

static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **index,
2009
                                   uint8_t *has_tags, SSmlLinesInfo* info) {
2010 2011 2012
  const char *cur = *index;
  uint16_t len = 0;

2013 2014 2015 2016
  pSml->stableName = calloc(TSDB_TABLE_NAME_LEN + 1, 1);    // +1 to avoid 1772 line over write
  if (pSml->stableName == NULL){
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
G
Ganlin Zhao 已提交
2017
  if (isdigit(*cur)) {
2018
    tscError("SML:0x%"PRIx64" Measurement field cannnot start with digit", info->id);
2019
    free(pSml->stableName);
2020
    pSml->stableName = NULL;
2021
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
2022 2023
  }

2024 2025
  while (*cur != '\0') {
    if (len > TSDB_TABLE_NAME_LEN) {
2026
      tscError("SML:0x%"PRIx64" Measurement field cannot exceeds 193 characters", info->id);
2027
      free(pSml->stableName);
2028
      pSml->stableName = NULL;
2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    }
    //first unescaped comma or space identifies measurement
    //if space detected first, meaning no tag in the input
    if (*cur == ',' && *(cur - 1) != '\\') {
      *has_tags = 1;
      break;
    }
    if (*cur == ' ' && *(cur - 1) != '\\') {
      break;
    }
    //Comma, Space, Backslash needs to be escaped if any
    if (*cur == '\\') {
      escapeSpecialCharacter(1, &cur);
    }
    pSml->stableName[len] = *cur;
    cur++;
    len++;
  }
  pSml->stableName[len] = '\0';
  *index = cur + 1;
2050
  tscDebug("SML:0x%"PRIx64" Stable name in measurement:%s|len:%d", info->id, pSml->stableName, len);
2051 2052

  return TSDB_CODE_SUCCESS;
2053
}
2054

2055 2056 2057 2058
//Table name can only contain digits(0-9),alphebet(a-z),underscore(_)
static int32_t isValidChildTableName(const char *pTbName, int16_t len) {
  const char *cur = pTbName;
  for (int i = 0; i < len; ++i) {
2059
    if(!isdigit(cur[i]) && !isalpha(cur[i]) && (cur[i] != '_')) {
2060 2061 2062 2063 2064 2065 2066
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    }
  }
  return TSDB_CODE_SUCCESS;
}


2067
static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
2068
                               const char **index, bool isField,
2069 2070
                               TAOS_SML_DATA_POINT* smlData, SHashObj *pHash,
                               SSmlLinesInfo* info) {
2071
  const char *cur = *index;
2072
  int32_t ret = TSDB_CODE_SUCCESS;
2073 2074
  TAOS_SML_KV *pkv;
  bool is_last_kv = false;
2075

2076
  int32_t capacity = 0;
2077
  if (isField) {
2078 2079 2080
    capacity = 64;
    *pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
    // leave space for timestamp;
2081 2082
    pkv = *pKVs;
    pkv++;
2083 2084 2085
  } else {
    capacity = 8;
    *pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
2086 2087
    pkv = *pKVs;
  }
2088

2089
  while (*cur != '\0') {
2090
    ret = parseSmlKey(pkv, &cur, pHash, info);
2091
    if (ret) {
2092
      tscError("SML:0x%"PRIx64" Unable to parse key", info->id);
2093 2094
      goto error;
    }
2095
    ret = parseSmlValue(pkv, &cur, &is_last_kv, info);
2096
    if (ret) {
2097
      tscError("SML:0x%"PRIx64" Unable to parse value", info->id);
2098 2099
      goto error;
    }
2100 2101
    if (!isField &&
        (strcasecmp(pkv->key, "ID") == 0) && pkv->type == TSDB_DATA_TYPE_BINARY) {
2102
      ret = isValidChildTableName(pkv->value, pkv->length);
2103
      if (ret) {
2104 2105
        goto error;
      }
2106 2107 2108 2109 2110 2111 2112 2113
      smlData->childTableName = malloc( pkv->length + 1);
      memcpy(smlData->childTableName, pkv->value, pkv->length);
      smlData->childTableName[pkv->length] = '\0';
      free(pkv->key);
      free(pkv->value);
    } else {
      *num_kvs += 1;
    }
2114
    if (is_last_kv) {
2115 2116 2117 2118 2119
      goto done;
    }

    //reallocate addtional memory for more kvs
    TAOS_SML_KV *more_kvs = NULL;
2120

2121
    if (isField) {
2122 2123
      if ((*num_kvs + 2) > capacity) {
        capacity *= 3; capacity /= 2;
2124 2125 2126
        more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
      } else {
        more_kvs = *pKVs;
2127
      }
2128
    } else {
2129 2130
      if ((*num_kvs + 1) > capacity) {
        capacity *= 3; capacity /= 2;
2131 2132 2133
        more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
      } else {
        more_kvs = *pKVs;
2134
      }
2135
    }
2136

2137
    if (!more_kvs) {
2138 2139
      goto error;
    }
2140 2141 2142 2143 2144 2145 2146 2147 2148
    *pKVs = more_kvs;
    //move pKV points to next TAOS_SML_KV block
    if (isField) {
      pkv = *pKVs + *num_kvs + 1;
    } else {
      pkv = *pKVs + *num_kvs;
    }
  }
  goto done;
2149

2150
error:
2151
  return ret;
2152
done:
2153
  *index = cur;
2154
  return ret;
2155
}
2156

2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169
static void moveTimeStampToFirstKv(TAOS_SML_DATA_POINT** smlData, TAOS_SML_KV *ts) {
  TAOS_SML_KV* tsField = (*smlData)->fields;
  tsField->length = ts->length;
  tsField->type = ts->type;
  tsField->value = malloc(ts->length);
  tsField->key = malloc(strlen(ts->key) + 1);
  memcpy(tsField->key, ts->key, strlen(ts->key) + 1);
  memcpy(tsField->value, ts->value, ts->length);
  (*smlData)->fieldNum = (*smlData)->fieldNum + 1;

  free(ts->key);
  free(ts->value);
  free(ts);
2170 2171
}

2172
int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) {
2173
  const char* index = sql;
2174
  int32_t ret = TSDB_CODE_SUCCESS;
2175 2176
  uint8_t has_tags = 0;
  TAOS_SML_KV *timestamp = NULL;
2177
  SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
2178

2179
  ret = parseSmlMeasurement(smlData, &index, &has_tags, info);
2180
  if (ret) {
2181
    tscError("SML:0x%"PRIx64" Unable to parse measurement", info->id);
2182
    taosHashCleanup(keyHashTable);
2183
    return ret;
2184
  }
2185
  tscDebug("SML:0x%"PRIx64" Parse measurement finished, has_tags:%d", info->id, has_tags);
2186 2187 2188

  //Parse Tags
  if (has_tags) {
2189
    ret = parseSmlKvPairs(&smlData->tags, &smlData->tagNum, &index, false, smlData, keyHashTable, info);
2190
    if (ret) {
2191
      tscError("SML:0x%"PRIx64" Unable to parse tag", info->id);
2192
      taosHashCleanup(keyHashTable);
2193 2194
      return ret;
    }
2195
  }
2196
  tscDebug("SML:0x%"PRIx64" Parse tags finished, num of tags:%d", info->id, smlData->tagNum);
2197 2198

  //Parse fields
2199
  ret = parseSmlKvPairs(&smlData->fields, &smlData->fieldNum, &index, true, smlData, keyHashTable, info);
2200
  if (ret) {
2201
    tscError("SML:0x%"PRIx64" Unable to parse field", info->id);
2202
    taosHashCleanup(keyHashTable);
2203
    return ret;
2204
  }
2205
  tscDebug("SML:0x%"PRIx64" Parse fields finished, num of fields:%d", info->id, smlData->fieldNum);
2206
  taosHashCleanup(keyHashTable);
2207

2208
  //Parse timestamp
2209
  ret = parseSmlTimeStamp(&timestamp, &index, info);
2210
  if (ret) {
2211
    tscError("SML:0x%"PRIx64" Unable to parse timestamp", info->id);
2212
    return ret;
2213
  }
2214
  moveTimeStampToFirstKv(&smlData, timestamp);
2215
  tscDebug("SML:0x%"PRIx64" Parse timestamp finished", info->id);
2216

2217
  return TSDB_CODE_SUCCESS;
2218 2219
}

2220
//=========================================================================
2221

S
shenglian zhou 已提交
2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) {
  for (int i=0; i<point->tagNum; ++i) {
    free((point->tags+i)->key);
    free((point->tags+i)->value);
  }
  free(point->tags);
  for (int i=0; i<point->fieldNum; ++i) {
    free((point->fields+i)->key);
    free((point->fields+i)->value);
  }
  free(point->fields);
  free(point->stableName);
  free(point->childTableName);
}

S
shenglian zhou 已提交
2237
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) {
2238
  for (int32_t i = 0; i < numLines; ++i) {
2239
    TAOS_SML_DATA_POINT point = {0};
2240
    int32_t code = tscParseLine(lines[i], &point, info);
2241
    if (code != TSDB_CODE_SUCCESS) {
S
shenglian zhou 已提交
2242
      tscError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]);
S
shenglian zhou 已提交
2243
      destroySmlDataPoint(&point);
2244 2245
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    } else {
S
shenglian zhou 已提交
2246
      tscDebug("SML:0x%"PRIx64" data point line parse success. line %d", info->id, i);
2247 2248
    }

2249 2250 2251 2252 2253
    taosArrayPush(points, &point);
  }
  return 0;
}

2254
int taos_insert_lines(TAOS* taos, char* lines[], int numLines) {
2255
  int32_t code = 0;
2256

S
shenglian zhou 已提交
2257 2258 2259
  SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo));
  info->id = genLinesSmlId();

2260
  if (numLines <= 0 || numLines > 65536) {
S
shenglian zhou 已提交
2261
    tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
2262 2263 2264 2265 2266 2267
    code = TSDB_CODE_TSC_APP_ERROR;
    return code;
  }

  for (int i = 0; i < numLines; ++i) {
    if (lines[i] == NULL) {
S
shenglian zhou 已提交
2268 2269
      tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i);
      free(info);
2270 2271 2272 2273 2274
      code = TSDB_CODE_TSC_APP_ERROR;
      return code;
    }
  }

2275
  SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT));
2276
  if (lpPoints == NULL) {
S
shenglian zhou 已提交
2277 2278
    tscError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
    free(info);
2279 2280
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
2281

S
shenglian zhou 已提交
2282 2283
  tscDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]);
  code = tscParseLines(lines, numLines, lpPoints, NULL, info);
S
shenglian zhou 已提交
2284 2285
  size_t numPoints = taosArrayGetSize(lpPoints);

2286 2287
  if (code != 0) {
    goto cleanup;
2288 2289
  }

2290
  TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
2291
  code = tscSmlInsert(taos, points, (int)numPoints, info);
2292
  if (code != 0) {
S
shenglian zhou 已提交
2293
    tscError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code)));
2294
  }
S
Shenglian Zhou 已提交
2295

2296
cleanup:
S
shenglian zhou 已提交
2297
  tscDebug("SML:0x%"PRIx64" taos_insert_lines finish inserting %d lines. code: %d", info->id, numLines, code);
2298 2299
  points = TARRAY_GET_START(lpPoints);
  numPoints = taosArrayGetSize(lpPoints);
S
Shenglian Zhou 已提交
2300 2301 2302
  for (int i=0; i<numPoints; ++i) {
    destroySmlDataPoint(points+i);
  }
2303 2304

  taosArrayDestroy(lpPoints);
S
shenglian zhou 已提交
2305 2306

  free(info);
2307
  return code;
2308 2309
}