tscParseLineProtocol.c 75.2 KB
Newer Older
S
shenglian zhou 已提交
1 2 3 4
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
5

S
shenglian zhou 已提交
6 7 8 9 10 11 12 13
#include "os.h"
#include "osString.h"
#include "ttype.h"
#include "tmd5.h"
#include "tstrbuild.h"
#include "tname.h"
#include "hash.h"
#include "tskiplist.h"
14

S
shenglian zhou 已提交
15
#include "tscUtil.h"
16 17
#include "tsclient.h"
#include "tscLog.h"
S
shenglian zhou 已提交
18

19
#include "taos.h"
20
#include "tscParseLine.h"
21

S
shenglian zhou 已提交
22 23 24 25 26 27
/*
 * Schema of one super table as assembled by the line-protocol writer, either from the data points
 * or from the table meta loaded from the database.
 *
 * tags / fields are SArray<SSchema>; tagHash / fieldHash map a column name to the size_t index of its
 * entry in tags / fields respectively.
 */
typedef struct {
  char      sTableName[TSDB_TABLE_NAME_LEN];
  SHashObj *tagHash, *fieldHash;  // column name -> size_t index into tags / fields
  SArray   *tags, *fields;        // SArray<SSchema>
  uint8_t   precision;            // copied from the database table meta (tableInfo.precision)
} SSmlSTableSchema;

31 32
//=================================================================================================

33 34 35 36 37 38 39 40 41 42 43 44
static uint64_t linesSmlHandleId = 0;

/*
 * Produce a new id for a lines/SML request (used as the "SML:0x..." tag in log messages).
 * The shared counter is advanced atomically; the value 0 is never handed out, so if the counter
 * wraps around to 0 the next value is taken instead.
 */
uint64_t genLinesSmlId() {
  for (;;) {
    uint64_t next = atomic_add_fetch_64(&linesSmlHandleId, 1);
    if (next != 0) {
      return next;
    }
  }
}

S
shenglian zhou 已提交
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
/*
 * qsort comparator for TAOS_SML_KV entries: orders by key, ignoring ASCII case.
 * Keys are compared over their common prefix; when that prefix is equal the shorter key sorts first
 * (the result is then the difference of the two key lengths).
 */
int compareSmlColKv(const void* p1, const void* p2) {
  const TAOS_SML_KV* lhs = (const TAOS_SML_KV*)p1;
  const TAOS_SML_KV* rhs = (const TAOS_SML_KV*)p2;

  int lhsLen = (int)strlen(lhs->key);
  int rhsLen = (int)strlen(rhs->key);

  int prefixOrder = strncasecmp(lhs->key, rhs->key, MIN(lhsLen, rhsLen));
  return (prefixOrder != 0) ? prefixOrder : (lhsLen - rhsLen);
}

/* Kind of schema change that applySchemaAction turns into a SQL statement. */
typedef enum {
  SCHEMA_ACTION_CREATE_STABLE,       // "create stable <name> (<fields>) tags (<tags>)"
  SCHEMA_ACTION_ADD_COLUMN,          // "alter stable <name> add column <col>"
  SCHEMA_ACTION_ADD_TAG,             // "alter stable <name> add tag <tag>"
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,  // "alter stable <name> modify column <col>" (widen a var-length column)
  SCHEMA_ACTION_CHANGE_TAG_SIZE,     // "alter stable <name> modify tag <tag>" (widen a var-length tag)
} ESchemaAction;

/* Payload for SCHEMA_ACTION_CREATE_STABLE: the full column and tag lists of the new super table. */
typedef struct {
  char sTableName[TSDB_TABLE_NAME_LEN];
  SArray* tags; //SArray<SSchema>
  SArray* fields; //SArray<SSchema>
} SCreateSTableActionInfo;

/*
 * Payload for the add/modify column and tag actions. `field` is borrowed: generateSchemaAction points
 * it at an SSchema owned by the point schema, so it must outlive the action.
 */
typedef struct {
  char sTableName[TSDB_TABLE_NAME_LEN];
  SSchema* field;
} SAlterSTableActionInfo;

/*
 * A single schema change. `action` selects the active union member: createSTable for
 * SCHEMA_ACTION_CREATE_STABLE, alterSTable for every other action.
 */
typedef struct {
  ESchemaAction action;
  union {
    SCreateSTableActionInfo createSTable;
    SAlterSTableActionInfo alterSTable;
  };
} SSchemaAction;

S
shenglian zhou 已提交
85
/*
 * Compute the column width (in bytes) needed to store the value of an SML key/value pair.
 *
 * - Fixed-size types: the width of the type from tDataTypes.
 * - NCHAR: the value is converted to UCS-4 to learn its encoded size; width is that size plus
 *   VARSTR_HEADER_SIZE.
 * - BINARY: the raw value length plus VARSTR_HEADER_SIZE.
 *
 * @param kv     key/value whose type, value and length are inspected
 * @param bytes  out: the computed width (not written for other var-length types)
 * @param id     request id, used only for logging
 * @return 0 on success, TSDB_CODE_TSC_OUT_OF_MEMORY if the conversion buffer cannot be allocated,
 *         TSDB_CODE_TSC_INVALID_VALUE if the NCHAR value cannot be converted.
 */
static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes, uint64_t id) {
  if (!IS_VAR_DATA_TYPE(kv->type)) {
    *bytes = tDataTypes[kv->type].bytes;
    return 0;
  }

  if (kv->type == TSDB_DATA_TYPE_NCHAR) {
    char* ucs = malloc(kv->length * TSDB_NCHAR_SIZE + 1);
    if (ucs == NULL) {
      // Previously unchecked: taosMbsToUcs4 would have written through a NULL buffer.
      tscError("SML:0x%"PRIx64" failed to allocate memory for nchar conversion. key: %s", id, kv->key);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    int32_t bytesNeeded = 0;
    bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded);
    // The converted text itself is not needed, only its size.
    free(ucs);
    if (!succ) {
      tscError("SML:0x%"PRIx64" convert nchar string to UCS4_LE failed:%s", id, kv->value);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }
    *bytes = bytesNeeded + VARSTR_HEADER_SIZE;
  } else if (kv->type == TSDB_DATA_TYPE_BINARY) {
    *bytes = kv->length + VARSTR_HEADER_SIZE;
  }
  return 0;
}

S
shenglian zhou 已提交
107
/*
 * Merge one key/value of a data point into a schema under construction.
 *
 * If the key is already known (looked up in `hash`), its recorded type must match the kv's type and
 * its width is widened to the larger of the existing and the new width. Otherwise a new SSchema is
 * appended to `array` and its index is recorded in `hash` under the key.
 * In both cases the kv's fieldSchemaIdx is set to the schema entry's index.
 *
 * @param smlKv  key/value to merge; its fieldSchemaIdx is updated
 * @param hash   key -> size_t index into `array`
 * @param array  SArray<SSchema> being built
 * @param info   request context (id for logging)
 * @return 0 on success; TSDB_CODE_TSC_INVALID_VALUE on a type mismatch, an over-long key, or a value
 *         that cannot be sized; TSDB_CODE_TSC_OUT_OF_MEMORY if the array cannot grow.
 */
static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array, SSmlLinesInfo* info) {
  size_t keyLen = strlen(smlKv->key);
  size_t fieldIdx = (size_t)-1;
  int32_t bytes = 0;
  int32_t code = 0;

  size_t* pFieldIdx = taosHashGet(hash, smlKv->key, keyLen);
  if (pFieldIdx) {
    fieldIdx = *pFieldIdx;
    SSchema* pField = taosArrayGet(array, fieldIdx);

    if (pField->type != smlKv->type) {
      tscError("SML:0x%"PRIx64" type mismatch. key %s, type %d. type before %d", info->id, smlKv->key, smlKv->type, pField->type);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id);
    if (code != 0) {
      return code;
    }
    pField->bytes = MAX(pField->bytes, bytes);
  } else {
    // field.name is a fixed TSDB_COL_NAME_LEN buffer; a longer key would overflow it.
    if (keyLen >= TSDB_COL_NAME_LEN) {
      tscError("SML:0x%"PRIx64" key name too long. key: %s, max length: %d", info->id, smlKv->key,
               TSDB_COL_NAME_LEN - 1);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    SSchema field = {0};
    memcpy(field.name, smlKv->key, keyLen);
    field.name[keyLen] = '\0';
    field.type = smlKv->type;

    code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id);
    if (code != 0) {
      return code;
    }
    field.bytes = bytes;

    if (taosArrayPush(array, &field) == NULL) {
      tscError("SML:0x%"PRIx64" failed to add schema for key %s", info->id, smlKv->key);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    fieldIdx = taosArrayGetSize(array) - 1;
    taosHashPut(hash, field.name, keyLen, &fieldIdx, sizeof(fieldIdx));
  }

  smlKv->fieldSchemaIdx = (uint32_t)fieldIdx;
  return 0;
}

S
Shengliang Guan 已提交
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
/*
 * Derive a child table name for a data point from its super table name and tags.
 *
 * The point's tags are sorted in place (compareSmlColKv). The string
 * "<stable>,<tag1>=<value1>,<tag2>=<value2>..." is then built, with the super table name and tag keys
 * lower-cased. The name is "t_" followed by the 32 lower-case hex digits of its MD5 digest.
 *
 * @param point         data point; its tags array is reordered
 * @param tableName     output buffer
 * @param tableNameLen  in: size of tableName; out: snprintf's return value for the generated name
 * @param info          request context (id for logging)
 * @return always 0
 */
static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen,
                                       SSmlLinesInfo* info) {
  tscDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id);
  qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);

  SStringBuilder builder;
  memset(&builder, 0, sizeof(builder));

  char lowerSTableName[TSDB_TABLE_NAME_LEN] = {0};
  strtolower(lowerSTableName, point->stableName);
  taosStringBuilderAppendString(&builder, lowerSTableName);

  int tagPos = 0;
  while (tagPos < point->tagNum) {
    TAOS_SML_KV* kv = &point->tags[tagPos];
    char lowerKey[TSDB_COL_NAME_LEN] = {0};
    strtolower(lowerKey, kv->key);

    taosStringBuilderAppendChar(&builder, ',');
    taosStringBuilderAppendString(&builder, lowerKey);
    taosStringBuilderAppendChar(&builder, '=');
    taosStringBuilderAppend(&builder, kv->value, kv->length);
    ++tagPos;
  }

  size_t joinedLen = 0;
  char* joined = taosStringBuilderGetResult(&builder, &joinedLen);

  MD5_CTX md5;
  MD5Init(&md5);
  MD5Update(&md5, (uint8_t *)joined, (uint32_t)joinedLen);
  MD5Final(&md5);

  // 16 digest bytes -> 32 hex characters (+ terminator).
  char digestHex[2 * 16 + 1] = {0};
  for (int b = 0; b < 16; ++b) {
    snprintf(digestHex + 2 * b, 3, "%02x", md5.digest[b]);
  }
  *tableNameLen = snprintf(tableName, *tableNameLen, "t_%s", digestHex);

  taosStringBuilderDestroy(&builder);
  tscDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName);
  return 0;
}

S
shenglian zhou 已提交
186
/*
 * Group data points by super table name and build, for each super table, the union of the tag and
 * field schemas seen across its points (var-length widths are widened to the largest value seen).
 *
 * Side effects on the points:
 * - each point's schemaIdx is set to the index of its super table in `stableSchemas`;
 * - each tag/field kv's fieldSchemaIdx is set (by buildSmlKvSchema);
 * - a point that has tags but no childTableName gets an md5-derived one (heap-allocated). Deriving it
 *   sorts point->tags in place.
 *
 * The name->index hashes used during construction (the local super-table hash and every schema's
 * tagHash/fieldHash) are released before returning, on success and on error. The schema hash
 * pointers are reset to NULL. The tags/fields arrays stored in `stableSchemas` are left to the caller.
 *
 * @return 0 on success, otherwise the first error encountered.
 */
static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas, SSmlLinesInfo* info) {
  int32_t code = 0;
  size_t numStables = 0;
  SHashObj* sname2shema = taosHashInit(32,
                                       taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

  for (int i = 0; i < numPoint; ++i) {
    TAOS_SML_DATA_POINT* point = &points[i];
    size_t stableNameLen = strlen(point->stableName);
    size_t* pStableIdx = taosHashGet(sname2shema, point->stableName, stableNameLen);
    SSmlSTableSchema* pStableSchema = NULL;
    size_t stableIdx = (size_t)-1;

    if (pStableIdx) {
      pStableSchema = taosArrayGet(stableSchemas, *pStableIdx);
      stableIdx = *pStableIdx;
    } else {
      // sTableName is a fixed-size buffer; a longer name would overflow it.
      if (stableNameLen >= TSDB_TABLE_NAME_LEN) {
        tscError("SML:0x%"PRIx64" super table name too long. point no.: %d, name: %s", info->id, i, point->stableName);
        code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
        goto cleanup;
      }

      SSmlSTableSchema schema;
      memset(&schema, 0, sizeof(schema));
      memcpy(schema.sTableName, point->stableName, stableNameLen);
      schema.sTableName[stableNameLen] = '\0';
      schema.fields = taosArrayInit(64, sizeof(SSchema));
      schema.tags = taosArrayInit(8, sizeof(SSchema));
      schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
      schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

      pStableSchema = taosArrayPush(stableSchemas, &schema);
      if (pStableSchema == NULL) {
        // Not owned by stableSchemas, so release everything here.
        taosArrayDestroy(schema.fields);
        taosArrayDestroy(schema.tags);
        taosHashCleanup(schema.tagHash);
        taosHashCleanup(schema.fieldHash);
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto cleanup;
      }
      stableIdx = taosArrayGetSize(stableSchemas) - 1;
      taosHashPut(sname2shema, schema.sTableName, stableNameLen, &stableIdx, sizeof(size_t));
    }

    // getSmlMd5ChildTableName sorts point->tags in place, so the tags below are visited in sorted order.
    if (point->tagNum > 0 && !point->childTableName) {
      char childTableName[TSDB_TABLE_NAME_LEN];
      int32_t tableNameLen = TSDB_TABLE_NAME_LEN;
      getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info);
      point->childTableName = calloc(1, tableNameLen + 1);
      if (point->childTableName == NULL) {
        tscError("SML:0x%"PRIx64" failed to allocate child table name. point no.: %d", info->id, i);
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto cleanup;
      }
      memcpy(point->childTableName, childTableName, tableNameLen);
      point->childTableName[tableNameLen] = '\0';
    }

    for (int j = 0; j < point->tagNum; ++j) {
      TAOS_SML_KV* tagKv = point->tags + j;
      code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags, info);
      if (code != 0) {
        tscError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, tag key: %s", info->id, i, tagKv->key);
        goto cleanup;
      }
    }

    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* fieldKv = point->fields + j;
      code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields, info);
      if (code != 0) {
        tscError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, field key: %s", info->id, i, fieldKv->key);
        goto cleanup;
      }
    }

    point->schemaIdx = (uint32_t)stableIdx;
  }

cleanup:
  // The lookup hashes are only needed while building; drop them on every exit path.
  numStables = taosArrayGetSize(stableSchemas);
  for (size_t k = 0; k < numStables; ++k) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, k);
    taosHashCleanup(schema->tagHash);
    taosHashCleanup(schema->fieldHash);
    schema->tagHash = NULL;
    schema->fieldHash = NULL;
  }
  taosHashCleanup(sname2shema);

  if (code != 0) {
    return code;
  }

  tscDebug("SML:0x%"PRIx64" build point schema succeed. num of super table: %zu", info->id, numStables);
  for (size_t k = 0; k < numStables; ++k) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, k);
    tscDebug("\ttable name: %s, tags number: %zu, fields number: %zu", schema->sTableName,
             taosArrayGetSize(schema->tags), taosArrayGetSize(schema->fields));
  }

  return 0;
}

262
static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[],
S
shenglian zhou 已提交
263
                                       SSchemaAction* action, bool* actionNeeded, SSmlLinesInfo* info) {
264 265 266 267
  char fieldNameLowerCase[TSDB_COL_NAME_LEN] = {0};
  strtolower(fieldNameLowerCase, pointColField->name);

  size_t* pDbIndex = taosHashGet(dbAttrHash, fieldNameLowerCase, strlen(fieldNameLowerCase));
268 269 270
  if (pDbIndex) {
    SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex);
    assert(strcasecmp(dbAttr->name, pointColField->name) == 0);
S
shenglian zhou 已提交
271
    if (pointColField->type != dbAttr->type) {
S
shenglian zhou 已提交
272
      tscError("SML:0x%"PRIx64" point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id, pointColField->name,
S
Shenglian Zhou 已提交
273 274
               pointColField->type, dbAttr->type);
      return TSDB_CODE_TSC_INVALID_VALUE;
S
shenglian zhou 已提交
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
    }

    if (IS_VAR_DATA_TYPE(pointColField->type) && (pointColField->bytes > dbAttr->bytes)) {
      if (isTag) {
        action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
      } else {
        action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
      }
      memset(&action->alterSTable, 0,  sizeof(SAlterSTableActionInfo));
      memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN);
      action->alterSTable.field = pointColField;
      *actionNeeded = true;
    }
  } else {
    if (isTag) {
      action->action = SCHEMA_ACTION_ADD_TAG;
    } else {
      action->action = SCHEMA_ACTION_ADD_COLUMN;
    }
    memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo));
    memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN);
    action->alterSTable.field = pointColField;
    *actionNeeded = true;
  }
S
shenglian zhou 已提交
299 300 301 302
  if (*actionNeeded) {
    tscDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, fieldNameLowerCase,
             action->action);
  }
S
shenglian zhou 已提交
303 304 305
  return 0;
}

S
shenglian zhou 已提交
306
/*
 * Write a "<name> <type>" column definition into buf (at most bufSize bytes, via snprintf).
 * BINARY and NCHAR get a length suffix "(<n>)": the stored width minus VARSTR_HEADER_SIZE, divided by
 * TSDB_NCHAR_SIZE for NCHAR.
 * *outBytes receives snprintf's return value (the untruncated length).
 * Always returns 0.
 */
static int32_t buildColumnDescription(SSchema* field,
                               char* buf, int32_t bufSize, int32_t* outBytes) {
  uint8_t colType = field->type;
  bool hasLength = (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR);

  if (!hasLength) {
    *outBytes = snprintf(buf, bufSize, "%s %s", field->name, tDataTypes[colType].name);
    return 0;
  }

  int32_t declaredLen = field->bytes - VARSTR_HEADER_SIZE;
  if (colType == TSDB_DATA_TYPE_NCHAR) {
    declaredLen = declaredLen / TSDB_NCHAR_SIZE;
  }
  *outBytes = snprintf(buf, bufSize, "%s %s(%d)", field->name, tDataTypes[colType].name, declaredLen);
  return 0;
}

S
shenglian zhou 已提交
327

S
shenglian zhou 已提交
328
/*
 * Issue "RESET QUERY CACHE", log a failure, then sleep 500 ms.
 * Used by applySchemaAction after a schema statement failed with an "already exists" or
 * "invalid length" error.
 * Returns the error code of the reset statement (TSDB_CODE_SUCCESS when it succeeded).
 */
static int32_t applySchemaActionResetQueryCache(TAOS* taos, SSmlLinesInfo* info) {
  TAOS_RES* res = taos_query(taos, "RESET QUERY CACHE");
  int32_t code = taos_errno(res);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res));
  }
  taos_free_result(res);
  taosMsleep(500);
  return code;
}

/*
 * Translate a schema action into a CREATE/ALTER STABLE statement and execute it synchronously.
 *
 * If the statement fails because the change is already present, or because the new width is
 * rejected, the client query cache is reset. The returned code is then the result of that reset
 * (so an "already exists" failure turns into success when the reset succeeds). The per-action
 * conditions are:
 * - add column/tag: FIELD/TAG already exist, or "duplicated column names";
 * - modify column/tag: invalid column/tag length;
 * - create stable: table already exists.
 *
 * @return 0 on success, TSDB_CODE_TSC_OUT_OF_MEMORY if the SQL buffer cannot be allocated, otherwise
 *         the error of the executed statement (or of the cache reset).
 */
static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInfo* info) {
  int32_t code = 0;
  int32_t outBytes = 0;
  char *result = (char *)calloc(1, tsMaxSQLStringLen+1);
  if (result == NULL) {
    tscError("SML:0x%"PRIx64" apply schema action. failed to allocate sql buffer", info->id);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  int32_t capacity = tsMaxSQLStringLen +  1;

  tscDebug("SML:0x%"PRIx64" apply schema action. action: %d", info->id, action->action);
  switch (action->action) {
    case SCHEMA_ACTION_ADD_COLUMN: {
      int n = sprintf(result, "alter stable %s add column ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
      // Inspect the message before the result (which owns it) is freed.
      char* errStr = taos_errstr(res);
      bool tscDupColNames = (strstr(errStr, "duplicated column names") != NULL);
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error: %s", info->id, errStr);
      }
      taos_free_result(res);

      if (code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || code == TSDB_CODE_MND_TAG_ALREAY_EXIST || tscDupColNames) {
        code = applySchemaActionResetQueryCache(taos, info);
      }
      break;
    }
    case SCHEMA_ACTION_ADD_TAG: {
      int n = sprintf(result, "alter stable %s add tag ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field,
                             result+n, capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
      char* errStr = taos_errstr(res);
      bool tscDupColNames = (strstr(errStr, "duplicated column names") != NULL);
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
      taos_free_result(res);

      if (code == TSDB_CODE_MND_TAG_ALREAY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) {
        code = applySchemaActionResetQueryCache(taos, info);
      }
      break;
    }
    case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
      int n = sprintf(result, "alter stable %s modify column ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n,
                             capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
      taos_free_result(res);

      if (code == TSDB_CODE_MND_INVALID_COLUMN_LENGTH || code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
        code = applySchemaActionResetQueryCache(taos, info);
      }
      break;
    }
    case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
      int n = sprintf(result, "alter stable %s modify tag ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n,
                             capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
      taos_free_result(res);

      if (code == TSDB_CODE_MND_INVALID_TAG_LENGTH || code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
        code = applySchemaActionResetQueryCache(taos, info);
      }
      break;
    }
    case SCHEMA_ACTION_CREATE_STABLE: {
      int n = sprintf(result, "create stable %s (", action->createSTable.sTableName);
      char* pos = result + n; int freeBytes = capacity - n;

      // "col1 type1,col2 type2," -- the trailing comma is overwritten below.
      size_t numCols = taosArrayGetSize(action->createSTable.fields);
      for (int32_t i = 0; i < numCols; ++i) {
        SSchema* field = taosArrayGet(action->createSTable.fields, i);
        buildColumnDescription(field, pos, freeBytes, &outBytes);
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
      --pos; ++freeBytes;

      outBytes = snprintf(pos, freeBytes, ") tags (");
      pos += outBytes; freeBytes -= outBytes;

      size_t numTags = taosArrayGetSize(action->createSTable.tags);
      for (int32_t i = 0; i < numTags; ++i) {
        SSchema* field = taosArrayGet(action->createSTable.tags, i);
        buildColumnDescription(field, pos, freeBytes, &outBytes);
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
      pos--; ++freeBytes;
      outBytes = snprintf(pos, freeBytes, ")");
      TAOS_RES* res = taos_query(taos, result);
      code = taos_errno(res);
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
      taos_free_result(res);

      if (code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) {
        code = applySchemaActionResetQueryCache(taos, info);
      }
      break;
    }

    default:
      break;
  }

  free(result);
  if (code != 0) {
    tscError("SML:0x%"PRIx64 " apply schema action failure. %s", info->id, tstrerror(code));
  }
  return code;
}

483
/*
 * Release the hashes and arrays owned by `schema` (the struct itself is not freed).
 * The pointers are reset to NULL so that destroying the same schema twice is harmless.
 * Always returns 0.
 */
static int32_t destroySmlSTableSchema(SSmlSTableSchema* schema) {
  taosHashCleanup(schema->tagHash);
  taosHashCleanup(schema->fieldHash);
  taosArrayDestroy(schema->tags);
  taosArrayDestroy(schema->fields);
  schema->tagHash = NULL;
  schema->fieldHash = NULL;
  schema->tags = NULL;
  schema->fields = NULL;
  return 0;
}

491
static int32_t fillDbSchema(STableMeta* tableMeta, char* tableName, SSmlSTableSchema* schema, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
492 493 494 495 496 497 498 499 500 501 502 503
  schema->tags = taosArrayInit(8, sizeof(SSchema));
  schema->fields = taosArrayInit(64, sizeof(SSchema));
  schema->tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
  schema->fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

  tstrncpy(schema->sTableName, tableName, strlen(tableName)+1);
  schema->precision = tableMeta->tableInfo.precision;
  for (int i=0; i<tableMeta->tableInfo.numOfColumns; ++i) {
    SSchema field;
    tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)+1);
    field.type = tableMeta->schema[i].type;
    field.bytes = tableMeta->schema[i].bytes;
504 505 506
    taosArrayPush(schema->fields, &field);
    size_t fieldIndex = taosArrayGetSize(schema->fields) - 1;
    taosHashPut(schema->fieldHash, field.name, strlen(field.name), &fieldIndex, sizeof(fieldIndex));
S
shenglian zhou 已提交
507 508 509 510 511 512 513 514
  }

  for (int i=0; i<tableMeta->tableInfo.numOfTags; ++i) {
    int j = i + tableMeta->tableInfo.numOfColumns;
    SSchema field;
    tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name)+1);
    field.type = tableMeta->schema[j].type;
    field.bytes = tableMeta->schema[j].bytes;
515 516 517
    taosArrayPush(schema->tags, &field);
    size_t tagIndex = taosArrayGetSize(schema->tags) - 1;
    taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex));
S
shenglian zhou 已提交
518
  }
519
  tscDebug("SML:0x%"PRIx64 " load table schema succeed. table name: %s, columns number: %d, tag number: %d, precision: %d",
S
shenglian zhou 已提交
520
           info->id, tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision);
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560
  return TSDB_CODE_SUCCESS;
}

/*
 * Obtain the table meta of table `tableName`.
 *
 * Each attempt runs "describe <lower-cased name>". It then looks the full table name up in the
 * client's table-meta cache (UTIL_GET_TABLEMETA) through a temporary SSqlObj, cloning the cached meta.
 * Presumably the describe is what populates that cache (TODO confirm). Up to TSDB_MAX_REPLICA
 * attempts are made while no meta is found.
 *
 * @param pTableMeta  out: the cloned meta on success; the caller frees it
 * @return TSDB_CODE_SUCCESS; the error of the describe query; TSDB_CODE_TSC_DISCONNECTED;
 *         TSDB_CODE_TSC_OUT_OF_MEMORY; a name-validation error; or TSDB_CODE_TSC_NO_META_CACHED.
 */
static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTableMeta, SSmlLinesInfo* info) {
  int32_t code = 0;
  int32_t retries = 0;
  STableMeta* tableMeta = NULL;
  while (retries++ < TSDB_MAX_REPLICA && tableMeta == NULL) {
    STscObj* pObj = (STscObj*)taos;
    if (pObj == NULL || pObj->signature != pObj) {
      terrno = TSDB_CODE_TSC_DISCONNECTED;
      return TSDB_CODE_TSC_DISCONNECTED;
    }

    tscDebug("SML:0x%" PRIx64 " retrieve table meta. super table name: %s", info->id, tableName);

    char tableNameLowerCase[TSDB_TABLE_NAME_LEN];
    strtolower(tableNameLowerCase, tableName);

    char sql[256];
    snprintf(sql, 256, "describe %s", tableNameLowerCase);
    TAOS_RES* res = taos_query(taos, sql);
    code = taos_errno(res);
    if (code != 0) {
      tscError("SML:0x%" PRIx64 " describe table failure. %s", info->id, taos_errstr(res));
      taos_free_result(res);
      return code;
    }
    taos_free_result(res);

    SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
    if (pSql == NULL) {
      tscError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno));
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      return code;
    }
    pSql->pTscObj = taos;
    pSql->signature = pSql;
    pSql->fp = NULL;

    registerSqlObj(pSql);
    SStrToken tableToken = {.z = tableNameLowerCase, .n = (uint32_t)strlen(tableNameLowerCase), .type = TK_ID};
    tGetToken(tableNameLowerCase, &tableToken.type);
    bool dbIncluded = false;
    // Check if the table name available or not
    if (tscValidateName(&tableToken, true, &dbIncluded) != TSDB_CODE_SUCCESS) {
      code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
      tscError("SML:0x%" PRIx64 " table name is invalid. table name: %s", info->id, tableNameLowerCase);
      // pSql was calloc'ed above and nothing here allocates its payload, so only write the
      // message when a payload buffer actually exists.
      if (pSql->cmd.payload != NULL) {
        sprintf(pSql->cmd.payload, "table name is invalid");
      }
      taosReleaseRef(tscObjRef, pSql->self);
      return code;
    }

    SName sname = {0};
    // NOTE(review): on failure this may also try to write an error message into the (unallocated)
    // command payload -- confirm against tscSetTableFullName.
    if ((code = tscSetTableFullName(&sname, &tableToken, pSql, dbIncluded)) != TSDB_CODE_SUCCESS) {
      taosReleaseRef(tscObjRef, pSql->self);
      return code;
    }

    char fullTableName[TSDB_TABLE_FNAME_LEN] = {0};
    tNameExtractFullName(&sname, fullTableName);

    size_t size = 0;
    taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size);
    taosReleaseRef(tscObjRef, pSql->self);
  }

  if (tableMeta != NULL) {
    *pTableMeta = tableMeta;
    return TSDB_CODE_SUCCESS;
  } else {
    tscError("SML:0x%" PRIx64 " failed to retrieve table meta. super table name: %s", info->id, tableName);
    return TSDB_CODE_TSC_NO_META_CACHED;
  }
}

/*
 * Fill `schema` with the schema of table `tableName` as stored in the database.
 * On success the schema's arrays/hashes are allocated (see fillDbSchema) and the temporary table
 * meta is freed. On failure `schema` is left untouched and the retrieveTableMeta error is returned.
 */
static int32_t loadTableSchemaFromDB(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSmlLinesInfo* info) {
  STableMeta* meta = NULL;
  int32_t code = retrieveTableMeta(taos, tableName, &meta, info);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  assert(meta != NULL);
  fillDbSchema(meta, tableName, schema, info);
  free(meta);
  return code;
}

S
shenglian zhou 已提交
611
/*
 * Reconcile each point-derived super table schema in `stableSchemas` with the database.
 *
 * - Super table missing (describe reports TSDB_CODE_MND_INVALID_TABLE_NAME): it is created from the
 *   point schema and then reloaded.
 * - Super table present: each point tag/field is compared with the database schema. Missing
 *   columns/tags are added and narrower var-length ones are widened. The point's timestamp column
 *   (index 0) takes the database's timestamp column name.
 * In both cases the point schema's precision is set from the database schema.
 *
 * @return 0 on success, otherwise the first error: schema load failure, a point/database type
 *         mismatch, or a failed ALTER.
 */
static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas, SSmlLinesInfo* info) {
  int32_t code = 0;
  size_t numStable = taosArrayGetSize(stableSchemas);
  for (int i = 0; i < numStable; ++i) {
    SSmlSTableSchema* pointSchema = taosArrayGet(stableSchemas, i);
    SSmlSTableSchema  dbSchema;
    memset(&dbSchema, 0, sizeof(SSmlSTableSchema));

    code = loadTableSchemaFromDB(taos, pointSchema->sTableName, &dbSchema, info);
    if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
      SSchemaAction schemaAction = {0};
      schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
      memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
      memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN);
      schemaAction.createSTable.tags = pointSchema->tags;
      schemaAction.createSTable.fields = pointSchema->fields;
      // A failed create is detected by the reload below.
      applySchemaAction(taos, &schemaAction, info);
      code = loadTableSchemaFromDB(taos, pointSchema->sTableName, &dbSchema, info);
      if (code != 0) {
        tscError("SML:0x%"PRIx64" reconcile point schema failed. can not create %s", info->id, pointSchema->sTableName);
        return code;
      } else {
        pointSchema->precision = dbSchema.precision;
        destroySmlSTableSchema(&dbSchema);
      }
    } else if (code == TSDB_CODE_SUCCESS) {
      size_t pointTagSize = taosArrayGetSize(pointSchema->tags);
      size_t pointFieldSize = taosArrayGetSize(pointSchema->fields);

      SHashObj* dbTagHash = dbSchema.tagHash;
      SHashObj* dbFieldHash = dbSchema.fieldHash;

      for (int j = 0; j < pointTagSize; ++j) {
        SSchema* pointTag = taosArrayGet(pointSchema->tags, j);
        SSchemaAction schemaAction = {0};
        bool actionNeeded = false;
        // A type mismatch is an error; previously it was logged and then ignored.
        code = generateSchemaAction(pointTag, dbTagHash, dbSchema.tags, true, pointSchema->sTableName,
                                    &schemaAction, &actionNeeded, info);
        if (code != 0) {
          destroySmlSTableSchema(&dbSchema);
          return code;
        }
        if (actionNeeded) {
          code = applySchemaAction(taos, &schemaAction, info);
          if (code != 0) {
            destroySmlSTableSchema(&dbSchema);
            return code;
          }
        }
      }

      // Field 0 is the timestamp column; use the database's name for it.
      SSchema* pointColTs = taosArrayGet(pointSchema->fields, 0);
      SSchema* dbColTs = taosArrayGet(dbSchema.fields, 0);
      memcpy(pointColTs->name, dbColTs->name, TSDB_COL_NAME_LEN);

      for (int j = 1; j < pointFieldSize; ++j) {
        SSchema* pointCol = taosArrayGet(pointSchema->fields, j);
        SSchemaAction schemaAction = {0};
        bool actionNeeded = false;
        code = generateSchemaAction(pointCol, dbFieldHash, dbSchema.fields, false, pointSchema->sTableName,
                                    &schemaAction, &actionNeeded, info);
        if (code != 0) {
          destroySmlSTableSchema(&dbSchema);
          return code;
        }
        if (actionNeeded) {
          code = applySchemaAction(taos, &schemaAction, info);
          if (code != 0) {
            destroySmlSTableSchema(&dbSchema);
            return code;
          }
        }
      }

      pointSchema->precision = dbSchema.precision;

      destroySmlSTableSchema(&dbSchema);
    } else {
      tscError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
      return code;
    }
  }
  return 0;
}

S
shenglian zhou 已提交
688 689
static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName,
                                          SArray* tagsSchema, SArray* tagsBind, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
690
  size_t numTags = taosArrayGetSize(tagsSchema);
691
  char* sql = malloc(tsMaxSQLStringLen+1);
692 693 694 695
  if (sql == NULL) {
    tscError("malloc sql memory error");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
696
  int freeBytes = tsMaxSQLStringLen + 1;
S
shenglian zhou 已提交
697
  sprintf(sql, "create table if not exists %s using %s", cTableName, sTableName);
698

S
shenglian zhou 已提交
699 700 701 702 703 704
  snprintf(sql+strlen(sql), freeBytes-strlen(sql), "(");
  for (int i = 0; i < numTags; ++i) {
    SSchema* tagSchema = taosArrayGet(tagsSchema, i);
    snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", tagSchema->name);
  }
  snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")");
705

S
shenglian zhou 已提交
706
  snprintf(sql + strlen(sql), freeBytes-strlen(sql), " tags (");
707

S
shenglian zhou 已提交
708
  for (int i = 0; i < numTags; ++i) {
S
shenglian zhou 已提交
709
    snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
S
shenglian zhou 已提交
710
  }
S
shenglian zhou 已提交
711
  snprintf(sql + strlen(sql) - 1, freeBytes-strlen(sql)+1, ")");
712
  sql[strlen(sql)] = '\0';
S
shenglian zhou 已提交
713

S
shenglian zhou 已提交
714
  tscDebug("SML:0x%"PRIx64" create table : %s", info->id, sql);
S
Shenglian Zhou 已提交
715

S
shenglian zhou 已提交
716
  TAOS_STMT* stmt = taos_stmt_init(taos);
717 718 719 720
  if (stmt == NULL) {
    free(sql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
S
shenglian zhou 已提交
721
  int32_t code;
S
shenglian zhou 已提交
722
  code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
723 724
  free(sql);

S
shenglian zhou 已提交
725
  if (code != 0) {
726
    tscError("SML:0x%"PRIx64" taos_stmt_prepare returns %d:%s", info->id, code, tstrerror(code));
727
    taos_stmt_close(stmt);
S
shenglian zhou 已提交
728 729 730 731 732
    return code;
  }

  code = taos_stmt_bind_param(stmt, TARRAY_GET_START(tagsBind));
  if (code != 0) {
733
    tscError("SML:0x%"PRIx64" taos_stmt_bind_param returns %d:%s", info->id, code, tstrerror(code));
734
    taos_stmt_close(stmt);
S
shenglian zhou 已提交
735 736 737 738 739
    return code;
  }

  code = taos_stmt_execute(stmt);
  if (code != 0) {
740
    tscError("SML:0x%"PRIx64" taos_stmt_execute returns %d:%s", info->id, code, tstrerror(code));
741
    taos_stmt_close(stmt);
S
shenglian zhou 已提交
742 743
    return code;
  }
S
shenglian zhou 已提交
744

745 746
  code = taos_stmt_close(stmt);
  if (code != 0) {
747
    tscError("SML:0x%"PRIx64" taos_stmt_close return %d:%s", info->id, code, tstrerror(code));
748 749 750
    return code;
  }
  return code;
S
shenglian zhou 已提交
751 752
}

753
static int32_t doInsertChildTableWithStmt(TAOS* taos, char* sql, char* cTableName, SArray* batchBind, SSmlLinesInfo* info) {
754
  int32_t code = 0;
S
shenglian zhou 已提交
755

W
wpan 已提交
756
  TAOS_STMT* stmt = taos_stmt_init(taos);
757 758 759
  if (stmt == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
760

S
shenglian zhou 已提交
761
  code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
762

W
wpan 已提交
763
  if (code != 0) {
764
    tscError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, taos_stmt_errstr(stmt));
765
    taos_stmt_close(stmt);
W
wpan 已提交
766 767 768
    return code;
  }

769
  bool tryAgain = false;
770
  int32_t try = 0;
W
wpan 已提交
771
  do {
772
    code = taos_stmt_set_tbname(stmt, cTableName);
773
    if (code != 0) {
774 775 776 777 778
      tscError("SML:0x%"PRIx64" taos_stmt_set_tbname return %d:%s", info->id, code, taos_stmt_errstr(stmt));

      int affectedRows = taos_stmt_affected_rows(stmt);
      info->affectedRows += affectedRows;

779
      taos_stmt_close(stmt);
780 781
      return code;
    }
782

783
    size_t rows = taosArrayGetSize(batchBind);
784
    for (int32_t i = 0; i < rows; ++i) {
785
      TAOS_BIND* colsBinds = taosArrayGetP(batchBind, i);
786 787
      code = taos_stmt_bind_param(stmt, colsBinds);
      if (code != 0) {
788 789 790 791 792
        tscError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, taos_stmt_errstr(stmt));

        int affectedRows = taos_stmt_affected_rows(stmt);
        info->affectedRows += affectedRows;

793
        taos_stmt_close(stmt);
794 795 796 797
        return code;
      }
      code = taos_stmt_add_batch(stmt);
      if (code != 0) {
798 799 800 801 802
        tscError("SML:0x%"PRIx64" taos_stmt_add_batch return %d:%s", info->id, code, taos_stmt_errstr(stmt));

        int affectedRows = taos_stmt_affected_rows(stmt);
        info->affectedRows += affectedRows;

803
        taos_stmt_close(stmt);
804 805 806 807 808 809
        return code;
      }
    }

    code = taos_stmt_execute(stmt);
    if (code != 0) {
810
      tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s, try:%d", info->id, code, taos_stmt_errstr(stmt), try);
811
    }
812 813
    tscDebug("SML:0x%"PRIx64" taos_stmt_execute inserted %d rows", info->id, taos_stmt_affected_rows(stmt));
    
814 815
    tryAgain = false;
    if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID
816 817 818 819
         || code == TSDB_CODE_VND_INVALID_VGROUP_ID
         || code == TSDB_CODE_TDB_TABLE_RECONFIGURE
         || code == TSDB_CODE_APP_NOT_READY
         || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && try++ < TSDB_MAX_REPLICA) {
820 821 822
      tryAgain = true;
    }

823
    if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
824 825 826 827 828 829
      TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
      int32_t   code2 = taos_errno(res2);
      if (code2 != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%" PRIx64 " insert child table. reset query cache. error: %s", info->id, taos_errstr(res2));
      }
      taos_free_result(res2);
830
      if (tryAgain) {
831
        taosMsleep(100 * (2 << try));
832
      }
833
    }
834
    if (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
835
      if (tryAgain) {
836
        taosMsleep( 100 * (2 << try));
837 838 839 840
      }
    }
  } while (tryAgain);

841 842
  int affectedRows = taos_stmt_affected_rows(stmt);
  info->affectedRows += affectedRows;
843

844
  taos_stmt_close(stmt);
S
shenglian zhou 已提交
845
  return code;
S
shenglian zhou 已提交
846 847
}

848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871
static int32_t insertChildTableBatch(TAOS* taos,  char* cTableName, SArray* colsSchema, SArray* rowsBind, size_t rowSize, SSmlLinesInfo* info) {
  size_t numCols = taosArrayGetSize(colsSchema);
  char* sql = malloc(tsMaxSQLStringLen+1);
  if (sql == NULL) {
    tscError("malloc sql memory error");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t freeBytes = tsMaxSQLStringLen + 1 ;
  sprintf(sql, "insert into ? (");

  for (int i = 0; i < numCols; ++i) {
    SSchema* colSchema = taosArrayGet(colsSchema, i);
    snprintf(sql+strlen(sql), freeBytes-strlen(sql), "%s,", colSchema->name);
  }
  snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ") values (");

  for (int i = 0; i < numCols; ++i) {
    snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
  }
  snprintf(sql + strlen(sql)-1, freeBytes-strlen(sql)+1, ")");
  sql[strlen(sql)] = '\0';

  size_t rows = taosArrayGetSize(rowsBind);
S
shenglian zhou 已提交
872 873
  size_t maxBatchSize = TSDB_MAX_WAL_SIZE/rowSize * 4 / 5;
  size_t batchSize = MIN(maxBatchSize, rows);
874 875
  tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu, batch size: %zu",
           info->id, cTableName, rows, batchSize);
876 877
  SArray* batchBind = taosArrayInit(batchSize, POINTER_BYTES);
  int32_t code = TSDB_CODE_SUCCESS;
878
  for (int i = 0; i < rows;) {
879 880 881 882
    int j = i;
    for (; j < i + batchSize && j<rows; ++j) {
      taosArrayPush(batchBind, taosArrayGet(rowsBind, j));
    }
883
    if (j > i) {
884
      tscDebug("SML:0x%"PRIx64" insert child table batch from line %d to line %d.", info->id, i, j - 1);
885 886 887 888 889 890 891 892 893 894 895 896 897 898 899
      code = doInsertChildTableWithStmt(taos, sql, cTableName, batchBind, info);
      if (code != 0) {
        taosArrayDestroy(batchBind);
        tfree(sql);
        return code;
      }
      taosArrayClear(batchBind);
    }
    i = j;
  }
  taosArrayDestroy(batchBind);
  tfree(sql);
  return code;
}

900
static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints,
S
shenglian zhou 已提交
901
                                             SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
902 903
  for (int32_t i = 0; i < numPoints; ++i) {
    TAOS_SML_DATA_POINT * point = points + i;
904
    SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
905

S
shenglian zhou 已提交
906 907 908 909
    for (int j = 0; j < point->tagNum; ++j) {
      TAOS_SML_KV* kv =  point->tags + j;
      if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
        int64_t ts = *(int64_t*)(kv->value);
910
        ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
S
shenglian zhou 已提交
911 912 913 914 915 916 917 918
        *(int64_t*)(kv->value) = ts;
      }
    }

    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* kv =  point->fields + j;
      if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
        int64_t ts = *(int64_t*)(kv->value);
919
        ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
S
shenglian zhou 已提交
920 921 922 923
        *(int64_t*)(kv->value) = ts;
      }
    }

S
shenglian zhou 已提交
924 925 926 927 928 929 930 931
    SArray* cTablePoints = NULL;
    SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, strlen(point->childTableName));
    if (pCTablePoints) {
      cTablePoints = *pCTablePoints;
    } else {
      cTablePoints = taosArrayInit(64, sizeof(point));
      taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES);
    }
932
    taosArrayPush(cTablePoints, &point);
933 934
  }

S
shenglian zhou 已提交
935 936 937
  return 0;
}

938
static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableName,
S
shenglian zhou 已提交
939
                                   SSmlSTableSchema* sTableSchema, SArray* cTablePoints, SSmlLinesInfo* info) {
940 941 942 943 944 945 946 947
  size_t numTags = taosArrayGetSize(sTableSchema->tags);
  size_t rows = taosArrayGetSize(cTablePoints);

  TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
  for (int i= 0; i < rows; ++i) {
    TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
    for (int j = 0; j < pDataPoint->tagNum; ++j) {
      TAOS_SML_KV* kv = pDataPoint->tags + j;
948
      tagKVs[kv->fieldSchemaIdx] = kv;
949 950 951 952 953 954 955 956 957 958 959 960 961
    }
  }
  
  SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
  taosArraySetSize(tagBinds, numTags);
  int isNullColBind = TSDB_TRUE;
  for (int j = 0; j < numTags; ++j) {
    TAOS_BIND* bind = taosArrayGet(tagBinds, j);
    bind->is_null = &isNullColBind;
  }
  for (int j = 0; j < numTags; ++j) {
    if (tagKVs[j] == NULL) continue;
    TAOS_SML_KV* kv =  tagKVs[j];
962
    TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
963 964 965 966 967 968 969
    bind->buffer_type = kv->type;
    bind->length = malloc(sizeof(uintptr_t*));
    *bind->length = kv->length;
    bind->buffer = kv->value;
    bind->is_null = NULL;
  }

970
  int32_t code = creatChildTableIfNotExists(taos, cTableName, sTableName, sTableSchema->tags, tagBinds, info);
971 972 973 974 975 976 977 978 979

  for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
    TAOS_BIND* bind = taosArrayGet(tagBinds, i);
    free(bind->length);
  }
  taosArrayDestroy(tagBinds);
  return code;
}

S
shenglian zhou 已提交
980
static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, char* cTableName,
981
                                     SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
982 983
  int32_t code = TSDB_CODE_SUCCESS;

984 985 986
  size_t numCols = taosArrayGetSize(sTableSchema->fields);
  size_t rows = taosArrayGetSize(cTablePoints);
  SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);
S
shenglian zhou 已提交
987

988
  int isNullColBind = TSDB_TRUE;
989 990
  for (int i = 0; i < rows; ++i) {
    TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i);
991

992 993
    TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
    if (colBinds == NULL) {
S
shenglian zhou 已提交
994 995
      tscError("SML:0x%"PRIx64" taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
               "num of rows: %zu, num of cols: %zu", info->id, rows, numCols);
996 997
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
998

999 1000
    for (int j = 0; j < numCols; ++j) {
      TAOS_BIND* bind = colBinds + j;
1001 1002
      bind->is_null = &isNullColBind;
    }
1003 1004
    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* kv = point->fields + j;
1005
      TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
1006
      bind->buffer_type = kv->type;
1007 1008
      bind->length = malloc(sizeof(uintptr_t*));
      *bind->length = kv->length;
1009 1010 1011
      bind->buffer = kv->value;
      bind->is_null = NULL;
    }
1012 1013
    taosArrayPush(rowsBind, &colBinds);
  }
S
shenglian zhou 已提交
1014

1015
  code = insertChildTableBatch(taos, cTableName, sTableSchema->fields, rowsBind, rowSize, info);
1016 1017 1018
  if (code != 0) {
    tscError("SML:0x%"PRIx64" insert into child table %s failed. error %s", info->id, cTableName, tstrerror(code));
  }
S
shenglian zhou 已提交
1019

1020 1021 1022 1023 1024
  for (int i = 0; i < rows; ++i) {
    TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
    for (int j = 0; j < numCols; ++j) {
      TAOS_BIND* bind = colBinds + j;
      free(bind->length);
S
shenglian zhou 已提交
1025
    }
1026 1027 1028 1029 1030
    free(colBinds);
  }
  taosArrayDestroy(rowsBind);
  return code;
}
S
shenglian zhou 已提交
1031

S
shenglian zhou 已提交
1032
static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) {
1033
  int32_t code = TSDB_CODE_SUCCESS;
1034

1035
  SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
S
shenglian zhou 已提交
1036
  arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info);
1037 1038 1039 1040 1041 1042

  SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
  while (pCTablePoints) {
    SArray* cTablePoints = *pCTablePoints;

    TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
1043
    SSmlSTableSchema*    sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);
S
shenglian zhou 已提交
1044 1045 1046

    tscDebug("SML:0x%"PRIx64" apply child table tags. child table: %s", info->id, point->childTableName);
    code = applyChildTableTags(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, info);
1047 1048 1049
    if (code != 0) {
      tscError("apply child table tags failed. child table %s, error %s", point->childTableName, tstrerror(code));
      goto cleanup;
S
shenglian zhou 已提交
1050
    }
S
shenglian zhou 已提交
1051

1052 1053 1054 1055 1056 1057 1058 1059 1060
    size_t rowSize = 0;
    size_t numCols = taosArrayGetSize(sTableSchema->fields);
    for (int i = 0; i < numCols; ++i) {
      SSchema* colSchema = taosArrayGet(sTableSchema->fields, i);
      rowSize += colSchema->bytes;
    }

    tscDebug("SML:0x%"PRIx64" apply child table points. child table: %s, row size: %zu", info->id, point->childTableName, rowSize);
    code = applyChildTableFields(taos, sTableSchema, point->childTableName, cTablePoints, rowSize, info);
1061
    if (code != 0) {
1062
      tscError("SML:0x%"PRIx64" Apply child table fields failed. child table %s, error %s", info->id, point->childTableName, tstrerror(code));
1063
      goto cleanup;
1064
    }
1065

S
shenglian zhou 已提交
1066
    tscDebug("SML:0x%"PRIx64" successfully applied data points of child table %s", info->id, point->childTableName);
S
shenglian zhou 已提交
1067

S
shenglian zhou 已提交
1068
    pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
1069
  }
S
shenglian zhou 已提交
1070

1071 1072 1073 1074 1075
cleanup:
  pCTablePoints = taosHashIterate(cname2points, NULL);
  while (pCTablePoints) {
    SArray* pPoints = *pCTablePoints;
    taosArrayDestroy(pPoints);
S
shenglian zhou 已提交
1076
    pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
1077
  }
S
shenglian zhou 已提交
1078
  taosHashCleanup(cname2points);
1079
  return code;
S
shenglian zhou 已提交
1080
}
1081

1082
int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
1083
  tscDebug("SML:0x%"PRIx64" taos_sml_insert. number of points: %d", info->id, numPoint);
S
Shenglian Zhou 已提交
1084

S
shenglian zhou 已提交
1085 1086
  int32_t code = TSDB_CODE_SUCCESS;

1087
  info->affectedRows = 0;
S
shenglian zhou 已提交
1088

S
shenglian zhou 已提交
1089
  tscDebug("SML:0x%"PRIx64" build data point schemas", info->id);
S
shenglian zhou 已提交
1090
  SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>
S
shenglian zhou 已提交
1091
  code = buildDataPointSchemas(points, numPoint, stableSchemas, info);
S
shenglian zhou 已提交
1092
  if (code != 0) {
S
shenglian zhou 已提交
1093
    tscError("SML:0x%"PRIx64" error building data point schemas : %s", info->id, tstrerror(code));
S
shenglian zhou 已提交
1094 1095 1096
    goto clean_up;
  }

S
shenglian zhou 已提交
1097 1098
  tscDebug("SML:0x%"PRIx64" modify db schemas", info->id);
  code = modifyDBSchemas(taos, stableSchemas, info);
S
shenglian zhou 已提交
1099
  if (code != 0) {
S
shenglian zhou 已提交
1100
    tscError("SML:0x%"PRIx64" error change db schema : %s", info->id, tstrerror(code));
S
shenglian zhou 已提交
1101 1102
    goto clean_up;
  }
S
shenglian zhou 已提交
1103

S
shenglian zhou 已提交
1104 1105
  tscDebug("SML:0x%"PRIx64" apply data points", info->id);
  code = applyDataPoints(taos, points, numPoint, stableSchemas, info);
S
shenglian zhou 已提交
1106
  if (code != 0) {
S
shenglian zhou 已提交
1107
    tscError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code));
S
shenglian zhou 已提交
1108 1109 1110
  }

clean_up:
S
shenglian zhou 已提交
1111 1112 1113 1114
  for (int i = 0; i < taosArrayGetSize(stableSchemas); ++i) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
    taosArrayDestroy(schema->fields);
    taosArrayDestroy(schema->tags);
1115
  }
S
shenglian zhou 已提交
1116
  taosArrayDestroy(stableSchemas);
1117 1118 1119
  return code;
}

1120
int tsc_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
1121 1122
  SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo));
  info->id = genLinesSmlId();
1123 1124
  int code = tscSmlInsert(taos, points, numPoint, info);
  free(info);
1125 1126
  return code;
}
S
shenglian zhou 已提交
1127

1128 1129
//=========================================================================

1130 1131 1132 1133 1134
/*        Field                          Escape charaters
    1: measurement                        Comma,Space
    2: tag_key, tag_value, field_key  Comma,Equal Sign,Space
    3: field_value                    Double quote,Backslash
*/
1135
static void escapeSpecialCharacter(uint8_t field, const char **pos) {
1136 1137 1138
  const char *cur = *pos;
  if (*cur != '\\') {
    return;
1139
  }
1140 1141 1142 1143 1144 1145
  switch (field) {
    case 1:
      switch (*(cur + 1)) {
        case ',':
        case ' ':
          cur++;
1146
          break;
1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173
        default:
          break;
      }
      break;
    case 2:
      switch (*(cur + 1)) {
        case ',':
        case ' ':
        case '=':
          cur++;
          break;
        default:
          break;
      }
      break;
    case 3:
      switch (*(cur + 1)) {
        case '"':
        case '\\':
          cur++;
          break;
        default:
          break;
      }
      break;
    default:
      break;
1174
  }
1175
  *pos = cur;
1176
}
1177

1178
bool isValidInteger(char *str) {
1179 1180
  char *c = str;
  if (*c != '+' && *c != '-' && !isdigit(*c)) {
1181 1182
    return false;
  }
1183 1184 1185 1186 1187 1188
  c++;
  while (*c != '\0') {
    if (!isdigit(*c)) {
      return false;
    }
    c++;
1189
  }
1190
  return true;
1191
}
1192

1193
bool isValidFloat(char *str) {
1194 1195 1196 1197 1198 1199 1200
  char *c = str;
  uint8_t has_dot, has_exp, has_sign;
  has_dot = 0;
  has_exp = 0;
  has_sign = 0;

  if (*c != '+' && *c != '-' && *c != '.' && !isdigit(*c)) {
1201 1202
    return false;
  }
1203 1204
  if (*c == '.' && isdigit(*(c + 1))) {
    has_dot = 1;
1205
  }
1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246
  c++;
  while (*c != '\0') {
    if (!isdigit(*c)) {
      switch (*c) {
        case '.': {
          if (!has_dot && !has_exp && isdigit(*(c + 1))) {
            has_dot = 1;
          } else {
            return false;
          }
          break;
        }
        case 'e':
        case 'E': {
          if (!has_exp && isdigit(*(c - 1)) &&
              (isdigit(*(c + 1)) ||
               *(c + 1) == '+' ||
               *(c + 1) == '-')) {
            has_exp = 1;
          } else {
            return false;
          }
          break;
        }
        case '+':
        case '-': {
          if (!has_sign && has_exp && isdigit(*(c + 1))) {
            has_sign = 1;
          } else {
            return false;
          }
          break;
        }
        default: {
          return false;
        }
      }
    }
    c++;
  } //while
  return true;
1247
}
1248

1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264
static bool isInteger(char *pVal, uint16_t len, bool *has_sign) {
  if (len <= 1) {
    return false;
  }
  if (pVal[len - 1] == 'i') {
    *has_sign = true;
    return true;
  }
  if (pVal[len - 1] == 'u') {
    *has_sign = false;
    return true;
  }

  return false;
}

1265
static bool isTinyInt(char *pVal, uint16_t len) {
1266 1267 1268
  if (len <= 2) {
    return false;
  }
1269
  if (!strcasecmp(&pVal[len - 2], "i8")) {
1270
    //printf("Type is int8(%s)\n", pVal);
1271 1272 1273 1274
    return true;
  }
  return false;
}
1275

1276
static bool isTinyUint(char *pVal, uint16_t len) {
1277 1278 1279 1280 1281
  if (len <= 2) {
    return false;
  }
  if (pVal[0] == '-') {
    return false;
1282
  }
1283
  if (!strcasecmp(&pVal[len - 2], "u8")) {
1284
    //printf("Type is uint8(%s)\n", pVal);
1285 1286 1287
    return true;
  }
  return false;
1288 1289
}

1290
static bool isSmallInt(char *pVal, uint16_t len) {
1291 1292 1293
  if (len <= 3) {
    return false;
  }
1294
  if (!strcasecmp(&pVal[len - 3], "i16")) {
1295
    //printf("Type is int16(%s)\n", pVal);
1296
    return true;
1297
  }
1298
  return false;
1299 1300
}

1301
static bool isSmallUint(char *pVal, uint16_t len) {
1302 1303
  if (len <= 3) {
    return false;
1304
  }
1305 1306 1307
  if (pVal[0] == '-') {
    return false;
  }
1308
  if (strcasecmp(&pVal[len - 3], "u16") == 0) {
1309
    //printf("Type is uint16(%s)\n", pVal);
1310 1311 1312
    return true;
  }
  return false;
1313 1314
}

1315
static bool isInt(char *pVal, uint16_t len) {
1316 1317
  if (len <= 3) {
    return false;
1318
  }
1319
  if (strcasecmp(&pVal[len - 3], "i32") == 0) {
1320
    //printf("Type is int32(%s)\n", pVal);
1321 1322 1323
    return true;
  }
  return false;
1324 1325
}

1326
static bool isUint(char *pVal, uint16_t len) {
1327 1328 1329 1330 1331 1332
  if (len <= 3) {
    return false;
  }
  if (pVal[0] == '-') {
    return false;
  }
1333
  if (strcasecmp(&pVal[len - 3], "u32") == 0) {
1334
    //printf("Type is uint32(%s)\n", pVal);
1335 1336 1337
    return true;
  }
  return false;
1338 1339
}

1340
static bool isBigInt(char *pVal, uint16_t len) {
1341 1342
  if (len <= 3) {
    return false;
1343
  }
1344
  if (strcasecmp(&pVal[len - 3], "i64") == 0) {
1345
    //printf("Type is int64(%s)\n", pVal);
1346 1347 1348
    return true;
  }
  return false;
1349 1350
}

1351
static bool isBigUint(char *pVal, uint16_t len) {
1352 1353 1354 1355 1356
  if (len <= 3) {
    return false;
  }
  if (pVal[0] == '-') {
    return false;
1357
  }
1358
  if (strcasecmp(&pVal[len - 3], "u64") == 0) {
1359
    //printf("Type is uint64(%s)\n", pVal);
1360 1361 1362
    return true;
  }
  return false;
1363 1364
}

1365
static bool isFloat(char *pVal, uint16_t len) {
1366 1367 1368
  if (len <= 3) {
    return false;
  }
1369
  if (strcasecmp(&pVal[len - 3], "f32") == 0) {
1370
    //printf("Type is float(%s)\n", pVal);
1371 1372 1373
    return true;
  }
  return false;
1374 1375
}

1376
static bool isDouble(char *pVal, uint16_t len) {
1377 1378 1379
  if (len <= 3) {
    return false;
  }
1380
  if (strcasecmp(&pVal[len - 3], "f64") == 0) {
1381
    //printf("Type is double(%s)\n", pVal);
1382 1383 1384 1385 1386
    return true;
  }
  return false;
}

1387
static bool isBool(char *pVal, uint16_t len, bool *bVal) {
1388
  if ((len == 1) && !strcasecmp(&pVal[len - 1], "t")) {
1389 1390
    //printf("Type is bool(%c)\n", pVal[len - 1]);
    *bVal = true;
1391
    return true;
1392
  }
1393

1394
  if ((len == 1) && !strcasecmp(&pVal[len - 1], "f")) {
1395 1396
    //printf("Type is bool(%c)\n", pVal[len - 1]);
    *bVal = false;
1397
    return true;
1398
  }
1399

1400
  if((len == 4) && !strcasecmp(&pVal[len - 4], "true")) {
1401 1402
    //printf("Type is bool(%s)\n", &pVal[len - 4]);
    *bVal = true;
1403 1404
    return true;
  }
1405
  if((len == 5) && !strcasecmp(&pVal[len - 5], "false")) {
1406 1407
    //printf("Type is bool(%s)\n", &pVal[len - 5]);
    *bVal = false;
1408 1409 1410
    return true;
  }
  return false;
1411 1412
}

1413
static bool isBinary(char *pVal, uint16_t len) {
1414 1415 1416 1417 1418 1419
  //binary: "abc"
  if (len < 2) {
    return false;
  }
  //binary
  if (pVal[0] == '"' && pVal[len - 1] == '"') {
1420
    //printf("Type is binary(%s)\n", pVal);
1421 1422 1423 1424
    return true;
  }
  return false;
}
1425

1426
static bool isNchar(char *pVal, uint16_t len) {
1427 1428
  //nchar: L"abc"
  if (len < 3) {
1429 1430
    return false;
  }
1431
  if ((pVal[0] == 'l' || pVal[0] == 'L')&& pVal[1] == '"' && pVal[len - 1] == '"') {
1432
    //printf("Type is nchar(%s)\n", pVal);
1433
    return true;
1434
  }
1435 1436 1437
  return false;
}

1438
static bool isTimeStamp(char *pVal, uint16_t len, SMLTimeStampType *tsType, SSmlLinesInfo* info) {
1439 1440 1441 1442 1443 1444 1445
  if (len == 0) {
    return true;
  }
  if ((len == 1) && pVal[0] == '0') {
    *tsType = SML_TIME_STAMP_NOW;
    return true;
  }
1446 1447

  //Default no appendix
1448
  if (isdigit(pVal[len - 1]) && isdigit(pVal[len - 2])) {
G
Ganlin Zhao 已提交
1449
    if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
1450 1451 1452 1453 1454
      if (info->tsType != SML_TIME_STAMP_NOT_CONFIGURED) {
        *tsType = info->tsType;
      } else {
        *tsType = SML_TIME_STAMP_NANO_SECONDS;
      }
G
Ganlin Zhao 已提交
1455
    } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
1456 1457 1458 1459 1460 1461 1462 1463
      if (len == SML_TIMESTAMP_SECOND_DIGITS) {
        *tsType = SML_TIME_STAMP_SECONDS;
      } else if (len == SML_TIMESTAMP_MILLI_SECOND_DIGITS) {
        *tsType = SML_TIME_STAMP_MILLI_SECONDS;
      } else {
        return TSDB_CODE_TSC_INVALID_TIME_STAMP;
      }
    }
1464 1465
    return true;
  }
1466

1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480
  if (pVal[len - 1] == 's') {
    switch (pVal[len - 2]) {
      case 'm':
        *tsType = SML_TIME_STAMP_MILLI_SECONDS;
        break;
      case 'u':
        *tsType = SML_TIME_STAMP_MICRO_SECONDS;
        break;
      case 'n':
        *tsType = SML_TIME_STAMP_NANO_SECONDS;
        break;
      default:
        if (isdigit(pVal[len - 2])) {
          *tsType = SML_TIME_STAMP_SECONDS;
1481
          break;
1482
        } else {
1483 1484 1485
          return false;
        }
    }
1486
    //printf("Type is timestamp(%s)\n", pVal);
1487 1488 1489 1490
    return true;
  }
  return false;
}
1491

1492
static bool convertStrToNumber(TAOS_SML_KV *pVal, char *str, SSmlLinesInfo* info) {
1493 1494 1495 1496 1497 1498 1499
  errno = 0;
  uint8_t type = pVal->type;
  int16_t length = pVal->length;
  int64_t val_s;
  uint64_t val_u;
  double val_d;

1500
  strntolower_s(str, str, (int32_t)strlen(str));
1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511
  if (IS_FLOAT_TYPE(type)) {
    val_d = strtod(str, NULL);
  } else {
    if (IS_SIGNED_NUMERIC_TYPE(type)) {
      val_s = strtoll(str, NULL, 10);
    } else {
      val_u = strtoull(str, NULL, 10);
    }
  }

  if (errno == ERANGE) {
1512
    tscError("SML:0x%"PRIx64" Convert number(%s) out of range", info->id, str);
1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591
    return false;
  }

  switch (type) {
    case TSDB_DATA_TYPE_TINYINT:
      if (!IS_VALID_TINYINT(val_s)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(int8_t *)(pVal->value) = (int8_t)val_s;
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      if (!IS_VALID_UTINYINT(val_u)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(uint8_t *)(pVal->value) = (uint8_t)val_u;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      if (!IS_VALID_SMALLINT(val_s)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(int16_t *)(pVal->value) = (int16_t)val_s;
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      if (!IS_VALID_USMALLINT(val_u)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(uint16_t *)(pVal->value) = (uint16_t)val_u;
      break;
    case TSDB_DATA_TYPE_INT:
      if (!IS_VALID_INT(val_s)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(int32_t *)(pVal->value) = (int32_t)val_s;
      break;
    case TSDB_DATA_TYPE_UINT:
      if (!IS_VALID_UINT(val_u)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(uint32_t *)(pVal->value) = (uint32_t)val_u;
      break;
    case TSDB_DATA_TYPE_BIGINT:
      if (!IS_VALID_BIGINT(val_s)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(int64_t *)(pVal->value) = (int64_t)val_s;
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      if (!IS_VALID_UBIGINT(val_u)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(uint64_t *)(pVal->value) = (uint64_t)val_u;
      break;
    case TSDB_DATA_TYPE_FLOAT:
      if (!IS_VALID_FLOAT(val_d)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(float *)(pVal->value) = (float)val_d;
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      if (!IS_VALID_DOUBLE(val_d)) {
        return false;
      }
      pVal->value = calloc(length, 1);
      *(double *)(pVal->value) = (double)val_d;
      break;
    default:
      return false;
  }
  return true;
}
1592
//len does not include '\0' from value.
1593
bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
1594
                         uint16_t len, SSmlLinesInfo* info, bool isTag) {
1595 1596 1597
  if (len <= 0) {
    return false;
  }
G
Ganlin Zhao 已提交
1598

1599 1600 1601 1602 1603 1604 1605 1606 1607
  //convert tags value to Nchar
  if (isTag) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
    pVal->length = len;
    pVal->value = calloc(pVal->length, 1);
    memcpy(pVal->value, value, pVal->length);
    return true;
  }

1608
  //integer number
1609 1610 1611 1612 1613 1614 1615 1616 1617 1618
  bool has_sign;
  if (isInteger(value, len, &has_sign)) {
    pVal->type = has_sign ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_UBIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 1] = '\0';
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
      return false;
    }
    return true;
  }
1619
  if (isTinyInt(value, len)) {
1620 1621 1622
    pVal->type = TSDB_DATA_TYPE_TINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 2] = '\0';
1623
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1624
      return false;
1625 1626 1627
    }
    return true;
  }
1628
  if (isTinyUint(value, len)) {
1629 1630 1631
    pVal->type = TSDB_DATA_TYPE_UTINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 2] = '\0';
1632
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1633 1634 1635 1636
      return false;
    }
    return true;
  }
1637
  if (isSmallInt(value, len)) {
1638 1639 1640
    pVal->type = TSDB_DATA_TYPE_SMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1641
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1642 1643 1644 1645
      return false;
    }
    return true;
  }
1646
  if (isSmallUint(value, len)) {
1647 1648 1649
    pVal->type = TSDB_DATA_TYPE_USMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1650
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1651 1652 1653 1654
      return false;
    }
    return true;
  }
1655
  if (isInt(value, len)) {
1656 1657 1658
    pVal->type = TSDB_DATA_TYPE_INT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1659
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1660 1661 1662 1663
      return false;
    }
    return true;
  }
1664
  if (isUint(value, len)) {
1665 1666 1667
    pVal->type = TSDB_DATA_TYPE_UINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1668
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1669 1670 1671 1672
      return false;
    }
    return true;
  }
1673
  if (isBigInt(value, len)) {
1674 1675 1676
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1677
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1678 1679 1680 1681
      return false;
    }
    return true;
  }
1682
  if (isBigUint(value, len)) {
1683 1684 1685
    pVal->type = TSDB_DATA_TYPE_UBIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1686
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1687 1688 1689 1690
      return false;
    }
    return true;
  }
1691 1692 1693 1694 1695
  //floating number
  if (isFloat(value, len)) {
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1696
    if (!isValidFloat(value) || !convertStrToNumber(pVal, value, info)) {
1697 1698 1699 1700 1701 1702 1703 1704
      return false;
    }
    return true;
  }
  if (isDouble(value, len)) {
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1705
    if (!isValidFloat(value) || !convertStrToNumber(pVal, value, info)) {
1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736
      return false;
    }
    return true;
  }
  //binary
  if (isBinary(value, len)) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
    pVal->length = len - 2;
    pVal->value = calloc(pVal->length, 1);
    //copy after "
    memcpy(pVal->value, value + 1, pVal->length);
    return true;
  }
  //nchar
  if (isNchar(value, len)) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
    pVal->length = len - 3;
    pVal->value = calloc(pVal->length, 1);
    //copy after L"
    memcpy(pVal->value, value + 2, pVal->length);
    return true;
  }
  //bool
  bool bVal;
  if (isBool(value, len, &bVal)) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->value = calloc(pVal->length, 1);
    memcpy(pVal->value, &bVal, pVal->length);
    return true;
  }
1737

1738 1739
  //Handle default(no appendix) type as DOUBLE
  if (isValidInteger(value) || isValidFloat(value)) {
1740
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
G
Ganlin Zhao 已提交
1741
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
1742
    if (!convertStrToNumber(pVal, value, info)) {
1743 1744
      return false;
    }
G
Ganlin Zhao 已提交
1745 1746
    return true;
  }
1747
  return false;
1748
}
1749

1750
static int32_t getTimeStampValue(char *value, uint16_t len,
1751
                                 SMLTimeStampType type, int64_t *ts, SSmlLinesInfo* info) {
1752 1753 1754 1755

  if (len >= 2) {
    for (int i = 0; i < len - 2; ++i) {
      if(!isdigit(value[i])) {
1756
        return TSDB_CODE_TSC_INVALID_TIME_STAMP;
1757
      }
1758 1759
    }
  }
1760

1761
  //No appendix or no timestamp given (len = 0)
1762
  if (len != 0 && type != SML_TIME_STAMP_NOW) {
1763 1764 1765 1766 1767 1768
    *ts = (int64_t)strtoll(value, NULL, 10);
  } else {
    type = SML_TIME_STAMP_NOW;
  }
  switch (type) {
    case SML_TIME_STAMP_NOW: {
1769
      *ts = taosGetTimestampNs();
1770
      break;
1771
    }
1772 1773 1774 1775 1776 1777 1778 1779
    case SML_TIME_STAMP_HOURS: {
      *ts = (int64_t)(*ts * 3600 * 1e9);
      break;
    }
    case SML_TIME_STAMP_MINUTES: {
      *ts = (int64_t)(*ts * 60 * 1e9);
      break;
    }
1780
    case SML_TIME_STAMP_SECONDS: {
1781
      *ts = (int64_t)(*ts * 1e9);
1782
      break;
1783 1784
    }
    case SML_TIME_STAMP_MILLI_SECONDS: {
1785
      *ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_NANO);
1786
      break;
1787 1788
    }
    case SML_TIME_STAMP_MICRO_SECONDS: {
1789
      *ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
1790
      break;
1791 1792
    }
    case SML_TIME_STAMP_NANO_SECONDS: {
1793
      *ts = *ts * 1;
1794 1795 1796
      break;
    }
    default: {
1797
      return TSDB_CODE_TSC_INVALID_TIME_STAMP;
1798
    }
1799
  }
1800
  return TSDB_CODE_SUCCESS;
1801 1802
}

1803 1804
/*
 * Turn the timestamp text of a line (len characters at `value`) into a
 * TIMESTAMP key/value.  The text is lower-cased in place, checked by
 * isTimeStamp(), converted by getTimeStampValue(), and the resulting int64_t
 * is stored in a newly allocated pVal->value.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_TIME_STAMP, an error from
 * getTimeStampValue(), or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
                            uint16_t len, SSmlLinesInfo* info) {
  SMLTimeStampType type;
  int64_t tsVal;

  strntolower_s(value, value, len);
  if (!isTimeStamp(value, len, &type, info)) {
    return TSDB_CODE_TSC_INVALID_TIME_STAMP;
  }

  int32_t ret = getTimeStampValue(value, len, type, &tsVal, info);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }
  tscDebug("SML:0x%"PRIx64"Timestamp after conversion:%"PRId64, info->id, tsVal);

  pVal->type = TSDB_DATA_TYPE_TIMESTAMP;
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  pVal->value = calloc(pVal->length, 1);
  if (pVal->value == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(pVal->value, &tsVal, pVal->length);
  return TSDB_CODE_SUCCESS;
}

1827
/*
 * Parse the rest of the line starting at *index as the timestamp and return it
 * in a newly allocated key/value (*pTS) whose key is "_ts".  An empty rest
 * (no timestamp given) is handed to convertSmlTimeStamp() as NULL/0.
 * *index is not advanced.
 *
 * On failure *pTS is freed and set to NULL.  Returns TSDB_CODE_SUCCESS,
 * TSDB_CODE_TSC_INVALID_TIME_STAMP (also when the text exceeds UINT16_MAX
 * characters, the limit of convertSmlTimeStamp()'s length parameter), an error
 * from convertSmlTimeStamp(), or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t parseSmlTimeStamp(TAOS_SML_KV **pTS, const char **index, SSmlLinesInfo* info) {
  const char *start = *index;
  size_t len = strlen(start);
  char key[] = "_ts";
  char *value = NULL;
  int32_t ret;

  *pTS = NULL;
  // convertSmlTimeStamp() takes a uint16_t length; a longer text would silently wrap.
  if (len > UINT16_MAX) {
    tscError("SML:0x%"PRIx64" Timestamp field cannot exceed %d characters", info->id, (int)UINT16_MAX);
    return TSDB_CODE_TSC_INVALID_TIME_STAMP;
  }

  *pTS = calloc(1, sizeof(TAOS_SML_KV));
  if (*pTS == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (len > 0) {
    value = calloc(len + 1, 1);
    if (value == NULL) {
      ret = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto error;
    }
    memcpy(value, start, len);
  }

  ret = convertSmlTimeStamp(*pTS, value, (uint16_t)len, info);
  free(value);
  if (ret != TSDB_CODE_SUCCESS) {
    goto error;
  }

  (*pTS)->key = calloc(sizeof(key), 1);
  if ((*pTS)->key == NULL) {
    free((*pTS)->value);
    ret = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto error;
  }
  memcpy((*pTS)->key, key, sizeof(key));
  return TSDB_CODE_SUCCESS;

error:
  free(*pTS);
  *pTS = NULL;
  return ret;
}
1859

1860
/*
 * Report whether `key` (compared case-insensitively) has already been seen for
 * the current line, recording its lower-cased form in pHash if not.
 * Returns true for a duplicate (and logs it), false otherwise.
 *
 * At most TSDB_COL_NAME_LEN - 1 characters are compared so the local buffer
 * cannot overflow; parseSmlKey() never passes a longer key.
 */
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
  char keyLower[TSDB_COL_NAME_LEN];
  size_t keyLen = 0;

  for (const char *cur = key; *cur != '\0' && keyLen < TSDB_COL_NAME_LEN - 1; ++cur) {
    // cast: passing a negative char to tolower() is undefined behavior
    keyLower[keyLen++] = (char)tolower((unsigned char)*cur);
  }
  keyLower[keyLen] = '\0';

  if (taosHashGet(pHash, keyLower, keyLen) != NULL) {
    tscError("SML:0x%"PRIx64" Duplicate key detected:%s", info->id, keyLower);
    return true;
  }

  // Only key presence matters; the stored value is a placeholder.
  uint8_t dummy_val = 0;
  taosHashPut(pHash, keyLower, keyLen, &dummy_val, sizeof(uint8_t));

  return false;
}

1884
/*
 * Parse the key of a "key=value" pair starting at *index.
 *
 * The key runs up to the first '=' not preceded by a backslash.  On success a
 * NUL-terminated copy is stored in a new buffer at pKV->key, the key is
 * registered in pHash for duplicate detection, and *index is moved just past
 * the '='.
 *
 * Returns TSDB_CODE_TSC_LINE_SYNTAX_ERROR when the key starts with a digit, is
 * empty, is not followed by '=', or is a duplicate;
 * TSDB_CODE_TSC_INVALID_COLUMN_LENGTH when it is too long;
 * TSDB_CODE_TSC_OUT_OF_MEMORY when the copy cannot be allocated.
 * NOTE(review): *(cur - 1) is read on the first character, which assumes the
 * caller's *index always follows a delimiter.
 */
static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash, SSmlLinesInfo* info) {
  const char *cur = *index;
  char key[TSDB_COL_NAME_LEN + 1];  // +1 to avoid key[len] over write
  uint16_t len = 0;

  //key field cannot start with digit
  if (isdigit((unsigned char)*cur)) {
    tscError("SML:0x%"PRIx64" Tag key cannot start with digit", info->id);
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }
  while (*cur != '\0') {
    if (len >= TSDB_COL_NAME_LEN - 1) {
      tscError("SML:0x%"PRIx64" Key field cannot exceeds %d characters", info->id, TSDB_COL_NAME_LEN - 1);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
    //unescaped '=' identifies a tag key
    if (*cur == '=' && *(cur - 1) != '\\') {
      break;
    }
    //Escape special character
    if (*cur == '\\') {
      //TODO: escape will work after column & tag
      //support spcial characters
      escapeSpecialCharacter(2, &cur);
    }
    key[len] = *cur;
    cur++;
    len++;
  }
  if (len == 0) {
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }
  // The loop only stops early at '='; reaching '\0' means there is no value.
  // Setting *index = cur + 1 here would point past the string terminator.
  if (*cur == '\0') {
    tscError("SML:0x%"PRIx64" Key field has no '=' separator", info->id);
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }
  key[len] = '\0';

  if (checkDuplicateKey(key, pHash, info)) {
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  pKV->key = calloc(len + 1, 1);
  if (pKV->key == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(pKV->key, key, len + 1);
  //tscDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len);
  *index = cur + 1;
  return TSDB_CODE_SUCCESS;
}
1928

1929

1930
/*
 * Parse the value of a "key=value" pair starting at *index and convert it with
 * convertSmlValueType() into pKV (whose key was set by parseSmlKey()).
 *
 * A value ends at the first ',' or ' ' not preceded by a backslash, or at the
 * end of the string.  For fields (isTag == false) a value starting with '"' or
 * 'L"' is a quoted string: it only ends at a delimiter directly following a
 * quote other than the opening one.  *is_last_kv becomes true when the value
 * is terminated by ' ' or '\0' (no further pair in this section).  On success
 * *index moves past the terminating delimiter, or stays on '\0'.
 *
 * On failure pKV->key is freed and reset to NULL.  Returns
 * TSDB_CODE_TSC_LINE_SYNTAX_ERROR, TSDB_CODE_TSC_INVALID_VALUE or
 * TSDB_CODE_TSC_OUT_OF_MEMORY.
 * NOTE(review): the `len != 2` guard that skips the opening quote of L"..."
 * also prevents an empty string "" from ever being closed.
 */
static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **index,
                          bool *is_last_kv, SSmlLinesInfo* info, bool isTag) {
  const char *start, *cur;
  int32_t ret = TSDB_CODE_SUCCESS;
  char *value = NULL;
  uint16_t len = 0;
  bool searchQuote = false;
  start = cur = *index;

  //if field value is string
  if (!isTag) {
    if (*cur == '"') {
      searchQuote = true;
      cur += 1;
      len += 1;
    } else if (*cur == 'L' && *(cur + 1) == '"') {
      searchQuote = true;
      cur += 2;
      len += 2;
    }
  }

  while (1) {
    // unescaped ',' or ' ' or '\0' identifies a value
    if (((*cur == ',' || *cur == ' ' ) && *(cur - 1) != '\\') || *cur == '\0') {
      if (searchQuote == true) {
        //first quote ignored while searching
        if (*(cur - 1) == '"' && len != 1 && len != 2) {
          *is_last_kv = (*cur == ' ' || *cur == '\0') ? true : false;
          break;
        } else if (*cur == '\0') {
          ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
          goto error;
        } else {
          // delimiter inside the quoted string: keep it as part of the value
          cur++;
          len++;
          continue;
        }
      }
      //unescaped ' ' or '\0' indicates end of value
      *is_last_kv = (*cur == ' ' || *cur == '\0') ? true : false;
      // skip runs of spaces without counting them into the value
      if (*cur == ' ' && *(cur + 1) == ' ') {
        cur++;
        continue;
      } else {
        break;
      }
    }
    //Escape special character
    if (*cur == '\\') {
      escapeSpecialCharacter(isTag ? 2 : 3, &cur);
    }
    cur++;
    len++;
  }

  if (len == 0) {
    ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    goto error;
  }

  value = calloc(len + 1, 1);
  if (value == NULL) {
    ret = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto error;
  }
  memcpy(value, start, len);
  value[len] = '\0';
  if (!convertSmlValueType(pKV, value, len, info, isTag)) {
    tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
            info->id, value);
    free(value);
    ret = TSDB_CODE_TSC_INVALID_VALUE;
    goto error;
  }
  free(value);

  *index = (*cur == '\0') ? cur : cur + 1;
  return ret;

error:
  //free previously allocated key field
  free(pKV->key);
  pKV->key = NULL;
  return ret;
}

/*
 * Parse the measurement (super table name) at the start of a line into a new,
 * lower-cased pSml->stableName.
 *
 * The name ends at the first ',' not preceded by a backslash (tags follow:
 * *has_tags = 1), or at the first unescaped ' ' that is not followed by
 * another ' ' (runs of spaces are skipped).  On success *index points just
 * past the terminating delimiter.
 *
 * Returns TSDB_CODE_TSC_OUT_OF_MEMORY, TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH
 * for an over-long name, or TSDB_CODE_TSC_LINE_SYNTAX_ERROR when the name is
 * empty, starts with a digit, or the line ends right after it.  On failure
 * pSml->stableName is freed and reset to NULL.
 */
static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **index,
                                   uint8_t *has_tags, SSmlLinesInfo* info) {
  const char *start = *index;
  const char *cur = start;
  uint16_t len = 0;
  int32_t ret = TSDB_CODE_SUCCESS;

  // +1 leaves room for the terminating NUL after the longest accepted name
  pSml->stableName = calloc(TSDB_TABLE_NAME_LEN + 1, 1);
  if (pSml->stableName == NULL){
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (isdigit((unsigned char)*cur)) {
    tscError("SML:0x%"PRIx64" Measurement field cannnot start with digit", info->id);
    ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    goto error;
  }

  while (*cur != '\0') {
    if (len >= TSDB_TABLE_NAME_LEN - 1) {
      tscError("SML:0x%"PRIx64" Measurement field cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1);
      ret = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
      goto error;
    }
    // The first character of the line has no predecessor; checking cur != start
    // avoids reading the byte before the input buffer.
    bool escaped = (cur != start) && (*(cur - 1) == '\\');
    //first unescaped comma or space identifies measurement
    //if space detected first, meaning no tag in the input
    if (*cur == ',' && !escaped) {
      *has_tags = 1;
      break;
    }
    if (*cur == ' ' && !escaped) {
      if (*(cur + 1) != ' ') {
        break;
      }
      cur++;
      continue;
    }
    //Comma, Space, Backslash needs to be escaped if any
    if (*cur == '\\') {
      escapeSpecialCharacter(1, &cur);
    }
    pSml->stableName[len] = (char)tolower((unsigned char)*cur);
    cur++;
    len++;
  }
  if (len == 0) {
    ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    goto error;
  }
  // No delimiter means no field set follows, and *index = cur + 1 would point
  // past the string terminator.
  if (*cur == '\0') {
    tscError("SML:0x%"PRIx64" Measurement field must be followed by tags or fields", info->id);
    ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    goto error;
  }
  pSml->stableName[len] = '\0';
  *index = cur + 1;
  tscDebug("SML:0x%"PRIx64" Stable name in measurement:%s|len:%d", info->id, pSml->stableName, len);

  return TSDB_CODE_SUCCESS;

error:
  free(pSml->stableName);
  pSml->stableName = NULL;
  return ret;
}
2070

2071
//Table name can only contain digits(0-9),alphebet(a-z),underscore(_)
2072 2073
int32_t isValidChildTableName(const char *pTbName, int16_t len, SSmlLinesInfo* info) {
  if (len > TSDB_TABLE_NAME_LEN - 1) {
2074
    tscError("SML:0x%"PRIx64" child table name cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1);
2075 2076
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }
2077 2078
  const char *cur = pTbName;
  for (int i = 0; i < len; ++i) {
2079
    if(!isdigit(cur[i]) && !isalpha(cur[i]) && (cur[i] != '_')) {
2080 2081 2082 2083 2084 2085 2086
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    }
  }
  return TSDB_CODE_SUCCESS;
}


2087
static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
2088
                               const char **index, bool isField,
2089 2090
                               TAOS_SML_DATA_POINT* smlData, SHashObj *pHash,
                               SSmlLinesInfo* info) {
2091
  const char *cur = *index;
2092
  int32_t ret = TSDB_CODE_SUCCESS;
2093 2094
  TAOS_SML_KV *pkv;
  bool is_last_kv = false;
2095

2096
  int32_t capacity = 0;
2097
  if (isField) {
2098 2099 2100
    capacity = 64;
    *pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
    // leave space for timestamp;
2101 2102
    pkv = *pKVs;
    pkv++;
2103 2104 2105
  } else {
    capacity = 8;
    *pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
2106 2107
    pkv = *pKVs;
  }
2108

2109
  while (*cur != '\0') {
2110
    ret = parseSmlKey(pkv, &cur, pHash, info);
2111
    if (ret) {
2112
      tscError("SML:0x%"PRIx64" Unable to parse key", info->id);
2113 2114
      goto error;
    }
2115
    ret = parseSmlValue(pkv, &cur, &is_last_kv, info, !isField);
2116
    if (ret) {
2117
      tscError("SML:0x%"PRIx64" Unable to parse value", info->id);
2118 2119
      goto error;
    }
2120
    if (!isField && (strcasecmp(pkv->key, "ID") == 0)) {
2121
      ret = isValidChildTableName(pkv->value, pkv->length, info);
2122
      if (ret) {
D
dapan1121 已提交
2123 2124
        free(pkv->key);
        free(pkv->value);
2125 2126
        goto error;
      }
2127 2128
      smlData->childTableName = malloc( pkv->length + 1);
      memcpy(smlData->childTableName, pkv->value, pkv->length);
2129
      strntolower_s(smlData->childTableName, smlData->childTableName, (int32_t)pkv->length);
2130 2131 2132 2133 2134 2135
      smlData->childTableName[pkv->length] = '\0';
      free(pkv->key);
      free(pkv->value);
    } else {
      *num_kvs += 1;
    }
2136
    if (is_last_kv) {
2137 2138 2139 2140 2141
      goto done;
    }

    //reallocate addtional memory for more kvs
    TAOS_SML_KV *more_kvs = NULL;
2142

2143
    if (isField) {
2144 2145
      if ((*num_kvs + 2) > capacity) {
        capacity *= 3; capacity /= 2;
2146 2147 2148
        more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
      } else {
        more_kvs = *pKVs;
2149
      }
2150
    } else {
2151 2152
      if ((*num_kvs + 1) > capacity) {
        capacity *= 3; capacity /= 2;
2153 2154 2155
        more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
      } else {
        more_kvs = *pKVs;
2156
      }
2157
    }
2158

2159
    if (!more_kvs) {
2160 2161
      goto error;
    }
2162 2163 2164 2165 2166 2167 2168 2169 2170
    *pKVs = more_kvs;
    //move pKV points to next TAOS_SML_KV block
    if (isField) {
      pkv = *pKVs + *num_kvs + 1;
    } else {
      pkv = *pKVs + *num_kvs;
    }
  }
  goto done;
2171

2172
error:
2173
  return ret;
2174
done:
2175
  *index = cur;
2176
  return ret;
2177
}
2178

2179 2180 2181 2182 2183 2184 2185 2186 2187 2188 2189 2190 2191
/*
 * Put the parsed timestamp into slot 0 of (*smlData)->fields, the slot
 * parseSmlKvPairs() leaves free for it, and count it in fieldNum.
 *
 * Ownership of ts->key and ts->value moves to the field slot instead of being
 * copied, so no allocation (and thus no unchecked failure) happens here; only
 * the ts container itself is freed.
 */
static void moveTimeStampToFirstKv(TAOS_SML_DATA_POINT** smlData, TAOS_SML_KV *ts) {
  TAOS_SML_KV* tsField = (*smlData)->fields;
  tsField->key = ts->key;
  tsField->value = ts->value;
  tsField->length = ts->length;
  tsField->type = ts->type;
  (*smlData)->fieldNum = (*smlData)->fieldNum + 1;

  free(ts);
}

2194
int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) {
2195
  const char* index = sql;
2196
  int32_t ret = TSDB_CODE_SUCCESS;
2197 2198
  uint8_t has_tags = 0;
  TAOS_SML_KV *timestamp = NULL;
2199
  SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
2200

2201
  ret = parseSmlMeasurement(smlData, &index, &has_tags, info);
2202
  if (ret) {
2203
    tscError("SML:0x%"PRIx64" Unable to parse measurement", info->id);
2204
    taosHashCleanup(keyHashTable);
2205
    return ret;
2206
  }
2207
  tscDebug("SML:0x%"PRIx64" Parse measurement finished, has_tags:%d", info->id, has_tags);
2208 2209 2210

  //Parse Tags
  if (has_tags) {
2211
    ret = parseSmlKvPairs(&smlData->tags, &smlData->tagNum, &index, false, smlData, keyHashTable, info);
2212
    if (ret) {
2213
      tscError("SML:0x%"PRIx64" Unable to parse tag", info->id);
2214
      taosHashCleanup(keyHashTable);
2215 2216
      return ret;
    }
2217
  }
2218
  tscDebug("SML:0x%"PRIx64" Parse tags finished, num of tags:%d", info->id, smlData->tagNum);
2219 2220

  //Parse fields
2221
  ret = parseSmlKvPairs(&smlData->fields, &smlData->fieldNum, &index, true, smlData, keyHashTable, info);
2222
  if (ret) {
2223
    tscError("SML:0x%"PRIx64" Unable to parse field", info->id);
2224
    taosHashCleanup(keyHashTable);
2225
    return ret;
2226
  }
2227
  tscDebug("SML:0x%"PRIx64" Parse fields finished, num of fields:%d", info->id, smlData->fieldNum);
2228
  taosHashCleanup(keyHashTable);
2229

2230
  //Parse timestamp
2231
  ret = parseSmlTimeStamp(&timestamp, &index, info);
2232
  if (ret) {
2233
    tscError("SML:0x%"PRIx64" Unable to parse timestamp", info->id);
2234
    return ret;
2235
  }
2236
  moveTimeStampToFirstKv(&smlData, timestamp);
2237
  tscDebug("SML:0x%"PRIx64" Parse timestamp finished", info->id);
2238

2239
  return TSDB_CODE_SUCCESS;
2240 2241
}

2242
//=========================================================================
2243

S
shenglian zhou 已提交
2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) {
  for (int i=0; i<point->tagNum; ++i) {
    free((point->tags+i)->key);
    free((point->tags+i)->value);
  }
  free(point->tags);
  for (int i=0; i<point->fieldNum; ++i) {
    free((point->fields+i)->key);
    free((point->fields+i)->value);
  }
  free(point->fields);
  free(point->stableName);
  free(point->childTableName);
}

S
shenglian zhou 已提交
2259
/*
 * Parse numLines line-protocol lines and append each data point to `points`
 * (an SArray of TAOS_SML_DATA_POINT, copied by value).
 *
 * Stops at the first failing line: its partially built point is destroyed
 * and the error is returned, while points appended before it stay in `points`
 * for the caller to release.  `failedLines` is currently unused.
 */
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) {
  (void)failedLines;

  for (int32_t i = 0; i < numLines; ++i) {
    TAOS_SML_DATA_POINT point = {0};
    int32_t code = tscParseLine(lines[i], &point, info);
    if (code != TSDB_CODE_SUCCESS) {
      tscError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]);
      destroySmlDataPoint(&point);
      return code;
    }
    tscDebug("SML:0x%"PRIx64" data point line parse success. line %d", info->id, i);

    // Assumes taosArrayPush() returns NULL when it cannot store the element;
    // the point's buffers would otherwise be leaked.
    if (taosArrayPush(points, &point) == NULL) {
      tscError("SML:0x%"PRIx64" failed to store data point of line %d", info->id, i);
      destroySmlDataPoint(&point);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }
  return TSDB_CODE_SUCCESS;
}

2276
/*
 * Parse and insert numLines InfluxDB line-protocol lines through
 * tscSmlInsert().
 *
 * numLines must be in [1, 65536] and no line may be NULL, otherwise
 * TSDB_CODE_TSC_APP_ERROR is returned.  When parsing succeeds, *affectedRows
 * (if non-NULL) receives info->affectedRows after the insert attempt.
 * All parsed data points are released before returning.
 */
int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int *affectedRows) {
  int32_t code = 0;

  SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
  if (info == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  info->id = genLinesSmlId();
  info->tsType = tsType;
  info->protocol = protocol;

  if (numLines <= 0 || numLines > 65536) {
    tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
    tfree(info);
    code = TSDB_CODE_TSC_APP_ERROR;
    return code;
  }

  for (int i = 0; i < numLines; ++i) {
    if (lines[i] == NULL) {
      tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i);
      tfree(info);
      code = TSDB_CODE_TSC_APP_ERROR;
      return code;
    }
  }

  SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT));
  if (lpPoints == NULL) {
    tscError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
    tfree(info);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]);
  code = tscParseLines(lines, numLines, lpPoints, NULL, info);

  if (code == 0) {
    TAOS_SML_DATA_POINT* parsed = TARRAY_GET_START(lpPoints);
    code = tscSmlInsert(taos, parsed, (int)taosArrayGetSize(lpPoints), info);
    if (code != 0) {
      tscError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code)));
    }
    if (affectedRows != NULL) {
      *affectedRows = info->affectedRows;
    }
  }

  tscDebug("SML:0x%"PRIx64" taos_insert_lines finish inserting %d lines. code: %d", info->id, numLines, code);
  // Points parsed before a failing line are also stored and must be released.
  TAOS_SML_DATA_POINT* points = TARRAY_GET_START(lpPoints);
  size_t numPoints = taosArrayGetSize(lpPoints);
  for (size_t i = 0; i < numPoints; ++i) {
    destroySmlDataPoint(points + i);
  }

  taosArrayDestroy(lpPoints);

  tfree(info);
  return code;
}

G
Ganlin Zhao 已提交
2338 2339
/*
 * Map a TSDB_SML_TIMESTAMP_* precision value to the internal SMLTimeStampType.
 * *tsType is written only on success; any other value yields
 * TSDB_CODE_TSC_INVALID_PRECISION_TYPE.
 */
static int32_t convertPrecisionType(int precision, SMLTimeStampType *tsType) {
  SMLTimeStampType mapped;

  switch (precision) {
    case TSDB_SML_TIMESTAMP_NOT_CONFIGURED: mapped = SML_TIME_STAMP_NOT_CONFIGURED; break;
    case TSDB_SML_TIMESTAMP_HOURS:          mapped = SML_TIME_STAMP_HOURS;          break;
    case TSDB_SML_TIMESTAMP_MINUTES:        mapped = SML_TIME_STAMP_MINUTES;        break;
    case TSDB_SML_TIMESTAMP_SECONDS:        mapped = SML_TIME_STAMP_SECONDS;        break;
    case TSDB_SML_TIMESTAMP_MILLI_SECONDS:  mapped = SML_TIME_STAMP_MILLI_SECONDS;  break;
    case TSDB_SML_TIMESTAMP_MICRO_SECONDS:  mapped = SML_TIME_STAMP_MICRO_SECONDS;  break;
    case TSDB_SML_TIMESTAMP_NANO_SECONDS:   mapped = SML_TIME_STAMP_NANO_SECONDS;   break;
    default:
      return TSDB_CODE_TSC_INVALID_PRECISION_TYPE;
  }

  *tsType = mapped;
  return TSDB_CODE_SUCCESS;
}

G
Ganlin Zhao 已提交
2368
//make a dummy SSqlObj
G
Ganlin Zhao 已提交
2369
static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t code) {
G
Ganlin Zhao 已提交
2370 2371 2372
  SSqlObj *pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
    return NULL;
2373
  }
G
Ganlin Zhao 已提交
2374
  pNew->signature = pNew;
G
Ganlin Zhao 已提交
2375
  pNew->pTscObj = taos;
2376

G
Ganlin Zhao 已提交
2377 2378
  tsem_init(&pNew->rspSem, 0, 0);
  registerSqlObj(pNew);
2379

G
Ganlin Zhao 已提交
2380 2381 2382
  pNew->res.numOfRows = affected_rows;
  pNew->res.code = code;

G
Ganlin Zhao 已提交
2383

G
Ganlin Zhao 已提交
2384
  return pNew;
2385 2386
}

G
Ganlin Zhao 已提交
2387

2388
/**
2389
 * taos_schemaless_insert() parse and insert data points into database according to
2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
 * @return return zero for successful insertion. Otherwise return none-zero error code of
 *         failure reason.
 *
 */

G
Ganlin Zhao 已提交
2409 2410 2411
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
  int code = TSDB_CODE_SUCCESS;
  int affected_rows = 0;
2412 2413
  SMLTimeStampType tsType;

G
Ganlin Zhao 已提交
2414
  if (protocol == TSDB_SML_LINE_PROTOCOL) {
G
Ganlin Zhao 已提交
2415
    code = convertPrecisionType(precision, &tsType);
2416
    if (code != TSDB_CODE_SUCCESS) {
G
Ganlin Zhao 已提交
2417
      return NULL;
2418 2419 2420
    }
  }

2421
  switch (protocol) {
G
Ganlin Zhao 已提交
2422
    case TSDB_SML_LINE_PROTOCOL:
G
Ganlin Zhao 已提交
2423
      code = taos_insert_lines(taos, lines, numLines, protocol, tsType, &affected_rows);
2424
      break;
G
Ganlin Zhao 已提交
2425
    case TSDB_SML_TELNET_PROTOCOL:
G
Ganlin Zhao 已提交
2426
      code = taos_insert_telnet_lines(taos, lines, numLines, protocol, tsType, &affected_rows);
2427
      break;
G
Ganlin Zhao 已提交
2428
    case TSDB_SML_JSON_PROTOCOL:
G
Ganlin Zhao 已提交
2429
      code = taos_insert_json_payload(taos, *lines, protocol, tsType, &affected_rows);
2430 2431 2432 2433 2434 2435
      break;
    default:
      code = TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE;
      break;
  }

2436

G
Ganlin Zhao 已提交
2437
  SSqlObj *pSql = createSmlQueryObj(taos, affected_rows, code);
G
Ganlin Zhao 已提交
2438 2439

  return (TAOS_RES*)pSql;
2440
}