提交 a4629e56 编写于 作者: wmmhello's avatar wmmhello

refactor:add schemaless function

上级 990205d6
......@@ -246,7 +246,13 @@ typedef struct {
int32_t keyLen;
uint8_t type;
int16_t length;
const char* value;
union{
const char* value;
int64_t i;
uint64_t u;
double d;
float f;
};
int32_t valueLen;
} SSmlKv;
......
......@@ -186,14 +186,14 @@ typedef struct {
#define IS_NUMERIC_TYPE(_t) ((IS_SIGNED_NUMERIC_TYPE(_t)) || (IS_UNSIGNED_NUMERIC_TYPE(_t)) || (IS_FLOAT_TYPE(_t)))
#define IS_MATHABLE_TYPE(_t) (IS_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))
#define IS_VALID_TINYINT(_t) ((_t) > INT8_MIN && (_t) <= INT8_MAX)
#define IS_VALID_SMALLINT(_t) ((_t) > INT16_MIN && (_t) <= INT16_MAX)
#define IS_VALID_INT(_t) ((_t) > INT32_MIN && (_t) <= INT32_MAX)
#define IS_VALID_BIGINT(_t) ((_t) > INT64_MIN && (_t) <= INT64_MAX)
#define IS_VALID_UTINYINT(_t) ((_t) >= 0 && (_t) < UINT8_MAX)
#define IS_VALID_USMALLINT(_t) ((_t) >= 0 && (_t) < UINT16_MAX)
#define IS_VALID_UINT(_t) ((_t) >= 0 && (_t) < UINT32_MAX)
#define IS_VALID_UBIGINT(_t) ((_t) >= 0 && (_t) < UINT64_MAX)
#define IS_VALID_TINYINT(_t) ((_t) >= INT8_MIN && (_t) <= INT8_MAX)
#define IS_VALID_SMALLINT(_t) ((_t) >= INT16_MIN && (_t) <= INT16_MAX)
#define IS_VALID_INT(_t) ((_t) >= INT32_MIN && (_t) <= INT32_MAX)
#define IS_VALID_BIGINT(_t) ((_t) >= INT64_MIN && (_t) <= INT64_MAX)
#define IS_VALID_UTINYINT(_t) ((_t) >= 0 && (_t) <= UINT8_MAX)
#define IS_VALID_USMALLINT(_t) ((_t) >= 0 && (_t) <= UINT16_MAX)
#define IS_VALID_UINT(_t) ((_t) >= 0 && (_t) <= UINT32_MAX)
#define IS_VALID_UBIGINT(_t) ((_t) >= 0 && (_t) <= UINT64_MAX)
#define IS_VALID_FLOAT(_t) ((_t) >= -FLT_MAX && (_t) <= FLT_MAX)
#define IS_VALID_DOUBLE(_t) ((_t) >= -DBL_MAX && (_t) <= DBL_MAX)
......
......@@ -36,48 +36,55 @@ typedef struct {
int32_t measureTagsLen;
int32_t tagsLen;
int32_t colsLen;
int32_t timestampLen;
} TAOS_PARSE_ELEMENTS;
typedef struct {
const char *sTableName; // super table name
uint8_t sTableNameLen;
char childTableName[TSDB_TABLE_NAME_LEN];
uint64_t uid;
char childTableName[TSDB_TABLE_NAME_LEN];
uint64_t uid;
SArray* tags;
SArray *cols;
SArray *tags;
SArray *cols; // elements are SHashObj<key, SSmlKv*> for find by key quickly
SArray colsColumn; // elements are cols key string
} TAOS_SML_DATA_POINT_TAGS;
typedef struct SSmlSTableMeta {
// char *sTableName; // super table name
// uint8_t sTableNameLen;
uint8_t precision; // the number of precision
SHashObj* tagHash;
SHashObj* fieldHash;
uint8_t precision; // the number of precision
SHashObj *tagHash;
SHashObj *fieldHash;
} SSmlSTableMeta;
typedef struct SMsgBuf {
int32_t len;
char *buf;
} SMsgBuf;
typedef struct {
uint64_t id;
uint64_t id;
SMLProtocolType protocol;
int32_t tsType;
SMLProtocolType protocol;
int32_t tsType;
SHashObj* childTables;
SHashObj* superTables;
SHashObj *childTables;
SHashObj *superTables;
SHashObj* metaHashObj;
SHashObj* pVgHash;
SHashObj *metaHashObj;
SHashObj *pVgHash;
void* exec;
void *exec;
STscObj* taos;
SCatalog* pCatalog;
SRequestObj* pRequest;
SQuery* pQuery;
STscObj *taos;
SCatalog *pCatalog;
SRequestObj *pRequest;
SQuery *pQuery;
int32_t affectedRows;
char *msgBuf;
int16_t msgLen;
int32_t affectedRows;
SMsgBuf msgBuf;
} SSmlLinesInfo;
int smlInsert(TAOS* taos, SSmlLinesInfo* info);
......
......@@ -13,6 +13,7 @@
#include "taoserror.h"
#include "taos.h"
#include "ttime.h"
#include "tstrbuild.h"
typedef struct {
......@@ -31,14 +32,15 @@ typedef struct {
#define SLASH '\\'
#define tsMaxSQLStringLen (1024*1024)
#define TSNAMELEN 2
#define TAGNAMELEN 3
//=================================================================================================
static uint64_t linesSmlHandleId = 0;
static const char* TS = "ts";
static const char* TAG = "tag";
static int32_t insertChildTablePointsBatch(void* pVoid, char* name, char* name1, SArray* pArray, SArray* pArray1,
SArray* pArray2, SArray* pArray3, size_t size, SSmlLinesInfo* info);
static int32_t doInsertChildTablePoints(void* pVoid, char* sql, char* name, SArray* pArray, SArray* pArray1,
SSmlLinesInfo* info);
uint64_t genLinesSmlId() {
uint64_t id;
......@@ -49,9 +51,15 @@ uint64_t genLinesSmlId() {
return id;
}
static int32_t buildInvalidDataMsg(SMsgBuf* pBuf, const char *msg1, const char *msg2) {
if(msg1) snprintf(pBuf->buf, pBuf->len, "%s:", msg1);
if(msg2) strncpy(pBuf->buf, msg2, pBuf->len);
return TSDB_CODE_SML_INVALID_DATA;
}
int compareSmlColKv(const void* p1, const void* p2) {
TAOS_SML_KV* kv1 = (TAOS_SML_KV*)p1;
TAOS_SML_KV* kv2 = (TAOS_SML_KV*)p2;
SSmlKv* kv1 = (SSmlKv *)p1;
SSmlKv* kv2 = (SSmlKv*)p2;
int kvLen1 = (int)strlen(kv1->key);
int kvLen2 = (int)strlen(kv2->key);
int res = strncasecmp(kv1->key, kv2->key, MIN(kvLen1, kvLen2));
......@@ -78,7 +86,7 @@ typedef struct {
typedef struct {
char sTableName[TSDB_TABLE_NAME_LEN];
SSchema* field;
SSmlKv * field;
} SAlterSTableActionInfo;
typedef struct {
......@@ -89,94 +97,17 @@ typedef struct {
};
} SSchemaAction;
static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes, uint64_t id) {
if (!IS_VAR_DATA_TYPE(kv->type)) {
*bytes = tDataTypes[kv->type].bytes;
} else {
if (kv->type == TSDB_DATA_TYPE_NCHAR) {
TdUcs4 *ucs = taosMemoryMalloc(kv->length * TSDB_NCHAR_SIZE + 1);
int32_t bytesNeeded = 0;
bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded);
if (!succ) {
taosMemoryFree(ucs);
uError("SML:0x%"PRIx64" convert nchar string to UCS4_LE failed:%s", id, kv->value);
return TSDB_CODE_TSC_INVALID_VALUE;
}
taosMemoryFree(ucs);
*bytes = bytesNeeded + VARSTR_HEADER_SIZE;
} else if (kv->type == TSDB_DATA_TYPE_BINARY) {
*bytes = kv->length + VARSTR_HEADER_SIZE;
}
}
return 0;
}
static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array, SSmlLinesInfo* info) {
SSchema* pField = NULL;
size_t* pFieldIdx = taosHashGet(hash, smlKv->key, strlen(smlKv->key));
size_t fieldIdx = -1;
int32_t code = 0;
if (pFieldIdx) {
fieldIdx = *pFieldIdx;
pField = taosArrayGet(array, fieldIdx);
if (pField->type != smlKv->type) {
uError("SML:0x%"PRIx64" type mismatch. key %s, type %d. type before %d", info->id, smlKv->key, smlKv->type, pField->type);
return TSDB_CODE_TSC_INVALID_VALUE;
}
int32_t bytes = 0;
code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id);
if (code != 0) {
return code;
}
pField->bytes = MAX(pField->bytes, bytes);
} else {
SSchema field = {0};
size_t tagKeyLen = strlen(smlKv->key);
strncpy(field.name, smlKv->key, tagKeyLen);
field.name[tagKeyLen] = '\0';
field.type = smlKv->type;
int32_t bytes = 0;
code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id);
if (code != 0) {
return code;
}
field.bytes = bytes;
pField = taosArrayPush(array, &field);
fieldIdx = taosArrayGetSize(array) - 1;
taosHashPut(hash, field.name, tagKeyLen, &fieldIdx, sizeof(fieldIdx));
}
smlKv->fieldSchemaIdx = (uint32_t)fieldIdx;
static int32_t buildSmlChildTableName(TAOS_SML_DATA_POINT_TAGS *tags) {
int32_t size = taosArrayGetSize(tags->tags);
ASSERT(size > 0);
qsort(tags->tags, size, POINTER_BYTES, compareSmlColKv);
return 0;
}
static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen,
SSmlLinesInfo* info) {
uDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id);
if (point->tagNum) {
qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv);
}
SStringBuilder sb; memset(&sb, 0, sizeof(sb));
char sTableName[TSDB_TABLE_NAME_LEN] = {0};
strncpy(sTableName, point->stableName, strlen(point->stableName));
//strtolower(sTableName, point->stableName);
taosStringBuilderAppendString(&sb, sTableName);
for (int j = 0; j < point->tagNum; ++j) {
taosStringBuilderAppendChar(&sb, ',');
TAOS_SML_KV* tagKv = point->tags + j;
char tagName[TSDB_COL_NAME_LEN] = {0};
strncpy(tagName, tagKv->key, strlen(tagKv->key));
//strtolower(tagName, tagKv->key);
taosStringBuilderAppendString(&sb, tagName);
taosStringBuilderAppendChar(&sb, '=');
taosStringBuilderAppend(&sb, tagKv->value, tagKv->length);
SStringBuilder sb = {0};
taosStringBuilderAppendStringLen(&sb, tags->sTableName, tags->sTableNameLen);
for (int j = 0; j < size; ++j) {
SSmlKv *tagKv = taosArrayGetP(tags->tags, j);
taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->valueLen);
}
size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
......@@ -186,183 +117,74 @@ static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableNa
tMD5Final(&context);
uint64_t digest1 = *(uint64_t*)(context.digest);
uint64_t digest2 = *(uint64_t*)(context.digest + 8);
*tableNameLen = snprintf(tableName, *tableNameLen,
"t_%016"PRIx64"%016"PRIx64, digest1, digest2);
snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2);
taosStringBuilderDestroy(&sb);
uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName);
return 0;
}
static int32_t buildSmlChildTableName(TAOS_SML_DATA_POINT* point, SSmlLinesInfo* info) {
uDebug("SML:0x%"PRIx64" taos_sml_insert build child table name", info->id);
char childTableName[TSDB_TABLE_NAME_LEN];
int32_t tableNameLen = TSDB_TABLE_NAME_LEN;
getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info);
point->childTableName = calloc(1, tableNameLen+1);
strncpy(point->childTableName, childTableName, tableNameLen);
point->childTableName[tableNameLen] = '\0';
return 0;
}
static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas, SSmlLinesInfo* info) {
int32_t code = 0;
SHashObj* sname2shema = taosHashInit(32,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
for (int i = 0; i < numPoint; ++i) {
TAOS_SML_DATA_POINT* point = &points[i];
size_t stableNameLen = strlen(point->stableName);
size_t* pStableIdx = taosHashGet(sname2shema, point->stableName, stableNameLen);
SSmlSTableSchema* pStableSchema = NULL;
size_t stableIdx = -1;
if (pStableIdx) {
pStableSchema= taosArrayGet(stableSchemas, *pStableIdx);
stableIdx = *pStableIdx;
} else {
SSmlSTableSchema schema;
strncpy(schema.sTableName, point->stableName, stableNameLen);
schema.sTableName[stableNameLen] = '\0';
schema.fields = taosArrayInit(64, sizeof(SSchema));
schema.tags = taosArrayInit(8, sizeof(SSchema));
schema.tagHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
schema.fieldHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
pStableSchema = taosArrayPush(stableSchemas, &schema);
stableIdx = taosArrayGetSize(stableSchemas) - 1;
taosHashPut(sname2shema, schema.sTableName, stableNameLen, &stableIdx, sizeof(size_t));
}
for (int j = 0; j < point->tagNum; ++j) {
TAOS_SML_KV* tagKv = point->tags + j;
if (!point->childTableName) {
buildSmlChildTableName(point, info);
}
code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags, info);
if (code != 0) {
uError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, tag key: %s", info->id, i, tagKv->key);
return code;
}
}
//for Line Protocol tags may be omitted, add a tag with NULL value
if (point->tagNum == 0) {
if (!point->childTableName) {
buildSmlChildTableName(point, info);
}
char tagNullName[TSDB_COL_NAME_LEN] = {0};
size_t nameLen = strlen(tsSmlTagNullName);
strncpy(tagNullName, tsSmlTagNullName, nameLen);
addEscapeCharToString(tagNullName, (int32_t)nameLen);
size_t* pTagNullIdx = taosHashGet(pStableSchema->tagHash, tagNullName, nameLen);
if (!pTagNullIdx) {
SSchema tagNull = {0};
tagNull.type = TSDB_DATA_TYPE_NCHAR;
tagNull.bytes = TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
strncpy(tagNull.name, tagNullName, nameLen);
taosArrayPush(pStableSchema->tags, &tagNull);
size_t tagNullIdx = taosArrayGetSize(pStableSchema->tags) - 1;
taosHashPut(pStableSchema->tagHash, tagNull.name, nameLen, &tagNullIdx, sizeof(tagNullIdx));
}
}
for (int j = 0; j < point->fieldNum; ++j) {
TAOS_SML_KV* fieldKv = point->fields + j;
code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields, info);
if (code != 0) {
uError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, tag key: %s", info->id, i, fieldKv->key);
return code;
}
}
point->schemaIdx = (uint32_t)stableIdx;
}
size_t numStables = taosArrayGetSize(stableSchemas);
for (int32_t i = 0; i < numStables; ++i) {
SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
taosHashCleanup(schema->tagHash);
taosHashCleanup(schema->fieldHash);
}
taosHashCleanup(sname2shema);
uDebug("SML:0x%"PRIx64" build point schema succeed. num of super table: %zu", info->id, numStables);
for (int32_t i = 0; i < numStables; ++i) {
SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i);
uDebug("\ttable name: %s, tags number: %zu, fields number: %zu", schema->sTableName,
taosArrayGetSize(schema->tags), taosArrayGetSize(schema->fields));
}
tags->uid = digest1;
uDebug("SML: child table name: %s", tags->childTableName);
return 0;
}
static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[],
SSchemaAction* action, bool* actionNeeded, SSmlLinesInfo* info) {
char fieldName[TSDB_COL_NAME_LEN] = {0};
strcpy(fieldName, pointColField->name);
size_t* pDbIndex = taosHashGet(dbAttrHash, fieldName, strlen(fieldName));
if (pDbIndex) {
SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex);
assert(strcasecmp(dbAttr->name, pointColField->name) == 0);
if (pointColField->type != dbAttr->type) {
uError("SML:0x%"PRIx64" point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id, pointColField->name,
pointColField->type, dbAttr->type);
return TSDB_CODE_TSC_INVALID_VALUE;
}
if (IS_VAR_DATA_TYPE(pointColField->type) && (pointColField->bytes > dbAttr->bytes)) {
if (isTag) {
action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
} else {
action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
}
memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo));
memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN);
action->alterSTable.field = pointColField;
*actionNeeded = true;
}
} else {
if (isTag) {
action->action = SCHEMA_ACTION_ADD_TAG;
} else {
action->action = SCHEMA_ACTION_ADD_COLUMN;
}
memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo));
memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN);
action->alterSTable.field = pointColField;
*actionNeeded = true;
}
if (*actionNeeded) {
uDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, fieldName,
action->action);
}
// char fieldName[TSDB_COL_NAME_LEN] = {0};
// strcpy(fieldName, pointColField->name);
//
// size_t* pDbIndex = taosHashGet(dbAttrHash, fieldName, strlen(fieldName));
// if (pDbIndex) {
// SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex);
// assert(strcasecmp(dbAttr->name, pointColField->name) == 0);
// if (pointColField->type != dbAttr->type) {
// uError("SML:0x%"PRIx64" point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id, pointColField->name,
// pointColField->type, dbAttr->type);
// return TSDB_CODE_TSC_INVALID_VALUE;
// }
//
// if (IS_VAR_DATA_TYPE(pointColField->type) && (pointColField->bytes > dbAttr->bytes)) {
// if (isTag) {
// action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
// } else {
// action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
// }
// memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo));
// memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN);
// action->alterSTable.field = pointColField;
// *actionNeeded = true;
// }
// } else {
// if (isTag) {
// action->action = SCHEMA_ACTION_ADD_TAG;
// } else {
// action->action = SCHEMA_ACTION_ADD_COLUMN;
// }
// memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo));
// memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN);
// action->alterSTable.field = pointColField;
// *actionNeeded = true;
// }
// if (*actionNeeded) {
// uDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, fieldName,
// action->action);
// }
return 0;
}
static int32_t buildColumnDescription(TAOS_SML_KV* field,
char* buf, int32_t bufSize, int32_t* outBytes) {
static int32_t buildColumnDescription(SSmlKv* field, char* buf, int32_t bufSize, int32_t* outBytes) {
uint8_t type = field->type;
char tname[TSDB_TABLE_NAME_LEN] = {0};
memcpy(tname, field->key, field->keyLen);
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
int32_t bytes = field->length - VARSTR_HEADER_SIZE;
if (type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes/TSDB_NCHAR_SIZE;
}
int32_t bytes = field->valueLen; // todo
int out = snprintf(buf, bufSize,"%s %s(%d)",
tname,tDataTypes[field->type].name, bytes);
*outBytes = out;
} else {
int out = snprintf(buf, bufSize, "%s %s",
tname, tDataTypes[type].name);
int out = snprintf(buf, bufSize, "%s %s", tname, tDataTypes[type].name);
*outBytes = out;
}
return 0;
}
static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInfo* info) {
int32_t code = 0;
int32_t outBytes = 0;
......@@ -472,7 +294,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
int n = sprintf(result, "create stable %s (", action->createSTable.sTableName);
char* pos = result + n; int freeBytes = capacity - n;
TAOS_SML_KV **kv = taosHashIterate(action->createSTable.fields, NULL);
SSmlKv **kv = taosHashIterate(action->createSTable.fields, NULL);
while(kv){
buildColumnDescription(*kv, pos, freeBytes, &outBytes);
pos += outBytes; freeBytes -= outBytes;
......@@ -523,168 +345,6 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf
return code;
}
static int32_t destroySmlSTableSchema(SSmlSTableSchema* schema) {
taosHashCleanup(schema->tagHash);
taosHashCleanup(schema->fieldHash);
taosArrayDestroy(&schema->tags);
taosArrayDestroy(&schema->fields);
return 0;
}
static int32_t fillDbSchema(STableMeta* tableMeta, char* tableName, SSmlSTableSchema* schema, SSmlLinesInfo* info) {
schema->tags = taosArrayInit(8, sizeof(SSchema));
schema->fields = taosArrayInit(64, sizeof(SSchema));
schema->tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
schema->fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
tstrncpy(schema->sTableName, tableName, strlen(tableName)+1);
schema->precision = tableMeta->tableInfo.precision;
for (int i=0; i<tableMeta->tableInfo.numOfColumns; ++i) {
SSchema field;
tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)+1);
addEscapeCharToString(field.name, (int16_t)strlen(field.name));
field.type = tableMeta->schema[i].type;
field.bytes = tableMeta->schema[i].bytes;
taosArrayPush(schema->fields, &field);
size_t fieldIndex = taosArrayGetSize(schema->fields) - 1;
taosHashPut(schema->fieldHash, field.name, strlen(field.name), &fieldIndex, sizeof(fieldIndex));
}
for (int i=0; i<tableMeta->tableInfo.numOfTags; ++i) {
int j = i + tableMeta->tableInfo.numOfColumns;
SSchema field;
tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name)+1);
addEscapeCharToString(field.name, (int16_t)strlen(field.name));
field.type = tableMeta->schema[j].type;
field.bytes = tableMeta->schema[j].bytes;
taosArrayPush(schema->tags, &field);
size_t tagIndex = taosArrayGetSize(schema->tags) - 1;
taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex));
}
uDebug("SML:0x%"PRIx64 " load table schema succeed. table name: %s, columns number: %d, tag number: %d, precision: %d",
info->id, tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision);
return TSDB_CODE_SUCCESS;
}
static int32_t getSuperTableMetaFromLocalCache(TAOS* taos, char* tableName, STableMeta** outTableMeta, SSmlLinesInfo* info) {
int32_t code = 0;
STableMeta* tableMeta = NULL;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
uError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno));
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return code;
}
pSql->pTscObj = taos;
pSql->signature = pSql;
pSql->fp = NULL;
registerSqlObj(pSql);
char tableNameBuf[TSDB_TABLE_NAME_LEN + TS_BACKQUOTE_CHAR_SIZE] = {0};
memcpy(tableNameBuf, tableName, strlen(tableName));
SStrToken tableToken = {.z = tableNameBuf, .n = (uint32_t)strlen(tableName), .type = TK_ID};
tGetToken(tableNameBuf, &tableToken.type);
bool dbIncluded = false;
// Check if the table name available or not
if (tscValidateName(&tableToken, true, &dbIncluded) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pSql->cmd.payload, "table name is invalid");
taosReleaseRef(tscObjRef, pSql->self);
return code;
}
SName sname = {0};
if ((code = tscSetTableFullName(&sname, &tableToken, pSql, dbIncluded)) != TSDB_CODE_SUCCESS) {
taosReleaseRef(tscObjRef, pSql->self);
return code;
}
char fullTableName[TSDB_TABLE_FNAME_LEN] = {0};
memset(fullTableName, 0, tListLen(fullTableName));
tNameExtractFullName(&sname, fullTableName);
size_t size = 0;
taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size);
STableMeta* stableMeta = tableMeta;
if (tableMeta != NULL && tableMeta->tableType == TSDB_CHILD_TABLE) {
taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), tableMeta->sTableName, strlen(tableMeta->sTableName), NULL,
(void**)stableMeta, &size);
}
taosReleaseRef(tscObjRef, pSql->self);
if (stableMeta != tableMeta) {
taosMemoryFree(tableMeta);
}
if (stableMeta != NULL) {
if (outTableMeta != NULL) {
*outTableMeta = stableMeta;
} else {
taosMemoryFree(stableMeta);
}
return TSDB_CODE_SUCCESS;
} else {
return TSDB_CODE_TSC_NO_META_CACHED;
}
}
static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTableMeta, SSmlLinesInfo* info) {
int32_t code = 0;
int32_t retries = 0;
STableMeta* tableMeta = NULL;
while (retries++ <= TSDB_MAX_REPLICA && tableMeta == NULL) {
STscObj* pObj = (STscObj*)taos;
if (pObj == NULL || pObj->signature != pObj) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return TSDB_CODE_TSC_DISCONNECTED;
}
uDebug("SML:0x%" PRIx64 " retrieve table meta. super table name: %s", info->id, tableName);
code = getSuperTableMetaFromLocalCache(taos, tableName, &tableMeta, info);
if (code == TSDB_CODE_SUCCESS) {
uDebug("SML:0x%" PRIx64 " successfully retrieved table meta. super table name: %s", info->id, tableName);
break;
} else if (code == TSDB_CODE_TSC_NO_META_CACHED) {
char sql[256];
snprintf(sql, 256, "describe %s", tableName);
TAOS_RES* res = taos_query(taos, sql);
code = taos_errno(res);
if (code != 0) {
uError("SML:0x%" PRIx64 " describe table failure. %s", info->id, taos_errstr(res));
taos_free_result(res);
return code;
}
taos_free_result(res);
} else {
return code;
}
}
if (tableMeta != NULL) {
*pTableMeta = tableMeta;
return TSDB_CODE_SUCCESS;
} else {
uError("SML:0x%" PRIx64 " failed to retrieve table meta. super table name: %s", info->id, tableName);
return TSDB_CODE_TSC_NO_META_CACHED;
}
}
static int32_t loadTableSchemaFromDB(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSmlLinesInfo* info) {
int32_t code = 0;
STableMeta* tableMeta = NULL;
code = retrieveTableMeta(taos, tableName, &tableMeta, info);
if (code == TSDB_CODE_SUCCESS) {
assert(tableMeta != NULL);
fillDbSchema(tableMeta, tableName, schema, info);
taosMemoryFree(tableMeta);
tableMeta = NULL;
}
return code;
}
static int32_t modifyDBSchemas(TAOS* taos, SSmlLinesInfo* info) {
int32_t code = 0;
......@@ -706,7 +366,6 @@ static int32_t modifyDBSchemas(TAOS* taos, SSmlLinesInfo* info) {
if (code == TSDB_CODE_TDB_INVALID_TABLE_ID) {
SSchemaAction schemaAction = {0};
schemaAction.action = SCHEMA_ACTION_CREATE_STABLE;
memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo));
memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen);
schemaAction.createSTable.tags = cTablePoints->tagHash;
schemaAction.createSTable.fields = cTablePoints->fieldHash;
......@@ -728,7 +387,7 @@ static int32_t modifyDBSchemas(TAOS* taos, SSmlLinesInfo* info) {
return 0;
}
static int32_t applyDataPoints(TAOS* taos, SSmlLinesInfo* info) {
static int32_t applyDataPoints(SSmlLinesInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
TAOS_SML_DATA_POINT_TAGS** oneTable = taosHashIterate(info->childTables, NULL);
......@@ -746,43 +405,40 @@ static int32_t applyDataPoints(TAOS* taos, SSmlLinesInfo* info) {
STableMeta** pMeta = taosHashGet(info->metaHashObj, tableData->sTableName, tableData->sTableNameLen);
ASSERT (NULL != pMeta && NULL != *pMeta);
(*pMeta)->vgId = vg.vgId;
(*pMeta)->uid = tableData->uid;
smlBind(info->exec, tableData->tags, tableData->cols, *pMeta, info->msgBuf, info->msgLen);
(*pMeta)->uid = tableData->uid; // one table merge data block together according uid
code = smlBind(info->exec, tableData->tags, tableData->cols, *pMeta, info->msgBuf.buf, info->msgBuf.len);
if(code != TSDB_CODE_SUCCESS){
return code;
}
oneTable = taosHashIterate(info->childTables, oneTable);
}
smlBuildOutput(info->exec, info->pVgHash);
launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true);
if(info->pRequest->code != TSDB_CODE_SUCCESS){
}
info->affectedRows = taos_affected_rows(info->pRequest);
return code;
return info->pRequest->code;
}
int tscSmlInsert(TAOS* taos, SSmlLinesInfo* info) {
int smlInsert(TAOS* taos, SSmlLinesInfo* info) {
uDebug("SML:0x%"PRIx64" taos_sml_insert. number of super tables: %d", info->id, taosHashGetSize(info->superTables));
int32_t code = TSDB_CODE_SUCCESS;
info->affectedRows = 0;
uDebug("SML:0x%"PRIx64" modify db schemas", info->id);
code = modifyDBSchemas(taos, info);
int32_t code = modifyDBSchemas(taos, info);
if (code != 0) {
uError("SML:0x%"PRIx64" error change db schema : %s", info->id, tstrerror(code));
goto clean_up;
return code;
}
uDebug("SML:0x%"PRIx64" apply data points", info->id);
code = applyDataPoints(taos, info);
code = applyDataPoints(info);
if (code != 0) {
uError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code));
return code;
}
clean_up:
return code;
return TSDB_CODE_SUCCESS;
}
//=========================================================================
......@@ -835,242 +491,307 @@ static void escapeSpecialCharacter(uint8_t field, const char **pos) {
*pos = cur;
}
bool isValidInteger(char *str) {
char *c = str;
if (*c != '+' && *c != '-' && !isdigit(*c)) {
return false;
}
c++;
while (*c != '\0') {
if (!isdigit(*c)) {
return false;
}
c++;
}
return true;
}
bool isValidFloat(char *str) {
char *c = str;
uint8_t has_dot, has_exp, has_sign;
has_dot = 0;
has_exp = 0;
has_sign = 0;
if (*c != '+' && *c != '-' && *c != '.' && !isdigit(*c)) {
return false;
}
if (*c == '.' && isdigit(*(c + 1))) {
has_dot = 1;
}
c++;
while (*c != '\0') {
if (!isdigit(*c)) {
switch (*c) {
case '.': {
if (!has_dot && !has_exp && isdigit(*(c + 1))) {
has_dot = 1;
} else {
return false;
}
break;
}
case 'e':
case 'E': {
if (!has_exp && isdigit(*(c - 1)) &&
(isdigit(*(c + 1)) ||
*(c + 1) == '+' ||
*(c + 1) == '-')) {
has_exp = 1;
} else {
return false;
}
break;
}
case '+':
case '-': {
if (!has_sign && has_exp && isdigit(*(c + 1))) {
has_sign = 1;
} else {
return false;
}
break;
}
default: {
return false;
}
}
}
c++;
} //while
return true;
}
static bool isInteger(char *pVal, uint16_t len, bool *has_sign) {
if (len <= 1) {
return false;
}
if (pVal[len - 1] == 'i') {
*has_sign = true;
return true;
}
if (pVal[len - 1] == 'u') {
*has_sign = false;
return true;
}
return false;
}
static bool isTinyInt(char *pVal, uint16_t len) {
static bool parseTinyInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 2) {
return false;
}
if (!strcasecmp(&pVal[len - 2], "i8")) {
//printf("Type is int8(%s)\n", pVal);
const char *signalPos = pVal + len - 2;
if (!strcasecmp(signalPos, "i8")) {
char *endptr = NULL;
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid tiny int", endptr);
}else if(!IS_VALID_TINYINT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "tiny int out of range[-128,127]", endptr);
}else{
kvVal->i = result;
*isValid = true;
}
return true;
}
return false;
}
static bool isTinyUint(char *pVal, uint16_t len) {
static bool parseTinyUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 2) {
return false;
}
if (pVal[0] == '-') {
return false;
}
if (!strcasecmp(&pVal[len - 2], "u8")) {
//printf("Type is uint8(%s)\n", pVal);
const char *signalPos = pVal + len - 2;
if (!strcasecmp(signalPos, "u8")) {
char *endptr = NULL;
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid unsigned tiny int", endptr);
}else if(!IS_VALID_UTINYINT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", endptr);
}else{
kvVal->i = result;
*isValid = true;
}
return true;
}
return false;
}
static bool isSmallInt(char *pVal, uint16_t len) {
static bool parseSmallInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
return false;
}
if (!strcasecmp(&pVal[len - 3], "i16")) {
//printf("Type is int16(%s)\n", pVal);
const char *signalPos = pVal + len - 3;
if (!strcasecmp(signalPos, "i16")) {
char *endptr = NULL;
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid small int", endptr);
}else if(!IS_VALID_SMALLINT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "small int our of range[-32768,32767]", endptr);
}else{
kvVal->i = result;
*isValid = true;
}
return true;
}
return false;
}
static bool isSmallUint(char *pVal, uint16_t len) {
static bool parseSmallUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
return false;
}
if (pVal[0] == '-') {
return false;
}
if (strcasecmp(&pVal[len - 3], "u16") == 0) {
//printf("Type is uint16(%s)\n", pVal);
const char *signalPos = pVal + len - 3;
if (strcasecmp(signalPos, "u16") == 0) {
char *endptr = NULL;
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid unsigned small int", endptr);
}else if(!IS_VALID_USMALLINT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", endptr);
}else{
kvVal->i = result;
*isValid = true;
}
return true;
}
return false;
}
static bool isInt(char *pVal, uint16_t len) {
static bool parseInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
return false;
}
if (strcasecmp(&pVal[len - 3], "i32") == 0) {
//printf("Type is int32(%s)\n", pVal);
const char *signalPos = pVal + len - 3;
if (strcasecmp(signalPos, "i32") == 0) {
char *endptr = NULL;
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid int", endptr);
}else if(!IS_VALID_INT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", endptr);
}else{
kvVal->i = result;
*isValid = true;
}
return true;
}
return false;
}
static bool isUint(char *pVal, uint16_t len) {
static bool parseUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
return false;
}
if (pVal[0] == '-') {
return false;
}
if (strcasecmp(&pVal[len - 3], "u32") == 0) {
//printf("Type is uint32(%s)\n", pVal);
const char *signalPos = pVal + len - 3;
if (strcasecmp(signalPos, "u32") == 0) {
char *endptr = NULL;
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid unsigned int", endptr);
}else if(!IS_VALID_UINT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", endptr);
}else{
kvVal->i = result;
*isValid = true;
}
return true;
}
return false;
}
static bool isBigInt(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
}
if (strcasecmp(&pVal[len - 3], "i64") == 0) {
//printf("Type is int64(%s)\n", pVal);
static bool parseBigInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len > 3 && strcasecmp(pVal + len - 3, "i64") == 0) {
char *endptr = NULL;
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != pVal + len - 3){ // 78ri8
*isValid = false;
}else if(!IS_VALID_BIGINT(result)){
*isValid = false;
}else{
kvVal->i = result;
*isValid = true;
}
return true;
}else if (len > 1 && pVal[len - 1] == 'i') {
char *endptr = NULL;
int64_t result = strtoll(pVal, &endptr, 10);
if(endptr != pVal + len - 1){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid big int", endptr);
}else if(!IS_VALID_BIGINT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", endptr);
}else{
kvVal->i = result;
*isValid = true;
}
return true;
}
return false;
}
static bool isBigUint(char *pVal, uint16_t len) {
static bool parseBigUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
return false;
}
if (pVal[0] == '-') {
return false;
}
if (strcasecmp(&pVal[len - 3], "u64") == 0) {
//printf("Type is uint64(%s)\n", pVal);
const char *signalPos = pVal + len - 3;
if (strcasecmp(signalPos, "u64") == 0) {
char *endptr = NULL;
uint64_t result = strtoull(pVal, &endptr, 10);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid unsigned big int", endptr);
}else if(!IS_VALID_UBIGINT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", endptr);
}else{
kvVal->u = result;
*isValid = true;
}
return true;
}
return false;
}
static bool isFloat(char *pVal, uint16_t len) {
if (len <= 3) {
return false;
static bool parseFloat(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
char *endptr = NULL;
float result = strtof(pVal, &endptr);
if(endptr == pVal + len && IS_VALID_FLOAT(result)){ // 78
kvVal->f = result;
*isValid = true;
return true;
}
if (strcasecmp(&pVal[len - 3], "f32") == 0) {
//printf("Type is float(%s)\n", pVal);
if (len > 3 && len <strcasecmp(pVal + len - 3, "f32") == 0) {
if(endptr != pVal + len - 3){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid float", endptr);
}else if(!IS_VALID_FLOAT(result)){
*isValid = false;
buildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", endptr);
}else{
kvVal->f = result;
*isValid = true;
}
return true;
}
return false;
}
static bool isDouble(char *pVal, uint16_t len) {
static bool parseDouble(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if (len <= 3) {
return false;
}
if (strcasecmp(&pVal[len - 3], "f64") == 0) {
//printf("Type is double(%s)\n", pVal);
const char *signalPos = pVal + len - 3;
if (len <strcasecmp(signalPos, "f64") == 0) {
char *endptr = NULL;
double result = strtod(pVal, &endptr);
if(endptr != signalPos){ // 78ri8
*isValid = false;
buildInvalidDataMsg(msg, "invalid double", endptr);
}else if(!IS_VALID_DOUBLE(result)){
*isValid = false;
buildInvalidDataMsg(msg, "double out of range[-1.7976931348623158e+308,1.7976931348623158e+308]", endptr);
}else{
kvVal->d = result;
*isValid = true;
}
return true;
}
return false;
}
static bool isBool(char *pVal, uint16_t len, bool *bVal) {
if ((len == 1) && !strcasecmp(&pVal[len - 1], "t")) {
static bool parseBool(SSmlKv *kvVal) {
const char *pVal = kvVal->value;
int32_t len = kvVal->valueLen;
if ((len == 1) && pVal[len - 1] == 't') {
//printf("Type is bool(%c)\n", pVal[len - 1]);
*bVal = true;
kvVal->i = true;
return true;
}
if ((len == 1) && !strcasecmp(&pVal[len - 1], "f")) {
if ((len == 1) && pVal[len - 1] == 'f') {
//printf("Type is bool(%c)\n", pVal[len - 1]);
*bVal = false;
kvVal->i = false;
return true;
}
if((len == 4) && !strcasecmp(&pVal[len - 4], "true")) {
if((len == 4) && !strcasecmp(pVal, "true")) {
//printf("Type is bool(%s)\n", &pVal[len - 4]);
*bVal = true;
kvVal->i = true;
return true;
}
if((len == 5) && !strcasecmp(&pVal[len - 5], "false")) {
if((len == 5) && !strcasecmp(pVal, "false")) {
//printf("Type is bool(%s)\n", &pVal[len - 5]);
*bVal = false;
kvVal->i = false;
return true;
}
return false;
}
static bool isBinary(char *pVal, uint16_t len) {
static bool isBinary(const char *pVal, uint16_t len) {
//binary: "abc"
if (len < 2) {
return false;
......@@ -1083,7 +804,7 @@ static bool isBinary(char *pVal, uint16_t len) {
return false;
}
static bool isNchar(char *pVal, uint16_t len) {
static bool isNchar(const char *pVal, uint16_t len) {
//nchar: L"abc"
if (len < 3) {
return false;
......@@ -1095,266 +816,97 @@ static bool isNchar(char *pVal, uint16_t len) {
return false;
}
static bool convertStrToNumber(TAOS_SML_KV *pVal, char *str, SSmlLinesInfo* info) {
errno = 0;
uint8_t type = pVal->type;
int16_t length = pVal->length;
int64_t val_s = 0;
uint64_t val_u = 0;
double val_d = 0.0;
strntolower_s(str, str, (int32_t)strlen(str));
if (IS_FLOAT_TYPE(type)) {
val_d = strtod(str, NULL);
} else {
if (IS_SIGNED_NUMERIC_TYPE(type)) {
val_s = strtoll(str, NULL, 10);
} else {
val_u = strtoull(str, NULL, 10);
}
}
if (errno == ERANGE) {
uError("SML:0x%"PRIx64" Convert number(%s) out of range", info->id, str);
return false;
}
switch (type) {
case TSDB_DATA_TYPE_TINYINT:
if (!IS_VALID_TINYINT(val_s)) {
return false;
}
pVal->value = calloc(length, 1);
*(int8_t *)(pVal->value) = (int8_t)val_s;
break;
case TSDB_DATA_TYPE_UTINYINT:
if (!IS_VALID_UTINYINT(val_u)) {
return false;
}
pVal->value = calloc(length, 1);
*(uint8_t *)(pVal->value) = (uint8_t)val_u;
break;
case TSDB_DATA_TYPE_SMALLINT:
if (!IS_VALID_SMALLINT(val_s)) {
return false;
}
pVal->value = calloc(length, 1);
*(int16_t *)(pVal->value) = (int16_t)val_s;
break;
case TSDB_DATA_TYPE_USMALLINT:
if (!IS_VALID_USMALLINT(val_u)) {
return false;
}
pVal->value = calloc(length, 1);
*(uint16_t *)(pVal->value) = (uint16_t)val_u;
break;
case TSDB_DATA_TYPE_INT:
if (!IS_VALID_INT(val_s)) {
return false;
}
pVal->value = calloc(length, 1);
*(int32_t *)(pVal->value) = (int32_t)val_s;
break;
case TSDB_DATA_TYPE_UINT:
if (!IS_VALID_UINT(val_u)) {
return false;
}
pVal->value = calloc(length, 1);
*(uint32_t *)(pVal->value) = (uint32_t)val_u;
break;
case TSDB_DATA_TYPE_BIGINT:
if (!IS_VALID_BIGINT(val_s)) {
return false;
}
pVal->value = calloc(length, 1);
*(int64_t *)(pVal->value) = (int64_t)val_s;
break;
case TSDB_DATA_TYPE_UBIGINT:
if (!IS_VALID_UBIGINT(val_u)) {
return false;
}
pVal->value = calloc(length, 1);
*(uint64_t *)(pVal->value) = (uint64_t)val_u;
break;
case TSDB_DATA_TYPE_FLOAT:
if (!IS_VALID_FLOAT(val_d)) {
return false;
}
pVal->value = calloc(length, 1);
*(float *)(pVal->value) = (float)val_d;
break;
case TSDB_DATA_TYPE_DOUBLE:
if (!IS_VALID_DOUBLE(val_d)) {
return false;
}
pVal->value = calloc(length, 1);
*(double *)(pVal->value) = (double)val_d;
break;
default:
return false;
static bool convertSmlValue(SSmlKv *pVal, SMsgBuf *msg) {
// put high probability matching type first
bool isValid = false;
if (parseFloat(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_FLOAT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return true;
}
return true;
}
//len does not include '\0' from value.
bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
uint16_t len, SSmlLinesInfo* info, bool isTag) {
if (len <= 0) {
return false;
//binary
if (isBinary(pVal->value, pVal->valueLen)) {
pVal->type = TSDB_DATA_TYPE_BINARY;
pVal->length = pVal->valueLen - 2;
pVal->valueLen -= 2;
pVal->value = pVal->value++;
return true;
}
//convert tags value to Nchar
if (isTag) {
//nchar
if (isNchar(pVal->value, pVal->valueLen)) {
pVal->type = TSDB_DATA_TYPE_NCHAR;
pVal->length = len;
pVal->value = calloc(pVal->length, 1);
memcpy(pVal->value, value, pVal->length);
pVal->length = pVal->valueLen - 3;
pVal->value = pVal->value+2;
return true;
}
if (parseDouble(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_DOUBLE;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
//integer number
bool has_sign;
if (isInteger(value, len, &has_sign)) {
pVal->type = has_sign ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_UBIGINT;
return true;
}
//bool
if (parseBool(pVal)) {
pVal->type = TSDB_DATA_TYPE_BOOL;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 1] = '\0';
if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
return false;
}
return true;
}
if (isTinyInt(value, len)) {
if (parseTinyInt(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_TINYINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 2] = '\0';
if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
return false;
}
return true;
}
if (isTinyUint(value, len)) {
if (parseTinyUint(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_UTINYINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 2] = '\0';
if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
return false;
}
return true;
}
if (isSmallInt(value, len)) {
if (parseSmallInt(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_SMALLINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
return false;
}
return true;
}
if (isSmallUint(value, len)) {
if (parseSmallUint(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_USMALLINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
return false;
}
return true;
}
if (isInt(value, len)) {
if (parseInt(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_INT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
return false;
}
return true;
}
if (isUint(value, len)) {
if (parseUint(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_UINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
return false;
}
return true;
}
if (isBigInt(value, len)) {
if (parseBigInt(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_BIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
return false;
}
return true;
}
if (isBigUint(value, len)) {
if (parseBigUint(pVal, &isValid, msg)) {
if(!isValid) return false;
pVal->type = TSDB_DATA_TYPE_UBIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) {
return false;
}
return true;
}
//floating number
if (isFloat(value, len)) {
pVal->type = TSDB_DATA_TYPE_FLOAT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidFloat(value) || !convertStrToNumber(pVal, value, info)) {
return false;
}
return true;
}
if (isDouble(value, len)) {
pVal->type = TSDB_DATA_TYPE_DOUBLE;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
value[len - 3] = '\0';
if (!isValidFloat(value) || !convertStrToNumber(pVal, value, info)) {
return false;
}
return true;
}
//binary
if (isBinary(value, len)) {
pVal->type = TSDB_DATA_TYPE_BINARY;
pVal->length = len - 2;
pVal->value = calloc(pVal->length, 1);
//copy after "
memcpy(pVal->value, value + 1, pVal->length);
return true;
}
//nchar
if (isNchar(value, len)) {
pVal->type = TSDB_DATA_TYPE_NCHAR;
pVal->length = len - 3;
pVal->value = calloc(pVal->length, 1);
//copy after L"
memcpy(pVal->value, value + 2, pVal->length);
return true;
}
//bool
bool bVal;
if (isBool(value, len, &bVal)) {
pVal->type = TSDB_DATA_TYPE_BOOL;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = calloc(pVal->length, 1);
memcpy(pVal->value, &bVal, pVal->length);
return true;
}
//Handle default(no appendix) type as DOUBLE
if (isValidInteger(value) || isValidFloat(value)) {
pVal->type = TSDB_DATA_TYPE_DOUBLE;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
if (!convertStrToNumber(pVal, value, info)) {
return false;
}
return true;
}
buildInvalidDataMsg(msg, "invalid data", pVal->value);
return false;
}
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
char *val = NULL;
val = taosHashGet(pHash, key, strlen(key));
......@@ -1369,21 +921,6 @@ bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) {
return false;
}
//Table name can only contain digits(0-9),alphebet(a-z),underscore(_)
int32_t isValidChildTableName(const char *pTbName, int16_t len, SSmlLinesInfo* info) {
if (len > TSDB_TABLE_NAME_LEN - 1) {
uError("SML:0x%"PRIx64" child table name cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
const char *cur = pTbName;
for (int i = 0; i < len; ++i) {
if(!isdigit(cur[i]) && !isalpha(cur[i]) && (cur[i] != '_')) {
return TSDB_CODE_TSC_LINE_SYNTAX_ERROR;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){
if(!sql) return TSDB_CODE_SML_INVALID_DATA;
while (*sql != '\0') { // jump the space at the begining
......@@ -1428,8 +965,12 @@ int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){
}
if(!elements->cols) return TSDB_CODE_SML_INVALID_DATA;
bool isInQuote = false;
while (*sql != '\0') {
if(*sql == SPACE && *(sql - 1) != SLASH) {
if(*sql == QUOTE && *(sql - 1) != SLASH){
isInQuote = !isInQuote;
}
if(!isInQuote && *sql == SPACE && *(sql - 1) != SLASH) {
break;
}
sql++;
......@@ -1444,12 +985,27 @@ int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){
}
sql++;
}
if(elements->timestamp){
elements->timestampLen = sql - elements->timestamp;
}
return TSDB_CODE_SUCCESS;
}
int32_t parseSmlKV(const char* data, int32_t len, SArray *cols, bool isTag){
bool parseSmlCols(const char* data, int32_t len, SArray *cols, bool isTag, SMsgBuf *msg){
if(isTag && len == 0){
SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
kv->key = TAG;
kv->keyLen = TAGNAMELEN;
kv->value = TAG;
kv->valueLen = TAGNAMELEN;
kv->type = TSDB_DATA_TYPE_NCHAR;
if(cols) taosArrayPush(cols, &kv);
return true;
}
for(int i = 0; i < len; i++){
// parse key
const char *key = data + i;
int32_t keyLen = 0;
while(i < len){
......@@ -1459,23 +1015,27 @@ int32_t parseSmlKV(const char* data, int32_t len, SArray *cols, bool isTag){
}
i++;
}
if(keyLen == 0){
if(keyLen == 0 || keyLen >= TSDB_COL_NAME_LEN){
buildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
return TSDB_CODE_SML_INVALID_DATA;
}
// parse value
i++;
const char *value = data + i;
int32_t valueLen = 0;
while(i < len){
if(data[i] == COMMA && i > 0 && data[i-1] != SLASH){
valueLen = data + i - value;
break;
}
i++;
}
int32_t valueLen = data + i - value;
if(valueLen == 0){
buildInvalidDataMsg(msg, "invalid value", value);
return TSDB_CODE_SML_INVALID_DATA;
}
// add kv to SSmlKv
SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
kv->key = key;
kv->keyLen = keyLen;
......@@ -1483,9 +1043,15 @@ int32_t parseSmlKV(const char* data, int32_t len, SArray *cols, bool isTag){
kv->valueLen = valueLen;
if(isTag){
kv->type = TSDB_DATA_TYPE_NCHAR;
}else{
if(!convertSmlValue(kv, msg)){
return TSDB_CODE_SML_INVALID_DATA;
}
}
if(cols) taosArrayPush(cols, &kv);
}
return TSDB_CODE_SUCCESS;
}
......@@ -1526,13 +1092,13 @@ static int64_t getTimeStampNow(int32_t precision) {
}
}
static int32_t isValidateTimeStamp(const char *pVal, int32_t len) {
static bool isValidateTimeStamp(const char *pVal, int32_t len) {
for (int i = 0; i < len; ++i) {
if (!isdigit(pVal[i])) {
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
return false;
}
}
return TSDB_CODE_SUCCESS;
return true;
}
static int32_t getTsType(int32_t len) {
......@@ -1541,47 +1107,53 @@ static int32_t getTsType(int32_t len) {
} else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) {
return TSDB_TIME_PRECISION_MILLI_DIGITS;
} else {
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
return -1;
}
}
static int32_t parseSmlTS(const char* data, SArray *tags, int8_t tsType, SMLProtocolType protocolType){
int64_t *ts = taosMemoryCalloc(1, sizeof(int64_t));
static int32_t parseSmlTS(const char* data, int32_t len, SArray *tags, SSmlLinesInfo* info){
int64_t ts = 0;
if(data == NULL){
if(protocolType == TSDB_SML_LINE_PROTOCOL){
*ts = getTimeStampNow(tsType);
}else{
goto cleanup;
if(info->protocol != TSDB_SML_LINE_PROTOCOL){
buildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
ts = getTimeStampNow(info->tsType);
}else{
int32_t len = strlen(data);
int ret = isValidateTimeStamp(data, len);
if(!ret){
goto cleanup;
buildInvalidDataMsg(&info->msgBuf, "timestamp must be digit", data);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
if(protocolType != TSDB_SML_LINE_PROTOCOL){
int32_t tsType = -1;
if(info->protocol != TSDB_SML_LINE_PROTOCOL){
tsType = getTsType(len);
if (tsType == TSDB_CODE_TSC_INVALID_TIME_STAMP) {
goto cleanup;
if (tsType == -1) {
buildInvalidDataMsg(&info->msgBuf, "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
}else{
tsType = info->tsType;
}
*ts = getTimeStampValue(data, tsType);
if(*ts == -1){
goto cleanup;
ts = getTimeStampValue(data, tsType);
if(ts == -1){
buildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
}
SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1);
kv->value = (const char*)ts;
kv->valueLen = sizeof(int64_t);
if(!kv){
return TSDB_CODE_OUT_OF_MEMORY;
}
kv->key = TS;
kv->keyLen = TSNAMELEN;
kv->i = ts;
kv->type = TSDB_DATA_TYPE_TIMESTAMP;
kv->length = (int16_t)tDataTypes[kv->type].bytes;
if(tags) taosArrayPush(tags, &kv);
return TSDB_CODE_SUCCESS;
cleanup:
taosMemoryFree(ts);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
//int32_t parseSmlCols(const char* data, SArray *cols){
......@@ -1627,19 +1199,21 @@ cleanup:
// return TSDB_CODE_SUCCESS;
//}
void updateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
bool updateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SMsgBuf *msg){
if(tags){
for (int i = 0; i < taosArrayGetSize(tags); ++i) {
SSmlKv *kv = taosArrayGetP(tags, i);
ASSERT(kv->type == TSDB_DATA_TYPE_NCHAR);
SSmlKv **value = taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen);
if(value){
if(kv->type != (*value)->type){
// todo
ASSERT((*value)->type == TSDB_DATA_TYPE_NCHAR);
if(kv->valueLen > (*value)->valueLen){ // tags type is nchar
*value = kv;
}
}else{
taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
}
}
}
......@@ -1649,7 +1223,14 @@ void updateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){
SSmlKv **value = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen);
if(value){
if(kv->type != (*value)->type){
// todo
buildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
return false;
}else{
if(IS_VAR_DATA_TYPE(kv->type)){ // update string len, if bigger
if(kv->valueLen > (*value)->valueLen){
*value = kv;
}
}
}
}else{
taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
......@@ -1687,21 +1268,33 @@ static int32_t smlParseLine(const char* sql, SSmlLinesInfo* info) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
parseSmlTS(elements.timestamp, cols, info->tsType);
ret = parseSmlCols(elements.cols, elements.colsLen, cols, false);
ret = parseSmlTS(elements.timestamp, elements.timestampLen, cols, info);
if(ret != TSDB_CODE_SUCCESS){
return ret;
}
ret = parseSmlCols(elements.cols, elements.colsLen, cols, false, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){
return ret;
}
if(taosArrayGetSize(cols) > TSDB_MAX_COLUMNS){
buildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
TAOS_SML_DATA_POINT_TAGS** oneTable = taosHashGet(info->childTables, elements.measure, elements.measureTagsLen);
if(oneTable){
SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen);
ASSERT(tableMeta);
updateMeta(*tableMeta, NULL, cols); // update meta
ret = updateMeta(*tableMeta, NULL, cols, &info->msgBuf); // update meta
if(!ret){
return TSDB_CODE_SML_INVALID_DATA;
}
taosArrayPush((*oneTable)->cols, &cols);
}else{
TAOS_SML_DATA_POINT_TAGS *tag = taosMemoryCalloc(sizeof(TAOS_SML_DATA_POINT_TAGS), 1);
if(!tag){
return TSDB_CODE_OUT_OF_MEMORY;
}
tag->cols = taosArrayInit(16, POINTER_BYTES);
if (tag->cols == NULL) {
uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
......@@ -1709,19 +1302,37 @@ static int32_t smlParseLine(const char* sql, SSmlLinesInfo* info) {
}
taosArrayPush(tag->cols, &cols);
tag->colsColumn = taosArrayInit(16, POINTER_BYTES);
if (tag->cols == NULL) {
uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tag->tags = taosArrayInit(16, POINTER_BYTES);
if (tag->tags == NULL) {
uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
ret = parseSmlTags(elements.tags, elements.tagsLen, tag->tags);
ret = parseSmlCols(elements.tags, elements.tagsLen, tag->tags, true, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){
return ret;
}
if(taosArrayGetSize(tag->tags) > TSDB_MAX_TAGS){
buildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
tag->sTableName = elements.measure;
tag->sTableNameLen = elements.measureLen;
buildSmlChildTableName(tag);
SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen);
if(tableMeta){ // update meta
updateMeta(*tableMeta, tag->tags, cols);
ret = updateMeta(*tableMeta, tag->tags, cols, &info->msgBuf);
if(!ret){
return TSDB_CODE_SML_INVALID_DATA;
}
}else{
SSmlSTableMeta* meta = taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
insertMeta(meta, tag->tags, cols);
......@@ -1771,9 +1382,8 @@ static SSmlLinesInfo* smlBuildInfo(TAOS* taos, SRequestObj* request, SMLProtocol
goto cleanup;
}
info->pRequest = request;
info->msgBuf = info->pRequest->msgBuf;
info->msgLen = ERROR_MSG_BUF_DEFAULT_SIZE;
info->msgBuf.buf = info->pRequest->msgBuf;
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
......@@ -1868,7 +1478,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
SRequestObj* request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT);
switch (protocol) {
case TSDB_SML_LINE_PROTOCOL:
case TSDB_SML_LINE_PROTOCOL:{
int32_t tsType = convertPrecisionType(precision);
if(tsType == -1){
request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
......@@ -1877,6 +1487,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
code = sml_insert_lines(taos, request, lines, numLines, protocol, tsType);
break;
}
case TSDB_SML_TELNET_PROTOCOL:
//code = taos_insert_telnet_lines(taos, lines, numLines, protocol, tsType, &affected_rows);
break;
......
......@@ -1539,7 +1539,7 @@ static int32_t smlBoundColumns(SArray *cols, SParsedDataColInfo* pColList, SSche
col_id_t lastColIdx = -1; // last column found
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = taosArrayGetP(cols, i);
SToken sToken = {.n=kv->keyLen, .z=kv->key};
SToken sToken = {.n=kv->keyLen, .z=(char*)kv->key};
col_id_t t = lastColIdx + 1;
col_id_t index = findCol(&sToken, t, nCols, pSchema);
if (index < 0 && t > 0) {
......@@ -1596,18 +1596,17 @@ static int32_t smlBoundColumns(SArray *cols, SParsedDataColInfo* pColList, SSche
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseTags(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDataColInfo* tags, SSchema* pSchema, SVCreateTbReq *createTblReq) {
static int32_t smlParseTags(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDataColInfo* tags, SSchema* pSchema, SVCreateTbReq *createTblReq, SMsgBuf *msg) {
if (tdInitKVRowBuilder(tagsBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SKvParam param = {.builder = tagsBuilder};
for (int i = 0; i < tags->numOfBound; ++i) {
SSchema* pTagSchema = &pSchema[tags->boundColumns[i] - 1]; // colId starts with 1
param.schema = pTagSchema;
SSmlKv *kv = taosArrayGetP(cols, i);
KvRowAppend(NULL, kv->value, kv->valueLen, &param) ;
KvRowAppend(msg, kv->value, kv->valueLen, &param) ;
}
......@@ -1630,18 +1629,33 @@ int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta
SmlExecHandle *smlHandle = (SmlExecHandle *)handle;
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
smlBoundColumns(tags, &smlHandle->tags, pTagsSchema);
smlParseTags(tags, &smlHandle->tagsBuilder, &smlHandle->tags, pTagsSchema, &smlHandle->createTblReq);
setBoundColumnInfo(&smlHandle->tags, pTagsSchema, getNumOfTags(pTableMeta));
int ret = smlBoundColumns(tags, &smlHandle->tags, pTagsSchema);
if(ret != TSDB_CODE_SUCCESS){
buildInvalidOperationMsg(&pBuf, "bound tags error");
return ret;
}
ret = smlParseTags(tags, &smlHandle->tagsBuilder, &smlHandle->tags, pTagsSchema, &smlHandle->createTblReq, &pBuf);
if(ret != TSDB_CODE_SUCCESS){
return ret;
}
STableDataBlocks* pDataBlock = NULL;
getDataBlockFromList(smlHandle->pBlockHash, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
ret = getDataBlockFromList(smlHandle->pBlockHash, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize, pTableMeta,
&pDataBlock, NULL, &smlHandle->createTblReq);
if(ret != TSDB_CODE_SUCCESS){
buildInvalidOperationMsg(&pBuf, "create data block error");
return ret;
}
SSchema* pSchema = getTableColumnSchema(pTableMeta);
smlBoundColumns(taosArrayGetP(cols, 0), &pDataBlock->boundColumnInfo, pSchema);
ret = smlBoundColumns(taosArrayGetP(cols, 0), &pDataBlock->boundColumnInfo, pSchema);
if(ret != TSDB_CODE_SUCCESS){
buildInvalidOperationMsg(&pBuf, "bound cols error");
return ret;
}
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
......@@ -1649,8 +1663,11 @@ int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta
initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo);
allocateMemForSize(pDataBlock, extendedRowSize * rowNum);
ret = allocateMemForSize(pDataBlock, extendedRowSize * rowNum);
if(ret != TSDB_CODE_SUCCESS){
buildInvalidOperationMsg(&pBuf, "allocate memory error");
return ret;
}
for (int32_t r = 0; r < rowNum; ++r) {
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册