tscParseLineProtocol.c 57.5 KB
Newer Older
S
shenglian zhou 已提交
1 2 3 4
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
5

S
shenglian zhou 已提交
6 7 8 9 10 11 12 13
#include "os.h"
#include "osString.h"
#include "ttype.h"
#include "tmd5.h"
#include "tstrbuild.h"
#include "tname.h"
#include "hash.h"
#include "tskiplist.h"
14

S
shenglian zhou 已提交
15
#include "tscUtil.h"
16 17
#include "tsclient.h"
#include "tscLog.h"
S
shenglian zhou 已提交
18

19
#include "taos.h"
20

S
shenglian zhou 已提交
21 22 23 24 25 26
/*
 * Schema of one super table as used by the line-protocol writer: the table
 * name, its tag and field columns, and name -> index lookup hashes for both.
 */
typedef struct  {
  char sTableName[TSDB_TABLE_NAME_LEN];
  SHashObj* tagHash;    // tag name -> index (size_t) into `tags`
  SHashObj* fieldHash;  // field name -> index (size_t) into `fields`
  SArray* tags; //SArray<SSchema>
  SArray* fields; //SArray<SSchema>
  uint8_t precision;    // timestamp precision, copied from the table meta when it is loaded
} SSmlSTableSchema;

S
shenglian zhou 已提交
30 31 32 33 34
/*
 * One key/value pair (a tag or a field) parsed from a line-protocol point.
 */
typedef struct {
  char* key;       // NUL-terminated column name
  uint8_t type;    // TSDB_DATA_TYPE_* of `value`
  int16_t length;  // length of `value` in bytes
  char* value;     // raw value bytes

  //===================================
  size_t fieldSchemaIdx;  // index of this key in the super table's tag/field schema array
} TAOS_SML_KV;

/*
 * One parsed line-protocol data point: the super table it belongs to, the
 * (optional) child table name, and its tag and field key/value pairs.
 */
typedef struct {
  char* stableName;

  // NULL until a name is assigned; a name is derived from the tags when missing
  char* childTableName;
  TAOS_SML_KV* tags;
  int tagNum;

  // first kv must be timestamp
  TAOS_SML_KV* fields;
  int fieldNum;

  //================================
  size_t schemaIdx;  // index of this point's super table in the stable schema array
} TAOS_SML_DATA_POINT;

55 56 57 58 59 60 61 62
/*
 * Kinds of timestamp a data point can carry. Judging by the names, these are
 * "use the current time" plus the four supported units — TODO confirm against
 * the timestamp parser, which is not visible here.
 */
typedef enum {
  SML_TIME_STAMP_NOW           = 0,
  SML_TIME_STAMP_SECONDS       = 1,
  SML_TIME_STAMP_MILLI_SECONDS = 2,
  SML_TIME_STAMP_MICRO_SECONDS = 3,
  SML_TIME_STAMP_NANO_SECONDS  = 4
} SMLTimeStampType;

63 64
//=================================================================================================

S
shenglian zhou 已提交
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
/*
 * qsort()-compatible comparator for TAOS_SML_KV elements.
 *
 * Keys are compared case-insensitively over their common prefix; when the
 * prefixes are equal the shorter key sorts first.
 *
 * Returns a negative, zero or positive value in the usual comparator sense.
 */
int compareSmlColKv(const void* p1, const void* p2) {
  const TAOS_SML_KV* lhs = (const TAOS_SML_KV*)p1;
  const TAOS_SML_KV* rhs = (const TAOS_SML_KV*)p2;

  int lhsLen = (int)strlen(lhs->key);
  int rhsLen = (int)strlen(rhs->key);

  int order = strncasecmp(lhs->key, rhs->key, MIN(lhsLen, rhsLen));
  return (order != 0) ? order : (lhsLen - rhsLen);
}

/* Kinds of schema change that can be applied to a super table. */
typedef enum {
  SCHEMA_ACTION_CREATE_STABLE      = 0,  // create the super table from scratch
  SCHEMA_ACTION_ADD_COLUMN         = 1,  // "alter stable ... add column"
  SCHEMA_ACTION_ADD_TAG            = 2,  // "alter stable ... add tag"
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE = 3,  // "alter stable ... modify column"
  SCHEMA_ACTION_CHANGE_TAG_SIZE    = 4,  // "alter stable ... modify tag"
} ESchemaAction;

/*
 * Payload of a SCHEMA_ACTION_CREATE_STABLE action: the name of the super
 * table to create and the tag/field schemas to create it with. The arrays
 * are assigned by pointer from the point schema, not copied.
 */
typedef struct {
  char sTableName[TSDB_TABLE_NAME_LEN];
  SArray* tags; //SArray<SSchema>
  SArray* fields; //SArray<SSchema>
} SCreateSTableActionInfo;

/*
 * Payload of the add/modify column and tag actions: the super table to alter
 * and the column or tag schema to add or resize. `field` is assigned by
 * pointer from the point schema, not copied.
 */
typedef struct {
  char sTableName[TSDB_TABLE_NAME_LEN];
  SSchema* field;
} SAlterSTableActionInfo;

/*
 * A single schema change. `action` selects which union member is valid:
 * createSTable for SCHEMA_ACTION_CREATE_STABLE, alterSTable for every other
 * action.
 */
typedef struct {
  ESchemaAction action;
  union {
    SCreateSTableActionInfo createSTable;
    SAlterSTableActionInfo alterSTable;
  };
} SSchemaAction;

S
shenglian zhou 已提交
105
/*
 * Computes the storage size in bytes needed for the value of `kv`.
 *
 * Fixed-size types use the size from tDataTypes. BINARY is the value length
 * plus the var-string header. NCHAR is the length of the value after it is
 * converted to UCS-4, plus the var-string header.
 *
 * @param kv     key/value whose type, length and value are inspected
 * @param bytes  out: required size in bytes (written only on success)
 * @return 0 on success, TSDB_CODE_TSC_OUT_OF_MEMORY if the conversion buffer
 *         cannot be allocated, TSDB_CODE_TSC_INVALID_VALUE if the NCHAR value
 *         cannot be converted.
 */
static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes) {
  if (!IS_VAR_DATA_TYPE(kv->type)) {
    *bytes = tDataTypes[kv->type].bytes;
    return 0;
  }

  if (kv->type == TSDB_DATA_TYPE_BINARY) {
    *bytes = kv->length + VARSTR_HEADER_SIZE;
    return 0;
  }

  if (kv->type == TSDB_DATA_TYPE_NCHAR) {
    // scratch buffer: the converted text itself is discarded, only its size matters
    char* ucs = malloc(kv->length * TSDB_NCHAR_SIZE + 1);
    if (ucs == NULL) {
      tscError("failed to allocate buffer for nchar conversion. key: %s", kv->key);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    int32_t bytesNeeded = 0;
    bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded);
    free(ucs);
    if (!succ) {
      tscError("convert nchar string to UCS4_LE failed:%s", kv->value);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }
    *bytes = bytesNeeded + VARSTR_HEADER_SIZE;
  }
  return 0;
}

S
shenglian zhou 已提交
127
/*
 * Merges one key/value pair into a schema under construction.
 *
 * If the key is already present in `hash`, its type must match and the
 * recorded size is widened to fit this value. Otherwise a new SSchema entry
 * is appended to `array` and indexed in `hash`. In both cases
 * smlKv->fieldSchemaIdx is set to the entry's index in `array`.
 *
 * @param smlKv  key/value to merge; fieldSchemaIdx is written on success
 * @param hash   column name -> index (size_t) into `array`
 * @param array  SArray<SSchema> being built
 * @return 0 on success; TSDB_CODE_TSC_INVALID_VALUE on a type mismatch or a
 *         key that does not fit in a column name; TSDB_CODE_TSC_OUT_OF_MEMORY
 *         if the array cannot grow; or the error from
 *         getFieldBytesFromSmlKv().
 */
static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array) {
  size_t  keyLen = strlen(smlKv->key);
  size_t  fieldIdx = 0;
  int32_t bytes = 0;
  int32_t code = 0;

  size_t* pFieldIdx = taosHashGet(hash, smlKv->key, keyLen);
  if (pFieldIdx) {
    fieldIdx = *pFieldIdx;
    SSchema* pField = taosArrayGet(array, fieldIdx);

    if (pField->type != smlKv->type) {
      tscError("type mismatch. key %s, type %d. type before %d", smlKv->key, smlKv->type, pField->type);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    code = getFieldBytesFromSmlKv(smlKv, &bytes);
    if (code != 0) {
      return code;
    }
    pField->bytes = MAX(pField->bytes, bytes);
  } else {
    // the name buffer holds TSDB_COL_NAME_LEN bytes including the terminator
    if (keyLen >= TSDB_COL_NAME_LEN) {
      tscError("key is too long. key %s, length %d, max length %d", smlKv->key, (int)keyLen,
               (int)TSDB_COL_NAME_LEN - 1);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    SSchema field = {0};
    memcpy(field.name, smlKv->key, keyLen + 1);
    field.type = smlKv->type;

    code = getFieldBytesFromSmlKv(smlKv, &bytes);
    if (code != 0) {
      return code;
    }
    field.bytes = bytes;

    if (taosArrayPush(array, &field) == NULL) {
      tscError("failed to append schema for key %s", smlKv->key);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    fieldIdx = taosArrayGetSize(array) - 1;
    taosHashPut(hash, field.name, keyLen, &fieldIdx, sizeof(fieldIdx));
  }

  smlKv->fieldSchemaIdx = fieldIdx;

  return 0;
}

S
shenglian zhou 已提交
172
/*
 * Derives the super-table schemas implied by a batch of data points.
 *
 * For every distinct super table name an SSmlSTableSchema is appended to
 * `stableSchemas`; each point's tags and fields are merged into it and
 * point->schemaIdx / kv->fieldSchemaIdx are set to the resulting indices.
 *
 * The lookup hashes are working state only: they are destroyed before this
 * function returns, on success and on failure, and the tagHash/fieldHash
 * members of every entry are reset to NULL. The tags/fields arrays stay
 * owned by the entries of `stableSchemas`.
 *
 * @param points         data points to scan
 * @param numPoint       number of entries in `points`
 * @param stableSchemas  SArray<SSmlSTableSchema> that receives the schemas
 * @return 0 on success, otherwise the first error encountered.
 */
static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas) {
  int32_t code = 0;
  size_t  numStables = 0;
  SHashObj* sname2shema = taosHashInit(32,
                                       taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

  for (int i = 0; i < numPoint; ++i) {
    TAOS_SML_DATA_POINT* point = &points[i];
    size_t stableNameLen = strlen(point->stableName);
    size_t* pStableIdx = taosHashGet(sname2shema, point->stableName, stableNameLen);
    SSmlSTableSchema* pStableSchema = NULL;
    size_t stableIdx = 0;
    if (pStableIdx) {
      stableIdx = *pStableIdx;
      pStableSchema = taosArrayGet(stableSchemas, stableIdx);
    } else {
      // sTableName holds TSDB_TABLE_NAME_LEN bytes including the terminator
      if (stableNameLen >= TSDB_TABLE_NAME_LEN) {
        tscError("build data point schema failed. point no.: %d, super table name too long: %s", i,
                 point->stableName);
        code = TSDB_CODE_TSC_INVALID_VALUE;
        goto cleanup;
      }

      SSmlSTableSchema schema;
      memset(&schema, 0, sizeof(schema));
      memcpy(schema.sTableName, point->stableName, stableNameLen + 1);
      schema.fields = taosArrayInit(64, sizeof(SSchema));
      schema.tags = taosArrayInit(8, sizeof(SSchema));
      schema.tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
      schema.fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

      pStableSchema = taosArrayPush(stableSchemas, &schema);
      if (pStableSchema == NULL) {
        // the schema never made it into the array, so release it here
        taosHashCleanup(schema.tagHash);
        taosHashCleanup(schema.fieldHash);
        taosArrayDestroy(schema.tags);
        taosArrayDestroy(schema.fields);
        tscError("build data point schema failed. point no.: %d, can not add super table %s", i,
                 point->stableName);
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto cleanup;
      }
      stableIdx = taosArrayGetSize(stableSchemas) - 1;
      taosHashPut(sname2shema, schema.sTableName, stableNameLen, &stableIdx, sizeof(size_t));
    }

    for (int j = 0; j < point->tagNum; ++j) {
      TAOS_SML_KV* tagKv = point->tags + j;
      code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags);
      if (code != 0) {
        tscError("build data point schema failed. point no.: %d, tag key: %s", i, tagKv->key);
        goto cleanup;
      }
    }

    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* fieldKv = point->fields + j;
      code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields);
      if (code != 0) {
        tscError("build data point schema failed. point no.: %d, field key: %s", i, fieldKv->key);
        goto cleanup;
      }
    }

    point->schemaIdx = stableIdx;
  }

cleanup:
  // the hashes are only needed while building; release them on every path
  numStables = taosArrayGetSize(stableSchemas);
  for (size_t i = 0; i < numStables; ++i) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
    taosHashCleanup(schema->tagHash);
    taosHashCleanup(schema->fieldHash);
    schema->tagHash = NULL;
    schema->fieldHash = NULL;
  }
  taosHashCleanup(sname2shema);

  if (code != 0) {
    return code;
  }

  tscDebug("build point schema succeed. num of super table: %zu", numStables);
  for (size_t i = 0; i < numStables; ++i) {
    SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
    tscDebug("\ttable name: %s, tags number: %zu, fields number: %zu", schema->sTableName,
             taosArrayGetSize(schema->tags), taosArrayGetSize(schema->fields));
  }

  return 0;
}

239
static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[],
S
shenglian zhou 已提交
240
                                       SSchemaAction* action, bool* actionNeeded) {
241 242 243 244
  size_t* pDbIndex = taosHashGet(dbAttrHash, pointColField->name, strlen(pointColField->name));
  if (pDbIndex) {
    SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex);
    assert(strcasecmp(dbAttr->name, pointColField->name) == 0);
S
shenglian zhou 已提交
245
    if (pointColField->type != dbAttr->type) {
S
Shenglian Zhou 已提交
246 247 248
      tscError("point type and db type mismatch. key: %s. point type: %d, db type: %d", pointColField->name,
               pointColField->type, dbAttr->type);
      return TSDB_CODE_TSC_INVALID_VALUE;
S
shenglian zhou 已提交
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
    }

    if (IS_VAR_DATA_TYPE(pointColField->type) && (pointColField->bytes > dbAttr->bytes)) {
      if (isTag) {
        action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
      } else {
        action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
      }
      memset(&action->alterSTable, 0,  sizeof(SAlterSTableActionInfo));
      memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN);
      action->alterSTable.field = pointColField;
      *actionNeeded = true;
    }
  } else {
    if (isTag) {
      action->action = SCHEMA_ACTION_ADD_TAG;
    } else {
      action->action = SCHEMA_ACTION_ADD_COLUMN;
    }
    memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo));
    memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN);
    action->alterSTable.field = pointColField;
    *actionNeeded = true;
  }
S
Shenglian Zhou 已提交
273
  tscDebug("generate schema action. action needed: %d, action: %d", *actionNeeded, action->action);
S
shenglian zhou 已提交
274 275 276
  return 0;
}

S
shenglian zhou 已提交
277
/*
 * Formats the SQL description of one column or tag, e.g. "name INT" or
 * "name BINARY(16)", into `buf`.
 *
 * For BINARY/NCHAR the printed length is the stored size minus the
 * var-string header; for NCHAR it is additionally divided by TSDB_NCHAR_SIZE.
 *
 * @param field     schema entry to describe
 * @param buf       destination buffer
 * @param bufSize   capacity of `buf` in bytes
 * @param outBytes  out: number of characters actually stored in `buf`,
 *                  excluding the terminator; never exceeds the buffer, so it
 *                  is always safe to advance a write cursor by it
 * @return 0 on success, TSDB_CODE_TSC_INVALID_VALUE if the description did
 *         not fit (the text in `buf` is then truncated).
 */
static int32_t buildColumnDescription(SSchema* field,
                               char* buf, int32_t bufSize, int32_t* outBytes) {
  uint8_t type = field->type;
  int     out = 0;

  // a non-positive size would turn into a huge size_t inside snprintf
  if (bufSize <= 0) {
    *outBytes = 0;
    tscError("no room left to describe column %s", field->name);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
    int32_t bytes = field->bytes - VARSTR_HEADER_SIZE;
    if (type == TSDB_DATA_TYPE_NCHAR) {
      bytes =  bytes/TSDB_NCHAR_SIZE;
    }
    out = snprintf(buf, bufSize,"%s %s(%d)",
                   field->name,tDataTypes[field->type].name, bytes);
  } else {
    out = snprintf(buf, bufSize, "%s %s",
                   field->name, tDataTypes[type].name);
  }

  if (out < 0 || out >= bufSize) {
    // snprintf reports the would-be length; report what was really stored
    *outBytes = (out < 0) ? 0 : bufSize - 1;
    tscError("column description of %s does not fit in %d bytes", field->name, bufSize);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  *outBytes = out;
  return 0;
}

S
shenglian zhou 已提交
298 299

static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action) {
S
shenglian zhou 已提交
300 301
  int32_t code = 0;
  int32_t outBytes = 0;
302 303
  char *result = (char *)calloc(1, tsMaxSQLStringLen+1);
  int32_t capacity = tsMaxSQLStringLen +  1;
S
shenglian zhou 已提交
304

S
Shenglian Zhou 已提交
305
  tscDebug("apply schema action: %d", action->action);
S
shenglian zhou 已提交
306 307 308 309 310 311
  switch (action->action) {
    case SCHEMA_ACTION_ADD_COLUMN: {
      int n = sprintf(result, "alter stable %s add column ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
Shenglian Zhou 已提交
312
      taos_free_result(res);
S
shenglian zhou 已提交
313 314 315 316 317 318 319 320
      break;
    }
    case SCHEMA_ACTION_ADD_TAG: {
      int n = sprintf(result, "alter stable %s add tag ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field,
                             result+n, capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
Shenglian Zhou 已提交
321
      taos_free_result(res);
S
shenglian zhou 已提交
322 323 324 325 326 327 328 329
      break;
    }
    case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
      int n = sprintf(result, "alter stable %s modify column ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n,
                             capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
Shenglian Zhou 已提交
330
      taos_free_result(res);
331
      break;
S
shenglian zhou 已提交
332 333 334 335 336 337 338
    }
    case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
      int n = sprintf(result, "alter stable %s modify tag ", action->alterSTable.sTableName);
      buildColumnDescription(action->alterSTable.field, result+n,
                             capacity-n, &outBytes);
      TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
      code = taos_errno(res);
S
Shenglian Zhou 已提交
339
      taos_free_result(res);
S
shenglian zhou 已提交
340 341 342 343 344
      break;
    }
    case SCHEMA_ACTION_CREATE_STABLE: {
      int n = sprintf(result, "create stable %s (", action->createSTable.sTableName);
      char* pos = result + n; int freeBytes = capacity - n;
S
shenglian zhou 已提交
345
      size_t numCols = taosArrayGetSize(action->createSTable.fields);
S
shenglian zhou 已提交
346 347 348 349 350 351 352
      for (int32_t i = 0; i < numCols; ++i) {
        SSchema* field = taosArrayGet(action->createSTable.fields, i);
        buildColumnDescription(field, pos, freeBytes, &outBytes);
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
      --pos; ++freeBytes;
353

S
shenglian zhou 已提交
354 355
      outBytes = snprintf(pos, freeBytes, ") tags (");
      pos += outBytes; freeBytes -= outBytes;
356

S
shenglian zhou 已提交
357
      size_t numTags = taosArrayGetSize(action->createSTable.tags);
S
shenglian zhou 已提交
358 359 360 361 362 363 364 365 366 367
      for (int32_t i = 0; i < numTags; ++i) {
        SSchema* field = taosArrayGet(action->createSTable.tags, i);
        buildColumnDescription(field, pos, freeBytes, &outBytes);
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
      pos--; ++freeBytes;
      outBytes = snprintf(pos, freeBytes, ")");
      TAOS_RES* res = taos_query(taos, result);
      code = taos_errno(res);
S
Shenglian Zhou 已提交
368
      taos_free_result(res);
S
shenglian zhou 已提交
369 370
      break;
    }
S
shenglian zhou 已提交
371

S
shenglian zhou 已提交
372 373 374
    default:
      break;
  }
S
Shenglian Zhou 已提交
375

S
shenglian zhou 已提交
376
  free(result);
S
Shenglian Zhou 已提交
377 378 379
  if (code != 0) {
    tscError("apply schema action failure. %s", tstrerror(code));
  }
S
shenglian zhou 已提交
380 381 382
  return code;
}

383
/*
 * Releases the arrays and lookup hashes held by `schema`. The struct itself
 * is not freed and its members are left as they are.
 *
 * Always returns 0.
 */
static int32_t destroySmlSTableSchema(SSmlSTableSchema* schema) {
  // tag side
  taosHashCleanup(schema->tagHash);
  taosArrayDestroy(schema->tags);

  // field side
  taosHashCleanup(schema->fieldHash);
  taosArrayDestroy(schema->fields);

  return 0;
}

/*
 * Loads the schema of table `tableName` as known to the database into
 * `schema`.
 *
 * A "describe" statement is executed first; its error code is returned when
 * the table cannot be described. The table meta is then read from the
 * client-side tscTableMetaInfo cache under the table's full name
 * (presumably populated by the describe — TODO confirm) and turned into tag
 * and field arrays plus their name -> index hashes.
 *
 * `schema` is only written on success; the caller then releases it with
 * destroySmlSTableSchema().
 *
 * @param taos       connection handle
 * @param tableName  table to load
 * @param schema     out: receives name, precision, tags, fields and hashes
 * @return 0 on success, TSDB_CODE_TSC_DISCONNECTED for an invalid connection,
 *         TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH for an invalid table name,
 *         TSDB_CODE_TSC_OUT_OF_MEMORY on allocation failure, or the error of
 *         the failed step.
 */
int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema) {
  int32_t code = 0;

  STscObj *pObj = (STscObj *)taos;
  if (pObj == NULL || pObj->signature != pObj) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return TSDB_CODE_TSC_DISCONNECTED;
  }

  tscDebug("load table schema. super table name: %s", tableName);

  char sql[256];
  snprintf(sql, 256, "describe %s", tableName);
  TAOS_RES* res = taos_query(taos, sql);
  code = taos_errno(res);
  if (code != 0) {
    tscError("describe table failure. %s", taos_errstr(res));
    taos_free_result(res);
    return code;
  }
  taos_free_result(res);

  // a throw-away SSqlObj, needed only to resolve the full table name
  SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
  if (pSql == NULL) {
    tscError("load table meta failed. can not allocate sql object");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  pSql->pTscObj = taos;
  pSql->signature = pSql;
  pSql->fp = NULL;

  SStrToken tableToken = {.z=tableName, .n=(uint32_t)strlen(tableName), .type=TK_ID};
  tGetToken(tableName, &tableToken.type);
  // Check if the table name available or not
  if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
    // pSql->cmd.payload is never allocated here, so the message is logged
    // instead of being written into it
    tscError("table name is invalid: %s", tableName);
    tscFreeSqlObj(pSql);
    return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
  }

  SName sname = {0};
  code = tscSetTableFullName(&sname, &tableToken, pSql);
  tscFreeSqlObj(pSql);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  char  fullTableName[TSDB_TABLE_FNAME_LEN] = {0};
  tNameExtractFullName(&sname, fullTableName);

  // allocate the meta buffer before touching `schema` so a failure leaves it untouched
  uint32_t size = tscGetTableMetaMaxSize();
  STableMeta* tableMeta = calloc(1, size);
  if (tableMeta == NULL) {
    tscError("load table meta failed. can not allocate table meta");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  taosHashGetClone(tscTableMetaInfo, fullTableName, strlen(fullTableName), NULL, tableMeta, -1);

  schema->tags = taosArrayInit(8, sizeof(SSchema));
  schema->fields = taosArrayInit(64, sizeof(SSchema));
  schema->tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
  schema->fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

  tstrncpy(schema->sTableName, tableName, sizeof(schema->sTableName));
  schema->precision = tableMeta->tableInfo.precision;

  // columns come first in tableMeta->schema
  for (int i=0; i<tableMeta->tableInfo.numOfColumns; ++i) {
    SSchema field = {0};
    tstrncpy(field.name, tableMeta->schema[i].name, sizeof(field.name));
    field.type = tableMeta->schema[i].type;
    field.bytes = tableMeta->schema[i].bytes;
    taosArrayPush(schema->fields, &field);
    size_t fieldIndex = taosArrayGetSize(schema->fields) - 1;
    taosHashPut(schema->fieldHash, field.name, strlen(field.name), &fieldIndex, sizeof(fieldIndex));
  }

  // tags follow the columns in tableMeta->schema
  for (int i=0; i<tableMeta->tableInfo.numOfTags; ++i) {
    int j = i + tableMeta->tableInfo.numOfColumns;
    SSchema field = {0};
    tstrncpy(field.name, tableMeta->schema[j].name, sizeof(field.name));
    field.type = tableMeta->schema[j].type;
    field.bytes = tableMeta->schema[j].bytes;
    taosArrayPush(schema->tags, &field);
    size_t tagIndex = taosArrayGetSize(schema->tags) - 1;
    taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex));
  }
  tscDebug("load table meta succeed. %s, columns number: %d, tag number: %d, precision: %d",
           tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision);
  free(tableMeta); tableMeta = NULL;
  return code;
}

S
shenglian zhou 已提交
477
/*
 * Brings the database schema in line with the schemas derived from the
 * incoming points.
 *
 * For each super table in `stableSchemas`:
 *  - if it does not exist, it is created from the point schema;
 *  - if it exists, missing tags/columns are added and variable-length ones
 *    that became larger are widened.
 * In both cases the point schema's precision is set from the database, and
 * for an existing table the point schema's first field (the timestamp) takes
 * over the database's timestamp column name.
 *
 * A point attribute whose type conflicts with the database stops processing
 * with an error. Failures of individual create/alter statements are logged
 * by applySchemaAction() and otherwise tolerated, as before; a failed create
 * surfaces when the table is reloaded.
 *
 * @param taos           connection handle
 * @param stableSchemas  SArray<SSmlSTableSchema> derived from the points
 * @return 0 on success, otherwise the first fatal error.
 */
static int32_t modifyDBSchemas(TAOS* taos, SArray* stableSchemas) {
  int32_t code = 0;
  size_t numStable = taosArrayGetSize(stableSchemas);
  for (size_t i = 0; i < numStable; ++i) {
    SSmlSTableSchema* pointSchema = taosArrayGet(stableSchemas, i);
    SSmlSTableSchema  dbSchema;
    memset(&dbSchema, 0, sizeof(SSmlSTableSchema));

    code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
    if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
      // the super table does not exist yet: create it, then reload its meta
      SSchemaAction schemaAction = {0};
      schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
      memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
      memcpy(schemaAction.createSTable.sTableName, pointSchema->sTableName, TSDB_TABLE_NAME_LEN);
      schemaAction.createSTable.tags = pointSchema->tags;
      schemaAction.createSTable.fields = pointSchema->fields;
      applySchemaAction(taos, &schemaAction);

      code = loadTableMeta(taos, pointSchema->sTableName, &dbSchema);
      if (code != 0) {
        tscError("reconcile point schema failed. can not create %s", pointSchema->sTableName);
        return code;
      }
      pointSchema->precision = dbSchema.precision;
      destroySmlSTableSchema(&dbSchema);
      continue;
    }

    if (code != TSDB_CODE_SUCCESS) {
      tscError("load table meta error: %s", tstrerror(code));
      return code;
    }

    size_t pointTagSize = taosArrayGetSize(pointSchema->tags);
    size_t pointFieldSize = taosArrayGetSize(pointSchema->fields);

    // both sides must have a timestamp column at index 0
    if (pointFieldSize == 0 || taosArrayGetSize(dbSchema.fields) == 0) {
      tscError("reconcile point schema failed. %s has no timestamp column", pointSchema->sTableName);
      destroySmlSTableSchema(&dbSchema);
      return TSDB_CODE_TSC_INVALID_VALUE;
    }

    for (size_t j = 0; j < pointTagSize; ++j) {
      SSchema* pointTag = taosArrayGet(pointSchema->tags, j);
      SSchemaAction schemaAction = {0};
      bool actionNeeded = false;
      code = generateSchemaAction(pointTag, dbSchema.tagHash, dbSchema.tags, true, pointSchema->sTableName,
                                  &schemaAction, &actionNeeded);
      if (code != 0) {
        destroySmlSTableSchema(&dbSchema);
        return code;
      }
      if (actionNeeded) {
        applySchemaAction(taos, &schemaAction);
      }
    }

    // the point's timestamp field adopts the database's timestamp column name
    SSchema* pointColTs = taosArrayGet(pointSchema->fields, 0);
    SSchema* dbColTs = taosArrayGet(dbSchema.fields, 0);
    memcpy(pointColTs->name, dbColTs->name, TSDB_COL_NAME_LEN);

    for (size_t j = 1; j < pointFieldSize; ++j) {
      SSchema* pointCol = taosArrayGet(pointSchema->fields, j);
      SSchemaAction schemaAction = {0};
      bool actionNeeded = false;
      code = generateSchemaAction(pointCol, dbSchema.fieldHash, dbSchema.fields, false, pointSchema->sTableName,
                                  &schemaAction, &actionNeeded);
      if (code != 0) {
        destroySmlSTableSchema(&dbSchema);
        return code;
      }
      if (actionNeeded) {
        applySchemaAction(taos, &schemaAction);
      }
    }

    pointSchema->precision = dbSchema.precision;

    destroySmlSTableSchema(&dbSchema);
  }
  return 0;
}

544 545
/*
 * Derives a child table name from a data point.
 *
 * The point's tags are sorted in place with compareSmlColKv, then the string
 * "<stable>,<key>=<value>,..." is hashed with MD5 and the name is written as
 * "t_" followed by the 32 hex digits of the digest.
 *
 * @param point         data point; its tags array is reordered
 * @param tableName     out: receives the generated name
 * @param tableNameLen  in: capacity of tableName; out: snprintf() result for the name
 * @return always 0.
 */
static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen) {
  tscDebug("taos_sml_insert get child table name through md5");

  // sort so that the same tag set always yields the same name
  qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);

  SStringBuilder joined;
  memset(&joined, 0, sizeof(joined));

  taosStringBuilderAppendString(&joined, point->stableName);
  for (int idx = 0; idx < point->tagNum; ++idx) {
    TAOS_SML_KV* tag = &point->tags[idx];
    taosStringBuilderAppendChar(&joined, ',');
    taosStringBuilderAppendString(&joined, tag->key);
    taosStringBuilderAppendChar(&joined, '=');
    taosStringBuilderAppend(&joined, tag->value, tag->length);
  }

  size_t joinedLen = 0;
  char*  joinedStr = taosStringBuilderGetResult(&joined, &joinedLen);

  MD5_CTX md5;
  MD5Init(&md5);
  MD5Update(&md5, (uint8_t *)joinedStr, (uint32_t)joinedLen);
  MD5Final(&md5);

  *tableNameLen = snprintf(tableName, *tableNameLen,
                           "t_%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
                           md5.digest[0], md5.digest[1], md5.digest[2], md5.digest[3],
                           md5.digest[4], md5.digest[5], md5.digest[6], md5.digest[7],
                           md5.digest[8], md5.digest[9], md5.digest[10], md5.digest[11],
                           md5.digest[12], md5.digest[13], md5.digest[14], md5.digest[15]);

  taosStringBuilderDestroy(&joined);
  tscDebug("child table name: %s", tableName);
  return 0;
}

573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606

/*
 * Sets one tag of a child table through a prepared
 * "alter table <cTableName> set tag <tagName>=?" statement.
 *
 * The statement handle is closed on every path, including failures.
 *
 * @param taos        connection handle
 * @param cTableName  child table to alter
 * @param tagName     tag to set
 * @param bind        bound value for the single '?' placeholder
 * @return 0 on success, TSDB_CODE_TSC_INVALID_VALUE if the statement text
 *         does not fit its buffer, otherwise the error code of the failed
 *         statement call.
 */
static int32_t changeChildTableTagValue(TAOS* taos, const char* cTableName, const char* tagName, TAOS_BIND* bind) {
  char sql[512];
  int n = snprintf(sql, sizeof(sql), "alter table %s set tag %s=?", cTableName, tagName);
  if (n < 0 || n >= (int)sizeof(sql)) {
    tscError("alter tag sql is too long. table: %s, tag: %s", cTableName, tagName);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  TAOS_STMT* stmt = taos_stmt_init(taos);
  if (stmt == NULL) {
    tscError("%s", taos_stmt_errstr(stmt));
    return (terrno != 0) ? terrno : TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
  if (code == 0) {
    code = taos_stmt_bind_param(stmt, bind);
  }
  if (code == 0) {
    code = taos_stmt_execute(stmt);
  }
  if (code != 0) {
    tscError("%s", taos_stmt_errstr(stmt));
    taos_stmt_close(stmt);
    return code;
  }

  code = taos_stmt_close(stmt);
  if (code != 0) {
    // the handle is gone after close, so report the code rather than querying it
    tscError("close statement failed. %s", tstrerror(code));
  }
  return code;
}

S
shenglian zhou 已提交
607
/*
 * Creates a child table under a super table if it does not exist yet, using
 * a prepared statement of the form
 *   create table if not exists <c> using <s>(<tag names>) tags (?,?,...)
 *
 * The statement handle is closed on every path, including failures.
 *
 * @param taos        connection handle
 * @param cTableName  child table name
 * @param sTableName  super table name
 * @param tagsSchema  SArray<SSchema> naming the tags, one placeholder each
 * @param tagsBind    array whose elements are passed to taos_stmt_bind_param()
 *                    as the bound tag values
 * @return 0 on success, TSDB_CODE_TSC_OUT_OF_MEMORY if the SQL buffer cannot
 *         be allocated, TSDB_CODE_TSC_INVALID_VALUE if the statement does not
 *         fit in tsMaxSQLStringLen, otherwise the error of the failed
 *         statement call.
 */
static int32_t creatChildTableIfNotExists(TAOS* taos, const char* cTableName, const char* sTableName, SArray* tagsSchema, SArray* tagsBind) {
  size_t  numTags = taosArrayGetSize(tagsSchema);
  int32_t capacity = tsMaxSQLStringLen + 1;
  char*   sql = malloc(capacity);
  if (sql == NULL) {
    tscError("create table %s failed. can not allocate sql buffer", cTableName);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // `len` tracks the would-be length; once it reaches capacity nothing more is appended
  int32_t len = snprintf(sql, capacity, "create table if not exists %s using %s(", cTableName, sTableName);
  for (size_t i = 0; i < numTags && len >= 0 && len < capacity; ++i) {
    SSchema* tagSchema = taosArrayGet(tagsSchema, i);
    len += snprintf(sql + len, capacity - len, "%s%s", (i > 0) ? "," : "", tagSchema->name);
  }
  if (len >= 0 && len < capacity) {
    len += snprintf(sql + len, capacity - len, ") tags (");
  }
  for (size_t i = 0; i < numTags && len >= 0 && len < capacity; ++i) {
    len += snprintf(sql + len, capacity - len, "%s?", (i > 0) ? "," : "");
  }
  if (len >= 0 && len < capacity) {
    len += snprintf(sql + len, capacity - len, ")");
  }
  if (len < 0 || len >= capacity) {
    tscError("create table %s failed. sql is too long", cTableName);
    free(sql);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  tscDebug("create table : %s", sql);

  TAOS_STMT* stmt = taos_stmt_init(taos);
  if (stmt == NULL) {
    tscError("%s", taos_stmt_errstr(stmt));
    free(sql);
    return (terrno != 0) ? terrno : TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  int32_t code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
  free(sql);

  if (code == 0) {
    code = taos_stmt_bind_param(stmt, TARRAY_GET_START(tagsBind));
  }
  if (code == 0) {
    code = taos_stmt_execute(stmt);
  }
  if (code != 0) {
    tscError("%s", taos_stmt_errstr(stmt));
    taos_stmt_close(stmt);
    return code;
  }

  code = taos_stmt_close(stmt);
  if (code != 0) {
    // the handle is gone after close, so report the code rather than querying it
    tscError("close statement failed. %s", tstrerror(code));
  }
  return code;
}

S
shenglian zhou 已提交
660
/*
 * Inserts a batch of rows into one child table through a prepared statement
 * of the form
 *   insert into ? (<column names>) values (?,?,...)
 *
 * Binding and execution are repeated while execution reports
 * TSDB_CODE_TDB_TABLE_RECONFIGURE, for at most TSDB_MAX_REPLICA retries.
 * The statement handle is closed on every path, including failures.
 *
 * @param taos        connection handle
 * @param cTableName  child table to insert into
 * @param colsSchema  SArray<SSchema> naming the columns, one placeholder each
 * @param rowsBind    array of TAOS_BIND* (read with taosArrayGetP), one
 *                    element per row
 * @return 0 on success, TSDB_CODE_TSC_OUT_OF_MEMORY if the SQL buffer cannot
 *         be allocated, TSDB_CODE_TSC_INVALID_VALUE if the statement does not
 *         fit in tsMaxSQLStringLen, otherwise the error of the failed
 *         statement call.
 */
static int32_t insertChildTableBatch(TAOS* taos,  char* cTableName, SArray* colsSchema, SArray* rowsBind) {
  size_t  numCols = taosArrayGetSize(colsSchema);
  int32_t capacity = tsMaxSQLStringLen + 1;
  char*   sql = malloc(capacity);
  if (sql == NULL) {
    tscError("insert into %s failed. can not allocate sql buffer", cTableName);
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // `len` tracks the would-be length; once it reaches capacity nothing more is appended
  int32_t len = snprintf(sql, capacity, "insert into ? (");
  for (size_t i = 0; i < numCols && len >= 0 && len < capacity; ++i) {
    SSchema* colSchema = taosArrayGet(colsSchema, i);
    len += snprintf(sql + len, capacity - len, "%s%s", (i > 0) ? "," : "", colSchema->name);
  }
  if (len >= 0 && len < capacity) {
    len += snprintf(sql + len, capacity - len, ") values (");
  }
  for (size_t i = 0; i < numCols && len >= 0 && len < capacity; ++i) {
    len += snprintf(sql + len, capacity - len, "%s?", (i > 0) ? "," : "");
  }
  if (len >= 0 && len < capacity) {
    len += snprintf(sql + len, capacity - len, ")");
  }
  if (len < 0 || len >= capacity) {
    tscError("insert into %s failed. sql is too long", cTableName);
    free(sql);
    return TSDB_CODE_TSC_INVALID_VALUE;
  }

  tscDebug("insert rows %zu into child table %s. ", taosArrayGetSize(rowsBind), cTableName);

  int32_t code = 0;
  int32_t try = 0;

  TAOS_STMT* stmt = taos_stmt_init(taos);
  if (stmt == NULL) {
    tscError("%s", taos_stmt_errstr(stmt));
    free(sql);
    return (terrno != 0) ? terrno : TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  code = taos_stmt_prepare(stmt, sql, (unsigned long)strlen(sql));
  free(sql);

  if (code != 0) {
    tscError("%s", taos_stmt_errstr(stmt));
    goto cleanup;
  }

  do {
    code = taos_stmt_set_tbname(stmt, cTableName);
    if (code != 0) {
      tscError("%s", taos_stmt_errstr(stmt));
      goto cleanup;
    }

    size_t rows = taosArrayGetSize(rowsBind);
    for (size_t i = 0; i < rows; ++i) {
      TAOS_BIND* colsBinds = taosArrayGetP(rowsBind, i);
      code = taos_stmt_bind_param(stmt, colsBinds);
      if (code != 0) {
        tscError("%s", taos_stmt_errstr(stmt));
        goto cleanup;
      }
      code = taos_stmt_add_batch(stmt);
      if (code != 0) {
        tscError("%s", taos_stmt_errstr(stmt));
        goto cleanup;
      }
    }

    code = taos_stmt_execute(stmt);
    if (code != 0) {
      tscError("%s", taos_stmt_errstr(stmt));
    }
    // only an execute failure with "table reconfigure" is retried
  } while (code == TSDB_CODE_TDB_TABLE_RECONFIGURE && try++ < TSDB_MAX_REPLICA);

cleanup:
  taos_stmt_close(stmt);
  return code;
}

731 732
static int32_t arrangePointsByChildTableName(TAOS_SML_DATA_POINT* points, int numPoints,
                                             SHashObj* cname2points, SArray* stableSchemas) {
S
shenglian zhou 已提交
733 734 735 736
  for (int32_t i = 0; i < numPoints; ++i) {
    TAOS_SML_DATA_POINT * point = points + i;
    if (!point->childTableName) {
      char childTableName[TSDB_TABLE_NAME_LEN];
737
      int32_t tableNameLen = TSDB_TABLE_NAME_LEN;
738
      getSmlMd5ChildTableName(point, childTableName, &tableNameLen);
S
shenglian zhou 已提交
739 740 741 742
      point->childTableName = calloc(1, tableNameLen+1);
      strncpy(point->childTableName, childTableName, tableNameLen);
      point->childTableName[tableNameLen] = '\0';
    }
S
shenglian zhou 已提交
743

744 745
    SSmlSTableSchema* stableSchema = taosArrayGet(stableSchemas, point->schemaIdx);

S
shenglian zhou 已提交
746 747 748 749
    for (int j = 0; j < point->tagNum; ++j) {
      TAOS_SML_KV* kv =  point->tags + j;
      if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
        int64_t ts = *(int64_t*)(kv->value);
750
        ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
S
shenglian zhou 已提交
751 752 753 754 755 756 757 758
        *(int64_t*)(kv->value) = ts;
      }
    }

    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* kv =  point->fields + j;
      if (kv->type == TSDB_DATA_TYPE_TIMESTAMP) {
        int64_t ts = *(int64_t*)(kv->value);
759
        ts = convertTimePrecision(ts, TSDB_TIME_PRECISION_NANO, stableSchema->precision);
S
shenglian zhou 已提交
760 761 762 763
        *(int64_t*)(kv->value) = ts;
      }
    }

S
shenglian zhou 已提交
764 765 766 767 768 769 770 771
    SArray* cTablePoints = NULL;
    SArray** pCTablePoints = taosHashGet(cname2points, point->childTableName, strlen(point->childTableName));
    if (pCTablePoints) {
      cTablePoints = *pCTablePoints;
    } else {
      cTablePoints = taosArrayInit(64, sizeof(point));
      taosHashPut(cname2points, point->childTableName, strlen(point->childTableName), &cTablePoints, POINTER_BYTES);
    }
772
    taosArrayPush(cTablePoints, &point);
773 774
  }

S
shenglian zhou 已提交
775 776 777
  return 0;
}

778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819
/*
 * Bring the tag values of one child table in line with the tags carried by its data points.
 *
 * The current tag values are read with
 *   select tbname, tag1,tag2,... from <stable> where tbname in ('<ctable>')
 * If the child table exists, every tag whose stored value differs from the point value is
 * changed through changeChildTableTagValue(); otherwise the table is created through
 * creatChildTableIfNotExists().
 *
 * taos          connection handle
 * cTableName    child table name
 * sTableName    super table name
 * sTableSchema  schema of the super table; only its tag list is used here
 * cTablePoints  array of TAOS_SML_DATA_POINT* that all belong to cTableName
 *
 * Returns 0 on success, otherwise an error code. Every exit path after the bind array has
 * been created goes through the cleanup label so the query result and the binds are released.
 */
static int32_t applyChildTableTags(TAOS* taos, char* cTableName, char* sTableName,
                                   SSmlSTableSchema* sTableSchema, SArray* cTablePoints) {
  int32_t   code = TSDB_CODE_SUCCESS;
  TAOS_RES* result = NULL;
  char*     sql = NULL;
  size_t    numTags = taosArrayGetSize(sTableSchema->tags);
  size_t    rows = taosArrayGetSize(cTablePoints);

  // for each tag column remember the kv of the last point that carries it
  TAOS_SML_KV* tagKVs[TSDB_MAX_TAGS] = {0};
  for (size_t i = 0; i < rows; ++i) {
    TAOS_SML_DATA_POINT* pDataPoint = taosArrayGetP(cTablePoints, i);
    for (int j = 0; j < pDataPoint->tagNum; ++j) {
      TAOS_SML_KV* kv = pDataPoint->tags + j;
      tagKVs[kv->fieldSchemaIdx] = kv;
    }
  }

  int32_t notNullTagsIndices[TSDB_MAX_TAGS] = {0};
  int32_t numNotNullTags = 0;
  for (size_t i = 0; i < numTags; ++i) {
    if (tagKVs[i] != NULL) {
      notNullTagsIndices[numNotNullTags] = (int32_t)i;
      ++numNotNullTags;
    }
  }

  SArray* tagBinds = taosArrayInit(numTags, sizeof(TAOS_BIND));
  if (tagBinds == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  taosArraySetSize(tagBinds, numTags);

  // every bind starts out zeroed and flagged as null, so cleanup can free bind->length blindly
  int isNullColBind = TSDB_TRUE;
  for (size_t j = 0; j < numTags; ++j) {
    TAOS_BIND* bind = taosArrayGet(tagBinds, j);
    memset(bind, 0, sizeof(TAOS_BIND));
    bind->is_null = &isNullColBind;
  }
  for (size_t j = 0; j < numTags; ++j) {
    if (tagKVs[j] == NULL) continue;
    TAOS_SML_KV* kv = tagKVs[j];
    TAOS_BIND*   bind = taosArrayGet(tagBinds, kv->fieldSchemaIdx);
    bind->buffer_type = kv->type;
    bind->length = malloc(sizeof(*bind->length));
    if (bind->length == NULL) {
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto cleanup;
    }
    *bind->length = kv->length;
    bind->buffer = kv->value;
    bind->is_null = NULL;
  }

  // select tbname, tag1,tag2,... from stable where tbname in (ctable)
  sql = malloc(tsMaxSQLStringLen + 1);
  if (sql == NULL) {
    code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto cleanup;
  }
  int freeBytes = tsMaxSQLStringLen + 1;
  snprintf(sql, freeBytes, "select tbname");
  for (int i = 0; i < numNotNullTags; ++i) {
    // the separator is written before each tag so no trailing comma is left when there are no tags
    size_t used = strlen(sql);
    snprintf(sql + used, freeBytes - used, "%s%s", (i == 0) ? ", " : ",", tagKVs[notNullTagsIndices[i]]->key);
  }
  {
    size_t used = strlen(sql);
    snprintf(sql + used, freeBytes - used, " from %s where tbname in (\'%s\')", sTableName, cTableName);
  }

  result = taos_query(taos, sql);
  free(sql);
  sql = NULL;

  code = taos_errno(result);
  if (code != 0) {
    tscError("get child table %s tags failed. error string %s", cTableName, taos_errstr(result));
    goto cleanup;
  }

  // check tag value and set tag values if different
  TAOS_ROW row = taos_fetch_row(result);
  if (row != NULL) {
    int         numFields = taos_field_count(result);
    TAOS_FIELD* fields = taos_fetch_fields(result);
    int*        lengths = taos_fetch_lengths(result);
    // column 0 is tbname; column i corresponds to the (i-1)-th non-null tag
    for (int i = 1; i < numFields && i <= numNotNullTags; ++i) {
      uint8_t dbType = fields[i].type;
      int32_t length = lengths[i];
      char*   val = row[i];

      TAOS_SML_KV* tagKV = tagKVs[notNullTagsIndices[i - 1]];
      if (tagKV->type != dbType) {
        tscError("child table %s tag %s type mismatch. point type : %d, db type : %d",
                 cTableName, tagKV->key, tagKV->type, dbType);
        // leave through cleanup so the result set and the binds are not leaked
        code = TSDB_CODE_TSC_INVALID_VALUE;
        goto cleanup;
      }

      assert(tagKV->value);

      if (val == NULL || length != tagKV->length || memcmp(tagKV->value, val, length) != 0) {
        TAOS_BIND* bind = taosArrayGet(tagBinds, tagKV->fieldSchemaIdx);
        code = changeChildTableTagValue(taos, cTableName, tagKV->key, bind);
        if (code != 0) {
          tscError("change child table tag failed. table name %s, tag %s", cTableName, tagKV->key);
          goto cleanup;
        }
      }
    }
    tscDebug("successfully applied point tags. child table: %s", cTableName);
  } else {
    code = creatChildTableIfNotExists(taos, cTableName, sTableName, sTableSchema->tags, tagBinds);
    if (code != 0) {
      goto cleanup;
    }
  }

cleanup:
  if (result != NULL) {
    taos_free_result(result);
  }
  for (size_t i = 0; i < taosArrayGetSize(tagBinds); ++i) {
    TAOS_BIND* bind = taosArrayGet(tagBinds, i);
    free(bind->length);
  }
  taosArrayDestroy(tagBinds);
  return code;
}

/*
 * Insert all data points of one child table in a single batch.
 *
 * One TAOS_BIND row (numCols entries) is built per point. Columns a point does not carry stay
 * bound as null; the others reference the point's own value buffers. The rows are then handed
 * to insertChildTableBatch().
 *
 * Returns 0 on success, TSDB_CODE_TSC_OUT_OF_MEMORY when a bind cannot be allocated, or the
 * error reported by insertChildTableBatch(). All bind rows built so far are released on every
 * exit path.
 */
static int32_t applyChildTableFields(TAOS* taos, SSmlSTableSchema* sTableSchema, char* cTableName, SArray* cTablePoints) {
  int32_t code = TSDB_CODE_SUCCESS;

  size_t numCols = taosArrayGetSize(sTableSchema->fields);
  size_t rows = taosArrayGetSize(cTablePoints);
  SArray* rowsBind = taosArrayInit(rows, POINTER_BYTES);
  if (rowsBind == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // shared "is null" flag. It lives at function scope because the binds keep pointing at it
  // until insertChildTableBatch() has consumed them.
  int isNullColBind = TSDB_TRUE;

  for (size_t i = 0; i < rows; ++i) {
    TAOS_SML_DATA_POINT* point = taosArrayGetP(cTablePoints, i);

    TAOS_BIND* colBinds = calloc(numCols, sizeof(TAOS_BIND));
    if (colBinds == NULL) {
      tscError("taos_sml_insert insert points, failed to allocated memory for TAOS_BIND, "
               "num of rows: %zu, num of cols: %zu", rows, numCols);
      code = TSDB_CODE_TSC_OUT_OF_MEMORY;
      goto cleanup;
    }
    // register the row right away so the cleanup code owns it even if filling it fails
    taosArrayPush(rowsBind, &colBinds);

    for (size_t j = 0; j < numCols; ++j) {
      TAOS_BIND* bind = colBinds + j;
      bind->is_null = &isNullColBind;
    }
    for (int j = 0; j < point->fieldNum; ++j) {
      TAOS_SML_KV* kv = point->fields + j;
      TAOS_BIND*   bind = colBinds + kv->fieldSchemaIdx;
      bind->buffer_type = kv->type;
      bind->length = malloc(sizeof(*bind->length));
      if (bind->length == NULL) {
        code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        goto cleanup;
      }
      *bind->length = kv->length;
      bind->buffer = kv->value;
      bind->is_null = NULL;
    }
  }

  code = insertChildTableBatch(taos, cTableName, sTableSchema->fields, rowsBind);
  if (code != 0) {
    tscError("insert into child table %s failed. error %s", cTableName, tstrerror(code));
  }

cleanup:
  // only the rows that were actually built are in rowsBind
  for (size_t i = 0; i < taosArrayGetSize(rowsBind); ++i) {
    TAOS_BIND* colBinds = taosArrayGetP(rowsBind, i);
    for (size_t j = 0; j < numCols; ++j) {
      TAOS_BIND* bind = colBinds + j;
      free(bind->length);
    }
    free(colBinds);
  }
  taosArrayDestroy(rowsBind);
  return code;
}
S
shenglian zhou 已提交
936

937 938
/*
 * Group the points by child table name, then for each child table apply its tags and insert
 * its rows. Processing stops at the first child table that fails; the per-table point arrays
 * and the grouping hash are released in every case.
 *
 * Returns 0 on success or the error code of the step that failed.
 */
static int32_t applyDataPoints(TAOS* taos, TAOS_SML_DATA_POINT* points, int32_t numPoints, SArray* stableSchemas) {
  int32_t   code = TSDB_CODE_SUCCESS;
  SHashObj* cname2points = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);

  arrangePointsByChildTableName(points, numPoints, cname2points, stableSchemas);

  SArray** entry = NULL;
  for (entry = taosHashIterate(cname2points, NULL); entry != NULL; entry = taosHashIterate(cname2points, entry)) {
    SArray* tablePoints = *entry;

    // all points of one child table share table names and schema, so the first one is representative
    TAOS_SML_DATA_POINT* first = taosArrayGetP(tablePoints, 0);
    SSmlSTableSchema*    schema = taosArrayGet(stableSchemas, first->schemaIdx);

    code = applyChildTableTags(taos, first->childTableName, first->stableName, schema, tablePoints);
    if (code != 0) {
      tscError("apply child table tags failed. child table %s, error %s", first->childTableName, tstrerror(code));
      break;
    }

    code = applyChildTableFields(taos, schema, first->childTableName, tablePoints);
    if (code != 0) {
      tscError("Apply child table fields failed. child table %s, error %s", first->childTableName, tstrerror(code));
      break;
    }

    tscDebug("successfully applied data points of child table %s", first->childTableName);
  }

  // release the per-table arrays, then the hash that owns the slots
  for (entry = taosHashIterate(cname2points, NULL); entry != NULL; entry = taosHashIterate(cname2points, entry)) {
    taosArrayDestroy(*entry);
  }
  taosHashCleanup(cname2points);
  return code;
}
975

S
shenglian zhou 已提交
976
/*
 * Insert a batch of schemaless data points.
 *
 * Runs three stages in order and stops at the first one that fails:
 *   1. buildDataPointSchemas - derive the super table schemas from the points
 *   2. modifyDBSchemas       - apply those schemas to the database
 *   3. applyDataPoints       - write tags and rows
 *
 * Returns 0 on success or the error code of the failing stage. The schema array and the
 * tag/field arrays of each schema are destroyed before returning.
 */
int taos_sml_insert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint) {
  tscDebug("taos_sml_insert. number of points: %d", numPoint);

  int32_t code = TSDB_CODE_SUCCESS;
  SArray* stableSchemas = taosArrayInit(32, sizeof(SSmlSTableSchema)); // SArray<STableColumnsSchema>

  do {
    code = buildDataPointSchemas(points, numPoint, stableSchemas);
    if (code != 0) {
      tscError("error building data point schemas : %s", tstrerror(code));
      break;
    }

    code = modifyDBSchemas(taos, stableSchemas);
    if (code != 0) {
      tscError("error change db schema : %s", tstrerror(code));
      break;
    }

    code = applyDataPoints(taos, points, numPoint, stableSchemas);
    if (code != 0) {
      tscError("error apply data points : %s", tstrerror(code));
    }
  } while (0);

  size_t numSchemas = taosArrayGetSize(stableSchemas);
  for (size_t idx = 0; idx < numSchemas; ++idx) {
    SSmlSTableSchema* entry = taosArrayGet(stableSchemas, idx);
    taosArrayDestroy(entry->fields);
    taosArrayDestroy(entry->tags);
  }
  taosArrayDestroy(stableSchemas);
  return code;
}
S
shenglian zhou 已提交
1008

1009 1010
//=========================================================================

1011 1012 1013 1014 1015
/*  Field                                Escape characters
    1: measurement                       Comma, Space
    2: tag_key, tag_value, field_key     Comma, Equal sign, Space
    3: field_value                       Double quote, Backslash
*/
1016
/*
 * If *pos points at a backslash that escapes a character which is special for the given
 * field kind, advance *pos by one so that it points at the escaped character. In every
 * other case *pos is left where it was.
 *
 * field: 1 = measurement (escapable: ',' ' ')
 *        2 = tag key / tag value / field key (escapable: ',' ' ' '=')
 *        3 = field value (escapable: '"' '\')
 *        any other value: nothing is escapable
 */
static void escapeSpecialCharacter(uint8_t field, const char **pos) {
  const char *p = *pos;
  if (*p != '\\') {
    return;
  }

  const char *escapable = "";
  if (field == 1) {
    escapable = ", ";
  } else if (field == 2) {
    escapable = ", =";
  } else if (field == 3) {
    escapable = "\"\\";
  }

  // strchr() also matches the terminator, so the end of input is excluded explicitly
  char next = p[1];
  if (next != '\0' && strchr(escapable, next) != NULL) {
    p++;
  }
  *pos = p;
}
1058

1059
/*
 * Returns true when str is an optionally signed ('+' or '-') run of decimal digits.
 * At least one digit is required, so "", "+" and "-" are rejected.
 */
static bool isValidInteger(char *str) {
  const char *c = str;
  if (*c == '+' || *c == '-') {
    c++;
  }
  // a sign on its own is not a number
  if (!isdigit((unsigned char)*c)) {
    return false;
  }
  for (; *c != '\0'; ++c) {
    // cast keeps isdigit() defined for bytes above 0x7f
    if (!isdigit((unsigned char)*c)) {
      return false;
    }
  }
  return true;
}
1073

1074
/*
 * Returns true when str looks like a decimal floating point literal:
 *   [sign] digits-and/or-one-dot [ (e|E) [sign] digits ]
 *
 * Rules enforced:
 *   - a leading sign must be followed by a digit or '.'
 *   - a '.' must be followed by a digit, may appear once, and not after the exponent
 *   - the exponent marker must follow a digit and be followed by a digit or a sign
 *   - an exponent sign must sit directly after the exponent marker and before a digit
 * so ".", "+", "-" and "1e5+3" are rejected.
 */
static bool isValidFloat(char *str) {
  const char *c = str;
  bool has_dot = false;
  bool has_exp = false;
  bool has_sign = false;

  if (*c == '+' || *c == '-') {
    if (c[1] != '.' && !isdigit((unsigned char)c[1])) {
      return false;
    }
  } else if (*c == '.') {
    if (!isdigit((unsigned char)c[1])) {
      return false;
    }
    has_dot = true;
  } else if (!isdigit((unsigned char)*c)) {
    return false;
  }

  for (c++; *c != '\0'; ++c) {
    if (isdigit((unsigned char)*c)) {
      continue;
    }
    switch (*c) {
      case '.':
        if (has_dot || has_exp || !isdigit((unsigned char)c[1])) {
          return false;
        }
        has_dot = true;
        break;
      case 'e':
      case 'E':
        if (has_exp || !isdigit((unsigned char)c[-1]) ||
            !(isdigit((unsigned char)c[1]) || c[1] == '+' || c[1] == '-')) {
          return false;
        }
        has_exp = true;
        break;
      case '+':
      case '-':
        // only valid as the sign of the exponent, i.e. right behind 'e'/'E'
        if (has_sign || !has_exp || (c[-1] != 'e' && c[-1] != 'E') ||
            !isdigit((unsigned char)c[1])) {
          return false;
        }
        has_sign = true;
        break;
      default:
        return false;
    }
  }
  return true;
}
1129

1130
// True when the value is longer than its suffix and ends with "i8" (8-bit signed integer).
// len must be the string length of pVal.
static bool isTinyInt(char *pVal, uint16_t len) {
  return len > 2 &&
         pVal[len - 2] == 'i' && pVal[len - 1] == '8' && pVal[len] == '\0';
}
1140

1141
// True when the value is longer than its suffix, does not start with '-', and ends with
// "u8" (8-bit unsigned integer). len must be the string length of pVal.
static bool isTinyUint(char *pVal, uint16_t len) {
  return len > 2 && pVal[0] != '-' &&
         pVal[len - 2] == 'u' && pVal[len - 1] == '8' && pVal[len] == '\0';
}

1155
// True when the value is longer than its suffix and ends with "i16" (16-bit signed integer).
// len must be the string length of pVal.
static bool isSmallInt(char *pVal, uint16_t len) {
  return len > 3 &&
         pVal[len - 3] == 'i' && pVal[len - 2] == '1' && pVal[len - 1] == '6' &&
         pVal[len] == '\0';
}

1166
// True when the value is longer than its suffix, does not start with '-', and ends with
// "u16" (16-bit unsigned integer). len must be the string length of pVal.
static bool isSmallUint(char *pVal, uint16_t len) {
  return len > 3 && pVal[0] != '-' &&
         pVal[len - 3] == 'u' && pVal[len - 2] == '1' && pVal[len - 1] == '6' &&
         pVal[len] == '\0';
}

1180
// True when the value is longer than its suffix and ends with "i32" (32-bit signed integer).
// len must be the string length of pVal.
static bool isInt(char *pVal, uint16_t len) {
  return len > 3 &&
         pVal[len - 3] == 'i' && pVal[len - 2] == '3' && pVal[len - 1] == '2' &&
         pVal[len] == '\0';
}

1191
// True when the value is longer than its suffix, does not start with '-', and ends with
// "u32" (32-bit unsigned integer). len must be the string length of pVal.
static bool isUint(char *pVal, uint16_t len) {
  return len > 3 && pVal[0] != '-' &&
         pVal[len - 3] == 'u' && pVal[len - 2] == '3' && pVal[len - 1] == '2' &&
         pVal[len] == '\0';
}

1205
// True when the value is longer than its suffix and ends with "i64" (64-bit signed integer).
// len must be the string length of pVal.
static bool isBigInt(char *pVal, uint16_t len) {
  return len > 3 &&
         pVal[len - 3] == 'i' && pVal[len - 2] == '6' && pVal[len - 1] == '4' &&
         pVal[len] == '\0';
}

1216
// True when the value is longer than its suffix, does not start with '-', and ends with
// "u64" (64-bit unsigned integer). len must be the string length of pVal.
static bool isBigUint(char *pVal, uint16_t len) {
  return len > 3 && pVal[0] != '-' &&
         pVal[len - 3] == 'u' && pVal[len - 2] == '6' && pVal[len - 1] == '4' &&
         pVal[len] == '\0';
}

1230
// True when the value is longer than its suffix and ends with "f32" (single precision float).
// len must be the string length of pVal.
static bool isFloat(char *pVal, uint16_t len) {
  return len > 3 &&
         pVal[len - 3] == 'f' && pVal[len - 2] == '3' && pVal[len - 1] == '2' &&
         pVal[len] == '\0';
}

1241
// True when the value is longer than its suffix and ends with "f64" (double precision float).
// len must be the string length of pVal.
static bool isDouble(char *pVal, uint16_t len) {
  return len > 3 &&
         pVal[len - 3] == 'f' && pVal[len - 2] == '6' && pVal[len - 1] == '4' &&
         pVal[len] == '\0';
}

1252
/*
 * Recognizes the boolean spellings t/T/f/F, true/True/TRUE and false/False/FALSE.
 * On a match the parsed value is stored in *bVal and true is returned; otherwise
 * *bVal is left untouched and false is returned.
 * For single-character input only pVal[0] is inspected.
 */
static bool isBool(char *pVal, uint16_t len, bool *bVal) {
  static const char *const trueWords[] = {"true", "True", "TRUE"};
  static const char *const falseWords[] = {"false", "False", "FALSE"};

  switch (len) {
    case 1:
      if (pVal[0] == 't' || pVal[0] == 'T') {
        *bVal = true;
        return true;
      }
      if (pVal[0] == 'f' || pVal[0] == 'F') {
        *bVal = false;
        return true;
      }
      return false;
    case 4:
      for (size_t k = 0; k < sizeof(trueWords) / sizeof(trueWords[0]); ++k) {
        if (strcmp(pVal, trueWords[k]) == 0) {
          *bVal = true;
          return true;
        }
      }
      return false;
    case 5:
      for (size_t k = 0; k < sizeof(falseWords) / sizeof(falseWords[0]); ++k) {
        if (strcmp(pVal, falseWords[k]) == 0) {
          *bVal = false;
          return true;
        }
      }
      return false;
    default:
      return false;
  }
}

1288
// binary: "abc" - at least two characters, first and last of which are double quotes
static bool isBinary(char *pVal, uint16_t len) {
  return len >= 2 && pVal[0] == '"' && pVal[len - 1] == '"';
}
1300

1301
// nchar: L"abc" - at least three characters, starting with L" and ending with a double quote
static bool isNchar(char *pVal, uint16_t len) {
  return len >= 3 && pVal[0] == 'L' && pVal[1] == '"' && pVal[len - 1] == '"';
}

1313
/*
 * Classifies a timestamp token by its trailing unit and stores the result in *tsType.
 *
 *   empty input      -> SML_TIME_STAMP_NOW
 *   "0"              -> SML_TIME_STAMP_NOW
 *   ends in 2 digits -> SML_TIME_STAMP_MICRO_SECONDS (no unit given)
 *   ends in "ms"     -> SML_TIME_STAMP_MILLI_SECONDS
 *   ends in "us"     -> SML_TIME_STAMP_MICRO_SECONDS
 *   ends in "ns"     -> SML_TIME_STAMP_NANO_SECONDS
 *   digit then 's'   -> SML_TIME_STAMP_SECONDS
 *
 * Only the last two characters are inspected here; the leading digits are not validated.
 * Returns false, leaving *tsType untouched, for anything else.
 */
static bool isTimeStamp(char *pVal, uint16_t len, SMLTimeStampType *tsType) {
  if (len == 0) {
    // no timestamp given: report "now" so the caller never reads an unset type
    *tsType = SML_TIME_STAMP_NOW;
    return true;
  }
  if ((len == 1) && pVal[0] == '0') {
    *tsType = SML_TIME_STAMP_NOW;
    return true;
  }
  if (len < 2) {
    return false;
  }
  //No appendix use usec as default
  if (isdigit((unsigned char)pVal[len - 1]) && isdigit((unsigned char)pVal[len - 2])) {
    *tsType = SML_TIME_STAMP_MICRO_SECONDS;
    return true;
  }
  if (pVal[len - 1] == 's') {
    switch (pVal[len - 2]) {
      case 'm':
        *tsType = SML_TIME_STAMP_MILLI_SECONDS;
        break;
      case 'u':
        *tsType = SML_TIME_STAMP_MICRO_SECONDS;
        break;
      case 'n':
        *tsType = SML_TIME_STAMP_NANO_SECONDS;
        break;
      default:
        if (isdigit((unsigned char)pVal[len - 2])) {
          *tsType = SML_TIME_STAMP_SECONDS;
          break;
        } else {
          return false;
        }
    }
    return true;
  }
  return false;
}
1355

1356
//len does not include '\0' from value.
1357 1358
static bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
                                uint16_t len) {
1359 1360 1361
  if (len <= 0) {
    return false;
  }
G
Ganlin Zhao 已提交
1362

1363
  //integer number
1364
  if (isTinyInt(value, len)) {
1365 1366 1367
    pVal->type = TSDB_DATA_TYPE_TINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 2] = '\0';
1368
    if (!isValidInteger(value)) {
1369
      return false;
1370 1371 1372 1373 1374 1375
    }
    pVal->value = calloc(pVal->length, 1);
    int8_t val = (int8_t)strtoll(value, NULL, 10);
    memcpy(pVal->value, &val, pVal->length);
    return true;
  }
1376
  if (isTinyUint(value, len)) {
1377 1378 1379
    pVal->type = TSDB_DATA_TYPE_UTINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 2] = '\0';
1380
    if (!isValidInteger(value)) {
1381 1382 1383 1384 1385 1386 1387
      return false;
    }
    pVal->value = calloc(pVal->length, 1);
    uint8_t val = (uint8_t)strtoul(value, NULL, 10);
    memcpy(pVal->value, &val, pVal->length);
    return true;
  }
1388
  if (isSmallInt(value, len)) {
1389 1390 1391
    pVal->type = TSDB_DATA_TYPE_SMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1392
    if (!isValidInteger(value)) {
1393 1394 1395 1396 1397 1398 1399
      return false;
    }
    pVal->value = calloc(pVal->length, 1);
    int16_t val = (int16_t)strtoll(value, NULL, 10);
    memcpy(pVal->value, &val, pVal->length);
    return true;
  }
1400
  if (isSmallUint(value, len)) {
1401 1402 1403
    pVal->type = TSDB_DATA_TYPE_USMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1404
    if (!isValidInteger(value)) {
1405 1406 1407 1408 1409 1410 1411 1412
      return false;
    }
    pVal->value = calloc(pVal->length, 1);
    uint16_t val = (uint16_t)strtoul(value, NULL, 10);
    memcpy(pVal->value, &val, pVal->length);
    //memcpy(pVal->value, &val, pVal->length);
    return true;
  }
1413
  if (isInt(value, len)) {
1414 1415 1416
    pVal->type = TSDB_DATA_TYPE_INT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1417
    if (!isValidInteger(value)) {
1418 1419 1420 1421 1422 1423 1424
      return false;
    }
    pVal->value = calloc(pVal->length, 1);
    int32_t val = (int32_t)strtoll(value, NULL, 10);
    memcpy(pVal->value, &val, pVal->length);
    return true;
  }
1425
  if (isUint(value, len)) {
1426 1427 1428
    pVal->type = TSDB_DATA_TYPE_UINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1429
    if (!isValidInteger(value)) {
1430 1431 1432 1433 1434 1435 1436
      return false;
    }
    pVal->value = calloc(pVal->length, 1);
    uint32_t val = (uint32_t)strtoul(value, NULL, 10);
    memcpy(pVal->value, &val, pVal->length);
    return true;
  }
1437
  if (isBigInt(value, len)) {
1438 1439 1440
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1441
    if (!isValidInteger(value)) {
1442 1443 1444 1445 1446 1447 1448
      return false;
    }
    pVal->value = calloc(pVal->length, 1);
    int64_t val = (int64_t)strtoll(value, NULL, 10);
    memcpy(pVal->value, &val, pVal->length);
    return true;
  }
1449
  if (isBigUint(value, len)) {
1450 1451 1452
    pVal->type = TSDB_DATA_TYPE_UBIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
1453
    if (!isValidInteger(value)) {
1454 1455 1456 1457 1458 1459 1460
      return false;
    }
    pVal->value = calloc(pVal->length, 1);
    uint64_t val = (uint64_t)strtoul(value, NULL, 10);
    memcpy(pVal->value, &val, pVal->length);
    return true;
  }
1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512
  //floating number
  if (isFloat(value, len)) {
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
    if (!isValidFloat(value)) {
      return false;
    }
    pVal->value = calloc(pVal->length, 1);
    float val = (float)strtold(value, NULL);
    memcpy(pVal->value, &val, pVal->length);
    return true;
  }
  if (isDouble(value, len)) {
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    value[len - 3] = '\0';
    if (!isValidFloat(value)) {
      return false;
    }
    pVal->value = calloc(pVal->length, 1);
    double val = (double)strtold(value, NULL);
    memcpy(pVal->value, &val, pVal->length);
    return true;
  }
  //binary
  if (isBinary(value, len)) {
    pVal->type = TSDB_DATA_TYPE_BINARY;
    pVal->length = len - 2;
    pVal->value = calloc(pVal->length, 1);
    //copy after "
    memcpy(pVal->value, value + 1, pVal->length);
    return true;
  }
  //nchar
  if (isNchar(value, len)) {
    pVal->type = TSDB_DATA_TYPE_NCHAR;
    pVal->length = len - 3;
    pVal->value = calloc(pVal->length, 1);
    //copy after L"
    memcpy(pVal->value, value + 2, pVal->length);
    return true;
  }
  //bool
  bool bVal;
  if (isBool(value, len, &bVal)) {
    pVal->type = TSDB_DATA_TYPE_BOOL;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->value = calloc(pVal->length, 1);
    memcpy(pVal->value, &bVal, pVal->length);
    return true;
  }
G
Ganlin Zhao 已提交
1513 1514 1515 1516 1517 1518 1519 1520 1521
  //Handle default(no appendix) as float
  if (isValidInteger(value) || isValidFloat(value)) {
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    pVal->value = calloc(pVal->length, 1);
    float val = (float)strtold(value, NULL);
    memcpy(pVal->value, &val, pVal->length);
    return true;
  }
1522
  return false;
1523
}
1524

1525 1526
/*
 * Converts a timestamp token into nanoseconds.
 *
 * value  token text; may be NULL when len is 0
 * len    string length of value
 * type   unit detected for the token; it is overridden to micro seconds when the token ends
 *        in a digit (unless it is "now"), and to "now" when the token is empty
 * ts     receives the timestamp in nanoseconds
 *
 * All characters except the last two must be digits. Returns TSDB_CODE_SUCCESS, or
 * TSDB_CODE_TSC_LINE_SYNTAX_ERROR for a malformed token, an unknown type, or a seconds
 * value that does not fit into 64-bit nanoseconds.
 */
static int32_t getTimeStampValue(char *value, uint16_t len,
                                 SMLTimeStampType type, int64_t *ts) {

  if (len >= 2) {
    for (int i = 0; i < len - 2; ++i) {
      if (!isdigit((unsigned char)value[i])) {
        return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
      }
    }
  }
  //No appendix or no timestamp given (len = 0)
  if (len >= 1 && isdigit((unsigned char)value[len - 1]) && type != SML_TIME_STAMP_NOW) {
    type = SML_TIME_STAMP_MICRO_SECONDS;
  }
  if (len != 0) {
    *ts = (int64_t)strtoll(value, NULL, 10);
  } else {
    type = SML_TIME_STAMP_NOW;
  }
  switch (type) {
    case SML_TIME_STAMP_NOW: {
      *ts = taosGetTimestampNs();
      break;
    }
    case SML_TIME_STAMP_SECONDS: {
      // scale with integer arithmetic: going through double loses precision on large values
      if (*ts > INT64_MAX / 1000000000LL || *ts < INT64_MIN / 1000000000LL) {
        return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
      }
      *ts = *ts * 1000000000LL;
      break;
    }
    case SML_TIME_STAMP_MILLI_SECONDS: {
      *ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_NANO);
      break;
    }
    case SML_TIME_STAMP_MICRO_SECONDS: {
      *ts = convertTimePrecision(*ts, TSDB_TIME_PRECISION_MICRO, TSDB_TIME_PRECISION_NANO);
      break;
    }
    case SML_TIME_STAMP_NANO_SECONDS: {
      // already in nanoseconds
      break;
    }
    default: {
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    }
  }
  return TSDB_CODE_SUCCESS;
}

1572 1573
/*
 * Parses a timestamp token and stores it in pVal as a TSDB_DATA_TYPE_TIMESTAMP value
 * expressed in nanoseconds. pVal->value is allocated here; pVal->key is not touched.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_LINE_SYNTAX_ERROR when the token is not a
 * timestamp, the error from getTimeStampValue(), or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
static int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
                                   uint16_t len) {
  int32_t ret;
  // defaults cover any path on which isTimeStamp() accepts the token without reporting a unit
  SMLTimeStampType type = SML_TIME_STAMP_NOW;
  int64_t tsVal = 0;

  if (!isTimeStamp(value, len, &type)) {
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  ret = getTimeStampValue(value, len, type, &tsVal);
  if (ret) {
    return ret;
  }
  tscDebug("Timestamp after conversion:%"PRId64, tsVal);

  pVal->type = TSDB_DATA_TYPE_TIMESTAMP;
  pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
  pVal->value = calloc(pVal->length, 1);
  if (pVal->value == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(pVal->value, &tsVal, pVal->length);
  return TSDB_CODE_SUCCESS;
}

1595
/*
 * Parses the timestamp that makes up the rest of the line starting at *index and returns it
 * as a newly allocated key/value pair with key "_ts".
 *
 * On success *pTS owns the kv together with its key and value buffers. On failure everything
 * allocated here is released, *pTS is set to NULL and an error code is returned. *index is
 * not advanced.
 */
static int32_t parseSmlTimeStamp(TAOS_SML_KV **pTS, const char **index) {
  const char *start = *index;
  const char  key[] = "_ts";
  char       *value = NULL;
  int32_t     ret = TSDB_CODE_SUCCESS;

  // the timestamp is everything up to the end of the string
  size_t len = strlen(start);
  if (len > UINT16_MAX) {
    // would be truncated when handed to convertSmlTimeStamp()
    *pTS = NULL;
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  *pTS = calloc(1, sizeof(TAOS_SML_KV));
  if (*pTS == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  if (len > 0) {
    value = calloc(len + 1, 1);
    if (value == NULL) {
      free(*pTS);
      *pTS = NULL;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    memcpy(value, start, len);
  }

  ret = convertSmlTimeStamp(*pTS, value, (uint16_t)len);
  free(value);
  if (ret) {
    free(*pTS);
    *pTS = NULL;  // do not hand a dangling pointer back to the caller
    return ret;
  }

  (*pTS)->key = calloc(sizeof(key), 1);
  if ((*pTS)->key == NULL) {
    free((*pTS)->value);
    free(*pTS);
    *pTS = NULL;
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy((*pTS)->key, key, sizeof(key));
  return ret;
}
1627

1628
/*
 * Parses one key (tag key or field key) starting at *index, up to the first unescaped '='.
 *
 * Escape sequences for ',', ' ' and '=' are resolved, so the stored key contains the plain
 * characters. On success pKV->key holds a newly allocated copy of the key and *index points
 * just past the '='.
 *
 * Returns TSDB_CODE_TSC_LINE_SYNTAX_ERROR when the key starts with a digit, is longer than
 * TSDB_COL_NAME_LEN - 1 characters, or is not followed by '='; TSDB_CODE_TSC_OUT_OF_MEMORY
 * when the copy cannot be allocated. pKV->key is not allocated on failure.
 */
static int32_t parseSmlKey(TAOS_SML_KV *pKV, const char **index) {
  const char *cur = *index;
  char key[TSDB_COL_NAME_LEN];
  uint16_t len = 0;

  //key field cannot start with digit
  if (isdigit((unsigned char)*cur)) {
    tscError("Tag key cannnot start with digit\n");
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }
  while (*cur != '\0') {
    //unescaped '=' identifies a tag key; never look in front of the first character
    if (*cur == '=' && (cur == *index || *(cur - 1) != '\\')) {
      break;
    }
    // one slot of key[] is reserved for the terminating '\0'
    if (len >= TSDB_COL_NAME_LEN - 1) {
      tscDebug("Key field cannot exceeds 65 characters");
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    }
    //Escape special character
    if (*cur == '\\') {
      escapeSpecialCharacter(2, &cur);
    }
    key[len] = *cur;
    cur++;
    len++;
  }
  // input ended without '=': there is no value, and cur + 1 would lie past the terminator
  if (*cur != '=') {
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }
  key[len] = '\0';

  pKV->key = calloc(len + 1, 1);
  if (pKV->key == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  memcpy(pKV->key, key, len + 1);
  *index = cur + 1;
  return TSDB_CODE_SUCCESS;
}
1663

1664

1665 1666
/*
 * Parses the value that follows a key, starting at *index and ending at the first unescaped
 * ',' or ' ' or at the end of the string, and converts it into pKV (type, length, value).
 *
 * Escape sequences for ',', ' ' and '=' are resolved before conversion, matching what
 * parseSmlKey() does for keys. *is_last_kv is set to true when the value was terminated by
 * ' ' or the end of the string, false when it was terminated by ','. On success *index points
 * past the terminator (or at the terminating '\0').
 *
 * NOTE(review): the return type is bool but the function returns TSDB_CODE_* values, so the
 * result is false (TSDB_CODE_SUCCESS) on success and true on any failure; the specific
 * error code is lost in the conversion. Callers only test for non-zero.
 *
 * On failure pKV->key, which the caller allocated earlier, is freed and set to NULL.
 */
static bool parseSmlValue(TAOS_SML_KV *pKV, const char **index,
                          bool *is_last_kv) {
  const char *start = *index;
  const char *cur = start;
  char *value = NULL;

  // pass 1: find the terminator. The end of the string always terminates the value, even
  // right after a backslash, so the scan can never run past the buffer.
  while (*cur != '\0') {
    if ((*cur == ',' || *cur == ' ') && (cur == start || *(cur - 1) != '\\')) {
      break;
    }
    //Escape special character
    if (*cur == '\\') {
      escapeSpecialCharacter(2, &cur);
    }
    cur++;
  }
  //unescaped ' ' or '\0' indicates end of value
  *is_last_kv = (*cur == ' ' || *cur == '\0') ? true : false;

  size_t rawLen = (size_t)(cur - start);
  if (rawLen > UINT16_MAX) {
    // does not fit the uint16_t length taken by convertSmlValueType()
    free(pKV->key);
    pKV->key = NULL;
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  value = calloc(rawLen + 1, 1);
  if (value == NULL) {
    free(pKV->key);
    pKV->key = NULL;
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // pass 2: copy the value, dropping the backslash of every resolved escape sequence
  uint16_t len = 0;
  for (const char *p = start; p < cur; ++p) {
    if (*p == '\\') {
      escapeSpecialCharacter(2, &p);
    }
    value[len++] = *p;
  }
  value[len] = '\0';

  if (!convertSmlValueType(pKV, value, len)) {
    //free previous alocated key field
    free(pKV->key);
    pKV->key = NULL;
    free(value);
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }
  free(value);

  *index = (*cur == '\0') ? cur : cur + 1;
  return TSDB_CODE_SUCCESS;
}

/*
 * Parses the measurement (super table name) at the start of a line.
 *
 * The measurement ends at the first unescaped ',' (tags follow; *has_tags is set to 1) or
 * the first unescaped ' ' (no tags; *has_tags is left unchanged). Escape sequences for ','
 * and ' ' are resolved. On success pSml->stableName holds a newly allocated name and *index
 * points just past the separator.
 *
 * Returns TSDB_CODE_TSC_LINE_SYNTAX_ERROR when the name starts with a digit, is longer than
 * TSDB_TABLE_NAME_LEN - 1 characters, or the line ends before a separator is found;
 * TSDB_CODE_TSC_OUT_OF_MEMORY when the name buffer cannot be allocated. On failure
 * pSml->stableName is NULL.
 */
static int32_t parseSmlMeasurement(TAOS_SML_DATA_POINT *pSml, const char **index,
                                   uint8_t *has_tags) {
  const char *cur = *index;
  uint16_t len = 0;

  pSml->stableName = calloc(TSDB_TABLE_NAME_LEN, 1);
  if (pSml->stableName == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  if (isdigit((unsigned char)*cur)) {
    tscError("Measurement field cannnot start with digit");
    free(pSml->stableName);
    pSml->stableName = NULL;
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }

  while (*cur != '\0') {
    // never look in front of the first character of the line
    bool unescaped = (cur == *index) || (*(cur - 1) != '\\');
    //first unescaped comma or space identifies measurement
    //if space detected first, meaning no tag in the input
    if (*cur == ',' && unescaped) {
      *has_tags = 1;
      break;
    }
    if (*cur == ' ' && unescaped) {
      break;
    }
    // one byte of the buffer is reserved for the terminating '\0'
    if (len >= TSDB_TABLE_NAME_LEN - 1) {
      tscError("Measurement field cannot exceeds 193 characters");
      free(pSml->stableName);
      pSml->stableName = NULL;
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    }
    //Comma, Space, Backslash needs to be escaped if any
    if (*cur == '\\') {
      escapeSpecialCharacter(1, &cur);
    }
    pSml->stableName[len] = *cur;
    cur++;
    len++;
  }
  // the line ended with nothing after the measurement; cur + 1 would lie past the terminator
  if (*cur == '\0') {
    free(pSml->stableName);
    pSml->stableName = NULL;
    return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
  }
  pSml->stableName[len] = '\0';
  *index = cur + 1;
  tscDebug("Stable name in measurement:%s|len:%d", pSml->stableName, len);

  return TSDB_CODE_SUCCESS;
}
1745

1746
/**
 * Parse a run of key/value pairs (tags or fields) into a growable array.
 *
 * Each pair is read with parseSmlKey() followed by parseSmlValue(); parsing
 * stops when parseSmlValue() reports the last pair or the input ends.
 * For fields, slot 0 of the array is left zeroed for the timestamp and the
 * parsed pairs start at slot 1. For tags, a BINARY pair whose key is "ID"
 * (case-insensitive) is not stored in the array; its value becomes
 * smlData->childTableName instead.
 *
 * @param pKVs     out: newly allocated array of parsed pairs (owned by caller,
 *                 also on failure)
 * @param num_kvs  in/out: incremented once for every pair stored in *pKVs
 * @param index    in: start of the pairs; out (success only): position after
 *                 the last parsed pair
 * @param isField  true when parsing fields, false when parsing tags
 * @param smlData  data point receiving childTableName; its childTableName
 *                 must be NULL or heap-allocated on entry
 * @return TSDB_CODE_SUCCESS, the error code of parseSmlKey()/parseSmlValue(),
 *         or TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation fails
 */
static int32_t parseSmlKvPairs(TAOS_SML_KV **pKVs, int *num_kvs,
                               const char **index, bool isField, TAOS_SML_DATA_POINT* smlData) {
  const char *cur = *index;
  int32_t ret = TSDB_CODE_SUCCESS;
  bool is_last_kv = false;

  // Fields reserve the first slot for the timestamp.
  const int reserved = isField ? 1 : 0;
  int32_t capacity = isField ? 64 : 8;

  *pKVs = calloc(capacity, sizeof(TAOS_SML_KV));
  if (*pKVs == NULL) {
    tscError("Failed to allocate memory for key-value pairs");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  TAOS_SML_KV *pkv = *pKVs + reserved;

  while (*cur != '\0') {
    ret = parseSmlKey(pkv, &cur);
    if (ret) {
      tscError("Unable to parse key field");
      return ret;
    }
    ret = parseSmlValue(pkv, &cur, &is_last_kv);
    if (ret) {
      tscError("Unable to parse value field");
      return ret;
    }

    if (!isField &&
        (strcasecmp(pkv->key, "ID") == 0) && pkv->type == TSDB_DATA_TYPE_BINARY) {
      // The "ID" tag names the child table; it is not kept as a regular tag,
      // so the slot is released and reused for the next pair.
      char *childName = malloc(pkv->length + 1);
      if (childName == NULL) {
        tscError("Failed to allocate memory for child table name");
        free(pkv->key);
        free(pkv->value);
        pkv->key = NULL;
        pkv->value = NULL;
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }
      memcpy(childName, pkv->value, pkv->length);
      childName[pkv->length] = '\0';
      // A repeated ID tag replaces the earlier name instead of leaking it.
      free(smlData->childTableName);
      smlData->childTableName = childName;
      free(pkv->key);
      free(pkv->value);
      pkv->key = NULL;
      pkv->value = NULL;
    } else {
      *num_kvs += 1;
    }

    if (is_last_kv) {
      break;
    }

    // Grow the array when the next pair would not fit.
    int32_t needed = *num_kvs + reserved + 1;
    if (needed > capacity) {
      int32_t grown = capacity;
      while (grown < needed) {
        grown = grown * 3 / 2;
      }
      TAOS_SML_KV *more_kvs = realloc(*pKVs, (size_t)grown * sizeof(TAOS_SML_KV));
      if (more_kvs == NULL) {
        // *pKVs is still valid here and remains owned by the caller.
        tscError("Failed to allocate memory for key-value pairs");
        return TSDB_CODE_TSC_OUT_OF_MEMORY;
      }
      // realloc does not initialise the new tail; keep unused slots zeroed.
      memset(more_kvs + capacity, 0, (size_t)(grown - capacity) * sizeof(TAOS_SML_KV));
      *pKVs = more_kvs;
      capacity = grown;
    }

    //move pkv to the next free TAOS_SML_KV slot
    pkv = *pKVs + *num_kvs + reserved;
  }

  *index = cur;
  return ret;
}
1830

1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843
/**
 * Copy the parsed timestamp into the first field slot of the data point,
 * count it as a field, and release the temporary timestamp KV.
 *
 * @param smlData  pointer to the data point whose fields[0] is filled in
 * @param ts       heap-allocated timestamp KV; it, its key and its value are
 *                 freed before returning
 */
static void moveTimeStampToFirstKv(TAOS_SML_DATA_POINT** smlData, TAOS_SML_KV *ts) {
  TAOS_SML_DATA_POINT *point = *smlData;
  TAOS_SML_KV *slot = point->fields;
  const size_t keyBytes = strlen(ts->key) + 1;  // key plus its NUL terminator

  slot->length = ts->length;
  slot->type = ts->type;

  slot->value = malloc(ts->length);
  slot->key = malloc(keyBytes);
  memcpy(slot->key, ts->key, keyBytes);
  memcpy(slot->value, ts->value, ts->length);

  point->fieldNum += 1;

  // The copy is complete; the temporary KV is no longer needed.
  free(ts->key);
  free(ts->value);
  free(ts);
}

1846
/**
 * Parse one line into a data point: measurement, then tags (only when the
 * measurement parser reports them), then fields, then the timestamp, which
 * is moved into the first field slot.
 *
 * @param sql      NUL-terminated input line
 * @param smlData  data point to fill; on failure it may be partially filled
 *                 and is not released here
 * @return TSDB_CODE_SUCCESS, or the non-zero code returned by the parsing
 *         step that failed
 */
int32_t tscParseLine(const char* sql, TAOS_SML_DATA_POINT* smlData) {
  const char *cursor = sql;
  uint8_t hasTags = 0;
  TAOS_SML_KV *tsKv = NULL;
  int32_t code;

  // Measurement
  code = parseSmlMeasurement(smlData, &cursor, &hasTags);
  if (code != 0) {
    tscError("Unable to parse measurement");
    return code;
  }
  tscDebug("Parse measurement finished, has_tags:%d", hasTags);

  // Tags are present only when the measurement parser flagged them
  if (hasTags) {
    code = parseSmlKvPairs(&smlData->tags, &smlData->tagNum, &cursor, false, smlData);
    if (code != 0) {
      tscError("Unable to parse tag");
      return code;
    }
  }
  tscDebug("Parse tags finished, num of tags:%d", smlData->tagNum);

  // Fields
  code = parseSmlKvPairs(&smlData->fields, &smlData->fieldNum, &cursor, true, smlData);
  if (code != 0) {
    tscError("Unable to parse field");
    return code;
  }
  tscDebug("Parse fields finished, num of fields:%d", smlData->fieldNum);

  // Timestamp: parsed separately, then placed in the first field slot
  code = parseSmlTimeStamp(&tsKv, &cursor);
  if (code != 0) {
    tscError("Unable to parse timestamp");
    return code;
  }
  moveTimeStampToFirstKv(&smlData, tsKv);
  tscDebug("Parse timestamp finished");

  return TSDB_CODE_SUCCESS;
}

1889
//=========================================================================
1890

S
shenglian zhou 已提交
1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905
/**
 * Release everything a data point owns: the key and value of each of its
 * tagNum tags and fieldNum fields, both arrays, and the two table names.
 * The TAOS_SML_DATA_POINT structure itself is not freed.
 *
 * @param point  data point whose members are released
 */
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point) {
  TAOS_SML_KV *kv = point->tags;
  for (int remaining = point->tagNum; remaining > 0; --remaining, ++kv) {
    free(kv->key);
    free(kv->value);
  }
  free(point->tags);

  kv = point->fields;
  for (int remaining = point->fieldNum; remaining > 0; --remaining, ++kv) {
    free(kv->key);
    free(kv->value);
  }
  free(point->fields);

  free(point->stableName);
  free(point->childTableName);
}

1906 1907
/**
 * Parse every line into a data point and append it to points.
 *
 * Parsing stops at the first line that fails; the partially parsed point is
 * released and the points appended so far stay in the array for the caller
 * to destroy.
 *
 * @param lines        array of numLines NUL-terminated lines
 * @param numLines     number of entries in lines
 * @param points       array of TAOS_SML_DATA_POINT receiving the parsed points
 * @param failedLines  not used by this function
 * @return 0 on success, TSDB_CODE_TSC_LINE_SYNTAX_ERROR when a line cannot be
 *         parsed, TSDB_CODE_TSC_OUT_OF_MEMORY when a point cannot be appended
 */
int32_t tscParseLines(char* lines[], int numLines, SArray* points, SArray* failedLines) {
  (void)failedLines;

  for (int32_t i = 0; i < numLines; ++i) {
    TAOS_SML_DATA_POINT point = {0};
    int32_t code = tscParseLine(lines[i], &point);
    if (code != TSDB_CODE_SUCCESS) {
      tscError("data point line parse failed. line %d : %s", i, lines[i]);
      destroySmlDataPoint(&point);
      return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
    }
    tscDebug("data point line parse success. line %d", i);

    // NOTE(review): assumes taosArrayPush returns NULL when it cannot grow
    // the array - confirm against tarray.h. Without this check the point's
    // allocations would leak and the line would be dropped silently.
    if (taosArrayPush(points, &point) == NULL) {
      tscError("data point line store failed. line %d", i);
      destroySmlDataPoint(&point);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }
  return 0;
}

1923
/**
 * Parse numLines lines and insert the resulting data points through
 * taos_sml_insert().
 *
 * @param taos      connection handle passed on to taos_sml_insert()
 * @param lines     array of numLines non-NULL, NUL-terminated lines
 * @param numLines  number of lines; must be between 1 and 65536
 * @return 0 on success; TSDB_CODE_TSC_APP_ERROR for an invalid line count or
 *         a NULL line; TSDB_CODE_TSC_OUT_OF_MEMORY when the point array cannot
 *         be created; otherwise the code from tscParseLines() or
 *         taos_sml_insert()
 */
int taos_insert_lines(TAOS* taos, char* lines[], int numLines) {
  if (numLines <= 0 || numLines > 65536) {
    tscError("taos_insert_lines numLines should be between 1 and 65536. numLines: %d", numLines);
    return TSDB_CODE_TSC_APP_ERROR;
  }

  for (int lineNo = 0; lineNo < numLines; ++lineNo) {
    if (lines[lineNo] == NULL) {
      tscError("taos_insert_lines line %d is NULL", lineNo);
      return TSDB_CODE_TSC_APP_ERROR;
    }
  }

  SArray* parsed = taosArrayInit(numLines, sizeof(TAOS_SML_DATA_POINT));
  if (parsed == NULL) {
    tscError("taos_insert_lines failed to allocate memory");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebug("taos_insert_lines begin inserting %d lines, first line: %s", numLines, lines[0]);

  int32_t code = tscParseLines(lines, numLines, parsed, NULL);
  size_t count = taosArrayGetSize(parsed);
  TAOS_SML_DATA_POINT* batch = NULL;

  // Insert only when every line parsed.
  if (code == 0) {
    batch = TARRAY_GET_START(parsed);
    code = taos_sml_insert(taos, batch, (int)count);
    if (code != 0) {
      tscError("taos_sml_insert error: %s", tstrerror((code)));
    }
  }

  tscDebug("taos_insert_lines finish inserting %d lines. code: %d", numLines, code);

  // Release whatever points were collected, on success and on failure.
  batch = TARRAY_GET_START(parsed);
  count = taosArrayGetSize(parsed);
  for (size_t k = 0; k < count; ++k) {
    destroySmlDataPoint(batch + k);
  }
  taosArrayDestroy(parsed);

  return code;
}