tscParseLineProtocol.c 82.4 KB
Newer Older
S
shenglian zhou 已提交
1 2 3 4
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
5

S
shenglian zhou 已提交
6 7 8 9 10 11 12 13
#include "os.h"
#include "osString.h"
#include "ttype.h"
#include "tmd5.h"
#include "tstrbuild.h"
#include "tname.h"
#include "hash.h"
#include "tskiplist.h"
14

S
shenglian zhou 已提交
15
#include "tscUtil.h"
16 17
#include "tsclient.h"
#include "tscLog.h"
S
shenglian zhou 已提交
18

19
#include "taos.h"
20
#include "tscParseLine.h"
21

S
shenglian zhou 已提交
22
typedef struct  {
23
  char sTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE];
S
shenglian zhou 已提交
24 25 26 27
  SHashObj* tagHash;
  SHashObj* fieldHash;
  SArray* tags; //SArray<SSchema>
  SArray* fields; //SArray<SSchema>
S
shenglian zhou 已提交
28
  uint8_t precision;
S
shenglian zhou 已提交
29 30
} SSmlSTableSchema;

31 32
//=================================================================================================

33 34
// File-scope counter behind genLinesSmlId(); it is advanced with atomic_add_fetch_64.
static uint64_t linesSmlHandleId = 0;

35 36 37 38
// Forward declarations of the child-table insert helpers; their definitions are
// not part of this section of the file.
// NOTE(review): the parameter names here are placeholders (pVoid, name1, pArray1, ...);
// consult the definitions for the meaning of each argument.
static int32_t insertChildTablePointsBatch(void* pVoid, char* name, char* name1, SArray* pArray, SArray* pArray1,
                                           SArray* pArray2, SArray* pArray3, size_t size, SSmlLinesInfo* info);
static int32_t doInsertChildTablePoints(void* pVoid, char* sql, char* name, SArray* pArray, SArray* pArray1,
                                        SSmlLinesInfo* info);
39 40 41 42 43 44 45 46 47 48
// Returns the next SML handle id. The counter is bumped atomically and the
// value 0 is never handed out: if the increment yields 0 it is bumped again.
uint64_t genLinesSmlId() {
  for (;;) {
    uint64_t handleId = atomic_add_fetch_64(&linesSmlHandleId, 1);
    if (handleId != 0) {
      return handleId;
    }
  }
}

S
shenglian zhou 已提交
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
// qsort-style comparator for TAOS_SML_KV entries, ordering by key.
// The shared prefix is compared case-insensitively; when the prefixes are
// equal the shorter key sorts first.
int compareSmlColKv(const void* p1, const void* p2) {
  const char* lhsKey = ((TAOS_SML_KV*)p1)->key;
  const char* rhsKey = ((TAOS_SML_KV*)p2)->key;
  int lhsLen = (int)strlen(lhsKey);
  int rhsLen = (int)strlen(rhsKey);

  int order = strncasecmp(lhsKey, rhsKey, MIN(lhsLen, rhsLen));
  return (order != 0) ? order : (lhsLen - rhsLen);
}

// Kind of DDL statement applySchemaAction() issues to reconcile the point
// schema with the database schema.
typedef enum {
  SCHEMA_ACTION_CREATE_STABLE,       // "create stable ..."
  SCHEMA_ACTION_ADD_COLUMN,          // "alter stable ... add column ..."
  SCHEMA_ACTION_ADD_TAG,             // "alter stable ... add tag ..."
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,  // "alter stable ... modify column ..."
  SCHEMA_ACTION_CHANGE_TAG_SIZE,     // "alter stable ... modify tag ..."
} ESchemaAction;

typedef struct {
71
  char sTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE];
S
shenglian zhou 已提交
72 73 74 75 76
  SArray* tags; //SArray<SSchema>
  SArray* fields; //SArray<SSchema>
} SCreateSTableActionInfo;

typedef struct {
77
  char sTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE];
S
shenglian zhou 已提交
78 79 80 81 82 83 84 85 86 87 88
  SSchema* field;
} SAlterSTableActionInfo;

// One schema change to apply. `action` selects which union member is valid:
// createSTable for SCHEMA_ACTION_CREATE_STABLE, alterSTable for the others.
typedef struct {
  ESchemaAction action;
  union {
    SCreateSTableActionInfo createSTable;
    SAlterSTableActionInfo alterSTable;
  };
} SSchemaAction;

S
shenglian zhou 已提交
89
// Computes the storage size, in bytes, a schema column needs to hold `kv`.
//
//   kv    - key/value whose type and length are inspected
//   bytes - out: required size; left untouched for a variable-length type that
//           is neither NCHAR nor BINARY
//   id    - SML handle id, used only for logging
//
// Returns 0 on success, TSDB_CODE_TSC_OUT_OF_MEMORY when the scratch buffer
// cannot be allocated, or TSDB_CODE_TSC_INVALID_VALUE when an NCHAR value
// cannot be converted to UCS4.
static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes, uint64_t id) {
  if (!IS_VAR_DATA_TYPE(kv->type)) {
    *bytes = tDataTypes[kv->type].bytes;
    return 0;
  }

  if (kv->type == TSDB_DATA_TYPE_BINARY) {
    *bytes = kv->length + VARSTR_HEADER_SIZE;
    return 0;
  }

  if (kv->type == TSDB_DATA_TYPE_NCHAR) {
    // The converted text is only needed to learn its length, so it goes into
    // a scratch buffer that is released right after the conversion.
    char* ucs = malloc(kv->length * TSDB_NCHAR_SIZE + 1);
    if (ucs == NULL) {
      tscError("SML:0x%"PRIx64" failed to allocate memory, reason:%s", id, strerror(errno));
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    int32_t bytesNeeded = 0;
    bool    succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded);
    free(ucs);
    if (!succ) {
      tscError("SML:0x%"PRIx64" convert nchar string to UCS4_LE failed:%s", id, kv->value);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }
    *bytes = bytesNeeded + VARSTR_HEADER_SIZE;
  }

  return 0;
}

S
shenglian zhou 已提交
111
// Merges one key/value into a super table schema.
//
//   smlKv - key/value to merge; on success its fieldSchemaIdx is set to the
//           index of the matching entry in `array`
//   hash  - key name -> index into `array`
//   array - SArray<SSchema> holding the tags or the fields of the super table
//   info  - SML context, used for logging
//
// An already-known key must keep its type and only has its size widened; an
// unknown key is appended to `array` and registered in `hash`.
//
// Returns 0 on success, TSDB_CODE_TSC_INVALID_VALUE on a type mismatch or a key
// that does not fit into SSchema.name, TSDB_CODE_TSC_OUT_OF_MEMORY when the
// array cannot grow, or the error reported by getFieldBytesFromSmlKv().
static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array, SSmlLinesInfo* info) {
  size_t  keyLen = strlen(smlKv->key);
  size_t  fieldIdx = 0;
  int32_t bytes = 0;
  int32_t code = 0;

  size_t* pFieldIdx = taosHashGet(hash, smlKv->key, keyLen);
  if (pFieldIdx) {
    fieldIdx = *pFieldIdx;
    SSchema* pField = taosArrayGet(array, fieldIdx);

    if (pField->type != smlKv->type) {
      tscError("SML:0x%"PRIx64" type mismatch. key %s, type %d. type before %d", info->id, smlKv->key, smlKv->type, pField->type);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id);
    if (code != 0) {
      return code;
    }
    pField->bytes = MAX(pField->bytes, bytes);
  } else {
    SSchema field = {0};

    // The copy below must leave room for the terminating NUL inside field.name.
    if (keyLen >= sizeof(field.name)) {
      tscError("SML:0x%"PRIx64" key is too long for a column name. key %s", info->id, smlKv->key);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }
    memcpy(field.name, smlKv->key, keyLen);  // field is zeroed, so the name stays NUL-terminated
    field.type = smlKv->type;

    code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id);
    if (code != 0) {
      return code;
    }
    field.bytes = bytes;

    if (taosArrayPush(array, &field) == NULL) {
      tscError("SML:0x%"PRIx64" failed to allocate memory, reason:%s", info->id, strerror(errno));
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    fieldIdx = taosArrayGetSize(array) - 1;
    taosHashPut(hash, field.name, keyLen, &fieldIdx, sizeof(fieldIdx));
  }

  smlKv->fieldSchemaIdx = (uint32_t)fieldIdx;

  return 0;
}

S
Shengliang Guan 已提交
156 157 158
// Derives a child table name from the super table name and the tag set.
//
// The tags are sorted by key (in place), joined as
// "<stable>,<key>=<value>,<key>=<value>..." and hashed with MD5; the name is
// "t_" followed by the 128-bit digest printed as two 16-digit hex numbers.
//
//   point        - data point; its tags array is reordered by the sort
//   tableName    - out: buffer receiving the name
//   tableNameLen - in: capacity of tableName; out: value returned by snprintf
//   info         - SML context, used for logging
//
// Always returns 0.
static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen,
                                       SSmlLinesInfo* info) {
  tscDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id);
  if (point->tagNum) {
    qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);
  }

  SStringBuilder sb;
  memset(&sb, 0, sizeof(sb));

  // Names are appended straight from the point: staging them in fixed-size
  // stack buffers first added nothing and overflowed on over-long names.
  taosStringBuilderAppendString(&sb, point->stableName);
  for (int j = 0; j < point->tagNum; ++j) {
    TAOS_SML_KV* tagKv = point->tags + j;
    taosStringBuilderAppendChar(&sb, ',');
    taosStringBuilderAppendString(&sb, tagKv->key);
    taosStringBuilderAppendChar(&sb, '=');
    taosStringBuilderAppend(&sb, tagKv->value, tagKv->length);
  }

  size_t len = 0;
  char*  keyJoined = taosStringBuilderGetResult(&sb, &len);
  MD5_CTX context;
  MD5Init(&context);
  MD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
  MD5Final(&context);

  // memcpy instead of a uint64_t* cast: no alignment or aliasing assumptions
  // about the digest storage, same bytes read.
  uint64_t digest1 = 0;
  uint64_t digest2 = 0;
  memcpy(&digest1, context.digest, sizeof(digest1));
  memcpy(&digest2, context.digest + 8, sizeof(digest2));

  *tableNameLen = snprintf(tableName, *tableNameLen,
                           "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
  taosStringBuilderDestroy(&sb);
  tscDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName);
  return 0;
}

193 194 195 196 197 198 199 200 201 202 203
// Generates the MD5-based child table name for `point` and stores a
// heap-allocated copy in point->childTableName.
//
// Returns 0 on success or TSDB_CODE_TSC_OUT_OF_MEMORY when the copy cannot be
// allocated (point->childTableName is then left unchanged).
static int32_t buildSmlChildTableName(TAOS_SML_DATA_POINT* point, SSmlLinesInfo* info) {
  tscDebug("SML:0x%"PRIx64" taos_sml_insert build child table name", info->id);
  char    childTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
  int32_t tableNameLen = (int32_t)sizeof(childTableName);
  getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info);

  // The length comes from snprintf, which reports the untruncated size (or a
  // negative value on error); clamp it to what the buffer really holds.
  if (tableNameLen < 0) {
    tableNameLen = 0;
  } else if (tableNameLen >= (int32_t)sizeof(childTableName)) {
    tableNameLen = (int32_t)sizeof(childTableName) - 1;
  }

  char* nameCopy = calloc(1, tableNameLen + 1);
  if (nameCopy == NULL) {
    tscError("SML:0x%"PRIx64" failed to allocate memory, reason:%s", info->id, strerror(errno));
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(nameCopy, childTableName, tableNameLen);  // calloc already supplied the terminator
  point->childTableName = nameCopy;
  return 0;
}

S
shenglian zhou 已提交
204
// Builds, for every super table referenced by `points`, the union schema of
// all tags and fields seen in the batch.
//
//   points        - data points; each gets childTableName (if missing),
//                   schemaIdx and, per tag/field, fieldSchemaIdx filled in
//   numPoint      - number of entries in `points`
//   stableSchemas - SArray<SSmlSTableSchema> that receives one entry per
//                   distinct super table name
//   info          - SML context, used for logging
//
// The name->index hashes are only needed while building: they are released
// (and the pointers cleared) before returning, on success and on failure.
// The `tags` and `fields` arrays of each schema stay allocated.
//
// Returns 0 on success or the first error encountered.
static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas, SSmlLinesInfo* info) {
  int32_t   code = 0;
  size_t    numStables = 0;
  SHashObj* sname2shema = taosHashInit(32,
                                       taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

  for (int i = 0; i < numPoint; ++i) {
    TAOS_SML_DATA_POINT* point = &points[i];
    size_t stableNameLen = strlen(point->stableName);
    size_t* pStableIdx = taosHashGet(sname2shema, point->stableName, stableNameLen);
    SSmlSTableSchema* pStableSchema = NULL;
    size_t stableIdx = 0;
    if (pStableIdx) {
      stableIdx = *pStableIdx;
      pStableSchema = taosArrayGet(stableSchemas, stableIdx);
    } else {
      SSmlSTableSchema schema;
      memset(&schema, 0, sizeof(schema));  // precision is only known once the DB schema is loaded
      if (stableNameLen >= sizeof(schema.sTableName)) {
        tscError("SML:0x%"PRIx64" super table name is too long. point no.: %d", info->id, i);
        code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
        goto cleanup;
      }
      memcpy(schema.sTableName, point->stableName, stableNameLen);
      schema.fields = taosArrayInit(64, sizeof(SSchema));
      schema.tags = taosArrayInit(8, sizeof(SSchema));
      schema.tagHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
      schema.fieldHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

      pStableSchema = taosArrayPush(stableSchemas, &schema);
      stableIdx = taosArrayGetSize(stableSchemas) - 1;
      taosHashPut(sname2shema, schema.sTableName, stableNameLen, &stableIdx, sizeof(size_t));
    }

    // Must run before the tags are visited: generating the name sorts
    // point->tags in place.
    if (!point->childTableName) {
      code = buildSmlChildTableName(point, info);
      if (code != 0) {
        tscError("SML:0x%"PRIx64" build child table name failed. point no.: %d", info->id, i);
        goto cleanup;
      }
    }

    for (int j = 0; j < point->tagNum; ++j) {
      TAOS_SML_KV* tagKv = point->tags + j;
      code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags, info);
      if (code != 0) {
        tscError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, tag key: %s", info->id, i, tagKv->key);
        goto cleanup;
      }
    }

    //for Line Protocol tags may be omitted, add a tag with NULL value
    if (point->tagNum == 0) {
      // room for the escape characters added by addEscapeCharToString
      char tagNullName[TSDB_COL_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
      size_t nameLen = strlen(tsSmlTagNullName);
      strncpy(tagNullName, tsSmlTagNullName, nameLen);
      addEscapeCharToString(tagNullName, (int32_t)nameLen);
      size_t* pTagNullIdx = taosHashGet(pStableSchema->tagHash, tagNullName, nameLen + TS_ESCAPE_CHAR_SIZE);
      if (!pTagNullIdx) {
        SSchema tagNull = {0};
        tagNull.type  = TSDB_DATA_TYPE_NCHAR;
        tagNull.bytes = TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
        strncpy(tagNull.name, tagNullName, nameLen + TS_ESCAPE_CHAR_SIZE);
        taosArrayPush(pStableSchema->tags, &tagNull);
        size_t tagNullIdx = taosArrayGetSize(pStableSchema->tags) - 1;
        taosHashPut(pStableSchema->tagHash, tagNull.name, nameLen + TS_ESCAPE_CHAR_SIZE, &tagNullIdx, sizeof(tagNullIdx));
      }
    }

    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* fieldKv = point->fields + j;
      code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields, info);
      if (code != 0) {
        tscError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, field key: %s", info->id, i, fieldKv->key);
        goto cleanup;
      }
    }

    point->schemaIdx = (uint32_t)stableIdx;
  }

cleanup:
  // Shared exit path so the lookup hashes are never leaked on an error.
  numStables = taosArrayGetSize(stableSchemas);
  for (size_t i = 0; i < numStables; ++i) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
    taosHashCleanup(schema->tagHash);
    taosHashCleanup(schema->fieldHash);
    schema->tagHash = NULL;
    schema->fieldHash = NULL;
  }
  taosHashCleanup(sname2shema);

  if (code != 0) {
    return code;
  }

  tscDebug("SML:0x%"PRIx64" build point schema succeed. num of super table: %zu", info->id, numStables);
  for (size_t i = 0; i < numStables; ++i) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
    tscDebug("\ttable name: %s, tags number: %zu, fields number: %zu", schema->sTableName,
             taosArrayGetSize(schema->tags), taosArrayGetSize(schema->fields));
  }

  return 0;
}

296
static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[],
S
shenglian zhou 已提交
297
                                       SSchemaAction* action, bool* actionNeeded, SSmlLinesInfo* info) {
298 299
  char fieldName[TSDB_COL_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
  strcpy(fieldName, pointColField->name);
300

301
  size_t* pDbIndex = taosHashGet(dbAttrHash, fieldName, strlen(fieldName));
302 303 304
  if (pDbIndex) {
    SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex);
    assert(strcasecmp(dbAttr->name, pointColField->name) == 0);
S
shenglian zhou 已提交
305
    if (pointColField->type != dbAttr->type) {
S
shenglian zhou 已提交
306
      tscError("SML:0x%"PRIx64" point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id, pointColField->name,
S
Shenglian Zhou 已提交
307 308
               pointColField->type, dbAttr->type);
      return TSDB_CODE_TSC_INVALID_VALUE;
S
shenglian zhou 已提交
309 310 311 312 313 314 315 316 317
    }

    if (IS_VAR_DATA_TYPE(pointColField->type) && (pointColField->bytes > dbAttr->bytes)) {
      if (isTag) {
        action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
      } else {
        action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
      }
      memset(&action->alterSTable, 0,  sizeof(SAlterSTableActionInfo));
318
      memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE);
S
shenglian zhou 已提交
319 320 321 322 323 324 325 326 327 328
      action->alterSTable.field = pointColField;
      *actionNeeded = true;
    }
  } else {
    if (isTag) {
      action->action = SCHEMA_ACTION_ADD_TAG;
    } else {
      action->action = SCHEMA_ACTION_ADD_COLUMN;
    }
    memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo));
329
    memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE);
S
shenglian zhou 已提交
330 331 332
    action->alterSTable.field = pointColField;
    *actionNeeded = true;
  }
S
shenglian zhou 已提交
333
  if (*actionNeeded) {
334
    tscDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, fieldName,
S
shenglian zhou 已提交
335 336
             action->action);
  }
S
shenglian zhou 已提交
337 338 339
  return 0;
}

S
shenglian zhou 已提交
340
// Formats the SQL column definition of `field` into `buf`:
// "<name> <type>" for fixed-size types and "<name> <type>(<len>)" for
// BINARY/NCHAR, where <len> is the payload size without the var-string header
// (divided by TSDB_NCHAR_SIZE for NCHAR).
//
//   buf/bufSize - destination buffer and its capacity
//   outBytes    - out: value returned by snprintf
//
// Always returns 0.
static int32_t buildColumnDescription(SSchema* field,
                               char* buf, int32_t bufSize, int32_t* outBytes) {
  uint8_t type = field->type;
  bool    hasLength = (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR);

  if (!hasLength) {
    *outBytes = snprintf(buf, bufSize, "%s %s",
                         field->name, tDataTypes[type].name);
    return 0;
  }

  int32_t declaredLen = field->bytes - VARSTR_HEADER_SIZE;
  if (type == TSDB_DATA_TYPE_NCHAR) {
    declaredLen = declaredLen/TSDB_NCHAR_SIZE;
  }
  *outBytes = snprintf(buf, bufSize,"%s %s(%d)",
                       field->name,tDataTypes[field->type].name, declaredLen);
  return 0;
}

S
shenglian zhou 已提交
361

S
shenglian zhou 已提交
362
static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
363 364
  int32_t code = 0;
  int32_t outBytes = 0;
365 366
  char *result = (char *)calloc(1, tsMaxSQLStringLen+1);
  int32_t capacity = tsMaxSQLStringLen +  1;
S
shenglian zhou 已提交
367

S
shenglian zhou 已提交
368
  tscDebug("SML:0x%"PRIx64" apply schema action. action: %d", info->id, action->action);
S
shenglian zhou 已提交
369 370 371 372 373 374
  switch (action->action) {
    case SCHEMA_ACTION_ADD_COLUMN: {
      int n = sprintf(result, "alter stable %s add column ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
shenglian zhou 已提交
375 376 377
      char* errStr = taos_errstr(res);
      char* begin = strstr(errStr, "duplicated column names");
      bool tscDupColNames = (begin != NULL);
S
shenglian zhou 已提交
378
      if (code != TSDB_CODE_SUCCESS) {
S
shenglian zhou 已提交
379
        tscError("SML:0x%"PRIx64" apply schema action. error: %s", info->id, errStr);
S
shenglian zhou 已提交
380
      }
381 382
      taos_free_result(res);

S
shenglian zhou 已提交
383
      if (code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || code == TSDB_CODE_MND_TAG_ALREAY_EXIST || tscDupColNames) {
384 385
        TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
        code = taos_errno(res2);
386 387 388
        if (code != TSDB_CODE_SUCCESS) {
          tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
389
        taos_free_result(res2);
390
        taosMsleep(500);
391
      }
S
shenglian zhou 已提交
392 393 394 395 396 397 398 399
      break;
    }
    case SCHEMA_ACTION_ADD_TAG: {
      int n = sprintf(result, "alter stable %s add tag ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field,
                             result+n, capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
shenglian zhou 已提交
400 401 402
      char* errStr = taos_errstr(res);
      char* begin = strstr(errStr, "duplicated column names");
      bool tscDupColNames = (begin != NULL);
S
shenglian zhou 已提交
403 404 405
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
406 407
      taos_free_result(res);

S
shenglian zhou 已提交
408
      if (code == TSDB_CODE_MND_TAG_ALREAY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) {
409 410
        TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
        code = taos_errno(res2);
411 412 413
        if (code != TSDB_CODE_SUCCESS) {
          tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
414
        taos_free_result(res2);
415
        taosMsleep(500);
416
      }
S
shenglian zhou 已提交
417 418 419 420 421 422 423 424
      break;
    }
    case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
      int n = sprintf(result, "alter stable %s modify column ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n,
                             capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
shenglian zhou 已提交
425 426 427
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
428 429
      taos_free_result(res);

430
      if (code == TSDB_CODE_MND_INVALID_COLUMN_LENGTH || code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
431 432
        TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
        code = taos_errno(res2);
433 434 435
        if (code != TSDB_CODE_SUCCESS) {
          tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
436
        taos_free_result(res2);
437
        taosMsleep(500);
438
      }
439
      break;
S
shenglian zhou 已提交
440 441 442 443 444 445 446
    }
    case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
      int n = sprintf(result, "alter stable %s modify tag ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n,
                             capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
shenglian zhou 已提交
447 448 449
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
450 451
      taos_free_result(res);

452
      if (code == TSDB_CODE_MND_INVALID_TAG_LENGTH || code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
453 454
        TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
        code = taos_errno(res2);
455 456 457
        if (code != TSDB_CODE_SUCCESS) {
          tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
458
        taos_free_result(res2);
459
        taosMsleep(500);
460
      }
S
shenglian zhou 已提交
461 462 463 464 465
      break;
    }
    case SCHEMA_ACTION_CREATE_STABLE: {
      int n = sprintf(result, "create stable %s (", action->createSTable.sTableName);
      char* pos = result + n; int freeBytes = capacity - n;
S
shenglian zhou 已提交
466
      size_t numCols = taosArrayGetSize(action->createSTable.fields);
S
shenglian zhou 已提交
467 468 469 470 471 472 473
      for (int32_t i = 0; i < numCols; ++i) {
        SSchema* field = taosArrayGet(action->createSTable.fields, i);
        buildColumnDescription(field, pos, freeBytes, &outBytes);
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
      --pos; ++freeBytes;
474

S
shenglian zhou 已提交
475 476
      outBytes = snprintf(pos, freeBytes, ") tags (");
      pos += outBytes; freeBytes -= outBytes;
477

S
shenglian zhou 已提交
478
      size_t numTags = taosArrayGetSize(action->createSTable.tags);
S
shenglian zhou 已提交
479 480 481 482 483 484 485 486 487 488
      for (int32_t i = 0; i < numTags; ++i) {
        SSchema* field = taosArrayGet(action->createSTable.tags, i);
        buildColumnDescription(field, pos, freeBytes, &outBytes);
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
      pos--; ++freeBytes;
      outBytes = snprintf(pos, freeBytes, ")");
      TAOS_RES* res = taos_query(taos, result);
      code = taos_errno(res);
S
shenglian zhou 已提交
489 490 491
      if (code != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
492 493
      taos_free_result(res);

494 495 496
      if (code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) {
        TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
        code = taos_errno(res2);
497 498 499
        if (code != TSDB_CODE_SUCCESS) {
          tscError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
500
        taos_free_result(res2);
501
        taosMsleep(500);
502
      }
S
shenglian zhou 已提交
503 504
      break;
    }
S
shenglian zhou 已提交
505

S
shenglian zhou 已提交
506 507 508
    default:
      break;
  }
S
Shenglian Zhou 已提交
509

S
shenglian zhou 已提交
510
  free(result);
S
Shenglian Zhou 已提交
511
  if (code != 0) {
S
shenglian zhou 已提交
512
    tscError("SML:0x%"PRIx64 " apply schema action failure. %s", info->id, tstrerror(code));
S
Shenglian Zhou 已提交
513
  }
S
shenglian zhou 已提交
514 515 516
  return code;
}

517
// Releases everything owned by `schema`: both name->index hashes and both
// SSchema arrays. The SSmlSTableSchema object itself is not freed and its
// pointers are left as they were. Always returns 0.
static int32_t destroySmlSTableSchema(SSmlSTableSchema* schema) {
  // lookup tables
  taosHashCleanup(schema->tagHash);
  taosHashCleanup(schema->fieldHash);

  // schema storage
  taosArrayDestroy(schema->tags);
  taosArrayDestroy(schema->fields);

  return 0;
}

525
// Populates `schema` from the table meta of a super table.
//
//   tableMeta - meta whose schema[] holds numOfColumns columns followed by
//               numOfTags tags
//   tableName - super table name, copied into schema->sTableName
//   schema    - out: arrays and hashes are freshly allocated here; release
//               them with destroySmlSTableSchema()
//   info      - SML context, used for logging
//
// Every name is passed through addEscapeCharToString() before it is stored
// and hashed. Always returns TSDB_CODE_SUCCESS.
static int32_t fillDbSchema(STableMeta* tableMeta, char* tableName, SSmlSTableSchema* schema, SSmlLinesInfo* info) {
  schema->tags = taosArrayInit(8, sizeof(SSchema));
  schema->fields = taosArrayInit(64, sizeof(SSchema));
  schema->tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
  schema->fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

  // Bound the copy by the destination, not by the length of the source.
  tstrncpy(schema->sTableName, tableName, sizeof(schema->sTableName));
  schema->precision = tableMeta->tableInfo.precision;

  int numCols = tableMeta->tableInfo.numOfColumns;
  int numAttrs = numCols + tableMeta->tableInfo.numOfTags;
  for (int i = 0; i < numAttrs; ++i) {
    // Columns come first in tableMeta->schema, tags follow them.
    bool      isTag = (i >= numCols);
    SArray*   dstArray = isTag ? schema->tags : schema->fields;
    SHashObj* dstHash = isTag ? schema->tagHash : schema->fieldHash;

    SSchema field = {0};  // zeroed so no indeterminate member is copied into the array
    tstrncpy(field.name, tableMeta->schema[i].name, sizeof(field.name));
    // NOTE(review): addEscapeCharToString lengthens the name in place; this
    // assumes SSchema.name has room for the escape characters - confirm.
    addEscapeCharToString(field.name, (int16_t)strlen(field.name));
    field.type = tableMeta->schema[i].type;
    field.bytes = tableMeta->schema[i].bytes;

    taosArrayPush(dstArray, &field);
    size_t attrIndex = taosArrayGetSize(dstArray) - 1;
    taosHashPut(dstHash, field.name, strlen(field.name), &attrIndex, sizeof(attrIndex));
  }

  tscDebug("SML:0x%"PRIx64 " load table schema succeed. table name: %s, columns number: %d, tag number: %d, precision: %d",
           info->id, tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision);
  return TSDB_CODE_SUCCESS;
}

560 561
// Looks up the meta of `tableName` in the client-side table meta cache.
// When the cached entry is a child table, the meta of its super table is
// looked up instead.
//
//   taos         - connection the lookup runs on
//   tableName    - table name, optionally qualified with the database
//   outTableMeta - out: heap-allocated copy of the super table meta, owned by
//                  the caller (free()); may be NULL to only probe the cache
//   info         - SML context, used for logging
//
// Returns TSDB_CODE_SUCCESS when a meta was found, TSDB_CODE_TSC_NO_META_CACHED
// when the cache has no entry, TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH for an
// over-long or invalid name, TSDB_CODE_TSC_OUT_OF_MEMORY on allocation
// failure, or the error of tscSetTableFullName().
static int32_t getSuperTableMetaFromLocalCache(TAOS* taos, char* tableName, STableMeta** outTableMeta, SSmlLinesInfo* info) {
  int32_t     code = 0;
  STableMeta* tableMeta = NULL;
  char        tableNameBuf[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};

  // Reject names that do not fit before anything is allocated; the copy used
  // to be unbounded.
  size_t tableNameLen = strlen(tableName);
  if (tableNameLen >= sizeof(tableNameBuf)) {
    tscError("SML:0x%" PRIx64 " table name is too long: %s", info->id, tableName);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }
  memcpy(tableNameBuf, tableName, tableNameLen);

  SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
  if (pSql == NULL) {
    tscError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno));
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return code;
  }
  pSql->pTscObj = taos;
  pSql->signature = pSql;
  pSql->fp = NULL;

  registerSqlObj(pSql);
  SStrToken tableToken = {.z = tableNameBuf, .n = (uint32_t)tableNameLen, .type = TK_ID};
  tGetToken(tableNameBuf, &tableToken.type);
  bool dbIncluded = false;
  // Check if the table name available or not
  if (tscValidateName(&tableToken, true, &dbIncluded) != TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
    // Logged instead of written into pSql->cmd.payload: nothing in this
    // function allocates that buffer and pSql is released right below.
    tscError("SML:0x%" PRIx64 " table name is invalid: %s", info->id, tableName);
    taosReleaseRef(tscObjRef, pSql->self);
    return code;
  }

  SName sname = {0};
  if ((code = tscSetTableFullName(&sname, &tableToken, pSql, dbIncluded)) != TSDB_CODE_SUCCESS) {
    taosReleaseRef(tscObjRef, pSql->self);
    return code;
  }

  char fullTableName[TSDB_TABLE_FNAME_LEN] = {0};
  tNameExtractFullName(&sname, fullTableName);

  size_t size = 0;
  taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size);

  STableMeta* stableMeta = tableMeta;
  if (tableMeta != NULL && tableMeta->tableType == TSDB_CHILD_TABLE) {
    // Clone into a separate buffer. The address of the pointer has to be
    // passed (the old code passed the pointer value itself), and starting from
    // NULL keeps tableMeta->sTableName, which is the lookup key, intact.
    size_t stableSize = 0;
    stableMeta = NULL;
    taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), tableMeta->sTableName, strlen(tableMeta->sTableName), NULL,
                        (void**)&stableMeta, &stableSize);
  }
  taosReleaseRef(tscObjRef, pSql->self);

  if (stableMeta != tableMeta) {
    free(tableMeta);
  }

  if (stableMeta != NULL) {
    if (outTableMeta != NULL) {
      *outTableMeta = stableMeta;
    } else {
      free(stableMeta);
    }
    return TSDB_CODE_SUCCESS;
  } else {
    return TSDB_CODE_TSC_NO_META_CACHED;
  }
}

// Fetches the meta of super table `tableName`, first from the local cache and,
// on a cache miss, by issuing "describe <table>" and trying the cache again.
// At most TSDB_MAX_REPLICA + 1 attempts are made.
//
//   pTableMeta - out: heap-allocated meta on success, owned by the caller
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_DISCONNECTED for an invalid
// connection, TSDB_CODE_TSC_NO_META_CACHED when every attempt missed, or the
// error of the cache lookup / describe statement.
static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTableMeta, SSmlLinesInfo* info) {
  STableMeta* cachedMeta = NULL;

  for (int32_t attempt = 0; attempt <= TSDB_MAX_REPLICA && cachedMeta == NULL; ++attempt) {
    STscObj* pObj = (STscObj*)taos;
    if (pObj == NULL || pObj->signature != pObj) {
      terrno = TSDB_CODE_TSC_DISCONNECTED;
      return TSDB_CODE_TSC_DISCONNECTED;
    }

    tscDebug("SML:0x%" PRIx64 " retrieve table meta. super table name: %s", info->id, tableName);
    int32_t code = getSuperTableMetaFromLocalCache(taos, tableName, &cachedMeta, info);
    if (code == TSDB_CODE_SUCCESS) {
      tscDebug("SML:0x%" PRIx64 " successfully retrieved table meta. super table name: %s", info->id, tableName);
      break;
    }
    if (code != TSDB_CODE_TSC_NO_META_CACHED) {
      return code;
    }

    // Cache miss: run a describe statement, then look in the cache again on
    // the next round.
    char describeSql[256];
    snprintf(describeSql, sizeof(describeSql), "describe %s", tableName);
    TAOS_RES* res = taos_query(taos, describeSql);
    code = taos_errno(res);
    if (code != 0) {
      tscError("SML:0x%" PRIx64 " describe table failure. %s", info->id, taos_errstr(res));
      taos_free_result(res);
      return code;
    }
    taos_free_result(res);
  }

  if (cachedMeta == NULL) {
    tscError("SML:0x%" PRIx64 " failed to retrieve table meta. super table name: %s", info->id, tableName);
    return TSDB_CODE_TSC_NO_META_CACHED;
  }

  *pTableMeta = cachedMeta;
  return TSDB_CODE_SUCCESS;
}

// Loads the schema of super table `tableName` into `schema`.
// The table meta is retrieved, converted with fillDbSchema() and freed again.
// On failure `schema` is left untouched and the retrieval error is returned.
static int32_t loadTableSchemaFromDB(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSmlLinesInfo* info) {
  STableMeta* tableMeta = NULL;
  int32_t     code = retrieveTableMeta(taos, tableName, &tableMeta, info);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  assert(tableMeta != NULL);
  fillDbSchema(tableMeta, tableName, schema, info);
  free(tableMeta);

  return code;
}

S
shenglian zhou 已提交
679
static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas, SSmlLinesInfo* info) {
S
shenglian zhou 已提交
680 681 682 683
  int32_t code = 0;
  size_t numStable = taosArrayGetSize(stableSchemas);
  for (int i = 0; i < numStable; ++i) {
    SSmlSTableSchema* pointSchema = taosArrayGet(stableSchemas, i);
S
shenglian zhou 已提交
684 685
    SSmlSTableSchema  dbSchema;
    memset(&dbSchema, 0, sizeof(SSmlSTableSchema));
S
shenglian zhou 已提交
686

687
    code = loadTableSchemaFromDB(taos, pointSchema->sTableName, &dbSchema, info);
S
shenglian zhou 已提交
688 689 690 691
    if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
      SSchemaAction schemaAction = {0};
      schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
      memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
G
Ganlin Zhao 已提交
692
      memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE);
S
shenglian zhou 已提交
693 694
      schemaAction.createSTable.tags = pointSchema->tags;
      schemaAction.createSTable.fields = pointSchema->fields;
S
shenglian zhou 已提交
695
      applySchemaAction(taos, &schemaAction, info);
696
      code = loadTableSchemaFromDB(taos, pointSchema->sTableName, &dbSchema, info);
S
Shenglian Zhou 已提交
697
      if (code != 0) {
S
shenglian zhou 已提交
698
        tscError("SML:0x%"PRIx64" reconcile point schema failed. can not create %s", info->id, pointSchema->sTableName);
699
        return code;
S
Shenglian Zhou 已提交
700
      }
701 702 703 704
    }

    if (code == TSDB_CODE_SUCCESS) {
      pointSchema->precision = dbSchema.precision;
S
shenglian zhou 已提交
705 706 707 708 709 710 711 712 713 714
      size_t pointTagSize = taosArrayGetSize(pointSchema->tags);
      size_t pointFieldSize = taosArrayGetSize(pointSchema->fields);

      SHashObj* dbTagHash = dbSchema.tagHash;
      SHashObj* dbFieldHash = dbSchema.fieldHash;

      for (int j = 0; j < pointTagSize; ++j) {
        SSchema* pointTag = taosArrayGet(pointSchema->tags, j);
        SSchemaAction schemaAction = {0};
        bool actionNeeded = false;
S
shenglian zhou 已提交
715 716
        generateSchemaAction(pointTag, dbTagHash, dbSchema.tags, true, pointSchema->sTableName,
                             &schemaAction, &actionNeeded, info);
S
shenglian zhou 已提交
717
        if (actionNeeded) {
S
shenglian zhou 已提交
718
          code = applySchemaAction(taos, &schemaAction, info);
719 720 721 722
          if (code != 0) {
            destroySmlSTableSchema(&dbSchema);
            return code;
          }
S
shenglian zhou 已提交
723 724 725 726 727
        }
      }

      SSchema* pointColTs = taosArrayGet(pointSchema->fields, 0);
      SSchema* dbColTs = taosArrayGet(dbSchema.fields, 0);
728
      memcpy(pointColTs->name, dbColTs->name, TSDB_COL_NAME_LEN + TS_ESCAPE_CHAR_SIZE);
S
shenglian zhou 已提交
729 730 731 732 733

      for (int j = 1; j < pointFieldSize; ++j) {
        SSchema* pointCol = taosArrayGet(pointSchema->fields, j);
        SSchemaAction schemaAction = {0};
        bool actionNeeded = false;
S
shenglian zhou 已提交
734 735
        generateSchemaAction(pointCol, dbFieldHash, dbSchema.fields,false, pointSchema->sTableName,
                             &schemaAction, &actionNeeded, info);
S
shenglian zhou 已提交
736
        if (actionNeeded) {
S
shenglian zhou 已提交
737
          code = applySchemaAction(taos, &schemaAction, info);
738 739 740 741
          if (code != 0) {
            destroySmlSTableSchema(&dbSchema);
            return code;
          }
S
shenglian zhou 已提交
742 743 744 745 746
        }
      }

      pointSchema->precision = dbSchema.precision;

747
      destroySmlSTableSchema(&dbSchema);
S
shenglian zhou 已提交
748
    } else {
S
shenglian zhou 已提交
749
      tscError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
S
shenglian zhou 已提交
750 751 752 753 754 755
      return code;
    }
  }
  return 0;
}

756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793
static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints,
                                             SHashObj* cname2points, SArray* stableSchemas, SSmlLinesInfo* info) {
  for (int32_t i = 0; i < numPoints; ++i) {
    TAOS_SML_DATA_POINT * point = points + i;
    SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx);

    for (int j = 0; j < point->tagNum; ++j) {
      TAOS_SML_KV* kv =  point->tags + j;
      if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
        int64_t ts = *(int64_t*)(kv->value);
        ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
        *(int64_t*)(kv->value) = ts;
      }
    }

    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* kv =  point->fields + j;
      if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
        int64_t ts = *(int64_t*)(kv->value);
        ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
        *(int64_t*)(kv->value) = ts;
      }
    }

    SArray* cTablePoints = NULL;
    SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, strlen(point->childTableName));
    if (pCTablePoints) {
      cTablePoints = *pCTablePoints;
    } else {
      cTablePoints = taosArrayInit(64, sizeof(point));
      taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES);
    }
    taosArrayPush(cTablePoints, &point);
  }

  return 0;
}

794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818
static int32_t applyChildTableDataPointsWithInsertSQL(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
                                                  SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
  int32_t code = TSDB_CODE_SUCCESS;
  size_t numTags = taosArrayGetSize(sTableSchema->tags);
  size_t numCols = taosArrayGetSize(sTableSchema->fields);
  size_t rows = taosArrayGetSize(cTablePoints);
  SArray* tagsSchema = sTableSchema->tags;
  SArray* colsSchema = sTableSchema->fields;

  TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
  for (int i= 0; i < rows; ++i) {
    TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
    for (int j = 0; j < pDataPoint->tagNum; ++j) {
      TAOS_SML_KV* kv = pDataPoint->tags + j;
      tagKVs[kv->fieldSchemaIdx] = kv;
    }
  }

  char* sql = malloc(tsMaxSQLStringLen+1);
  if (sql == NULL) {
    tscError("malloc sql memory error");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t freeBytes = tsMaxSQLStringLen + 1 ;
819 820
  int32_t totalLen = 0;
  totalLen += sprintf(sql, "insert into %s using %s (", cTableName, sTableName);
821 822
  for (int i = 0; i < numTags; ++i) {
    SSchema* tagSchema = taosArrayGet(tagsSchema, i);
823
    totalLen += snprintf(sql+totalLen, freeBytes-totalLen, "%s,", tagSchema->name);
824
  }
825 826
  --totalLen;
  totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ")");
827

828
  totalLen += snprintf(sql + totalLen, freeBytes-totalLen, " tags (");
829

S
shenglian zhou 已提交
830 831 832
//  for (int i = 0; i < numTags; ++i) {
//    snprintf(sql+strlen(sql), freeBytes-strlen(sql), "?,");
//  }
833 834
  for (int i = 0; i < numTags; ++i) {
    if (tagKVs[i] == NULL) {
835
      totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "NULL,");
836 837
    } else {
      TAOS_SML_KV* kv =  tagKVs[i];
838
      size_t beforeLen = totalLen;
839
      int32_t len = 0;
840 841 842
      converToStr(sql+beforeLen, kv->type, kv->value, kv->length, &len);
      totalLen += len;
      totalLen += snprintf(sql+totalLen, freeBytes-totalLen, ",");
843 844
    }
  }
845 846
  --totalLen;
  totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ") (");
847 848 849

  for (int i = 0; i < numCols; ++i) {
    SSchema* colSchema = taosArrayGet(colsSchema, i);
850
    totalLen += snprintf(sql+totalLen, freeBytes-totalLen, "%s,", colSchema->name);
851
  }
852 853
  --totalLen;
  totalLen += snprintf(sql + totalLen, freeBytes-totalLen, ") values ");
854 855 856

  TAOS_SML_KV** colKVs = malloc(numCols*sizeof(TAOS_SML_KV*));
  for (int r = 0; r < rows; ++r) {
857
    totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "(");
858 859 860 861 862 863 864 865 866 867 868

    memset(colKVs, 0, numCols*sizeof(TAOS_SML_KV*));

    TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, r);
    for (int i = 0; i < point->fieldNum; ++i) {
      TAOS_SML_KV* kv = point->fields + i;
      colKVs[kv->fieldSchemaIdx] = kv;
    }

    for (int i = 0; i < numCols; ++i) {
      if (colKVs[i] == NULL) {
869
        totalLen += snprintf(sql + totalLen, freeBytes-totalLen, "NULL,");
870 871
      } else {
        TAOS_SML_KV* kv =  colKVs[i];
872
        size_t beforeLen = totalLen;
873
        int32_t len = 0;
874 875 876
        converToStr(sql+beforeLen, kv->type, kv->value, kv->length, &len);
        totalLen += len;
        totalLen += snprintf(sql+totalLen, freeBytes-totalLen, ",");
877 878
      }
    }
879 880
    --totalLen;
    totalLen += snprintf(sql+totalLen, freeBytes - totalLen, ")");
881 882
  }
  free(colKVs);
883
  sql[totalLen] = '\0';
884

885
  tscDebug("SML:0x%"PRIx64" insert child table table %s of super table %s sql: %s", info->id, cTableName, sTableName, sql);
S
shenglian zhou 已提交
886
  TAOS_RES* res = taos_query(taos, sql);
S
shenglian zhou 已提交
887
  free(sql);
S
shenglian zhou 已提交
888 889
  code = taos_errno(res);
  info->affectedRows = taos_affected_rows(res);
890
  taos_free_result(res);
891 892 893 894
  return code;
}

static int32_t applyChildTableDataPointsWithStmt(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979
                                         SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
  size_t numTags = taosArrayGetSize(sTableSchema->tags);
  size_t numCols = taosArrayGetSize(sTableSchema->fields);
  size_t rows = taosArrayGetSize(cTablePoints);

  TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
  for (int i= 0; i < rows; ++i) {
    TAOS_SML_DATA_POINT * pDataPoint = taosArrayGetP(cTablePoints, i);
    for (int j = 0; j < pDataPoint->tagNum; ++j) {
      TAOS_SML_KV* kv = pDataPoint->tags + j;
      tagKVs[kv->fieldSchemaIdx] = kv;
    }
  }

  //tag bind
  SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
  taosArraySetSize(tagBinds, numTags);
  int isNullColBind = TSDB_TRUE;
  for (int j = 0; j < numTags; ++j) {
    TAOS_BIND* bind = taosArrayGet(tagBinds, j);
    bind->is_null = &isNullColBind;
  }
  for (int j = 0; j < numTags; ++j) {
    if (tagKVs[j] == NULL) continue;
    TAOS_SML_KV* kv =  tagKVs[j];
    TAOS_BIND* bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
    bind->buffer_type = kv->type;
    bind->length = malloc(sizeof(uintptr_t*));
    *bind->length = kv->length;
    bind->buffer = kv->value;
    bind->is_null = NULL;
  }

  //rows bind
  SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);
  for (int i = 0; i < rows; ++i) {
    TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i);

    TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
    if (colBinds == NULL) {
      tscError("SML:0x%"PRIx64" taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
               "num of rows: %zu, num of cols: %zu", info->id, rows, numCols);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    for (int j = 0; j < numCols; ++j) {
      TAOS_BIND* bind = colBinds + j;
      bind->is_null = &isNullColBind;
    }
    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* kv = point->fields + j;
      TAOS_BIND* bind = colBinds + kv->fieldSchemaIdx;
      bind->buffer_type = kv->type;
      bind->length = malloc(sizeof(uintptr_t*));
      *bind->length = kv->length;
      bind->buffer = kv->value;
      bind->is_null = NULL;
    }
    taosArrayPush(rowsBind, &colBinds);
  }

  int32_t code = 0;
  code = insertChildTablePointsBatch(taos, cTableName, sTableName, sTableSchema->tags, tagBinds, sTableSchema->fields, rowsBind, rowSize, info);
  if (code != 0) {
    tscError("SML:0x%"PRIx64" insert into child table %s failed. error %s", info->id, cTableName, tstrerror(code));
  }

  //free rows bind
  for (int i = 0; i < rows; ++i) {
    TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
    for (int j = 0; j < numCols; ++j) {
      TAOS_BIND* bind = colBinds + j;
      free(bind->length);
    }
    free(colBinds);
  }
  taosArrayDestroy(rowsBind);
  //free tag bind
  for (int i = 0; i < taosArrayGetSize(tagBinds); ++i) {
    TAOS_BIND* bind = taosArrayGet(tagBinds, i);
    free(bind->length);
  }
  taosArrayDestroy(tagBinds);
  return code;
}
980

981 982 983 984
// Appends the NUL-terminated `text` to `sql` at offset `*len` and advances `*len`.
// Returns false, leaving buffer and `*len` untouched, when `text` plus its terminator
// does not fit into the `cap` bytes of `sql`.
static bool smlStmtSqlAppend(char* sql, size_t cap, size_t* len, const char* text) {
  size_t textLen = strlen(text);
  if (textLen >= cap - *len) {
    return false;
  }
  memcpy(sql + *len, text, textLen + 1);
  *len += textLen;
  return true;
}

// Appends `count` comma-separated items followed by `closing`. Item i is the name of
// schema entry i when `schema` is non-NULL, or the placeholder "?" otherwise.
// Returns false when the text does not fit.
static bool smlStmtSqlAppendList(char* sql, size_t cap, size_t* len, SArray* schema, size_t count, const char* closing) {
  for (size_t i = 0; i < count; ++i) {
    const char* item = "?";
    if (schema != NULL) {
      SSchema* entry = taosArrayGet(schema, i);
      item = entry->name;
    }
    if (!(smlStmtSqlAppend(sql, cap, len, item) && smlStmtSqlAppend(sql, cap, len, ","))) {
      return false;
    }
  }
  // step back over the trailing comma so that `closing` overwrites it
  if (*len > 0 && sql[*len - 1] == ',') {
    --(*len);
  }
  return smlStmtSqlAppend(sql, cap, len, closing);
}

// Builds the parameterised statement
//   insert into ? using <stable> (<tags>) tags (?,...) (<cols>) values (?,...)
// and executes it for `rowsBind` in batches through doInsertChildTablePoints.
//
// The batch size is 4/5 of the number of rows of `rowSize` bytes that fit into
// TSDB_MAX_WAL_SIZE, capped by the number of rows and never less than one row.
// Returns TSDB_CODE_SUCCESS, the first batch error, or TSDB_CODE_TSC_OUT_OF_MEMORY when an
// allocation fails or the statement does not fit into tsMaxSQLStringLen + 1 bytes.
static int32_t insertChildTablePointsBatch(TAOS* taos, char* cTableName, char* sTableName,
                                           SArray* tagsSchema, SArray* tagsBind,
                                           SArray* colsSchema, SArray* rowsBind,
                                           size_t rowSize, SSmlLinesInfo* info) {
  size_t numTags = taosArrayGetSize(tagsSchema);
  size_t numCols = taosArrayGetSize(colsSchema);
  size_t cap = (size_t)tsMaxSQLStringLen + 1;
  size_t len = 0;

  char* sql = malloc(cap);
  if (sql == NULL) {
    tscError("malloc sql memory error");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  sql[0] = '\0';

  bool fits = smlStmtSqlAppend(sql, cap, &len, "insert into ? using ") &&
              smlStmtSqlAppend(sql, cap, &len, sTableName) &&
              smlStmtSqlAppend(sql, cap, &len, " (") &&
              smlStmtSqlAppendList(sql, cap, &len, tagsSchema, numTags, ") tags (") &&
              smlStmtSqlAppendList(sql, cap, &len, NULL, numTags, ") (") &&
              smlStmtSqlAppendList(sql, cap, &len, colsSchema, numCols, ") values (") &&
              smlStmtSqlAppendList(sql, cap, &len, NULL, numCols, ")");
  if (!fits) {
    tscError("SML:0x%"PRIx64" insert statement of super table %s does not fit into %zu bytes", info->id, sTableName, cap);
    free(sql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebug("SML:0x%"PRIx64" insert child table table %s of super table %s : %s", info->id, cTableName, sTableName, sql);

  size_t rows = taosArrayGetSize(rowsBind);
  // rowSize == 0 would divide by zero; treat it as "no limit".
  size_t maxBatchSize = (rowSize > 0) ? TSDB_MAX_WAL_SIZE / rowSize * 4 / 5 : rows;
  size_t batchSize = MIN(maxBatchSize, rows);
  if (batchSize == 0 && rows > 0) {
    // A row wider than the computed limit must still make progress, one row at a time.
    batchSize = 1;
  }
  tscDebug("SML:0x%"PRIx64" insert rows into child table %s. num of rows: %zu, batch size: %zu",
           info->id, cTableName, rows, batchSize);

  SArray* batchBind = taosArrayInit(batchSize, POINTER_BYTES);
  if (batchBind == NULL) {
    tscError("SML:0x%"PRIx64" failed to allocate batch bind array for child table %s", info->id, cTableName);
    free(sql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  for (size_t begin = 0; begin < rows && code == 0;) {
    size_t end = MIN(begin + batchSize, rows);
    for (size_t r = begin; r < end; ++r) {
      taosArrayPush(batchBind, taosArrayGet(rowsBind, r));
    }

    tscDebug("SML:0x%"PRIx64" insert child table batch from line %d to line %d.", info->id, (int)begin, (int)(end - 1));
    code = doInsertChildTablePoints(taos, sql, cTableName, tagsBind, batchBind, info);
    taosArrayClear(batchBind);
    begin = end;
  }

  taosArrayDestroy(batchBind);
  free(sql);
  return code;
}
// Executes the prepared statement `sql` once for one batch of rows of child table `cTableName`.
//
// `tagsBind` holds the TAOS_BIND entries of the tags, `batchBind` one TAOS_BIND array
// pointer per row. When execution fails with a code listed in the retry condition below,
// the whole bind/execute sequence is repeated, up to TSDB_MAX_REPLICA more times.
// Except when prepare itself fails, the rows the statement reports as affected are added
// to info->affectedRows before the statement is closed.
// Returns 0 on success, TSDB_CODE_TSC_OUT_OF_MEMORY when no statement could be created,
// or the code of the failing statement call.
static int32_t doInsertChildTablePoints(TAOS* taos, char* sql, char* cTableName, SArray* tagsBind, SArray* batchBind,
                                        SSmlLinesInfo* info) {
  TAOS_STMT* stmt = taos_stmt_init(taos);
  if (stmt == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
  if (code != 0) {
    tscError("SML:0x%"PRIx64" taos_stmt_prepare return %d:%s", info->id, code, taos_stmt_errstr(stmt));
    taos_stmt_close(stmt);
    return code;
  }

  bool    tryAgain = false;
  int32_t retries = 0;
  do {
    code = taos_stmt_set_tbname_tags(stmt, cTableName, TARRAY_GET_START(tagsBind));
    if (code != 0) {
      tscError("SML:0x%"PRIx64" taos_stmt_set_tbname_tags return %d:%s", info->id, code, taos_stmt_errstr(stmt));
      goto finish;
    }

    size_t rows = taosArrayGetSize(batchBind);
    for (size_t i = 0; i < rows; ++i) {
      TAOS_BIND* colsBinds = taosArrayGetP(batchBind, i);
      code = taos_stmt_bind_param(stmt, colsBinds);
      if (code != 0) {
        tscError("SML:0x%"PRIx64" taos_stmt_bind_param return %d:%s", info->id, code, taos_stmt_errstr(stmt));
        goto finish;
      }
      code = taos_stmt_add_batch(stmt);
      if (code != 0) {
        tscError("SML:0x%"PRIx64" taos_stmt_add_batch return %d:%s", info->id, code, taos_stmt_errstr(stmt));
        goto finish;
      }
    }

    code = taos_stmt_execute(stmt);
    if (code != 0) {
      tscError("SML:0x%"PRIx64" taos_stmt_execute return %d:%s, try:%d", info->id, code, taos_stmt_errstr(stmt), retries);
    }
    tscDebug("SML:0x%"PRIx64" taos_stmt_execute inserted %d rows", info->id, taos_stmt_affected_rows(stmt));

    bool staleCache = (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID);
    bool notReady = (code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_NETWORK_UNAVAIL);
    bool retriable = staleCache || notReady || code == TSDB_CODE_TDB_TABLE_RECONFIGURE;

    // The counter advances only when the error is retriable (short-circuit &&).
    tryAgain = retriable && (retries++ < TSDB_MAX_REPLICA);

    if (staleCache) {
      TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
      int32_t   code2 = taos_errno(res2);
      if (code2 != TSDB_CODE_SUCCESS) {
        tscError("SML:0x%" PRIx64 " insert child table. reset query cache. error: %s", info->id, taos_errstr(res2));
      }
      taos_free_result(res2);
    }

    // Exponential back-off before the next attempt; a table reconfigure is retried
    // without waiting.
    if (tryAgain && (staleCache || notReady)) {
      taosMsleep(100 * (2 << retries));
    }
  } while (tryAgain);

finish:
  info->affectedRows += taos_stmt_affected_rows(stmt);
  taos_stmt_close(stmt);
  return code;
}

1149 1150 1151 1152 1153
// Inserts the points of one child table, picking the mechanism by row count: fewer than
// SML_STMT_MIN_ROWS rows go out as one textual INSERT, anything larger goes through the
// prepared-statement path. Returns the code of the chosen path.
static int32_t applyChildTableDataPoints(TAOS* taos, char* cTableName, char* sTableName, SSmlSTableSchema* sTableSchema,
                                                 SArray* cTablePoints, size_t rowSize, SSmlLinesInfo* info) {
  enum { SML_STMT_MIN_ROWS = 10 };

  if (taosArrayGetSize(cTablePoints) < SML_STMT_MIN_ROWS) {
    return applyChildTableDataPointsWithInsertSQL(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info);
  }
  return applyChildTableDataPointsWithStmt(taos, cTableName, sTableName, sTableSchema, cTablePoints, rowSize, info);
}

S
shenglian zhou 已提交
1161
// Writes all data points to the database, one child table at a time.
//
// The points are grouped by child table name first. For each group the row size is
// computed as the sum of the `bytes` of all columns of the group's super table schema, and
// the group is handed to applyChildTableDataPoints. Processing stops at the first failing
// child table. The grouping structures are released on every path.
// Returns TSDB_CODE_SUCCESS, the first insert error, or TSDB_CODE_TSC_OUT_OF_MEMORY when the
// grouping hash cannot be created.
static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas, SSmlLinesInfo* info) {
  int32_t code = TSDB_CODE_SUCCESS;

  SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
  if (cname2points == NULL) {
    tscError("SML:0x%"PRIx64" failed to allocate child table hash", info->id);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas, info);

  // NOTE(review): on failure the iteration below is abandoned midway and restarted for
  // cleanup -- confirm whether the hash API requires cancelling an unfinished iteration.
  SArray** pCTablePoints = taosHashIterate(cname2points, NULL);
  while (pCTablePoints != NULL) {
    SArray* cTablePoints = *pCTablePoints;

    // Every point of a group belongs to the same child table; the first one supplies
    // the table names and the schema index.
    TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, 0);
    SSmlSTableSchema*    sTableSchema = taosArrayGet(stableSchemas, point->schemaIdx);

    size_t rowSize = 0;
    size_t numCols = taosArrayGetSize(sTableSchema->fields);
    for (size_t i = 0; i < numCols; ++i) {
      SSchema* colSchema = taosArrayGet(sTableSchema->fields, i);
      rowSize += colSchema->bytes;
    }

    tscDebug("SML:0x%"PRIx64" apply child table points. child table: %s of super table %s, row size: %zu",
             info->id, point->childTableName, point->stableName, rowSize);
    code = applyChildTableDataPoints(taos, point->childTableName, point->stableName, sTableSchema, cTablePoints, rowSize, info);
    if (code != 0) {
      tscError("SML:0x%"PRIx64" Apply child table points failed. child table %s, error %s", info->id, point->childTableName, tstrerror(code));
      break;
    }

    tscDebug("SML:0x%"PRIx64" successfully applied data points of child table %s", info->id, point->childTableName);

    pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
  }

  // Release the per-child-table arrays, then the hash itself.
  pCTablePoints = taosHashIterate(cname2points, NULL);
  while (pCTablePoints != NULL) {
    taosArrayDestroy(*pCTablePoints);
    pCTablePoints = taosHashIterate(cname2points, pCTablePoints);
  }
  taosHashCleanup(cname2points);
  return code;
}
1204

1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258
// Appends the NUL-terminated `text` to `sql` at offset `*len` and advances `*len`.
// Returns false, leaving buffer and `*len` untouched, when `text` plus its terminator
// does not fit into the `cap` bytes of `sql`.
static bool smlSingleSqlAppend(char* sql, int cap, int* len, const char* text) {
  size_t textLen = strlen(text);
  if (textLen >= (size_t)(cap - *len)) {
    return false;
  }
  memcpy(sql + *len, text, textLen + 1);
  *len += (int)textLen;
  return true;
}

// Fast path for a single data point: inserts it with "insert into <child>(<cols>) values (...)",
// without reconciling any schema first.
//
// A missing child table name is generated by getSmlMd5ChildTableName and stored in the point.
// The super table meta must already be in the local cache (its precision is used to convert
// the timestamp in fields[0] from nanoseconds); otherwise the cache lookup error is returned.
// info->affectedRows is set from the query result.
// Returns the query's error code, the cache lookup error, or TSDB_CODE_TSC_OUT_OF_MEMORY when
// an allocation fails or the statement does not fit into TSDB_MAX_SQL_LEN bytes. The caller
// in this file falls back to the general path on any non-success code.
static int doSmlInsertOneDataPoint(TAOS* taos, TAOS_SML_DATA_POINT* point, SSmlLinesInfo* info) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (!point->childTableName) {
    int tableNameLen = TSDB_TABLE_NAME_LEN;
    point->childTableName = calloc(1, tableNameLen + 1);
    if (point->childTableName == NULL) {
      tscError("SML:0x%" PRIx64 " failed to allocate child table name", info->id);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    getSmlMd5ChildTableName(point, point->childTableName, &tableNameLen, info);
    point->childTableName[tableNameLen] = '\0';
  }

  STableMeta* tableMeta = NULL;
  int32_t     ret = getSuperTableMetaFromLocalCache(taos, point->stableName, &tableMeta, info);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }
  uint8_t precision = tableMeta->tableInfo.precision;
  free(tableMeta);

  int   freeBytes = TSDB_MAX_SQL_LEN;
  int   sqlLen = 0;
  char* sql = malloc(TSDB_MAX_SQL_LEN + 1);
  if (sql == NULL) {
    tscError("malloc sql memory error");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  sql[0] = '\0';

  bool fits = smlSingleSqlAppend(sql, freeBytes, &sqlLen, "insert into ") &&
              smlSingleSqlAppend(sql, freeBytes, &sqlLen, point->childTableName) &&
              smlSingleSqlAppend(sql, freeBytes, &sqlLen, "(");

  // column names
  for (int col = 0; fits && col < point->fieldNum; ++col) {
    TAOS_SML_KV* kv = point->fields + col;
    fits = smlSingleSqlAppend(sql, freeBytes, &sqlLen, kv->key) && smlSingleSqlAppend(sql, freeBytes, &sqlLen, ",");
  }
  if (fits) {
    if (sqlLen > 0 && sql[sqlLen - 1] == ',') --sqlLen;  // overwrite the trailing comma
    fits = smlSingleSqlAppend(sql, freeBytes, &sqlLen, ") values (");
  }

  // fields[0] is treated as the timestamp and converted to the table's precision
  if (fits) {
    TAOS_SML_KV* tsField = point->fields + 0;
    int64_t      ts = *(int64_t*)(tsField->value);
    ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, precision);

    char tsText[32];
    snprintf(tsText, sizeof(tsText), "%" PRId64 ",", ts);
    fits = smlSingleSqlAppend(sql, freeBytes, &sqlLen, tsText);
  }

  // remaining values
  for (int col = 1; fits && col < point->fieldNum; ++col) {
    TAOS_SML_KV* kv = point->fields + col;

    // converToStr takes no size bound, so room is checked up front.
    // NOTE(review): assumes at most 2 * length bytes for string types (every byte escaped)
    // and fewer than 512 bytes for numeric types -- confirm against converToStr.
    if ((int64_t)freeBytes - sqlLen <= 2 * (int64_t)kv->length + 512) {
      fits = false;
      break;
    }

    int32_t len = 0;
    converToStr(sql + sqlLen, kv->type, kv->value, kv->length, &len);
    sqlLen += len;
    fits = smlSingleSqlAppend(sql, freeBytes, &sqlLen, ",");
  }
  if (fits) {
    if (sqlLen > 0 && sql[sqlLen - 1] == ',') --sqlLen;
    fits = smlSingleSqlAppend(sql, freeBytes, &sqlLen, ")");
  }

  if (!fits) {
    tscError("SML:0x%" PRIx64 " insert sql of child table %s does not fit into %d bytes", info->id,
             point->childTableName, freeBytes);
    free(sql);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebug("SML:0x%" PRIx64 " insert child table table %s of super table %s sql: %s", info->id,
           point->childTableName, point->stableName, sql);
  TAOS_RES* res = taos_query(taos, sql);
  free(sql);
  code = taos_errno(res);
  info->affectedRows = taos_affected_rows(res);
  taos_free_result(res);

  return code;
}

1259
// Inserts `numPoint` schemaless data points.
//
// A single point is first tried through doSmlInsertOneDataPoint; if that succeeds the
// function returns immediately. Otherwise (or for several points) the general path runs:
// derive the super table schemas from the points, reconcile them with the database, then
// insert the points. info->affectedRows is reset on entry and filled by the insert paths.
// Returns TSDB_CODE_SUCCESS or the error of the first failing stage.
int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info) {
  tscDebug("SML:0x%"PRIx64" taos_sml_insert. number of points: %d", info->id, numPoint);

  int32_t code = TSDB_CODE_SUCCESS;

  info->affectedRows = 0;

  if (numPoint == 1) {
    code = doSmlInsertOneDataPoint(taos, points, info);
    if (code == TSDB_CODE_SUCCESS) {
      return code;
    }
    // The fast path failed; start the general path from a clean row count.
    info->affectedRows = 0;
  }

  tscDebug("SML:0x%"PRIx64" build data point schemas", info->id);
  SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>
  if (stableSchemas == NULL) {
    tscError("SML:0x%"PRIx64" failed to allocate super table schema array", info->id);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  code = buildDataPointSchemas(points, numPoint, stableSchemas, info);
  if (code != 0) {
    tscError("SML:0x%"PRIx64" error building data point schemas : %s", info->id, tstrerror(code));
    goto clean_up;
  }

  tscDebug("SML:0x%"PRIx64" modify db schemas", info->id);
  code = modifyDBSchemas(taos, stableSchemas, info);
  if (code != 0) {
    tscError("SML:0x%"PRIx64" error change db schema : %s", info->id, tstrerror(code));
    goto clean_up;
  }

  tscDebug("SML:0x%"PRIx64" apply data points", info->id);
  code = applyDataPoints(taos, points, numPoint, stableSchemas, info);
  if (code != 0) {
    tscError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code));
  }

clean_up:
  for (size_t i = 0; i < taosArrayGetSize(stableSchemas); ++i) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
    taosArrayDestroy(schema->fields);
    taosArrayDestroy(schema->tags);
  }
  taosArrayDestroy(stableSchemas);
  return code;
}

1305
// Convenience wrapper around tscSmlInsert: allocates a zeroed SSmlLinesInfo with a fresh
// id, runs the insert and releases the info again.
// Returns the code of tscSmlInsert, or TSDB_CODE_TSC_OUT_OF_MEMORY when the info cannot be
// allocated.
int tsc_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
  SSmlLinesInfo* info = calloc(1, sizeof(SSmlLinesInfo));
  if (info == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  info->id = genLinesSmlId();
  int code = tscSmlInsert(taos, points, numPoint, info);
  free(info);
  return code;
}
S
shenglian zhou 已提交
1312

1313 1314
//=========================================================================

1315 1316 1317 1318 1319
/*        Field                          Escape characters
    1: measurement                        Comma,Space
    2: tag_key, tag_value, field_key  Comma,Equal Sign,Space
    3: field_value                    Double quote,Backslash
*/
1320
// Skips the backslash of an escape sequence.
//
// If *pos points at a backslash and the following character is escapable for the given
// field kind, *pos is advanced by one so that it points at the escaped character.
// In every other case *pos is left unchanged.
//   field 1 (measurement):                    ','  ' '
//   field 2 (tag key, tag value, field key):  ','  ' '  '='
//   field 3 (field value):                    '"'  '\'
static void escapeSpecialCharacter(uint8_t field, const char **pos) {
  const char *cur = *pos;
  if (*cur != '\\') {
    return;
  }

  const char *escapable;
  switch (field) {
    case 1:
      escapable = ", ";
      break;
    case 2:
      escapable = ", =";
      break;
    case 3:
      escapable = "\"\\";
      break;
    default:
      return;
  }

  // strchr also matches the terminator, so a backslash at the very end is excluded first.
  char next = cur[1];
  if (next != '\0' && strchr(escapable, next) != NULL) {
    *pos = cur + 1;
  }
}
1362

1363
char* addEscapeCharToString(char *str, int32_t len) {
1364
  if (str == NULL) {
1365
    return NULL;
1366 1367
  }
  memmove(str + 1, str, len);
1368 1369
  str[0] = str[len + 1] = TS_ESCAPE_CHAR;
  str[len + 2] = '\0';
1370
  return str;
1371 1372
}

1373
// Returns true when `str` is an optional '+' or '-' followed by one or more decimal
// digits and nothing else. NULL, the empty string and a sign without digits are rejected.
bool isValidInteger(char *str) {
  if (str == NULL) {
    return false;
  }

  // <ctype.h> classifiers require an unsigned char value.
  const unsigned char *c = (const unsigned char *)str;
  if (*c == '+' || *c == '-') {
    c++;
  }
  if (!isdigit(*c)) {
    return false;  // at least one digit is required
  }
  for (; *c != '\0'; c++) {
    if (!isdigit(*c)) {
      return false;
    }
  }
  return true;
}
1387

1388
// Returns true when `str` is a decimal floating point literal of the form
//   [+|-] digits [. digits] [(e|E) [+|-] digits]     or     [+|-] . digits [exponent]
// Rules enforced:
//   - a '.' must be followed by a digit, may appear once, and not inside the exponent;
//   - the exponent marker must follow a digit and be followed by a digit or a sign;
//   - a sign is allowed only as the first character or directly after the exponent
//     marker, and must be followed by a digit (or, when leading, by '.').
// NULL, the empty string, a bare sign and a bare '.' are rejected.
bool isValidFloat(char *str) {
  if (str == NULL) {
    return false;
  }

  // <ctype.h> classifiers require an unsigned char value.
  const unsigned char *c = (const unsigned char *)str;
  bool has_dot = false;
  bool has_exp = false;

  if (*c == '+' || *c == '-') {
    if (!isdigit(c[1]) && c[1] != '.') {
      return false;
    }
  } else if (*c == '.') {
    if (!isdigit(c[1])) {
      return false;
    }
    has_dot = true;
  } else if (!isdigit(*c)) {
    return false;
  }

  for (c++; *c != '\0'; c++) {
    if (isdigit(*c)) {
      continue;
    }
    switch (*c) {
      case '.':
        if (has_dot || has_exp || !isdigit(c[1])) {
          return false;
        }
        has_dot = true;
        break;
      case 'e':
      case 'E':
        if (has_exp || !isdigit(c[-1]) || !(isdigit(c[1]) || c[1] == '+' || c[1] == '-')) {
          return false;
        }
        has_exp = true;
        break;
      case '+':
      case '-':
        // Only one exponent marker can exist, so this admits at most one exponent sign.
        if (!(c[-1] == 'e' || c[-1] == 'E') || !isdigit(c[1])) {
          return false;
        }
        break;
      default:
        return false;
    }
  }
  return true;
}
1443

1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459
// Classifies a value by its one-letter integer suffix.
// For len > 1: a trailing 'i' yields true with *has_sign = true, a trailing 'u' yields
// true with *has_sign = false. Anything else yields false and leaves *has_sign untouched.
static bool isInteger(char *pVal, uint16_t len, bool *has_sign) {
  if (len > 1) {
    switch (pVal[len - 1]) {
      case 'i':
        *has_sign = true;
        return true;
      case 'u':
        *has_sign = false;
        return true;
      default:
        break;
    }
  }
  return false;
}

1460
// Returns true when the value is longer than its suffix and ends, case-insensitively,
// in "i8". `pVal` must be NUL-terminated at offset `len`.
static bool isTinyInt(char *pVal, uint16_t len) {
  return len > 2 && strcasecmp(&pVal[len - 2], "i8") == 0;
}
1470

1471
// True when the value is longer than two characters, does not start with '-',
// and ends with the case-insensitive suffix "u8".
static bool isTinyUint(char *pVal, uint16_t len) {
  if (len <= 2 || pVal[0] == '-') {
    return false;
  }
  return strcasecmp(pVal + len - 2, "u8") == 0;
}

1485
// True when the value is longer than three characters and ends with the
// case-insensitive suffix "i16".
static bool isSmallInt(char *pVal, uint16_t len) {
  return len > 3 && strcasecmp(pVal + len - 3, "i16") == 0;
}

1496
// True when the value is longer than three characters, does not start with
// '-', and ends with the case-insensitive suffix "u16".
static bool isSmallUint(char *pVal, uint16_t len) {
  if (len <= 3 || pVal[0] == '-') {
    return false;
  }
  return strcasecmp(pVal + len - 3, "u16") == 0;
}

1510
// True when the value is longer than three characters and ends with the
// case-insensitive suffix "i32".
static bool isInt(char *pVal, uint16_t len) {
  return len > 3 && strcasecmp(pVal + len - 3, "i32") == 0;
}

1521
// True when the value is longer than three characters, does not start with
// '-', and ends with the case-insensitive suffix "u32".
static bool isUint(char *pVal, uint16_t len) {
  if (len <= 3 || pVal[0] == '-') {
    return false;
  }
  return strcasecmp(pVal + len - 3, "u32") == 0;
}

1535
// True when the value is longer than three characters and ends with the
// case-insensitive suffix "i64".
static bool isBigInt(char *pVal, uint16_t len) {
  return len > 3 && strcasecmp(pVal + len - 3, "i64") == 0;
}

1546
// True when the value is longer than three characters, does not start with
// '-', and ends with the case-insensitive suffix "u64".
static bool isBigUint(char *pVal, uint16_t len) {
  if (len <= 3 || pVal[0] == '-') {
    return false;
  }
  return strcasecmp(pVal + len - 3, "u64") == 0;
}

1560
// True when the value is longer than three characters and ends with the
// case-insensitive suffix "f32".
static bool isFloat(char *pVal, uint16_t len) {
  return len > 3 && strcasecmp(pVal + len - 3, "f32") == 0;
}

1571
// True when the value is longer than three characters and ends with the
// case-insensitive suffix "f64".
static bool isDouble(char *pVal, uint16_t len) {
  return len > 3 && strcasecmp(pVal + len - 3, "f64") == 0;
}

1582
// Recognise a boolean literal: "t", "f", "true" or "false", compared
// case-insensitively against the whole NUL-terminated value.
// On a match the decoded value is written to *bVal and true is returned;
// otherwise *bVal is left untouched and false is returned.
static bool isBool(char *pVal, uint16_t len, bool *bVal) {
  static const struct {
    uint16_t    len;
    const char *text;
    bool        val;
  } literals[] = {
    {1, "t", true},
    {1, "f", false},
    {4, "true", true},
    {5, "false", false},
  };

  for (size_t i = 0; i < sizeof(literals) / sizeof(literals[0]); ++i) {
    if (len == literals[i].len && strcasecmp(pVal, literals[i].text) == 0) {
      *bVal = literals[i].val;
      return true;
    }
  }
  return false;
}

1608
// True when the value is a double-quoted string such as "abc": at least two
// characters long with '"' as both the first and the last character.
static bool isBinary(char *pVal, uint16_t len) {
  return len >= 2 && pVal[0] == '"' && pVal[len - 1] == '"';
}
1620

1621
// True when the value is an nchar literal such as L"abc": at least three
// characters long, starting with 'L' or 'l' followed by '"', and ending
// with '"'.
static bool isNchar(char *pVal, uint16_t len) {
  if (len < 3) {
    return false;
  }
  bool hasPrefix = (pVal[0] == 'l' || pVal[0] == 'L') && pVal[1] == '"';
  return hasPrefix && pVal[len - 1] == '"';
}

1633
// Validate a timestamp string and decide which unit it is expressed in.
//   - len == 0: success, *tsType is left untouched.
//   - the single character "0": success, *tsType = SML_TIME_STAMP_NOW.
//   - any non-digit character: TSDB_CODE_TSC_INVALID_TIME_STAMP.
// For TSDB_SML_LINE_PROTOCOL the unit is info->tsType when configured,
// otherwise nanoseconds. For TSDB_SML_TELNET_PROTOCOL only
// SML_TIMESTAMP_SECOND_DIGITS (seconds) or SML_TIMESTAMP_MILLI_SECOND_DIGITS
// (milliseconds) digits are accepted. For any other protocol *tsType is left
// untouched and success is returned.
static int32_t isTimeStamp(char *pVal, uint16_t len, SMLTimeStampType *tsType, SSmlLinesInfo* info) {
  if (len == 0) {
    return TSDB_CODE_SUCCESS;
  }
  if ((len == 1) && pVal[0] == '0') {
    *tsType = SML_TIME_STAMP_NOW;
    return TSDB_CODE_SUCCESS;
  }

  for (int i = 0; i < len; ++i) {
    // <ctype.h> functions require a value representable as unsigned char
    if (!isdigit((unsigned char)pVal[i])) {
      return TSDB_CODE_TSC_INVALID_TIME_STAMP;
    }
  }

  /* For InfluxDB line protocol use user passed timestamp precision
   * For OpenTSDB protocols only 10 digit(seconds) or 13 digits(milliseconds)
   *     precision allowed
   */
  if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
    if (info->tsType != SML_TIME_STAMP_NOT_CONFIGURED) {
      *tsType = info->tsType;
    } else {
      *tsType = SML_TIME_STAMP_NANO_SECONDS;
    }
  } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
    if (len == SML_TIMESTAMP_SECOND_DIGITS) {
      *tsType = SML_TIME_STAMP_SECONDS;
    } else if (len == SML_TIMESTAMP_MILLI_SECOND_DIGITS) {
      *tsType = SML_TIME_STAMP_MILLI_SECONDS;
    } else {
      return TSDB_CODE_TSC_INVALID_TIME_STAMP;
    }
  }
  return TSDB_CODE_SUCCESS;
}
1693

1694
// Convert the numeric text in `str` to the type already recorded in
// pVal->type and store it in a freshly calloc'ed pVal->value of pVal->length
// bytes. `str` is lower-cased in place before parsing.
// Returns false, with nothing allocated, when
//   - strtod/strtoll/strtoull reports ERANGE,
//   - the parsed value fails the IS_VALID_* check of the target type,
//   - pVal->type is not one of the numeric types handled here, or
//   - the allocation fails.
static bool convertStrToNumber(TAOS_SML_KV *pVal, char *str, SSmlLinesInfo* info) {
  errno = 0;
  uint8_t type = pVal->type;
  int16_t length = pVal->length;
  int64_t val_s = 0;
  uint64_t val_u = 0;
  double val_d = 0.0;

  strntolower_s(str, str, (int32_t)strlen(str));
  if (IS_FLOAT_TYPE(type)) {
    val_d = strtod(str, NULL);
  } else if (IS_SIGNED_NUMERIC_TYPE(type)) {
    val_s = strtoll(str, NULL, 10);
  } else {
    val_u = strtoull(str, NULL, 10);
  }

  if (errno == ERANGE) {
    tscError("SML:0x%"PRIx64" Convert number(%s) out of range", info->id, str);
    return false;
  }

  // Range-check first so that no memory is allocated for a rejected value.
  bool inRange = false;
  switch (type) {
    case TSDB_DATA_TYPE_TINYINT:
      inRange = IS_VALID_TINYINT(val_s);
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      inRange = IS_VALID_UTINYINT(val_u);
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      inRange = IS_VALID_SMALLINT(val_s);
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      inRange = IS_VALID_USMALLINT(val_u);
      break;
    case TSDB_DATA_TYPE_INT:
      inRange = IS_VALID_INT(val_s);
      break;
    case TSDB_DATA_TYPE_UINT:
      inRange = IS_VALID_UINT(val_u);
      break;
    case TSDB_DATA_TYPE_BIGINT:
      inRange = IS_VALID_BIGINT(val_s);
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      inRange = IS_VALID_UBIGINT(val_u);
      break;
    case TSDB_DATA_TYPE_FLOAT:
      inRange = IS_VALID_FLOAT(val_d);
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      inRange = IS_VALID_DOUBLE(val_d);
      break;
    default:
      return false;
  }
  if (!inRange) {
    return false;
  }

  pVal->value = calloc(length, 1);
  if (pVal->value == NULL) {
    return false;
  }

  switch (type) {
    case TSDB_DATA_TYPE_TINYINT:
      *(int8_t *)(pVal->value) = (int8_t)val_s;
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      *(uint8_t *)(pVal->value) = (uint8_t)val_u;
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      *(int16_t *)(pVal->value) = (int16_t)val_s;
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      *(uint16_t *)(pVal->value) = (uint16_t)val_u;
      break;
    case TSDB_DATA_TYPE_INT:
      *(int32_t *)(pVal->value) = (int32_t)val_s;
      break;
    case TSDB_DATA_TYPE_UINT:
      *(uint32_t *)(pVal->value) = (uint32_t)val_u;
      break;
    case TSDB_DATA_TYPE_BIGINT:
      *(int64_t *)(pVal->value) = (int64_t)val_s;
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      *(uint64_t *)(pVal->value) = (uint64_t)val_u;
      break;
    case TSDB_DATA_TYPE_FLOAT:
      *(float *)(pVal->value) = (float)val_d;
      break;
    default:  // TSDB_DATA_TYPE_DOUBLE; other types were rejected above
      *(double *)(pVal->value) = (double)val_d;
      break;
  }
  return true;
}
1794
//len does not include '\0' from value.
1795
bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
1796
                         uint16_t len, SSmlLinesInfo* info, bool isTag) {
1797 1798 1799
  if (len <= 0) {
    return false;
  }
G
Ganlin Zhao 已提交
1800

1801 1802 1803 1804 1805 1806 1807 1808 1809
  //convert tags value to Nchar
  if (isTag) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
    pVal->length = len;
    pVal->value = calloc(pVal->length, 1);
    memcpy(pVal->value, value, pVal->length);
    return true;
  }

1810
  //integer number
1811 1812 1813 1814 1815 1816 1817 1818 1819 1820
  bool has_sign;
  if (isInteger(value, len, &has_sign)) {
    pVal->type = has_sign ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_UBIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 1] = '\0';
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
      return false;
    }
    return true;
  }
1821
  if (isTinyInt(value, len)) {
1822 1823 1824
    pVal->type = TSDB_DATA_TYPE_TINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 2] = '\0';
1825
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1826
      return false;
1827 1828 1829
    }
    return true;
  }
1830
  if (isTinyUint(value, len)) {
1831 1832 1833
    pVal->type = TSDB_DATA_TYPE_UTINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 2] = '\0';
1834
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1835 1836 1837 1838
      return false;
    }
    return true;
  }
1839
  if (isSmallInt(value, len)) {
1840 1841 1842
    pVal->type = TSDB_DATA_TYPE_SMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1843
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1844 1845 1846 1847
      return false;
    }
    return true;
  }
1848
  if (isSmallUint(value, len)) {
1849 1850 1851
    pVal->type = TSDB_DATA_TYPE_USMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1852
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1853 1854 1855 1856
      return false;
    }
    return true;
  }
1857
  if (isInt(value, len)) {
1858 1859 1860
    pVal->type = TSDB_DATA_TYPE_INT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1861
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1862 1863 1864 1865
      return false;
    }
    return true;
  }
1866
  if (isUint(value, len)) {
1867 1868 1869
    pVal->type = TSDB_DATA_TYPE_UINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1870
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1871 1872 1873 1874
      return false;
    }
    return true;
  }
1875
  if (isBigInt(value, len)) {
1876 1877 1878
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1879
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1880 1881 1882 1883
      return false;
    }
    return true;
  }
1884
  if (isBigUint(value, len)) {
1885 1886 1887
    pVal->type = TSDB_DATA_TYPE_UBIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1888
    if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
1889 1890 1891 1892
      return false;
    }
    return true;
  }
1893 1894 1895 1896 1897
  //floating number
  if (isFloat(value, len)) {
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1898
    if (!isValidFloat(value) || !convertStrToNumber(pVal, value, info)) {
1899 1900 1901 1902 1903 1904 1905 1906
      return false;
    }
    return true;
  }
  if (isDouble(value, len)) {
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1907
    if (!isValidFloat(value) || !convertStrToNumber(pVal, value, info)) {
1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938
      return false;
    }
    return true;
  }
  //binary
  if (isBinary(value, len)) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
    pVal->length = len - 2;
    pVal->value = calloc(pVal->length, 1);
    //copy after "
    memcpy(pVal->value, value + 1, pVal->length);
    return true;
  }
  //nchar
  if (isNchar(value, len)) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
    pVal->length = len - 3;
    pVal->value = calloc(pVal->length, 1);
    //copy after L"
    memcpy(pVal->value, value + 2, pVal->length);
    return true;
  }
  //bool
  bool bVal;
  if (isBool(value, len, &bVal)) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->value = calloc(pVal->length, 1);
    memcpy(pVal->value, &bVal, pVal->length);
    return true;
  }
1939

1940 1941
  //Handle default(no appendix) type as DOUBLE
  if (isValidInteger(value) || isValidFloat(value)) {
1942
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
G
Ganlin Zhao 已提交
1943
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
1944
    if (!convertStrToNumber(pVal, value, info)) {
1945 1946
      return false;
    }
G
Ganlin Zhao 已提交
1947 1948
    return true;
  }
1949
  return false;
1950
}
1951

1952
static int32_t getTimeStampValue(char *value, uint16_t len,
1953
                                 SMLTimeStampType type, int64_t *ts, SSmlLinesInfo* info) {
1954 1955

  //No appendix or no timestamp given (len = 0)
1956
  if (len != 0 && type != SML_TIME_STAMP_NOW) {
1957 1958 1959 1960 1961 1962
    *ts = (int64_t)strtoll(value, NULL, 10);
  } else {
    type = SML_TIME_STAMP_NOW;
  }
  switch (type) {
    case SML_TIME_STAMP_NOW: {
1963
      *ts = taosGetTimestampNs();
1964
      break;
1965
    }
1966 1967 1968 1969 1970 1971 1972 1973
    case SML_TIME_STAMP_HOURS: {
      *ts = (int64_t)(*ts * 3600 * 1e9);
      break;
    }
    case SML_TIME_STAMP_MINUTES: {
      *ts = (int64_t)(*ts * 60 * 1e9);
      break;
    }
1974
    case SML_TIME_STAMP_SECONDS: {
1975
      *ts = (int64_t)(*ts * 1e9);
1976
      break;
1977 1978
    }
    case SML_TIME_STAMP_MILLI_SECONDS: {
1979
      *ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_NANO);
1980
      break;
1981 1982
    }
    case SML_TIME_STAMP_MICRO_SECONDS: {
1983
      *ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
1984
      break;
1985 1986
    }
    case SML_TIME_STAMP_NANO_SECONDS: {
1987
      *ts = *ts * 1;
1988 1989 1990
      break;
    }
    default: {
1991
      return TSDB_CODE_TSC_INVALID_TIME_STAMP;
1992
    }
1993
  }
1994
  return TSDB_CODE_SUCCESS;
1995 1996
}

1997 1998
// Validate the timestamp text, convert it to nanoseconds and store it in pVal
// as a TSDB_DATA_TYPE_TIMESTAMP value (pVal->value is heap allocated).
// Returns the error from isTimeStamp()/getTimeStampValue() on invalid input,
// TSDB_CODE_TSC_OUT_OF_MEMORY when the value buffer cannot be allocated, and
// TSDB_CODE_SUCCESS otherwise.
int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
                            uint16_t len, SSmlLinesInfo* info) {
  int32_t ret;
  // stays NOW when isTimeStamp() does not set a type (empty timestamp)
  SMLTimeStampType type = SML_TIME_STAMP_NOW;
  int64_t tsVal;

  ret = isTimeStamp(value, len, &type, info);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  ret = getTimeStampValue(value, len, type, &tsVal, info);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }
  tscDebug("SML:0x%"PRIx64"Timestamp after conversion:%"PRId64, info->id, tsVal);

  pVal->type = TSDB_DATA_TYPE_TIMESTAMP;
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  pVal->value = calloc(pVal->length, 1);
  if (pVal->value == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(pVal->value, &tsVal, pVal->length);
  return TSDB_CODE_SUCCESS;
}

2021
// Parse the rest of the line at *index as the timestamp and return it as a
// newly allocated key/value pair in *pTS, with key "ts".
// An empty remainder is accepted and handed to convertSmlTimeStamp() with
// length 0. On any failure *pTS is set to NULL, nothing is left allocated and
// an error code is returned. *index is not advanced.
static int32_t parseSmlTimeStamp(TAOS_SML_KV **pTS, const char **index, SSmlLinesInfo* info) {
  static const char key[] = "ts";
  const char *start = *index;
  size_t len = strlen(start);
  char *value = NULL;

  *pTS = NULL;

  // convertSmlTimeStamp() takes a 16-bit length; anything longer cannot be a
  // timestamp and must not be silently truncated.
  if (len > UINT16_MAX) {
    return TSDB_CODE_TSC_INVALID_TIME_STAMP;
  }

  TAOS_SML_KV *ts = calloc(1, sizeof(TAOS_SML_KV));
  if (ts == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (len > 0) {
    value = calloc(len + 1, 1);
    if (value == NULL) {
      free(ts);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    memcpy(value, start, len);
  }

  int32_t ret = convertSmlTimeStamp(ts, value, (uint16_t)len, info);
  free(value);
  if (ret) {
    free(ts);
    return ret;
  }

  ts->key = calloc(sizeof(key), 1);
  if (ts->key == NULL) {
    free(ts->value);
    free(ts);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(ts->key, key, sizeof(key));

  *pTS = ts;
  return ret;
}
2053

2054
// Report whether `key` was already seen in pHash.
// Returns true (and logs an error) when the key is present. Otherwise the key
// is inserted with a one-byte dummy value and false is returned.
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
  size_t keyLen = strlen(key);

  if (taosHashGet(pHash, key, keyLen) != NULL) {
    tscError("SML:0x%"PRIx64" Duplicate key detected:%s", info->id, key);
    return true;
  }

  uint8_t dummy_val = 0;
  taosHashPut(pHash, key, keyLen, &dummy_val, sizeof(uint8_t));

  return false;
}

2068
// Parse one key starting at *index, up to the first unescaped '='.
// The key is checked against pHash for duplicates, copied into a newly
// allocated pKV->key and passed through addEscapeCharToString(). On success
// *index is moved to the character after the '='.
// Errors:
//   TSDB_CODE_TSC_INVALID_COLUMN_LENGTH - key longer than TSDB_COL_NAME_LEN - 1
//   TSDB_CODE_TSC_LINE_SYNTAX_ERROR     - empty key, no '=' before the end of
//                                         the string, or duplicate key
//   TSDB_CODE_TSC_OUT_OF_MEMORY         - key buffer could not be allocated
static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **index, SHashObj *pHash, SSmlLinesInfo* info) {
  const char *cur = *index;
  char key[TSDB_COL_NAME_LEN + 1];  // +1 to avoid key[len] over write
  int16_t len = 0;

  while (*cur != '\0') {
    if (len > TSDB_COL_NAME_LEN - 1) {
      tscError("SML:0x%"PRIx64" Key field cannot exceeds %d characters", info->id, TSDB_COL_NAME_LEN - 1);
      return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
    }
    //unescaped '=' identifies a tag key
    if (*cur == '=' && *(cur - 1) != '\\') {
      break;
    }
    //Escape special character
    if (*cur == '\\') {
      escapeSpecialCharacter(2, &cur);
    }
    key[len] = *cur;
    cur++;
    len++;
  }
  if (len == 0) {
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }
  // The scan ran off the end of the line without finding '='. Stepping past
  // the terminator below would make the caller read out of bounds.
  if (*cur == '\0') {
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }
  key[len] = '\0';

  if (checkDuplicateKey(key, pHash, info)) {
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  pKV->key = calloc(len + TS_ESCAPE_CHAR_SIZE + 1, 1);
  if (pKV->key == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(pKV->key, key, len + 1);
  addEscapeCharToString(pKV->key, len);
  tscDebug("SML:0x%"PRIx64" Key:%s|len:%d", info->id, pKV->key, len);
  *index = cur + 1;
  return TSDB_CODE_SUCCESS;
}
2106

2107

2108
// Parse the value that follows a key, starting at *index.
// The value ends at the first unescaped ',' or ' ', or at the end of the
// string. For field values that open with '"' or L'"', delimiters are ignored
// until one is preceded by a closing quote. *is_last_kv is set when the
// terminator is ' ' or '\0'. The raw text is converted with
// convertSmlValueType(), and on success *index is moved past the terminator
// (or left on the '\0').
// On every error pKV->key, allocated earlier by the key parser, is freed and
// set to NULL before returning:
//   TSDB_CODE_TSC_LINE_SYNTAX_ERROR - empty value or unterminated quoted string
//   TSDB_CODE_TSC_INVALID_VALUE     - value text matches no supported type
//   TSDB_CODE_TSC_OUT_OF_MEMORY     - scratch buffer could not be allocated
static int32_t parseSmlValue(TAOS_SML_KV *pKV, const char **index,
                          bool *is_last_kv, SSmlLinesInfo* info, bool isTag) {
  const char *start, *cur;
  int32_t ret = TSDB_CODE_SUCCESS;
  char *value = NULL;
  int16_t len = 0;
  bool searchQuote = false;
  start = cur = *index;

  //if field value is string
  if (!isTag) {
    if (*cur == '"') {
      searchQuote = true;
      cur += 1;
      len += 1;
    } else if (*cur == 'L' && *(cur + 1) == '"') {
      searchQuote = true;
      cur += 2;
      len += 2;
    }
  }

  while (1) {
    // unescaped ',' or ' ' or '\0' identifies a value
    if (((*cur == ',' || *cur == ' ' ) && *(cur - 1) != '\\') || *cur == '\0') {
      if (searchQuote == true) {
        //first quote ignored while searching
        if (*(cur - 1) == '"' && len != 1 && len != 2) {
          *is_last_kv = (*cur == ' ' || *cur == '\0') ? true : false;
          break;
        } else if (*cur == '\0') {
          ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
          goto error;
        } else {
          // delimiter inside the quoted string: part of the value
          cur++;
          len++;
          continue;
        }
      }
      //unescaped ' ' or '\0' indicates end of value
      *is_last_kv = (*cur == ' ' || *cur == '\0') ? true : false;
      if (*cur == ' ' && *(cur + 1) == ' ') {
        // skip over a run of spaces without counting them in the value
        cur++;
        continue;
      } else {
        break;
      }
    }
    //Escape special character
    if (*cur == '\\') {
      escapeSpecialCharacter(isTag ? 2 : 3, &cur);
      len++;
    }
    cur++;
    len++;
  }
  // len is 16-bit and may have wrapped negative on an oversized value;
  // treat that like an empty value rather than sizing a buffer from it.
  if (len <= 0) {
    ret = TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    goto error;
  }

  value = calloc(len + 1, 1);
  if (value == NULL) {
    ret = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto error;
  }
  memcpy(value, start, len);
  if (!convertSmlValueType(pKV, value, len, info, isTag)) {
    tscError("SML:0x%"PRIx64" Failed to convert sml value string(%s) to any type",
            info->id, value);
    free(value);
    ret = TSDB_CODE_TSC_INVALID_VALUE;
    goto error;
  }
  free(value);

  *index = (*cur == '\0') ? cur : cur + 1;
  return ret;

error:
  //free previously allocated key field
  free(pKV->key);
  pKV->key = NULL;
  return ret;
}

// Parse the measurement (super table name) at the start of the line.
// The name ends at the first unescaped ',' (tags follow, *has_tags is set to
// 1) or at the last space of the first run of unescaped spaces (no tags).
// The name is stored in a newly allocated pSml->stableName and passed through
// addEscapeCharToString(); *index is moved to the character after the
// delimiter.
// On error pSml->stableName is freed and set to NULL:
//   TSDB_CODE_TSC_OUT_OF_MEMORY           - name buffer could not be allocated
//   TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH - name longer than TSDB_TABLE_NAME_LEN - 1
//   TSDB_CODE_TSC_LINE_SYNTAX_ERROR       - empty name, or the line ends
//                                           before any delimiter
static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **index,
                                   uint8_t *has_tags, SSmlLinesInfo* info) {
  const char *begin = *index;
  const char *cur = begin;
  int16_t len = 0;

  pSml->stableName = calloc(TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE, 1);
  if (pSml->stableName == NULL){
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  while (*cur != '\0') {
    if (len > TSDB_TABLE_NAME_LEN - 1) {
      tscError("SML:0x%"PRIx64" Measurement field cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1);
      free(pSml->stableName);
      pSml->stableName = NULL;
      return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
    }
    // Only look at the previous character when there is one; the first
    // character of the line can never be escaped.
    bool escaped = (cur > begin) && (*(cur - 1) == '\\');
    //first unescaped comma or space identifies measurement
    //if space detected first, meaning no tag in the input
    if (*cur == ',' && !escaped) {
      *has_tags = 1;
      break;
    }
    if (*cur == ' ' && !escaped) {
      if (*(cur + 1) != ' ') {
        break;
      }
      // collapse a run of spaces: stop on the last one
      cur++;
      continue;
    }
    //Comma, Space, Backslash needs to be escaped if any
    if (*cur == '\\') {
      escapeSpecialCharacter(1, &cur);
    }
    pSml->stableName[len] = *cur;
    cur++;
    len++;
  }
  // An empty name is invalid, and so is a line that ends inside the
  // measurement: there would be no fields, and stepping past the terminator
  // below would send the caller out of bounds.
  if (len == 0 || *cur == '\0') {
    free(pSml->stableName);
    pSml->stableName = NULL;
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }
  addEscapeCharToString(pSml->stableName, len);
  *index = cur + 1;
  tscDebug("SML:0x%"PRIx64" Stable name in measurement:%s|len:%d", info->id, pSml->stableName, len);

  return TSDB_CODE_SUCCESS;
}
2243

2244
//Table name can only contain digits(0-9),alphebet(a-z),underscore(_)
2245 2246
int32_t isValidChildTableName(const char *pTbName, int16_t len, SSmlLinesInfo* info) {
  if (len > TSDB_TABLE_NAME_LEN - 1) {
2247
    tscError("SML:0x%"PRIx64" child table name cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1);
2248 2249
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }
2250 2251
  const char *cur = pTbName;
  for (int i = 0; i < len; ++i) {
2252
    if(!isdigit(cur[i]) && !isalpha(cur[i]) && (cur[i] != '_')) {
2253 2254 2255 2256 2257 2258 2259
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    }
  }
  return TSDB_CODE_SUCCESS;
}


2260
static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
2261
                               const char **index, bool isField,
2262 2263
                               TAOS_SML_DATA_POINT* smlData, SHashObj *pHash,
                               SSmlLinesInfo* info) {
2264
  const char *cur = *index;
2265
  int32_t ret = TSDB_CODE_SUCCESS;
2266 2267
  TAOS_SML_KV *pkv;
  bool is_last_kv = false;
2268

2269
  int32_t capacity = 0;
2270
  if (isField) {
2271 2272 2273
    capacity = 64;
    *pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
    // leave space for timestamp;
2274 2275
    pkv = *pKVs;
    pkv++;
2276 2277 2278
  } else {
    capacity = 8;
    *pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
2279 2280
    pkv = *pKVs;
  }
2281

2282 2283 2284 2285 2286 2287 2288
  size_t childTableNameLen = strlen(tsSmlChildTableName);
  char childTableName[TSDB_TABLE_NAME_LEN + TS_ESCAPE_CHAR_SIZE] = {0};
  if (childTableNameLen != 0) {
    memcpy(childTableName, tsSmlChildTableName, childTableNameLen);
    addEscapeCharToString(childTableName, (int32_t)(childTableNameLen));
  }

2289
  while (*cur != '\0') {
2290
    ret = parseSmlKey(pkv, &cur, pHash, info);
2291
    if (ret) {
2292
      tscError("SML:0x%"PRIx64" Unable to parse key", info->id);
2293 2294
      goto error;
    }
2295
    ret = parseSmlValue(pkv, &cur, &is_last_kv, info, !isField);
2296
    if (ret) {
2297
      tscError("SML:0x%"PRIx64" Unable to parse value", info->id);
2298 2299
      goto error;
    }
2300 2301

    if (!isField && childTableNameLen != 0 && strcasecmp(pkv->key, childTableName) == 0)  {
2302
      smlData->childTableName = malloc(pkv->length + TS_ESCAPE_CHAR_SIZE + 1);
2303
      memcpy(smlData->childTableName, pkv->value, pkv->length);
2304
      addEscapeCharToString(smlData->childTableName, (int32_t)pkv->length);
2305 2306 2307 2308 2309
      free(pkv->key);
      free(pkv->value);
    } else {
      *num_kvs += 1;
    }
2310
    if (is_last_kv) {
2311 2312 2313 2314 2315
      goto done;
    }

    //reallocate addtional memory for more kvs
    TAOS_SML_KV *more_kvs = NULL;
2316

2317
    if (isField) {
2318 2319
      if ((*num_kvs + 2) > capacity) {
        capacity *= 3; capacity /= 2;
2320 2321 2322
        more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
      } else {
        more_kvs = *pKVs;
2323
      }
2324
    } else {
2325 2326
      if ((*num_kvs + 1) > capacity) {
        capacity *= 3; capacity /= 2;
2327 2328 2329
        more_kvs = realloc(*pKVs, capacity * sizeof(TAOS_SML_KV));
      } else {
        more_kvs = *pKVs;
2330
      }
2331
    }
2332

2333
    if (!more_kvs) {
2334 2335
      goto error;
    }
2336 2337 2338 2339 2340 2341 2342 2343 2344
    *pKVs = more_kvs;
    //move pKV points to next TAOS_SML_KV block
    if (isField) {
      pkv = *pKVs + *num_kvs + 1;
    } else {
      pkv = *pKVs + *num_kvs;
    }
  }
  goto done;
2345

2346
error:
2347
  return ret;
2348
done:
2349
  *index = cur;
2350
  return ret;
2351
}
2352

2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365
// Install the parsed timestamp as the first field of the data point and bump
// fieldNum by one. Takes ownership of `ts`: its key and value buffers are
// handed over to fields[0] (no copy is made) and the `ts` struct itself is
// freed.
static void moveTimeStampToFirstKv(TAOS_SML_DATA_POINT** smlData, TAOS_SML_KV *ts) {
  TAOS_SML_KV* tsField = (*smlData)->fields;
  tsField->length = ts->length;
  tsField->type = ts->type;
  tsField->key = ts->key;
  tsField->value = ts->value;
  (*smlData)->fieldNum = (*smlData)->fieldNum + 1;

  free(ts);
}

2368
// Parse one line-protocol line into smlData: measurement, optional tags,
// fields, then the timestamp, which is installed as the first field.
// A temporary hash table is used to reject duplicate keys across tags and
// fields. Returns the first error encountered; on error smlData may be
// partially filled and is left for the caller to release.
int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData, SSmlLinesInfo* info) {
  const char* index = sql;
  int32_t ret = TSDB_CODE_SUCCESS;
  uint8_t has_tags = 0;
  TAOS_SML_KV *timestamp = NULL;
  SHashObj *keyHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
  if (keyHashTable == NULL) {
    tscError("SML:0x%"PRIx64" Unable to allocate key hash table", info->id);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  ret = parseSmlMeasurement(smlData, &index, &has_tags, info);
  if (ret) {
    tscError("SML:0x%"PRIx64" Unable to parse measurement", info->id);
    goto cleanup;
  }
  tscDebug("SML:0x%"PRIx64" Parse measurement finished, has_tags:%d", info->id, has_tags);

  //Parse Tags
  if (has_tags) {
    ret = parseSmlKvPairs(&smlData->tags, &smlData->tagNum, &index, false, smlData, keyHashTable, info);
    if (ret) {
      tscError("SML:0x%"PRIx64" Unable to parse tag", info->id);
      goto cleanup;
    }
  }
  tscDebug("SML:0x%"PRIx64" Parse tags finished, num of tags:%d", info->id, smlData->tagNum);

  //Parse fields
  ret = parseSmlKvPairs(&smlData->fields, &smlData->fieldNum, &index, true, smlData, keyHashTable, info);
  if (ret) {
    tscError("SML:0x%"PRIx64" Unable to parse field", info->id);
    goto cleanup;
  }
  tscDebug("SML:0x%"PRIx64" Parse fields finished, num of fields:%d", info->id, smlData->fieldNum);

  // The duplicate-key table is not needed for the timestamp.
  taosHashCleanup(keyHashTable);
  keyHashTable = NULL;

  //Parse timestamp
  ret = parseSmlTimeStamp(&timestamp, &index, info);
  if (ret) {
    tscError("SML:0x%"PRIx64" Unable to parse timestamp", info->id);
    return ret;
  }
  moveTimeStampToFirstKv(&smlData, timestamp);
  tscDebug("SML:0x%"PRIx64" Parse timestamp finished", info->id);

  return TSDB_CODE_SUCCESS;

cleanup:
  taosHashCleanup(keyHashTable);
  return ret;
}

2416
//=========================================================================
2417

S
shenglian zhou 已提交
2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432
// Release everything owned by a data point: the key and value of each of the
// tagNum tags and fieldNum fields, the two arrays themselves, and the super
// table and child table names. The point struct itself is not freed.
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) {
  int tagIdx = 0;
  while (tagIdx < point->tagNum) {
    TAOS_SML_KV *tag = &point->tags[tagIdx];
    free(tag->key);
    free(tag->value);
    ++tagIdx;
  }
  free(point->tags);

  int fieldIdx = 0;
  while (fieldIdx < point->fieldNum) {
    TAOS_SML_KV *field = &point->fields[fieldIdx];
    free(field->key);
    free(field->value);
    ++fieldIdx;
  }
  free(point->fields);

  free(point->stableName);
  free(point->childTableName);
}

S
shenglian zhou 已提交
2433
// Parse numLines line-protocol lines and append one TAOS_SML_DATA_POINT per
// line to `points`. Parsing stops at the first failing line: that line's
// partial point is destroyed and its error code returned, while points already
// appended stay in `points`. `failedLines` is not used.
// Returns TSDB_CODE_TSC_OUT_OF_MEMORY when a parsed point cannot be appended.
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines, SSmlLinesInfo* info) {
  for (int32_t i = 0; i < numLines; ++i) {
    TAOS_SML_DATA_POINT point = {0};
    int32_t code = tscParseLine(lines[i], &point, info);
    if (code != TSDB_CODE_SUCCESS) {
      tscError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]);
      destroySmlDataPoint(&point);
      return code;
    }
    tscDebug("SML:0x%"PRIx64" data point line parse success. line %d", info->id, i);

    // The array copies the struct; if the push fails the point is still ours.
    if (taosArrayPush(points, &point) == NULL) {
      destroySmlDataPoint(&point);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }
  return TSDB_CODE_SUCCESS;
}

2450
/**
 * Parse an array of schemaless lines and insert the resulting data points.
 *
 * @param taos         connection handle, forwarded to tscSmlInsert().
 * @param lines        array of `numLines` line strings; neither the array nor any entry may be NULL.
 * @param numLines     number of lines; must be between 1 and 65536.
 * @param protocol     protocol type recorded in the parsing context.
 * @param tsType       timestamp type recorded in the parsing context.
 * @param affectedRows optional output; when non-NULL and parsing succeeded it receives
 *                     info->affectedRows after the insert attempt. It is left untouched
 *                     on argument, allocation or parse errors.
 * @return 0 on success; TSDB_CODE_TSC_APP_ERROR for invalid arguments,
 *         TSDB_CODE_TSC_OUT_OF_MEMORY on allocation failure, otherwise the error
 *         code returned by parsing or insertion.
 */
int taos_insert_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol, SMLTimeStampType tsType, int *affectedRows) {
  int32_t code = 0;
  // Declared up front so that `goto cleanup` never jumps over an initialisation.
  TAOS_SML_DATA_POINT* points = NULL;
  size_t numPoints = 0;

  SSmlLinesInfo* info = tcalloc(1, sizeof(SSmlLinesInfo));
  if (info == NULL) {
    // No context id is available yet, so the message cannot carry one.
    tscError("SML: taos_insert_lines failed to allocate memory");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  info->id = genLinesSmlId();
  info->tsType = tsType;
  info->protocol = protocol;

  if (numLines <= 0 || numLines > 65536) {
    tscError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
    tfree(info);
    return TSDB_CODE_TSC_APP_ERROR;
  }

  if (lines == NULL) {
    tscError("SML:0x%"PRIx64" taos_insert_lines lines is NULL", info->id);
    tfree(info);
    return TSDB_CODE_TSC_APP_ERROR;
  }

  for (int i = 0; i < numLines; ++i) {
    if (lines[i] == NULL) {
      tscError("SML:0x%"PRIx64" taos_insert_lines line %d is NULL", info->id, i);
      tfree(info);
      return TSDB_CODE_TSC_APP_ERROR;
    }
  }

  SArray* lpPoints = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT));
  if (lpPoints == NULL) {
    tscError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
    tfree(info);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebug("SML:0x%"PRIx64" taos_insert_lines begin inserting %d lines, first line: %s", info->id, numLines, lines[0]);
  code = tscParseLines(lines, numLines, lpPoints, NULL, info);
  if (code != 0) {
    // Points parsed before the failing line are still released below.
    goto cleanup;
  }

  points = TARRAY_GET_START(lpPoints);
  numPoints = taosArrayGetSize(lpPoints);
  code = tscSmlInsert(taos, points, (int)numPoints, info);
  if (code != 0) {
    tscError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code)));
  }
  if (affectedRows != NULL) {
    *affectedRows = info->affectedRows;
  }

cleanup:
  tscDebug("SML:0x%"PRIx64" taos_insert_lines finish inserting %d lines. code: %d", info->id, numLines, code);
  // Re-read the array state: this label is reached from both the parse-failure
  // and the normal path.
  points = TARRAY_GET_START(lpPoints);
  numPoints = taosArrayGetSize(lpPoints);
  for (size_t i = 0; i < numPoints; ++i) {
    destroySmlDataPoint(points + i);
  }

  taosArrayDestroy(lpPoints);

  tfree(info);
  return code;
}

G
Ganlin Zhao 已提交
2512 2513
/*
 * Map a TSDB_SML_TIMESTAMP_* precision value onto the corresponding
 * SML_TIME_STAMP_* timestamp type.
 *
 * On success the result is stored in *tsType and TSDB_CODE_SUCCESS is
 * returned. For an unrecognised precision *tsType is left untouched and
 * TSDB_CODE_TSC_INVALID_PRECISION_TYPE is returned.
 */
static int32_t convertPrecisionType(int precision, SMLTimeStampType *tsType) {
  if (precision == TSDB_SML_TIMESTAMP_NOT_CONFIGURED) {
    *tsType = SML_TIME_STAMP_NOT_CONFIGURED;
  } else if (precision == TSDB_SML_TIMESTAMP_HOURS) {
    *tsType = SML_TIME_STAMP_HOURS;
  } else if (precision == TSDB_SML_TIMESTAMP_MILLI_SECONDS) {
    *tsType = SML_TIME_STAMP_MILLI_SECONDS;
  } else if (precision == TSDB_SML_TIMESTAMP_NANO_SECONDS) {
    *tsType = SML_TIME_STAMP_NANO_SECONDS;
  } else if (precision == TSDB_SML_TIMESTAMP_MICRO_SECONDS) {
    *tsType = SML_TIME_STAMP_MICRO_SECONDS;
  } else if (precision == TSDB_SML_TIMESTAMP_SECONDS) {
    *tsType = SML_TIME_STAMP_SECONDS;
  } else if (precision == TSDB_SML_TIMESTAMP_MINUTES) {
    *tsType = SML_TIME_STAMP_MINUTES;
  } else {
    return TSDB_CODE_TSC_INVALID_PRECISION_TYPE;
  }

  return TSDB_CODE_SUCCESS;
}

G
Ganlin Zhao 已提交
2542
//make a dummy SSqlObj
G
Ganlin Zhao 已提交
2543
static SSqlObj* createSmlQueryObj(TAOS* taos, int32_t affected_rows, int32_t code) {
G
Ganlin Zhao 已提交
2544 2545 2546
  SSqlObj *pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
  if (pNew == NULL) {
    return NULL;
2547
  }
G
Ganlin Zhao 已提交
2548
  pNew->signature = pNew;
G
Ganlin Zhao 已提交
2549
  pNew->pTscObj = taos;
2550
  pNew->fp = NULL;
2551

G
Ganlin Zhao 已提交
2552 2553
  tsem_init(&pNew->rspSem, 0, 0);
  registerSqlObj(pNew);
2554

G
Ganlin Zhao 已提交
2555 2556 2557
  pNew->res.numOfRows = affected_rows;
  pNew->res.code = code;

G
Ganlin Zhao 已提交
2558

G
Ganlin Zhao 已提交
2559
  return pNew;
2560 2561
}

G
Ganlin Zhao 已提交
2562

2563
/**
2564
 * taos_schemaless_insert() parse and insert data points into database according to
2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
 * @return return zero for successful insertion. Otherwise return none-zero error code of
 *         failure reason.
 *
 */

G
Ganlin Zhao 已提交
2584 2585 2586
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
  int code = TSDB_CODE_SUCCESS;
  int affected_rows = 0;
2587
  SMLTimeStampType tsType = SML_TIME_STAMP_NOW;
2588

G
Ganlin Zhao 已提交
2589
  if (protocol == TSDB_SML_LINE_PROTOCOL) {
G
Ganlin Zhao 已提交
2590
    code = convertPrecisionType(precision, &tsType);
2591
    if (code != TSDB_CODE_SUCCESS) {
G
Ganlin Zhao 已提交
2592
      return NULL;
2593 2594 2595
    }
  }

2596
  switch (protocol) {
G
Ganlin Zhao 已提交
2597
    case TSDB_SML_LINE_PROTOCOL:
G
Ganlin Zhao 已提交
2598
      code = taos_insert_lines(taos, lines, numLines, protocol, tsType, &affected_rows);
2599
      break;
G
Ganlin Zhao 已提交
2600
    case TSDB_SML_TELNET_PROTOCOL:
G
Ganlin Zhao 已提交
2601
      code = taos_insert_telnet_lines(taos, lines, numLines, protocol, tsType, &affected_rows);
2602
      break;
G
Ganlin Zhao 已提交
2603
    case TSDB_SML_JSON_PROTOCOL:
G
Ganlin Zhao 已提交
2604
      code = taos_insert_json_payload(taos, *lines, protocol, tsType, &affected_rows);
2605 2606 2607 2608 2609 2610
      break;
    default:
      code = TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE;
      break;
  }

2611

G
Ganlin Zhao 已提交
2612
  SSqlObj *pSql = createSmlQueryObj(taos, affected_rows, code);
G
Ganlin Zhao 已提交
2613 2614

  return (TAOS_RES*)pSql;
2615
}