提交 ec97d053 编写于 作者: wmmhello's avatar wmmhello

refactor:add schemaless function

上级 a4629e56
......@@ -199,7 +199,7 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
#endif
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision, bool dataFormat);
/* --------------------------TMQ INTERFACE------------------------------- */
......
......@@ -96,9 +96,9 @@ int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, char *tN
void destroyBoundColumnInfo(void* pBoundInfo);
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen);
void* tscSmlInitHandle(SQuery *pQuery);
void tscSmlDestroyHandle(void *pHandle);
int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta, char *msgBuf, int16_t msgBufLen);
void* smlInitHandle(SQuery *pQuery);
void smlDestroyHandle(void *pHandle);
int32_t smlBindData(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta, char *msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
#ifdef __cplusplus
......
/*
* Copyright (c) 2021 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_CLIENTSML_H
#define TDENGINE_CLIENTSML_H
#ifdef __cplusplus
extern "C" {
#endif
#include "thash.h"
#include "clientInt.h"
#include "catalog.h"
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
typedef struct {
const char* measure;
const char* tags;
const char* cols;
const char* timestamp;
int32_t measureLen;
int32_t measureTagsLen;
int32_t tagsLen;
int32_t colsLen;
int32_t timestampLen;
} TAOS_PARSE_ELEMENTS;
typedef struct {
const char *sTableName; // super table name
uint8_t sTableNameLen;
char childTableName[TSDB_TABLE_NAME_LEN];
uint64_t uid;
SArray *tags;
SArray *cols; // elements are SHashObj<key, SSmlKv*> for find by key quickly
SArray colsColumn; // elements are cols key string
} TAOS_SML_DATA_POINT_TAGS;
typedef struct SSmlSTableMeta {
// char *sTableName; // super table name
// uint8_t sTableNameLen;
uint8_t precision; // the number of precision
SHashObj *tagHash;
SHashObj *fieldHash;
} SSmlSTableMeta;
typedef struct SMsgBuf {
int32_t len;
char *buf;
} SMsgBuf;
typedef struct {
uint64_t id;
SMLProtocolType protocol;
int32_t tsType;
SHashObj *childTables;
SHashObj *superTables;
SHashObj *metaHashObj;
SHashObj *pVgHash;
void *exec;
STscObj *taos;
SCatalog *pCatalog;
SRequestObj *pRequest;
SQuery *pQuery;
int32_t affectedRows;
SMsgBuf msgBuf;
} SSmlLinesInfo;
int smlInsert(TAOS* taos, SSmlLinesInfo* info);
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info);
bool isValidInteger(char *str);
bool isValidFloat(char *str);
int32_t isValidChildTableName(const char *pTbName, int16_t len, SSmlLinesInfo* info);
bool convertSmlValueType(SSmlKv *pVal, char *value,
uint16_t len, SSmlLinesInfo* info, bool isTag);
int32_t convertSmlTimeStamp(SSmlKv *pVal, char *value,
uint16_t len, SSmlLinesInfo* info);
int sml_insert_lines(TAOS* taos, SRequestObj* request, char* lines[], int numLines, SMLProtocolType protocol,
SMLTimeStampType tsType);
int sml_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol,
SMLTimeStampType tsType, int* affectedRows);
int sml_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol,
SMLTimeStampType tsType, int* affectedRows);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_CLIENTSML_H
#include <clientInt.h>
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "clientSml.h"
#include "tdef.h"
#include "ttypes.h"
#include "tmsg.h"
#include "tlog.h"
#include "query.h"
#include "taoserror.h"
#include "taos.h"
#include "ttime.h"
#include "taoserror.h"
#include "tdef.h"
#include "tlog.h"
#include "tmsg.h"
#include "tstrbuild.h"
typedef struct {
char sTableName[TSDB_TABLE_NAME_LEN];
SHashObj* tagHash;
SHashObj* fieldHash;
SArray* tags; //SArray<SSchema>
SArray* fields; //SArray<SSchema>
uint8_t precision;
} SSmlSTableSchema;
#include "ttime.h"
#include "ttypes.h"
#include "tcommon.h"
#include "catalog.h"
//=================================================================================================
#define SPACE ' '
#define COMMA ','
......@@ -32,16 +24,105 @@ typedef struct {
#define SLASH '\\'
#define tsMaxSQLStringLen (1024*1024)
#define TSNAMELEN 2
#define TAGNAMELEN 3
//=================================================================================================
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
typedef enum {
SCHEMA_ACTION_CREATE_STABLE,
SCHEMA_ACTION_ADD_COLUMN,
SCHEMA_ACTION_ADD_TAG,
SCHEMA_ACTION_CHANGE_COLUMN_SIZE,
SCHEMA_ACTION_CHANGE_TAG_SIZE,
} ESchemaAction;
typedef struct {
char sTableName[TSDB_TABLE_NAME_LEN];
SHashObj *tags;
SHashObj *fields;
} SCreateSTableActionInfo;
typedef struct {
char sTableName[TSDB_TABLE_NAME_LEN];
SSmlKv * field;
} SAlterSTableActionInfo;
typedef struct {
ESchemaAction action;
union {
SCreateSTableActionInfo createSTable;
SAlterSTableActionInfo alterSTable;
};
} SSchemaAction;
typedef struct {
const char* measure;
const char* tags;
const char* cols;
const char* timestamp;
int32_t measureLen;
int32_t measureTagsLen;
int32_t tagsLen;
int32_t colsLen;
int32_t timestampLen;
} SSmlLineInfo;
typedef struct {
const char *sTableName; // super table name
uint8_t sTableNameLen;
char childTableName[TSDB_TABLE_NAME_LEN];
uint64_t uid;
SArray *tags;
// colsFormat store cols formated, for quick parse, if info->formatData is true
SArray *colsFormat; // elements are SArray<SSmlKv*>
// cols & colsColumn store cols un formated
SArray *cols; // elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
SHashObj *columnsHash; // elements are <cols key string, 1>, just for judge if key exists quickly.
} SSmlTableInfo;
typedef struct {
SHashObj *tagHash;
SHashObj *fieldHash;
STableMeta *tableMeta;
} SSmlSTableMeta;
typedef struct {
int32_t len;
char *buf;
} SSmlMsgBuf;
typedef struct {
uint64_t id;
SMLProtocolType protocol;
int8_t precision;
bool dataFormat; // true means that the name, number and order of keys in each line are the same
SHashObj *childTables;
SHashObj *superTables;
SHashObj *pVgHash;
void *exec;
STscObj *taos;
SCatalog *pCatalog;
SRequestObj *pRequest;
SQuery *pQuery;
int32_t affectedRows;
SSmlMsgBuf msgBuf;
} SSmlHandle;
//=================================================================================================
static uint64_t linesSmlHandleId = 0;
static const char* TS = "ts";
static const char* TAG = "tag";
static const char* TAG = "tagNone";
//=================================================================================================
uint64_t genLinesSmlId() {
static uint64_t smlGenId() {
uint64_t id;
do {
......@@ -51,13 +132,13 @@ uint64_t genLinesSmlId() {
return id;
}
static int32_t buildInvalidDataMsg(SMsgBuf* pBuf, const char *msg1, const char *msg2) {
static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const char *msg2) {
if(msg1) snprintf(pBuf->buf, pBuf->len, "%s:", msg1);
if(msg2) strncpy(pBuf->buf, msg2, pBuf->len);
return TSDB_CODE_SML_INVALID_DATA;
}
int compareSmlColKv(const void* p1, const void* p2) {
static int smlCompareKv(const void* p1, const void* p2) {
SSmlKv* kv1 = (SSmlKv *)p1;
SSmlKv* kv2 = (SSmlKv*)p2;
int kvLen1 = (int)strlen(kv1->key);
......@@ -70,37 +151,10 @@ int compareSmlColKv(const void* p1, const void* p2) {
}
}
typedef enum {
SCHEMA_ACTION_CREATE_STABLE,
SCHEMA_ACTION_ADD_COLUMN,
SCHEMA_ACTION_ADD_TAG,
SCHEMA_ACTION_CHANGE_COLUMN_SIZE,
SCHEMA_ACTION_CHANGE_TAG_SIZE,
} ESchemaAction;
typedef struct {
char sTableName[TSDB_TABLE_NAME_LEN];
SHashObj *tags;
SHashObj *fields;
} SCreateSTableActionInfo;
typedef struct {
char sTableName[TSDB_TABLE_NAME_LEN];
SSmlKv * field;
} SAlterSTableActionInfo;
typedef struct {
ESchemaAction action;
union {
SCreateSTableActionInfo createSTable;
SAlterSTableActionInfo alterSTable;
};
} SSchemaAction;
static int32_t buildSmlChildTableName(TAOS_SML_DATA_POINT_TAGS *tags) {
static void smlBuildChildTableName(SSmlTableInfo *tags) {
int32_t size = taosArrayGetSize(tags->tags);
ASSERT(size > 0);
qsort(tags->tags, size, POINTER_BYTES, compareSmlColKv);
qsort(tags->tags, size, POINTER_BYTES, smlCompareKv);
SStringBuilder sb = {0};
taosStringBuilderAppendStringLen(&sb, tags->sTableName, tags->sTableNameLen);
......@@ -120,12 +174,10 @@ static int32_t buildSmlChildTableName(TAOS_SML_DATA_POINT_TAGS *tags) {
snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
taosStringBuilderDestroy(&sb);
tags->uid = digest1;
uDebug("SML: child table name: %s", tags->childTableName);
return 0;
}
static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[],
SSchemaAction* action, bool* actionNeeded, SSmlLinesInfo* info) {
static int32_t smlGenerateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[],
SSchemaAction* action, bool* actionNeeded, SSmlHandle* info) {
// char fieldName[TSDB_COL_NAME_LEN] = {0};
// strcpy(fieldName, pointColField->name);
//
......@@ -168,7 +220,7 @@ static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash
return 0;
}
static int32_t buildColumnDescription(SSmlKv* field, char* buf, int32_t bufSize, int32_t* outBytes) {
static int32_t smlBuildColumnDescription(SSmlKv* field, char* buf, int32_t bufSize, int32_t* outBytes) {
uint8_t type = field->type;
char tname[TSDB_TABLE_NAME_LEN] = {0};
memcpy(tname, field->key, field->keyLen);
......@@ -185,7 +237,7 @@ static int32_t buildColumnDescription(SSmlKv* field, char* buf, int32_t bufSize,
return 0;
}
static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInfo* info) {
static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
int32_t code = 0;
int32_t outBytes = 0;
char *result = (char *)taosMemoryCalloc(1, tsMaxSQLStringLen+1);
......@@ -195,8 +247,8 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
switch (action->action) {
case SCHEMA_ACTION_ADD_COLUMN: {
int n = sprintf(result, "alter stable %s add column ", action->alterSTable.sTableName);
buildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes);
TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
smlBuildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes);
TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
code = taos_errno(res);
const char* errStr = taos_errstr(res);
char* begin = strstr(errStr, "duplicated column names");
......@@ -208,7 +260,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
// if (code == TSDB_CODE_MND_FIELD_ALREADY_EXIST || code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) {
if (code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
......@@ -220,9 +272,9 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
}
case SCHEMA_ACTION_ADD_TAG: {
int n = sprintf(result, "alter stable %s add tag ", action->alterSTable.sTableName);
buildColumnDescription(action->alterSTable.field,
smlBuildColumnDescription(action->alterSTable.field,
result+n, capacity-n, &outBytes);
TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
code = taos_errno(res);
const char* errStr = taos_errstr(res);
char* begin = strstr(errStr, "duplicated column names");
......@@ -234,7 +286,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
// if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) {
if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
......@@ -246,9 +298,9 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
}
case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
int n = sprintf(result, "alter stable %s modify column ", action->alterSTable.sTableName);
buildColumnDescription(action->alterSTable.field, result+n,
smlBuildColumnDescription(action->alterSTable.field, result+n,
capacity-n, &outBytes);
TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
......@@ -257,7 +309,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
// if (code == TSDB_CODE_MND_INVALID_COLUMN_LENGTH || code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
if (code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
......@@ -269,9 +321,9 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
}
case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
int n = sprintf(result, "alter stable %s modify tag ", action->alterSTable.sTableName);
buildColumnDescription(action->alterSTable.field, result+n,
smlBuildColumnDescription(action->alterSTable.field, result+n,
capacity-n, &outBytes);
TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery
TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery
code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
......@@ -280,7 +332,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
// if (code == TSDB_CODE_MND_INVALID_TAG_LENGTH || code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
if (code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
......@@ -296,7 +348,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
SSmlKv **kv = taosHashIterate(action->createSTable.fields, NULL);
while(kv){
buildColumnDescription(*kv, pos, freeBytes, &outBytes);
smlBuildColumnDescription(*kv, pos, freeBytes, &outBytes);
pos += outBytes; freeBytes -= outBytes;
*pos = ','; ++pos; --freeBytes;
kv = taosHashIterate(action->createSTable.fields, kv);
......@@ -308,14 +360,14 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
kv = taosHashIterate(action->createSTable.tags, NULL);
while(kv){
buildColumnDescription(*kv, pos, freeBytes, &outBytes);
smlBuildColumnDescription(*kv, pos, freeBytes, &outBytes);
pos += outBytes; freeBytes -= outBytes;
*pos = ','; ++pos; --freeBytes;
kv = taosHashIterate(action->createSTable.tags, kv);
}
pos--; ++freeBytes;
outBytes = snprintf(pos, freeBytes, ")");
TAOS_RES* res = taos_query(taos, result);
TAOS_RES* res = taos_query(info->taos, result);
code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
......@@ -323,7 +375,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
taos_free_result(res);
if (code == TSDB_CODE_MND_STB_ALREADY_EXIST) {
TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE");
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
......@@ -338,14 +390,14 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
break;
}
taosMemoryFree(result);
taosMemoryFreeClear(result);
if (code != 0) {
uError("SML:0x%"PRIx64 " apply schema action failure. %s", info->id, tstrerror(code));
}
return code;
}
static int32_t modifyDBSchemas(TAOS* taos, SSmlLinesInfo* info) {
static int32_t smlModifyDBSchemas(SSmlHandle* info) {
int32_t code = 0;
SSmlSTableMeta** tableMetaSml = taosHashIterate(info->superTables, NULL);
......@@ -356,7 +408,7 @@ static int32_t modifyDBSchemas(TAOS* taos, SSmlLinesInfo* info) {
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
size_t superTableLen = 0;
void *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
void *superTable = taosHashGetKey(tableMetaSml, &superTableLen); // todo escape
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
strcpy(pName.dbname, info->pRequest->pDb);
memcpy(pName.tname, superTable, superTableLen);
......@@ -369,10 +421,15 @@ static int32_t modifyDBSchemas(TAOS* taos, SSmlLinesInfo* info) {
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
schemaAction.createSTable.tags = cTablePoints->tagHash;
schemaAction.createSTable.fields = cTablePoints->fieldHash;
applySchemaAction(taos, &schemaAction, info);
code = smlApplySchemaAction(info, &schemaAction);
if (code != 0) {
uError("SML:0x%"PRIx64" smlApplySchemaAction failed. can not create %s", info->id, schemaAction.createSTable.sTableName);
return code;
}
code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta);
if (code != 0) {
uError("SML:0x%"PRIx64" reconcile point schema failed. can not create %s", info->id, schemaAction.createSTable.sTableName);
uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, schemaAction.createSTable.sTableName);
return code;
}
}else if (code == TSDB_CODE_SUCCESS) {
......@@ -380,67 +437,13 @@ static int32_t modifyDBSchemas(TAOS* taos, SSmlLinesInfo* info) {
uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
return code;
}
taosHashPut(info->metaHashObj, superTable, superTableLen, &pTableMeta, POINTER_BYTES);
cTablePoints->tableMeta = pTableMeta;
tableMetaSml = taosHashIterate(info->superTables, tableMetaSml);
}
return 0;
}
static int32_t applyDataPoints(SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
TAOS_SML_DATA_POINT_TAGS** oneTable = taosHashIterate(info->childTables, NULL);
while (oneTable) {
TAOS_SML_DATA_POINT_TAGS* tableData = *oneTable;
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
strcpy(pName.dbname, info->pRequest->pDb);
memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
SVgroupInfo vg;
catalogGetTableHashVgroup(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &vg);
taosHashPut(info->pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
STableMeta** pMeta = taosHashGet(info->metaHashObj, tableData->sTableName, tableData->sTableNameLen);
ASSERT (NULL != pMeta && NULL != *pMeta);
(*pMeta)->vgId = vg.vgId;
(*pMeta)->uid = tableData->uid; // one table merge data block together according uid
code = smlBind(info->exec, tableData->tags, tableData->cols, *pMeta, info->msgBuf.buf, info->msgBuf.len);
if(code != TSDB_CODE_SUCCESS){
return code;
}
oneTable = taosHashIterate(info->childTables, oneTable);
}
smlBuildOutput(info->exec, info->pVgHash);
launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true);
info->affectedRows = taos_affected_rows(info->pRequest);
return info->pRequest->code;
}
int smlInsert(TAOS* taos, SSmlLinesInfo* info) {
uDebug("SML:0x%"PRIx64" taos_sml_insert. number of super tables: %d", info->id, taosHashGetSize(info->superTables));
uDebug("SML:0x%"PRIx64" modify db schemas", info->id);
int32_t code = modifyDBSchemas(taos, info);
if (code != 0) {
uError("SML:0x%"PRIx64" error change db schema : %s", info->id, tstrerror(code));
return code;
}
uDebug("SML:0x%"PRIx64" apply data points", info->id);
code = applyDataPoints(info);
if (code != 0) {
uError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code));
return code;
}
return TSDB_CODE_SUCCESS;
}
//=========================================================================
/* Field Escape charaters
......@@ -448,50 +451,50 @@ int smlInsert(TAOS* taos, SSmlLinesInfo* info) {
2: tag_key, tag_value, field_key Comma,Equal Sign,Space
3: field_value Double quote,Backslash
*/
static void escapeSpecialCharacter(uint8_t field, const char **pos) {
const char *cur = *pos;
if (*cur != '\\') {
return;
}
switch (field) {
case 1:
switch (*(cur + 1)) {
case ',':
case ' ':
cur++;
break;
default:
break;
}
break;
case 2:
switch (*(cur + 1)) {
case ',':
case ' ':
case '=':
cur++;
break;
default:
break;
}
break;
case 3:
switch (*(cur + 1)) {
case '"':
case '\\':
cur++;
break;
default:
break;
}
break;
default:
break;
}
*pos = cur;
}
//static void escapeSpecialCharacter(uint8_t field, const char **pos) {
// const char *cur = *pos;
// if (*cur != '\\') {
// return;
// }
// switch (field) {
// case 1:
// switch (*(cur + 1)) {
// case ',':
// case ' ':
// cur++;
// break;
// default:
// break;
// }
// break;
// case 2:
// switch (*(cur + 1)) {
// case ',':
// case ' ':
// case '=':
// cur++;
// break;
// default:
// break;
// }
// break;
// case 3:
// switch (*(cur + 1)) {
// case '"':
// case '\\':
// cur++;
// break;
// default:
// break;
// }
// break;
// default:
// break;
// }
// *pos = cur;
//}
static bool parseTinyInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
static bool smlParseTinyInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 2) {
......@@ -503,10 +506,10 @@ static bool parseTinyInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid tiny int", endptr);
smlBuildInvalidDataMsg(msg, "invalid tiny int", endptr);
}else if(!IS_VALID_TINYINT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "tiny int out of range[-128,127]", endptr);
smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", endptr);
}else{
kvVal->i = result;
*isValid = true;
......@@ -516,7 +519,7 @@ static bool parseTinyInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
return false;
}
static bool parseTinyUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
static bool smlParseTinyUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 2) {
......@@ -531,10 +534,10 @@ static bool parseTinyUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid unsigned tiny int", endptr);
smlBuildInvalidDataMsg(msg, "invalid unsigned tiny int", endptr);
}else if(!IS_VALID_UTINYINT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", endptr);
smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", endptr);
}else{
kvVal->i = result;
*isValid = true;
......@@ -544,7 +547,7 @@ static bool parseTinyUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
return false;
}
static bool parseSmallInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
static bool smlParseSmallInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
......@@ -556,10 +559,10 @@ static bool parseSmallInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid small int", endptr);
smlBuildInvalidDataMsg(msg, "invalid small int", endptr);
}else if(!IS_VALID_SMALLINT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "small int our of range[-32768,32767]", endptr);
smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", endptr);
}else{
kvVal->i = result;
*isValid = true;
......@@ -569,7 +572,7 @@ static bool parseSmallInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
return false;
}
static bool parseSmallUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
static bool smlParseSmallUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
......@@ -584,10 +587,10 @@ static bool parseSmallUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid unsigned small int", endptr);
smlBuildInvalidDataMsg(msg, "invalid unsigned small int", endptr);
}else if(!IS_VALID_USMALLINT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", endptr);
smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", endptr);
}else{
kvVal->i = result;
*isValid = true;
......@@ -597,7 +600,7 @@ static bool parseSmallUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
return false;
}
static bool parseInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
static bool smlParseInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
......@@ -609,10 +612,10 @@ static bool parseInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid int", endptr);
smlBuildInvalidDataMsg(msg, "invalid int", endptr);
}else if(!IS_VALID_INT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", endptr);
smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", endptr);
}else{
kvVal->i = result;
*isValid = true;
......@@ -622,7 +625,7 @@ static bool parseInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
return false;
}
static bool parseUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
static bool smlParseUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
......@@ -637,10 +640,10 @@ static bool parseUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid unsigned int", endptr);
smlBuildInvalidDataMsg(msg, "invalid unsigned int", endptr);
}else if(!IS_VALID_UINT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", endptr);
smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", endptr);
}else{
kvVal->i = result;
*isValid = true;
......@@ -650,7 +653,7 @@ static bool parseUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
return false;
}
static bool parseBigInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
static bool smlParseBigInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len > 3 && strcasecmp(pVal + len - 3, "i64") == 0) {
......@@ -670,10 +673,10 @@ static bool parseBigInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != pVal + len - 1){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid big int", endptr);
smlBuildInvalidDataMsg(msg, "invalid big int", endptr);
}else if(!IS_VALID_BIGINT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", endptr);
smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", endptr);
}else{
kvVal->i = result;
*isValid = true;
......@@ -683,7 +686,7 @@ static bool parseBigInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
return false;
}
static bool parseBigUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
static bool smlParseBigUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
......@@ -698,10 +701,10 @@ static bool parseBigUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
uint64_t result = strtoull(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid unsigned big int", endptr);
smlBuildInvalidDataMsg(msg, "invalid unsigned big int", endptr);
}else if(!IS_VALID_UBIGINT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", endptr);
smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", endptr);
}else{
kvVal->u = result;
*isValid = true;
......@@ -711,7 +714,7 @@ static bool parseBigUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
return false;
}
static bool parseFloat(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
static bool smlParseFloat(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
char *endptr = NULL;
......@@ -725,10 +728,10 @@ static bool parseFloat(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
if (len > 3 && len <strcasecmp(pVal + len - 3, "f32") == 0) {
if(endptr != pVal + len - 3){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid float", endptr);
smlBuildInvalidDataMsg(msg, "invalid float", endptr);
}else if(!IS_VALID_FLOAT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", endptr);
smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", endptr);
}else{
kvVal->f = result;
*isValid = true;
......@@ -738,7 +741,7 @@ static bool parseFloat(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
return false;
}
static bool parseDouble(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
static bool smlParseDouble(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
......@@ -750,10 +753,10 @@ static bool parseDouble(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
double result = strtod(pVal, &endptr);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid double", endptr);
smlBuildInvalidDataMsg(msg, "invalid double", endptr);
}else if(!IS_VALID_DOUBLE(result)){
*isValid = false;
buildInvalidDataMsg(msg, "double out of range[-1.7976931348623158e+308,1.7976931348623158e+308]", endptr);
smlBuildInvalidDataMsg(msg, "double out of range[-1.7976931348623158e+308,1.7976931348623158e+308]", endptr);
}else{
kvVal->d = result;
*isValid = true;
......@@ -763,70 +766,63 @@ static bool parseDouble(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
return false;
}
static bool parseBool(SSmlKv *kvVal) {
static bool smlParseBool(SSmlKv *kvVal) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if ((len == 1) && pVal[len - 1] == 't') {
//printf("Type is bool(%c)\n", pVal[len - 1]);
kvVal->i = true;
return true;
}
if ((len == 1) && pVal[len - 1] == 'f') {
//printf("Type is bool(%c)\n", pVal[len - 1]);
kvVal->i = false;
return true;
}
if((len == 4) && !strcasecmp(pVal, "true")) {
//printf("Type is bool(%s)\n", &pVal[len - 4]);
kvVal->i = true;
return true;
}
if((len == 5) && !strcasecmp(pVal, "false")) {
//printf("Type is bool(%s)\n", &pVal[len - 5]);
kvVal->i = false;
return true;
}
return false;
}
static bool isBinary(const char *pVal, uint16_t len) {
static bool smlIsBinary(const char *pVal, uint16_t len) {
//binary: "abc"
if (len < 2) {
return false;
}
//binary
if (pVal[0] == '"' && pVal[len - 1] == '"') {
//printf("Type is binary(%s)\n", pVal);
return true;
}
return false;
}
static bool isNchar(const char *pVal, uint16_t len) {
static bool smlIsNchar(const char *pVal, uint16_t len) {
//nchar: L"abc"
if (len < 3) {
return false;
}
if ((pVal[0] == 'l' || pVal[0] == 'L')&& pVal[1] == '"' && pVal[len - 1] == '"') {
//printf("Type is nchar(%s)\n", pVal);
return true;
}
return false;
}
static bool convertSmlValue(SSmlKv *pVal, SMsgBuf *msg) {
static bool smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
// put high probability matching type first
bool isValid = false;
if (parseFloat(pVal, &isValid, msg)) {
if (smlParseFloat(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_FLOAT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true;
}
//binary
if (isBinary(pVal->value, pVal->valueLen)) {
if (smlIsBinary(pVal->value, pVal->valueLen)) {
pVal->type = TSDB_DATA_TYPE_BINARY;
pVal->length = pVal->valueLen - 2;
pVal->valueLen -= 2;
......@@ -834,13 +830,13 @@ static bool convertSmlValue(SSmlKv *pVal, SMsgBuf *msg) {
return true;
}
//nchar
if (isNchar(pVal->value, pVal->valueLen)) {
if (smlIsNchar(pVal->value, pVal->valueLen)) {
pVal->type = TSDB_DATA_TYPE_NCHAR;
pVal->length = pVal->valueLen - 3;
pVal->value = pVal->value+2;
return true;
}
if (parseDouble(pVal, &isValid, msg)) {
if (smlParseDouble(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_DOUBLE;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
......@@ -848,66 +844,66 @@ static bool convertSmlValue(SSmlKv *pVal, SMsgBuf *msg) {
return true;
}
//bool
if (parseBool(pVal)) {
if (smlParseBool(pVal)) {
pVal->type = TSDB_DATA_TYPE_BOOL;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true;
}
if (parseTinyInt(pVal, &isValid, msg)) {
if (smlParseTinyInt(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_TINYINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true;
}
if (parseTinyUint(pVal, &isValid, msg)) {
if (smlParseTinyUint(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_UTINYINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true;
}
if (parseSmallInt(pVal, &isValid, msg)) {
if (smlParseSmallInt(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_SMALLINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true;
}
if (parseSmallUint(pVal, &isValid, msg)) {
if (smlParseSmallUint(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_USMALLINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true;
}
if (parseInt(pVal, &isValid, msg)) {
if (smlParseInt(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_INT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true;
}
if (parseUint(pVal, &isValid, msg)) {
if (smlParseUint(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_UINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true;
}
if (parseBigInt(pVal, &isValid, msg)) {
if (smlParseBigInt(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_BIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true;
}
if (parseBigUint(pVal, &isValid, msg)) {
if (smlParseBigUint(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_UBIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true;
}
buildInvalidDataMsg(msg, "invalid data", pVal->value);
smlBuildInvalidDataMsg(msg, "invalid data", pVal->value);
return false;
}
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
static bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlHandle* info) {
char *val = NULL;
val = taosHashGet(pHash, key, strlen(key));
if (val) {
......@@ -921,7 +917,7 @@ bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
return false;
}
int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){
static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBuf *msg){
if(!sql) return TSDB_CODE_SML_INVALID_DATA;
while (*sql != '\0') { // jump the space at the begining
if(*sql != SPACE) {
......@@ -930,7 +926,10 @@ int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){
}
sql++;
}
if (!elements->measure || *sql == COMMA) return TSDB_CODE_SML_INVALID_DATA;
if (!elements->measure || *sql == COMMA) {
smlBuildInvalidDataMsg(msg, "invalid data", sql);
return TSDB_CODE_SML_INVALID_DATA;
}
// parse measure and tag
while (*sql != '\0') {
......@@ -953,7 +952,13 @@ int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){
sql++;
}
if(elements->measureLen == 0) return TSDB_CODE_SML_INVALID_DATA;
if(elements->tagsLen == 0){ // measure, cols1=a measure cols1=a
elements->measureTagsLen = elements->measureLen;
}
if(elements->measureLen == 0) {
smlBuildInvalidDataMsg(msg, "invalid measure", elements->measure);
return TSDB_CODE_SML_INVALID_DATA;
}
// parse cols
while (*sql != '\0') {
......@@ -963,7 +968,10 @@ int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){
}
sql++;
}
if(!elements->cols) return TSDB_CODE_SML_INVALID_DATA;
if(!elements->cols) {
smlBuildInvalidDataMsg(msg, "invalid columns", elements->cols);
return TSDB_CODE_SML_INVALID_DATA;
}
bool isInQuote = false;
while (*sql != '\0') {
......@@ -992,13 +1000,13 @@ int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){
return TSDB_CODE_SUCCESS;
}
bool parseSmlCols(const char* data, int32_t len, SArray *cols, bool isTag, SMsgBuf *msg){
static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SSmlMsgBuf *msg){
if(isTag && len == 0){
SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
kv->key = TAG;
kv->keyLen = TAGNAMELEN;
kv->keyLen = strlen(TAG);
kv->value = TAG;
kv->valueLen = TAGNAMELEN;
kv->valueLen = strlen(TAG);
kv->type = TSDB_DATA_TYPE_NCHAR;
if(cols) taosArrayPush(cols, &kv);
return true;
......@@ -1016,7 +1024,7 @@ bool parseSmlCols(const char* data, int32_t len, SArray *cols, bool isTag, SMsgB
i++;
}
if(keyLen == 0 || keyLen >= TSDB_COL_NAME_LEN){
buildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
return TSDB_CODE_SML_INVALID_DATA;
}
......@@ -1031,7 +1039,7 @@ bool parseSmlCols(const char* data, int32_t len, SArray *cols, bool isTag, SMsgB
}
int32_t valueLen = data + i - value;
if(valueLen == 0){
buildInvalidDataMsg(msg, "invalid value", value);
smlBuildInvalidDataMsg(msg, "invalid value", value);
return TSDB_CODE_SML_INVALID_DATA;
}
......@@ -1044,7 +1052,7 @@ bool parseSmlCols(const char* data, int32_t len, SArray *cols, bool isTag, SMsgB
if(isTag){
kv->type = TSDB_DATA_TYPE_NCHAR;
}else{
if(!convertSmlValue(kv, msg)){
if(!smlParseValue(kv, msg)){
return TSDB_CODE_SML_INVALID_DATA;
}
}
......@@ -1055,53 +1063,56 @@ bool parseSmlCols(const char* data, int32_t len, SArray *cols, bool isTag, SMsgB
return TSDB_CODE_SUCCESS;
}
static int64_t getTimeStampValue(const char *value, int32_t type) {
double ts = (double)strtoll(value, NULL, 10);
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
char *endPtr = NULL;
double ts = (double)strtoll(value, &endPtr, 10);
if(value + len != endPtr){
return -1;
}
switch (type) {
case TSDB_TIME_PRECISION_HOURS:
ts *= (3600 * 1e9);
break;
case TSDB_TIME_PRECISION_MINUTES:
ts *= (60 * 1e9);
break;
case TSDB_TIME_PRECISION_SECONDS:
ts *= (1e9);
break;
case TSDB_TIME_PRECISION_MICRO:
ts *= (1e6);
break;
case TSDB_TIME_PRECISION_MILLI:
ts *= (1e3);
default:
break;
default:
ASSERT(0);
}
if(ts > (double)INT64_MAX || ts < 0){
return -1;
}else{
return (int64_t)ts;
}
return (int64_t)ts;
}
static int64_t getTimeStampNow(int32_t precision) {
static int64_t smlGetTimeNow(int8_t precision) {
switch (precision) {
case TSDB_TIME_PRECISION_HOURS:
return taosGetTimestampMs()/1000/3600;
case TSDB_TIME_PRECISION_MINUTES:
return taosGetTimestampMs()/1000/60;
case TSDB_TIME_PRECISION_SECONDS:
return taosGetTimestampMs()/1000;
default:
case TSDB_TIME_PRECISION_MILLI:
case TSDB_TIME_PRECISION_MICRO:
case TSDB_TIME_PRECISION_NANO:
return taosGetTimestamp(precision);
default:
ASSERT(0);
}
}
static bool isValidateTimeStamp(const char *pVal, int32_t len) {
for (int i = 0; i < len; ++i) {
if (!isdigit(pVal[i])) {
return false;
}
}
return true;
}
static int32_t getTsType(int32_t len) {
static int8_t smlGetTsTypeByLen(int32_t len) {
if (len == TSDB_TIME_PRECISION_SEC_DIGITS) {
return TSDB_TIME_PRECISION_SECONDS;
} else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
......@@ -1111,52 +1122,87 @@ static int32_t getTsType(int32_t len) {
}
}
static int32_t parseSmlTS(const char* data, int32_t len, SArray *tags, SSmlLinesInfo* info){
int64_t ts = 0;
if(data == NULL){
if(info->protocol != TSDB_SML_LINE_PROTOCOL){
buildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
ts = getTimeStampNow(info->tsType);
}else{
int ret = isValidateTimeStamp(data, len);
if(!ret){
buildInvalidDataMsg(&info->msgBuf, "timestamp must be digit", data);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
static int8_t smlGetTsTypeByPrecision(int8_t precision) {
switch (precision) {
case TSDB_SML_TIMESTAMP_HOURS:
return TSDB_TIME_PRECISION_HOURS;
case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
return TSDB_TIME_PRECISION_MILLI;
case TSDB_SML_TIMESTAMP_NANO_SECONDS:
case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
return TSDB_TIME_PRECISION_NANO;
case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
return TSDB_TIME_PRECISION_MICRO;
case TSDB_SML_TIMESTAMP_SECONDS:
return TSDB_TIME_PRECISION_SECONDS;
case TSDB_SML_TIMESTAMP_MINUTES:
return TSDB_TIME_PRECISION_MINUTES;
default:
return -1;
}
int32_t tsType = -1;
if(info->protocol != TSDB_SML_LINE_PROTOCOL){
tsType = getTsType(len);
}
static int64_t smlParseInfluxTime(SSmlHandle* info, const char* data, int32_t len){
int8_t tsType = smlGetTsTypeByPrecision(info->precision);
if (tsType == -1) {
buildInvalidDataMsg(&info->msgBuf, "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL);
return -1;
}
}else{
tsType = info->tsType;
if(!data){
return smlGetTimeNow(tsType);
}
ts = getTimeStampValue(data, tsType);
int64_t ts = smlGetTimeValue(data, len, tsType);
if(ts == -1){
buildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
return -1;
}
return ts;
}
static int64_t smlParseOpenTsdbTime(SSmlHandle* info, const char* data, int32_t len){
if(!data){
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
return -1;
}
int8_t tsType = smlGetTsTypeByLen(len);
if (tsType == -1) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
return -1;
}
int64_t ts = smlGetTimeValue(data, len, tsType);
if(ts == -1){
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
return -1;
}
return ts;
}
static int32_t smlParseTS(SSmlHandle* info, const char* data, int32_t len, SArray *cols){
int64_t ts = 0;
if(info->protocol == TSDB_SML_LINE_PROTOCOL){
ts = smlParseInfluxTime(info, data, len);
}else{
ts = smlParseOpenTsdbTime(info, data, len);
}
if(ts == -1) return TSDB_CODE_TSC_INVALID_TIME_STAMP;
// add ts to
SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv){
return TSDB_CODE_OUT_OF_MEMORY;
}
kv->key = TS;
kv->keyLen = TSNAMELEN;
kv->keyLen = strlen(kv->key);
kv->i = ts;
kv->type = TSDB_DATA_TYPE_TIMESTAMP;
kv->length = (int16_t)tDataTypes[kv->type].bytes;
if(tags) taosArrayPush(tags, &kv);
if(cols) taosArrayPush(cols, &kv);
return TSDB_CODE_SUCCESS;
}
//int32_t parseSmlCols(const char* data, SArray *cols){
//static int32_t parseSmlCols(const char* data, SArray *cols){
// while(*data != '\0'){
// if(*data == EQUAL) return TSDB_CODE_SML_INVALID_DATA;
// const char *key = data;
......@@ -1199,7 +1245,7 @@ static int32_t parseSmlTS(const char* data, int32_t len, SArray *tags, SSmlLines
// return TSDB_CODE_SUCCESS;
//}
bool updateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SMsgBuf *msg){
static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SSmlMsgBuf *msg){
if(tags){
for (int i = 0; i < taosArrayGetSize(tags); ++i) {
SSmlKv *kv = taosArrayGetP(tags, i);
......@@ -1223,7 +1269,7 @@ bool updateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SMsgBuf *
SSmlKv **value = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen);
if(value){
if(kv->type != (*value)->type){
buildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
return false;
}else{
if(IS_VAR_DATA_TYPE(kv->type)){ // update string len, if bigger
......@@ -1237,9 +1283,10 @@ bool updateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SMsgBuf *
}
}
}
return true;
}
void insertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
if(tags){
for (int i = 0; i < taosArrayGetSize(tags); ++i) {
SSmlKv *kv = taosArrayGetP(tags, i);
......@@ -1255,87 +1302,194 @@ void insertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
}
}
static int32_t smlParseLine(const char* sql, SSmlLinesInfo* info) {
TAOS_PARSE_ELEMENTS elements = {0};
int ret = parseSml(sql, &elements);
static SSmlTableInfo* smlBuildTableInfo(bool format){
SSmlTableInfo *tag = taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
if(!tag){
return NULL;
}
if(format){
tag->colsFormat = taosArrayInit(16, POINTER_BYTES);
if (tag->colsFormat == NULL) {
uError("SML:smlParseLine failed to allocate memory");
goto cleanup;
}
}else{
tag->cols = taosArrayInit(16, POINTER_BYTES);
if (tag->cols == NULL) {
uError("SML:smlParseLine failed to allocate memory");
goto cleanup;
}
tag->columnsHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
if (tag->columnsHash == NULL) {
uError("SML:smlParseLine failed to allocate memory");
goto cleanup;
}
}
tag->tags = taosArrayInit(16, POINTER_BYTES);
if (tag->tags == NULL) {
uError("SML:smlParseLine failed to allocate memory");
goto cleanup;
}
return tag;
cleanup:
taosMemoryFreeClear(tag);
return NULL;
}
static void smlDestroyBuildTableInfo(SSmlTableInfo *tag, bool format){
if(format){
taosArrayDestroy(tag->colsFormat);
}else{
tag->cols = taosArrayInit(16, POINTER_BYTES);
for(size_t i = 0; i < taosArrayGetSize(tag->cols); i++){
SHashObj *kvHash = taosArrayGetP(tag->cols, i);
void** p1 = taosHashIterate(kvHash, NULL);
while (p1) {
SSmlKv* kv = *p1;
taosMemoryFreeClear(kv);
p1 = taosHashIterate(kvHash, p1);
}
taosHashCleanup(kvHash);
}
taosHashCleanup(tag->columnsHash);
}
taosArrayDestroy(tag->tags);
taosMemoryFreeClear(tag);
}
static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){
if(dataFormat){
taosArrayPush(oneTable->colsFormat, &cols);
}else{
SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
if(!kvHash){
uError("SML:smlDealCols failed to allocate memory");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for(size_t i = 0; i < taosArrayGetSize(cols); i++){
SSmlKv *kv = taosArrayGetP(cols, i);
taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); // todo key need escape, like \=, because find by schema name later
if(taosHashGet(oneTable->columnsHash, kv->key, kv->keyLen) != NULL){
continue;
}
taosHashPut(oneTable->columnsHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
}
taosArrayPush(oneTable->cols, &kvHash);
}
}
static SSmlSTableMeta* smlBuildSTableMeta(){
SSmlSTableMeta* meta = taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
if(!meta){
return NULL;
}
meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
if (meta->tagHash == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
meta->fieldHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
if (meta->fieldHash == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
return meta;
cleanup:
taosMemoryFreeClear(meta);
return NULL;
}
static void smlDestroySTableMeta(SSmlSTableMeta *meta){
taosHashCleanup(meta->tagHash);
taosHashCleanup(meta->fieldHash);
taosMemoryFree(meta->tableMeta);
}
static int32_t smlParseLine(SSmlHandle* info, const char* sql) {
SSmlLineInfo elements = {0};
int ret = smlParseString(sql, &elements, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseString failed", info->id);
return ret;
}
SArray *cols = taosArrayInit(16, POINTER_BYTES);
if (cols == NULL) {
uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
uError("SML:0x%"PRIx64" smlParseLine failed to allocate memory", info->id);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
ret = parseSmlTS(elements.timestamp, elements.timestampLen, cols, info);
ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols);
if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseTS failed", info->id);
return ret;
}
ret = parseSmlCols(elements.cols, elements.colsLen, cols, false, &info->msgBuf);
ret = smlParseCols(elements.cols, elements.colsLen, cols, false, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseCols parse cloums fields failed", info->id);
return ret;
}
if(taosArrayGetSize(cols) > TSDB_MAX_COLUMNS){
buildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
TAOS_SML_DATA_POINT_TAGS** oneTable = taosHashGet(info->childTables, elements.measure, elements.measureTagsLen);
SSmlTableInfo **oneTable = taosHashGet(info->childTables, elements.measure, elements.measureTagsLen);
if(oneTable){
SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen);
ASSERT(tableMeta);
ret = updateMeta(*tableMeta, NULL, cols, &info->msgBuf); // update meta
ret = smlUpdateMeta(*tableMeta, NULL, cols, &info->msgBuf); // update meta cols
if(!ret){
uError("SML:0x%"PRIx64" smlUpdateMeta cols failed", info->id);
return TSDB_CODE_SML_INVALID_DATA;
}
taosArrayPush((*oneTable)->cols, &cols);
ret = smlDealCols(*oneTable, info->dataFormat, cols);
if(ret != TSDB_CODE_SUCCESS){
return ret;
}
}else{
TAOS_SML_DATA_POINT_TAGS *tag = taosMemoryCalloc(sizeof(TAOS_SML_DATA_POINT_TAGS), 1);
SSmlTableInfo *tag = smlBuildTableInfo(info->dataFormat);
if(!tag){
return TSDB_CODE_OUT_OF_MEMORY;
}
tag->cols = taosArrayInit(16, POINTER_BYTES);
if (tag->cols == NULL) {
uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
taosArrayPush(tag->cols, &cols);
tag->colsColumn = taosArrayInit(16, POINTER_BYTES);
if (tag->cols == NULL) {
uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
ret = smlDealCols(tag, info->dataFormat, cols);
if(ret != TSDB_CODE_SUCCESS){
return ret;
}
tag->tags = taosArrayInit(16, POINTER_BYTES);
if (tag->tags == NULL) {
uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
ret = parseSmlCols(elements.tags, elements.tagsLen, tag->tags, true, &info->msgBuf);
ret = smlParseCols(elements.tags, elements.tagsLen, tag->tags, true, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id);
return ret;
}
if(taosArrayGetSize(tag->tags) > TSDB_MAX_TAGS){
buildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
tag->sTableName = elements.measure;
tag->sTableNameLen = elements.measureLen;
buildSmlChildTableName(tag);
smlBuildChildTableName(tag);
uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tag->childTableName);
SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen);
if(tableMeta){ // update meta
ret = updateMeta(*tableMeta, tag->tags, cols, &info->msgBuf);
ret = smlUpdateMeta(*tableMeta, tag->tags, cols, &info->msgBuf);
if(!ret){
uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id);
return TSDB_CODE_SML_INVALID_DATA;
}
}else{
SSmlSTableMeta* meta = taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
insertMeta(meta, tag->tags, cols);
SSmlSTableMeta *meta = smlBuildSTableMeta();
smlInsertMeta(meta, tag->tags, cols);
taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
}
......@@ -1344,114 +1498,165 @@ static int32_t smlParseLine(const char* sql, SSmlLinesInfo* info) {
return TSDB_CODE_SUCCESS;
}
static void smlDestroyInfo(SSmlLinesInfo* info){
static void smlDestroyInfo(SSmlHandle* info){
if(!info) return;
qDestroyQuery(info->pQuery);
tscSmlDestroyHandle(info->exec);
smlDestroyHandle(info->exec);
// destroy info->childTables
void** p1 = taosHashIterate(info->childTables, NULL);
while (p1) {
SSmlTableInfo* oneTable = *p1;
smlDestroyBuildTableInfo(oneTable, info->dataFormat);
p1 = taosHashIterate(info->childTables, p1);
}
taosHashCleanup(info->childTables);
// destroy info->superTables
p1 = taosHashIterate(info->superTables, NULL);
while (p1) {
SSmlSTableMeta* oneTable = *p1;
smlDestroySTableMeta(oneTable);
p1 = taosHashIterate(info->superTables, p1);
}
taosHashCleanup(info->superTables);
taosHashCleanup(info->metaHashObj);
// destroy info->pVgHash
taosHashCleanup(info->pVgHash);
taosMemoryFree(info);
taosMemoryFreeClear(info);
}
static SSmlLinesInfo* smlBuildInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int32_t tsType){
SSmlLinesInfo* info = taosMemoryMalloc(sizeof(SSmlLinesInfo));
static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision, bool dataFormat){
SSmlHandle* info = taosMemoryMalloc(sizeof(SSmlHandle));
if (NULL == info) {
return NULL;
}
info->id = genLinesSmlId();
info->tsType = tsType;
info->taos = taos;
info->protocol = protocol;
info->id = smlGenId();
info->pQuery = taosMemoryCalloc(1, sizeof(SQuery));
if (NULL == info->pQuery) {
uError("SML:0x%"PRIx64" create info->pQuery error", info->id);
goto cleanup;
}
info->pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
info->pQuery->haveResultSet = false;
info->pQuery->msgType = TDMT_VND_SUBMIT;
info->pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
if(NULL == info->pQuery->pRoot){
uError("SML:0x%"PRIx64" create info->pQuery->pRoot error", info->id);
goto cleanup;
}
((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
info->exec = tscSmlInitHandle(info->pQuery);
info->taos = taos;
int32_t code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
if(code != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" get catalog error %d", info->id, code);
goto cleanup;
}
info->precision = precision;
info->protocol = protocol;
info->dataFormat = dataFormat;
info->pRequest = request;
info->msgBuf.buf = info->pRequest->msgBuf;
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
info->exec = smlInitHandle(info->pQuery);
info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
info->metaHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false);
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
return info;
if(NULL == info->exec || NULL == info->childTables
|| NULL == info->superTables || NULL == info->pVgHash){
uError("SML:0x%"PRIx64" create info failed", info->id);
goto cleanup;
}
return info;
cleanup:
smlDestroyInfo(info);
return NULL;
}
int sml_insert_lines(TAOS* taos, SRequestObj* request, char* lines[], int numLines, SMLProtocolType protocol, int32_t tsType) {
static int32_t smlInsertData(SSmlHandle* info) {
int32_t code = TSDB_CODE_SUCCESS;
SSmlLinesInfo* info = smlBuildInfo(taos, request, protocol, tsType);
if(!info){
code = TSDB_CODE_OUT_OF_MEMORY;
goto cleanup;
SSmlTableInfo** oneTable = taosHashIterate(info->childTables, NULL);
while (oneTable) {
SSmlTableInfo* tableData = *oneTable;
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
strcpy(pName.dbname, info->pRequest->pDb);
memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
SVgroupInfo vg;
code = catalogGetTableHashVgroup(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &vg);
if (code != 0) {
uError("SML:0x%"PRIx64" catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName);
return code;
}
taosHashPut(info->pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
SSmlSTableMeta** pMeta = taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
ASSERT (NULL != pMeta && NULL != *pMeta);
(*pMeta)->tableMeta->vgId = vg.vgId;
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
code = smlBindData(info->exec, tableData->tags, tableData->colsFormat, tableData->columnsHash,
tableData->cols, info->dataFormat, (*pMeta)->tableMeta, info->msgBuf.buf, info->msgBuf.len);
if(code != TSDB_CODE_SUCCESS){
return code;
}
oneTable = taosHashIterate(info->childTables, oneTable);
}
smlBuildOutput(info->exec, info->pVgHash);
launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true);
info->affectedRows = taos_affected_rows(info->pRequest);
return info->pRequest->code;
}
static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) {
int32_t code = TSDB_CODE_SUCCESS;
if (numLines <= 0 || numLines > 65536) {
uError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
uError("SML:0x%"PRIx64" smlInsertLines numLines should be between 1 and 65536. numLines: %d", info->id, numLines);
code = TSDB_CODE_TSC_APP_ERROR;
goto cleanup;
}
for (int32_t i = 0; i < numLines; ++i) {
code = smlParseLine(lines[i], info);
code = smlParseLine(info, lines[i]);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]);
uError("SML:0x%"PRIx64" smlParseLine failed. line %d : %s", info->id, i, lines[i]);
goto cleanup;
}
}
uDebug("SML:0x%"PRIx64" data point line parse success. tables %d", info->id, taosHashGetSize(info->childTables));
uDebug("SML:0x%"PRIx64" smlInsertLines parse success. tables %d", info->id, taosHashGetSize(info->childTables));
uDebug("SML:0x%"PRIx64" smlInsertLines parse success. super tables %d", info->id, taosHashGetSize(info->superTables));
code = smlInsert(taos, info);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code)));
code = smlModifyDBSchemas(info);
if (code != 0) {
uError("SML:0x%"PRIx64" smlModifyDBSchemas error : %s", info->id, tstrerror(code));
goto cleanup;
}
code = smlInsertData(info);
if (code != 0) {
uError("SML:0x%"PRIx64" smlInsertData error : %s", info->id, tstrerror(code));
goto cleanup;
}
uDebug("SML:0x%"PRIx64" taos_insert_lines finish inserting %d lines. code: %d", info->id, numLines, code);
uDebug("SML:0x%"PRIx64" smlInsertLines finish inserting %d lines.", info->id, numLines);
cleanup:
smlDestroyInfo(info);
return code;
}
static int32_t convertPrecisionType(int precision) {
switch (precision) {
case TSDB_SML_TIMESTAMP_HOURS:
return TSDB_TIME_PRECISION_HOURS;
case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
return TSDB_TIME_PRECISION_MILLI;
case TSDB_SML_TIMESTAMP_NANO_SECONDS:
case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
return TSDB_TIME_PRECISION_NANO;
case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
return TSDB_TIME_PRECISION_MICRO;
case TSDB_SML_TIMESTAMP_SECONDS:
return TSDB_TIME_PRECISION_SECONDS;
case TSDB_SML_TIMESTAMP_MINUTES:
return TSDB_TIME_PRECISION_MINUTES;
default:
return -1;
}
}
/**
* taos_schemaless_insert() parse and insert data points into database according to
* different protocol.
......@@ -1473,19 +1678,20 @@ static int32_t convertPrecisionType(int precision) {
*
*/
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
int code = TSDB_CODE_SUCCESS;
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision, bool dataFormat) {
SRequestObj* request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT);
switch (protocol) {
case TSDB_SML_LINE_PROTOCOL:{
int32_t tsType = convertPrecisionType(precision);
if(tsType == -1){
request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
if(!request){
goto end;
}
code = sml_insert_lines(taos, request, lines, numLines, protocol, tsType);
SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, dataFormat);
if(!info){
goto end;
}
switch (protocol) {
case TSDB_SML_LINE_PROTOCOL:{
smlInsertLines(info, lines, numLines);
break;
}
case TSDB_SML_TELNET_PROTOCOL:
......@@ -1495,7 +1701,6 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
//code = taos_insert_json_payload(taos, *lines, protocol, tsType, &affected_rows);
break;
default:
code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
break;
}
......
......@@ -1514,6 +1514,7 @@ int32_t qBuildStmtColFields(void *pBlock, int32_t *fieldNum, TAOS_FIELD** fields
return TSDB_CODE_SUCCESS;
}
// schemaless logic start
typedef struct SmlExecHandle {
SHashObj* pBlockHash;
......@@ -1523,7 +1524,7 @@ typedef struct SmlExecHandle {
SVCreateTbReq createTblReq; // each table
SQuery* pQuery;
} SmlExecHandle;
} SSmlExecHandle;
static int32_t smlBoundColumns(SArray *cols, SParsedDataColInfo* pColList, SSchema* pSchema) {
col_id_t nCols = pColList->numOfCols;
......@@ -1620,14 +1621,15 @@ static int32_t smlParseTags(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDat
return TSDB_CODE_SUCCESS;
}
int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta, char *msgBuf, int16_t msgBufLen) {
int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *colsHash, SArray *cols, bool format,
STableMeta *pTableMeta, char *msgBuf, int16_t msgBufLen) {
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
int32_t rowNum = taosArrayGetSize(cols);
if(rowNum <= 0) {
return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
}
SmlExecHandle *smlHandle = (SmlExecHandle *)handle;
SSmlExecHandle *smlHandle = (SSmlExecHandle *)handle;
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
setBoundColumnInfo(&smlHandle->tags, pTagsSchema, getNumOfTags(pTableMeta));
int ret = smlBoundColumns(tags, &smlHandle->tags, pTagsSchema);
......@@ -1651,7 +1653,21 @@ int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta
SSchema* pSchema = getTableColumnSchema(pTableMeta);
ret = smlBoundColumns(taosArrayGetP(cols, 0), &pDataBlock->boundColumnInfo, pSchema);
if(format){
ret = smlBoundColumns(taosArrayGetP(colsFormat, 0), &pDataBlock->boundColumnInfo, pSchema);
}else{
SArray *columns = taosArrayInit(16, POINTER_BYTES);
void **p1 = taosHashIterate(colsHash, NULL);
while (p1) {
SSmlKv* kv = *p1;
taosArrayPush(columns, &kv);
p1 = taosHashIterate(colsHash, p1);
}
ret = smlBoundColumns(columns, &pDataBlock->boundColumnInfo, pSchema);
taosArrayDestroy(columns);
}
if(ret != TSDB_CODE_SUCCESS){
buildInvalidOperationMsg(&pBuf, "bound cols error");
return ret;
......@@ -1671,7 +1687,12 @@ int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta
for (int32_t r = 0; r < rowNum; ++r) {
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row);
SArray *rowData = taosArrayGetP(cols, r);
void *rowData = NULL;
if(format){
rowData = taosArrayGetP(colsFormat, r);
}else{
rowData = taosArrayGetP(cols, r);
}
// 1. set the parsed value from sql string
for (int c = 0; c < spd->numOfBound; ++c) {
......@@ -1680,7 +1701,18 @@ int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta
param.schema = pColSchema;
getSTSRowAppendInfo(pBuilder->rowType, spd, c, &param.toffset, &param.colIdx);
SSmlKv *kv = taosArrayGetP(rowData, c);
SSmlKv *kv = NULL;
if(format){
kv = taosArrayGetP(rowData, c);
if (!kv){
char msg[64] = {0};
sprintf(msg, "cols num not the same like before:%d", r);
return buildInvalidOperationMsg(&pBuf, msg);
}
}else{
void **p =taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
kv = *p;
}
if (kv->valueLen == 0) {
MemRowAppend(&pBuf, NULL, 0, &param);
......@@ -1720,23 +1752,25 @@ int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta
return TSDB_CODE_SUCCESS;
}
void* tscSmlInitHandle(SQuery *pQuery){
SmlExecHandle *handle = taosMemoryCalloc(sizeof(SmlExecHandle));
void* smlInitHandle(SQuery *pQuery){
SSmlExecHandle *handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle));
if(!handle) return NULL;
handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
handle->pQuery = pQuery;
return handle;
}
void tscSmlDestroyHandle(void *pHandle){
void smlDestroyHandle(void *pHandle){
if(!pHandle) return;
SmlExecHandle *handle = (SmlExecHandle *)pHandle;
taosHashCleanup(handle->pBlockHash);
SSmlExecHandle *handle = (SSmlExecHandle *)pHandle;
destroyBlockHashmap(handle->pBlockHash);
taosMemoryFree(handle);
}
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) {
SmlExecHandle *smlHandle = (SmlExecHandle *)handle;
SSmlExecHandle *smlHandle = (SSmlExecHandle *)handle;
return qBuildStmtOutput(smlHandle->pQuery, pVgHash, smlHandle->pBlockHash);
}
// schemaless logic end
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册