未验证 提交 35b5f185 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #18833 from taosdata/refact/submit_req_marks

opti:performance of schemaless
......@@ -58,7 +58,6 @@ typedef struct SParseContext {
bool isSuperUser;
bool enableSysInfo;
bool async;
int8_t schemalessType;
const char* svrVer;
bool nodeOffline;
SArray* pTableMetaPos; // sql table pos => catalog data pos
......@@ -108,7 +107,11 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
void qDestroyBoundColInfo(void* pInfo);
SQuery* smlInitHandle();
int32_t smlBindData(SQuery* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
int32_t smlBuildRow(STableDataCxt* pTableCxt);
int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void *kv, int32_t index);
STableDataCxt* smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta);
int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols, STableMeta* pTableMeta,
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen);
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
......
......@@ -149,7 +149,6 @@ typedef struct STscObj {
int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo* pAppInfo;
SHashObj* pRequests;
int8_t schemalessType; // todo remove it, this attribute should be move to request
} STscObj;
typedef struct SResultColumn {
......
......@@ -30,7 +30,7 @@ extern "C" {
#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", DEBUG_TRACE, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0)
//#define tscPerf(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", DEBUG_INFO, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscPerf(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", 0, cDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
#ifdef __cplusplus
......
......@@ -76,13 +76,20 @@ static void deregisterRequest(SRequestObj *pRequest) {
"current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
"us, exec:%" PRId64 "us, stmtType:%d",
duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd -
pRequest->metric.ctgEnd, pRequest->metric.execEnd - pRequest->metric.semanticEnd,
pRequest->stmtType);
if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) {
// tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
// "us, exec:%" PRId64 "us",
// duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
// pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd -
// pRequest->metric.ctgEnd, pRequest->metric.execEnd - pRequest->metric.semanticEnd);
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
// tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
// "us, exec:%" PRId64 "us",
// duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
// pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd -
// pRequest->metric.ctgEnd, pRequest->metric.execEnd - pRequest->metric.semanticEnd);
// atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
// tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
// "us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%" PRIx64,
......@@ -265,7 +272,6 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c
taosThreadMutexInit(&pObj->mutex, NULL);
pObj->id = taosAddRef(clientConnRefPool, pObj);
pObj->schemalessType = 1;
atomic_add_fetch_64(&pObj->pAppInfo->numOfConns, 1);
......
......@@ -232,7 +232,6 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = pStmtCb,
.pUser = pTscObj->user,
.schemalessType = pTscObj->schemalessType,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.enableSysInfo = pTscObj->sysInfo,
.svrVer = pTscObj->sVer,
......
......@@ -863,7 +863,6 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = NULL,
.pUser = pTscObj->user,
.schemalessType = pTscObj->schemalessType,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.enableSysInfo = pTscObj->sysInfo,
.async = true,
......
......@@ -20,6 +20,19 @@
#include "ttime.h"
#include "ttypes.h"
#if (defined(__GNUC__) && (__GNUC__ >= 3)) || (defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 800)) || defined(__clang__)
# define expect(expr,value) (__builtin_expect ((expr),(value)) )
#else
# define expect(expr,value) (expr)
#endif
#ifndef likely
#define likely(expr) expect((expr) != 0, 1)
#endif
#ifndef unlikely
#define unlikely(expr) expect((expr) != 0, 0)
#endif
//=================================================================================================
#define SPACE ' '
......@@ -30,28 +43,29 @@
#define JUMP_SPACE(sql, sqlEnd) \
while (sql < sqlEnd) { \
if (*sql == SPACE) \
if (unlikely(*sql == SPACE)) \
sql++; \
else \
break; \
}
// comma ,
#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH)
//#define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH)
#define IS_COMMA(sql) (*(sql) == COMMA && *((sql)-1) != SLASH)
// space
#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql)-1) == SLASH)
//#define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql)-1) == SLASH)
#define IS_SPACE(sql) (*(sql) == SPACE && *((sql)-1) != SLASH)
// equal =
#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) == SLASH)
//#define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) == SLASH)
#define IS_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) != SLASH)
// quote "
#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) == SLASH)
//#define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) == SLASH)
#define IS_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) != SLASH)
// SLASH
#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql)-1) == SLASH)
//#define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql)-1) == SLASH)
#define IS_SLASH_LETTER(sql) \
(IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql))
(*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == EQUAL || *(sql) == QUOTE || *(sql) == SLASH)) \
// (IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) || IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql))
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len))
......@@ -75,6 +89,8 @@
#define VALUE "_value"
#define VALUE_LEN 6
#define JSON_METERS_NAME "__JM"
#define BINARY_ADD_LEN 2 // "binary" 2 means " "
#define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" "
......@@ -91,17 +107,99 @@ typedef enum {
SCHEMA_ACTION_CHANGE_TAG_SIZE,
} ESchemaAction;
/*********************** list start *********************************/
typedef struct {
const void *key;
int32_t keyLen;
void *value;
bool used;
}Node;
typedef struct NodeList{
Node data;
struct NodeList* next;
}NodeList;
typedef int32_t (*_equal_fn_sml)(const void *, const void *);
static void* nodeListGet(NodeList* list, const void *key, int32_t len, _equal_fn_sml fn){
NodeList *tmp = list;
while(tmp){
if(fn == NULL){
if(tmp->data.used && tmp->data.keyLen == len && memcmp(tmp->data.key, key, len) == 0) {
return tmp->data.value;
}
}else{
if(tmp->data.used && fn(tmp->data.key, key) == 0) {
return tmp->data.value;
}
}
tmp = tmp->next;
}
return NULL;
}
static int nodeListSet(NodeList** list, const void *key, int32_t len, void* value, _equal_fn_sml fn){
NodeList *tmp = *list;
while (tmp){
if(!tmp->data.used) break;
if(fn == NULL){
if(tmp->data.keyLen == len && memcmp(tmp->data.key, key, len) == 0) {
return -1;
}
}else{
if(tmp->data.keyLen == len && fn(tmp->data.key, key) == 0) {
return -1;
}
}
tmp = tmp->next;
}
if(tmp){
tmp->data.key = key;
tmp->data.keyLen = len;
tmp->data.value = value;
tmp->data.used = true;
}else{
NodeList *newNode = taosMemoryCalloc(1, sizeof(NodeList));
if(newNode == NULL){
return -1;
}
newNode->data.key = key;
newNode->data.keyLen = len;
newNode->data.value = value;
newNode->data.used = true;
newNode->next = *list;
*list = newNode;
}
return 0;
}
static int nodeListSize(NodeList* list){
int cnt = 0;
while(list){
if(list->data.used) cnt++;
else break;
list = list->next;
}
return cnt;
}
/*********************** list end *********************************/
typedef struct {
const char *measure;
const char *tags;
const char *cols;
const char *timestamp;
char *measure;
char *tags;
char *cols;
char *timestamp;
int32_t measureLen;
int32_t measureTagsLen;
int32_t tagsLen;
int32_t colsLen;
int32_t timestampLen;
SArray *colArray;
} SSmlLineInfo;
typedef struct {
......@@ -109,12 +207,13 @@ typedef struct {
int32_t sTableNameLen;
char childTableName[TSDB_TABLE_NAME_LEN];
uint64_t uid;
void *key; // for openTsdb telnet
SArray *tags;
// if info->formatData is true, elements are SArray<SSmlKv*>.
// if info->formatData is false, elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
// elements are SHashObj<cols key string, SSmlKv*> for find by key quickly
SArray *cols;
STableDataCxt *tableDataCtx;
} SSmlTableInfo;
typedef struct {
......@@ -149,26 +248,18 @@ typedef struct {
int64_t endTime;
} SSmlCostInfo;
typedef struct {
SRequestObj *request;
tsem_t sem;
int32_t cnt;
int32_t total;
TdThreadSpinlock lock;
} Params;
typedef struct {
int64_t id;
Params *params;
SMLProtocolType protocol;
int8_t precision;
bool reRun;
bool dataFormat; // true means that the name and order of keys in each line are the same(only for influx protocol)
bool isRawLine;
int32_t ttl;
SHashObj *childTables;
SHashObj *superTables;
NodeList *childTables;
NodeList *superTables;
SHashObj *pVgHash;
STscObj *taos;
......@@ -177,25 +268,33 @@ typedef struct {
SQuery *pQuery;
SSmlCostInfo cost;
int32_t affectedRows;
int32_t lineNum;
SSmlMsgBuf msgBuf;
SHashObj *dumplicateKey; // for dumplicate key
SArray *colsContainer; // for cols parse, if dataFormat == false
cJSON *root; // for parse json
SSmlLineInfo *lines; // element is SSmlLineInfo
//
SArray *preLineTagKV;
SArray *preLineColKV;
SSmlLineInfo preLine;
STableMeta *currSTableMeta;
STableDataCxt *currTableDataCtx;
bool needModifySchema;
} SSmlHandle;
//=================================================================================================
//=================================================================================================
static volatile int64_t linesSmlHandleId = 0;
static int64_t smlGenId() {
int64_t id;
int64_t id;
do {
id = atomic_add_fetch_64(&linesSmlHandleId, 1);
do {
id = atomic_add_fetch_64(&linesSmlHandleId, 1);
} while (id == 0);
return id;
return id;
}
static inline bool smlDoubleToInt64OverFlow(double num) {
......@@ -279,7 +378,7 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH
int32_t code = TSDB_CODE_SUCCESS;
for (int j = 0; j < taosArrayGetSize(cols); ++j) {
if (j == 0 && !isTag) continue;
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j);
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -301,7 +400,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool
i = 1;
}
for (; i < taosArrayGetSize(cols); i++) {
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
taosHashCleanup(hashTmp);
return -1;
......@@ -322,7 +421,7 @@ static int32_t getBytes(uint8_t type, int32_t length) {
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
SArray *results, int32_t numOfCols, bool isTag) {
for (int j = 0; j < taosArrayGetSize(cols); ++j) {
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j);
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
ESchemaAction action = SCHEMA_ACTION_NULL;
smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) {
......@@ -423,13 +522,16 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg);
end:
end:
destroyRequest(pRequest);
tFreeSMCreateStbReq(&pReq);
return code;
}
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
if(info->dataFormat && !info->needModifySchema){
return TSDB_CODE_SUCCESS;
}
int32_t code = 0;
SHashObj *hashTmp = NULL;
STableMeta *pTableMeta = NULL;
......@@ -443,13 +545,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
conn.requestObjRefId = info->pRequest->self;
conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
SSmlSTableMeta **tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
while (tableMetaSml) {
SSmlSTableMeta *sTableData = *tableMetaSml;
NodeList *tmp = info->superTables;
while (tmp) {
SSmlSTableMeta *sTableData = tmp->data.value;
bool needCheckMeta = false; // for multi thread
size_t superTableLen = 0;
void *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
size_t superTableLen = (size_t)tmp->data.keyLen;
const void *superTable = tmp->data.key;
memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
memcpy(pName.tname, superTable, superTableLen);
......@@ -598,17 +700,18 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
sTableData->tableMeta = pTableMeta;
tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, tableMetaSml);
tmp = tmp->next;
}
return 0;
end:
end:
taosHashCleanup(hashTmp);
taosMemoryFreeClear(pTableMeta);
// catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
return code;
}
/******************************* parse basic type function **********************/
static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->length;
......@@ -747,50 +850,38 @@ static bool smlIsNchar(const char *pVal, uint16_t len) {
if (len < 3) {
return false;
}
if ((pVal[0] == 'l' || pVal[0] == 'L') && pVal[1] == '"' && pVal[len - 1] == '"') {
if (pVal[1] == '"' && pVal[len - 1] == '"' && (pVal[0] == 'l' || pVal[0] == 'L')) {
return true;
}
return false;
}
/******************************* parse basic type function end **********************/
/******************************* time function **********************/
static uint8_t smlPrecisionConvert[7] = {TSDB_TIME_PRECISION_NANO, TSDB_TIME_PRECISION_HOURS, TSDB_TIME_PRECISION_MINUTES,
TSDB_TIME_PRECISION_SECONDS, TSDB_TIME_PRECISION_MILLI, TSDB_TIME_PRECISION_MICRO,
TSDB_TIME_PRECISION_NANO};
static int64_t smlFactorNS[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
static int64_t smlFactorS[3] = {1000LL, 1000000LL, 1000000000LL};
static int64_t smlToMilli[3] = {3600000LL, 60000LL, 1000LL};
static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) {
static int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, uint8_t toPrecision) {
char *endPtr = NULL;
int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10);
if (value + len != endPtr) {
if (unlikely(value + len != endPtr)) {
return -1;
}
double ts = tsInt64;
switch (type) {
case TSDB_TIME_PRECISION_HOURS:
ts *= NANOSECOND_PER_HOUR;
tsInt64 *= NANOSECOND_PER_HOUR;
break;
case TSDB_TIME_PRECISION_MINUTES:
ts *= NANOSECOND_PER_MINUTE;
tsInt64 *= NANOSECOND_PER_MINUTE;
break;
case TSDB_TIME_PRECISION_SECONDS:
ts *= NANOSECOND_PER_SEC;
tsInt64 *= NANOSECOND_PER_SEC;
break;
case TSDB_TIME_PRECISION_MILLI:
ts *= NANOSECOND_PER_MSEC;
tsInt64 *= NANOSECOND_PER_MSEC;
break;
case TSDB_TIME_PRECISION_MICRO:
ts *= NANOSECOND_PER_USEC;
tsInt64 *= NANOSECOND_PER_USEC;
break;
case TSDB_TIME_PRECISION_NANO:
break;
default:
ASSERT(0);
}
if (ts >= (double)INT64_MAX || ts < 0) {
return -1;
if(unlikely(fromPrecision >= TSDB_TIME_PRECISION_HOURS)){
int64_t unit = smlToMilli[fromPrecision - TSDB_TIME_PRECISION_HOURS];
if(unit > INT64_MAX / tsInt64){
return -1;
}
tsInt64 *= unit;
fromPrecision = TSDB_TIME_PRECISION_MILLI;
}
return tsInt64;
return convertTimePrecision(tsInt64, fromPrecision, toPrecision);
}
static int8_t smlGetTsTypeByLen(int32_t len) {
......@@ -803,39 +894,17 @@ static int8_t smlGetTsTypeByLen(int32_t len) {
}
}
static int8_t smlGetTsTypeByPrecision(int8_t precision) {
switch (precision) {
case TSDB_SML_TIMESTAMP_HOURS:
return TSDB_TIME_PRECISION_HOURS;
case TSDB_SML_TIMESTAMP_MILLI_SECONDS:
return TSDB_TIME_PRECISION_MILLI;
case TSDB_SML_TIMESTAMP_NANO_SECONDS:
case TSDB_SML_TIMESTAMP_NOT_CONFIGURED:
return TSDB_TIME_PRECISION_NANO;
case TSDB_SML_TIMESTAMP_MICRO_SECONDS:
return TSDB_TIME_PRECISION_MICRO;
case TSDB_SML_TIMESTAMP_SECONDS:
return TSDB_TIME_PRECISION_SECONDS;
case TSDB_SML_TIMESTAMP_MINUTES:
return TSDB_TIME_PRECISION_MINUTES;
default:
return -1;
}
}
static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) {
if (len == 0 || (len == 1 && data[0] == '0')) {
return taosGetTimestampNs();
}
uint8_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;
int8_t tsType = smlGetTsTypeByPrecision(info->precision);
if (tsType == -1) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL);
return -1;
if(unlikely(len == 0 || (len == 1 && data[0] == '0'))){
return taosGetTimestampNs()/smlFactorNS[toPrecision];
}
int64_t ts = smlGetTimeValue(data, len, tsType);
if (ts == -1) {
uint8_t fromPrecision = smlPrecisionConvert[info->precision];
int64_t ts = smlGetTimeValue(data, len, fromPrecision, toPrecision);
if (unlikely(ts == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
return -1;
}
......@@ -843,28 +912,30 @@ static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t le
}
static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
if (!data) {
uint8_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;
if (unlikely(!data)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL);
return -1;
}
if (len == 1 && data[0] == '0') {
return taosGetTimestampNs();
if (unlikely(len == 1 && data[0] == '0')) {
return taosGetTimestampNs()/smlFactorNS[toPrecision];
}
int8_t tsType = smlGetTsTypeByLen(len);
if (tsType == -1) {
uint8_t fromPrecision = smlGetTsTypeByLen(len);
if (unlikely(fromPrecision == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf,
"timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data);
return -1;
}
int64_t ts = smlGetTimeValue(data, len, tsType);
if (ts == -1) {
int64_t ts = smlGetTimeValue(data, len, fromPrecision, toPrecision);
if (unlikely(ts == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
return -1;
}
return ts;
}
static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArray *cols) {
static int64_t smlParseTS(SSmlHandle *info, const char *data, int32_t len) {
int64_t ts = 0;
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
// uError("SML:data:%s,len:%d", data, len);
......@@ -876,27 +947,175 @@ static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArra
}
uDebug("SML:0x%" PRIx64 " smlParseTS:%" PRId64, info->id, ts);
if (ts <= 0) {
uError("SML:0x%" PRIx64 " smlParseTS error:%" PRId64, info->id, ts);
return TSDB_CODE_INVALID_TIMESTAMP;
return ts;
}
/******************************* time function end **********************/
/******************************* Sml struct related function **********************/
static SSmlTableInfo *smlBuildTableInfo(int numRows, const char* measure, int32_t measureLen) {
SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
if (!tag) {
return NULL;
}
// add ts to
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if (!kv) {
return TSDB_CODE_OUT_OF_MEMORY;
tag->sTableName = measure;
tag->sTableNameLen = measureLen;
tag->cols = taosArrayInit(numRows, POINTER_BYTES);
if (tag->cols == NULL) {
uError("SML:smlBuildTableInfo failed to allocate memory");
goto cleanup;
}
tag->tags = taosArrayInit(16, sizeof(SSmlKv));
if (tag->tags == NULL) {
uError("SML:smlBuildTableInfo failed to allocate memory");
goto cleanup;
}
return tag;
cleanup:
taosMemoryFree(tag);
return NULL;
}
static int32_t smlCheckDupUnit(SHashObj *dumplicateKey, SArray *tags, SSmlMsgBuf *msg){
for(int i = 0; i < taosArrayGetSize(tags); i++) {
SSmlKv *tag = taosArrayGet(tags, i);
if (smlCheckDuplicateKey(tag->key, tag->keyLen, dumplicateKey)) {
smlBuildInvalidDataMsg(msg, "dumplicate key", tag->key);
return TSDB_CODE_TSC_DUP_NAMES;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlJudgeDupColName(SArray *cols, SArray *tags, SSmlMsgBuf *msg) {
SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
int ret = smlCheckDupUnit(dumplicateKey, cols, msg);
if(ret != TSDB_CODE_SUCCESS){
goto end;
}
ret = smlCheckDupUnit(dumplicateKey, tags, msg);
if(ret != TSDB_CODE_SUCCESS){
goto end;
}
kv->key = TS;
kv->keyLen = TS_LEN;
kv->i = ts;
kv->type = TSDB_DATA_TYPE_TIMESTAMP;
kv->length = (int16_t)tDataTypes[kv->type].bytes;
taosArrayPush(cols, &kv);
end:
taosHashCleanup(dumplicateKey);
return ret;
}
static int32_t smlParseTableName(SArray *tags, char *childTableName) {
size_t childTableNameLen = strlen(tsSmlChildTableName);
if (childTableNameLen <= 0) return TSDB_CODE_SUCCESS;
for(int i = 0; i < taosArrayGetSize(tags); i++){
SSmlKv *tag = taosArrayGet(tags, i);
// handle child table name
if (childTableNameLen == tag->keyLen && strncmp(tag->key, tsSmlChildTableName, tag->keyLen) == 0) {
memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
strncpy(childTableName, tag->value, (tag->length < TSDB_TABLE_NAME_LEN ? tag->length : TSDB_TABLE_NAME_LEN));
break;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlSetCTableName(SSmlTableInfo *oneTable){
smlParseTableName(oneTable->tags, oneTable->childTableName);
if (strlen(oneTable->childTableName) == 0) {
SArray* dst = taosArrayDup(oneTable->tags, NULL);
RandTableName rName = {dst, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen,
oneTable->childTableName, 0};
buildChildTableName(&rName);
taosArrayDestroy(dst);
oneTable->uid = rName.uid;
} else {
oneTable->uid = *(uint64_t *)(oneTable->childTableName);
}
return TSDB_CODE_SUCCESS;
}
static SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
if (!meta) {
return NULL;
}
if(unlikely(!isDataFormat)){
meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (meta->tagHash == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (meta->colHash == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
}
meta->tags = taosArrayInit(32, sizeof(SSmlKv));
if (meta->tags == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
meta->cols = taosArrayInit(32, sizeof(SSmlKv));
if (meta->cols == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
return meta;
cleanup:
taosMemoryFree(meta);
return NULL;
}
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols){
for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
taosArrayPush(metaArray, kv);
if(unlikely(metaHash != NULL)) {
taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
}
}
}
static STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen){
STableMeta *pTableMeta = NULL;
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
SRequestConnInfo conn = {0};
conn.pTrans = info->taos->pAppInfo->pTransporter;
conn.requestId = info->pRequest->requestId;
conn.requestObjRefId = info->pRequest->self;
conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
memcpy(pName.tname, measure, measureLen);
catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
return pTableMeta;
}
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
taosHashCleanup(meta->tagHash);
taosHashCleanup(meta->colHash);
taosArrayDestroy(meta->tags);
taosArrayDestroy(meta->cols);
taosMemoryFree(meta->tableMeta);
taosMemoryFree(meta);
}
/******************************* Sml struct related function end **********************/
static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
// binary
if (smlIsBinary(pVal->value, pVal->length)) {
......@@ -934,257 +1153,609 @@ static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
static int32_t smlParseInfluxString(const char *sql, const char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
if (!sql) return TSDB_CODE_SML_INVALID_DATA;
JUMP_SPACE(sql, sqlEnd)
if (*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA;
elements->measure = sql;
int32_t is_same_child_table_json(const void *a, const void *b){
return (cJSON_Compare((const cJSON *)a, (const cJSON *)b, true)) ? 0 : 1;
}
// parse measure
while (sql < sqlEnd) {
if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
MOVE_FORWARD_ONE(sql, sqlEnd - sql);
sqlEnd--;
continue;
}
if (IS_COMMA(sql)) {
break;
}
int32_t is_same_child_table_telnet(const void *a, const void *b){
SSmlLineInfo *t1 = (SSmlLineInfo *)a;
SSmlLineInfo *t2 = (SSmlLineInfo *)b;
return (((t1->measureLen == t2->measureLen) && memcmp(t1->measure, t2->measure, t1->measureLen) == 0)
&& ((t1->tagsLen == t2->tagsLen) && memcmp(t1->tags, t2->tags, t1->tagsLen) == 0)) ? 0 : 1;
}
if (IS_SPACE(sql)) {
break;
}
sql++;
}
elements->measureLen = sql - elements->measure;
if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
smlBuildInvalidDataMsg(msg, "measure is empty or too large than 192", NULL);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
#define IS_SAME_CHILD_TABLE (elements->measureTagsLen == info->preLine.measureTagsLen \
&& memcmp(elements->measure, info->preLine.measure, elements->measureTagsLen) == 0)
// parse tag
if (*sql == SPACE) {
elements->tagsLen = 0;
} else {
if (*sql == COMMA) sql++;
elements->tags = sql;
while (sql < sqlEnd) {
if (IS_SPACE(sql)) {
break;
}
sql++;
}
elements->tagsLen = sql - elements->tags;
}
elements->measureTagsLen = sql - elements->measure;
#define IS_SAME_SUPER_TABLE (elements->measureLen == info->preLine.measureLen \
&& memcmp(elements->measure, info->preLine.measure, elements->measureLen) == 0)
// parse cols
JUMP_SPACE(sql, sqlEnd)
elements->cols = sql;
bool isInQuote = false;
while (sql < sqlEnd) {
if (IS_QUOTE(sql)) {
isInQuote = !isInQuote;
}
if (!isInQuote && IS_SPACE(sql)) {
break;
}
sql++;
}
if (isInQuote) {
smlBuildInvalidDataMsg(msg, "only one quote", elements->cols);
return TSDB_CODE_SML_INVALID_DATA;
}
elements->colsLen = sql - elements->cols;
if (elements->colsLen == 0) {
smlBuildInvalidDataMsg(msg, "cols is empty", NULL);
return TSDB_CODE_SML_INVALID_DATA;
#define IS_SAME_KEY (preKV->keyLen == kv.keyLen && memcmp(preKV->key, kv.key, kv.keyLen) == 0)
static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd,
SSmlLineInfo* currElement, bool isSameMeasure, bool isSameCTable){
if(isSameCTable){
return TSDB_CODE_SUCCESS;
}
// parse timestamp
JUMP_SPACE(sql, sqlEnd)
elements->timestamp = sql;
while (sql < sqlEnd) {
if (isspace(*sql)) {
break;
int cnt = 0;
SArray *preLineKV = info->preLineTagKV;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if(info->dataFormat){
if(!isSameMeasure){
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
if(unlikely(sMeta == NULL)){
sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
sMeta->tableMeta = pTableMeta;
if(pTableMeta == NULL){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta, NULL);
}
info->currSTableMeta = sMeta->tableMeta;
superKV = sMeta->tags;
if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = false;
}
taosArraySetSize(preLineKV, 0);
}
sql++;
}else{
taosArraySetSize(preLineKV, 0);
}
elements->timestampLen = sql - elements->timestamp;
return TSDB_CODE_SUCCESS;
}
static void smlParseTelnetElement(const char **sql, const char *sqlEnd, const char **data, int32_t *len) {
while (*sql < sqlEnd) {
if (**sql != SPACE && !(*data)) {
*data = *sql;
} else if (**sql == SPACE && *data) {
*len = *sql - *data;
if (unlikely(IS_SPACE(*sql))) {
break;
}
(*sql)++;
}
}
static int32_t smlParseTelnetTags(const char *data, const char *sqlEnd, SArray *cols, char *childTableName,
SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
if (!cols) return TSDB_CODE_OUT_OF_MEMORY;
const char *sql = data;
size_t childTableNameLen = strlen(tsSmlChildTableName);
while (sql < sqlEnd) {
JUMP_SPACE(sql, sqlEnd)
if (*sql == '\0') break;
const char *key = sql;
int32_t keyLen = 0;
bool hasSlash = false;
// parse key
while (sql < sqlEnd) {
if (*sql == SPACE) {
smlBuildInvalidDataMsg(msg, "invalid data", sql);
const char *key = *sql;
int32_t keyLen = 0;
while (*sql < sqlEnd) {
if (unlikely(IS_COMMA(*sql))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA;
}
if (*sql == EQUAL) {
keyLen = sql - key;
sql++;
if (unlikely(IS_EQUAL(*sql))) {
keyLen = *sql - key;
(*sql)++;
break;
}
sql++;
if(!hasSlash){
hasSlash = (*(*sql) == SLASH);
}
(*sql)++;
}
if(unlikely(hasSlash)) {
PROCESS_SLASH(key, keyLen)
}
if (IS_INVALID_COL_LEN(keyLen)) {
smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
smlBuildInvalidDataMsg(msg, "dumplicate key", key);
return TSDB_CODE_TSC_DUP_NAMES;
}
// parse value
const char *value = sql;
const char *value = *sql;
int32_t valueLen = 0;
while (sql < sqlEnd) {
hasSlash = false;
while (*sql < sqlEnd) {
// parse value
if (*sql == SPACE) {
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
break;
}else if (unlikely(IS_EQUAL(*sql))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA;
}
if (*sql == EQUAL) {
smlBuildInvalidDataMsg(msg, "invalid data", sql);
return TSDB_CODE_SML_INVALID_DATA;
if(!hasSlash){
hasSlash = (*(*sql) == SLASH);
}
sql++;
(*sql)++;
}
valueLen = sql - value;
valueLen = *sql - value;
if (valueLen == 0) {
smlBuildInvalidDataMsg(msg, "invalid value", value);
return TSDB_CODE_TSC_INVALID_VALUE;
if (unlikely(valueLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
return TSDB_CODE_SML_INVALID_DATA;
}
// handle child table name
if (childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0) {
memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN));
continue;
if(unlikely(hasSlash)) {
PROCESS_SLASH(value, valueLen)
}
if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
if (unlikely(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
// add kv to SSmlKv
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
kv->key = key;
kv->keyLen = keyLen;
kv->value = value;
kv->length = valueLen;
kv->type = TSDB_DATA_TYPE_NCHAR;
SSmlKv kv = {.key = key, .type = TSDB_DATA_TYPE_NCHAR, .keyLen = keyLen, .value = value, .length = valueLen};
if(info->dataFormat){
if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
taosArrayPush(cols, &kv);
if(isSameMeasure){
if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
if(unlikely(kv.length > preKV->length)){
preKV->length = kv.length;
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
ASSERT(tableMeta != NULL);
SSmlKv *oldKV = taosArrayGet(tableMeta->tags, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
if(isSuperKVInit){
if(unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(superKV, cnt);
if(unlikely(kv.length > preKV->length)) {
preKV->length = kv.length;
}else{
kv.length = preKV->length;
}
info->needModifySchema = true;
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
taosArrayPush(superKV, &kv);
}
taosArrayPush(preLineKV, &kv);
}
}else{
taosArrayPush(preLineKV, &kv);
}
cnt++;
if(IS_SPACE(*sql)){
break;
}
(*sql)++;
}
void* oneTable = nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen, NULL);
if ((oneTable != NULL)) {
return TSDB_CODE_SUCCESS;
}
SSmlTableInfo *tinfo = smlBuildTableInfo(1, currElement->measure, currElement->measureLen);
if (!tinfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for(int i = 0; i < taosArrayGetSize(preLineKV); i++){
taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
}
smlSetCTableName(tinfo);
if(info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if(tinfo->tableDataCtx == NULL){
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
}
nodeListSet(&info->childTables, currElement->measure, currElement->measureTagsLen, tinfo, NULL);
return TSDB_CODE_SUCCESS;
}
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlTableInfo *tinfo,
SArray *cols) {
static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd,
SSmlLineInfo* currElement, bool isSameMeasure, bool isSameCTable){
int cnt = 0;
SArray *preLineKV = info->preLineColKV;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if(info->dataFormat){
if(unlikely(!isSameCTable)){
SSmlTableInfo *oneTable = (SSmlTableInfo *)nodeListGet(info->childTables, currElement->measure, currElement->measureTagsLen, NULL);
if (unlikely(oneTable == NULL)) {
smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure);
return TSDB_CODE_SML_INVALID_DATA;
}
info->currTableDataCtx = oneTable->tableDataCtx;
}
if(unlikely(!isSameMeasure)){
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
if(unlikely(sMeta == NULL)){
sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
sMeta->tableMeta = pTableMeta;
if(pTableMeta == NULL){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
nodeListSet(&info->superTables, currElement->measure, currElement->measureLen, sMeta, NULL);
}
info->currSTableMeta = sMeta->tableMeta;
superKV = sMeta->cols;
if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = false;
}
taosArraySetSize(preLineKV, 0);
}
}
while (*sql < sqlEnd) {
if (unlikely(IS_SPACE(*sql))) {
break;
}
bool hasSlash = false;
// parse key
const char *key = *sql;
int32_t keyLen = 0;
while (*sql < sqlEnd) {
if (unlikely(IS_COMMA(*sql))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA;
}
if (unlikely(IS_EQUAL(*sql))) {
keyLen = *sql - key;
(*sql)++;
break;
}
if(!hasSlash){
hasSlash = (*(*sql) == SLASH);
}
(*sql)++;
}
if(unlikely(hasSlash)) {
PROCESS_SLASH(key, keyLen)
}
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
// parse value
const char *value = *sql;
int32_t valueLen = 0;
hasSlash = false;
bool isInQuote = false;
while (*sql < sqlEnd) {
// parse value
if (IS_QUOTE(*sql)) {
isInQuote = !isInQuote;
(*sql)++;
continue;
}
if (!isInQuote){
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
break;
} else if (unlikely(IS_EQUAL(*sql))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA;
}
}
if(!hasSlash){
hasSlash = (*(*sql) == SLASH);
}
(*sql)++;
}
valueLen = *sql - value;
if (unlikely(isInQuote)) {
smlBuildInvalidDataMsg(&info->msgBuf, "only one quote", value);
return TSDB_CODE_SML_INVALID_DATA;
}
if (unlikely(valueLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
return TSDB_CODE_SML_INVALID_DATA;
}
if(unlikely(hasSlash)) {
PROCESS_SLASH(value, valueLen)
}
SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen};
int32_t ret = smlParseValue(&kv, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
if(info->dataFormat){
//cnt begin 0, add ts so + 2
if(unlikely(cnt + 2 > info->currSTableMeta->tableInfo.numOfColumns)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
// bind data
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, cnt + 1);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("smlBuildCol error, retry");
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if(isSameMeasure){
if(cnt >= taosArrayGetSize(preLineKV)) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
if(kv.type != preKV->type){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if(unlikely(IS_VAR_DATA_TYPE(kv.type) && kv.length > preKV->length)){
preKV->length = kv.length;
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, currElement->measure, currElement->measureLen, NULL);
ASSERT(tableMeta != NULL);
SSmlKv *oldKV = taosArrayGet(tableMeta->cols, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
if(isSuperKVInit){
if(unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(superKV, cnt);
if(unlikely(kv.type != preKV->type)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
if(IS_VAR_DATA_TYPE(kv.type)){
if(kv.length > preKV->length) {
preKV->length = kv.length;
}else{
kv.length = preKV->length;
}
info->needModifySchema = true;
}
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
taosArrayPush(superKV, &kv);
}
taosArrayPush(preLineKV, &kv);
}
}else{
if(currElement->colArray == NULL){
currElement->colArray = taosArrayInit(16, sizeof(SSmlKv));
taosArraySetSize(currElement->colArray, 1);
}
taosArrayPush(currElement->colArray, &kv); //reserve for timestamp
}
cnt++;
if(IS_SPACE(*sql)){
break;
}
(*sql)++;
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements) {
if (!sql) return TSDB_CODE_SML_INVALID_DATA;
JUMP_SPACE(sql, sqlEnd)
if (unlikely(*sql == COMMA)) return TSDB_CODE_SML_INVALID_DATA;
elements->measure = sql;
// parse metric
smlParseTelnetElement(&sql, sqlEnd, &tinfo->sTableName, &tinfo->sTableNameLen);
if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
// parse measure
while (sql < sqlEnd) {
if ((sql != elements->measure) && IS_SLASH_LETTER(sql)) {
MOVE_FORWARD_ONE(sql, sqlEnd - sql);
sqlEnd--;
continue;
}
if (IS_COMMA(sql)) {
break;
}
if (IS_SPACE(sql)) {
break;
}
sql++;
}
elements->measureLen = sql - elements->measure;
if (unlikely(IS_INVALID_TABLE_LEN(elements->measureLen))) {
smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
// parse timestamp
const char *timestamp = NULL;
int32_t tLen = 0;
smlParseTelnetElement(&sql, sqlEnd, &timestamp, &tLen);
if (!timestamp || tLen == 0) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
return TSDB_CODE_SML_INVALID_DATA;
// to get measureTagsLen before
const char* tmp = sql;
while (tmp < sqlEnd){
if (IS_SPACE(tmp)) {
break;
}
tmp++;
}
elements->measureTagsLen = tmp - elements->measure;
int32_t ret = smlParseTS(info, timestamp, tLen, cols);
if (ret != TSDB_CODE_SUCCESS) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
return ret;
bool isSameCTable = false;
bool isSameMeasure = false;
if(IS_SAME_CHILD_TABLE){
isSameCTable = true;
isSameMeasure = true;
}else if(info->dataFormat) {
isSameMeasure = IS_SAME_SUPER_TABLE;
}
// parse tag
if (*sql == COMMA) sql++;
elements->tags = sql;
// parse value
const char *value = NULL;
int32_t valueLen = 0;
smlParseTelnetElement(&sql, sqlEnd, &value, &valueLen);
if (!value || valueLen == 0) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
return TSDB_CODE_TSC_INVALID_VALUE;
// tinfo != NULL means child table has never occur before
int ret = smlParseTagKv(info, &sql, sqlEnd, elements, isSameMeasure, isSameCTable);
if(unlikely(ret != TSDB_CODE_SUCCESS)){
return ret;
}
if(unlikely(info->reRun)){
return TSDB_CODE_SUCCESS;
}
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
taosArrayPush(cols, &kv);
kv->key = VALUE;
kv->keyLen = VALUE_LEN;
kv->value = value;
kv->length = valueLen;
if ((ret = smlParseValue(kv, &info->msgBuf)) != TSDB_CODE_SUCCESS) {
sql = elements->measure + elements->measureTagsLen;
elements->tagsLen = sql - elements->tags;
// parse cols
JUMP_SPACE(sql, sqlEnd)
elements->cols = sql;
ret = smlParseColKv(info, &sql, sqlEnd, elements, isSameMeasure, isSameCTable);
if(unlikely(ret != TSDB_CODE_SUCCESS)){
return ret;
}
// parse tags
ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
return ret;
if(unlikely(info->reRun)){
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_SUCCESS;
elements->colsLen = sql - elements->cols;
if (unlikely(elements->colsLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "cols is empty", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
// parse timestamp
JUMP_SPACE(sql, sqlEnd)
elements->timestamp = sql;
while (sql < sqlEnd) {
if (isspace(*sql)) {
break;
}
sql++;
}
elements->timestampLen = sql - elements->timestamp;
int64_t ts = smlParseTS(info, elements->timestamp, elements->timestampLen);
if (ts <= 0) {
uError("SML:0x%" PRIx64 " smlParseTS error:%" PRId64, info->id, ts);
return TSDB_CODE_INVALID_TIMESTAMP;
}
// add ts to
SSmlKv kv = { .key = TS, .keyLen = TS_LEN, .i = ts, .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
if(info->dataFormat){
smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
smlBuildRow(info->currTableDataCtx);
}else{
taosArraySet(elements->colArray, 0, &kv);
}
info->preLine = *elements;
return ret;
}
static void smlParseTelnetElement(char **sql, char *sqlEnd, char **data, int32_t *len) {
while (*sql < sqlEnd) {
if (unlikely((**sql != SPACE && !(*data)))) {
*data = *sql;
} else if (unlikely(**sql == SPACE && *data)) {
*len = *sql - *data;
break;
}
(*sql)++;
}
}
static int32_t smlParseCols(const char *data, int32_t len, SArray *cols, char *childTableName, bool isTag,
SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
if (len == 0) {
static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) {
if(is_same_child_table_telnet(elements, &info->preLine) == 0){
return TSDB_CODE_SUCCESS;
}
size_t childTableNameLen = strlen(tsSmlChildTableName);
bool isSameMeasure = IS_SAME_SUPER_TABLE;
int cnt = 0;
SArray *preLineKV = info->preLineTagKV;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if(info->dataFormat){
if(!isSameMeasure){
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
if(unlikely(sMeta == NULL)){
sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
sMeta->tableMeta = pTableMeta;
if(pTableMeta == NULL){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta, NULL);
}
info->currSTableMeta = sMeta->tableMeta;
superKV = sMeta->tags;
if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = false;
}
taosArraySetSize(preLineKV, 0);
}
}else{
taosArraySetSize(preLineKV, 0);
}
const char *sql = data;
while (sql < data + len) {
while (sql < sqlEnd) {
JUMP_SPACE(sql, sqlEnd)
if (unlikely(*sql == '\0')) break;
const char *key = sql;
int32_t keyLen = 0;
while (sql < data + len) {
// parse key
if (IS_COMMA(sql)) {
// parse key
while (sql < sqlEnd) {
if (unlikely(*sql == SPACE)) {
smlBuildInvalidDataMsg(msg, "invalid data", sql);
return TSDB_CODE_SML_INVALID_DATA;
}
if (IS_EQUAL(sql)) {
if (unlikely(*sql == EQUAL)) {
keyLen = sql - key;
sql++;
break;
......@@ -1192,103 +1763,238 @@ static int32_t smlParseCols(const char *data, int32_t len, SArray *cols, char *c
sql++;
}
if (IS_INVALID_COL_LEN(keyLen)) {
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
smlBuildInvalidDataMsg(msg, "dumplicate key", key);
return TSDB_CODE_TSC_DUP_NAMES;
}
// if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) {
// smlBuildInvalidDataMsg(msg, "dumplicate key", key);
// return TSDB_CODE_TSC_DUP_NAMES;
// }
// parse value
const char *value = sql;
int32_t valueLen = 0;
bool isInQuote = false;
while (sql < data + len) {
while (sql < sqlEnd) {
// parse value
if (!isTag && IS_QUOTE(sql)) {
isInQuote = !isInQuote;
sql++;
continue;
}
if (!isInQuote && IS_COMMA(sql)) {
if (unlikely(*sql == SPACE)) {
break;
}
if (!isInQuote && IS_EQUAL(sql)) {
if (unlikely(*sql == EQUAL)) {
smlBuildInvalidDataMsg(msg, "invalid data", sql);
return TSDB_CODE_SML_INVALID_DATA;
}
sql++;
}
valueLen = sql - value;
sql++;
if (isInQuote) {
smlBuildInvalidDataMsg(msg, "only one quote", value);
return TSDB_CODE_SML_INVALID_DATA;
}
if (valueLen == 0) {
if (unlikely(valueLen == 0)) {
smlBuildInvalidDataMsg(msg, "invalid value", value);
return TSDB_CODE_SML_INVALID_DATA;
return TSDB_CODE_TSC_INVALID_VALUE;
}
PROCESS_SLASH(key, keyLen)
PROCESS_SLASH(value, valueLen)
// handle child table name
if (childTableName && childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0) {
memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN));
continue;
if (unlikely(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
// add kv to SSmlKv
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
if (cols) taosArrayPush(cols, &kv);
kv->key = key;
kv->keyLen = keyLen;
kv->value = value;
kv->length = valueLen;
if (isTag) {
if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen, .type = TSDB_DATA_TYPE_NCHAR};
if(info->dataFormat){
if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
kv->type = TSDB_DATA_TYPE_NCHAR;
} else {
int32_t ret = smlParseValue(kv, msg);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
if(isSameMeasure){
if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
if(unlikely(kv.length > preKV->length)){
preKV->length = kv.length;
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
ASSERT(tableMeta != NULL);
SSmlKv *oldKV = taosArrayGet(tableMeta->tags, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
if(isSuperKVInit){
if(unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(superKV, cnt);
if(unlikely(kv.length > preKV->length)) {
preKV->length = kv.length;
}else{
kv.length = preKV->length;
}
info->needModifySchema = true;
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
taosArrayPush(superKV, &kv);
}
taosArrayPush(preLineKV, &kv);
}
}else{
taosArrayPush(preLineKV, &kv);
}
cnt++;
}
SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet);
if (unlikely(tinfo == NULL)) {
tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
if (!tinfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for(int i = 0; i < taosArrayGetSize(preLineKV); i++){
taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
}
smlSetCTableName(tinfo);
if (info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
}
SSmlLineInfo *key = taosMemoryMalloc(sizeof(SSmlLineInfo));
*key = *elements;
tinfo->key = key;
nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo, is_same_child_table_telnet);
}
info->currTableDataCtx = tinfo->tableDataCtx;
return TSDB_CODE_SUCCESS;
}
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
static int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements) {
if (!sql) return TSDB_CODE_SML_INVALID_DATA;
// parse metric
smlParseTelnetElement(&sql, sqlEnd, &elements->measure, &elements->measureLen);
if (unlikely((!(elements->measure) || IS_INVALID_TABLE_LEN(elements->measureLen)))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
// parse timestamp
smlParseTelnetElement(&sql, sqlEnd, &elements->timestamp, &elements->timestampLen);
if (unlikely(!elements->timestamp || elements->timestampLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
return TSDB_CODE_SML_INVALID_DATA;
}
bool needConverTime = false; // get TS before parse tag(get meta), so need conver time
if(info->dataFormat && info->currSTableMeta == NULL){
needConverTime = true;
}
int64_t ts = smlParseTS(info, elements->timestamp, elements->timestampLen);
if (unlikely(ts < 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
return TSDB_CODE_INVALID_TIMESTAMP;
}
SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .i = ts, .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
// parse value
smlParseTelnetElement(&sql, sqlEnd, &elements->cols, &elements->colsLen);
if (unlikely(!elements->cols || elements->colsLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
return TSDB_CODE_TSC_INVALID_VALUE;
}
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = elements->colsLen};
if (smlParseNumber(&kv, &info->msgBuf)) {
kv.length = (int16_t)tDataTypes[kv.type].bytes;
}else{
return TSDB_CODE_TSC_INVALID_VALUE;
}
JUMP_SPACE(sql, sqlEnd)
elements->tags = sql;
elements->tagsLen = sqlEnd - sql;
int ret = smlParseTelnetTags(info, sql, sqlEnd, elements, &info->msgBuf);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret;
}
if(unlikely(info->reRun)){
return TSDB_CODE_SUCCESS;
}
if(info->dataFormat){
if(needConverTime) {
kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
}
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
}
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildRow(info->currTableDataCtx);
}
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
}else{
if(elements->colArray == NULL){
elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
}
taosArrayPush(elements->colArray, &kvTs);
taosArrayPush(elements->colArray, &kv);
}
info->preLine = *elements;
return TSDB_CODE_SUCCESS;
}
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, SSmlMsgBuf *msg) {
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg) {
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen);
if (index) {
SSmlKv **value = (SSmlKv **)taosArrayGet(metaArray, *index);
if (kv->type != (*value)->type) {
SSmlKv *value = (SSmlKv *)taosArrayGet(metaArray, *index);
if (isTag){
if (kv->length > value->length) {
value->length = kv->length;
}
continue;
}
if (kv->type != value->type) {
smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
return TSDB_CODE_SML_NOT_SAME_TYPE;
} else {
if (IS_VAR_DATA_TYPE(kv->type)) { // update string len, if bigger
if (kv->length > (*value)->length) {
*value = kv;
}
}
}
if (IS_VAR_DATA_TYPE(kv->type) && (kv->length > value->length)) { // update string len, if bigger
value->length = kv->length;
}
} else {
size_t tmp = taosArrayGetSize(metaArray);
ASSERT(tmp <= INT16_MAX);
int16_t size = tmp;
taosArrayPush(metaArray, &kv);
taosArrayPush(metaArray, kv);
taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
}
}
......@@ -1296,374 +2002,189 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
return TSDB_CODE_SUCCESS;
}
static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols) {
for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
taosArrayPush(metaArray, &kv);
taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES);
}
}
static SSmlTableInfo *smlBuildTableInfo() {
SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
if (!tag) {
return NULL;
}
tag->cols = taosArrayInit(16, POINTER_BYTES);
if (tag->cols == NULL) {
uError("SML:smlBuildTableInfo failed to allocate memory");
goto cleanup;
}
tag->tags = taosArrayInit(16, POINTER_BYTES);
if (tag->tags == NULL) {
uError("SML:smlBuildTableInfo failed to allocate memory");
goto cleanup;
static void smlDestroyTableInfo(SSmlTableInfo *tag) {
for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
taosHashCleanup(kvHash);
}
return tag;
cleanup:
taosMemoryFree(tag);
return NULL;
}
static void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
if (info->dataFormat) {
for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
SArray *kvArray = (SArray *)taosArrayGetP(tag->cols, i);
for (int j = 0; j < taosArrayGetSize(kvArray); ++j) {
SSmlKv *p = (SSmlKv *)taosArrayGetP(kvArray, j);
taosMemoryFree(p);
}
taosArrayDestroy(kvArray);
}
} else {
for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
void **p1 = (void **)taosHashIterate(kvHash, NULL);
while (p1) {
taosMemoryFree(*p1);
p1 = (void **)taosHashIterate(kvHash, p1);
}
taosHashCleanup(kvHash);
}
}
for (size_t i = 0; i < taosArrayGetSize(tag->tags); i++) {
SSmlKv *p = (SSmlKv *)taosArrayGetP(tag->tags, i);
taosMemoryFree(p);
}
taosMemoryFree(tag->key);
taosArrayDestroy(tag->cols);
taosArrayDestroy(tag->tags);
taosMemoryFree(tag);
}
static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) {
SArray *s1 = *(SArray **)key1;
SArray *s2 = *(SArray **)key2;
SSmlKv *kv1 = (SSmlKv *)taosArrayGetP(s1, 0);
SSmlKv *kv2 = (SSmlKv *)taosArrayGetP(s2, 0);
ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
if (kv1->i < kv2->i) {
return -1;
} else if (kv1->i > kv2->i) {
return 1;
} else {
return 0;
}
}
static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
SHashObj *s1 = *(SHashObj **)key1;
SHashObj *s2 = *(SHashObj **)key2;
SSmlKv **kv1pp = (SSmlKv **)taosHashGet(s1, TS, TS_LEN);
SSmlKv **kv2pp = (SSmlKv **)taosHashGet(s2, TS, TS_LEN);
if (!kv1pp || !kv2pp) {
uError("smlKvTimeHashCompare kv is null");
return -1;
}
SSmlKv *kv1 = *kv1pp;
SSmlKv *kv2 = *kv2pp;
if (!kv1 || kv1->type != TSDB_DATA_TYPE_TIMESTAMP) {
uError("smlKvTimeHashCompare kv1");
return -1;
}
if (!kv2 || kv2->type != TSDB_DATA_TYPE_TIMESTAMP) {
uError("smlKvTimeHashCompare kv2");
return -1;
}
if (kv1->i < kv2->i) {
return -1;
} else if (kv1->i > kv2->i) {
return 1;
} else {
return 0;
}
}
static int32_t smlDealCols(SSmlTableInfo *oneTable, bool dataFormat, SArray *cols) {
if (dataFormat) {
void *p = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GT);
if (p == NULL) {
taosArrayPush(oneTable->cols, &cols);
} else {
taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols);
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (!kvHash) {
uError("SML:smlDealCols failed to allocate memory");
return TSDB_CODE_OUT_OF_MEMORY;
}
for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
}
void *p = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GT);
if (p == NULL) {
taosArrayPush(oneTable->cols, &kvHash);
} else {
taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &kvHash);
}
taosArrayPush(colsArray, &kvHash);
return TSDB_CODE_SUCCESS;
}
static SSmlSTableMeta *smlBuildSTableMeta() {
SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
if (!meta) {
return NULL;
}
meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (meta->tagHash == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (meta->colHash == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
meta->tags = taosArrayInit(32, POINTER_BYTES);
if (meta->tags == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
meta->cols = taosArrayInit(32, POINTER_BYTES);
if (meta->cols == NULL) {
uError("SML:smlBuildSTableMeta failed to allocate memory");
goto cleanup;
}
return meta;
cleanup:
taosMemoryFree(meta);
return NULL;
}
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
taosHashCleanup(meta->tagHash);
taosHashCleanup(meta->colHash);
taosArrayDestroy(meta->tags);
taosArrayDestroy(meta->cols);
taosMemoryFree(meta->tableMeta);
taosMemoryFree(meta);
}
static void smlDestroyCols(SArray *cols) {
if (!cols) return;
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
void *kv = taosArrayGetP(cols, i);
taosMemoryFree(kv);
}
}
static void smlDestroyInfo(SSmlHandle *info) {
void smlDestroyInfo(SSmlHandle *info) {
if (!info) return;
qDestroyQuery(info->pQuery);
// destroy info->childTables
void **p1 = (void **)taosHashIterate(info->childTables, NULL);
while (p1) {
smlDestroyTableInfo(info, (SSmlTableInfo *)(*p1));
p1 = (void **)taosHashIterate(info->childTables, p1);
NodeList* tmp = info->childTables;
while (tmp) {
if(tmp->data.used) {
smlDestroyTableInfo(tmp->data.value);
}
NodeList* t = tmp->next;
taosMemoryFree(tmp);
tmp = t;
}
taosHashCleanup(info->childTables);
// destroy info->superTables
p1 = (void **)taosHashIterate(info->superTables, NULL);
while (p1) {
smlDestroySTableMeta((SSmlSTableMeta *)(*p1));
p1 = (void **)taosHashIterate(info->superTables, p1);
tmp = info->superTables;
while (tmp) {
if(tmp->data.used) {
smlDestroySTableMeta(tmp->data.value);
}
NodeList* t = tmp->next;
taosMemoryFree(tmp);
tmp = t;
}
taosHashCleanup(info->superTables);
// destroy info->pVgHash
taosHashCleanup(info->pVgHash);
taosHashCleanup(info->dumplicateKey);
if (!info->dataFormat) {
taosArrayDestroy(info->colsContainer);
taosArrayDestroy(info->preLineTagKV);
taosArrayDestroy(info->preLineColKV);
if(!info->dataFormat){
for(int i = 0; i < info->lineNum; i++){
taosArrayDestroy(info->lines[i].colArray);
}
taosMemoryFree(info->lines);
}
destroyRequest(info->pRequest);
cJSON_Delete(info->root);
taosMemoryFreeClear(info);
}
static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLProtocolType protocol, int8_t precision) {
static SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
int32_t code = TSDB_CODE_SUCCESS;
SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
if (NULL == info) {
return NULL;
}
info->taos = acquireTscObj(*(int64_t *)taos);
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
goto cleanup;
}
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
info->id = smlGenId();
info->pQuery = smlInitHandle();
info->dataFormat = true;
if (pTscObj) {
info->taos = pTscObj;
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
goto cleanup;
}
}
info->precision = precision;
info->protocol = protocol;
if (protocol == TSDB_SML_LINE_PROTOCOL) {
info->dataFormat = tsSmlDataFormat;
} else {
info->dataFormat = true;
}
if (request) {
info->pRequest = request;
info->msgBuf.buf = info->pRequest->msgBuf;
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
}
info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv));
info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (!info->dataFormat) {
info->colsContainer = taosArrayInit(32, POINTER_BYTES);
if (NULL == info->colsContainer) {
uError("SML:0x%" PRIx64 " create info failed", info->id);
goto cleanup;
}
}
if (NULL == info->pQuery || NULL == info->childTables || NULL == info->superTables || NULL == info->pVgHash ||
NULL == info->dumplicateKey) {
uError("SML:0x%" PRIx64 " create info failed", info->id);
if (NULL == info->pVgHash) {
uError("create SSmlHandle failed");
goto cleanup;
}
return info;
cleanup:
smlDestroyInfo(info);
return NULL;
}
/************* TSDB_SML_JSON_PROTOCOL function start **************/
static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo) {
static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
cJSON *metric = cJSON_GetObjectItem(root, "metric");
if (!cJSON_IsString(metric)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
tinfo->sTableNameLen = strlen(metric->valuestring);
if (IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
elements->measureLen = strlen(metric->valuestring);
if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
tinfo->sTableName = metric->valuestring;
elements->measure = metric->valuestring;
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int64_t *tsVal) {
static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPrecision) {
int32_t size = cJSON_GetArraySize(root);
if (size != OTD_JSON_SUB_FIELDS_NUM) {
return TSDB_CODE_TSC_INVALID_JSON;
if (unlikely(size != OTD_JSON_SUB_FIELDS_NUM)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
}
cJSON *value = cJSON_GetObjectItem(root, "value");
if (!cJSON_IsNumber(value)) {
return TSDB_CODE_TSC_INVALID_JSON;
if (unlikely(!cJSON_IsNumber(value))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
}
cJSON *type = cJSON_GetObjectItem(root, "type");
if (!cJSON_IsString(type)) {
return TSDB_CODE_TSC_INVALID_JSON;
if (unlikely(!cJSON_IsString(type))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
}
double timeDouble = value->valuedouble;
if (smlDoubleToInt64OverFlow(timeDouble)) {
if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_INVALID_TIMESTAMP;
return -1;
}
if (timeDouble == 0) {
*tsVal = taosGetTimestampNs();
return TSDB_CODE_SUCCESS;
return taosGetTimestampNs()/smlFactorNS[toPrecision];
}
if (timeDouble < 0) {
return TSDB_CODE_INVALID_TIMESTAMP;
return timeDouble;
}
*tsVal = timeDouble;
int64_t tsInt64 = timeDouble;
size_t typeLen = strlen(type->valuestring);
if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
// seconds
*tsVal = *tsVal * NANOSECOND_PER_SEC;
timeDouble = timeDouble * NANOSECOND_PER_SEC;
if (smlDoubleToInt64OverFlow(timeDouble)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_INVALID_TIMESTAMP;
int8_t fromPrecision = TSDB_TIME_PRECISION_SECONDS;
if(smlFactorS[toPrecision] < INT64_MAX / tsInt64){
return tsInt64 * smlFactorS[toPrecision];
}
return -1;
} else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
switch (type->valuestring[0]) {
case 'm':
case 'M':
// milliseconds
*tsVal = *tsVal * NANOSECOND_PER_MSEC;
timeDouble = timeDouble * NANOSECOND_PER_MSEC;
if (smlDoubleToInt64OverFlow(timeDouble)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_INVALID_TIMESTAMP;
}
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MILLI, toPrecision);
break;
case 'u':
case 'U':
// microseconds
*tsVal = *tsVal * NANOSECOND_PER_USEC;
timeDouble = timeDouble * NANOSECOND_PER_USEC;
if (smlDoubleToInt64OverFlow(timeDouble)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_INVALID_TIMESTAMP;
}
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MICRO, toPrecision);
break;
case 'n':
case 'N':
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_NANO, toPrecision);
break;
default:
return TSDB_CODE_TSC_INVALID_JSON;
return -1;
}
} else {
return TSDB_CODE_TSC_INVALID_JSON;
return -1;
}
return TSDB_CODE_SUCCESS;
}
static uint8_t smlGetTimestampLen(int64_t num) {
......@@ -1675,66 +2196,49 @@ static uint8_t smlGetTimestampLen(int64_t num) {
return len;
}
static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root) {
// Timestamp must be the first KV to parse
int64_t tsVal = 0;
int32_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;
cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
if (cJSON_IsNumber(timestamp)) {
// timestamp value 0 indicates current system time
double timeDouble = timestamp->valuedouble;
if (smlDoubleToInt64OverFlow(timeDouble)) {
if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_INVALID_TIMESTAMP;
return -1;
}
if (timeDouble < 0) {
return TSDB_CODE_INVALID_TIMESTAMP;
if (unlikely(timeDouble < 0)) {
smlBuildInvalidDataMsg(&info->msgBuf,
"timestamp is negative", NULL);
return timeDouble;
}else if (unlikely(timeDouble == 0)) {
return taosGetTimestampNs()/smlFactorNS[toPrecision];
}
uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
tsVal = (int64_t)timeDouble;
if (tsLen == TSDB_TIME_PRECISION_SEC_DIGITS) {
tsVal = tsVal * NANOSECOND_PER_SEC;
timeDouble = timeDouble * NANOSECOND_PER_SEC;
if (smlDoubleToInt64OverFlow(timeDouble)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_INVALID_TIMESTAMP;
}
} else if (tsLen == TSDB_TIME_PRECISION_MILLI_DIGITS) {
tsVal = tsVal * NANOSECOND_PER_MSEC;
timeDouble = timeDouble * NANOSECOND_PER_MSEC;
if (smlDoubleToInt64OverFlow(timeDouble)) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return TSDB_CODE_INVALID_TIMESTAMP;
}
} else if (timeDouble == 0) {
tsVal = taosGetTimestampNs();
} else {
return TSDB_CODE_INVALID_TIMESTAMP;
int8_t fromPrecision = smlGetTsTypeByLen(tsLen);
if (unlikely(fromPrecision == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf,
"timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", NULL);
return -1;
}
} else if (cJSON_IsObject(timestamp)) {
int32_t ret = smlParseTSFromJSONObj(info, timestamp, &tsVal);
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " Failed to parse timestamp from JSON Obj", info->id);
return ret;
int64_t tsInt64 = timeDouble;
if(fromPrecision == TSDB_TIME_PRECISION_SECONDS){
if(smlFactorS[toPrecision] < INT64_MAX / tsInt64){
return tsInt64 * smlFactorS[toPrecision];
}
return -1;
}else{
return convertTimePrecision(timeDouble, fromPrecision, toPrecision);
}
} else if (cJSON_IsObject(timestamp)) {
return smlParseTSFromJSONObj(info, timestamp, toPrecision);
} else {
return TSDB_CODE_TSC_INVALID_JSON;
}
// add ts to
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if (!kv) {
return TSDB_CODE_OUT_OF_MEMORY;
smlBuildInvalidDataMsg(&info->msgBuf,
"invalidate json", NULL);
return -1;
}
kv->key = TS;
kv->keyLen = TS_LEN;
kv->i = tsVal;
kv->type = TSDB_DATA_TYPE_TIMESTAMP;
kv->length = (int16_t)tDataTypes[kv->type].bytes;
taosArrayPush(cols, &kv);
return TSDB_CODE_SUCCESS;
}
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
......@@ -1930,321 +2434,309 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
if (!cols) return TSDB_CODE_OUT_OF_MEMORY;
static int32_t smlParseColsFromJSON(cJSON *root, SSmlKv *kv) {
cJSON *metricVal = cJSON_GetObjectItem(root, "value");
if (metricVal == NULL) {
return TSDB_CODE_TSC_INVALID_JSON;
}
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if (!kv) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(cols, &kv);
kv->key = VALUE;
kv->keyLen = VALUE_LEN;
int32_t ret = smlParseValueFromJSON(metricVal, kv);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey,
SSmlMsgBuf *msg) {
static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
if (!pKVs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
cJSON *tags = cJSON_GetObjectItem(root, "tags");
if (tags == NULL || tags->type != cJSON_Object) {
if (unlikely(tags == NULL || tags->type != cJSON_Object)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
size_t childTableNameLen = strlen(tsSmlChildTableName);
// add measure to tags to identify one child table
cJSON *cMeasure = cJSON_AddStringToObject(tags, JSON_METERS_NAME, elements->measure);
if(unlikely(cMeasure == NULL)){
return TSDB_CODE_TSC_INVALID_JSON;
}
elements->tags = (char*)tags;
if(is_same_child_table_json(elements->tags, info->preLine.tags) == 0){
cJSON_DeleteItemFromObjectCaseSensitive(tags, JSON_METERS_NAME);
return TSDB_CODE_SUCCESS;
}
cJSON_DeleteItemFromObjectCaseSensitive(tags, JSON_METERS_NAME);
bool isSameMeasure = IS_SAME_SUPER_TABLE;
int cnt = 0;
SArray *preLineKV = info->preLineTagKV;
bool isSuperKVInit = true;
SArray *superKV = NULL;
if(info->dataFormat){
if(unlikely(!isSameMeasure)){
SSmlSTableMeta *sMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
if(unlikely(sMeta == NULL)){
sMeta = smlBuildSTableMeta(info->dataFormat);
STableMeta * pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen);
sMeta->tableMeta = pTableMeta;
if(pTableMeta == NULL){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
nodeListSet(&info->superTables, elements->measure, elements->measureLen, sMeta, NULL);
}
info->currSTableMeta = sMeta->tableMeta;
superKV = sMeta->tags;
if(unlikely(taosArrayGetSize(superKV) == 0)){
isSuperKVInit = false;
}
taosArraySetSize(preLineKV, 0);
}
}else{
taosArraySetSize(preLineKV, 0);
}
int32_t tagNum = cJSON_GetArraySize(tags);
for (int32_t i = 0; i < tagNum; ++i) {
cJSON *tag = cJSON_GetArrayItem(tags, i);
if (tag == NULL) {
if (unlikely(tag == NULL)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
if(unlikely(tag == cMeasure)) continue;
size_t keyLen = strlen(tag->string);
if (IS_INVALID_COL_LEN(keyLen)) {
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
uError("OTD:Tag key length is 0 or too large than 64");
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
// check duplicate keys
if (smlCheckDuplicateKey(tag->string, keyLen, dumplicateKey)) {
return TSDB_CODE_TSC_DUP_NAMES;
// add kv to SSmlKv
SSmlKv kv ={.key = tag->string, .keyLen = keyLen};
// value
ret = smlParseValueFromJSON(tag, &kv);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret;
}
// handle child table name
if (childTableNameLen != 0 && strcmp(tag->string, tsSmlChildTableName) == 0) {
if (!cJSON_IsString(tag)) {
uError("OTD:ID must be JSON string");
return TSDB_CODE_TSC_INVALID_JSON;
if(info->dataFormat){
if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
tstrncpy(childTableName, tag->valuestring, TSDB_TABLE_NAME_LEN);
continue;
}
// add kv to SSmlKv
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
taosArrayPush(pKVs, &kv);
if(isSameMeasure){
if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(preLineKV, cnt);
if(unlikely(kv.length > preKV->length)){
preKV->length = kv.length;
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
ASSERT(tableMeta != NULL);
SSmlKv *oldKV = taosArrayGet(tableMeta->tags, cnt);
oldKV->length = kv.length;
info->needModifySchema = true;
}
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
if(isSuperKVInit){
if(unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = taosArrayGet(superKV, cnt);
if(unlikely(kv.length > preKV->length)) {
preKV->length = kv.length;
}else{
kv.length = preKV->length;
}
info->needModifySchema = true;
// key
kv->keyLen = keyLen;
kv->key = tag->string;
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
return TSDB_CODE_SUCCESS;
}
}else{
taosArrayPush(superKV, &kv);
}
taosArrayPush(preLineKV, &kv);
}
}else{
taosArrayPush(preLineKV, &kv);
}
cnt++;
}
// value
ret = smlParseValueFromJSON(tag, kv);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->tags, POINTER_BYTES, is_same_child_table_json);
if (unlikely(tinfo == NULL)) {
tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
if (unlikely(!tinfo)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int i = 0; i < taosArrayGetSize(preLineKV); i++) {
taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i));
}
smlSetCTableName(tinfo);
if (info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
}
nodeListSet(&info->childTables, tags, POINTER_BYTES, tinfo, is_same_child_table_json);
}
info->currTableDataCtx = tinfo->tableDataCtx;
return ret;
}
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *tinfo, SArray *cols) {
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
if (!cJSON_IsObject(root)) {
uError("OTD:0x%" PRIx64 " data point needs to be JSON object", info->id);
return TSDB_CODE_TSC_INVALID_JSON;
}
int32_t size = cJSON_GetArraySize(root);
// outmost json fields has to be exactly 4
if (size != OTD_JSON_FIELDS_NUM) {
if (unlikely(size != OTD_JSON_FIELDS_NUM)) {
uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
return TSDB_CODE_TSC_INVALID_JSON;
}
// Parse metric
ret = smlParseMetricFromJSON(info, root, tinfo);
if (ret != TSDB_CODE_SUCCESS) {
ret = smlParseMetricFromJSON(info, root, elements);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
return ret;
}
uDebug("OTD:0x%" PRIx64 " Parse metric from JSON payload finished", info->id);
// Parse timestamp
ret = smlParseTSFromJSON(info, root, cols);
if (ret) {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
return ret;
}
uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);
// Parse metric value
ret = smlParseColsFromJSON(root, cols);
if (ret) {
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN};
ret = smlParseColsFromJSON(root, &kv);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
return ret;
}
uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id);
// Parse tags
ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
if (ret) {
ret = smlParseTagsFromJSON(info, root, elements);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
return ret;
}
uDebug("OTD:0x%" PRIx64 " Parse tags from JSON payload finished", info->id);
return TSDB_CODE_SUCCESS;
}
/************* TSDB_SML_JSON_PROTOCOL function end **************/
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) {
SSmlLineInfo elements = {0};
uDebug("SML:0x%" PRIx64 " smlParseInfluxLine raw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? "rawdata" : sql));
int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id);
return ret;
}
SArray *cols = NULL;
if (info->dataFormat) { // if dataFormat, cols need new memory to save data
cols = taosArrayInit(16, POINTER_BYTES);
if (cols == NULL) {
uError("SML:0x%" PRIx64 " smlParseInfluxLine failed to allocate memory", info->id);
return TSDB_CODE_OUT_OF_MEMORY;
}
} else { // if dataFormat is false, cols do not need to save data, there is another new memory to save data
cols = info->colsContainer;
if(unlikely(info->reRun)){
return TSDB_CODE_SUCCESS;
}
ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols);
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlParseTS failed", info->id);
if (info->dataFormat) taosArrayDestroy(cols);
return ret;
}
ret = smlParseCols(elements.cols, elements.colsLen, cols, NULL, false, info->dumplicateKey, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlParseCols parse cloums fields failed", info->id);
smlDestroyCols(cols);
if (info->dataFormat) taosArrayDestroy(cols);
return ret;
// Parse timestamp
// notice!!! put ts back to tag to ensure get meta->precision
int64_t ts = smlParseTSFromJSON(info, root);
if (unlikely(ts < 0)) {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
return TSDB_CODE_INVALID_TIMESTAMP;
}
uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);
SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .i = ts, .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
bool hasTable = true;
SSmlTableInfo *tinfo = NULL;
SSmlTableInfo **oneTable =
(SSmlTableInfo **)taosHashGet(info->childTables, elements.measure, elements.measureTagsLen);
if (!oneTable) {
tinfo = smlBuildTableInfo();
if (!tinfo) {
smlDestroyCols(cols);
if (info->dataFormat) taosArrayDestroy(cols);
return TSDB_CODE_OUT_OF_MEMORY;
if(info->dataFormat){
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kvTs, 0);
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 1);
}
taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES);
oneTable = &tinfo;
hasTable = false;
}
ret = smlDealCols(*oneTable, info->dataFormat, cols);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
if(ret == TSDB_CODE_SUCCESS){
ret = smlBuildRow(info->currTableDataCtx);
}
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
return ret;
}
}else{
if(elements->colArray == NULL){
elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
}
taosArrayPush(elements->colArray, &kvTs);
taosArrayPush(elements->colArray, &kv);
}
info->preLine = *elements;
if (!hasTable) {
ret = smlParseCols(elements.tags, elements.tagsLen, (*oneTable)->tags, (*oneTable)->childTableName, true,
info->dumplicateKey, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlParseCols parse tag fields failed", info->id);
return ret;
return TSDB_CODE_SUCCESS;
}
/************* TSDB_SML_JSON_PROTOCOL function end **************/
static int32_t smlParseLineBottom(SSmlHandle *info) {
if(info->dataFormat) return TSDB_CODE_SUCCESS;
for(int32_t i = 0; i < info->lineNum; i ++){
SSmlLineInfo* elements = info->lines + i;
SSmlTableInfo *tinfo = NULL;
if(info->protocol == TSDB_SML_LINE_PROTOCOL){
tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->measure, elements->measureTagsLen, NULL);
}else if(info->protocol == TSDB_SML_TELNET_PROTOCOL){
tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet);
}else{
tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->tags, POINTER_BYTES, is_same_child_table_json);
}
if(tinfo == NULL){
uError("SML:0x%" PRIx64 "get oneTable failed, line num:%d", info->id, i);
smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure);
return TSDB_CODE_SML_INVALID_DATA;
}
if (taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_TAGS) {
if (taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL);
return TSDB_CODE_PAR_INVALID_TAGS_NUM;
}
if (taosArrayGetSize(cols) + taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_COLUMNS) {
if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) {
smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL);
return TSDB_CODE_PAR_TOO_MANY_COLUMNS;
}
(*oneTable)->sTableName = elements.measure;
(*oneTable)->sTableNameLen = elements.measureLen;
if (strlen((*oneTable)->childTableName) == 0) {
RandTableName rName = {(*oneTable)->tags, (*oneTable)->sTableName, (uint8_t)(*oneTable)->sTableNameLen,
(*oneTable)->childTableName, 0};
buildChildTableName(&rName);
(*oneTable)->uid = rName.uid;
} else {
(*oneTable)->uid = *(uint64_t *)((*oneTable)->childTableName);
}
}
SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements.measure, elements.measureLen);
if (tableMeta) { // update meta
ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
if (!hasTable && ret == TSDB_CODE_SUCCESS) {
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
}
int ret = smlPushCols(tinfo->cols, elements->colArray);
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
return ret;
}
} else {
SSmlSTableMeta *meta = smlBuildSTableMeta();
smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
smlInsertMeta(meta->colHash, meta->cols, cols);
taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES);
}
if (!info->dataFormat) {
taosArrayClear(info->colsContainer);
}
taosHashClear(info->dumplicateKey);
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) {
int ret = TSDB_CODE_SUCCESS;
SSmlTableInfo *tinfo = smlBuildTableInfo();
if (!tinfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SArray *cols = taosArrayInit(16, POINTER_BYTES);
if (cols == NULL) {
uError("SML:0x%" PRIx64 " smlParseTelnetLine failed to allocate memory", info->id);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
ret = smlParseTelnetString(info, (const char *)data, (char *)data + len, tinfo, cols);
} else if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
ret = smlParseJSONString(info, (cJSON *)data, tinfo, cols);
} else {
ASSERT(0);
}
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlParseTelnetLine failed", info->id);
smlDestroyTableInfo(info, tinfo);
smlDestroyCols(cols);
taosArrayDestroy(cols);
return ret;
}
if (taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL);
smlDestroyTableInfo(info, tinfo);
smlDestroyCols(cols);
taosArrayDestroy(cols);
return TSDB_CODE_PAR_INVALID_TAGS_NUM;
}
taosHashClear(info->dumplicateKey);
if (strlen(tinfo->childTableName) == 0) {
RandTableName rName = {tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, tinfo->childTableName, 0};
buildChildTableName(&rName);
tinfo->uid = rName.uid;
} else {
tinfo->uid = *(uint64_t *)(tinfo->childTableName); // generate uid by name simple
}
bool hasTable = true;
SSmlTableInfo **oneTable =
(SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName));
if (!oneTable) {
taosHashPut(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName), &tinfo, POINTER_BYTES);
oneTable = &tinfo;
hasTable = false;
} else {
smlDestroyTableInfo(info, tinfo);
}
SSmlSTableMeta *tableMeta = (SSmlSTableMeta *)nodeListGet(info->superTables, elements->measure, elements->measureLen, NULL);
if (tableMeta) { // update meta
ret = smlUpdateMeta(tableMeta->colHash, tableMeta->cols, elements->colArray, false, &info->msgBuf);
if (ret == TSDB_CODE_SUCCESS) {
ret = smlUpdateMeta(tableMeta->tagHash, tableMeta->tags, tinfo->tags, true, &info->msgBuf);
}
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
return ret;
}
} else {
ret = smlJudgeDupColName(elements->colArray, tinfo->tags, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
return ret;
}
taosArrayPush((*oneTable)->cols, &cols);
SSmlSTableMeta **tableMeta =
(SSmlSTableMeta **)taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen);
if (tableMeta) { // update meta
ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
if (!hasTable && ret == TSDB_CODE_SUCCESS) {
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
}
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
return ret;
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
smlInsertMeta(meta->colHash, meta->cols, elements->colArray);
nodeListSet(&info->superTables, elements->measure, elements->measureLen, meta, NULL);
}
} else {
SSmlSTableMeta *meta = smlBuildSTableMeta();
smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags);
smlInsertMeta(meta->colHash, meta->cols, cols);
taosHashPut(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen, &meta, POINTER_BYTES);
}
return TSDB_CODE_SUCCESS;
......@@ -2254,46 +2746,91 @@ static int32_t smlParseJSON(SSmlHandle *info, char *payload) {
int32_t payloadNum = 0;
int32_t ret = TSDB_CODE_SUCCESS;
if (payload == NULL) {
if (unlikely(payload == NULL)) {
uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
return TSDB_CODE_TSC_INVALID_JSON;
}
info->root = cJSON_Parse(payload);
if (info->root == NULL) {
if (unlikely(info->root == NULL)) {
uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
return TSDB_CODE_TSC_INVALID_JSON;
}
// multiple data points must be sent in JSON array
if (cJSON_IsObject(info->root)) {
payloadNum = 1;
} else if (cJSON_IsArray(info->root)) {
if (cJSON_IsArray(info->root)) {
payloadNum = cJSON_GetArraySize(info->root);
} else if (cJSON_IsObject(info->root)) {
payloadNum = 1;
} else {
uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
ret = TSDB_CODE_TSC_INVALID_JSON;
goto end;
return TSDB_CODE_TSC_INVALID_JSON;
}
for (int32_t i = 0; i < payloadNum; ++i) {
int32_t i = 0;
while (i < payloadNum) {
cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : cJSON_GetArrayItem(info->root, i);
ret = smlParseTelnetLine(info, dataPoint, -1);
if (ret != TSDB_CODE_SUCCESS) {
if(info->dataFormat) {
SSmlLineInfo element = {0};
ret = smlParseJSONString(info, dataPoint, &element);
}else{
ret = smlParseJSONString(info, dataPoint, info->lines + i);
}
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
goto end;
return ret;
}
if(unlikely(info->reRun)){
i = 0;
info->reRun = false;
// clear info->childTables
NodeList* pList = info->childTables;
while (pList) {
if(pList->data.used) {
smlDestroyTableInfo(pList->data.value);
pList->data.used = false;
}
pList = pList->next;
}
// clear info->superTables
pList = info->superTables;
while (pList) {
if(pList->data.used) {
smlDestroySTableMeta(pList->data.value);
pList->data.used = false;
}
pList = pList->next;
}
if(unlikely(info->lines != NULL)){
uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
return TSDB_CODE_SML_INVALID_DATA;
}
info->lineNum = payloadNum;
info->lines = taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
memset(&info->preLine, 0, sizeof(SSmlLineInfo));
info->currSTableMeta = NULL;
info->currTableDataCtx = NULL;
SVnodeModifOpStmt* stmt= (SVnodeModifOpStmt*)(info->pQuery->pRoot);
stmt->freeHashFunc(stmt->pTableBlockHashObj);
stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
continue;
}
i++;
}
end:
return ret;
return TSDB_CODE_SUCCESS;
}
static int32_t smlInsertData(SSmlHandle *info) {
int32_t code = TSDB_CODE_SUCCESS;
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
while (oneTable) {
SSmlTableInfo *tableData = *oneTable;
NodeList* tmp = info->childTables;
while (tmp) {
SSmlTableInfo *tableData = (SSmlTableInfo *)tmp->data.value;
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
......@@ -2313,22 +2850,22 @@ static int32_t smlInsertData(SSmlHandle *info) {
}
taosHashPut(info->pVgHash, (const char *)&vg.vgId, sizeof(vg.vgId), (char *)&vg, sizeof(vg));
SSmlSTableMeta **pMeta =
(SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
ASSERT(NULL != pMeta && NULL != *pMeta);
SSmlSTableMeta *pMeta =
(SSmlSTableMeta *)nodeListGet(info->superTables, tableData->sTableName, tableData->sTableNameLen, NULL);
ASSERT(NULL != pMeta);
// use tablemeta of stable to save vgid and uid of child table
(*pMeta)->tableMeta->vgId = vg.vgId;
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
pMeta->tableMeta->vgId = vg.vgId;
pMeta->tableMeta->uid = tableData->uid; // one table merge data block together according uid
code = smlBindData(info->pQuery, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat,
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, pMeta->cols, tableData->cols,
pMeta->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
info->ttl, info->msgBuf.buf, info->msgBuf.len);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlBindData failed", info->id);
return code;
}
oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
tmp = tmp->next;
}
code = smlBuildOutput(info->pQuery, info->pVgHash);
......@@ -2338,27 +2875,18 @@ static int32_t smlInsertData(SSmlHandle *info) {
}
info->cost.insertRpcTime = taosGetTimestampUs();
// launchQueryImpl(info->pRequest, info->pQuery, false, NULL);
// info->affectedRows = taos_affected_rows(info->pRequest);
// return info->pRequest->code;
SAppClusterSummary *pActivity = &info->taos->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
if (pWrapper == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pWrapper->pRequest = info->pRequest;
launchAsyncQuery(info->pRequest, info->pQuery, NULL, pWrapper);
return TSDB_CODE_SUCCESS;
launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
return info->pRequest->code;
}
static void smlPrintStatisticInfo(SSmlHandle *info) {
uError("SML:0x%" PRIx64
" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
"",
"",
info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables,
info->cost.schemaTime - info->cost.parseTime,
......@@ -2381,7 +2909,9 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
return code;
}
for (int32_t i = 0; i < numLines; ++i) {
char *oldRaw = rawLine;
int32_t i = 0;
while (i < numLines) {
char *tmp = NULL;
int len = 0;
if (lines) {
......@@ -2400,10 +2930,23 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
}
}
uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? "rawdata" : tmp));
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
code = smlParseInfluxLine(info, tmp, len);
if(info->dataFormat){
SSmlLineInfo element = {0};
code = smlParseInfluxString(info, tmp, tmp + len, &element);
}else{
code = smlParseInfluxString(info, tmp, tmp + len, info->lines + i);
}
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
code = smlParseTelnetLine(info, tmp, len);
if(info->dataFormat) {
SSmlLineInfo element = {0};
code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, &element);
}else{
code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, info->lines + i);
}
} else {
ASSERT(0);
}
......@@ -2411,7 +2954,48 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, tmp);
return code;
}
if(info->reRun){
i = 0;
rawLine = oldRaw;
info->reRun = false;
// clear info->childTables
NodeList* pList = info->childTables;
while (pList) {
if(pList->data.used) {
smlDestroyTableInfo(pList->data.value);
pList->data.used = false;
}
pList = pList->next;
}
// clear info->superTables
pList = info->superTables;
while (pList) {
if(pList->data.used) {
smlDestroySTableMeta(pList->data.value);
pList->data.used = false;
}
pList = pList->next;
}
if(info->lines != NULL){
uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
return TSDB_CODE_SML_INVALID_DATA;
}
info->lines = taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
memset(&info->preLine, 0, sizeof(SSmlLineInfo));
info->currSTableMeta = NULL;
info->currTableDataCtx = NULL;
SVnodeModifOpStmt* stmt= (SVnodeModifOpStmt*)(info->pQuery->pRoot);
stmt->freeHashFunc(stmt->pTableBlockHashObj);
stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
continue;
}
i++;
}
return code;
}
......@@ -2427,16 +3011,22 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
return code;
}
code = smlParseLineBottom(info);
if (code != 0) {
uError("SML:0x%" PRIx64 " smlParseLineBottom error : %s", info->id, tstrerror(code));
return code;
}
info->cost.lineNum = numLines;
info->cost.numOfSTables = taosHashGetSize(info->superTables);
info->cost.numOfCTables = taosHashGetSize(info->childTables);
info->cost.numOfSTables = nodeListSize(info->superTables);
info->cost.numOfCTables = nodeListSize(info->childTables);
info->cost.schemaTime = taosGetTimestampUs();
do {
code = smlModifyDBSchemas(info);
if (code == 0) break;
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
} while (retryNum++ < nodeListSize(info->superTables) * MAX_RETRY_TIMES);
if (code != 0) {
uError("SML:0x%" PRIx64 " smlModifyDBSchemas error : %s", info->id, tstrerror(code));
......@@ -2453,92 +3043,41 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
return code;
}
static int32_t isSchemalessDb(STscObj *taos, SRequestObj *request) {
// SCatalog *catalog = NULL;
// int32_t code = catalogGetHandle(((STscObj *)taos)->pAppInfo->clusterId, &catalog);
// if (code != TSDB_CODE_SUCCESS) {
// uError("SML get catalog error %d", code);
// return code;
// }
//
// SName name;
// tNameSetDbName(&name, taos->acctId, taos->db, strlen(taos->db));
// char dbFname[TSDB_DB_FNAME_LEN] = {0};
// tNameGetFullDbName(&name, dbFname);
// SDbCfgInfo pInfo = {0};
//
// SRequestConnInfo conn = {0};
// conn.pTrans = taos->pAppInfo->pTransporter;
// conn.requestId = request->requestId;
// conn.requestObjRefId = request->self;
// conn.mgmtEps = getEpSet_s(&taos->pAppInfo->mgmtEp);
//
// code = catalogGetDBCfg(catalog, &conn, dbFname, &pInfo);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
// taosArrayDestroy(pInfo.pRetensions);
//
// if (!pInfo.schemaless) {
// return TSDB_CODE_SML_INVALID_DB_CONF;
// }
return TSDB_CODE_SUCCESS;
}
static void smlInsertCallback(void *param, void *res, int32_t code) {
SRequestObj *pRequest = (SRequestObj *)res;
SSmlHandle *info = (SSmlHandle *)param;
int32_t rows = taos_affected_rows(pRequest);
uDebug("SML:0x%" PRIx64 " result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
Params *pParam = info->params;
// lock
taosThreadSpinLock(&pParam->lock);
pParam->cnt++;
if (code != TSDB_CODE_SUCCESS) {
pParam->request->code = code;
pParam->request->body.resInfo.numOfRows += rows;
} else {
pParam->request->body.resInfo.numOfRows += info->affectedRows;
TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd,
int numLines, int protocol, int precision, int32_t ttl, int64_t reqid) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
// unlock
taosThreadSpinUnlock(&pParam->lock);
if (pParam->cnt == pParam->total) {
tsem_post(&pParam->sem);
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
if (request == NULL) {
uError("SML:taos_schemaless_insert error request is null");
return NULL;
}
uDebug("SML:0x%" PRIx64 " insert finished, code: %d, rows: %d, total: %d", info->id, code, rows, info->affectedRows);
info->cost.endTime = taosGetTimestampUs();
info->cost.code = code;
smlPrintStatisticInfo(info);
smlDestroyInfo(info);
}
TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd,
int numLines, int protocol, int precision, int32_t ttl) {
int batchs = 0;
STscObj *pTscObj = request->pTscObj;
SSmlHandle *info = smlBuildSmlInfo(taos);
if (info == NULL) {
request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error SSmlHandle is null");
goto end;
}
info->pRequest = request;
info->isRawLine = rawLine != NULL;
info->ttl = ttl;
info->precision = precision;
info->protocol = protocol;
info->msgBuf.buf = info->pRequest->msgBuf;
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
info->lineNum = numLines;
pTscObj->schemalessType = 1;
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
Params params = {0};
params.request = request;
tsem_init(&params.sem, 0, 0);
taosThreadSpinInit(&(params.lock), 0);
if (request->pDb == NULL) {
request->code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
smlBuildInvalidDataMsg(&msg, "Database not specified", NULL);
goto end;
}
if (isSchemalessDb(pTscObj, request) != TSDB_CODE_SUCCESS) {
request->code = TSDB_CODE_SML_INVALID_DB_CONF;
smlBuildInvalidDataMsg(&msg, "Cannot write data to a non schemaless database", NULL);
goto end;
}
if (protocol < TSDB_SML_LINE_PROTOCOL || protocol > TSDB_SML_JSON_PROTOCOL) {
request->code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
smlBuildInvalidDataMsg(&msg, "protocol invalidate", NULL);
......@@ -2560,65 +3099,15 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
goto end;
}
batchs = ceil(((double)numLines) / tsSmlBatchSize);
params.total = batchs;
for (int i = 0; i < batchs; ++i) {
SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT, 0);
if (!req) {
request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error request is null");
goto end;
}
SSmlHandle *info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision);
if (!info) {
request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error SSmlHandle is null");
goto end;
}
info->isRawLine = (rawLine == NULL);
info->ttl = ttl;
int32_t perBatch = tsSmlBatchSize;
if (numLines > perBatch) {
numLines -= perBatch;
} else {
perBatch = numLines;
numLines = 0;
}
info->params = &params;
info->affectedRows = perBatch;
info->pRequest->body.queryFp = smlInsertCallback;
info->pRequest->body.param = info;
int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch);
if (lines) {
lines += perBatch;
}
if (rawLine) {
int num = 0;
while (rawLine < rawLineEnd) {
if (*(rawLine++) == '\n') {
num++;
}
if (num == perBatch) {
break;
}
}
}
if (code != TSDB_CODE_SUCCESS) {
info->pRequest->body.queryFp(info, req, code);
}
}
tsem_wait(&params.sem);
int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, numLines);
request->code = code;
info->cost.endTime = taosGetTimestampUs();
info->cost.code = code;
smlPrintStatisticInfo(info);
end:
taosThreadSpinDestroy(&params.lock);
tsem_destroy(&params.sem);
// ((STscObj *)taos)->schemalessType = 0;
pTscObj->schemalessType = 1;
uDebug("resultend:%s", request->msgBuf);
smlDestroyInfo(info);
return (TAOS_RES *)request;
}
......@@ -2643,25 +3132,7 @@ end:
TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
int32_t ttl, int64_t reqid) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
if (!request) {
uError("SML:taos_schemaless_insert error request is null");
return NULL;
}
if (!lines) {
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
request->code = TSDB_CODE_SML_INVALID_DATA;
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
return (TAOS_RES *)request;
}
return taos_schemaless_insert_inner(request, lines, NULL, NULL, numLines, protocol, precision, ttl);
return taos_schemaless_insert_inner(taos, lines, NULL, NULL, numLines, protocol, precision, ttl, reqid);
}
TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) {
......@@ -2677,25 +3148,7 @@ TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLi
}
TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
int precision, int32_t ttl, int64_t reqid) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
SRequestObj *request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
if (!request) {
uError("SML:taos_schemaless_insert error request is null");
return NULL;
}
if (!lines || len <= 0) {
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
request->code = TSDB_CODE_SML_INVALID_DATA;
smlBuildInvalidDataMsg(&msg, "lines is null", NULL);
return (TAOS_RES *)request;
}
int precision, int32_t ttl, int64_t reqid) {
int numLines = 0;
*totalRows = 0;
char *tmp = lines;
......@@ -2708,7 +3161,7 @@ TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int
tmp = lines + i + 1;
}
}
return taos_schemaless_insert_inner(request, NULL, lines, lines + len, numLines, protocol, precision, ttl);
return taos_schemaless_insert_inner(taos, NULL, lines, lines + len, *totalRows, protocol, precision, ttl, reqid);
}
TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, int precision, int64_t reqid) {
......
......@@ -2381,24 +2381,15 @@ char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
return NULL;
}
SSmlKv* pTag = taosMemoryCalloc(1, sizeof(SSmlKv));
if (pTag == NULL) {
taosArrayDestroy(tags);
return NULL;
}
void* cname = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
if (cname == NULL) {
taosArrayDestroy(tags);
taosMemoryFree(pTag);
return NULL;
}
pTag->key = "group_id";
pTag->keyLen = strlen(pTag->key);
pTag->type = TSDB_DATA_TYPE_UBIGINT;
pTag->u = groupId;
pTag->length = sizeof(uint64_t);
SSmlKv pTag = {.key = "group_id", .keyLen = sizeof("group_id") - 1,
.type = TSDB_DATA_TYPE_UBIGINT, .u = groupId,
.length = sizeof(uint64_t)};
taosArrayPush(tags, &pTag);
RandTableName rname = {
......@@ -2410,7 +2401,6 @@ char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
buildChildTableName(&rname);
taosMemoryFree(pTag);
taosArrayDestroy(tags);
ASSERT(rname.ctbShortName && rname.ctbShortName[0]);
......
......@@ -298,8 +298,8 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
}
static int compareKv(const void* p1, const void* p2) {
SSmlKv* kv1 = *(SSmlKv**)p1;
SSmlKv* kv2 = *(SSmlKv**)p2;
SSmlKv* kv1 = (SSmlKv*)p1;
SSmlKv* kv2 = (SSmlKv*)p2;
int32_t kvLen1 = kv1->keyLen;
int32_t kvLen2 = kv2->keyLen;
int32_t res = strncasecmp(kv1->key, kv2->key, TMIN(kvLen1, kvLen2));
......@@ -320,7 +320,7 @@ void buildChildTableName(RandTableName* rName) {
taosArraySort(rName->tags, compareKv);
for (int j = 0; j < taosArrayGetSize(rName->tags); ++j) {
taosStringBuilderAppendChar(&sb, ',');
SSmlKv* tagKv = taosArrayGetP(rName->tags, j);
SSmlKv* tagKv = taosArrayGet(rName->tags, j);
taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
taosStringBuilderAppendChar(&sb, '=');
if (IS_VAR_DATA_TYPE(tagKv->type)) {
......
......@@ -56,7 +56,7 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv* kv = taosArrayGetP(cols, i);
SSmlKv* kv = taosArrayGet(cols, i);
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
col_id_t t = lastColIdx + 1;
col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema));
......@@ -111,7 +111,7 @@ static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchem
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < tags->numOfBound; ++i) {
SSchema* pTagSchema = &pSchema[tags->pColIndex[i]];
SSmlKv* kv = taosArrayGetP(cols, i);
SSmlKv* kv = taosArrayGet(cols, i);
taosArrayPush(*tagName, pTagSchema->name);
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
......@@ -158,9 +158,68 @@ end:
return code;
}
int32_t smlBindData(SQuery* query, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf,
int16_t msgBufLen) {
STableDataCxt* smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta){
STableDataCxt* pTableCxt = NULL;
SVCreateTbReq *pCreateTbReq = NULL;
int ret = insGetTableDataCxt(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid),
pTableMeta, &pCreateTbReq, &pTableCxt, false);
if (ret != TSDB_CODE_SUCCESS) {
return NULL;
}
ret = initTableColSubmitData(pTableCxt);
if (ret != TSDB_CODE_SUCCESS) {
return NULL;
}
return pTableCxt;
}
int32_t smlBuildRow(STableDataCxt* pTableCxt){
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
int ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
if (TSDB_CODE_SUCCESS != ret) {
return ret;
}
insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow));
return TSDB_CODE_SUCCESS;
}
int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void *data, int32_t index){
int ret = TSDB_CODE_SUCCESS;
SSchema* pColSchema = schema + index;
SColVal* pVal = taosArrayGet(pTableCxt->pValues, index);
SSmlKv* kv = (SSmlKv*)data;
if (kv->type == TSDB_DATA_TYPE_NCHAR){
int32_t len = 0;
char* pUcs4 = taosMemoryCalloc(1, pColSchema->bytes - VARSTR_HEADER_SIZE);
if (NULL == pUcs4) {
ret = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
if (errno == E2BIG) {
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
goto end;
}
ret = TSDB_CODE_TSC_INVALID_VALUE;
goto end;
}
pVal->value.pData = pUcs4;
pVal->value.nData = len;
} else if(kv->type == TSDB_DATA_TYPE_BINARY) {
pVal->value.nData = kv->length;
pVal->value.pData = (uint8_t *)kv->value;
} else {
memcpy(&pVal->value.val, &(kv->value), kv->length);
}
pVal->flag = CV_FLAG_VALUE;
end:
return ret;
}
int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols, STableMeta* pTableMeta,
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen) {
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
......@@ -193,6 +252,20 @@ int32_t smlBindData(SQuery* query, SArray* tags, SArray* colsSchema, SArray* col
pCreateTblReq->ctb.stbName = taosMemoryCalloc(1, sTableNameLen + 1);
memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen);
if(dataFormat){
STableDataCxt** pTableCxt = (STableDataCxt**)taosHashGet(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
if (NULL == pTableCxt) {
ret = buildInvalidOperationMsg(&pBuf, "dataformat true. get tableDataCtx error");
goto end;
}
(*pTableCxt)->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
(*pTableCxt)->pData->pCreateTbReq = pCreateTblReq;
(*pTableCxt)->pMeta->uid = pTableMeta->uid;
(*pTableCxt)->pMeta->vgId = pTableMeta->vgId;
pCreateTblReq = NULL;
goto end;
}
STableDataCxt* pTableCxt = NULL;
ret = insGetTableDataCxt(((SVnodeModifOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, &pCreateTblReq, &pTableCxt, false);
......@@ -227,15 +300,12 @@ int32_t smlBindData(SQuery* query, SArray* tags, SArray* colsSchema, SArray* col
for (int c = 0; c < pTableCxt->boundColsInfo.numOfBound; ++c) {
SSchema* pColSchema = &pSchema[pTableCxt->boundColsInfo.pColIndex[c]];
SColVal* pVal = taosArrayGet(pTableCxt->pValues, pTableCxt->boundColsInfo.pColIndex[c]);
SSmlKv* kv = NULL;
if (!format) {
void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
if (p) kv = *p;
}
if (kv == NULL) {
void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
if (p == NULL) {
continue;
}
SSmlKv *kv = *(SSmlKv **)p;
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
}
......@@ -297,7 +367,7 @@ SQuery* smlInitHandle() {
qDestroyQuery(pQuery);
return NULL;
}
stmt->pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
stmt->freeHashFunc = insDestroyTableDataCxtHashMap;
stmt->freeArrayFunc = insDestroyVgroupDataCxtList;
......
......@@ -77,11 +77,20 @@ int smlProcess_telnet_Test() {
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
const char *sql[] = {"sys.if.bytes.out 1479496100 1.3E0 host=web01 interface=eth0",
char *sql[4] = {0};
sql[0] = taosMemoryCalloc(1, 128);
sql[1] = taosMemoryCalloc(1, 128);
sql[2] = taosMemoryCalloc(1, 128);
sql[3] = taosMemoryCalloc(1, 128);
const char *sql1[] = {"sys.if.bytes.out 1479496100 1.3E0 host=web01 interface=eth0",
"sys.if.bytes.out 1479496101 1.3E1 interface=eth0 host=web01 ",
"sys.if.bytes.out 1479496102 1.3E3 network=tcp",
" sys.procs.running 1479496100 42 host=web01 "};
for(int i = 0; i < 4; i++){
strncpy(sql[i], sql1[i], 128);
}
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_TELNET_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
......@@ -1017,7 +1026,7 @@ int smlProcess_18784_Test() {
printf("%s result:%s, rows:%d\n", __FUNCTION__, taos_errstr(pRes), taos_affected_rows(pRes));
int code = taos_errno(pRes);
ASSERT(!code);
ASSERT(taos_affected_rows(pRes) == 2);
ASSERT(taos_affected_rows(pRes) == 1);
taos_free_result(pRes);
pRes = taos_query(taos, "select * from disk");
......@@ -1091,9 +1100,10 @@ int sml_ts2164_Test() {
taos_free_result(pRes);
const char *sql[] = {
// "meters,location=la,groupid=ca current=11.8,voltage=221,phase=0.27",
"meters,location=la,groupid=ca current=11.8,voltage=221",
"meters,location=la,groupid=ca current=11.8,voltage=221,phase=0.27",
"meters,location=la,groupid=ca current=11.8,voltage=221,phase=0.27",
"meters,location=la,groupid=cb current=11.8,voltage=221,phase=0.27",
// "meters,location=la,groupid=cb current=11.8,voltage=221,phase=0.27",
};
pRes = taos_query(taos, "use line_test");
......@@ -1118,6 +1128,9 @@ int sml_ttl_Test() {
const char *sql[] = {
"meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=\"2022-02-0210:22:22\" 1626006833739000000",
};
const char *sql1[] = {
"meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=\"2022-02-0210:22:22\" 1626006833339000000",
};
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
......@@ -1127,6 +1140,11 @@ int sml_ttl_Test() {
printf("%s result1:%s\n", __FUNCTION__, taos_errstr(pRes));
taos_free_result(pRes);
pRes = taos_schemaless_insert_ttl(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, 20);
printf("%s result1:%s\n", __FUNCTION__, taos_errstr(pRes));
taos_free_result(pRes);
pRes = taos_query(taos, "select `ttl` from information_schema.ins_tables where table_name='t_be97833a0e1f523fcdaeb6291d6fdf27'");
printf("%s result2:%s\n", __FUNCTION__, taos_errstr(pRes));
TAOS_ROW row = taos_fetch_row(pRes);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册