tscParseLineProtocol.c 79.0 KB
Newer Older
S
shenglian zhou 已提交
1 2 3 4
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
5

S
shenglian zhou 已提交
6 7 8 9 10 11 12 13
#include "os.h"
#include "osString.h"
#include "ttype.h"
#include "tmd5.h"
#include "tstrbuild.h"
#include "tname.h"
#include "hash.h"
#include "tskiplist.h"
14

S
shenglian zhou 已提交
15
#include "tscUtil.h"
16 17
#include "tsclient.h"
#include "tscLog.h"
S
shenglian zhou 已提交
18

19
#include "taos.h"
20
#include "tscParseLine.h"
21

S
shenglian zhou 已提交
22
typedef struct  {
23
  char sTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE];
S
shenglian zhou 已提交
24 25 26 27
  SHashObj* tagHash;
  SHashObj* fieldHash;
  SArray* tags; //SArray<SSchema>
  SArray* fields; //SArray<SSchema>
S
shenglian zhou 已提交
28
  uint8_t precision;
S
shenglian zhou 已提交
29 30
} SSmlSTableSchema;

31 32
//=================================================================================================

33 34
// Counter backing genLinesSmlId(); it is only advanced through atomic_add_fetch_64.
static uint64_t linesSmlHandleId = 0;

35 36 37 38
// Forward declarations.
// NOTE(review): the placeholder parameter names (pVoid, name, name1, pArray, pArray1, ...) say
// nothing about the arguments' roles; consider renaming them to match the definitions.
static int32_t insertChildTablePointsBatch(void* pVoid, char* name, char* name1, SArray* pArray, SArray* pArray1,
                                           SArray* pArray2, SArray* pArray3, size_t size, SSmlLinesInfo* info);
static int32_t doInsertChildTablePoints(void* pVoid, char* sql, char* name, SArray* pArray, SArray* pArray1,
                                        SSmlLinesInfo* info);
39 40 41 42 43 44 45 46 47 48
/*
 * Returns the next id from the linesSmlHandleId counter.
 * The counter is bumped atomically; a result of 0 is skipped, so the
 * returned id is never 0.
 */
uint64_t genLinesSmlId() {
  for (;;) {
    uint64_t next = atomic_add_fetch_64(&linesSmlHandleId, 1);
    if (next != 0) {
      return next;
    }
  }
}

S
shenglian zhou 已提交
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
int compareSmlColKv(const void* p1, const void* p2) {
  TAOS_SML_KV* kv1 = (TAOS_SML_KV*)p1;
  TAOS_SML_KV* kv2 = (TAOS_SML_KV*)p2;
  int kvLen1 = (int)strlen(kv1->key);
  int kvLen2 = (int)strlen(kv2->key);
  int res = strncasecmp(kv1->key, kv2->key, MIN(kvLen1, kvLen2));
  if (res != 0) {
    return res;
  } else {
    return kvLen1-kvLen2;
  }
}

// Kind of DDL statement that applySchemaAction() issues for a schema action.
typedef enum {
  SCHEMA_ACTION_CREATE_STABLE,       // "create stable ..."
  SCHEMA_ACTION_ADD_COLUMN,          // "alter stable ... add column ..."
  SCHEMA_ACTION_ADD_TAG,             // "alter stable ... add tag ..."
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,  // "alter stable ... modify column ..."
  SCHEMA_ACTION_CHANGE_TAG_SIZE,     // "alter stable ... modify tag ..."
} ESchemaAction;

typedef struct {
71
  char sTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE];
S
shenglian zhou 已提交
72 73 74 75 76
  SArray* tags; //SArray<SSchema>
  SArray* fields; //SArray<SSchema>
} SCreateSTableActionInfo;

typedef struct {
77
  char sTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE];
S
shenglian zhou 已提交
78 79 80 81 82 83 84 85 86 87 88
  SSchema* field;
} SAlterSTableActionInfo;

// One schema change to apply; `action` selects which union member is meaningful.
typedef struct {
  ESchemaAction action;
  union {
    SCreateSTableActionInfo createSTable;  // used by SCHEMA_ACTION_CREATE_STABLE
    SAlterSTableActionInfo alterSTable;    // used by the add/modify column and tag actions
  };
} SSchemaAction;

S
shenglian zhou 已提交
89
/*
 * Computes the storage size in bytes that a key/value needs in the schema.
 *
 * Fixed-size types take their size from tDataTypes. BINARY needs the value
 * length plus the var-string header. NCHAR is converted to UCS-4 to learn the
 * converted length, to which the var-string header is added.
 *
 * @param kv     key/value whose size is wanted
 * @param bytes  out: size in bytes (left untouched for an unhandled var type)
 * @param id     SML handle id, used for logging only
 * @return 0 on success, TSDB_CODE_TSC_OUT_OF_MEMORY if the scratch buffer
 *         cannot be allocated, TSDB_CODE_TSC_INVALID_VALUE if the NCHAR
 *         conversion fails.
 */
static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes, uint64_t id) {
  if (!IS_VAR_DATA_TYPE(kv->type)) {
    *bytes = tDataTypes[kv->type].bytes;
    return 0;
  }

  if (kv->type == TSDB_DATA_TYPE_BINARY) {
    *bytes = kv->length + VARSTR_HEADER_SIZE;
    return 0;
  }

  if (kv->type == TSDB_DATA_TYPE_NCHAR) {
    // Scratch buffer: only the converted length is kept, the content is discarded.
    char* ucs = malloc(kv->length * TSDB_NCHAR_SIZE + 1);
    if (ucs == NULL) {
      tscError("SML:0x%"PRIx64" failed to allocate memory for nchar conversion", id);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    int32_t bytesNeeded = 0;
    bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded);
    free(ucs);
    if (!succ) {
      tscError("SML:0x%"PRIx64" convert nchar string to UCS4_LE failed:%s", id, kv->value);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }
    *bytes = bytesNeeded + VARSTR_HEADER_SIZE;
  }

  return 0;
}

S
shenglian zhou 已提交
111
/*
 * Merges one key/value into a schema under construction.
 *
 * If the key is already known (looked up in `hash`), its type must match and
 * the stored byte size grows to the larger of the old and new size. Otherwise
 * a new SSchema entry is appended to `array` and its index is recorded in
 * `hash`. In both cases the entry's index is stored in smlKv->fieldSchemaIdx.
 *
 * @param smlKv  key/value to merge
 * @param hash   key name -> size_t index into `array`
 * @param array  SArray<SSchema> being built
 * @param info   SML context, used for logging
 * @return 0 on success; TSDB_CODE_TSC_INVALID_VALUE on a type mismatch or a
 *         key that does not fit SSchema.name; TSDB_CODE_TSC_OUT_OF_MEMORY if
 *         the array cannot grow; or the error from getFieldBytesFromSmlKv.
 */
static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array, SSmlLinesInfo* info) {
  size_t  keyLen = strlen(smlKv->key);
  size_t  fieldIdx = 0;
  int32_t bytes = 0;
  int32_t code = 0;

  size_t* pFieldIdx = taosHashGet(hash, smlKv->key, keyLen);
  if (pFieldIdx != NULL) {
    fieldIdx = *pFieldIdx;
    SSchema* pField = taosArrayGet(array, fieldIdx);

    if (pField->type != smlKv->type) {
      tscError("SML:0x%"PRIx64" type mismatch. key %s, type %d. type before %d", info->id, smlKv->key, smlKv->type, pField->type);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id);
    if (code != 0) {
      return code;
    }
    pField->bytes = MAX(pField->bytes, bytes);
  } else {
    SSchema field = {0};

    // The key (plus its terminator) must fit the fixed-size name buffer.
    if (keyLen >= sizeof(field.name)) {
      tscError("SML:0x%"PRIx64" key is too long: %s", info->id, smlKv->key);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }
    memcpy(field.name, smlKv->key, keyLen + 1);
    field.type = smlKv->type;

    code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id);
    if (code != 0) {
      return code;
    }
    field.bytes = bytes;

    if (taosArrayPush(array, &field) == NULL) {
      tscError("SML:0x%"PRIx64" failed to append schema for key %s", info->id, smlKv->key);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    fieldIdx = taosArrayGetSize(array) - 1;
    taosHashPut(hash, field.name, keyLen, &fieldIdx, sizeof(fieldIdx));
  }

  smlKv->fieldSchemaIdx = (uint32_t)fieldIdx;

  return 0;
}

S
Shengliang Guan 已提交
156 157 158 159 160 161
/*
 * Derives a child table name from a data point's super table name and tags.
 *
 * The tags are sorted in place with compareSmlColKv, then the string
 * "<stable>,<tag>=<value>,..." is hashed with MD5 and the name is formatted as
 * "t_" followed by the 32 hex digits of the digest.
 *
 * @param point         data point; its tag array is reordered by this call
 * @param tableName     out: buffer receiving the name
 * @param tableNameLen  in: capacity of `tableName`; out: length of the name
 *                      actually stored in `tableName`
 * @param info          SML context, used for logging
 * @return always 0
 */
static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen,
                                       SSmlLinesInfo* info) {
  tscDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id);
  qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);

  SStringBuilder sb; memset(&sb, 0, sizeof(sb));

  // Append the names straight from the point: copying them through fixed-size
  // stack buffers first could overflow on an over-long name.
  taosStringBuilderAppendString(&sb, point->stableName);
  for (int j = 0; j < point->tagNum; ++j) {
    TAOS_SML_KV* tagKv = point->tags + j;
    taosStringBuilderAppendChar(&sb, ',');
    taosStringBuilderAppendString(&sb, tagKv->key);
    taosStringBuilderAppendChar(&sb, '=');
    taosStringBuilderAppend(&sb, tagKv->value, tagKv->length);
  }

  size_t len = 0;
  char* keyJoined = taosStringBuilderGetResult(&sb, &len);
  MD5_CTX context;
  MD5Init(&context);
  MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
  MD5Final(&context);

  int capacity = *tableNameLen;
  int written = snprintf(tableName, capacity,
                         "t_%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0],
                         context.digest[1], context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6],
                         context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11],
                         context.digest[12], context.digest[13], context.digest[14], context.digest[15]);
  // snprintf reports the untruncated length; report what is really in the buffer.
  if (written < 0) {
    written = 0;
  } else if (written >= capacity) {
    written = (capacity > 0) ? capacity - 1 : 0;
  }
  *tableNameLen = written;

  taosStringBuilderDestroy(&sb);
  tscDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName);
  return 0;
}

S
shenglian zhou 已提交
192
/*
 * Builds one SSmlSTableSchema per distinct super table found in `points`.
 *
 * For every point the tags and fields are merged into the schema of the
 * point's super table, point->schemaIdx is set to that schema's index in
 * `stableSchemas`, and a child table name is generated from the MD5 of the
 * tags when the point does not carry one. The name -> index lookup hashes are
 * needed only while building: they are released before returning, on success
 * and on failure, and the pointers in the schemas are reset to NULL.
 *
 * @param points         data points to scan
 * @param numPoint       number of entries in `points`
 * @param stableSchemas  SArray<SSmlSTableSchema> that receives the schemas
 * @param info           SML context, used for logging
 * @return 0 on success, otherwise a TSDB error code.
 */
static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas, SSmlLinesInfo* info) {
  int32_t code = 0;
  size_t  numStables = 0;
  SHashObj* sname2shema = taosHashInit(32,
                                       taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

  for (int i = 0; i < numPoint; ++i) {
    TAOS_SML_DATA_POINT* point = &points[i];
    size_t stableNameLen = strlen(point->stableName);
    size_t stableIdx = 0;
    SSmlSTableSchema* pStableSchema = NULL;

    size_t* pStableIdx = taosHashGet(sname2shema, point->stableName, stableNameLen);
    if (pStableIdx) {
      stableIdx = *pStableIdx;
      pStableSchema = taosArrayGet(stableSchemas, stableIdx);
    } else {
      SSmlSTableSchema schema;
      memset(&schema, 0, sizeof(schema));
      if (stableNameLen >= sizeof(schema.sTableName)) {
        tscError("SML:0x%"PRIx64" super table name is too long: %s", info->id, point->stableName);
        code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
        goto cleanup;
      }
      memcpy(schema.sTableName, point->stableName, stableNameLen + 1);
      schema.fields = taosArrayInit(64, sizeof(SSchema));
      schema.tags = taosArrayInit(8, sizeof(SSchema));
      schema.tagHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
      schema.fieldHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

      pStableSchema = taosArrayPush(stableSchemas, &schema);
      if (pStableSchema == NULL) {
        // The schema never made it into the array, so nobody else will free it.
        taosHashCleanup(schema.tagHash);
        taosHashCleanup(schema.fieldHash);
        taosArrayDestroy(schema.tags);
        taosArrayDestroy(schema.fields);
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto cleanup;
      }
      stableIdx = taosArrayGetSize(stableSchemas) - 1;
      taosHashPut(sname2shema, schema.sTableName, stableNameLen, &stableIdx, sizeof(size_t));
    }

    // Generate the name before walking the tags so that a point without tags
    // also gets one; getSmlMd5ChildTableName sorts point->tags in place.
    if (!point->childTableName) {
      char childTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE];
      int32_t tableNameLen = TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE;
      getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info);
      point->childTableName = calloc(1, tableNameLen+1);
      if (point->childTableName == NULL) {
        tscError("SML:0x%"PRIx64" failed to allocate child table name. point no.: %d", info->id, i);
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto cleanup;
      }
      strncpy(point->childTableName, childTableName, tableNameLen);
      point->childTableName[tableNameLen] = '\0';
    }

    for (int j = 0; j < point->tagNum; ++j) {
      TAOS_SML_KV* tagKv = point->tags + j;
      code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags, info);
      if (code != 0) {
        tscError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, tag key: %s", info->id, i, tagKv->key);
        goto cleanup;
      }
    }

    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* fieldKv = point->fields + j;
      code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields, info);
      if (code != 0) {
        tscError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, field key: %s", info->id, i, fieldKv->key);
        goto cleanup;
      }
    }

    point->schemaIdx = (uint32_t)stableIdx;
  }

cleanup:
  numStables = taosArrayGetSize(stableSchemas);
  for (size_t i = 0; i < numStables; ++i) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
    taosHashCleanup(schema->tagHash);
    schema->tagHash = NULL;
    taosHashCleanup(schema->fieldHash);
    schema->fieldHash = NULL;
  }
  taosHashCleanup(sname2shema);

  if (code != 0) {
    return code;
  }

  tscDebug("SML:0x%"PRIx64" build point schema succeed. num of super table: %zu", info->id, numStables);
  for (size_t i = 0; i < numStables; ++i) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
    tscDebug("\ttable name: %s, tags number: %zu, fields number: %zu", schema->sTableName,
             taosArrayGetSize(schema->tags), taosArrayGetSize(schema->fields));
  }

  return 0;
}

268
static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[],
S
shenglian zhou 已提交
269
                                       SSchemaAction* action, bool* actionNeeded, SSmlLinesInfo* info) {
270 271
  char fieldName[TSDB_COL_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
  strcpy(fieldName, pointColField->name);
272

273
  size_t* pDbIndex = taosHashGet(dbAttrHash, fieldName, strlen(fieldName));
274 275 276
  if (pDbIndex) {
    SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex);
    assert(strcasecmp(dbAttr->name, pointColField->name) == 0);
S
shenglian zhou 已提交
277
    if (pointColField->type != dbAttr->type) {
S
shenglian zhou 已提交
278
      tscError("SML:0x%"PRIx64" point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id, pointColField->name,
S
Shenglian Zhou 已提交
279 280
               pointColField->type, dbAttr->type);
      return TSDB_CODE_TSC_INVALID_VALUE;
S
shenglian zhou 已提交
281 282 283 284 285 286 287 288 289
    }

    if (IS_VAR_DATA_TYPE(pointColField->type) && (pointColField->bytes > dbAttr->bytes)) {
      if (isTag) {
        action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
      } else {
        action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
      }
      memset(&action->alterSTable, 0,  sizeof(SAlterSTableActionInfo));
290
      memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE);
S
shenglian zhou 已提交
291 292 293 294 295 296 297 298 299 300
      action->alterSTable.field = pointColField;
      *actionNeeded = true;
    }
  } else {
    if (isTag) {
      action->action = SCHEMA_ACTION_ADD_TAG;
    } else {
      action->action = SCHEMA_ACTION_ADD_COLUMN;
    }
    memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo));
301
    memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE);
S
shenglian zhou 已提交
302 303 304
    action->alterSTable.field = pointColField;
    *actionNeeded = true;
  }
S
shenglian zhou 已提交
305
  if (*actionNeeded) {
306
    tscDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, fieldName,
S
shenglian zhou 已提交
307 308
             action->action);
  }
S
shenglian zhou 已提交
309 310 311
  return 0;
}

S
shenglian zhou 已提交
312
/*
 * Formats the SQL description of one column or tag into `buf`:
 * "<name> <type>" for fixed-size types, "<name> <type>(<length>)" for BINARY
 * and NCHAR. The length is field->bytes minus the var-string header, further
 * divided by TSDB_NCHAR_SIZE for NCHAR.
 *
 * @param field     column/tag to describe
 * @param buf       destination buffer
 * @param bufSize   capacity of `buf`
 * @param outBytes  out: the value returned by snprintf
 * @return always 0
 */
static int32_t buildColumnDescription(SSchema* field,
                               char* buf, int32_t bufSize, int32_t* outBytes) {
  uint8_t colType = field->type;
  bool    hasLength = (colType == TSDB_DATA_TYPE_BINARY) || (colType == TSDB_DATA_TYPE_NCHAR);

  if (!hasLength) {
    *outBytes = snprintf(buf, bufSize, "%s %s",
                         field->name, tDataTypes[colType].name);
    return 0;
  }

  int32_t declaredLen = field->bytes - VARSTR_HEADER_SIZE;
  if (colType == TSDB_DATA_TYPE_NCHAR) {
    declaredLen = declaredLen/TSDB_NCHAR_SIZE;
  }
  *outBytes = snprintf(buf, bufSize,"%s %s(%d)",
                       field->name,tDataTypes[field->type].name, declaredLen);
  return 0;
}

S
shenglian zhou 已提交
333

S
shenglian zhou 已提交
334
static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
335 336
  int32_t code = 0;
  int32_t outBytes = 0;
337 338
  char *result = (char *)calloc(1, tsMaxSQLStringLen+1);
  int32_t capacity = tsMaxSQLStringLen +  1;
S
shenglian zhou 已提交
339

S
shenglian zhou 已提交
340
  tscDebug("SML:0x%"PRIx64" apply schema action. action: %d", info->id, action->action);
S
shenglian zhou 已提交
341 342 343 344 345 346
  switch (action->action) {
    case SCHEMA_ACTION_ADD_COLUMN: {
      int n = sprintf(result, "alter stable %s add column ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
shenglian zhou 已提交
347 348 349
      char* errStr = taos_errstr(res);
      char* begin = strstr(errStr, "duplicated column names");
      bool tscDupColNames = (begin != NULL);
S
shenglian zhou 已提交
350
      if (code != TSDB_CODE_SUCCESS) {
S
shenglian zhou 已提交
351
        tscError("SML:0x%"PRIx64" apply schema action. error: %s", info->id, errStr);
S
shenglian zhou 已提交
352
      }
353 354
      taos_free_result(res);

S
shenglian zhou 已提交
355
      if (code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || code == TSDB_CODE_MND_TAG_ALREAY_EXIST || tscDupColNames) {
356 357
        TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
        code = taos_errno(res2);
358 359 360
        if (code != TSDB_CODE_SUCCESS) {
          tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
361
        taos_free_result(res2);
362
        taosMsleep(500);
363
      }
S
shenglian zhou 已提交
364 365 366 367 368 369 370 371
      break;
    }
    case SCHEMA_ACTION_ADD_TAG: {
      int n = sprintf(result, "alter stable %s add tag ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field,
                             result+n, capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
shenglian zhou 已提交
372 373 374
      char* errStr = taos_errstr(res);
      char* begin = strstr(errStr, "duplicated column names");
      bool tscDupColNames = (begin != NULL);
S
shenglian zhou 已提交
375 376 377
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
378 379
      taos_free_result(res);

S
shenglian zhou 已提交
380
      if (code == TSDB_CODE_MND_TAG_ALREAY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) {
381 382
        TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
        code = taos_errno(res2);
383 384 385
        if (code != TSDB_CODE_SUCCESS) {
          tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
386
        taos_free_result(res2);
387
        taosMsleep(500);
388
      }
S
shenglian zhou 已提交
389 390 391 392 393 394 395 396
      break;
    }
    case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
      int n = sprintf(result, "alter stable %s modify column ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n,
                             capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
shenglian zhou 已提交
397 398 399
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
400 401
      taos_free_result(res);

402
      if (code == TSDB_CODE_MND_INVALID_COLUMN_LENGTH || code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
403 404
        TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
        code = taos_errno(res2);
405 406 407
        if (code != TSDB_CODE_SUCCESS) {
          tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
408
        taos_free_result(res2);
409
        taosMsleep(500);
410
      }
411
      break;
S
shenglian zhou 已提交
412 413 414 415 416 417 418
    }
    case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
      int n = sprintf(result, "alter stable %s modify tag ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n,
                             capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
shenglian zhou 已提交
419 420 421
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
422 423
      taos_free_result(res);

424
      if (code == TSDB_CODE_MND_INVALID_TAG_LENGTH || code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
425 426
        TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
        code = taos_errno(res2);
427 428 429
        if (code != TSDB_CODE_SUCCESS) {
          tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
430
        taos_free_result(res2);
431
        taosMsleep(500);
432
      }
S
shenglian zhou 已提交
433 434 435 436 437
      break;
    }
    case SCHEMA_ACTION_CREATE_STABLE: {
      int n = sprintf(result, "create stable %s (", action->createSTable.sTableName);
      char* pos = result + n; int freeBytes = capacity - n;
S
shenglian zhou 已提交
438
      size_t numCols = taosArrayGetSize(action->createSTable.fields);
S
shenglian zhou 已提交
439 440 441 442 443 444 445
      for (int32_t i = 0; i < numCols; ++i) {
        SSchema* field = taosArrayGet(action->createSTable.fields, i);
        buildColumnDescription(field, pos, freeBytes, &outBytes);
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
      --pos; ++freeBytes;
446

S
shenglian zhou 已提交
447 448
      outBytes = snprintf(pos, freeBytes, ") tags (");
      pos += outBytes; freeBytes -= outBytes;
449

S
shenglian zhou 已提交
450
      size_t numTags = taosArrayGetSize(action->createSTable.tags);
S
shenglian zhou 已提交
451 452 453 454 455 456 457 458 459 460
      for (int32_t i = 0; i < numTags; ++i) {
        SSchema* field = taosArrayGet(action->createSTable.tags, i);
        buildColumnDescription(field, pos, freeBytes, &outBytes);
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
      pos--; ++freeBytes;
      outBytes = snprintf(pos, freeBytes, ")");
      TAOS_RES* res = taos_query(taos, result);
      code = taos_errno(res);
S
shenglian zhou 已提交
461 462 463
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
464 465
      taos_free_result(res);

466 467 468
      if (code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) {
        TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
        code = taos_errno(res2);
469 470 471
        if (code != TSDB_CODE_SUCCESS) {
          tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
472
        taos_free_result(res2);
473
        taosMsleep(500);
474
      }
S
shenglian zhou 已提交
475 476
      break;
    }
S
shenglian zhou 已提交
477

S
shenglian zhou 已提交
478 479 480
    default:
      break;
  }
S
Shenglian Zhou 已提交
481

S
shenglian zhou 已提交
482
  free(result);
S
Shenglian Zhou 已提交
483
  if (code != 0) {
S
shenglian zhou 已提交
484
    tscError("SML:0x%"PRIx64 " apply schema action failure. %s", info->id, tstrerror(code));
S
Shenglian Zhou 已提交
485
  }
S
shenglian zhou 已提交
486 487 488
  return code;
}

489
/*
 * Releases the two lookup hashes and the two schema arrays held by `schema`.
 * The SSmlSTableSchema object itself is not freed, and its members are not
 * reset. Always returns 0.
 */
static int32_t destroySmlSTableSchema(SSmlSTableSchema* schema) {
  SHashObj* tagLookup = schema->tagHash;
  SHashObj* fieldLookup = schema->fieldHash;
  SArray*   tagList = schema->tags;
  SArray*   fieldList = schema->fields;

  taosHashCleanup(tagLookup);
  taosHashCleanup(fieldLookup);
  taosArrayDestroy(tagList);
  taosArrayDestroy(fieldList);
  return 0;
}

497
/*
 * Fills `schema` from a table meta object.
 *
 * Allocates the tag/field arrays and lookup hashes, copies the table name and
 * precision, then copies every attribute of tableMeta->schema: the first
 * numOfColumns entries become fields, the following numOfTags entries become
 * tags. Each name is passed through addEscapeCharToString before it is stored
 * and used as the hash key.
 *
 * @param tableMeta  source meta
 * @param tableName  name stored in schema->sTableName
 * @param schema     out: schema to fill; release with destroySmlSTableSchema
 * @param info       SML context, used for logging
 * @return TSDB_CODE_SUCCESS
 */
static int32_t fillDbSchema(STableMeta* tableMeta, char* tableName, SSmlSTableSchema* schema, SSmlLinesInfo* info) {
  schema->tags = taosArrayInit(8, sizeof(SSchema));
  schema->fields = taosArrayInit(64, sizeof(SSchema));
  schema->tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
  schema->fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

  // Bound the copies by the destination size, not by the source length.
  tstrncpy(schema->sTableName, tableName, sizeof(schema->sTableName));
  schema->precision = tableMeta->tableInfo.precision;

  int numColumns = tableMeta->tableInfo.numOfColumns;
  int numAttrs = numColumns + tableMeta->tableInfo.numOfTags;
  for (int i = 0; i < numAttrs; ++i) {
    bool      isTag = (i >= numColumns);
    SArray*   attrs = isTag ? schema->tags : schema->fields;
    SHashObj* attrHash = isTag ? schema->tagHash : schema->fieldHash;

    SSchema field = {0};
    tstrncpy(field.name, tableMeta->schema[i].name, sizeof(field.name));
    addEscapeCharToString(field.name, (int16_t)strlen(field.name));
    field.type = tableMeta->schema[i].type;
    field.bytes = tableMeta->schema[i].bytes;

    taosArrayPush(attrs, &field);
    size_t attrIndex = taosArrayGetSize(attrs) - 1;
    taosHashPut(attrHash, field.name, strlen(field.name), &attrIndex, sizeof(attrIndex));
  }

  tscDebug("SML:0x%"PRIx64 " load table schema succeed. table name: %s, columns number: %d, tag number: %d, precision: %d",
           info->id, tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision);
  return TSDB_CODE_SUCCESS;
}

532
/*
 * Looks a table up in the client-side table meta cache.
 *
 * A temporary SSqlObj is registered only so the name can be validated and
 * expanded to a full name; it is released again before returning.
 *
 * @param taos          connection handle
 * @param tableName     table name to look up
 * @param outTableMeta  out: clone of the cached meta (written only on success)
 * @param info          SML context, used for logging
 * @return TSDB_CODE_SUCCESS when a meta was found,
 *         TSDB_CODE_TSC_NO_META_CACHED when the cache has no entry,
 *         TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH for a too long or invalid name,
 *         TSDB_CODE_TSC_OUT_OF_MEMORY on allocation failure, or the error
 *         from tscSetTableFullName.
 */
static int32_t getTableMetaFromLocalCache(TAOS* taos, char* tableName, STableMeta** outTableMeta, SSmlLinesInfo* info) {
  int32_t     code = 0;
  STableMeta* tableMeta = NULL;

  // The name is tokenized from a fixed-size, zero-terminated copy; reject
  // anything that would not fit instead of writing past the buffer.
  char   tableNameBuf[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
  size_t tableNameLen = strlen(tableName);
  if (tableNameLen >= sizeof(tableNameBuf)) {
    tscError("SML:0x%" PRIx64 " table name is too long: %s", info->id, tableName);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }
  memcpy(tableNameBuf, tableName, tableNameLen);

  SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
  if (pSql == NULL) {
    tscError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno));
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return code;
  }
  pSql->pTscObj = taos;
  pSql->signature = pSql;
  pSql->fp = NULL;

  registerSqlObj(pSql);

  SStrToken tableToken = {.z = tableNameBuf, .n = (uint32_t)tableNameLen, .type = TK_ID};
  tGetToken(tableNameBuf, &tableToken.type);
  bool dbIncluded = false;
  // Check if the table name available or not
  if (tscValidateName(&tableToken, true, &dbIncluded) != TSDB_CODE_SUCCESS) {
    // Log the reason: nothing visible here allocates pSql->cmd.payload, and
    // pSql is released right below, so writing a message there is unsafe and useless.
    tscError("SML:0x%" PRIx64 " table name is invalid: %s", info->id, tableName);
    code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
    taosReleaseRef(tscObjRef, pSql->self);
    return code;
  }

  SName sname = {0};
  if ((code = tscSetTableFullName(&sname, &tableToken, pSql, dbIncluded)) != TSDB_CODE_SUCCESS) {
    taosReleaseRef(tscObjRef, pSql->self);
    return code;
  }

  char fullTableName[TSDB_TABLE_FNAME_LEN] = {0};
  tNameExtractFullName(&sname, fullTableName);

  size_t size = 0;
  taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size);
  taosReleaseRef(tscObjRef, pSql->self);

  if (tableMeta == NULL) {
    return TSDB_CODE_TSC_NO_META_CACHED;
  }
  *outTableMeta = tableMeta;
  return TSDB_CODE_SUCCESS;
}

/*
 * Obtains the meta of `tableName`, trying up to TSDB_MAX_REPLICA + 1 times.
 *
 * Each attempt looks in the local meta cache; on a cache miss a "describe"
 * query is issued before the next attempt.
 *
 * @param taos        connection handle
 * @param tableName   table to look up
 * @param pTableMeta  out: the meta (written only on success)
 * @param info        SML context, used for logging
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_DISCONNECTED for an invalid
 *         connection, TSDB_CODE_TSC_NO_META_CACHED when every attempt missed,
 *         or the error of the cache lookup / describe query.
 */
static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTableMeta, SSmlLinesInfo* info) {
  STableMeta* foundMeta = NULL;

  for (int32_t attempt = 0; attempt <= TSDB_MAX_REPLICA && foundMeta == NULL; ++attempt) {
    STscObj* conn = (STscObj*)taos;
    if (conn == NULL || conn->signature != conn) {
      terrno = TSDB_CODE_TSC_DISCONNECTED;
      return TSDB_CODE_TSC_DISCONNECTED;
    }

    tscDebug("SML:0x%" PRIx64 " retrieve table meta. super table name: %s", info->id, tableName);
    int32_t lookup = getTableMetaFromLocalCache(taos, tableName, &foundMeta, info);
    if (lookup == TSDB_CODE_SUCCESS) {
      tscDebug("SML:0x%" PRIx64 " successfully retrieved table meta. super table name: %s", info->id, tableName);
      break;
    }
    if (lookup != TSDB_CODE_TSC_NO_META_CACHED) {
      return lookup;
    }

    // Cache miss: run a describe query, then look again on the next attempt.
    char describeSql[256];
    snprintf(describeSql, 256, "describe %s", tableName);
    TAOS_RES* res = taos_query(taos, describeSql);
    int32_t   queryCode = taos_errno(res);
    if (queryCode != 0) {
      tscError("SML:0x%" PRIx64 " describe table failure. %s", info->id, taos_errstr(res));
      taos_free_result(res);
      return queryCode;
    }
    taos_free_result(res);
  }

  if (foundMeta == NULL) {
    tscError("SML:0x%" PRIx64 " failed to retrieve table meta. super table name: %s", info->id, tableName);
    return TSDB_CODE_TSC_NO_META_CACHED;
  }

  *pTableMeta = foundMeta;
  return TSDB_CODE_SUCCESS;
}

/*
 * Loads the schema of `tableName` into `schema`.
 * The table meta is fetched with retrieveTableMeta, converted with
 * fillDbSchema and then freed. `schema` is left untouched when the meta
 * cannot be retrieved.
 *
 * @return the result of retrieveTableMeta.
 */
static int32_t loadTableSchemaFromDB(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSmlLinesInfo* info) {
  STableMeta* meta = NULL;

  int32_t rc = retrieveTableMeta(taos, tableName, &meta, info);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  assert(meta != NULL);
  fillDbSchema(meta, tableName, schema, info);
  free(meta);
  meta = NULL;

  return rc;
}

S
shenglian zhou 已提交
638
static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
639 640 641 642
  int32_t code = 0;
  size_t numStable = taosArrayGetSize(stableSchemas);
  for (int i = 0; i < numStable; ++i) {
    SSmlSTableSchema* pointSchema = taosArrayGet(stableSchemas, i);
S
shenglian zhou 已提交
643 644
    SSmlSTableSchema  dbSchema;
    memset(&dbSchema, 0, sizeof(SSmlSTableSchema));
S
shenglian zhou 已提交
645

646
    code = loadTableSchemaFromDB(taos, pointSchema->sTableName, &dbSchema, info);
S
shenglian zhou 已提交
647 648 649 650
    if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
      SSchemaAction schemaAction = {0};
      schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
      memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
G
Ganlin Zhao 已提交
651
      memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE);
S
shenglian zhou 已提交
652 653
      schemaAction.createSTable.tags = pointSchema->tags;
      schemaAction.createSTable.fields = pointSchema->fields;
S
shenglian zhou 已提交
654
      applySchemaAction(taos, &schemaAction, info);
655
      code = loadTableSchemaFromDB(taos, pointSchema->sTableName, &dbSchema, info);
S
Shenglian Zhou 已提交
656
      if (code != 0) {
S
shenglian zhou 已提交
657
        tscError("SML:0x%"PRIx64" reconcile point schema failed. can not create %s", info->id, pointSchema->sTableName);
658
        return code;
S
Shenglian Zhou 已提交
659
      }
660 661 662 663
    }

    if (code == TSDB_CODE_SUCCESS) {
      pointSchema->precision = dbSchema.precision;
S
shenglian zhou 已提交
664 665 666 667 668 669 670 671 672 673
      size_t pointTagSize = taosArrayGetSize(pointSchema->tags);
      size_t pointFieldSize = taosArrayGetSize(pointSchema->fields);

      SHashObj* dbTagHash = dbSchema.tagHash;
      SHashObj* dbFieldHash = dbSchema.fieldHash;

      for (int j = 0; j < pointTagSize; ++j) {
        SSchema* pointTag = taosArrayGet(pointSchema->tags, j);
        SSchemaAction schemaAction = {0};
        bool actionNeeded = false;
S
shenglian zhou 已提交
674 675
        generateSchemaAction(pointTag, dbTagHash, dbSchema.tags, true, pointSchema->sTableName,
                             &schemaAction, &actionNeeded, info);
S
shenglian zhou 已提交
676
        if (actionNeeded) {
S
shenglian zhou 已提交
677
          code = applySchemaAction(taos, &schemaAction, info);
678 679 680 681
          if (code != 0) {
            destroySmlSTableSchema(&dbSchema);
            return code;
          }
S
shenglian zhou 已提交
682 683 684 685 686
        }
      }

      SSchema* pointColTs = taosArrayGet(pointSchema->fields, 0);
      SSchema* dbColTs = taosArrayGet(dbSchema.fields, 0);
687
      memcpy(pointColTs->name, dbColTs->name, TSDB_COL_NAME_LEN + TS_ESCAPE_CHAR_SIZE);
S
shenglian zhou 已提交
688 689 690 691 692

      for (int j = 1; j < pointFieldSize; ++j) {
        SSchema* pointCol = taosArrayGet(pointSchema->fields, j);
        SSchemaAction schemaAction = {0};
        bool actionNeeded = false;
S
shenglian zhou 已提交
693 694
        generateSchemaAction(pointCol, dbFieldHash, dbSchema.fields,false, pointSchema->sTableName,
                             &schemaAction, &actionNeeded, info);
S
shenglian zhou 已提交
695
        if (actionNeeded) {
S
shenglian zhou 已提交
696
          code = applySchemaAction(taos, &schemaAction, info);
697 698 699 700
          if (code != 0) {
            destroySmlSTableSchema(&dbSchema);
            return code;
          }
S
shenglian zhou 已提交
701 702 703 704 705
        }
      }

      pointSchema->precision = dbSchema.precision;

706
      destroySmlSTableSchema(&dbSchema);
S
shenglian zhou 已提交
707
    } else {
S
shenglian zhou 已提交
708
      tscError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
S
shenglian zhou 已提交
709 710 711 712 713 714
      return code;
    }
  }
  return 0;
}

715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752
/*
 * Converts every timestamp-typed tag/field value of each point from nanosecond
 * precision to the precision of the point's super table, then groups the
 * points by child table name.
 *
 * cname2points maps a child table name to an SArray of TAOS_SML_DATA_POINT*.
 * An array is created the first time a name is seen; this function never
 * destroys the arrays it stores in the hash.
 *
 * Returns 0 on success, or TSDB_CODE_TSC_OUT_OF_MEMORY when a per-table array
 * cannot be allocated (arrays already stored in the hash are left in place).
 */
static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints,
                                             SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) {
  for (int32_t i = 0; i < numPoints; ++i) {
    TAOS_SML_DATA_POINT* point = points + i;
    SSmlSTableSchema*    stableSchema = taosArrayGet(stableSchemas, point->schemaIdx);

    // rewrite timestamp tags in place using the super table precision
    for (int j = 0; j < point->tagNum; ++j) {
      TAOS_SML_KV* kv = point->tags + j;
      if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
        int64_t ts = *(int64_t*)(kv->value);
        *(int64_t*)(kv->value) = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
      }
    }

    // same conversion for timestamp fields
    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* kv = point->fields + j;
      if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
        int64_t ts = *(int64_t*)(kv->value);
        *(int64_t*)(kv->value) = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
      }
    }

    size_t   nameLen = strlen(point->childTableName);
    SArray*  cTablePoints = NULL;
    SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, nameLen);
    if (pCTablePoints) {
      cTablePoints = *pCTablePoints;
    } else {
      // the array stores pointers to points, hence sizeof(point)
      cTablePoints = taosArrayInit(64, sizeof(point));
      if (cTablePoints == NULL) {
        tscError("SML:0x%"PRIx64" failed to allocate point array for child table %s", info->id, point->childTableName);
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }
      taosHashPut(cname2points, point->childTableName, nameLen, &cTablePoints, POINTER_BYTES);
    }
    taosArrayPush(cTablePoints, &point);
  }

  return 0;
}

753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777
/*
 * Inserts all points of one child table with a single textual
 * "insert into <child> using <stable> (...) tags (...) (...) values (...)..." statement.
 *
 * Tag values are taken from the points themselves: for each tag schema index
 * the last point carrying that tag wins; tags and columns a point does not
 * carry are written as NULL.
 *
 * The rows reported by taos_affected_rows() are added to info->affectedRows.
 * Returns the taos_errno() of the query, or TSDB_CODE_TSC_OUT_OF_MEMORY.
 *
 * NOTE(review): converToStr() writes into the SQL buffer without a size limit and
 * snprintf() truncation is not detected here; a statement longer than
 * tsMaxSQLStringLen is not handled — confirm callers bound the input size.
 */
static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
                                                  SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
  int32_t code = TSDB_CODE_SUCCESS;
  size_t numTags = taosArrayGetSize(sTableSchema->tags);
  size_t numCols = taosArrayGetSize(sTableSchema->fields);
  size_t rows = taosArrayGetSize(cTablePoints);
  SArray* tagsSchema = sTableSchema->tags;
  SArray* colsSchema = sTableSchema->fields;

  // index tag values by their position in the super table schema
  TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
  for (int i= 0; i < rows; ++i) {
    TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
    for (int j = 0; j < pDataPoint->tagNum; ++j) {
      TAOS_SML_KV* kv = pDataPoint->tags + j;
      tagKVs[kv->fieldSchemaIdx] = kv;
    }
  }

  char* sql = malloc(tsMaxSQLStringLen+1);
  if (sql == NULL) {
    tscError("malloc sql memory error");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // per-row scratch table: column schema index -> value of the current point
  TAOS_SML_KV** colKVs = malloc(numCols*sizeof(TAOS_SML_KV*));
  if (colKVs == NULL && numCols > 0) {
    tscError("SML:0x%"PRIx64" failed to allocate column value table, num of cols: %zu", info->id, numCols);
    free(sql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t freeBytes = tsMaxSQLStringLen + 1 ;
  int32_t totalLen = 0;
  totalLen += snprintf(sql, freeBytes, "insert into %s using %s (", cTableName, sTableName);

  // tag name list; the trailing comma is overwritten by the closing parenthesis
  for (int i = 0; i < numTags; ++i) {
    SSchema* tagSchema = taosArrayGet(tagsSchema, i);
    totalLen += snprintf(sql+totalLen, freeBytes-totalLen, "%s,", tagSchema->name);
  }
  --totalLen;
  totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ")");

  totalLen += snprintf(sql + totalLen, freeBytes-totalLen, " tags (");

  // tag value list
  for (int i = 0; i < numTags; ++i) {
    if (tagKVs[i] == NULL) {
      totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "NULL,");
    } else {
      TAOS_SML_KV* kv =  tagKVs[i];
      int32_t len = 0;
      converToStr(sql+totalLen, kv->type, kv->value, kv->length, &len);
      totalLen += len;
      totalLen += snprintf(sql+totalLen, freeBytes-totalLen, ",");
    }
  }
  --totalLen;
  totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ") (");

  // column name list
  for (int i = 0; i < numCols; ++i) {
    SSchema* colSchema = taosArrayGet(colsSchema, i);
    totalLen += snprintf(sql+totalLen, freeBytes-totalLen, "%s,", colSchema->name);
  }
  --totalLen;
  totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ") values ");

  // one parenthesised value tuple per point
  for (int r = 0; r < rows; ++r) {
    totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "(");

    memset(colKVs, 0, numCols*sizeof(TAOS_SML_KV*));

    TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r);
    for (int i = 0; i < point->fieldNum; ++i) {
      TAOS_SML_KV* kv = point->fields + i;
      colKVs[kv->fieldSchemaIdx] = kv;
    }

    for (int i = 0; i < numCols; ++i) {
      if (colKVs[i] == NULL) {
        totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "NULL,");
      } else {
        TAOS_SML_KV* kv =  colKVs[i];
        int32_t len = 0;
        converToStr(sql+totalLen, kv->type, kv->value, kv->length, &len);
        totalLen += len;
        totalLen += snprintf(sql+totalLen, freeBytes-totalLen, ",");
      }
    }
    --totalLen;
    totalLen += snprintf(sql+totalLen, freeBytes - totalLen, ")");
  }
  free(colKVs);
  sql[totalLen] = '\0';

  tscInfo("SML:0x%"PRIx64" insert child table table %s of super table %s sql: %s", info->id, cTableName, sTableName, sql);
  TAOS_RES* res = taos_query(taos, sql);
  code = taos_errno(res);
  // accumulate: this function runs once per child table within one tscSmlInsert call
  info->affectedRows += taos_affected_rows(res);
  taos_free_result(res);
  free(sql);
  return code;
}

static int32_t applyChildTableDataPointsWithStmt(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936
                                         SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
  size_t numTags = taosArrayGetSize(sTableSchema->tags);
  size_t numCols = taosArrayGetSize(sTableSchema->fields);
  size_t rows = taosArrayGetSize(cTablePoints);

  TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
  for (int i= 0; i < rows; ++i) {
    TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
    for (int j = 0; j < pDataPoint->tagNum; ++j) {
      TAOS_SML_KV* kv = pDataPoint->tags + j;
      tagKVs[kv->fieldSchemaIdx] = kv;
    }
  }

  //tag bind
  SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
  taosArraySetSize(tagBinds, numTags);
  int isNullColBind = TSDB_TRUE;
  for (int j = 0; j < numTags; ++j) {
    TAOS_BIND* bind = taosArrayGet(tagBinds, j);
    bind->is_null = &isNullColBind;
  }
  for (int j = 0; j < numTags; ++j) {
    if (tagKVs[j] == NULL) continue;
    TAOS_SML_KV* kv =  tagKVs[j];
    TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
    bind->buffer_type = kv->type;
    bind->length = malloc(sizeof(uintptr_t*));
    *bind->length = kv->length;
    bind->buffer = kv->value;
    bind->is_null = NULL;
  }

  //rows bind
  SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);
  for (int i = 0; i < rows; ++i) {
    TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i);

    TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
    if (colBinds == NULL) {
      tscError("SML:0x%"PRIx64" taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
               "num of rows: %zu, num of cols: %zu", info->id, rows, numCols);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    for (int j = 0; j < numCols; ++j) {
      TAOS_BIND* bind = colBinds + j;
      bind->is_null = &isNullColBind;
    }
    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* kv = point->fields + j;
      TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
      bind->buffer_type = kv->type;
      bind->length = malloc(sizeof(uintptr_t*));
      *bind->length = kv->length;
      bind->buffer = kv->value;
      bind->is_null = NULL;
    }
    taosArrayPush(rowsBind, &colBinds);
  }

  int32_t code = 0;
  code = insertChildTablePointsBatch(taos, cTableName, sTableName, sTableSchema->tags, tagBinds, sTableSchema->fields, rowsBind, rowSize, info);
  if (code != 0) {
    tscError("SML:0x%"PRIx64" insert into child table %s failed. error %s", info->id, cTableName, tstrerror(code));
  }

  //free rows bind
  for (int i = 0; i < rows; ++i) {
    TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
    for (int j = 0; j < numCols; ++j) {
      TAOS_BIND* bind = colBinds + j;
      free(bind->length);
    }
    free(colBinds);
  }
  taosArrayDestroy(rowsBind);
  //free tag bind
  for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
    TAOS_BIND* bind = taosArrayGet(tagBinds, i);
    free(bind->length);
  }
  taosArrayDestroy(tagBinds);
  return code;
}
937

938 939 940 941
/*
 * Builds the parameterised statement
 *   insert into ? using <stable> (<tags>) tags (?,...) (<cols>) values (?,...)
 * and feeds the bound rows to doInsertChildTablePoints() in batches.
 *
 * The batch size is derived from TSDB_MAX_WAL_SIZE and rowSize (4/5 of the rows
 * that fit), capped at the number of rows and never less than one row.
 * Stops at the first failing batch and returns its error code.
 */
static int32_t insertChildTablePointsBatch(TAOS* taos, char* cTableName, char* sTableName,
                                           SArray* tagsSchema, SArray* tagsBind,
                                           SArray* colsSchema, SArray* rowsBind,
                                           size_t rowSize, SSmlLinesInfo* info) {
  size_t numTags = taosArrayGetSize(tagsSchema);
  size_t numCols = taosArrayGetSize(colsSchema);
  char* sql = malloc(tsMaxSQLStringLen+1);
  if (sql == NULL) {
    tscError("malloc sql memory error");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // `len` tracks the current string length so each append costs only the appended
  // bytes; re-measuring from sql+len keeps it correct even if snprintf truncates.
  int32_t freeBytes = tsMaxSQLStringLen + 1 ;
  int32_t len = 0;

  snprintf(sql, freeBytes, "insert into ? using %s (", sTableName);
  len += (int32_t)strlen(sql + len);

  for (size_t i = 0; i < numTags; ++i) {
    SSchema* tagSchema = taosArrayGet(tagsSchema, i);
    snprintf(sql + len, freeBytes - len, "%s,", tagSchema->name);
    len += (int32_t)strlen(sql + len);
  }
  --len;  // overwrite the trailing comma
  snprintf(sql + len, freeBytes - len, ")");
  len += (int32_t)strlen(sql + len);

  snprintf(sql + len, freeBytes - len, " tags (");
  len += (int32_t)strlen(sql + len);

  for (size_t i = 0; i < numTags; ++i) {
    snprintf(sql + len, freeBytes - len, "?,");
    len += (int32_t)strlen(sql + len);
  }
  --len;
  snprintf(sql + len, freeBytes - len, ") (");
  len += (int32_t)strlen(sql + len);

  for (size_t i = 0; i < numCols; ++i) {
    SSchema* colSchema = taosArrayGet(colsSchema, i);
    snprintf(sql + len, freeBytes - len, "%s,", colSchema->name);
    len += (int32_t)strlen(sql + len);
  }
  --len;
  snprintf(sql + len, freeBytes - len, ") values (");
  len += (int32_t)strlen(sql + len);

  for (size_t i = 0; i < numCols; ++i) {
    snprintf(sql + len, freeBytes - len, "?,");
    len += (int32_t)strlen(sql + len);
  }
  --len;
  snprintf(sql + len, freeBytes - len, ")");

  tscDebug("SML:0x%"PRIx64" insert child table table %s of super table %s : %s", info->id, cTableName, sTableName, sql);

  size_t rows = taosArrayGetSize(rowsBind);
  // guard the division and make sure every batch advances by at least one row
  size_t maxBatchSize = (rowSize > 0) ? (TSDB_MAX_WAL_SIZE/rowSize * 4 / 5) : rows;
  size_t batchSize = MIN(maxBatchSize, rows);
  if (batchSize == 0) {
    batchSize = 1;
  }
  tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu, batch size: %zu",
           info->id, cTableName, rows, batchSize);

  SArray* batchBind = taosArrayInit(batchSize, POINTER_BYTES);
  if (batchBind == NULL) {
    tscError("SML:0x%"PRIx64" failed to allocate batch bind array, batch size: %zu", info->id, batchSize);
    tfree(sql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (size_t start = 0; start < rows && code == 0; start += batchSize) {
    size_t end = MIN(start + batchSize, rows);
    for (size_t j = start; j < end; ++j) {
      taosArrayPush(batchBind, taosArrayGet(rowsBind, j));
    }
    tscDebug("SML:0x%"PRIx64" insert child table batch from line %d to line %d.", info->id, (int)start, (int)(end - 1));
    code = doInsertChildTablePoints(taos, sql, cTableName, tagsBind, batchBind, info);
    taosArrayClear(batchBind);
  }

  taosArrayDestroy(batchBind);
  tfree(sql);
  return code;
}
/*
 * Executes one batch of bound rows against a child table using a prepared statement.
 *
 * The set-table-name / bind / add-batch / execute sequence is repeated while the
 * execute result is one of the retryable codes and fewer than TSDB_MAX_REPLICA
 * retries have been made. An invalid table id or vgroup id additionally triggers
 * "RESET QUERY CACHE". A sleep of 100 * (2 << try) ms precedes a retry, except
 * for TSDB_CODE_TDB_TABLE_RECONFIGURE which retries immediately.
 *
 * Once the statement is prepared, every exit path adds
 * taos_stmt_affected_rows() to info->affectedRows and closes the statement.
 * Returns the last error code (0 on success).
 */
static int32_t doInsertChildTablePoints(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, SArray* batchBind,
                                        SSmlLinesInfo* info) {
  int32_t code = 0;

  TAOS_STMT* stmt = taos_stmt_init(taos);
  if (stmt == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
  if (code != 0) {
    tscError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, taos_stmt_errstr(stmt));
    taos_stmt_close(stmt);
    return code;
  }

  bool tryAgain = false;
  int32_t try = 0;
  do {
    code = taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind));
    if (code != 0) {
      tscError("SML:0x%"PRIx64" taos_stmt_set_tbname return %d:%s", info->id, code, taos_stmt_errstr(stmt));
      goto done;
    }

    size_t rows = taosArrayGetSize(batchBind);
    for (int32_t i = 0; i < rows; ++i) {
      TAOS_BIND* colsBinds = taosArrayGetP(batchBind, i);
      code = taos_stmt_bind_param(stmt, colsBinds);
      if (code != 0) {
        tscError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, taos_stmt_errstr(stmt));
        goto done;
      }
      code = taos_stmt_add_batch(stmt);
      if (code != 0) {
        tscError("SML:0x%"PRIx64" taos_stmt_add_batch return %d:%s", info->id, code, taos_stmt_errstr(stmt));
        goto done;
      }
    }

    code = taos_stmt_execute(stmt);
    if (code != 0) {
      tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s, try:%d", info->id, code, taos_stmt_errstr(stmt), try);
    }
    tscDebug("SML:0x%"PRIx64" taos_stmt_execute inserted %d rows", info->id, taos_stmt_affected_rows(stmt));

    bool staleMeta = (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID);
    bool notReady = (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL);

    // `try` is only advanced when the error code is retryable (short-circuit &&)
    tryAgain = (staleMeta || notReady || code == TSDB_CODE_TDB_TABLE_RECONFIGURE) && try++ < TSDB_MAX_REPLICA;

    if (staleMeta) {
      TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
      int32_t   code2 = taos_errno(res2);
      if (code2 != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%" PRIx64 " insert child table. reset query cache. error: %s", info->id, taos_errstr(res2));
      }
      taos_free_result(res2);
    }

    if (tryAgain && (staleMeta || notReady)) {
      taosMsleep(100 * (2 << try));
    }
  } while (tryAgain);

done:
  info->affectedRows += taos_stmt_affected_rows(stmt);
  taos_stmt_close(stmt);
  return code;
}

1106 1107 1108 1109 1110
/*
 * Inserts the points of one child table, choosing the insertion path by size:
 * fewer than 10 points go through a textual INSERT statement, otherwise the
 * prepared-statement path is used. Returns the code of the chosen path.
 */
static int32_t applyChildTableDataPoints(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
                                                 SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
  enum { SML_STMT_MIN_POINTS = 10 };  // below this, plain SQL is used instead of a statement

  if (taosArrayGetSize(cTablePoints) < SML_STMT_MIN_POINTS) {
    return applyChildTableDataPointsWithInsertSQL(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info);
  }
  return applyChildTableDataPointsWithStmt(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info);
}

S
shenglian zhou 已提交
1118
/*
 * Groups the points by child table name and inserts each group.
 *
 * For every child table the row size is computed as the sum of the `bytes` of
 * all columns of its super table schema and passed down for batch sizing.
 * Processing stops at the first child table that fails; the grouping hash and
 * its per-table arrays are always released before returning.
 *
 * Returns 0 on success, otherwise the first error code encountered.
 */
static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) {
  int32_t code = TSDB_CODE_SUCCESS;
  SArray** pCTablePoints = NULL;

  SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
  if (cname2points == NULL) {
    tscError("SML:0x%"PRIx64" failed to allocate child table hash", info->id);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  code = arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info);
  if (code != 0) {
    tscError("SML:0x%"PRIx64" arrange points by child table name failed. error %s", info->id, tstrerror(code));
    goto cleanup;
  }

  pCTablePoints = taosHashIterate(cname2points, NULL);
  while (pCTablePoints) {
    SArray* cTablePoints = *pCTablePoints;

    // all points of a group share child table, super table and schema; use the first
    TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
    SSmlSTableSchema*    sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);

    size_t rowSize = 0;
    size_t numCols = taosArrayGetSize(sTableSchema->fields);
    for (size_t i = 0; i < numCols; ++i) {
      SSchema* colSchema = taosArrayGet(sTableSchema->fields, i);
      rowSize += colSchema->bytes;
    }

    tscDebug("SML:0x%"PRIx64" apply child table points. child table: %s of super table %s, row size: %zu",
             info->id, point->childTableName, point->stableName, rowSize);
    code = applyChildTableDataPoints(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, rowSize, info);
    if (code != 0) {
      tscError("SML:0x%"PRIx64" Apply child table points failed. child table %s, error %s", info->id, point->childTableName, tstrerror(code));
      // NOTE(review): the iteration is abandoned here without an explicit cancel;
      // confirm taosHashIterate tolerates being restarted from NULL below.
      goto cleanup;
    }

    tscDebug("SML:0x%"PRIx64" successfully applied data points of child table %s", info->id, point->childTableName);

    pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
  }

cleanup:
  // destroy the per-table point arrays, then the hash itself
  pCTablePoints = taosHashIterate(cname2points, NULL);
  while (pCTablePoints) {
    SArray* pPoints = *pCTablePoints;
    taosArrayDestroy(pPoints);
    pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
  }
  taosHashCleanup(cname2points);
  return code;
}
1161

1162
/*
 * Entry point for inserting parsed schemaless data points.
 *
 * Runs three stages in order and stops at the first failure:
 *   1. buildDataPointSchemas - derive the super table schemas from the points
 *   2. modifyDBSchemas       - bring the database schemas in line with them
 *   3. applyDataPoints       - insert the points
 *
 * info->affectedRows is reset to 0 on entry.
 * Returns 0 on success or the error code of the failing stage.
 */
int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info) {
  tscDebug("SML:0x%"PRIx64" taos_sml_insert. number of points: %d", info->id, numPoint);

  int32_t code = TSDB_CODE_SUCCESS;

  info->affectedRows = 0;

  tscDebug("SML:0x%"PRIx64" build data point schemas", info->id);
  SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>
  if (stableSchemas == NULL) {
    tscError("SML:0x%"PRIx64" failed to allocate super table schema array", info->id);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  code = buildDataPointSchemas(points, numPoint, stableSchemas, info);
  if (code != 0) {
    tscError("SML:0x%"PRIx64" error building data point schemas : %s", info->id, tstrerror(code));
    goto clean_up;
  }

  tscDebug("SML:0x%"PRIx64" modify db schemas", info->id);
  code = modifyDBSchemas(taos, stableSchemas, info);
  if (code != 0) {
    tscError("SML:0x%"PRIx64" error change db schema : %s", info->id, tstrerror(code));
    goto clean_up;
  }

  tscDebug("SML:0x%"PRIx64" apply data points", info->id);
  code = applyDataPoints(taos, points, numPoint, stableSchemas, info);
  if (code != 0) {
    tscError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code));
  }

clean_up:
  // NOTE(review): only the `fields` and `tags` arrays are released here; the
  // tagHash/fieldHash members of SSmlSTableSchema are not — confirm they are
  // freed elsewhere (e.g. inside buildDataPointSchemas) or this leaks.
  for (size_t i = 0; i < taosArrayGetSize(stableSchemas); ++i) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
    taosArrayDestroy(schema->fields);
    taosArrayDestroy(schema->tags);
  }
  taosArrayDestroy(stableSchemas);
  return code;
}

1200
/*
 * Convenience wrapper around tscSmlInsert(): allocates a zeroed SSmlLinesInfo,
 * assigns it a fresh id from genLinesSmlId(), runs the insert and frees the info.
 *
 * Returns the code of tscSmlInsert(), or TSDB_CODE_TSC_OUT_OF_MEMORY when the
 * info structure cannot be allocated.
 */
int tsc_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
  SSmlLinesInfo* info = calloc(1, sizeof(*info));
  if (info == NULL) {
    tscError("failed to allocate SSmlLinesInfo");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  info->id = genLinesSmlId();
  int code = tscSmlInsert(taos, points, numPoint, info);
  free(info);
  return code;
}
S
shenglian zhou 已提交
1207

1208 1209
//=========================================================================

1210 1211 1212 1213 1214
/*        Field                          Escape characters
    1: measurement                        Comma,Space
    2: tag_key, tag_value, field_key  Comma,Equal Sign,Space
    3: field_value                    Double quote,Backslash
*/
1215
/*
 * Skips a backslash that escapes a special character.
 *
 * If **pos is a backslash and the character after it is escapable for the given
 * field kind, *pos is advanced by one so that it points at the escaped
 * character; otherwise *pos is left unchanged.
 *
 *   field 1: ',' and ' '
 *   field 2: ',', ' ' and '='
 *   field 3: '"' and '\'
 *   any other field value: nothing is escapable
 */
static void escapeSpecialCharacter(uint8_t field, const char **pos) {
  const char *cur = *pos;
  if (*cur != '\\') {
    return;
  }

  const char next = cur[1];
  bool       escapable = false;
  switch (field) {
    case 1:
      escapable = (next == ',' || next == ' ');
      break;
    case 2:
      escapable = (next == ',' || next == ' ' || next == '=');
      break;
    case 3:
      escapable = (next == '"' || next == '\\');
      break;
    default:
      break;
  }

  if (escapable) {
    *pos = cur + 1;
  }
}
1257

1258
char* addEscapeCharToString(char *str, int32_t len) {
1259
  if (str == NULL) {
1260
    return NULL;
1261 1262
  }
  memmove(str + 1, str, len);
1263 1264
  str[0] = str[len + 1] = TS_ESCAPE_CHAR;
  str[len + 2] = '\0';
1265
  return str;
1266 1267
}

1268
/*
 * Returns true when `str` is an optional '+' or '-' followed by one or more
 * decimal digits and nothing else. An empty string or a lone sign is rejected.
 */
bool isValidInteger(char *str) {
  // ctype functions require values representable as unsigned char
  const unsigned char *c = (const unsigned char *)str;

  if (*c == '+' || *c == '-') {
    c++;
  }
  // at least one digit is required after the optional sign
  if (!isdigit(*c)) {
    return false;
  }
  for (c++; *c != '\0'; c++) {
    if (!isdigit(*c)) {
      return false;
    }
  }
  return true;
}
1282

1283
/*
 * Returns true when `str` looks like a decimal floating point literal:
 *
 *   [+|-] digits-and-at-most-one-dot [ (e|E) [+|-] digits ]
 *
 * Rules enforced:
 *   - the first character is a sign, a dot or a digit; a lone sign is rejected
 *   - every '.' must be followed by a digit, may appear only once and only
 *     before the exponent
 *   - 'e'/'E' may appear once, must follow a digit and must be followed by a
 *     digit or a sign
 *   - an exponent sign must directly follow 'e'/'E' and be followed by a digit
 */
bool isValidFloat(char *str) {
  // ctype functions require values representable as unsigned char
  const unsigned char *c = (const unsigned char *)str;
  bool has_dot = false;
  bool has_exp = false;
  bool has_sign = false;

  if (*c == '+' || *c == '-') {
    if (c[1] == '\0') {
      return false;  // a sign alone is not a number
    }
  } else if (*c == '.') {
    if (!isdigit(c[1])) {
      return false;  // leading dot needs a fractional digit
    }
    has_dot = true;
  } else if (!isdigit(*c)) {
    return false;
  }

  for (c++; *c != '\0'; c++) {
    if (isdigit(*c)) {
      continue;
    }
    switch (*c) {
      case '.':
        if (has_dot || has_exp || !isdigit(c[1])) {
          return false;
        }
        has_dot = true;
        break;
      case 'e':
      case 'E':
        if (has_exp || !isdigit(c[-1]) ||
            !(isdigit(c[1]) || c[1] == '+' || c[1] == '-')) {
          return false;
        }
        has_exp = true;
        break;
      case '+':
      case '-':
        // the sign belongs to the exponent, so it must sit right after e/E
        if (has_sign || !has_exp || !(c[-1] == 'e' || c[-1] == 'E') || !isdigit(c[1])) {
          return false;
        }
        has_sign = true;
        break;
      default:
        return false;
    }
  }
  return true;
}
1338

1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354
/*
 * Detects a generic integer suffix on a value of at least two characters.
 * A trailing 'i' reports a signed integer (*has_sign = true), a trailing 'u'
 * an unsigned one (*has_sign = false). In every other case false is returned
 * and *has_sign is left untouched.
 */
static bool isInteger(char *pVal, uint16_t len, bool *has_sign) {
  if (len <= 1) {
    return false;
  }

  switch (pVal[len - 1]) {
    case 'i':
      *has_sign = true;
      return true;
    case 'u':
      *has_sign = false;
      return true;
    default:
      return false;
  }
}

1355
/*
 * Returns true when the value carries the "i8" type suffix (case-insensitive)
 * and has at least one character in front of it. The comparison runs from
 * pVal[len - 2] to the terminating NUL of pVal.
 */
static bool isTinyInt(char *pVal, uint16_t len) {
  return len > 2 && strcasecmp(&pVal[len - 2], "i8") == 0;
}
1365

1366
/*
 * Returns true when the value carries the "u8" type suffix (case-insensitive),
 * has at least one character in front of it and does not start with '-'.
 * The comparison runs from pVal[len - 2] to the terminating NUL of pVal.
 */
static bool isTinyUint(char *pVal, uint16_t len) {
  return len > 2 && pVal[0] != '-' && strcasecmp(&pVal[len - 2], "u8") == 0;
}

1380
/*
 * Returns true when the value carries the "i16" type suffix (case-insensitive)
 * and has at least one character in front of it. The comparison runs from
 * pVal[len - 3] to the terminating NUL of pVal.
 */
static bool isSmallInt(char *pVal, uint16_t len) {
  return len > 3 && strcasecmp(&pVal[len - 3], "i16") == 0;
}

1391
/*
 * Returns true when the value carries the "u16" type suffix (case-insensitive),
 * has at least one character in front of it and does not start with '-'.
 * The comparison runs from pVal[len - 3] to the terminating NUL of pVal.
 */
static bool isSmallUint(char *pVal, uint16_t len) {
  return len > 3 && pVal[0] != '-' && strcasecmp(&pVal[len - 3], "u16") == 0;
}

1405
/*
 * Returns true when the value carries the "i32" type suffix (case-insensitive)
 * and has at least one character in front of it. The comparison runs from
 * pVal[len - 3] to the terminating NUL of pVal.
 */
static bool isInt(char *pVal, uint16_t len) {
  return len > 3 && strcasecmp(&pVal[len - 3], "i32") == 0;
}

1416
/*
 * Returns true when the value carries the "u32" type suffix (case-insensitive),
 * has at least one character in front of it and does not start with '-'.
 * The comparison runs from pVal[len - 3] to the terminating NUL of pVal.
 */
static bool isUint(char *pVal, uint16_t len) {
  return len > 3 && pVal[0] != '-' && strcasecmp(&pVal[len - 3], "u32") == 0;
}

1430
/*
 * Returns true when the value carries the "i64" type suffix (case-insensitive)
 * and has at least one character in front of it. The comparison runs from
 * pVal[len - 3] to the terminating NUL of pVal.
 */
static bool isBigInt(char *pVal, uint16_t len) {
  return len > 3 && strcasecmp(&pVal[len - 3], "i64") == 0;
}

1441
/*
 * Returns true when the value carries the "u64" type suffix (case-insensitive),
 * has at least one character in front of it and does not start with '-'.
 * The comparison runs from pVal[len - 3] to the terminating NUL of pVal.
 */
static bool isBigUint(char *pVal, uint16_t len) {
  return len > 3 && pVal[0] != '-' && strcasecmp(&pVal[len - 3], "u64") == 0;
}

1455
/*
 * Returns true when the value carries the "f32" type suffix (case-insensitive)
 * and has at least one character in front of it. The comparison runs from
 * pVal[len - 3] to the terminating NUL of pVal.
 */
static bool isFloat(char *pVal, uint16_t len) {
  return len > 3 && strcasecmp(&pVal[len - 3], "f32") == 0;
}

1466
/*
 * Returns true when the value carries the "f64" type suffix (case-insensitive)
 * and has at least one character in front of it. The comparison runs from
 * pVal[len - 3] to the terminating NUL of pVal.
 */
static bool isDouble(char *pVal, uint16_t len) {
  return len > 3 && strcasecmp(&pVal[len - 3], "f64") == 0;
}

1477
/*
 * Recognises boolean literals, case-insensitively:
 *   "t" / "true"  -> *bVal = true
 *   "f" / "false" -> *bVal = false
 * The whole value must match (len must equal the literal length and pVal is
 * compared up to its terminating NUL). Returns false and leaves *bVal
 * untouched for anything else.
 */
static bool isBool(char *pVal, uint16_t len, bool *bVal) {
  if ((len == 1 && strcasecmp(pVal, "t") == 0) ||
      (len == 4 && strcasecmp(pVal, "true") == 0)) {
    *bVal = true;
    return true;
  }

  if ((len == 1 && strcasecmp(pVal, "f") == 0) ||
      (len == 5 && strcasecmp(pVal, "false") == 0)) {
    *bVal = false;
    return true;
  }

  return false;
}

1503
/*
 * Returns true for a binary literal of the form "abc": at least two characters,
 * the first and the last of which are double quotes.
 */
static bool isBinary(char *pVal, uint16_t len) {
  return len >= 2 && pVal[0] == '"' && pVal[len - 1] == '"';
}
1515

1516
/*
 * Returns true for an nchar literal of the form L"abc": at least three
 * characters, starting with 'L' or 'l' followed by a double quote, and ending
 * with a double quote.
 */
static bool isNchar(char *pVal, uint16_t len) {
  return len >= 3 &&
         (pVal[0] == 'l' || pVal[0] == 'L') &&
         pVal[1] == '"' &&
         pVal[len - 1] == '"';
}

1528
/*
 * Validates a timestamp string and reports its precision in *tsType.
 *
 *   - len == 0: accepted, *tsType is left untouched
 *   - the single character "0": *tsType = SML_TIME_STAMP_NOW
 *   - otherwise every character must be a decimal digit, and:
 *       line protocol:   *tsType is the precision configured in info->tsType, or
 *                        nanoseconds when none is configured
 *       telnet protocol: SML_TIMESTAMP_SECOND_DIGITS digits mean seconds,
 *                        SML_TIMESTAMP_MILLI_SECOND_DIGITS digits mean
 *                        milliseconds; any other length is rejected
 *       other protocols: accepted, *tsType is left untouched
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_TSC_INVALID_TIME_STAMP.
 */
static int32_t isTimeStamp(char *pVal, uint16_t len, SMLTimeStampType *tsType, SSmlLinesInfo* info) {
  if (len == 0) {
    return TSDB_CODE_SUCCESS;
  }
  if ((len == 1) && pVal[0] == '0') {
    *tsType = SML_TIME_STAMP_NOW;
    return TSDB_CODE_SUCCESS;
  }

  for (int i = 0; i < len; ++i) {
    // ctype functions require values representable as unsigned char
    if (!isdigit((unsigned char)pVal[i])) {
      return TSDB_CODE_TSC_INVALID_TIME_STAMP;
    }
  }

  /* For InfluxDB line protocol use user passed timestamp precision
   * For OpenTSDB protocols only 10 digit(seconds) or 13 digits(milliseconds)
   *     precision allowed
   */
  if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
    *tsType = (info->tsType != SML_TIME_STAMP_NOT_CONFIGURED) ? info->tsType
                                                              : SML_TIME_STAMP_NANO_SECONDS;
  } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
    if (len == SML_TIMESTAMP_SECOND_DIGITS) {
      *tsType = SML_TIME_STAMP_SECONDS;
    } else if (len == SML_TIMESTAMP_MILLI_SECOND_DIGITS) {
      *tsType = SML_TIME_STAMP_MILLI_SECONDS;
    } else {
      return TSDB_CODE_TSC_INVALID_TIME_STAMP;
    }
  }
  return TSDB_CODE_SUCCESS;
}
1588

1589
/**
 * Parse a numeric string into the binary representation selected by pVal->type.
 *
 * The caller must have set pVal->type and pVal->length. On success pVal->value
 * points to a newly allocated buffer of pVal->length bytes holding the number.
 *
 * @param pVal key/value entry to fill
 * @param str  NUL-terminated numeric text; lower-cased in place before parsing
 * @param info parsing context (its id is used for logging)
 * @return true on success; false when the text is out of range for the target
 *         type, the type is not numeric, or memory could not be allocated
 */
static bool convertStrToNumber(TAOS_SML_KV *pVal, char *str, SSmlLinesInfo* info) {
  errno = 0;
  uint8_t type = pVal->type;
  int16_t length = pVal->length;
  int64_t val_s = 0;
  uint64_t val_u = 0;
  double val_d = 0.0;

  strntolower_s(str, str, (int32_t)strlen(str));
  if (IS_FLOAT_TYPE(type)) {
    val_d = strtod(str, NULL);
  } else if (IS_SIGNED_NUMERIC_TYPE(type)) {
    val_s = strtoll(str, NULL, 10);
  } else {
    val_u = strtoull(str, NULL, 10);
  }

  if (errno == ERANGE) {
    tscError("SML:0x%"PRIx64" Convert number(%s) out of range", info->id, str);
    return false;
  }

  // Range-check first so nothing is allocated for a value that gets rejected.
  bool inRange = false;
  switch (type) {
    case TSDB_DATA_TYPE_TINYINT:
      inRange = IS_VALID_TINYINT(val_s);
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      inRange = IS_VALID_UTINYINT(val_u);
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      inRange = IS_VALID_SMALLINT(val_s);
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      inRange = IS_VALID_USMALLINT(val_u);
      break;
    case TSDB_DATA_TYPE_INT:
      inRange = IS_VALID_INT(val_s);
      break;
    case TSDB_DATA_TYPE_UINT:
      inRange = IS_VALID_UINT(val_u);
      break;
    case TSDB_DATA_TYPE_BIGINT:
      inRange = IS_VALID_BIGINT(val_s);
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      inRange = IS_VALID_UBIGINT(val_u);
      break;
    case TSDB_DATA_TYPE_FLOAT:
      inRange = IS_VALID_FLOAT(val_d);
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      inRange = IS_VALID_DOUBLE(val_d);
      break;
    default:
      return false;
  }
  if (!inRange) {
    return false;
  }

  void *buf = calloc(length, 1);
  if (buf == NULL) {
    // report the failure instead of writing through a NULL pointer
    return false;
  }

  switch (type) {
    case TSDB_DATA_TYPE_TINYINT:
      *(int8_t *)buf = (int8_t)val_s;
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      *(uint8_t *)buf = (uint8_t)val_u;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      *(int16_t *)buf = (int16_t)val_s;
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      *(uint16_t *)buf = (uint16_t)val_u;
      break;
    case TSDB_DATA_TYPE_INT:
      *(int32_t *)buf = (int32_t)val_s;
      break;
    case TSDB_DATA_TYPE_UINT:
      *(uint32_t *)buf = (uint32_t)val_u;
      break;
    case TSDB_DATA_TYPE_BIGINT:
      *(int64_t *)buf = (int64_t)val_s;
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      *(uint64_t *)buf = (uint64_t)val_u;
      break;
    case TSDB_DATA_TYPE_FLOAT:
      *(float *)buf = (float)val_d;
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      *(double *)buf = (double)val_d;
      break;
    default:
      // not reachable: non-numeric types were rejected by the switch above
      free(buf);
      return false;
  }

  pVal->value = buf;
  return true;
}
1689
//len does not include '\0' from value.
1690
bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
1691
                         uint16_t len, SSmlLinesInfo* info, bool isTag) {
1692 1693 1694
  if (len <= 0) {
    return false;
  }
G
Ganlin Zhao 已提交
1695

1696 1697 1698 1699 1700 1701 1702 1703 1704
  //convert tags value to Nchar
  if (isTag) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
    pVal->length = len;
    pVal->value = calloc(pVal->length, 1);
    memcpy(pVal->value, value, pVal->length);
    return true;
  }

1705
  //integer number
1706 1707 1708 1709 1710 1711 1712 1713 1714 1715
  bool has_sign;
  if (isInteger(value, len, &has_sign)) {
    pVal->type = has_sign ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_UBIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 1] = '\0';
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
      return false;
    }
    return true;
  }
1716
  if (isTinyInt(value, len)) {
1717 1718 1719
    pVal->type = TSDB_DATA_TYPE_TINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 2] = '\0';
1720
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1721
      return false;
1722 1723 1724
    }
    return true;
  }
1725
  if (isTinyUint(value, len)) {
1726 1727 1728
    pVal->type = TSDB_DATA_TYPE_UTINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 2] = '\0';
1729
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1730 1731 1732 1733
      return false;
    }
    return true;
  }
1734
  if (isSmallInt(value, len)) {
1735 1736 1737
    pVal->type = TSDB_DATA_TYPE_SMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1738
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1739 1740 1741 1742
      return false;
    }
    return true;
  }
1743
  if (isSmallUint(value, len)) {
1744 1745 1746
    pVal->type = TSDB_DATA_TYPE_USMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1747
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1748 1749 1750 1751
      return false;
    }
    return true;
  }
1752
  if (isInt(value, len)) {
1753 1754 1755
    pVal->type = TSDB_DATA_TYPE_INT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1756
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1757 1758 1759 1760
      return false;
    }
    return true;
  }
1761
  if (isUint(value, len)) {
1762 1763 1764
    pVal->type = TSDB_DATA_TYPE_UINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1765
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1766 1767 1768 1769
      return false;
    }
    return true;
  }
1770
  if (isBigInt(value, len)) {
1771 1772 1773
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1774
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1775 1776 1777 1778
      return false;
    }
    return true;
  }
1779
  if (isBigUint(value, len)) {
1780 1781 1782
    pVal->type = TSDB_DATA_TYPE_UBIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1783
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1784 1785 1786 1787
      return false;
    }
    return true;
  }
1788 1789 1790 1791 1792
  //floating number
  if (isFloat(value, len)) {
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1793
    if (!isValidFloat(value) || !convertStrToNumber(pVal, value, info)) {
1794 1795 1796 1797 1798 1799 1800 1801
      return false;
    }
    return true;
  }
  if (isDouble(value, len)) {
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1802
    if (!isValidFloat(value) || !convertStrToNumber(pVal, value, info)) {
1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833
      return false;
    }
    return true;
  }
  //binary
  if (isBinary(value, len)) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
    pVal->length = len - 2;
    pVal->value = calloc(pVal->length, 1);
    //copy after "
    memcpy(pVal->value, value + 1, pVal->length);
    return true;
  }
  //nchar
  if (isNchar(value, len)) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
    pVal->length = len - 3;
    pVal->value = calloc(pVal->length, 1);
    //copy after L"
    memcpy(pVal->value, value + 2, pVal->length);
    return true;
  }
  //bool
  bool bVal;
  if (isBool(value, len, &bVal)) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->value = calloc(pVal->length, 1);
    memcpy(pVal->value, &bVal, pVal->length);
    return true;
  }
1834

1835 1836
  //Handle default(no appendix) type as DOUBLE
  if (isValidInteger(value) || isValidFloat(value)) {
1837
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
G
Ganlin Zhao 已提交
1838
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
1839
    if (!convertStrToNumber(pVal, value, info)) {
1840 1841
      return false;
    }
G
Ganlin Zhao 已提交
1842 1843
    return true;
  }
1844
  return false;
1845
}
1846

1847
static int32_t getTimeStampValue(char *value, uint16_t len,
1848
                                 SMLTimeStampType type, int64_t *ts, SSmlLinesInfo* info) {
1849 1850

  //No appendix or no timestamp given (len = 0)
1851
  if (len != 0 && type != SML_TIME_STAMP_NOW) {
1852 1853 1854 1855 1856 1857
    *ts = (int64_t)strtoll(value, NULL, 10);
  } else {
    type = SML_TIME_STAMP_NOW;
  }
  switch (type) {
    case SML_TIME_STAMP_NOW: {
1858
      *ts = taosGetTimestampNs();
1859
      break;
1860
    }
1861 1862 1863 1864 1865 1866 1867 1868
    case SML_TIME_STAMP_HOURS: {
      *ts = (int64_t)(*ts * 3600 * 1e9);
      break;
    }
    case SML_TIME_STAMP_MINUTES: {
      *ts = (int64_t)(*ts * 60 * 1e9);
      break;
    }
1869
    case SML_TIME_STAMP_SECONDS: {
1870
      *ts = (int64_t)(*ts * 1e9);
1871
      break;
1872 1873
    }
    case SML_TIME_STAMP_MILLI_SECONDS: {
1874
      *ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_NANO);
1875
      break;
1876 1877
    }
    case SML_TIME_STAMP_MICRO_SECONDS: {
1878
      *ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
1879
      break;
1880 1881
    }
    case SML_TIME_STAMP_NANO_SECONDS: {
1882
      *ts = *ts * 1;
1883 1884 1885
      break;
    }
    default: {
1886
      return TSDB_CODE_TSC_INVALID_TIME_STAMP;
1887
    }
1888
  }
1889
  return TSDB_CODE_SUCCESS;
1890 1891
}

1892 1893
/**
 * Validate a timestamp token, convert it to nanoseconds and store it in pVal
 * as a TSDB_DATA_TYPE_TIMESTAMP value.
 *
 * @param pVal  key/value entry to fill; on success pVal->value is a newly
 *              allocated buffer holding the int64_t timestamp
 * @param value timestamp text (may be NULL when len == 0)
 * @param len   number of characters in value
 * @param info  parsing context
 * @return TSDB_CODE_SUCCESS, the error reported by validation/conversion, or
 *         TSDB_CODE_TSC_OUT_OF_MEMORY when the value buffer cannot be allocated
 */
int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
                            uint16_t len, SSmlLinesInfo* info) {
  SMLTimeStampType type = SML_TIME_STAMP_NOW;
  int64_t tsVal = 0;

  int32_t ret = isTimeStamp(value, len, &type, info);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  ret = getTimeStampValue(value, len, type, &tsVal, info);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }
  tscDebug("SML:0x%"PRIx64"Timestamp after conversion:%"PRId64, info->id, tsVal);

  pVal->type = TSDB_DATA_TYPE_TIMESTAMP;
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  pVal->value = calloc(pVal->length, 1);
  if (pVal->value == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(pVal->value, &tsVal, pVal->length);
  return TSDB_CODE_SUCCESS;
}

1916
/**
 * Parse the timestamp section, i.e. everything from *index to the end of the
 * line, into a newly allocated key/value entry whose key is "ts".
 *
 * @param pTS   out: the allocated entry on success (the caller owns it and its
 *              key/value buffers); NULL on failure
 * @param index in: start of the timestamp text; not advanced by this function
 * @param info  parsing context
 * @return TSDB_CODE_SUCCESS, the error from convertSmlTimeStamp(), or
 *         TSDB_CODE_TSC_OUT_OF_MEMORY
 */
static int32_t parseSmlTimeStamp(TAOS_SML_KV **pTS, const char **index, SSmlLinesInfo* info) {
  const char *start = *index;
  int32_t ret = TSDB_CODE_SUCCESS;
  int len = (int)strlen(start);
  char key[] = "ts";
  char *value = NULL;

  *pTS = calloc(1, sizeof(TAOS_SML_KV));
  if (*pTS == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (len > 0) {
    value = calloc(len + 1, 1);
    if (value == NULL) {
      free(*pTS);
      *pTS = NULL;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    memcpy(value, start, len);
  }

  ret = convertSmlTimeStamp(*pTS, value, len, info);
  free(value);
  if (ret) {
    free(*pTS);
    *pTS = NULL;  // do not hand a dangling pointer back to the caller
    return ret;
  }

  (*pTS)->key = calloc(sizeof(key), 1);
  if ((*pTS)->key == NULL) {
    free((*pTS)->value);
    free(*pTS);
    *pTS = NULL;
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy((*pTS)->key, key, sizeof(key));
  return ret;
}
1948

1949
/**
 * Report whether `key` was already seen on the current line; a key seen for
 * the first time is recorded in pHash.
 *
 * @param key   NUL-terminated key name
 * @param pHash set of keys encountered so far
 * @param info  parsing context (its id is used for logging)
 * @return true when the key is a duplicate, false otherwise
 */
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
  const size_t keyLen = strlen(key);

  if (taosHashGet(pHash, key, keyLen) != NULL) {
    tscError("SML:0x%"PRIx64" Duplicate key detected:%s", info->id, key);
    return true;
  }

  // only the key matters; the stored byte is a placeholder
  uint8_t placeholder = 0;
  taosHashPut(pHash, key, keyLen, &placeholder, sizeof(uint8_t));
  return false;
}

1963
/**
 * Parse one tag/field key, i.e. the text from *index up to the first
 * unescaped '='.
 *
 * @param pKV   entry whose key is filled with a newly allocated, escaped copy
 * @param index in: start of the key; out: first character after the '='
 * @param pHash keys already seen on this line, used to reject duplicates
 * @param info  parsing context
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_TSC_INVALID_COLUMN_LENGTH when the key is
 *         too long; TSDB_CODE_TSC_LINE_SYNTAX_ERROR when the key is empty, is not
 *         followed by '=' or is a duplicate; TSDB_CODE_TSC_OUT_OF_MEMORY
 */
static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash, SSmlLinesInfo* info) {
  const char *cur = *index;
  char key[TSDB_COL_NAME_LEN + 1];  // +1 to avoid key[len] over write
  int16_t len = 0;

  while (*cur != '\0') {
    if (len > TSDB_COL_NAME_LEN - 1) {
      tscError("SML:0x%"PRIx64" Key field cannot exceeds %d characters", info->id, TSDB_COL_NAME_LEN - 1);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
    //unescaped '=' identifies a tag key
    if (*cur == '=' && *(cur - 1) != '\\') {
      break;
    }
    //Escape special character
    if (*cur == '\\') {
      escapeSpecialCharacter(2, &cur);
    }
    key[len] = *cur;
    cur++;
    len++;
  }
  if (len == 0) {
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }
  if (*cur != '=') {
    // The line ended before '='. Advancing *index past the terminator would
    // make the value parser read beyond the end of the string.
    tscError("SML:0x%"PRIx64" Key field is not followed by '='", info->id);
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }
  key[len] = '\0';

  if (checkDuplicateKey(key, pHash, info)) {
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  pKV->key = calloc(len + TS_ESCAPE_CHAR_SIZE + 1, 1);
  if (pKV->key == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(pKV->key, key, len + 1);
  addEscapeCharToString(pKV->key, len);
  tscDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len);
  *index = cur + 1;  // skip the '='
  return TSDB_CODE_SUCCESS;
}
2001

2002

2003
/**
 * Parse one tag/field value starting at *index and store the converted value
 * in pKV.
 *
 * The value ends at the first unescaped ',' or ' ' or at the end of the line.
 * For field values that start with '"' or 'L"' the scan continues until a
 * separator that directly follows a closing quote.
 *
 * On any failure pKV->key (allocated by the key parser) is freed and reset.
 *
 * @param pKV        entry whose type/length/value are filled
 * @param index      in: start of the value; out: position after the separator
 *                   (or the terminator when the line ended)
 * @param is_last_kv out: true when the value was ended by ' ' or end of line
 * @param info       parsing context
 * @param isTag      true for tag values, false for field values
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_LINE_SYNTAX_ERROR,
 *         TSDB_CODE_TSC_INVALID_VALUE or TSDB_CODE_TSC_OUT_OF_MEMORY
 */
static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **index,
                          bool *is_last_kv, SSmlLinesInfo* info, bool isTag) {
  const char *start, *cur;
  int32_t ret = TSDB_CODE_SUCCESS;
  char *value = NULL;
  int16_t len = 0;
  bool searchQuote = false;
  start = cur = *index;

  //if field value is string
  if (!isTag) {
    if (*cur == '"') {
      searchQuote = true;
      cur += 1;
      len += 1;
    } else if (*cur == 'L' && *(cur + 1) == '"') {
      searchQuote = true;
      cur += 2;
      len += 2;
    }
  }

  while (1) {
    // unescaped ',' or ' ' or '\0' identifies a value
    if (((*cur == ',' || *cur == ' ' ) && *(cur - 1) != '\\') || *cur == '\0') {
      if (searchQuote == true) {
        //first quote ignored while searching
        if (*(cur - 1) == '"' && len != 1 && len != 2) {
          *is_last_kv = (*cur == ' ' || *cur == '\0');
          break;
        } else if (*cur == '\0') {
          // the line ended inside a quoted string
          ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
          goto error;
        } else {
          // separator inside the quoted string: part of the value
          cur++;
          len++;
          continue;
        }
      }
      //unescaped ' ' or '\0' indicates end of value
      *is_last_kv = (*cur == ' ' || *cur == '\0');
      if (*cur == ' ' && *(cur + 1) == ' ') {
        // collapse a run of spaces; they are not counted in the value length
        cur++;
        continue;
      } else {
        break;
      }
    }
    //Escape special character
    if (*cur == '\\') {
      escapeSpecialCharacter(isTag ? 2 : 3, &cur);
      len++;
    }
    cur++;
    len++;
  }
  if (len == 0) {
    ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    goto error;
  }

  // calloc leaves the copy NUL-terminated
  value = calloc(len + 1, 1);
  if (value == NULL) {
    ret = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto error;
  }
  memcpy(value, start, len);
  if (!convertSmlValueType(pKV, value, len, info, isTag)) {
    tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
            info->id, value);
    free(value);
    ret = TSDB_CODE_TSC_INVALID_VALUE;
    goto error;
  }
  free(value);

  *index = (*cur == '\0') ? cur : cur + 1;
  return ret;

error:
  //free previously allocated key field
  free(pKV->key);
  pKV->key = NULL;
  return ret;
}

/**
 * Parse the measurement (super table name) at the beginning of a line.
 *
 * The measurement ends at the first unescaped ',' (tags follow) or unescaped
 * ' ' (fields follow). The name is stored, escaped, in a newly allocated
 * pSml->stableName; on failure that buffer is freed and reset to NULL.
 *
 * @param pSml     data point receiving the name
 * @param index    in: start of the line; out: first character after the
 *                 separator that ended the measurement
 * @param has_tags out: set to 1 when the measurement was ended by ','
 * @param info     parsing context
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_TSC_OUT_OF_MEMORY;
 *         TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH when the name is too long;
 *         TSDB_CODE_TSC_LINE_SYNTAX_ERROR when the name is empty or the line
 *         ends right after it
 */
static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **index,
                                   uint8_t *has_tags, SSmlLinesInfo* info) {
  const char *start = *index;
  const char *cur = start;
  int16_t len = 0;

  pSml->stableName = calloc(TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE, 1);
  if (pSml->stableName == NULL){
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  while (*cur != '\0') {
    if (len > TSDB_TABLE_NAME_LEN - 1) {
      tscError("SML:0x%"PRIx64" Measurement field cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1);
      free(pSml->stableName);
      pSml->stableName = NULL;
      return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
    }
    // The first character has no predecessor; never look before the buffer.
    bool escaped = (cur != start) && (*(cur - 1) == '\\');

    //first unescaped comma or space identifies measurement
    //if space detected first, meaning no tag in the input
    if (*cur == ',' && !escaped) {
      *has_tags = 1;
      break;
    }
    if (*cur == ' ' && !escaped) {
      if (*(cur + 1) != ' ') {
        break;
      }
      // skip over a run of spaces and stop at its last one
      cur++;
      continue;
    }
    //Comma, Space, Backslash needs to be escaped if any
    if (*cur == '\\') {
      escapeSpecialCharacter(1, &cur);
    }
    pSml->stableName[len] = *cur;
    cur++;
    len++;
  }
  if (len == 0 || *cur == '\0') {
    // Empty name, or the line ended before any tag/field section. In the
    // latter case cur + 1 would point past the terminator.
    free(pSml->stableName);
    pSml->stableName = NULL;
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }
  addEscapeCharToString(pSml->stableName, len);
  *index = cur + 1;
  tscDebug("SML:0x%"PRIx64" Stable name in measurement:%s|len:%d", info->id, pSml->stableName, len);

  return TSDB_CODE_SUCCESS;
}
2138

2139
//Table name can only contain digits(0-9),alphebet(a-z),underscore(_)
2140 2141
int32_t isValidChildTableName(const char *pTbName, int16_t len, SSmlLinesInfo* info) {
  if (len > TSDB_TABLE_NAME_LEN - 1) {
2142
    tscError("SML:0x%"PRIx64" child table name cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1);
2143 2144
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }
2145 2146
  const char *cur = pTbName;
  for (int i = 0; i < len; ++i) {
2147
    if(!isdigit(cur[i]) && !isalpha(cur[i]) && (cur[i] != '_')) {
2148 2149 2150 2151 2152 2153 2154
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    }
  }
  return TSDB_CODE_SUCCESS;
}


2155
static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
2156
                               const char **index, bool isField,
2157 2158
                               TAOS_SML_DATA_POINT* smlData, SHashObj *pHash,
                               SSmlLinesInfo* info) {
2159
  const char *cur = *index;
2160
  int32_t ret = TSDB_CODE_SUCCESS;
2161 2162
  TAOS_SML_KV *pkv;
  bool is_last_kv = false;
2163

2164
  int32_t capacity = 0;
2165
  if (isField) {
2166 2167 2168
    capacity = 64;
    *pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
    // leave space for timestamp;
2169 2170
    pkv = *pKVs;
    pkv++;
2171 2172 2173
  } else {
    capacity = 8;
    *pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
2174 2175
    pkv = *pKVs;
  }
2176

2177 2178 2179 2180 2181 2182 2183
  size_t childTableNameLen = strlen(tsSmlChildTableName);
  char childTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
  if (childTableNameLen != 0) {
    memcpy(childTableName, tsSmlChildTableName, childTableNameLen);
    addEscapeCharToString(childTableName, (int32_t)(childTableNameLen));
  }

2184
  while (*cur != '\0') {
2185
    ret = parseSmlKey(pkv, &cur, pHash, info);
2186
    if (ret) {
2187
      tscError("SML:0x%"PRIx64" Unable to parse key", info->id);
2188 2189
      goto error;
    }
2190
    ret = parseSmlValue(pkv, &cur, &is_last_kv, info, !isField);
2191
    if (ret) {
2192
      tscError("SML:0x%"PRIx64" Unable to parse value", info->id);
2193 2194
      goto error;
    }
2195 2196

    if (!isField && childTableNameLen != 0 && strcasecmp(pkv->key, childTableName) == 0)  {
2197
      smlData->childTableName = malloc(pkv->length + TS_ESCAPE_CHAR_SIZE + 1);
2198
      memcpy(smlData->childTableName, pkv->value, pkv->length);
2199
      addEscapeCharToString(smlData->childTableName, (int32_t)pkv->length);
2200 2201 2202 2203 2204
      free(pkv->key);
      free(pkv->value);
    } else {
      *num_kvs += 1;
    }
2205
    if (is_last_kv) {
2206 2207 2208 2209 2210
      goto done;
    }

    //reallocate addtional memory for more kvs
    TAOS_SML_KV *more_kvs = NULL;
2211

2212
    if (isField) {
2213 2214
      if ((*num_kvs + 2) > capacity) {
        capacity *= 3; capacity /= 2;
2215 2216 2217
        more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
      } else {
        more_kvs = *pKVs;
2218
      }
2219
    } else {
2220 2221
      if ((*num_kvs + 1) > capacity) {
        capacity *= 3; capacity /= 2;
2222 2223 2224
        more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
      } else {
        more_kvs = *pKVs;
2225
      }
2226
    }
2227

2228
    if (!more_kvs) {
2229 2230
      goto error;
    }
2231 2232 2233 2234 2235 2236 2237 2238 2239
    *pKVs = more_kvs;
    //move pKV points to next TAOS_SML_KV block
    if (isField) {
      pkv = *pKVs + *num_kvs + 1;
    } else {
      pkv = *pKVs + *num_kvs;
    }
  }
  goto done;
2240

2241
error:
2242
  return ret;
2243
done:
2244
  *index = cur;
2245
  return ret;
2246
}
2247

2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260
/**
 * Move the parsed timestamp into the first field slot of the data point and
 * count it as a field.
 *
 * Ownership of ts->key and ts->value is transferred to the field entry, so no
 * allocation (and no allocation failure) is involved; the `ts` container
 * itself is released.
 *
 * @param smlData data point whose fields[0] receives the timestamp
 * @param ts      heap-allocated timestamp entry; invalid after the call
 */
static void moveTimeStampToFirstKv(TAOS_SML_DATA_POINT** smlData, TAOS_SML_KV *ts) {
  TAOS_SML_KV* tsField = (*smlData)->fields;
  tsField->length = ts->length;
  tsField->type = ts->type;
  tsField->key = ts->key;
  tsField->value = ts->value;
  (*smlData)->fieldNum = (*smlData)->fieldNum + 1;

  free(ts);
}

2263
/**
 * Parse one line (measurement, optional tags, fields, timestamp) into smlData.
 *
 * On failure smlData may hold partially parsed content; the caller is expected
 * to release it with destroySmlDataPoint().
 *
 * @param sql     NUL-terminated input line
 * @param smlData data point to fill; expected to be zero-initialized
 * @param info    parsing context
 * @return TSDB_CODE_SUCCESS or the error code of the section that failed
 */
int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) {
  const char* index = sql;
  int32_t ret = TSDB_CODE_SUCCESS;
  uint8_t has_tags = 0;
  TAOS_SML_KV *timestamp = NULL;

  // tag and field keys share one table, so a key may appear only once per line
  SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
  if (keyHashTable == NULL) {
    tscError("SML:0x%"PRIx64" Unable to allocate key hash table", info->id);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  ret = parseSmlMeasurement(smlData, &index, &has_tags, info);
  if (ret) {
    tscError("SML:0x%"PRIx64" Unable to parse measurement", info->id);
    taosHashCleanup(keyHashTable);
    return ret;
  }
  tscDebug("SML:0x%"PRIx64" Parse measurement finished, has_tags:%d", info->id, has_tags);

  //Parse Tags
  if (has_tags) {
    ret = parseSmlKvPairs(&smlData->tags, &smlData->tagNum, &index, false, smlData, keyHashTable, info);
    if (ret) {
      tscError("SML:0x%"PRIx64" Unable to parse tag", info->id);
      taosHashCleanup(keyHashTable);
      return ret;
    }
  }
  tscDebug("SML:0x%"PRIx64" Parse tags finished, num of tags:%d", info->id, smlData->tagNum);

  //Parse fields
  ret = parseSmlKvPairs(&smlData->fields, &smlData->fieldNum, &index, true, smlData, keyHashTable, info);
  taosHashCleanup(keyHashTable);
  if (ret) {
    tscError("SML:0x%"PRIx64" Unable to parse field", info->id);
    goto fields_failed;
  }
  tscDebug("SML:0x%"PRIx64" Parse fields finished, num of fields:%d", info->id, smlData->fieldNum);

  //Parse timestamp
  ret = parseSmlTimeStamp(&timestamp, &index, info);
  if (ret) {
    tscError("SML:0x%"PRIx64" Unable to parse timestamp", info->id);
    goto fields_failed;
  }
  moveTimeStampToFirstKv(&smlData, timestamp);
  tscDebug("SML:0x%"PRIx64" Parse timestamp finished", info->id);

  return TSDB_CODE_SUCCESS;

fields_failed:
  // Parsed fields live in slots 1..fieldNum because slot 0 is reserved for the
  // (still empty) timestamp. Count that slot too so destroySmlDataPoint(),
  // which frees slots 0..fieldNum-1, also releases the last parsed field.
  if (smlData->fields != NULL) {
    smlData->fieldNum += 1;
  }
  return ret;
}

2311
//=========================================================================
2312

S
shenglian zhou 已提交
2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327
/**
 * Release everything a data point owns: the key/value buffers of its tags and
 * fields, both arrays and the table names. The point structure itself is not
 * freed.
 *
 * @param point data point to clean up
 */
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) {
  int idx;

  for (idx = 0; idx < point->tagNum; idx++) {
    free(point->tags[idx].key);
    free(point->tags[idx].value);
  }
  free(point->tags);

  for (idx = 0; idx < point->fieldNum; idx++) {
    free(point->fields[idx].key);
    free(point->fields[idx].value);
  }
  free(point->fields);

  free(point->stableName);
  free(point->childTableName);
}

S
shenglian zhou 已提交
2328
/**
 * Parse `numLines` lines and append one data point per line to `points`.
 * Parsing stops at the first line that fails; points appended before that
 * remain in the array.
 *
 * @param lines       input lines
 * @param numLines    number of entries in lines
 * @param points      array receiving the parsed TAOS_SML_DATA_POINT values
 * @param failedLines not used by this function
 * @param info        parsing context
 * @return TSDB_CODE_SUCCESS or the error code of the first failing line
 */
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) {
  int32_t lineNo = 0;

  while (lineNo < numLines) {
    TAOS_SML_DATA_POINT parsed = {0};
    const int32_t rc = tscParseLine(lines[lineNo], &parsed, info);

    if (rc != TSDB_CODE_SUCCESS) {
      tscError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, lineNo, lines[lineNo]);
      // release whatever was parsed before the failure
      destroySmlDataPoint(&parsed);
      return rc;
    }

    tscDebug("SML:0x%"PRIx64" data point line parse success. line %d", info->id, lineNo);
    taosArrayPush(points, &parsed);
    ++lineNo;
  }

  return TSDB_CODE_SUCCESS;
}

2345
/**
 * Parse `numLines` lines and insert the resulting data points.
 *
 * @param taos         connection handle passed on to tscSmlInsert()
 * @param lines        input lines; every entry must be non-NULL
 * @param numLines     number of lines, between 1 and 65536
 * @param protocol     protocol recorded in the parsing context
 * @param tsType       timestamp precision recorded in the parsing context
 * @param affectedRows optional out: written after the insert step with the
 *                     context's affectedRows; untouched when parsing fails
 * @return 0 on success; TSDB_CODE_TSC_APP_ERROR for invalid arguments,
 *         TSDB_CODE_TSC_OUT_OF_MEMORY, or the parse/insert error code
 */
int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int *affectedRows) {
  int32_t code = 0;
  TAOS_SML_DATA_POINT* points = NULL;
  size_t numPoints = 0;

  SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
  if (info == NULL) {
    tscError("SML taos_insert_lines failed to allocate memory for the parsing context");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  info->id = genLinesSmlId();
  info->tsType = tsType;
  info->protocol = protocol;

  if (numLines <= 0 || numLines > 65536) {
    tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
    tfree(info);
    code = TSDB_CODE_TSC_APP_ERROR;
    return code;
  }

  for (int i = 0; i < numLines; ++i) {
    if (lines[i] == NULL) {
      tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i);
      tfree(info);
      code = TSDB_CODE_TSC_APP_ERROR;
      return code;
    }
  }

  SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT));
  if (lpPoints == NULL) {
    tscError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
    tfree(info);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]);
  code = tscParseLines(lines, numLines, lpPoints, NULL, info);
  if (code != 0) {
    goto cleanup;
  }

  points = TARRAY_GET_START(lpPoints);
  numPoints = taosArrayGetSize(lpPoints);
  code = tscSmlInsert(taos, points, (int)numPoints, info);
  if (code != 0) {
    tscError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code)));
  }
  if (affectedRows != NULL) {
    *affectedRows = info->affectedRows;
  }

cleanup:
  tscDebug("SML:0x%"PRIx64" taos_insert_lines finish inserting %d lines. code: %d", info->id, numLines, code);
  // re-read: on a parse failure only the lines parsed so far are in the array
  points = TARRAY_GET_START(lpPoints);
  numPoints = taosArrayGetSize(lpPoints);
  for (size_t i = 0; i < numPoints; ++i) {
    destroySmlDataPoint(points + i);
  }

  taosArrayDestroy(lpPoints);

  tfree(info);
  return code;
}

G
Ganlin Zhao 已提交
2407 2408
/**
 * Map a TSDB_SML_TIMESTAMP_* precision constant to the matching
 * SMLTimeStampType.
 *
 * @param precision one of the TSDB_SML_TIMESTAMP_* values
 * @param tsType    out: mapped type; left untouched for an unknown precision
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_INVALID_PRECISION_TYPE when the
 *         precision is not recognised
 */
static int32_t convertPrecisionType(int precision, SMLTimeStampType *tsType) {
  SMLTimeStampType mapped;

  switch (precision) {
    case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
      mapped = SML_TIME_STAMP_NOT_CONFIGURED;
      break;
    case TSDB_SML_TIMESTAMP_HOURS:
      mapped = SML_TIME_STAMP_HOURS;
      break;
    case TSDB_SML_TIMESTAMP_MINUTES:
      mapped = SML_TIME_STAMP_MINUTES;
      break;
    case TSDB_SML_TIMESTAMP_SECONDS:
      mapped = SML_TIME_STAMP_SECONDS;
      break;
    case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
      mapped = SML_TIME_STAMP_MILLI_SECONDS;
      break;
    case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
      mapped = SML_TIME_STAMP_MICRO_SECONDS;
      break;
    case TSDB_SML_TIMESTAMP_NANO_SECONDS:
      mapped = SML_TIME_STAMP_NANO_SECONDS;
      break;
    default:
      return TSDB_CODE_TSC_INVALID_PRECISION_TYPE;
  }

  *tsType = mapped;
  return TSDB_CODE_SUCCESS;
}

G
Ganlin Zhao 已提交
2437
//make a dummy SSqlObj
G
Ganlin Zhao 已提交
2438
static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t code) {
G
Ganlin Zhao 已提交
2439 2440 2441
  SSqlObj *pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
    return NULL;
2442
  }
G
Ganlin Zhao 已提交
2443
  pNew->signature = pNew;
G
Ganlin Zhao 已提交
2444
  pNew->pTscObj = taos;
2445
  pNew->fp = NULL;
2446

G
Ganlin Zhao 已提交
2447 2448
  tsem_init(&pNew->rspSem, 0, 0);
  registerSqlObj(pNew);
2449

G
Ganlin Zhao 已提交
2450 2451 2452
  pNew->res.numOfRows = affected_rows;
  pNew->res.code = code;

G
Ganlin Zhao 已提交
2453

G
Ganlin Zhao 已提交
2454
  return pNew;
2455 2456
}

G
Ganlin Zhao 已提交
2457

2458
/**
2459
 * taos_schemaless_insert() parse and insert data points into database according to
2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
 * @return return zero for successful insertion. Otherwise return none-zero error code of
 *         failure reason.
 *
 */

G
Ganlin Zhao 已提交
2479 2480 2481
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
  int code = TSDB_CODE_SUCCESS;
  int affected_rows = 0;
2482
  SMLTimeStampType tsType = SML_TIME_STAMP_NOW;
2483

G
Ganlin Zhao 已提交
2484
  if (protocol == TSDB_SML_LINE_PROTOCOL) {
G
Ganlin Zhao 已提交
2485
    code = convertPrecisionType(precision, &tsType);
2486
    if (code != TSDB_CODE_SUCCESS) {
G
Ganlin Zhao 已提交
2487
      return NULL;
2488 2489 2490
    }
  }

2491
  switch (protocol) {
G
Ganlin Zhao 已提交
2492
    case TSDB_SML_LINE_PROTOCOL:
G
Ganlin Zhao 已提交
2493
      code = taos_insert_lines(taos, lines, numLines, protocol, tsType, &affected_rows);
2494
      break;
G
Ganlin Zhao 已提交
2495
    case TSDB_SML_TELNET_PROTOCOL:
G
Ganlin Zhao 已提交
2496
      code = taos_insert_telnet_lines(taos, lines, numLines, protocol, tsType, &affected_rows);
2497
      break;
G
Ganlin Zhao 已提交
2498
    case TSDB_SML_JSON_PROTOCOL:
G
Ganlin Zhao 已提交
2499
      code = taos_insert_json_payload(taos, *lines, protocol, tsType, &affected_rows);
2500 2501 2502 2503 2504 2505
      break;
    default:
      code = TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE;
      break;
  }

2506

G
Ganlin Zhao 已提交
2507
  SSqlObj *pSql = createSmlQueryObj(taos, affected_rows, code);
G
Ganlin Zhao 已提交
2508 2509

  return (TAOS_RES*)pSql;
2510
}