clientSml.c 55.4 KB
Newer Older
wmmhello's avatar
wmmhello 已提交
1 2 3 4 5 6 7
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "query.h"
#include "taos.h"
wmmhello's avatar
wmmhello 已提交
8 9 10 11
#include "taoserror.h"
#include "tdef.h"
#include "tlog.h"
#include "tmsg.h"
wmmhello's avatar
wmmhello 已提交
12
#include "tstrbuild.h"
wmmhello's avatar
wmmhello 已提交
13 14 15 16
#include "ttime.h"
#include "ttypes.h"
#include "tcommon.h"
#include "catalog.h"
17
#include "clientInt.h"
wmmhello's avatar
wmmhello 已提交
18
//=================================================================================================
wmmhello's avatar
wmmhello 已提交
19 20 21 22 23 24

#define SPACE ' '
#define COMMA ','
#define EQUAL '='
#define QUOTE '"'
#define SLASH '\\'
wmmhello's avatar
wmmhello 已提交
25
#define tsMaxSQLStringLen (1024*1024)
wmmhello's avatar
wmmhello 已提交
26

wmmhello's avatar
wmmhello 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39
//=================================================================================================
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;

typedef enum {
  SCHEMA_ACTION_CREATE_STABLE,
  SCHEMA_ACTION_ADD_COLUMN,
  SCHEMA_ACTION_ADD_TAG,
  SCHEMA_ACTION_CHANGE_COLUMN_SIZE,
  SCHEMA_ACTION_CHANGE_TAG_SIZE,
} ESchemaAction;

typedef struct {
  char sTableName[TSDB_TABLE_NAME_LEN];
40 41
  SArray *tags;
  SArray *fields;
wmmhello's avatar
wmmhello 已提交
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
} SCreateSTableActionInfo;

typedef struct {
  char sTableName[TSDB_TABLE_NAME_LEN];
  SSmlKv * field;
} SAlterSTableActionInfo;

typedef struct {
  ESchemaAction action;
  union {
    SCreateSTableActionInfo createSTable;
    SAlterSTableActionInfo alterSTable;
  };
} SSchemaAction;

typedef struct {
  const char* measure;
  const char* tags;
  const char* cols;
  const char* timestamp;

  int32_t measureLen;
  int32_t measureTagsLen;
  int32_t tagsLen;
  int32_t colsLen;
  int32_t timestampLen;
} SSmlLineInfo;

typedef struct {
  const char     *sTableName;   // super table name
  uint8_t        sTableNameLen;
  char           childTableName[TSDB_TABLE_NAME_LEN];
  uint64_t       uid;

  SArray         *tags;

  // colsFormat store cols formated, for quick parse, if info->formatData is true
  SArray         *colsFormat;  // elements are SArray<SSmlKv*>

81
  // cols store cols un formated
wmmhello's avatar
wmmhello 已提交
82 83 84 85
  SArray         *cols;        // elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
} SSmlTableInfo;

typedef struct {
86 87 88 89
  SArray     *tags;       // save the origin order to create table
  SHashObj   *tagHash;    // elements are <key, index in tags>

  SArray     *cols;
wmmhello's avatar
wmmhello 已提交
90
  SHashObj   *fieldHash;
91

wmmhello's avatar
wmmhello 已提交
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
  STableMeta *tableMeta;
} SSmlSTableMeta;

typedef struct {
  int32_t   len;
  char      *buf;
} SSmlMsgBuf;

typedef struct {
  uint64_t          id;

  SMLProtocolType   protocol;
  int8_t            precision;
  bool              dataFormat;     // true means that the name, number and order of keys in each line are the same

  SHashObj          *childTables;
  SHashObj          *superTables;
  SHashObj          *pVgHash;
  void              *exec;

  STscObj           *taos;
  SCatalog          *pCatalog;
  SRequestObj       *pRequest;
  SQuery            *pQuery;

  int32_t           affectedRows;
  SSmlMsgBuf        msgBuf;
119 120
  SHashObj          *dumplicateKey;  // for dumplicate key
  SArray            *colsContainer;  // for cols parse, if is dataFormat == false
wmmhello's avatar
wmmhello 已提交
121
} SSmlHandle;
wmmhello's avatar
wmmhello 已提交
122 123 124
//=================================================================================================

static uint64_t linesSmlHandleId = 0;
wmmhello's avatar
wmmhello 已提交
125 126
static const char* TS = "_ts";
static const char* TAG = "_tagNone";
wmmhello's avatar
wmmhello 已提交
127

wmmhello's avatar
wmmhello 已提交
128
//=================================================================================================
wmmhello's avatar
wmmhello 已提交
129

wmmhello's avatar
wmmhello 已提交
130
static uint64_t smlGenId() {
wmmhello's avatar
wmmhello 已提交
131 132 133 134 135 136 137 138 139
  uint64_t id;

  do {
    id = atomic_add_fetch_64(&linesSmlHandleId, 1);
  } while (id == 0);

  return id;
}

wmmhello's avatar
wmmhello 已提交
140
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const char *msg2) {
wmmhello's avatar
wmmhello 已提交
141 142 143 144 145 146
  if(msg1) strncat(pBuf->buf, msg1, pBuf->len);
  int32_t left = pBuf->len - strlen(pBuf->buf);
  if(left > 2 && msg2) {
    strncat(pBuf->buf, ":", left - 1);
    strncat(pBuf->buf, msg2, left - 2);
  }
wmmhello's avatar
wmmhello 已提交
147 148 149
  return TSDB_CODE_SML_INVALID_DATA;
}

wmmhello's avatar
wmmhello 已提交
150
static int smlCompareKv(const void* p1, const void* p2) {
151 152
  SSmlKv* kv1 = *(SSmlKv**)p1;
  SSmlKv* kv2 = *(SSmlKv**)p2;
wmmhello's avatar
wmmhello 已提交
153 154
  int32_t kvLen1 = kv1->keyLen;
  int32_t kvLen2 = kv2->keyLen;
wmmhello's avatar
wmmhello 已提交
155
  int32_t res = strncasecmp(kv1->key, kv2->key, TMIN(kvLen1, kvLen2));
wmmhello's avatar
wmmhello 已提交
156 157 158 159 160 161 162
  if (res != 0) {
    return res;
  } else {
    return kvLen1-kvLen2;
  }
}

wmmhello's avatar
wmmhello 已提交
163
static void smlBuildChildTableName(SSmlTableInfo *tags) {
wmmhello's avatar
wmmhello 已提交
164 165
  int32_t size = taosArrayGetSize(tags->tags);
  ASSERT(size > 0);
wmmhello's avatar
wmmhello 已提交
166
  taosArraySort(tags->tags, smlCompareKv);
wmmhello's avatar
wmmhello 已提交
167

wmmhello's avatar
wmmhello 已提交
168 169 170 171 172 173
  SStringBuilder sb = {0};
  taosStringBuilderAppendStringLen(&sb, tags->sTableName, tags->sTableNameLen);
  for (int j = 0; j < size; ++j) {
    SSmlKv *tagKv = taosArrayGetP(tags->tags, j);
    taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
    taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->valueLen);
wmmhello's avatar
wmmhello 已提交
174 175 176 177 178 179 180 181
  }
  size_t len = 0;
  char* keyJoined = taosStringBuilderGetResult(&sb, &len);
  T_MD5_CTX context;
  tMD5Init(&context);
  tMD5Update(&context, (uint8_t *)keyJoined, (uint32_t)len);
  tMD5Final(&context);
  uint64_t digest1 = *(uint64_t*)(context.digest);
182 183 184
  //uint64_t digest2 = *(uint64_t*)(context.digest + 8);
  //snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
  snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64, digest1);
wmmhello's avatar
wmmhello 已提交
185
  taosStringBuilderDestroy(&sb);
wmmhello's avatar
wmmhello 已提交
186
  tags->uid = digest1;
wmmhello's avatar
wmmhello 已提交
187 188
}

wmmhello's avatar
wmmhello 已提交
189 190
static int32_t smlGenerateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[],
                                       SSchemaAction* action, bool* actionNeeded, SSmlHandle* info) {
wmmhello's avatar
wmmhello 已提交
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
//  char fieldName[TSDB_COL_NAME_LEN] = {0};
//  strcpy(fieldName, pointColField->name);
//
//  size_t* pDbIndex = taosHashGet(dbAttrHash, fieldName, strlen(fieldName));
//  if (pDbIndex) {
//    SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex);
//    assert(strcasecmp(dbAttr->name, pointColField->name) == 0);
//    if (pointColField->type != dbAttr->type) {
//      uError("SML:0x%"PRIx64" point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id, pointColField->name,
//               pointColField->type, dbAttr->type);
//      return TSDB_CODE_TSC_INVALID_VALUE;
//    }
//
//    if (IS_VAR_DATA_TYPE(pointColField->type) && (pointColField->bytes > dbAttr->bytes)) {
//      if (isTag) {
//        action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
//      } else {
//        action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
//      }
//      memset(&action->alterSTable, 0,  sizeof(SAlterSTableActionInfo));
//      memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN);
//      action->alterSTable.field = pointColField;
//      *actionNeeded = true;
//    }
//  } else {
//    if (isTag) {
//      action->action = SCHEMA_ACTION_ADD_TAG;
//    } else {
//      action->action = SCHEMA_ACTION_ADD_COLUMN;
//    }
//    memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo));
//    memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN);
//    action->alterSTable.field = pointColField;
//    *actionNeeded = true;
//  }
//  if (*actionNeeded) {
//    uDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, fieldName,
//             action->action);
//  }
wmmhello's avatar
wmmhello 已提交
230 231 232
  return 0;
}

wmmhello's avatar
wmmhello 已提交
233
static int32_t smlBuildColumnDescription(SSmlKv* field, char* buf, int32_t bufSize, int32_t* outBytes) {
wmmhello's avatar
wmmhello 已提交
234
  uint8_t type = field->type;
wmmhello's avatar
wmmhello 已提交
235 236
  char    tname[TSDB_TABLE_NAME_LEN] = {0};
  memcpy(tname, field->key, field->keyLen);
wmmhello's avatar
wmmhello 已提交
237
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
wmmhello's avatar
wmmhello 已提交
238
    int32_t bytes = field->valueLen;   // todo
wmmhello's avatar
wmmhello 已提交
239
    int out = snprintf(buf, bufSize,"%s %s(%d)",
wmmhello's avatar
wmmhello 已提交
240
                       tname,tDataTypes[field->type].name, bytes);
wmmhello's avatar
wmmhello 已提交
241 242
    *outBytes = out;
  } else {
wmmhello's avatar
wmmhello 已提交
243
    int out = snprintf(buf, bufSize, "%s %s", tname, tDataTypes[type].name);
wmmhello's avatar
wmmhello 已提交
244 245 246 247 248 249
    *outBytes = out;
  }

  return 0;
}

wmmhello's avatar
wmmhello 已提交
250
static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
wmmhello's avatar
wmmhello 已提交
251 252
  int32_t code = 0;
  int32_t outBytes = 0;
wmmhello's avatar
wmmhello 已提交
253
  char *result = (char *)taosMemoryCalloc(1, tsMaxSQLStringLen+1);
wmmhello's avatar
wmmhello 已提交
254 255 256 257 258 259
  int32_t capacity = tsMaxSQLStringLen +  1;

  uDebug("SML:0x%"PRIx64" apply schema action. action: %d", info->id, action->action);
  switch (action->action) {
    case SCHEMA_ACTION_ADD_COLUMN: {
      int n = sprintf(result, "alter stable %s add column ", action->alterSTable.sTableName);
wmmhello's avatar
wmmhello 已提交
260 261
      smlBuildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes);
      TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
wmmhello's avatar
wmmhello 已提交
262
      code = taos_errno(res);
wmmhello's avatar
wmmhello 已提交
263
      const char* errStr = taos_errstr(res);
wmmhello's avatar
wmmhello 已提交
264 265 266 267 268 269 270
      char* begin = strstr(errStr, "duplicated column names");
      bool tscDupColNames = (begin != NULL);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%"PRIx64" apply schema action. error: %s", info->id, errStr);
      }
      taos_free_result(res);

wmmhello's avatar
wmmhello 已提交
271 272
//      if (code == TSDB_CODE_MND_FIELD_ALREADY_EXIST || code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) {
      if (code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) {
wmmhello's avatar
wmmhello 已提交
273
        TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
wmmhello's avatar
wmmhello 已提交
274 275 276 277 278 279 280 281 282 283 284
        code = taos_errno(res2);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
        taos_free_result(res2);
        taosMsleep(500);
      }
      break;
    }
    case SCHEMA_ACTION_ADD_TAG: {
      int n = sprintf(result, "alter stable %s add tag ", action->alterSTable.sTableName);
wmmhello's avatar
wmmhello 已提交
285
      smlBuildColumnDescription(action->alterSTable.field,
wmmhello's avatar
wmmhello 已提交
286
                             result+n, capacity-n, &outBytes);
wmmhello's avatar
wmmhello 已提交
287
      TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
wmmhello's avatar
wmmhello 已提交
288
      code = taos_errno(res);
wmmhello's avatar
wmmhello 已提交
289
      const char* errStr = taos_errstr(res);
wmmhello's avatar
wmmhello 已提交
290 291 292 293 294 295 296
      char* begin = strstr(errStr, "duplicated column names");
      bool tscDupColNames = (begin != NULL);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
      taos_free_result(res);

wmmhello's avatar
wmmhello 已提交
297 298
//      if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) {
      if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) {
wmmhello's avatar
wmmhello 已提交
299
        TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
wmmhello's avatar
wmmhello 已提交
300 301 302 303 304 305 306 307 308 309 310
        code = taos_errno(res2);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
        taos_free_result(res2);
        taosMsleep(500);
      }
      break;
    }
    case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
      int n = sprintf(result, "alter stable %s modify column ", action->alterSTable.sTableName);
wmmhello's avatar
wmmhello 已提交
311
      smlBuildColumnDescription(action->alterSTable.field, result+n,
wmmhello's avatar
wmmhello 已提交
312
                             capacity-n, &outBytes);
wmmhello's avatar
wmmhello 已提交
313
      TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
wmmhello's avatar
wmmhello 已提交
314 315 316 317 318 319
      code = taos_errno(res);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
      taos_free_result(res);

wmmhello's avatar
wmmhello 已提交
320 321
//      if (code == TSDB_CODE_MND_INVALID_COLUMN_LENGTH || code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
      if (code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
wmmhello's avatar
wmmhello 已提交
322
        TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
wmmhello's avatar
wmmhello 已提交
323 324 325 326 327 328 329 330 331 332 333
        code = taos_errno(res2);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
        taos_free_result(res2);
        taosMsleep(500);
      }
      break;
    }
    case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
      int n = sprintf(result, "alter stable %s modify tag ", action->alterSTable.sTableName);
wmmhello's avatar
wmmhello 已提交
334
      smlBuildColumnDescription(action->alterSTable.field, result+n,
wmmhello's avatar
wmmhello 已提交
335
                             capacity-n, &outBytes);
wmmhello's avatar
wmmhello 已提交
336
      TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
wmmhello's avatar
wmmhello 已提交
337 338 339 340 341 342
      code = taos_errno(res);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
      taos_free_result(res);

wmmhello's avatar
wmmhello 已提交
343 344
//      if (code == TSDB_CODE_MND_INVALID_TAG_LENGTH || code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
      if (code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
wmmhello's avatar
wmmhello 已提交
345
        TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
wmmhello's avatar
wmmhello 已提交
346 347 348 349 350 351 352 353 354 355 356 357
        code = taos_errno(res2);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
        taos_free_result(res2);
        taosMsleep(500);
      }
      break;
    }
    case SCHEMA_ACTION_CREATE_STABLE: {
      int n = sprintf(result, "create stable %s (", action->createSTable.sTableName);
      char* pos = result + n; int freeBytes = capacity - n;
wmmhello's avatar
wmmhello 已提交
358

359
      SArray *cols = action->createSTable.fields;
wmmhello's avatar
wmmhello 已提交
360 361

      for(int i = 0; i < taosArrayGetSize(cols); i++){
362 363
        SSmlKv *kv = taosArrayGetP(cols, i);
        smlBuildColumnDescription(kv, pos, freeBytes, &outBytes);
wmmhello's avatar
wmmhello 已提交
364 365 366
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
wmmhello's avatar
wmmhello 已提交
367

wmmhello's avatar
wmmhello 已提交
368 369 370 371 372
      --pos; ++freeBytes;

      outBytes = snprintf(pos, freeBytes, ") tags (");
      pos += outBytes; freeBytes -= outBytes;

373 374 375 376
      cols = action->createSTable.tags;
      for(int i = 0; i < taosArrayGetSize(cols); i++){
        SSmlKv *kv = taosArrayGetP(cols, i);
        smlBuildColumnDescription(kv, pos, freeBytes, &outBytes);
wmmhello's avatar
wmmhello 已提交
377 378 379 380 381
        pos += outBytes; freeBytes -= outBytes;
        *pos = ','; ++pos; --freeBytes;
      }
      pos--; ++freeBytes;
      outBytes = snprintf(pos, freeBytes, ")");
wmmhello's avatar
wmmhello 已提交
382
      TAOS_RES* res = taos_query(info->taos, result);
wmmhello's avatar
wmmhello 已提交
383 384 385 386 387 388
      code = taos_errno(res);
      if (code != TSDB_CODE_SUCCESS) {
        uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
      }
      taos_free_result(res);

wmmhello's avatar
wmmhello 已提交
389
      if (code == TSDB_CODE_MND_STB_ALREADY_EXIST) {
wmmhello's avatar
wmmhello 已提交
390
        TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
wmmhello's avatar
wmmhello 已提交
391 392 393 394 395 396 397 398 399 400 401 402 403 404
        code = taos_errno(res2);
        if (code != TSDB_CODE_SUCCESS) {
          uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
        }
        taos_free_result(res2);
        taosMsleep(500);
      }
      break;
    }

    default:
      break;
  }

wmmhello's avatar
wmmhello 已提交
405
  taosMemoryFreeClear(result);
wmmhello's avatar
wmmhello 已提交
406 407 408 409 410 411
  if (code != 0) {
    uError("SML:0x%"PRIx64 " apply schema action failure. %s", info->id, tstrerror(code));
  }
  return code;
}

wmmhello's avatar
wmmhello 已提交
412
static int32_t smlModifyDBSchemas(SSmlHandle* info) {
wmmhello's avatar
wmmhello 已提交
413 414 415 416
  int32_t code = 0;

  SSmlSTableMeta** tableMetaSml = taosHashIterate(info->superTables, NULL);
  while (tableMetaSml) {
417
    SSmlSTableMeta* sTableData = *tableMetaSml;
wmmhello's avatar
wmmhello 已提交
418 419

    STableMeta *pTableMeta = NULL;
wmmhello's avatar
wmmhello 已提交
420
    SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
wmmhello's avatar
wmmhello 已提交
421

wmmhello's avatar
wmmhello 已提交
422
    size_t superTableLen = 0;
wmmhello's avatar
wmmhello 已提交
423
    void *superTable = taosHashGetKey(tableMetaSml, &superTableLen);    // todo escape
wmmhello's avatar
wmmhello 已提交
424 425 426
    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
    strcpy(pName.dbname, info->pRequest->pDb);
    memcpy(pName.tname, superTable, superTableLen);
wmmhello's avatar
wmmhello 已提交
427

wmmhello's avatar
wmmhello 已提交
428
    code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta);
wmmhello's avatar
wmmhello 已提交
429

wmmhello's avatar
wmmhello 已提交
430
    if (code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_MND_INVALID_STB) {
wmmhello's avatar
wmmhello 已提交
431 432
      SSchemaAction schemaAction = {0};
      schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
wmmhello's avatar
wmmhello 已提交
433
      memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
434 435
      schemaAction.createSTable.tags = sTableData->tags;
      schemaAction.createSTable.fields = sTableData->cols;
wmmhello's avatar
wmmhello 已提交
436 437 438 439 440 441
      code = smlApplySchemaAction(info, &schemaAction);
      if (code != 0) {
        uError("SML:0x%"PRIx64" smlApplySchemaAction failed. can not create %s", info->id, schemaAction.createSTable.sTableName);
        return code;
      }

wmmhello's avatar
wmmhello 已提交
442
      code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta);
wmmhello's avatar
wmmhello 已提交
443
      if (code != 0) {
wmmhello's avatar
wmmhello 已提交
444
        uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, schemaAction.createSTable.sTableName);
wmmhello's avatar
wmmhello 已提交
445 446
        return code;
      }
wmmhello's avatar
wmmhello 已提交
447
    }else if (code == TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
448 449 450 451
    } else {
      uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
      return code;
    }
452
    sTableData->tableMeta = pTableMeta;
wmmhello's avatar
wmmhello 已提交
453

wmmhello's avatar
wmmhello 已提交
454
    tableMetaSml = taosHashIterate(info->superTables, tableMetaSml);
wmmhello's avatar
wmmhello 已提交
455 456 457 458 459 460 461 462 463 464 465
  }
  return 0;
}

//=========================================================================

/*        Field                          Escape charaters
    1: measurement                        Comma,Space
    2: tag_key, tag_value, field_key  Comma,Equal Sign,Space
    3: field_value                    Double quote,Backslash
*/
wmmhello's avatar
wmmhello 已提交
466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507
//static void escapeSpecialCharacter(uint8_t field, const char **pos) {
//  const char *cur = *pos;
//  if (*cur != '\\') {
//    return;
//  }
//  switch (field) {
//    case 1:
//      switch (*(cur + 1)) {
//        case ',':
//        case ' ':
//          cur++;
//          break;
//        default:
//          break;
//      }
//      break;
//    case 2:
//      switch (*(cur + 1)) {
//        case ',':
//        case ' ':
//        case '=':
//          cur++;
//          break;
//        default:
//          break;
//      }
//      break;
//    case 3:
//      switch (*(cur + 1)) {
//        case '"':
//        case '\\':
//          cur++;
//          break;
//        default:
//          break;
//      }
//      break;
//    default:
//      break;
//  }
//  *pos = cur;
//}
wmmhello's avatar
wmmhello 已提交
508

wmmhello's avatar
wmmhello 已提交
509
static bool smlParseTinyInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
510 511
  const char *pVal = kvVal->value;
  int32_t len = kvVal->valueLen;
wmmhello's avatar
wmmhello 已提交
512 513 514
  if (len <= 2) {
    return false;
  }
wmmhello's avatar
wmmhello 已提交
515
  const char *signalPos = pVal + len - 2;
wmmhello's avatar
wmmhello 已提交
516
  if (!strncasecmp(signalPos, "i8", 2)) {
wmmhello's avatar
wmmhello 已提交
517 518 519 520
    char *endptr = NULL;
    int64_t result = strtoll(pVal, &endptr, 10);
    if(endptr != signalPos){       // 78ri8
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
521
      smlBuildInvalidDataMsg(msg, "invalid tiny int", endptr);
wmmhello's avatar
wmmhello 已提交
522 523
    }else if(!IS_VALID_TINYINT(result)){
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
524
      smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", endptr);
wmmhello's avatar
wmmhello 已提交
525 526 527 528
    }else{
      kvVal->i = result;
      *isValid = true;
    }
wmmhello's avatar
wmmhello 已提交
529 530 531 532 533
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
534
static bool smlParseTinyUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
535 536
  const char *pVal = kvVal->value;
  int32_t len = kvVal->valueLen;
wmmhello's avatar
wmmhello 已提交
537 538 539 540 541 542
  if (len <= 2) {
    return false;
  }
  if (pVal[0] == '-') {
    return false;
  }
wmmhello's avatar
wmmhello 已提交
543
  const char *signalPos = pVal + len - 2;
wmmhello's avatar
wmmhello 已提交
544
  if (!strncasecmp(signalPos, "u8", 2)) {
wmmhello's avatar
wmmhello 已提交
545 546 547 548
    char *endptr = NULL;
    int64_t result = strtoll(pVal, &endptr, 10);
    if(endptr != signalPos){       // 78ri8
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
549
      smlBuildInvalidDataMsg(msg, "invalid unsigned tiny int", endptr);
wmmhello's avatar
wmmhello 已提交
550 551
    }else if(!IS_VALID_UTINYINT(result)){
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
552
      smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", endptr);
wmmhello's avatar
wmmhello 已提交
553 554 555 556
    }else{
      kvVal->i = result;
      *isValid = true;
    }
wmmhello's avatar
wmmhello 已提交
557 558 559 560 561
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
562
static bool smlParseSmallInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
563 564
  const char *pVal = kvVal->value;
  int32_t len = kvVal->valueLen;
wmmhello's avatar
wmmhello 已提交
565 566 567
  if (len <= 3) {
    return false;
  }
wmmhello's avatar
wmmhello 已提交
568
  const char *signalPos = pVal + len - 3;
wmmhello's avatar
wmmhello 已提交
569
  if (!strncasecmp(signalPos, "i16", 3)) {
wmmhello's avatar
wmmhello 已提交
570 571 572 573
    char *endptr = NULL;
    int64_t result = strtoll(pVal, &endptr, 10);
    if(endptr != signalPos){       // 78ri8
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
574
      smlBuildInvalidDataMsg(msg, "invalid small int", endptr);
wmmhello's avatar
wmmhello 已提交
575 576
    }else if(!IS_VALID_SMALLINT(result)){
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
577
      smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", endptr);
wmmhello's avatar
wmmhello 已提交
578 579 580 581
    }else{
      kvVal->i = result;
      *isValid = true;
    }
wmmhello's avatar
wmmhello 已提交
582 583 584 585 586
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
587
static bool smlParseSmallUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
588 589
  const char *pVal = kvVal->value;
  int32_t len = kvVal->valueLen;
wmmhello's avatar
wmmhello 已提交
590 591 592 593 594 595
  if (len <= 3) {
    return false;
  }
  if (pVal[0] == '-') {
    return false;
  }
wmmhello's avatar
wmmhello 已提交
596
  const char *signalPos = pVal + len - 3;
wmmhello's avatar
wmmhello 已提交
597
  if (strncasecmp(signalPos, "u16", 3) == 0) {
wmmhello's avatar
wmmhello 已提交
598 599 600 601
    char *endptr = NULL;
    int64_t result = strtoll(pVal, &endptr, 10);
    if(endptr != signalPos){       // 78ri8
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
602
      smlBuildInvalidDataMsg(msg, "invalid unsigned small int", endptr);
wmmhello's avatar
wmmhello 已提交
603 604
    }else if(!IS_VALID_USMALLINT(result)){
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
605
      smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", endptr);
wmmhello's avatar
wmmhello 已提交
606 607 608 609
    }else{
      kvVal->i = result;
      *isValid = true;
    }
wmmhello's avatar
wmmhello 已提交
610 611 612 613 614
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
615
static bool smlParseInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
616 617
  const char *pVal = kvVal->value;
  int32_t len = kvVal->valueLen;
wmmhello's avatar
wmmhello 已提交
618 619 620
  if (len <= 3) {
    return false;
  }
wmmhello's avatar
wmmhello 已提交
621
  const char *signalPos = pVal + len - 3;
wmmhello's avatar
wmmhello 已提交
622
  if (strncasecmp(signalPos, "i32", 3) == 0) {
wmmhello's avatar
wmmhello 已提交
623 624 625 626
    char *endptr = NULL;
    int64_t result = strtoll(pVal, &endptr, 10);
    if(endptr != signalPos){       // 78ri8
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
627
      smlBuildInvalidDataMsg(msg, "invalid int", endptr);
wmmhello's avatar
wmmhello 已提交
628 629
    }else if(!IS_VALID_INT(result)){
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
630
      smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", endptr);
wmmhello's avatar
wmmhello 已提交
631 632 633 634
    }else{
      kvVal->i = result;
      *isValid = true;
    }
wmmhello's avatar
wmmhello 已提交
635 636 637 638 639
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
640
static bool smlParseUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
641 642
  const char *pVal = kvVal->value;
  int32_t len = kvVal->valueLen;
wmmhello's avatar
wmmhello 已提交
643 644 645 646 647 648
  if (len <= 3) {
    return false;
  }
  if (pVal[0] == '-') {
    return false;
  }
wmmhello's avatar
wmmhello 已提交
649
  const char *signalPos = pVal + len - 3;
wmmhello's avatar
wmmhello 已提交
650
  if (strncasecmp(signalPos, "u32", 3) == 0) {
wmmhello's avatar
wmmhello 已提交
651 652 653 654
    char *endptr = NULL;
    int64_t result = strtoll(pVal, &endptr, 10);
    if(endptr != signalPos){       // 78ri8
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
655
      smlBuildInvalidDataMsg(msg, "invalid unsigned int", endptr);
wmmhello's avatar
wmmhello 已提交
656 657
    }else if(!IS_VALID_UINT(result)){
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
658
      smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", endptr);
wmmhello's avatar
wmmhello 已提交
659 660 661 662
    }else{
      kvVal->i = result;
      *isValid = true;
    }
wmmhello's avatar
wmmhello 已提交
663 664 665 666 667
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
668
static bool smlParseBigInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
669 670
  const char *pVal = kvVal->value;
  int32_t len = kvVal->valueLen;
wmmhello's avatar
wmmhello 已提交
671
  if (len > 3 && strncasecmp(pVal + len - 3, "i64", 3) == 0) {
wmmhello's avatar
wmmhello 已提交
672
    char *endptr = NULL;
wmmhello's avatar
wmmhello 已提交
673
    errno = 0;
wmmhello's avatar
wmmhello 已提交
674 675 676
    int64_t result = strtoll(pVal, &endptr, 10);
    if(endptr != pVal + len - 3){       // 78ri8
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
677
      smlBuildInvalidDataMsg(msg, "invalid big int", endptr);
wmmhello's avatar
wmmhello 已提交
678
    }else if(errno == ERANGE || !IS_VALID_BIGINT(result)){
wmmhello's avatar
wmmhello 已提交
679
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
680
      smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", endptr);
wmmhello's avatar
wmmhello 已提交
681 682 683 684 685 686 687
    }else{
      kvVal->i = result;
      *isValid = true;
    }
    return true;
  }else if (len > 1 && pVal[len - 1] == 'i') {
    char *endptr = NULL;
wmmhello's avatar
wmmhello 已提交
688
    errno = 0;
wmmhello's avatar
wmmhello 已提交
689 690 691
    int64_t result = strtoll(pVal, &endptr, 10);
    if(endptr != pVal + len - 1){       // 78ri8
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
692
      smlBuildInvalidDataMsg(msg, "invalid big int", endptr);
wmmhello's avatar
wmmhello 已提交
693
    }else if(errno == ERANGE || !IS_VALID_BIGINT(result)){
wmmhello's avatar
wmmhello 已提交
694
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
695
      smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", endptr);
wmmhello's avatar
wmmhello 已提交
696 697 698 699
    }else{
      kvVal->i = result;
      *isValid = true;
    }
wmmhello's avatar
wmmhello 已提交
700 701 702 703 704
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
705
static bool smlParseBigUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
706 707
  const char *pVal = kvVal->value;
  int32_t len = kvVal->valueLen;
wmmhello's avatar
wmmhello 已提交
708 709 710 711 712 713
  if (len <= 3) {
    return false;
  }
  if (pVal[0] == '-') {
    return false;
  }
wmmhello's avatar
wmmhello 已提交
714
  const char *signalPos = pVal + len - 3;
wmmhello's avatar
wmmhello 已提交
715
  if (strncasecmp(signalPos, "u64", 3) == 0) {
wmmhello's avatar
wmmhello 已提交
716
    char *endptr = NULL;
wmmhello's avatar
wmmhello 已提交
717
    errno = 0;
wmmhello's avatar
wmmhello 已提交
718 719 720
    uint64_t result = strtoull(pVal, &endptr, 10);
    if(endptr != signalPos){       // 78ri8
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
721
      smlBuildInvalidDataMsg(msg, "invalid unsigned big int", endptr);
wmmhello's avatar
wmmhello 已提交
722
    }else if(errno == ERANGE || !IS_VALID_UBIGINT(result)){
wmmhello's avatar
wmmhello 已提交
723
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
724
      smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", endptr);
wmmhello's avatar
wmmhello 已提交
725 726 727 728
    }else{
      kvVal->u = result;
      *isValid = true;
    }
wmmhello's avatar
wmmhello 已提交
729 730 731 732 733
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
734
static bool smlParseFloat(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
735 736 737
  const char *pVal = kvVal->value;
  int32_t len = kvVal->valueLen;
  char *endptr = NULL;
wmmhello's avatar
wmmhello 已提交
738
  errno = 0;
wmmhello's avatar
wmmhello 已提交
739
  float result = strtof(pVal, &endptr);
wmmhello's avatar
wmmhello 已提交
740
  if(endptr == pVal + len && errno != ERANGE && IS_VALID_FLOAT(result)){       // 78
wmmhello's avatar
wmmhello 已提交
741 742 743
    kvVal->f = result;
    *isValid = true;
    return true;
wmmhello's avatar
wmmhello 已提交
744
  }
wmmhello's avatar
wmmhello 已提交
745

wmmhello's avatar
wmmhello 已提交
746
  if (len > 3 && strncasecmp(pVal + len - 3, "f32", 3) == 0) {
wmmhello's avatar
wmmhello 已提交
747 748
    if(endptr != pVal + len - 3){       // 78ri8
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
749
      smlBuildInvalidDataMsg(msg, "invalid float", endptr);
wmmhello's avatar
wmmhello 已提交
750
    }else if(errno == ERANGE || !IS_VALID_FLOAT(result)){
wmmhello's avatar
wmmhello 已提交
751
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
752
      smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", endptr);
wmmhello's avatar
wmmhello 已提交
753 754 755 756
    }else{
      kvVal->f = result;
      *isValid = true;
    }
wmmhello's avatar
wmmhello 已提交
757 758 759 760 761
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
762
static bool smlParseDouble(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
763 764
  const char *pVal = kvVal->value;
  int32_t len = kvVal->valueLen;
wmmhello's avatar
wmmhello 已提交
765 766 767
  if (len <= 3) {
    return false;
  }
wmmhello's avatar
wmmhello 已提交
768
  const char *signalPos = pVal + len - 3;
wmmhello's avatar
wmmhello 已提交
769
  if (strncasecmp(signalPos, "f64", 3) == 0) {
wmmhello's avatar
wmmhello 已提交
770
    char *endptr = NULL;
wmmhello's avatar
wmmhello 已提交
771
    errno = 0;
wmmhello's avatar
wmmhello 已提交
772 773 774
    double result = strtod(pVal, &endptr);
    if(endptr != signalPos){       // 78ri8
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
775
      smlBuildInvalidDataMsg(msg, "invalid double", endptr);
wmmhello's avatar
wmmhello 已提交
776
    }else if(errno == ERANGE || !IS_VALID_DOUBLE(result)){
wmmhello's avatar
wmmhello 已提交
777
      *isValid = false;
wmmhello's avatar
wmmhello 已提交
778
      smlBuildInvalidDataMsg(msg, "double out of range[-1.7976931348623158e+308,1.7976931348623158e+308]", endptr);
wmmhello's avatar
wmmhello 已提交
779 780 781 782
    }else{
      kvVal->d = result;
      *isValid = true;
    }
wmmhello's avatar
wmmhello 已提交
783 784 785 786 787
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
788
static bool smlParseBool(SSmlKv *kvVal) {
wmmhello's avatar
wmmhello 已提交
789 790 791 792
  const char *pVal = kvVal->value;
  int32_t len = kvVal->valueLen;
  if ((len == 1) && pVal[len - 1] == 't') {
    kvVal->i = true;
wmmhello's avatar
wmmhello 已提交
793 794 795
    return true;
  }

wmmhello's avatar
wmmhello 已提交
796 797
  if ((len == 1) && pVal[len - 1] == 'f') {
    kvVal->i = false;
wmmhello's avatar
wmmhello 已提交
798 799 800
    return true;
  }

wmmhello's avatar
wmmhello 已提交
801
  if((len == 4) && !strncasecmp(pVal, "true", len)) {
wmmhello's avatar
wmmhello 已提交
802
    kvVal->i = true;
wmmhello's avatar
wmmhello 已提交
803 804
    return true;
  }
wmmhello's avatar
wmmhello 已提交
805
  if((len == 5) && !strncasecmp(pVal, "false", len)) {
wmmhello's avatar
wmmhello 已提交
806
    kvVal->i = false;
wmmhello's avatar
wmmhello 已提交
807 808 809 810 811
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
812
static bool smlIsBinary(const char *pVal, uint16_t len) {
wmmhello's avatar
wmmhello 已提交
813 814 815 816 817 818 819 820 821 822
  //binary: "abc"
  if (len < 2) {
    return false;
  }
  if (pVal[0] == '"' && pVal[len - 1] == '"') {
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
823
static bool smlIsNchar(const char *pVal, uint16_t len) {
wmmhello's avatar
wmmhello 已提交
824 825 826 827 828 829 830 831 832 833
  //nchar: L"abc"
  if (len < 3) {
    return false;
  }
  if ((pVal[0] == 'l' || pVal[0] == 'L')&& pVal[1] == '"' && pVal[len - 1] == '"') {
    return true;
  }
  return false;
}

wmmhello's avatar
wmmhello 已提交
834
static bool smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
wmmhello's avatar
wmmhello 已提交
835 836
  // put high probability matching type first
  bool isValid = false;
wmmhello's avatar
wmmhello 已提交
837

wmmhello's avatar
wmmhello 已提交
838
  //binary
wmmhello's avatar
wmmhello 已提交
839
  if (smlIsBinary(pVal->value, pVal->valueLen)) {
wmmhello's avatar
wmmhello 已提交
840 841
    pVal->type = TSDB_DATA_TYPE_BINARY;
    pVal->valueLen -= 2;
wmmhello's avatar
wmmhello 已提交
842 843
    pVal->length = pVal->valueLen;
    pVal->value++;
wmmhello's avatar
wmmhello 已提交
844
    return true;
wmmhello's avatar
wmmhello 已提交
845
  }
wmmhello's avatar
wmmhello 已提交
846
  //nchar
wmmhello's avatar
wmmhello 已提交
847
  if (smlIsNchar(pVal->value, pVal->valueLen)) {
wmmhello's avatar
wmmhello 已提交
848
    pVal->type = TSDB_DATA_TYPE_NCHAR;
wmmhello's avatar
wmmhello 已提交
849 850 851 852 853 854 855 856 857 858
    pVal->valueLen -= 3;
    pVal->length = pVal->valueLen;
    pVal->value += 2;
    return true;
  }
  //float
  if (smlParseFloat(pVal, &isValid, msg)) {
    if(!isValid) return false;
    pVal->type = TSDB_DATA_TYPE_FLOAT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
wmmhello's avatar
wmmhello 已提交
859 860
    return true;
  }
wmmhello's avatar
wmmhello 已提交
861
  //double
wmmhello's avatar
wmmhello 已提交
862
  if (smlParseDouble(pVal, &isValid, msg)) {
wmmhello's avatar
wmmhello 已提交
863 864 865
    if(!isValid) return false;
    pVal->type = TSDB_DATA_TYPE_DOUBLE;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
wmmhello's avatar
wmmhello 已提交
866

wmmhello's avatar
wmmhello 已提交
867 868 869
    return true;
  }
  //bool
wmmhello's avatar
wmmhello 已提交
870
  if (smlParseBool(pVal)) {
wmmhello's avatar
wmmhello 已提交
871
    pVal->type = TSDB_DATA_TYPE_BOOL;
wmmhello's avatar
wmmhello 已提交
872 873 874
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    return true;
  }
wmmhello's avatar
wmmhello 已提交
875

wmmhello's avatar
wmmhello 已提交
876
  if (smlParseTinyInt(pVal, &isValid, msg)) {
wmmhello's avatar
wmmhello 已提交
877
    if(!isValid) return false;
wmmhello's avatar
wmmhello 已提交
878 879 880 881
    pVal->type = TSDB_DATA_TYPE_TINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    return true;
  }
wmmhello's avatar
wmmhello 已提交
882
  if (smlParseTinyUint(pVal, &isValid, msg)) {
wmmhello's avatar
wmmhello 已提交
883
    if(!isValid) return false;
wmmhello's avatar
wmmhello 已提交
884 885 886 887
    pVal->type = TSDB_DATA_TYPE_UTINYINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    return true;
  }
wmmhello's avatar
wmmhello 已提交
888
  if (smlParseSmallInt(pVal, &isValid, msg)) {
wmmhello's avatar
wmmhello 已提交
889
    if(!isValid) return false;
wmmhello's avatar
wmmhello 已提交
890 891 892 893
    pVal->type = TSDB_DATA_TYPE_SMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    return true;
  }
wmmhello's avatar
wmmhello 已提交
894
  if (smlParseSmallUint(pVal, &isValid, msg)) {
wmmhello's avatar
wmmhello 已提交
895
    if(!isValid) return false;
wmmhello's avatar
wmmhello 已提交
896 897 898 899
    pVal->type = TSDB_DATA_TYPE_USMALLINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    return true;
  }
wmmhello's avatar
wmmhello 已提交
900
  if (smlParseInt(pVal, &isValid, msg)) {
wmmhello's avatar
wmmhello 已提交
901
    if(!isValid) return false;
wmmhello's avatar
wmmhello 已提交
902 903 904 905
    pVal->type = TSDB_DATA_TYPE_INT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    return true;
  }
wmmhello's avatar
wmmhello 已提交
906
  if (smlParseUint(pVal, &isValid, msg)) {
wmmhello's avatar
wmmhello 已提交
907
    if(!isValid) return false;
wmmhello's avatar
wmmhello 已提交
908 909 910 911
    pVal->type = TSDB_DATA_TYPE_UINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    return true;
  }
wmmhello's avatar
wmmhello 已提交
912
  if (smlParseBigInt(pVal, &isValid, msg)) {
wmmhello's avatar
wmmhello 已提交
913
    if(!isValid) return false;
wmmhello's avatar
wmmhello 已提交
914 915 916 917
    pVal->type = TSDB_DATA_TYPE_BIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    return true;
  }
wmmhello's avatar
wmmhello 已提交
918
  if (smlParseBigUint(pVal, &isValid, msg)) {
wmmhello's avatar
wmmhello 已提交
919
    if(!isValid) return false;
wmmhello's avatar
wmmhello 已提交
920 921 922 923 924
    pVal->type = TSDB_DATA_TYPE_UBIGINT;
    pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
    return true;
  }

wmmhello's avatar
wmmhello 已提交
925
  smlBuildInvalidDataMsg(msg, "invalid data", pVal->value);
wmmhello's avatar
wmmhello 已提交
926 927 928
  return false;
}

wmmhello's avatar
wmmhello 已提交
929
static bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlHandle* info) {
wmmhello's avatar
wmmhello 已提交
930 931 932 933 934
  char *val = NULL;
  val = taosHashGet(pHash, key, strlen(key));
  if (val) {
    uError("SML:0x%"PRIx64" Duplicate key detected:%s", info->id, key);
    return true;
wmmhello's avatar
wmmhello 已提交
935 936
  }

wmmhello's avatar
wmmhello 已提交
937 938
  uint8_t dummy_val = 0;
  taosHashPut(pHash, key, strlen(key), &dummy_val, sizeof(uint8_t));
wmmhello's avatar
wmmhello 已提交
939

wmmhello's avatar
wmmhello 已提交
940
  return false;
wmmhello's avatar
wmmhello 已提交
941 942
}

wmmhello's avatar
wmmhello 已提交
943
static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBuf *msg){
wmmhello's avatar
wmmhello 已提交
944 945 946 947 948 949 950 951
  if(!sql) return TSDB_CODE_SML_INVALID_DATA;
  while (*sql != '\0') {           // jump the space at the begining
    if(*sql != SPACE) {
      elements->measure = sql;
      break;
    }
    sql++;
  }
wmmhello's avatar
wmmhello 已提交
952 953 954 955
  if (!elements->measure || *sql == COMMA) {
    smlBuildInvalidDataMsg(msg, "invalid data", sql);
    return TSDB_CODE_SML_INVALID_DATA;
  }
wmmhello's avatar
wmmhello 已提交
956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977

  // parse measure and tag
  while (*sql != '\0') {
    if (elements->measureLen == 0 && *sql == COMMA && *(sql - 1) != SLASH) {  // find the first comma
      elements->measureLen = sql - elements->measure;
      sql++;
      elements->tags = sql;
      continue;
    }

    if (*sql == SPACE && *(sql - 1) != SLASH) {   // find the first space
      if (elements->measureLen == 0) {
        elements->measureLen = sql - elements->measure;
        elements->tags = sql;
      }
      elements->tagsLen = sql - elements->tags;
      elements->measureTagsLen = sql - elements->measure;
      break;
    }

    sql++;
  }
wmmhello's avatar
wmmhello 已提交
978 979 980 981 982 983 984
  if(elements->tagsLen == 0){     // measure, cols1=a         measure cols1=a
    elements->measureTagsLen = elements->measureLen;
  }
  if(elements->measureLen == 0) {
    smlBuildInvalidDataMsg(msg, "invalid measure", elements->measure);
    return TSDB_CODE_SML_INVALID_DATA;
  }
wmmhello's avatar
wmmhello 已提交
985 986 987 988 989 990 991 992 993

  // parse cols
  while (*sql != '\0') {
    if(*sql != SPACE) {
      elements->cols = sql;
      break;
    }
    sql++;
  }
wmmhello's avatar
wmmhello 已提交
994 995 996
  if(!elements->cols) {
    smlBuildInvalidDataMsg(msg, "invalid columns", elements->cols);
    return TSDB_CODE_SML_INVALID_DATA;
997
  }
wmmhello's avatar
wmmhello 已提交
998

wmmhello's avatar
wmmhello 已提交
999
  bool isInQuote = false;
wmmhello's avatar
wmmhello 已提交
1000
  while (*sql != '\0') {
wmmhello's avatar
wmmhello 已提交
1001 1002 1003 1004
    if(*sql == QUOTE && *(sql - 1) != SLASH){
      isInQuote = !isInQuote;
    }
    if(!isInQuote && *sql == SPACE && *(sql - 1) != SLASH) {
wmmhello's avatar
wmmhello 已提交
1005 1006 1007 1008
      break;
    }
    sql++;
  }
1009 1010 1011 1012
  if(isInQuote){
    smlBuildInvalidDataMsg(msg, "only one quote", elements->cols);
    return TSDB_CODE_SML_INVALID_DATA;
  }
wmmhello's avatar
wmmhello 已提交
1013
  elements->colsLen = sql - elements->cols;
wmmhello's avatar
wmmhello 已提交
1014

wmmhello's avatar
wmmhello 已提交
1015
  // parse ts,ts can be empty
wmmhello's avatar
wmmhello 已提交
1016
  while (*sql != '\0') {
1017
    if(*sql != SPACE && elements->timestamp == NULL) {
wmmhello's avatar
wmmhello 已提交
1018
      elements->timestamp = sql;
1019 1020
    }
    if(*sql == SPACE && elements->timestamp != NULL){
wmmhello's avatar
wmmhello 已提交
1021 1022 1023 1024
      break;
    }
    sql++;
  }
wmmhello's avatar
wmmhello 已提交
1025 1026 1027
  if(elements->timestamp){
    elements->timestampLen = sql - elements->timestamp;
  }
wmmhello's avatar
wmmhello 已提交
1028 1029 1030 1031

  return TSDB_CODE_SUCCESS;
}

1032
static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
wmmhello's avatar
wmmhello 已提交
1033 1034 1035
  if(isTag && len == 0){
    SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
    kv->key = TAG;
wmmhello's avatar
wmmhello 已提交
1036
    kv->keyLen = strlen(TAG);
wmmhello's avatar
wmmhello 已提交
1037
    kv->value = TAG;
wmmhello's avatar
wmmhello 已提交
1038
    kv->valueLen = strlen(TAG);
wmmhello's avatar
wmmhello 已提交
1039 1040
    kv->type = TSDB_DATA_TYPE_NCHAR;
    if(cols) taosArrayPush(cols, &kv);
wmmhello's avatar
wmmhello 已提交
1041
    return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1042 1043
  }

wmmhello's avatar
wmmhello 已提交
1044
  for(int i = 0; i < len; i++){
wmmhello's avatar
wmmhello 已提交
1045
    // parse key
wmmhello's avatar
wmmhello 已提交
1046 1047 1048 1049 1050 1051 1052 1053 1054
    const char *key = data + i;
    int32_t keyLen = 0;
    while(i < len){
      if(data[i] == EQUAL && i > 0 && data[i-1] != SLASH){
        keyLen = data + i - key;
        break;
      }
      i++;
    }
wmmhello's avatar
wmmhello 已提交
1055
    if(keyLen == 0 || keyLen >= TSDB_COL_NAME_LEN){
wmmhello's avatar
wmmhello 已提交
1056
      smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
wmmhello's avatar
wmmhello 已提交
1057 1058 1059
      return TSDB_CODE_SML_INVALID_DATA;
    }

1060 1061 1062 1063 1064 1065 1066
    if(taosHashGet(dumplicateKey, key, keyLen)){
      smlBuildInvalidDataMsg(msg, "dumplicate key", key);
      return TSDB_CODE_SML_INVALID_DATA;
    }else{
      taosHashPut(dumplicateKey, key, keyLen, key, CHAR_BYTES);
    }

wmmhello's avatar
wmmhello 已提交
1067
    // parse value
wmmhello's avatar
wmmhello 已提交
1068 1069
    i++;
    const char *value = data + i;
wmmhello's avatar
wmmhello 已提交
1070
    bool isInQuote = false;
wmmhello's avatar
wmmhello 已提交
1071
    while(i < len){
wmmhello's avatar
wmmhello 已提交
1072 1073 1074 1075
      if(data[i] == QUOTE && data[i-1] != SLASH){
        isInQuote = !isInQuote;
      }
      if(!isInQuote && data[i] == COMMA && i > 0 && data[i-1] != SLASH){
wmmhello's avatar
wmmhello 已提交
1076 1077 1078 1079
        break;
      }
      i++;
    }
wmmhello's avatar
wmmhello 已提交
1080 1081 1082 1083
    if(isInQuote){
      smlBuildInvalidDataMsg(msg, "only one quote", value);
      return TSDB_CODE_SML_INVALID_DATA;
    }
wmmhello's avatar
wmmhello 已提交
1084
    int32_t valueLen = data + i - value;
wmmhello's avatar
wmmhello 已提交
1085
    if(valueLen == 0){
wmmhello's avatar
wmmhello 已提交
1086
      smlBuildInvalidDataMsg(msg, "invalid value", value);
wmmhello's avatar
wmmhello 已提交
1087 1088
      return TSDB_CODE_SML_INVALID_DATA;
    }
wmmhello's avatar
wmmhello 已提交
1089 1090

    // add kv to SSmlKv
wmmhello's avatar
wmmhello 已提交
1091
    SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
wmmhello's avatar
wmmhello 已提交
1092 1093 1094 1095
    kv->key = key;
    kv->keyLen = keyLen;
    kv->value = value;
    kv->valueLen = valueLen;
wmmhello's avatar
wmmhello 已提交
1096 1097
    if(isTag){
      kv->type = TSDB_DATA_TYPE_NCHAR;
wmmhello's avatar
wmmhello 已提交
1098
    }else{
wmmhello's avatar
wmmhello 已提交
1099
      if(!smlParseValue(kv, msg)){
wmmhello's avatar
wmmhello 已提交
1100 1101
        return TSDB_CODE_SML_INVALID_DATA;
      }
wmmhello's avatar
wmmhello 已提交
1102
    }
wmmhello's avatar
wmmhello 已提交
1103

wmmhello's avatar
wmmhello 已提交
1104 1105
    if(cols) taosArrayPush(cols, &kv);
  }
wmmhello's avatar
wmmhello 已提交
1106

wmmhello's avatar
wmmhello 已提交
1107 1108 1109
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1110 1111 1112 1113 1114 1115
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
  char *endPtr = NULL;
  double ts = (double)strtoll(value, &endPtr, 10);
  if(value + len != endPtr){
    return -1;
  }
wmmhello's avatar
wmmhello 已提交
1116 1117 1118
  switch (type) {
    case TSDB_TIME_PRECISION_HOURS:
      ts *= (3600 * 1e9);
wmmhello's avatar
wmmhello 已提交
1119
      break;
wmmhello's avatar
wmmhello 已提交
1120 1121
    case TSDB_TIME_PRECISION_MINUTES:
      ts *= (60 * 1e9);
wmmhello's avatar
wmmhello 已提交
1122
      break;
wmmhello's avatar
wmmhello 已提交
1123 1124
    case TSDB_TIME_PRECISION_SECONDS:
      ts *= (1e9);
wmmhello's avatar
wmmhello 已提交
1125
      break;
wmmhello's avatar
wmmhello 已提交
1126
    case TSDB_TIME_PRECISION_MILLI:
wmmhello's avatar
wmmhello 已提交
1127
      ts *= (1e6);
wmmhello's avatar
wmmhello 已提交
1128
      break;
wmmhello's avatar
wmmhello 已提交
1129
    case TSDB_TIME_PRECISION_MICRO:
wmmhello's avatar
wmmhello 已提交
1130 1131
      ts *= (1e3);
      break;
wmmhello's avatar
wmmhello 已提交
1132 1133
    case TSDB_TIME_PRECISION_NANO:
      break;
wmmhello's avatar
wmmhello 已提交
1134 1135
    default:
      ASSERT(0);
wmmhello's avatar
wmmhello 已提交
1136 1137 1138 1139
  }
  if(ts > (double)INT64_MAX || ts < 0){
    return -1;
  }
wmmhello's avatar
wmmhello 已提交
1140 1141

  return (int64_t)ts;
wmmhello's avatar
wmmhello 已提交
1142 1143
}

wmmhello's avatar
wmmhello 已提交
1144
static int64_t smlGetTimeNow(int8_t precision) {
wmmhello's avatar
wmmhello 已提交
1145 1146 1147 1148 1149 1150 1151
  switch (precision) {
    case TSDB_TIME_PRECISION_HOURS:
      return taosGetTimestampMs()/1000/3600;
    case TSDB_TIME_PRECISION_MINUTES:
      return taosGetTimestampMs()/1000/60;
    case TSDB_TIME_PRECISION_SECONDS:
      return taosGetTimestampMs()/1000;
wmmhello's avatar
wmmhello 已提交
1152 1153 1154
    case TSDB_TIME_PRECISION_MILLI:
    case TSDB_TIME_PRECISION_MICRO:
    case TSDB_TIME_PRECISION_NANO:
wmmhello's avatar
wmmhello 已提交
1155
      return taosGetTimestamp(precision);
wmmhello's avatar
wmmhello 已提交
1156 1157
    default:
      ASSERT(0);
wmmhello's avatar
wmmhello 已提交
1158 1159 1160
  }
}

wmmhello's avatar
wmmhello 已提交
1161
static int8_t smlGetTsTypeByLen(int32_t len) {
wmmhello's avatar
wmmhello 已提交
1162 1163 1164 1165 1166
  if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
    return TSDB_TIME_PRECISION_SECONDS;
  } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
    return TSDB_TIME_PRECISION_MILLI_DIGITS;
  } else {
wmmhello's avatar
wmmhello 已提交
1167
    return -1;
wmmhello's avatar
wmmhello 已提交
1168 1169 1170
  }
}

wmmhello's avatar
wmmhello 已提交
1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227
static int8_t smlGetTsTypeByPrecision(int8_t precision) {
  switch (precision) {
    case TSDB_SML_TIMESTAMP_HOURS:
      return TSDB_TIME_PRECISION_HOURS;
    case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
      return TSDB_TIME_PRECISION_MILLI;
    case TSDB_SML_TIMESTAMP_NANO_SECONDS:
    case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
      return TSDB_TIME_PRECISION_NANO;
    case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
      return TSDB_TIME_PRECISION_MICRO;
    case TSDB_SML_TIMESTAMP_SECONDS:
      return TSDB_TIME_PRECISION_SECONDS;
    case TSDB_SML_TIMESTAMP_MINUTES:
      return TSDB_TIME_PRECISION_MINUTES;
    default:
      return -1;
  }
}

static int64_t smlParseInfluxTime(SSmlHandle* info, const char* data, int32_t len){
  int8_t tsType = smlGetTsTypeByPrecision(info->precision);
  if (tsType == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL);
    return -1;
  }
  if(!data){
    return smlGetTimeNow(tsType);
  }

  int64_t ts = smlGetTimeValue(data, len, tsType);
  if(ts == -1){
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
  }
  return ts;
}

static int64_t smlParseOpenTsdbTime(SSmlHandle* info, const char* data, int32_t len){
  if(!data){
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
    return -1;
  }
  int8_t tsType = smlGetTsTypeByLen(len);
  if (tsType == -1) {
    smlBuildInvalidDataMsg(&info->msgBuf, "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
    return -1;
  }
  int64_t ts = smlGetTimeValue(data, len, tsType);
  if(ts == -1){
    smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
    return -1;
  }
  return ts;
}

static int32_t smlParseTS(SSmlHandle* info, const char* data, int32_t len, SArray *cols){
wmmhello's avatar
wmmhello 已提交
1228
  int64_t ts = 0;
wmmhello's avatar
wmmhello 已提交
1229 1230
  if(info->protocol == TSDB_SML_LINE_PROTOCOL){
    ts = smlParseInfluxTime(info, data, len);
wmmhello's avatar
wmmhello 已提交
1231
  }else{
wmmhello's avatar
wmmhello 已提交
1232
    ts = smlParseOpenTsdbTime(info, data, len);
wmmhello's avatar
wmmhello 已提交
1233
  }
wmmhello's avatar
wmmhello 已提交
1234
  if(ts == -1)  return TSDB_CODE_TSC_INVALID_TIME_STAMP;
wmmhello's avatar
wmmhello 已提交
1235

wmmhello's avatar
wmmhello 已提交
1236
  // add ts to
wmmhello's avatar
wmmhello 已提交
1237
  SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
wmmhello's avatar
wmmhello 已提交
1238 1239 1240 1241 1242
  if(!kv){
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  kv->key = TS;
wmmhello's avatar
wmmhello 已提交
1243
  kv->keyLen = strlen(kv->key);
wmmhello's avatar
wmmhello 已提交
1244
  kv->i = ts;
wmmhello's avatar
wmmhello 已提交
1245
  kv->type = TSDB_DATA_TYPE_TIMESTAMP;
wmmhello's avatar
wmmhello 已提交
1246
  kv->length = (int16_t)tDataTypes[kv->type].bytes;
wmmhello's avatar
wmmhello 已提交
1247
  if(cols) taosArrayPush(cols, &kv);
wmmhello's avatar
wmmhello 已提交
1248 1249 1250
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1251
//static int32_t parseSmlCols(const char* data, SArray *cols){
wmmhello's avatar
wmmhello 已提交
1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293
//  while(*data != '\0'){
//    if(*data == EQUAL) return TSDB_CODE_SML_INVALID_DATA;
//    const char *key = data;
//    int32_t keyLen = 0;
//    while(*data != '\0'){
//      if(*data == EQUAL && *(data-1) != SLASH){
//        keyLen = data - key;
//        data ++;
//        break;
//      }
//      data++;
//    }
//    if(keyLen == 0){
//      return TSDB_CODE_SML_INVALID_DATA;
//    }
//
//    if(*data == COMMA) return TSDB_CODE_SML_INVALID_DATA;
//    const char *value = data;
//    int32_t valueLen = 0;
//    while(*data != '\0'){
//      if(*data == COMMA && *(data-1) != SLASH){
//        valueLen = data - value;
//        data ++;
//        break;
//      }
//      data++;
//    }
//    if(valueLen == 0){
//      return TSDB_CODE_SML_INVALID_DATA;
//    }
//
//    TAOS_SML_KV *kv = taosMemoryCalloc(sizeof(TAOS_SML_KV), 1);
//    kv->key = key;
//    kv->keyLen = keyLen;
//    kv->value = value;
//    kv->valueLen = valueLen;
//    kv->type = TSDB_DATA_TYPE_NCHAR;
//    if(cols) taosArrayPush(cols, &kv);
//  }
//  return TSDB_CODE_SUCCESS;
//}

wmmhello's avatar
wmmhello 已提交
1294
static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SSmlMsgBuf *msg){
wmmhello's avatar
wmmhello 已提交
1295 1296
  if(tags){
    for (int i = 0; i < taosArrayGetSize(tags); ++i) {
wmmhello's avatar
wmmhello 已提交
1297
      SSmlKv *kv = taosArrayGetP(tags, i);
wmmhello's avatar
wmmhello 已提交
1298 1299
      ASSERT(kv->type == TSDB_DATA_TYPE_NCHAR);

1300 1301 1302
      uint8_t *index = taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen);
      if(index){
        SSmlKv **value = taosArrayGet(tableMeta->tags, *index);
wmmhello's avatar
wmmhello 已提交
1303 1304 1305
        ASSERT((*value)->type == TSDB_DATA_TYPE_NCHAR);
        if(kv->valueLen > (*value)->valueLen){    // tags type is nchar
          *value = kv;
wmmhello's avatar
wmmhello 已提交
1306 1307
        }
      }else{
1308 1309 1310 1311 1312
        size_t tmp = taosArrayGetSize(tableMeta->tags);
        ASSERT(tmp <= UINT8_MAX);
        uint8_t size = tmp;
        taosArrayPush(tableMeta->tags, &kv);
        taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &size, CHAR_BYTES);
wmmhello's avatar
wmmhello 已提交
1313 1314 1315 1316 1317 1318
      }
    }
  }

  if(cols){
    for (int i = 1; i < taosArrayGetSize(cols); ++i) {  //jump timestamp
wmmhello's avatar
wmmhello 已提交
1319
      SSmlKv *kv = taosArrayGetP(cols, i);
1320 1321 1322 1323

      int16_t *index = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen);
      if(index){
        SSmlKv **value = taosArrayGet(tableMeta->cols, *index);
wmmhello's avatar
wmmhello 已提交
1324
        if(kv->type != (*value)->type){
wmmhello's avatar
wmmhello 已提交
1325
          smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
wmmhello's avatar
wmmhello 已提交
1326 1327 1328 1329 1330 1331 1332
          return false;
        }else{
          if(IS_VAR_DATA_TYPE(kv->type)){     // update string len, if bigger
            if(kv->valueLen > (*value)->valueLen){
              *value = kv;
            }
          }
wmmhello's avatar
wmmhello 已提交
1333 1334
        }
      }else{
1335 1336 1337 1338 1339
        size_t tmp = taosArrayGetSize(tableMeta->cols);
        ASSERT(tmp <= INT16_MAX);
        int16_t size = tmp;
        taosArrayPush(tableMeta->cols, &kv);
        taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
wmmhello's avatar
wmmhello 已提交
1340 1341 1342
      }
    }
  }
wmmhello's avatar
wmmhello 已提交
1343
  return true;
wmmhello's avatar
wmmhello 已提交
1344 1345
}

wmmhello's avatar
wmmhello 已提交
1346
static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
wmmhello's avatar
wmmhello 已提交
1347
  if(tags){
1348
    for (uint8_t i = 0; i < taosArrayGetSize(tags); ++i) {
wmmhello's avatar
wmmhello 已提交
1349
      SSmlKv *kv = taosArrayGetP(tags, i);
1350 1351
      taosArrayPush(tableMeta->tags, &kv);
      taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &i, CHAR_BYTES);
wmmhello's avatar
wmmhello 已提交
1352 1353 1354 1355
    }
  }

  if(cols){
1356
    for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
wmmhello's avatar
wmmhello 已提交
1357
      SSmlKv *kv = taosArrayGetP(cols, i);
1358 1359
      taosArrayPush(tableMeta->cols, &kv);
      taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
wmmhello's avatar
wmmhello 已提交
1360 1361 1362 1363
    }
  }
}

wmmhello's avatar
wmmhello 已提交
1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418
static SSmlTableInfo* smlBuildTableInfo(bool format){
  SSmlTableInfo *tag = taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
  if(!tag){
    return NULL;
  }

  if(format){
    tag->colsFormat = taosArrayInit(16, POINTER_BYTES);
    if (tag->colsFormat == NULL) {
      uError("SML:smlParseLine failed to allocate memory");
      goto cleanup;
    }
  }else{
    tag->cols = taosArrayInit(16, POINTER_BYTES);
    if (tag->cols == NULL) {
      uError("SML:smlParseLine failed to allocate memory");
      goto cleanup;
    }
  }

  tag->tags = taosArrayInit(16, POINTER_BYTES);
  if (tag->tags == NULL) {
    uError("SML:smlParseLine failed to allocate memory");
    goto cleanup;
  }
  return tag;

cleanup:
  taosMemoryFreeClear(tag);
  return NULL;
}

static void smlDestroyBuildTableInfo(SSmlTableInfo *tag, bool format){
  if(format){
    taosArrayDestroy(tag->colsFormat);
  }else{
    tag->cols = taosArrayInit(16, POINTER_BYTES);
    for(size_t i = 0; i < taosArrayGetSize(tag->cols); i++){
      SHashObj *kvHash = taosArrayGetP(tag->cols, i);
      void** p1 = taosHashIterate(kvHash, NULL);
      while (p1) {
        SSmlKv* kv = *p1;
        taosMemoryFreeClear(kv);
        p1 = taosHashIterate(kvHash, p1);
      }
      taosHashCleanup(kvHash);
    }
  }
  taosArrayDestroy(tag->tags);
  taosMemoryFreeClear(tag);
}

static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){
  if(dataFormat){
    taosArrayPush(oneTable->colsFormat, &cols);
1419 1420
    return TSDB_CODE_SUCCESS;
  }
wmmhello's avatar
wmmhello 已提交
1421

1422 1423 1424 1425 1426 1427 1428 1429
  SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if(!kvHash){
    uError("SML:smlDealCols failed to allocate memory");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
  for(size_t i = 0; i < taosArrayGetSize(cols); i++){
    SSmlKv *kv = taosArrayGetP(cols, i);
    taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);   // todo key need escape, like \=, because find by schema name later
wmmhello's avatar
wmmhello 已提交
1430
  }
1431 1432
  taosArrayPush(oneTable->cols, &kvHash);

1433
  return TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1434 1435 1436 1437 1438 1439 1440
}

static SSmlSTableMeta* smlBuildSTableMeta(){
  SSmlSTableMeta* meta = taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
  if(!meta){
    return NULL;
  }
1441
  meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
wmmhello's avatar
wmmhello 已提交
1442 1443 1444 1445 1446
  if (meta->tagHash == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

1447
  meta->fieldHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
wmmhello's avatar
wmmhello 已提交
1448 1449 1450 1451
  if (meta->fieldHash == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463

  meta->tags = taosArrayInit(32, POINTER_BYTES);
  if (meta->tags == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }

  meta->cols = taosArrayInit(32, POINTER_BYTES);
  if (meta->cols == NULL) {
    uError("SML:smlBuildSTableMeta failed to allocate memory");
    goto cleanup;
  }
wmmhello's avatar
wmmhello 已提交
1464 1465 1466 1467 1468 1469 1470 1471 1472 1473
  return meta;

cleanup:
  taosMemoryFreeClear(meta);
  return NULL;
}

static void smlDestroySTableMeta(SSmlSTableMeta *meta){
  taosHashCleanup(meta->tagHash);
  taosHashCleanup(meta->fieldHash);
1474 1475
  taosArrayDestroy(meta->tags);
  taosArrayDestroy(meta->cols);
wmmhello's avatar
wmmhello 已提交
1476 1477 1478 1479 1480 1481
  taosMemoryFree(meta->tableMeta);
}

static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
  SSmlLineInfo elements = {0};
  int ret = smlParseString(sql, &elements, &info->msgBuf);
wmmhello's avatar
wmmhello 已提交
1482
  if(ret != TSDB_CODE_SUCCESS){
wmmhello's avatar
wmmhello 已提交
1483
    uError("SML:0x%"PRIx64" smlParseString failed", info->id);
wmmhello's avatar
wmmhello 已提交
1484 1485 1486
    return ret;
  }

1487 1488 1489 1490 1491 1492 1493 1494 1495
  SArray *cols = NULL;
  if(info->dataFormat){   // if dataFormat, cols need new memory to save data
    cols = taosArrayInit(16, POINTER_BYTES);
    if (cols == NULL) {
      uError("SML:0x%"PRIx64" smlParseLine failed to allocate memory", info->id);
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
  }else{      // if dataFormat is false, cols do not need to save data, there is another new memory to save data
    cols = info->colsContainer;
wmmhello's avatar
wmmhello 已提交
1496
  }
wmmhello's avatar
wmmhello 已提交
1497

wmmhello's avatar
wmmhello 已提交
1498
  ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols);
wmmhello's avatar
wmmhello 已提交
1499
  if(ret != TSDB_CODE_SUCCESS){
wmmhello's avatar
wmmhello 已提交
1500
    uError("SML:0x%"PRIx64" smlParseTS failed", info->id);
wmmhello's avatar
wmmhello 已提交
1501 1502
    return ret;
  }
1503
  ret = smlParseCols(elements.cols, elements.colsLen, cols, false, info->dumplicateKey, &info->msgBuf);
wmmhello's avatar
wmmhello 已提交
1504
  if(ret != TSDB_CODE_SUCCESS){
wmmhello's avatar
wmmhello 已提交
1505
    uError("SML:0x%"PRIx64" smlParseCols parse cloums fields failed", info->id);
wmmhello's avatar
wmmhello 已提交
1506 1507
    return ret;
  }
wmmhello's avatar
wmmhello 已提交
1508
  if(taosArrayGetSize(cols) > TSDB_MAX_COLUMNS){
wmmhello's avatar
wmmhello 已提交
1509
    smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
wmmhello's avatar
wmmhello 已提交
1510 1511
    return TSDB_CODE_SML_INVALID_DATA;
  }
wmmhello's avatar
wmmhello 已提交
1512

wmmhello's avatar
wmmhello 已提交
1513
  SSmlTableInfo **oneTable = taosHashGet(info->childTables, elements.measure, elements.measureTagsLen);
wmmhello's avatar
wmmhello 已提交
1514 1515 1516
  if(oneTable){
    SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen);
    ASSERT(tableMeta);
wmmhello's avatar
wmmhello 已提交
1517
    ret = smlUpdateMeta(*tableMeta, NULL, cols, &info->msgBuf);    // update meta cols
wmmhello's avatar
wmmhello 已提交
1518
    if(!ret){
wmmhello's avatar
wmmhello 已提交
1519
      uError("SML:0x%"PRIx64" smlUpdateMeta cols failed", info->id);
wmmhello's avatar
wmmhello 已提交
1520 1521
      return TSDB_CODE_SML_INVALID_DATA;
    }
wmmhello's avatar
wmmhello 已提交
1522 1523 1524 1525
    ret = smlDealCols(*oneTable, info->dataFormat, cols);
    if(ret != TSDB_CODE_SUCCESS){
      return ret;
    }
wmmhello's avatar
wmmhello 已提交
1526
  }else{
1527 1528
    SSmlTableInfo *tinfo = smlBuildTableInfo(info->dataFormat);
    if(!tinfo){
wmmhello's avatar
wmmhello 已提交
1529 1530
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
1531
    ret = smlDealCols(tinfo, info->dataFormat, cols);
wmmhello's avatar
wmmhello 已提交
1532 1533
    if(ret != TSDB_CODE_SUCCESS){
      return ret;
wmmhello's avatar
wmmhello 已提交
1534 1535
    }

1536
    ret = smlParseCols(elements.tags, elements.tagsLen, tinfo->tags, true, info->dumplicateKey, &info->msgBuf);
wmmhello's avatar
wmmhello 已提交
1537
    if(ret != TSDB_CODE_SUCCESS){
wmmhello's avatar
wmmhello 已提交
1538
      uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id);
wmmhello's avatar
wmmhello 已提交
1539 1540 1541
      return ret;
    }

1542
    if(taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS){
wmmhello's avatar
wmmhello 已提交
1543
      smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
wmmhello's avatar
wmmhello 已提交
1544 1545 1546
      return TSDB_CODE_SML_INVALID_DATA;
    }

1547 1548 1549 1550
    tinfo->sTableName = elements.measure;
    tinfo->sTableNameLen = elements.measureLen;
    smlBuildChildTableName(tinfo);
    uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tinfo->childTableName);
wmmhello's avatar
wmmhello 已提交
1551

wmmhello's avatar
wmmhello 已提交
1552 1553
    SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen);
    if(tableMeta){  // update meta
1554
      ret = smlUpdateMeta(*tableMeta, tinfo->tags, cols, &info->msgBuf);
wmmhello's avatar
wmmhello 已提交
1555
      if(!ret){
wmmhello's avatar
wmmhello 已提交
1556
        uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id);
wmmhello's avatar
wmmhello 已提交
1557 1558
        return TSDB_CODE_SML_INVALID_DATA;
      }
wmmhello's avatar
wmmhello 已提交
1559
    }else{
wmmhello's avatar
wmmhello 已提交
1560
      SSmlSTableMeta *meta = smlBuildSTableMeta();
1561
      smlInsertMeta(meta, tinfo->tags, cols);
wmmhello's avatar
wmmhello 已提交
1562 1563 1564
      taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
    }

1565
    taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES);
wmmhello's avatar
wmmhello 已提交
1566
  }
1567 1568 1569 1570 1571

  if(!info->dataFormat){
    taosArrayClear(info->colsContainer);
  }
  taosHashClear(info->dumplicateKey);
wmmhello's avatar
wmmhello 已提交
1572 1573 1574
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1575
static void smlDestroyInfo(SSmlHandle* info){
wmmhello's avatar
wmmhello 已提交
1576 1577
  if(!info) return;
  qDestroyQuery(info->pQuery);
wmmhello's avatar
wmmhello 已提交
1578 1579 1580 1581 1582 1583 1584 1585 1586
  smlDestroyHandle(info->exec);

  // destroy info->childTables
  void** p1 = taosHashIterate(info->childTables, NULL);
  while (p1) {
    SSmlTableInfo* oneTable = *p1;
    smlDestroyBuildTableInfo(oneTable, info->dataFormat);
    p1 = taosHashIterate(info->childTables, p1);
  }
wmmhello's avatar
wmmhello 已提交
1587
  taosHashCleanup(info->childTables);
wmmhello's avatar
wmmhello 已提交
1588 1589 1590 1591 1592 1593 1594 1595

  // destroy info->superTables
  p1 = taosHashIterate(info->superTables, NULL);
  while (p1) {
    SSmlSTableMeta* oneTable = *p1;
    smlDestroySTableMeta(oneTable);
    p1 = taosHashIterate(info->superTables, p1);
  }
wmmhello's avatar
wmmhello 已提交
1596
  taosHashCleanup(info->superTables);
wmmhello's avatar
wmmhello 已提交
1597 1598

  // destroy info->pVgHash
wmmhello's avatar
wmmhello 已提交
1599
  taosHashCleanup(info->pVgHash);
1600
  taosHashCleanup(info->dumplicateKey);
wmmhello's avatar
wmmhello 已提交
1601 1602

  taosMemoryFreeClear(info);
wmmhello's avatar
wmmhello 已提交
1603
}
wmmhello's avatar
wmmhello 已提交
1604 1605

static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision, bool dataFormat){
1606
  int32_t code = TSDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
1607
  SSmlHandle* info = taosMemoryMalloc(sizeof(SSmlHandle));
wmmhello's avatar
wmmhello 已提交
1608 1609 1610
  if (NULL == info) {
    return NULL;
  }
wmmhello's avatar
wmmhello 已提交
1611
  info->id          = smlGenId();
wmmhello's avatar
wmmhello 已提交
1612

wmmhello's avatar
wmmhello 已提交
1613
  info->pQuery      = taosMemoryCalloc(1, sizeof(SQuery));
wmmhello's avatar
wmmhello 已提交
1614
  if (NULL == info->pQuery) {
wmmhello's avatar
wmmhello 已提交
1615
    uError("SML:0x%"PRIx64" create info->pQuery error", info->id);
wmmhello's avatar
wmmhello 已提交
1616 1617
    goto cleanup;
  }
wmmhello's avatar
wmmhello 已提交
1618
  info->pQuery->execMode      = QUERY_EXEC_MODE_SCHEDULE;
wmmhello's avatar
wmmhello 已提交
1619
  info->pQuery->haveResultSet = false;
wmmhello's avatar
wmmhello 已提交
1620 1621 1622 1623 1624 1625
  info->pQuery->msgType       = TDMT_VND_SUBMIT;
  info->pQuery->pRoot         = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
  if(NULL == info->pQuery->pRoot){
    uError("SML:0x%"PRIx64" create info->pQuery->pRoot error", info->id);
    goto cleanup;
  }
wmmhello's avatar
wmmhello 已提交
1626 1627
  ((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;

wmmhello's avatar
wmmhello 已提交
1628
  info->taos        = taos;
1629
  code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
wmmhello's avatar
wmmhello 已提交
1630 1631
  if(code != TSDB_CODE_SUCCESS){
    uError("SML:0x%"PRIx64" get catalog error %d", info->id, code);
wmmhello's avatar
wmmhello 已提交
1632 1633 1634
    goto cleanup;
  }

wmmhello's avatar
wmmhello 已提交
1635 1636 1637 1638 1639 1640 1641 1642
  info->precision   = precision;
  info->protocol    = protocol;
  info->dataFormat  = dataFormat;
  info->pRequest    = request;
  info->msgBuf.buf  = info->pRequest->msgBuf;
  info->msgBuf.len  = ERROR_MSG_BUF_DEFAULT_SIZE;

  info->exec        = smlInitHandle(info->pQuery);
1643 1644 1645
  info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  info->pVgHash     = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
wmmhello's avatar
wmmhello 已提交
1646

1647 1648 1649 1650 1651 1652 1653 1654
  info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if(!dataFormat){
    info->colsContainer = taosArrayInit(32, POINTER_BYTES);
    if(NULL == info->colsContainer){
      uError("SML:0x%"PRIx64" create info failed", info->id);
      goto cleanup;
    }
  }
wmmhello's avatar
wmmhello 已提交
1655
  if(NULL == info->exec || NULL == info->childTables
1656 1657
      || NULL == info->superTables || NULL == info->pVgHash
      || NULL == info->dumplicateKey){
wmmhello's avatar
wmmhello 已提交
1658 1659 1660
    uError("SML:0x%"PRIx64" create info failed", info->id);
    goto cleanup;
  }
wmmhello's avatar
wmmhello 已提交
1661

wmmhello's avatar
wmmhello 已提交
1662
  return info;
wmmhello's avatar
wmmhello 已提交
1663 1664 1665 1666
cleanup:
  smlDestroyInfo(info);
  return NULL;
}
wmmhello's avatar
wmmhello 已提交
1667
static int32_t smlInsertData(SSmlHandle* info) {
wmmhello's avatar
wmmhello 已提交
1668 1669
  int32_t code = TSDB_CODE_SUCCESS;

wmmhello's avatar
wmmhello 已提交
1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688
  SSmlTableInfo** oneTable = taosHashIterate(info->childTables, NULL);
  while (oneTable) {
    SSmlTableInfo* tableData = *oneTable;

    SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
    strcpy(pName.dbname, info->pRequest->pDb);
    memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
    SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
    SVgroupInfo vg;
    code = catalogGetTableHashVgroup(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &vg);
    if (code != 0) {
      uError("SML:0x%"PRIx64" catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
      return code;
    }
    taosHashPut(info->pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));

    SSmlSTableMeta** pMeta = taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
    ASSERT (NULL != pMeta && NULL != *pMeta);

1689
    // use tablemeta of stable to save vgid and uid of child table
wmmhello's avatar
wmmhello 已提交
1690 1691 1692
    (*pMeta)->tableMeta->vgId = vg.vgId;
    (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid

1693
    code = smlBindData(info->exec, tableData->tags, tableData->colsFormat, (*pMeta)->cols,
1694
                       tableData->cols, info->dataFormat, (*pMeta)->tableMeta, tableData->childTableName, info->msgBuf.buf, info->msgBuf.len);
wmmhello's avatar
wmmhello 已提交
1695 1696 1697 1698
    if(code != TSDB_CODE_SUCCESS){
      return code;
    }
    oneTable = taosHashIterate(info->childTables, oneTable);
wmmhello's avatar
wmmhello 已提交
1699
  }
wmmhello's avatar
wmmhello 已提交
1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710

  smlBuildOutput(info->exec, info->pVgHash);
  launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true);

  info->affectedRows = taos_affected_rows(info->pRequest);
  return info->pRequest->code;
}

static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) {
  int32_t code = TSDB_CODE_SUCCESS;

wmmhello's avatar
wmmhello 已提交
1711
  if (numLines <= 0 || numLines > 65536) {
wmmhello's avatar
wmmhello 已提交
1712
    uError("SML:0x%"PRIx64" smlInsertLines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
wmmhello's avatar
wmmhello 已提交
1713
    code = TSDB_CODE_TSC_APP_ERROR;
wmmhello's avatar
wmmhello 已提交
1714 1715
    goto cleanup;
  }
wmmhello's avatar
wmmhello 已提交
1716

wmmhello's avatar
wmmhello 已提交
1717
  for (int32_t i = 0; i < numLines; ++i) {
wmmhello's avatar
wmmhello 已提交
1718
    code = smlParseLine(info, lines[i]);
wmmhello's avatar
wmmhello 已提交
1719
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
1720
      uError("SML:0x%"PRIx64" smlParseLine failed. line %d : %s", info->id, i, lines[i]);
wmmhello's avatar
wmmhello 已提交
1721 1722 1723
      goto cleanup;
    }
  }
wmmhello's avatar
wmmhello 已提交
1724 1725 1726 1727 1728 1729 1730 1731
  uDebug("SML:0x%"PRIx64" smlInsertLines parse success. tables %d", info->id, taosHashGetSize(info->childTables));
  uDebug("SML:0x%"PRIx64" smlInsertLines parse success. super tables %d", info->id, taosHashGetSize(info->superTables));

  code = smlModifyDBSchemas(info);
  if (code != 0) {
    uError("SML:0x%"PRIx64" smlModifyDBSchemas error : %s", info->id, tstrerror(code));
    goto cleanup;
  }
wmmhello's avatar
wmmhello 已提交
1732

wmmhello's avatar
wmmhello 已提交
1733 1734 1735
  code = smlInsertData(info);
  if (code != 0) {
    uError("SML:0x%"PRIx64" smlInsertData error : %s", info->id, tstrerror(code));
wmmhello's avatar
wmmhello 已提交
1736 1737 1738
    goto cleanup;
  }

wmmhello's avatar
wmmhello 已提交
1739
  uDebug("SML:0x%"PRIx64" smlInsertLines finish inserting %d lines.", info->id, numLines);
wmmhello's avatar
wmmhello 已提交
1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765

cleanup:
  return code;
}

/**
 * taos_schemaless_insert() parse and insert data points into database according to
 * different protocol.
 *
 * @param $lines input array may contain multiple lines, each line indicates a data point.
 *               If protocol=2 is used input array should contain single JSON
 *               string(e.g. char *lines[] = {"$JSON_string"}). If need to insert
 *               multiple data points in JSON format, should include them in $JSON_string
 *               as a JSON array.
 * @param $numLines indicates how many data points in $lines.
 *                  If protocol = 2 is used this param will be ignored as $lines should
 *                  contain single JSON string.
 * @param $protocol indicates which protocol to use for parsing:
 *                  0 - influxDB line protocol
 *                  1 - OpenTSDB telnet line protocol
 *                  2 - OpenTSDB JSON format protocol
 * @return return zero for successful insertion. Otherwise return none-zero error code of
 *         failure reason.
 *
 */

wmmhello's avatar
wmmhello 已提交
1766
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
wmmhello's avatar
wmmhello 已提交
1767
  SRequestObj* request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT);
wmmhello's avatar
wmmhello 已提交
1768
  if(!request){
1769
    return NULL;
wmmhello's avatar
wmmhello 已提交
1770 1771
  }

1772
  SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, true);
wmmhello's avatar
wmmhello 已提交
1773
  if(!info){
1774
    return (TAOS_RES*)request;
wmmhello's avatar
wmmhello 已提交
1775 1776
  }

wmmhello's avatar
wmmhello 已提交
1777
  switch (protocol) {
wmmhello's avatar
wmmhello 已提交
1778
    case TSDB_SML_LINE_PROTOCOL:{
wmmhello's avatar
wmmhello 已提交
1779
      smlInsertLines(info, lines, numLines);
wmmhello's avatar
wmmhello 已提交
1780
      break;
wmmhello's avatar
wmmhello 已提交
1781
    }
wmmhello's avatar
wmmhello 已提交
1782 1783 1784 1785 1786 1787 1788 1789 1790
    case TSDB_SML_TELNET_PROTOCOL:
      //code = taos_insert_telnet_lines(taos, lines, numLines, protocol, tsType, &affected_rows);
      break;
    case TSDB_SML_JSON_PROTOCOL:
      //code = taos_insert_json_payload(taos, *lines, protocol, tsType, &affected_rows);
      break;
    default:
      break;
  }
1791
  smlDestroyInfo(info);
wmmhello's avatar
wmmhello 已提交
1792

wmmhello's avatar
wmmhello 已提交
1793 1794
end:
  return (TAOS_RES*)request;
wmmhello's avatar
wmmhello 已提交
1795
}
1796