From 517c8b1d5ad6192612784f02e1ecc5d9108a13b0 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 6 Dec 2022 21:07:53 +0800 Subject: [PATCH] fix:modify influxdb parse logic for sml --- include/libs/parser/parser.h | 6 +- source/client/src/clientSml.c | 1284 +++++++++++++++---------- source/libs/parser/src/parInsertSml.c | 88 +- 3 files changed, 854 insertions(+), 524 deletions(-) diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 88422072b8..67daaf2f2a 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -108,7 +108,11 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* void qDestroyBoundColInfo(void* pInfo); SQuery* smlInitHandle(); -int32_t smlBindData(SQuery* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, +int32_t smlBuildRow(STableDataCxt* pTableCxt); +int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void *kv, int32_t index); +STableDataCxt* smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta); + +int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols, STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen); int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash); diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 163e704424..0ba3edbc15 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -20,6 +20,19 @@ #include "ttime.h" #include "ttypes.h" +#if (defined(__GNUC__) && (__GNUC__ >= 3)) || (defined(__INTEL_COMPILER) && (__INTEL_COMPILER >= 800)) || defined(__clang__) +# define expect(expr,value) (__builtin_expect ((expr),(value)) ) +#else +# define expect(expr,value) (expr) +#endif + +#ifndef likely +#define likely(expr) expect((expr) != 0, 1) +#endif +#ifndef unlikely +#define unlikely(expr) expect((expr) != 0, 0) +#endif + //================================================================================================= #define SPACE ' ' @@ -102,6 +115,8 @@ typedef struct { int32_t tagsLen; int32_t colsLen; int32_t timestampLen; + + SArray *colArray; } SSmlLineInfo; typedef struct { @@ -112,9 +127,9 @@ typedef struct { SArray *tags; - // if info->formatData is true, elements are SArray. - // if info->formatData is false, elements are SHashObj for find by key quickly + // elements are SHashObj for find by key quickly SArray *cols; + STableDataCxt *tableDataCtx; } SSmlTableInfo; typedef struct { @@ -163,6 +178,7 @@ typedef struct { SMLProtocolType protocol; int8_t precision; + bool reRun; bool dataFormat; // true means that the name and order of keys in each line are the same(only for influx protocol) bool isRawLine; int32_t ttl; @@ -180,22 +196,37 @@ typedef struct { int32_t affectedRows; SSmlMsgBuf msgBuf; SHashObj *dumplicateKey; // for dumplicate key - SArray *colsContainer; // for cols parse, if dataFormat == false cJSON *root; // for parse json + SArray *lines; // element is SSmlLineInfo + + // + SHashObj *superTableTagKeyStr; + SHashObj *superTableColKeyStr; + void *currentLineTagKeys; + void *preLineTagKeys; + void *currentLineColKeys; + void *preLineColKeys; + + SArray *preLineTagKV; + SArray *preLineColKV; + + SSmlLineInfo preLine; + STableMeta *currSTableMeta; + STableDataCxt *currTableDataCtx; } SSmlHandle; //================================================================================================= //================================================================================================= static volatile int64_t linesSmlHandleId = 0; static int64_t smlGenId() { - int64_t id; + int64_t id; - do { - id = atomic_add_fetch_64(&linesSmlHandleId, 1); + do { + id = atomic_add_fetch_64(&linesSmlHandleId, 1); } while (id == 0); - return id; + return id; } static inline bool smlDoubleToInt64OverFlow(double num) { @@ -279,7 +310,7 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH int32_t code = TSDB_CODE_SUCCESS; for (int j = 0; j < taosArrayGetSize(cols); ++j) { if (j == 0 && !isTag) continue; - SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j); + SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j); code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, info); if (code != TSDB_CODE_SUCCESS) { return code; @@ -301,7 +332,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool i = 1; } for (; i < taosArrayGetSize(cols); i++) { - SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); + SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) { taosHashCleanup(hashTmp); return -1; @@ -322,7 +353,7 @@ static int32_t getBytes(uint8_t type, int32_t length) { static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, SArray *results, int32_t numOfCols, bool isTag) { for (int j = 0; j < taosArrayGetSize(cols); ++j) { - SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j); + SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j); ESchemaAction action = SCHEMA_ACTION_NULL; smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info); if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) { @@ -423,7 +454,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, code = pRequest->code; taosMemoryFree(pCmdMsg.pMsg); -end: + end: destroyRequest(pRequest); tFreeSMCreateStbReq(&pReq); return code; @@ -602,13 +633,14 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { } return 0; -end: + end: taosHashCleanup(hashTmp); taosMemoryFreeClear(pTableMeta); // catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); return code; } +/******************************* parse basic type function **********************/ static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) { const char *pVal = kvVal->value; int32_t len = kvVal->length; @@ -752,7 +784,9 @@ static bool smlIsNchar(const char *pVal, uint16_t len) { } return false; } +/******************************* parse basic type function end **********************/ +/******************************* time function **********************/ static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) { char *endPtr = NULL; int64_t tsInt64 = taosStr2Int64(value, &endPtr, 10); @@ -882,21 +916,213 @@ static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArra } // add ts to - SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); - if (!kv) { - return TSDB_CODE_OUT_OF_MEMORY; + SSmlKv kv = { .key = TS, .keyLen = TS_LEN, .i = ts, .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes}; + if(info->dataFormat){ + kv.i = convertTimePrecision(kv.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision); + smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0); + }else{ + taosArraySet(cols, 0, &kv); } - kv->key = TS; - kv->keyLen = TS_LEN; - kv->i = ts; - kv->type = TSDB_DATA_TYPE_TIMESTAMP; - kv->length = (int16_t)tDataTypes[kv->type].bytes; - taosArrayPush(cols, &kv); + return TSDB_CODE_SUCCESS; +} +/******************************* time function end **********************/ + +/******************************* Sml struct related function **********************/ +static SSmlTableInfo *smlBuildTableInfo(int numRows, const char* measure, int32_t measureLen) { + SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1); + if (!tag) { + return NULL; + } + + tag->sTableName = measure; + tag->sTableNameLen = measureLen; + + tag->cols = taosArrayInit(numRows, POINTER_BYTES); + if (tag->cols == NULL) { + uError("SML:smlBuildTableInfo failed to allocate memory"); + goto cleanup; + } + + tag->tags = taosArrayInit(16, sizeof(SSmlKv)); + if (tag->tags == NULL) { + uError("SML:smlBuildTableInfo failed to allocate memory"); + goto cleanup; + } + return tag; + + cleanup: + taosMemoryFree(tag); + return NULL; +} + +static int32_t smlCheckDupUnit(SHashObj *dumplicateKey, SArray *tags, SSmlMsgBuf *msg){ + for(int i = 0; i < taosArrayGetSize(tags); i++) { + SSmlKv *tag = taosArrayGet(tags, i); + if (smlCheckDuplicateKey(tag->key, tag->keyLen, dumplicateKey)) { + smlBuildInvalidDataMsg(msg, "dumplicate key", tag->key); + return TSDB_CODE_TSC_DUP_NAMES; + } + } + return TSDB_CODE_SUCCESS; +} + +static int32_t smlJudgeDupColName(SArray *cols, SArray *tags, SSmlMsgBuf *msg) { + SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + int ret = smlCheckDupUnit(dumplicateKey, cols, msg); + if(ret != TSDB_CODE_SUCCESS){ + goto end; + } + ret = smlCheckDupUnit(dumplicateKey, tags, msg); + if(ret != TSDB_CODE_SUCCESS){ + goto end; + } + + end: + taosHashCleanup(dumplicateKey); + return ret; +} + +static int32_t smlParseTableName(SArray *tags, char *childTableName) { + size_t childTableNameLen = strlen(tsSmlChildTableName); + if (childTableNameLen <= 0) return TSDB_CODE_SUCCESS; + + for(int i = 0; i < taosArrayGetSize(tags); i++){ + SSmlKv *tag = taosArrayGet(tags, i); + // handle child table name + if (childTableNameLen == tag->keyLen && strncmp(tag->key, tsSmlChildTableName, tag->keyLen) == 0) { + memset(childTableName, 0, TSDB_TABLE_NAME_LEN); + strncpy(childTableName, tag->value, (tag->length < TSDB_TABLE_NAME_LEN ? tag->length : TSDB_TABLE_NAME_LEN)); + break; + } + } return TSDB_CODE_SUCCESS; } +static int32_t smlSetCTableName(SSmlTableInfo *oneTable){ + smlParseTableName(oneTable->tags, oneTable->childTableName); + + if (strlen(oneTable->childTableName) == 0) { + RandTableName rName = {oneTable->tags, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen, + oneTable->childTableName, 0}; + + buildChildTableName(&rName); + oneTable->uid = rName.uid; + } else { + oneTable->uid = *(uint64_t *)(oneTable->childTableName); + } + return TSDB_CODE_SUCCESS; +} + +static SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) { + SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1); + if (!meta) { + return NULL; + } + + if(unlikely(!isDataFormat)){ + meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if (meta->tagHash == NULL) { + uError("SML:smlBuildSTableMeta failed to allocate memory"); + goto cleanup; + } + + meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if (meta->colHash == NULL) { + uError("SML:smlBuildSTableMeta failed to allocate memory"); + goto cleanup; + } + } + + meta->tags = taosArrayInit(32, sizeof(SSmlKv)); + if (meta->tags == NULL) { + uError("SML:smlBuildSTableMeta failed to allocate memory"); + goto cleanup; + } + + meta->cols = taosArrayInit(32, sizeof(SSmlKv)); + if (meta->cols == NULL) { + uError("SML:smlBuildSTableMeta failed to allocate memory"); + goto cleanup; + } + return meta; + + cleanup: + taosMemoryFree(meta); + return NULL; +} + +static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols){ + for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) { + SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); + taosArrayPush(metaArray, &kv); + if(unlikely(metaHash != NULL)) { + taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES); + } + } +} + +bool smlFormatJudge(SHashObj* superTableKeyStr, void** preLineKeys, void* currentLineKeys, + SSmlLineInfo *currElements, SSmlLineInfo *preElements, int32_t len){ + if(*preLineKeys == NULL){ + *preLineKeys = taosMemoryMalloc(len); + varDataCopy(*preLineKeys, currentLineKeys); + return true; + } + + // same measure + if(preElements->measureLen == currElements->measureLen + && memcmp(preElements->measure, currElements->measure, currElements->measureLen) == 0){ + if(varDataTLen(preLineKeys) != varDataTLen(currentLineKeys) + || memcmp(preLineKeys, currentLineKeys, varDataTLen(preLineKeys)) != 0){ + return false; + } + }else{ // diff measure + void *keyStr = taosHashGet(superTableKeyStr, currElements->measure, currElements->measureLen); + if(unlikely(keyStr == NULL)){ + keyStr = taosMemoryMalloc(len); + varDataCopy(keyStr, currentLineKeys); + taosHashPut(superTableKeyStr, currElements->measure, currElements->measureLen, &keyStr, POINTER_BYTES); + }else{ + if(varDataTLen(keyStr) != varDataTLen(currentLineKeys) + && memcmp(keyStr, currentLineKeys, varDataTLen(currentLineKeys)) != 0){ + return false; + } + } + } + + return true; +} + +static STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen){ + STableMeta *pTableMeta = NULL; + + SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; + tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname)); + + SRequestConnInfo conn = {0}; + conn.pTrans = info->taos->pAppInfo->pTransporter; + conn.requestId = info->pRequest->requestId; + conn.requestObjRefId = info->pRequest->self; + conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp); + memset(pName.tname, 0, TSDB_TABLE_NAME_LEN); + memcpy(pName.tname, measure, measureLen); + + catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); + return pTableMeta; +} + +static void smlDestroySTableMeta(SSmlSTableMeta *meta) { + taosHashCleanup(meta->tagHash); + taosHashCleanup(meta->colHash); + taosArrayDestroy(meta->tags); + taosArrayDestroy(meta->cols); + taosMemoryFree(meta->tableMeta); + taosMemoryFree(meta); +} +/******************************* Sml struct related function end **********************/ + static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) { // binary if (smlIsBinary(pVal->value, pVal->length)) { @@ -934,7 +1160,325 @@ static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) { return TSDB_CODE_TSC_INVALID_VALUE; } -static int32_t smlParseInfluxString(const char *sql, const char *sqlEnd, SSmlLineInfo *elements, SSmlMsgBuf *msg) { +static int32_t smlParseKv(SSmlHandle *info, const char **sql, const char *sqlEnd, + SSmlLineInfo* currElement, bool isTag){ + bool isSameMeasure = false; + bool isSameCTable = false; + int cnt = 0; + void *keyStr = NULL; + bool isPreLineKVNULL = false; + SArray *preLineKV = NULL; + bool isSuperKVInit = false; + SArray *superKV = NULL; + if(info->dataFormat){ + if(currElement->measureLen == info->preLine.measureLen + && memcmp(currElement->measure, info->preLine.measure, currElement->measureLen) == 0){ + isSameMeasure = true; + } + + if(!isSameMeasure){ + SSmlSTableMeta *sMeta = NULL; + SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen); + if(tableMeta == NULL){ + SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); + STableMeta * pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen); + meta->tableMeta = pTableMeta; + if(pTableMeta == NULL){ + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &meta, POINTER_BYTES); + sMeta = meta; + }else{ + sMeta = *tableMeta; + } + info->currSTableMeta = sMeta->tableMeta; + + if(isTag){ + superKV = sMeta->tags; + }else{ + superKV = sMeta->cols; + } + if(unlikely(taosArrayGetSize(superKV) == 0)){ + isSuperKVInit = true; + } + } + + if(currElement->measureTagsLen == info->preLine.measureTagsLen + && memcmp(currElement->measure, info->preLine.measure, currElement->measureTagsLen) == 0){ + isSameCTable = true; + if(isTag) return TSDB_CODE_SUCCESS; + }else if(!isTag){ + SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen); + if (unlikely(oneTable == NULL)) { + smlBuildInvalidDataMsg(&info->msgBuf, "child table should inside", currElement->measure); + return TSDB_CODE_SML_INVALID_DATA; + } + info->currTableDataCtx = (*oneTable)->tableDataCtx; + } + + if(isTag){ + // prepare for judging if tag or col is the same for each line + if(unlikely(info->currentLineTagKeys == NULL)){ // sml todo size need remalloc + info->currentLineTagKeys = taosMemoryMalloc(sqlEnd - *sql); + } + keyStr = info->preLineTagKeys; + + if(info->preLineTagKV == NULL){ + info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv)); + isPreLineKVNULL = true; + } + preLineKV = info->preLineTagKV; + }else{ + if(unlikely(info->currentLineColKeys == NULL)){ // sml todo size need remalloc + info->currentLineColKeys = taosMemoryMalloc(sqlEnd - *sql); + } + keyStr = info->preLineColKeys; + + if(info->preLineColKV == NULL){ + info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv)); + isPreLineKVNULL = true; + } + preLineKV = info->preLineColKV; + } + + if(!isSameMeasure){ + taosArraySetSize(preLineKV, 0); + } + varDataLen(keyStr) = 0; // clear keys + } + + while (*sql < sqlEnd) { + if (IS_SPACE(*sql)) { + break; + } + + // parse key + const char *key = *sql; + int32_t keyLen = 0; + while (*sql < sqlEnd) { + + if (IS_COMMA(*sql)) { + smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql); + return TSDB_CODE_SML_INVALID_DATA; + } + if (IS_EQUAL(*sql)) { + keyLen = *sql - key; + (*sql)++; + break; + } + (*sql)++; + } + + if (IS_INVALID_COL_LEN(keyLen)) { + smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key); + return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH; + } + + memcpy(keyStr + varDataTLen(keyStr), key, keyLen + 1); // use = symbol + varDataLen(keyStr) += keyLen + 1; + + // parse value + const char *value = *sql; + int32_t valueLen = 0; + bool isInQuote = false; + while (*sql < sqlEnd) { + // parse value + if (!isTag && IS_QUOTE(*sql)) { + isInQuote = !isInQuote; + (*sql)++; + continue; + } + if (!isInQuote && IS_COMMA(*sql)) { + break; + } + if (!isInQuote && IS_EQUAL(*sql)) { + smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql); + return TSDB_CODE_SML_INVALID_DATA; + } + (*sql)++; + } + valueLen = *sql - value; + + if (isInQuote) { + smlBuildInvalidDataMsg(&info->msgBuf, "only one quote", value); + return TSDB_CODE_SML_INVALID_DATA; + } + if (valueLen == 0) { + smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value); + return TSDB_CODE_SML_INVALID_DATA; + } + PROCESS_SLASH(key, keyLen) + PROCESS_SLASH(value, valueLen) + + (*sql)++; + + SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen}; + if (!isTag) { + int32_t ret = smlParseValue(&kv, &info->msgBuf); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + } else { + if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { + return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN; + } + kv.type = TSDB_DATA_TYPE_NCHAR; + } + + if(info->dataFormat){ + if(!isTag && cnt + 1 > info->currSTableMeta->tableInfo.numOfColumns){ + smlBuildInvalidDataMsg(&info->msgBuf, "col more than meta", NULL); + return TSDB_CODE_PAR_TOO_MANY_COLUMNS; + } + if(isTag && cnt + 1 > info->currSTableMeta->tableInfo.numOfTags){ + smlBuildInvalidDataMsg(&info->msgBuf, "tag more than meta", NULL); + return TSDB_CODE_PAR_TOO_MANY_COLUMNS; + } + // bind data + int ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, cnt + 1); + if (ret != TSDB_CODE_SUCCESS) { + smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL); + return ret; + } + + do { + if(isPreLineKVNULL){ + taosArrayPush(preLineKV, &kv); + break; + } + + if(isSameMeasure){ + if(cnt >= taosArrayGetSize(preLineKV)) { + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + SSmlKv *preKV = taosArrayGet(preLineKV, cnt); + if(!isTag && kv.type != preKV->type){ + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + + if(IS_VAR_DATA_TYPE(kv.type) && kv.length > preKV->length){ + preKV->length = kv.length; + SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen); + if(tableMeta == NULL){ + smlBuildInvalidDataMsg(&info->msgBuf, "measure should has inside", value); + return TSDB_CODE_SML_INVALID_DATA; + } + + if(isTag){ + superKV = (*tableMeta)->tags; + }else{ + superKV = (*tableMeta)->cols; + } + SSmlKv *oldKV = taosArrayGet(superKV, cnt); + oldKV->length = kv.length; + } + }else{ + if(isSuperKVInit){ + taosArrayPush(superKV, &kv); + }else{ + if(cnt >= taosArrayGetSize(superKV)) { + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + SSmlKv *preKV = taosArrayGet(superKV, cnt); + if(!isTag && kv.type != preKV->type){ + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + + if(IS_VAR_DATA_TYPE(kv.type)){ + if(kv.length > preKV->length) { + preKV->length = kv.length; + }else{ + kv.length = preKV->length; + } + } + } + taosArrayPush(preLineKV, &kv); + } + cnt++; + break; + }while(0); + } + + if(!info->dataFormat && !isTag){ + if(currElement->colArray == NULL){ + currElement->colArray = taosArrayInit(16, sizeof(SSmlKv)); + taosArraySetSize(currElement->colArray, 1); + } + taosArrayPush(currElement->colArray, &kv); //reserve for timestamp + } + } + + if(isTag && taosArrayGetSize(preLineKV) > TSDB_MAX_TAGS){ + smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL); + return TSDB_CODE_PAR_INVALID_TAGS_NUM; + } + + if(info->dataFormat){ + if(isTag){ + info->dataFormat = smlFormatJudge(info->superTableTagKeyStr, &info->preLineTagKeys, + info->currentLineTagKeys, currElement, &info->preLine, sqlEnd - currElement->tags); + if(!info->dataFormat) { + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + if(!isSameCTable){ + if(taosArrayGetSize(preLineKV) > TSDB_MAX_TAGS){ + smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL); + return TSDB_CODE_PAR_INVALID_TAGS_NUM; + } + + void* oneTable = taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen); + if (unlikely(oneTable == NULL)) { + SSmlTableInfo *tinfo = smlBuildTableInfo(1, currElement->measure, currElement->measureLen); + if (!tinfo) { + return TSDB_CODE_OUT_OF_MEMORY; + } + for(int i = 0; i < taosArrayGetSize(preLineKV); i++){ + taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i)); + } + smlSetCTableName(tinfo); + info->currSTableMeta->uid = tinfo->uid; + tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta); + taosHashPut(info->childTables, currElement->measure, currElement->measureTagsLen, &tinfo, POINTER_BYTES); + } + } + }else{ + info->dataFormat = smlFormatJudge(info->superTableColKeyStr, &info->preLineColKeys, + info->currentLineColKeys, currElement, &info->preLine, sqlEnd - currElement->cols); + if(!info->dataFormat) { + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + } + }else{ + void* oneTable = taosHashGet(info->childTables, currElement->measure, currElement->measureTagsLen); + if (unlikely(oneTable == NULL)) { + SSmlTableInfo *tinfo = smlBuildTableInfo(info->affectedRows / 2, currElement->measure, currElement->measureLen); + if (!tinfo) { + return TSDB_CODE_OUT_OF_MEMORY; + } + for(int i = 0; i < taosArrayGetSize(preLineKV); i++){ + taosArrayPush(tinfo->tags, taosArrayGet(preLineKV, i)); + } + smlSetCTableName(tinfo); + taosHashPut(info->childTables, currElement->measure, currElement->measureTagsLen, &tinfo, POINTER_BYTES); + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t smlParseInfluxString(SSmlHandle *info, const char *sql, const char *sqlEnd, SSmlLineInfo *elements) { if (!sql) return TSDB_CODE_SML_INVALID_DATA; JUMP_SPACE(sql, sqlEnd) if (*sql == COMMA) return TSDB_CODE_SML_INVALID_DATA; @@ -958,46 +1502,57 @@ static int32_t smlParseInfluxString(const char *sql, const char *sqlEnd, SSmlLin } elements->measureLen = sql - elements->measure; if (IS_INVALID_TABLE_LEN(elements->measureLen)) { - smlBuildInvalidDataMsg(msg, "measure is empty or too large than 192", NULL); + smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL); return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; } + // to get measureTagsLen before + const char* tmp = sql; + while (tmp < sqlEnd){ + if (IS_SPACE(tmp)) { + break; + } + tmp++; + } + elements->measureTagsLen = tmp - elements->measure; + // parse tag if (*sql == SPACE) { elements->tagsLen = 0; } else { if (*sql == COMMA) sql++; elements->tags = sql; - while (sql < sqlEnd) { - if (IS_SPACE(sql)) { - break; - } - sql++; + + // tinfo != NULL means child table has never occur before + int ret = smlParseKv(info, &sql, sqlEnd, elements, true); + if(ret != TSDB_CODE_SUCCESS){ + return ret; } + sql = elements->measure + elements->measureTagsLen; + + if(info->reRun){ + return TSDB_CODE_SUCCESS; + } + elements->tagsLen = sql - elements->tags; } - elements->measureTagsLen = sql - elements->measure; // parse cols JUMP_SPACE(sql, sqlEnd) elements->cols = sql; - bool isInQuote = false; - while (sql < sqlEnd) { - if (IS_QUOTE(sql)) { - isInQuote = !isInQuote; - } - if (!isInQuote && IS_SPACE(sql)) { - break; - } - sql++; + + int ret = smlParseKv(info, &sql, sqlEnd, elements, false); + if(ret != TSDB_CODE_SUCCESS){ + return ret; } - if (isInQuote) { - smlBuildInvalidDataMsg(msg, "only one quote", elements->cols); - return TSDB_CODE_SML_INVALID_DATA; + + if(info->reRun){ + return TSDB_CODE_SUCCESS; } + elements->colsLen = sql - elements->cols; if (elements->colsLen == 0) { - smlBuildInvalidDataMsg(msg, "cols is empty", NULL); + smlBuildInvalidDataMsg(&info->msgBuf, "cols is empty", NULL); return TSDB_CODE_SML_INVALID_DATA; } @@ -1012,6 +1567,17 @@ static int32_t smlParseInfluxString(const char *sql, const char *sqlEnd, SSmlLin } elements->timestampLen = sql - elements->timestamp; + ret = smlParseTS(info, elements->timestamp, elements->timestampLen, elements->colArray); + if (ret != TSDB_CODE_SUCCESS) { + uError("SML:0x%" PRIx64 " smlParseTS failed", info->id); + return ret; + } + + if(info->dataFormat){ + smlBuildRow(info->currTableDataCtx); + info->preLine = *elements; + } + return TSDB_CODE_SUCCESS; } @@ -1138,151 +1704,54 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char *sql, const cha // parse value const char *value = NULL; - int32_t valueLen = 0; - smlParseTelnetElement(&sql, sqlEnd, &value, &valueLen); - if (!value || valueLen == 0) { - smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql); - return TSDB_CODE_TSC_INVALID_VALUE; - } - - SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); - if (!kv) return TSDB_CODE_OUT_OF_MEMORY; - taosArrayPush(cols, &kv); - kv->key = VALUE; - kv->keyLen = VALUE_LEN; - kv->value = value; - kv->length = valueLen; - if ((ret = smlParseValue(kv, &info->msgBuf)) != TSDB_CODE_SUCCESS) { - return ret; - } - - // parse tags - ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf); - if (ret != TSDB_CODE_SUCCESS) { - smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql); - return ret; - } - - return TSDB_CODE_SUCCESS; -} - -static int32_t smlParseCols(const char *data, int32_t len, SArray *cols, char *childTableName, bool isTag, - SHashObj *dumplicateKey, SSmlMsgBuf *msg) { - if (len == 0) { - return TSDB_CODE_SUCCESS; - } - - size_t childTableNameLen = strlen(tsSmlChildTableName); - const char *sql = data; - while (sql < data + len) { - const char *key = sql; - int32_t keyLen = 0; - - while (sql < data + len) { - // parse key - if (IS_COMMA(sql)) { - smlBuildInvalidDataMsg(msg, "invalid data", sql); - return TSDB_CODE_SML_INVALID_DATA; - } - if (IS_EQUAL(sql)) { - keyLen = sql - key; - sql++; - break; - } - sql++; - } - - if (IS_INVALID_COL_LEN(keyLen)) { - smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key); - return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH; - } - if (smlCheckDuplicateKey(key, keyLen, dumplicateKey)) { - smlBuildInvalidDataMsg(msg, "dumplicate key", key); - return TSDB_CODE_TSC_DUP_NAMES; - } - - // parse value - const char *value = sql; - int32_t valueLen = 0; - bool isInQuote = false; - while (sql < data + len) { - // parse value - if (!isTag && IS_QUOTE(sql)) { - isInQuote = !isInQuote; - sql++; - continue; - } - if (!isInQuote && IS_COMMA(sql)) { - break; - } - if (!isInQuote && IS_EQUAL(sql)) { - smlBuildInvalidDataMsg(msg, "invalid data", sql); - return TSDB_CODE_SML_INVALID_DATA; - } - sql++; - } - valueLen = sql - value; - sql++; - - if (isInQuote) { - smlBuildInvalidDataMsg(msg, "only one quote", value); - return TSDB_CODE_SML_INVALID_DATA; - } - if (valueLen == 0) { - smlBuildInvalidDataMsg(msg, "invalid value", value); - return TSDB_CODE_SML_INVALID_DATA; - } - PROCESS_SLASH(key, keyLen) - PROCESS_SLASH(value, valueLen) - - // handle child table name - if (childTableName && childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0) { - memset(childTableName, 0, TSDB_TABLE_NAME_LEN); - strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN)); - continue; - } + int32_t valueLen = 0; + smlParseTelnetElement(&sql, sqlEnd, &value, &valueLen); + if (!value || valueLen == 0) { + smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql); + return TSDB_CODE_TSC_INVALID_VALUE; + } - // add kv to SSmlKv - SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); - if (!kv) return TSDB_CODE_OUT_OF_MEMORY; - if (cols) taosArrayPush(cols, &kv); + SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); + if (!kv) return TSDB_CODE_OUT_OF_MEMORY; + taosArrayPush(cols, &kv); + kv->key = VALUE; + kv->keyLen = VALUE_LEN; + kv->value = value; + kv->length = valueLen; + if ((ret = smlParseValue(kv, &info->msgBuf)) != TSDB_CODE_SUCCESS) { + return ret; + } - kv->key = key; - kv->keyLen = keyLen; - kv->value = value; - kv->length = valueLen; - if (isTag) { - if (valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { - return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN; - } - kv->type = TSDB_DATA_TYPE_NCHAR; - } else { - int32_t ret = smlParseValue(kv, msg); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } - } + // parse tags + ret = smlParseTelnetTags(sql, sqlEnd, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf); + if (ret != TSDB_CODE_SUCCESS) { + smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql); + return ret; } return TSDB_CODE_SUCCESS; } -static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, SSmlMsgBuf *msg) { +static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg) { for (int i = 0; i < taosArrayGetSize(cols); ++i) { - SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); + SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); int16_t *index = (int16_t *)taosHashGet(metaHash, kv->key, kv->keyLen); if (index) { - SSmlKv **value = (SSmlKv **)taosArrayGet(metaArray, *index); - if (kv->type != (*value)->type) { + SSmlKv *value = (SSmlKv *)taosArrayGet(metaArray, *index); + if (isTag){ + if (kv->length > value->length) { + value->length = kv->length; + } + continue; + } + if (kv->type != value->type) { smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key); return TSDB_CODE_SML_NOT_SAME_TYPE; - } else { - if (IS_VAR_DATA_TYPE(kv->type)) { // update string len, if bigger - if (kv->length > (*value)->length) { - *value = kv; - } - } + } + + if (IS_VAR_DATA_TYPE(kv->type) && (kv->length > value->length)) { // update string len, if bigger + value->length = kv->length; } } else { size_t tmp = taosArrayGetSize(metaArray); @@ -1296,194 +1765,32 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols return TSDB_CODE_SUCCESS; } -static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols) { - for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) { - SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); - taosArrayPush(metaArray, &kv); - taosHashPut(metaHash, kv->key, kv->keyLen, &i, SHORT_BYTES); - } -} - -static SSmlTableInfo *smlBuildTableInfo() { - SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1); - if (!tag) { - return NULL; - } - - tag->cols = taosArrayInit(16, POINTER_BYTES); - if (tag->cols == NULL) { - uError("SML:smlBuildTableInfo failed to allocate memory"); - goto cleanup; - } - - tag->tags = taosArrayInit(16, POINTER_BYTES); - if (tag->tags == NULL) { - uError("SML:smlBuildTableInfo failed to allocate memory"); - goto cleanup; - } - return tag; - -cleanup: - taosMemoryFree(tag); - return NULL; -} - static void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) { - if (info->dataFormat) { - for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) { - SArray *kvArray = (SArray *)taosArrayGetP(tag->cols, i); - for (int j = 0; j < taosArrayGetSize(kvArray); ++j) { - SSmlKv *p = (SSmlKv *)taosArrayGetP(kvArray, j); - taosMemoryFree(p); - } - taosArrayDestroy(kvArray); - } - } else { - for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) { - SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i); - void **p1 = (void **)taosHashIterate(kvHash, NULL); - while (p1) { - taosMemoryFree(*p1); - p1 = (void **)taosHashIterate(kvHash, p1); - } - taosHashCleanup(kvHash); - } - } - for (size_t i = 0; i < taosArrayGetSize(tag->tags); i++) { - SSmlKv *p = (SSmlKv *)taosArrayGetP(tag->tags, i); - taosMemoryFree(p); + for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) { + SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i); + taosHashCleanup(kvHash); } + taosArrayDestroy(tag->cols); taosArrayDestroy(tag->tags); taosMemoryFree(tag); } -static int32_t smlKvTimeArrayCompare(const void *key1, const void *key2) { - SArray *s1 = *(SArray **)key1; - SArray *s2 = *(SArray **)key2; - SSmlKv *kv1 = (SSmlKv *)taosArrayGetP(s1, 0); - SSmlKv *kv2 = (SSmlKv *)taosArrayGetP(s2, 0); - ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP); - ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP); - if (kv1->i < kv2->i) { - return -1; - } else if (kv1->i > kv2->i) { - return 1; - } else { - return 0; - } -} - -static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) { - SHashObj *s1 = *(SHashObj **)key1; - SHashObj *s2 = *(SHashObj **)key2; - SSmlKv **kv1pp = (SSmlKv **)taosHashGet(s1, TS, TS_LEN); - SSmlKv **kv2pp = (SSmlKv **)taosHashGet(s2, TS, TS_LEN); - if (!kv1pp || !kv2pp) { - uError("smlKvTimeHashCompare kv is null"); - return -1; - } - SSmlKv *kv1 = *kv1pp; - SSmlKv *kv2 = *kv2pp; - if (!kv1 || kv1->type != TSDB_DATA_TYPE_TIMESTAMP) { - uError("smlKvTimeHashCompare kv1"); - return -1; - } - if (!kv2 || kv2->type != TSDB_DATA_TYPE_TIMESTAMP) { - uError("smlKvTimeHashCompare kv2"); - return -1; - } - if (kv1->i < kv2->i) { - return -1; - } else if (kv1->i > kv2->i) { - return 1; - } else { - return 0; - } -} - -static int32_t smlDealCols(SSmlTableInfo *oneTable, bool dataFormat, SArray *cols) { - if (dataFormat) { - void *p = taosArraySearch(oneTable->cols, &cols, smlKvTimeArrayCompare, TD_GT); - if (p == NULL) { - taosArrayPush(oneTable->cols, &cols); - } else { - taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &cols); - } - return TSDB_CODE_SUCCESS; - } - +static int32_t smlPushCols(SArray *colsArray, SArray *cols) { SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (!kvHash) { uError("SML:smlDealCols failed to allocate memory"); - return TSDB_CODE_TSC_OUT_OF_MEMORY; + return TSDB_CODE_OUT_OF_MEMORY; } for (size_t i = 0; i < taosArrayGetSize(cols); i++) { - SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); + SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); } - void *p = taosArraySearch(oneTable->cols, &kvHash, smlKvTimeHashCompare, TD_GT); - if (p == NULL) { - taosArrayPush(oneTable->cols, &kvHash); - } else { - taosArrayInsert(oneTable->cols, TARRAY_ELEM_IDX(oneTable->cols, p), &kvHash); - } + taosArrayPush(colsArray, &kvHash); return TSDB_CODE_SUCCESS; } -static SSmlSTableMeta *smlBuildSTableMeta() { - SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1); - if (!meta) { - return NULL; - } - meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - if (meta->tagHash == NULL) { - uError("SML:smlBuildSTableMeta failed to allocate memory"); - goto cleanup; - } - - meta->colHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - if (meta->colHash == NULL) { - uError("SML:smlBuildSTableMeta failed to allocate memory"); - goto cleanup; - } - - meta->tags = taosArrayInit(32, POINTER_BYTES); - if (meta->tags == NULL) { - uError("SML:smlBuildSTableMeta failed to allocate memory"); - goto cleanup; - } - - meta->cols = taosArrayInit(32, POINTER_BYTES); - if (meta->cols == NULL) { - uError("SML:smlBuildSTableMeta failed to allocate memory"); - goto cleanup; - } - return meta; - -cleanup: - taosMemoryFree(meta); - return NULL; -} - -static void smlDestroySTableMeta(SSmlSTableMeta *meta) { - taosHashCleanup(meta->tagHash); - taosHashCleanup(meta->colHash); - taosArrayDestroy(meta->tags); - taosArrayDestroy(meta->cols); - taosMemoryFree(meta->tableMeta); - taosMemoryFree(meta); -} - -static void smlDestroyCols(SArray *cols) { - if (!cols) return; - for (int i = 0; i < taosArrayGetSize(cols); ++i) { - void *kv = taosArrayGetP(cols, i); - taosMemoryFree(kv); - } -} - static void smlDestroyInfo(SSmlHandle *info) { if (!info) return; qDestroyQuery(info->pQuery); @@ -1506,17 +1813,26 @@ static void smlDestroyInfo(SSmlHandle *info) { // destroy info->pVgHash taosHashCleanup(info->pVgHash); - taosHashCleanup(info->dumplicateKey); - if (!info->dataFormat) { - taosArrayDestroy(info->colsContainer); - } destroyRequest(info->pRequest); + taosHashCleanup(info->superTableTagKeyStr); + taosHashCleanup(info->superTableColKeyStr); + taosMemoryFree(info->currentLineTagKeys); + taosMemoryFree(info->preLineTagKeys); + taosMemoryFree(info->currentLineColKeys); + taosMemoryFree(info->preLineColKeys); + taosArrayDestroy(info->preLineTagKV); + taosArrayDestroy(info->preLineColKV); + + for(int i = 0; i < taosArrayGetSize(info->lines); i++){ + taosArrayDestroy(((SSmlLineInfo*)taosArrayGet(info->lines, i))->colArray); + } + taosArrayDestroy(info->lines); cJSON_Delete(info->root); taosMemoryFreeClear(info); } -static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLProtocolType protocol, int8_t precision) { +static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLProtocolType protocol, int8_t precision, int32_t perBatch) { int32_t code = TSDB_CODE_SUCCESS; SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle)); if (NULL == info) { @@ -1537,11 +1853,8 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr info->precision = precision; info->protocol = protocol; - if (protocol == TSDB_SML_LINE_PROTOCOL) { - info->dataFormat = tsSmlDataFormat; - } else { - info->dataFormat = true; - } + info->dataFormat = true; + info->affectedRows = perBatch; if (request) { info->pRequest = request; @@ -1549,26 +1862,22 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE; } + info->lines = taosArrayInit(perBatch, sizeof(SSmlLineInfo)); + info->superTableTagKeyStr = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + info->superTableColKeyStr = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - if (!info->dataFormat) { - info->colsContainer = taosArrayInit(32, POINTER_BYTES); - if (NULL == info->colsContainer) { - uError("SML:0x%" PRIx64 " create info failed", info->id); - goto cleanup; - } - } - if (NULL == info->pQuery || NULL == info->childTables || NULL == info->superTables || NULL == info->pVgHash || - NULL == info->dumplicateKey) { + if (NULL == info->pQuery || NULL == info->childTables || NULL == info->superTables || NULL == info->superTableTagKeyStr + || NULL == info->superTableColKeyStr || NULL == info->pVgHash) { uError("SML:0x%" PRIx64 " create info failed", info->id); goto cleanup; } return info; -cleanup: + cleanup: smlDestroyInfo(info); return NULL; } @@ -1724,15 +2033,9 @@ static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) { } // add ts to - SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); - if (!kv) { - return TSDB_CODE_OUT_OF_MEMORY; - } - kv->key = TS; - kv->keyLen = TS_LEN; - kv->i = tsVal; - kv->type = TSDB_DATA_TYPE_TIMESTAMP; - kv->length = (int16_t)tDataTypes[kv->type].bytes; + SSmlKv kv = {.key = TS, .keyLen = TS_LEN, .i = tsVal, + .type = TSDB_DATA_TYPE_TIMESTAMP, .length = (int16_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes}; + taosArrayPush(cols, &kv); return TSDB_CODE_SUCCESS; } @@ -1937,18 +2240,13 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) { return TSDB_CODE_TSC_INVALID_JSON; } - SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); - if (!kv) { - return TSDB_CODE_OUT_OF_MEMORY; - } - taosArrayPush(cols, &kv); - - kv->key = VALUE; - kv->keyLen = VALUE_LEN; - int32_t ret = smlParseValueFromJSON(metricVal, kv); + SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN}; + int32_t ret = smlParseValueFromJSON(metricVal, &kv); if (ret != TSDB_CODE_SUCCESS) { return ret; } + taosArrayPush(cols, &kv); + return TSDB_CODE_SUCCESS; } @@ -1992,19 +2290,13 @@ static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableN } // add kv to SSmlKv - SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); - if (!kv) return TSDB_CODE_OUT_OF_MEMORY; - taosArrayPush(pKVs, &kv); - - // key - kv->keyLen = keyLen; - kv->key = tag->string; - + SSmlKv kv ={.key = tag->string, .keyLen = keyLen}; // value - ret = smlParseValueFromJSON(tag, kv); + ret = smlParseValueFromJSON(tag, &kv); if (ret != TSDB_CODE_SUCCESS) { return ret; } + taosArrayPush(pKVs, &kv); } return ret; @@ -2061,128 +2353,70 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * } /************* TSDB_SML_JSON_PROTOCOL function end **************/ -static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) { - SSmlLineInfo elements = {0}; - uDebug("SML:0x%" PRIx64 " smlParseInfluxLine raw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? "rawdata" : sql)); - - int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf); - if (ret != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id); - return ret; - } - - SArray *cols = NULL; - if (info->dataFormat) { // if dataFormat, cols need new memory to save data - cols = taosArrayInit(16, POINTER_BYTES); - if (cols == NULL) { - uError("SML:0x%" PRIx64 " smlParseInfluxLine failed to allocate memory", info->id); - return TSDB_CODE_TSC_OUT_OF_MEMORY; +static int32_t smlParseLineBottom(SSmlHandle *info) { + if(info->dataFormat) return TSDB_CODE_SUCCESS; + + for(int32_t i = 0; i < taosArrayGetSize(info->lines); i ++){ + SSmlLineInfo* elements = taosArrayGet(info->lines, i); + bool hasTable = true; + SSmlTableInfo *tinfo = NULL; + SSmlTableInfo **oneTable = + (SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen); + if(oneTable == NULL){ + uError("SML:0x%" PRIx64 "get oneTable failed, line num:%d", info->id, i); + smlBuildInvalidDataMsg(&info->msgBuf, "get oneTable failed", elements->measure); + return TSDB_CODE_SML_INVALID_DATA; } - } else { // if dataFormat is false, cols do not need to save data, there is another new memory to save data - cols = info->colsContainer; - } - - ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols); - if (ret != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " smlParseTS failed", info->id); - if (info->dataFormat) taosArrayDestroy(cols); - return ret; - } - ret = smlParseCols(elements.cols, elements.colsLen, cols, NULL, false, info->dumplicateKey, &info->msgBuf); - if (ret != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " smlParseCols parse cloums fields failed", info->id); - smlDestroyCols(cols); - if (info->dataFormat) taosArrayDestroy(cols); - return ret; - } + tinfo = *oneTable; - bool hasTable = true; - SSmlTableInfo *tinfo = NULL; - SSmlTableInfo **oneTable = - (SSmlTableInfo **)taosHashGet(info->childTables, elements.measure, elements.measureTagsLen); - if (!oneTable) { - tinfo = smlBuildTableInfo(); - if (!tinfo) { - smlDestroyCols(cols); - if (info->dataFormat) taosArrayDestroy(cols); - return TSDB_CODE_TSC_OUT_OF_MEMORY; + if (taosArrayGetSize(elements->colArray) + taosArrayGetSize(tinfo->tags) > TSDB_MAX_COLUMNS) { + smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL); + return TSDB_CODE_PAR_TOO_MANY_COLUMNS; } - taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES); - oneTable = &tinfo; - hasTable = false; - } - - ret = smlDealCols(*oneTable, info->dataFormat, cols); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } - if (!hasTable) { - ret = smlParseCols(elements.tags, elements.tagsLen, (*oneTable)->tags, (*oneTable)->childTableName, true, - info->dumplicateKey, &info->msgBuf); + int ret = smlPushCols(tinfo->cols, elements->colArray); if (ret != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " smlParseCols parse tag fields failed", info->id); return ret; } - if (taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_TAGS) { - smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL); - return TSDB_CODE_PAR_INVALID_TAGS_NUM; - } - - if (taosArrayGetSize(cols) + taosArrayGetSize((*oneTable)->tags) > TSDB_MAX_COLUMNS) { - smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL); - return TSDB_CODE_PAR_TOO_MANY_COLUMNS; - } - - (*oneTable)->sTableName = elements.measure; - (*oneTable)->sTableNameLen = elements.measureLen; - if (strlen((*oneTable)->childTableName) == 0) { - RandTableName rName = {(*oneTable)->tags, (*oneTable)->sTableName, (uint8_t)(*oneTable)->sTableNameLen, - (*oneTable)->childTableName, 0}; - - buildChildTableName(&rName); - (*oneTable)->uid = rName.uid; + SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); + if (tableMeta) { // update meta + ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf); + if (ret == TSDB_CODE_SUCCESS) { + ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf); + } + if (ret != TSDB_CODE_SUCCESS) { + uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id); + return ret; + } } else { - (*oneTable)->uid = *(uint64_t *)((*oneTable)->childTableName); - } - } + ret = smlJudgeDupColName(elements->colArray, tinfo->tags, &info->msgBuf); + if (ret != TSDB_CODE_SUCCESS) { + uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id); + return ret; + } - SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements.measure, elements.measureLen); - if (tableMeta) { // update meta - ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf); - if (!hasTable && ret == TSDB_CODE_SUCCESS) { - ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf); + SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); + smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags); + smlInsertMeta(meta->colHash, meta->cols, elements->colArray); + taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES); } - if (ret != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id); - return ret; - } - } else { - SSmlSTableMeta *meta = smlBuildSTableMeta(); - smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags); - smlInsertMeta(meta->colHash, meta->cols, cols); - taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES); } - if (!info->dataFormat) { - taosArrayClear(info->colsContainer); - } - taosHashClear(info->dumplicateKey); return TSDB_CODE_SUCCESS; } static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) { int ret = TSDB_CODE_SUCCESS; - SSmlTableInfo *tinfo = smlBuildTableInfo(); + SSmlTableInfo *tinfo = smlBuildTableInfo(1, "", 0); if (!tinfo) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; + return TSDB_CODE_OUT_OF_MEMORY; } SArray *cols = taosArrayInit(16, POINTER_BYTES); if (cols == NULL) { uError("SML:0x%" PRIx64 " smlParseTelnetLine failed to allocate memory", info->id); - return TSDB_CODE_TSC_OUT_OF_MEMORY; + return TSDB_CODE_OUT_OF_MEMORY; } if (info->protocol == TSDB_SML_TELNET_PROTOCOL) { @@ -2195,7 +2429,6 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) { if (ret != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlParseTelnetLine failed", info->id); smlDestroyTableInfo(info, tinfo); - smlDestroyCols(cols); taosArrayDestroy(cols); return ret; } @@ -2203,7 +2436,6 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) { if (taosArrayGetSize(tinfo->tags) <= 0 || taosArrayGetSize(tinfo->tags) > TSDB_MAX_TAGS) { smlBuildInvalidDataMsg(&info->msgBuf, "invalidate tags length:[1,128]", NULL); smlDestroyTableInfo(info, tinfo); - smlDestroyCols(cols); taosArrayDestroy(cols); return TSDB_CODE_PAR_INVALID_TAGS_NUM; } @@ -2232,16 +2464,16 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data, const int len) { SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen); if (tableMeta) { // update meta - ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf); + ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, false, &info->msgBuf); if (!hasTable && ret == TSDB_CODE_SUCCESS) { - ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf); + ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, true, &info->msgBuf); } if (ret != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id); return ret; } } else { - SSmlSTableMeta *meta = smlBuildSTableMeta(); + SSmlSTableMeta *meta = smlBuildSTableMeta(false); smlInsertMeta(meta->tagHash, meta->tags, (*oneTable)->tags); smlInsertMeta(meta->colHash, meta->cols, cols); taosHashPut(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen, &meta, POINTER_BYTES); @@ -2284,7 +2516,7 @@ static int32_t smlParseJSON(SSmlHandle *info, char *payload) { } } -end: + end: return ret; } @@ -2321,7 +2553,7 @@ static int32_t smlInsertData(SSmlHandle *info) { (*pMeta)->tableMeta->vgId = vg.vgId; (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid - code = smlBindData(info->pQuery, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat, + code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols, (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, info->ttl, info->msgBuf.buf, info->msgBuf.len); if (code != TSDB_CODE_SUCCESS) { @@ -2356,9 +2588,9 @@ static int32_t smlInsertData(SSmlHandle *info) { static void smlPrintStatisticInfo(SSmlHandle *info) { uError("SML:0x%" PRIx64 - " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \ + " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \ parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64 - "", + "", info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables, info->cost.schemaTime - info->cost.parseTime, @@ -2400,8 +2632,10 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char } } + uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? "rawdata" : tmp)); + if (info->protocol == TSDB_SML_LINE_PROTOCOL) { - code = smlParseInfluxLine(info, tmp, len); + code = smlParseInfluxString(info, tmp, tmp + len, taosArrayGet(info->lines, i)); } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) { code = smlParseTelnetLine(info, tmp, len); } else { @@ -2411,7 +2645,27 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, tmp); return code; } + if(info->reRun){ + i = 0; + info->reRun = false; + // clear info->childTables + void **p1 = (void **)taosHashIterate(info->childTables, NULL); + while (p1) { + smlDestroyTableInfo(info, (SSmlTableInfo *)(*p1)); + p1 = (void **)taosHashIterate(info->childTables, p1); + } + taosHashClear(info->childTables); + + // clear info->superTables + p1 = (void **)taosHashIterate(info->superTables, NULL); + while (p1) { + smlDestroySTableMeta((SSmlSTableMeta *)(*p1)); + p1 = (void **)taosHashIterate(info->superTables, p1); + } + taosHashClear(info->superTables); + } } + return code; } @@ -2427,6 +2681,12 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL return code; } + code = smlParseLineBottom(info); + if (code != 0) { + uError("SML:0x%" PRIx64 " smlParseLineBottom error : %s", info->id, tstrerror(code)); + return code; + } + info->cost.lineNum = numLines; info->cost.numOfSTables = taosHashGetSize(info->superTables); info->cost.numOfCTables = taosHashGetSize(info->childTables); @@ -2569,15 +2829,6 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char uError("SML:taos_schemaless_insert error request is null"); goto end; } - SSmlHandle *info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision); - if (!info) { - request->code = TSDB_CODE_OUT_OF_MEMORY; - uError("SML:taos_schemaless_insert error SSmlHandle is null"); - goto end; - } - - info->isRawLine = (rawLine == NULL); - info->ttl = ttl; int32_t perBatch = tsSmlBatchSize; @@ -2588,8 +2839,17 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char numLines = 0; } + SSmlHandle *info = smlBuildSmlInfo(pTscObj, req, (SMLProtocolType)protocol, precision, perBatch); + if (!info) { + request->code = TSDB_CODE_OUT_OF_MEMORY; + uError("SML:taos_schemaless_insert error SSmlHandle is null"); + goto end; + } + + info->isRawLine = (rawLine == NULL); + info->ttl = ttl; + info->params = ¶ms; - info->affectedRows = perBatch; info->pRequest->body.queryFp = smlInsertCallback; info->pRequest->body.param = info; int32_t code = smlProcess(info, lines, rawLine, rawLineEnd, perBatch); @@ -2613,7 +2873,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char } tsem_wait(¶ms.sem); -end: + end: taosThreadSpinDestroy(¶ms.lock); tsem_destroy(¶ms.sem); // ((STscObj *)taos)->schemalessType = 0; @@ -2677,7 +2937,7 @@ TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLi } TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol, - int precision, int32_t ttl, int64_t reqid) { + int precision, int32_t ttl, int64_t reqid) { if (NULL == taos) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index 1fcc5f4ab7..f35ef2bd04 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -56,7 +56,7 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche int32_t code = TSDB_CODE_SUCCESS; for (int i = 0; i < taosArrayGetSize(cols); ++i) { - SSmlKv* kv = taosArrayGetP(cols, i); + SSmlKv* kv = taosArrayGet(cols, i); SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key}; col_id_t t = lastColIdx + 1; col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema)); @@ -111,7 +111,7 @@ static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchem int32_t code = TSDB_CODE_SUCCESS; for (int i = 0; i < tags->numOfBound; ++i) { SSchema* pTagSchema = &pSchema[tags->pColIndex[i]]; - SSmlKv* kv = taosArrayGetP(cols, i); + SSmlKv* kv = taosArrayGet(cols, i); taosArrayPush(*tagName, pTagSchema->name); STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type}; @@ -158,7 +158,66 @@ end: return code; } -int32_t smlBindData(SQuery* query, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, +STableDataCxt* smlInitTableDataCtx(SQuery* query, STableMeta* pTableMeta){ + STableDataCxt* pTableCxt = NULL; + int ret = insGetTableDataCxt(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid), + pTableMeta, NULL, &pTableCxt, false); + if (ret != TSDB_CODE_SUCCESS) { + return NULL; + } + + ret = initTableColSubmitData(pTableCxt); + if (ret != TSDB_CODE_SUCCESS) { + return NULL; + } + return pTableCxt; +} + +int32_t smlBuildRow(STableDataCxt* pTableCxt){ + SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); + int ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); + if (TSDB_CODE_SUCCESS != ret) { + return ret; + } + insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow)); + return TSDB_CODE_SUCCESS; +} + +int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void *data, int32_t index){ + int ret = TSDB_CODE_SUCCESS; + SSchema* pColSchema = schema + index; + SColVal* pVal = taosArrayGet(pTableCxt->pValues, index); + SSmlKv* kv = (SSmlKv*)data; + if (kv->type == TSDB_DATA_TYPE_NCHAR){ + int32_t len = 0; + char* pUcs4 = taosMemoryCalloc(1, pColSchema->bytes - VARSTR_HEADER_SIZE); + if (NULL == pUcs4) { + ret = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) { + if (errno == E2BIG) { + ret = TSDB_CODE_PAR_VALUE_TOO_LONG; + goto end; + } + ret = TSDB_CODE_TSC_INVALID_VALUE; + goto end; + } + pVal->value.pData = pUcs4; + pVal->value.nData = len; + } else if(kv->type == TSDB_DATA_TYPE_BINARY) { + pVal->value.nData = kv->length; + pVal->value.pData = (uint8_t *)kv->value; + } else { + memcpy(&pVal->value.val, &(kv->value), kv->length); + } + pVal->flag = CV_FLAG_VALUE; + +end: + return ret; +} + +int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSchema, SArray* cols, STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen) { SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; @@ -192,6 +251,18 @@ int32_t smlBindData(SQuery* query, SArray* tags, SArray* colsSchema, SArray* col pCreateTblReq->ctb.stbName = taosMemoryCalloc(1, sTableNameLen + 1); memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen); + if(dataFormat){ + STableDataCxt** pTableCxt = (STableDataCxt**)taosHashGet(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid)); + if (NULL == pTableCxt) { + ret = buildInvalidOperationMsg(&pBuf, "dataformat true. get tableDataCtx error"); + goto end; + } + (*pTableCxt)->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE; + (*pTableCxt)->pData->pCreateTbReq = pCreateTblReq; + pCreateTblReq = NULL; + goto end; + } + STableDataCxt* pTableCxt = NULL; ret = insGetTableDataCxt(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid), pTableMeta, &pCreateTblReq, &pTableCxt, false); @@ -226,15 +297,10 @@ int32_t smlBindData(SQuery* query, SArray* tags, SArray* colsSchema, SArray* col for (int c = 0; c < pTableCxt->boundColsInfo.numOfBound; ++c) { SSchema* pColSchema = &pSchema[pTableCxt->boundColsInfo.pColIndex[c]]; SColVal* pVal = taosArrayGet(pTableCxt->pValues, pTableCxt->boundColsInfo.pColIndex[c]); - SSmlKv* kv = NULL; - if (!format){ - void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name)); - if (p) kv = *p; - } + void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name)); + if (p == NULL) continue; + SSmlKv *kv = *(SSmlKv **)p; - if (kv == NULL) { - continue; - } if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision); } -- GitLab