diff --git a/include/common/tglobal.h b/include/common/tglobal.h index ed0f5cd31d62f57b42cb7f63e539768c2763dac6..89ec9dc6c833a7173d83c67dc386a476ff5bf4ef 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -129,6 +129,7 @@ extern char tsUdfdLdLibPath[]; extern char tsSmlChildTableName[]; extern char tsSmlTagName[]; extern bool tsSmlDataFormat; +extern int32_t tsSmlBatchSize; // wal extern int64_t tsWalFsyncDataSizeLimit; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index f4d8c80e3f1d09dedaf1b759f854b334d96212d5..d811eb7fecc0b62ee5517850424f3e4a8c2b05b9 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -79,7 +79,6 @@ #define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" " #define MAX_RETRY_TIMES 5 -#define LINE_BATCH 2000 //================================================================================================= typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; @@ -467,6 +466,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { goto end; } info->cost.numOfCreateSTables++; + taosMemoryFreeClear(pTableMeta); + + code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); + if (code != TSDB_CODE_SUCCESS) { + uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname); + goto end; + } } else if (code == TSDB_CODE_SUCCESS) { hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); @@ -505,16 +511,16 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname); goto end; } - } - taosMemoryFreeClear(pTableMeta); - code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); - if (code != TSDB_CODE_SUCCESS) { - goto end; + taosMemoryFreeClear(pTableMeta); + code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1); + if (code != TSDB_CODE_SUCCESS) { + goto end; + } + code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); + if (code != TSDB_CODE_SUCCESS) { + goto end; + } } taosHashClear(hashTmp); @@ -552,12 +558,18 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname); goto end; } - } - code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1); - if (code != TSDB_CODE_SUCCESS) { - goto end; + code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1); + if (code != TSDB_CODE_SUCCESS) { + goto end; + } + code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); + if (code != TSDB_CODE_SUCCESS) { + uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname); + goto end; + } } + needCheckMeta = true; taosHashCleanup(hashTmp); hashTmp = NULL; @@ -565,13 +577,6 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code)); goto end; } - taosMemoryFreeClear(pTableMeta); - - code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname); - goto end; - } if (needCheckMeta) { code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags, @@ -596,7 +601,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { end: taosHashCleanup(hashTmp); taosMemoryFreeClear(pTableMeta); - catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); +// catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); return code; } @@ -815,6 +820,11 @@ static int8_t smlGetTsTypeByPrecision(int8_t precision) { } static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) { + void *tmp = taosMemoryCalloc(1, len + 1); + memcpy(tmp, data, len); + uDebug("SML:0x%" PRIx64 " smlParseInfluxTime tslen:%d, ts:%s", info->id, len, (char*)tmp); + taosMemoryFree(tmp); + if (len == 0 || (len == 1 && data[0] == '0')) { return taosGetTimestampNs(); } @@ -2066,7 +2076,10 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) { SSmlLineInfo elements = {0}; - uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s", info->id, (info->isRawLine ? "rawdata" : sql)); + void *tmp = taosMemoryCalloc(1, len + 1); + memcpy(tmp, sql, len); + uDebug("SML:0x%" PRIx64 " smlParseInfluxLine raw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? (char*)tmp : sql)); + taosMemoryFree(tmp); int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf); if (ret != TSDB_CODE_SUCCESS) { @@ -2562,7 +2575,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char goto end; } - batchs = ceil(((double)numLines) / LINE_BATCH); + batchs = ceil(((double)numLines) / tsSmlBatchSize); params.total = batchs; for (int i = 0; i < batchs; ++i) { SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT, 0); @@ -2581,7 +2594,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char info->isRawLine = (rawLine == NULL); info->ttl = ttl; - int32_t perBatch = LINE_BATCH; + int32_t perBatch = tsSmlBatchSize; if (numLines > perBatch) { numLines -= perBatch; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index b6c039f375668516a10725fd38eb50f9b51ea299..ab46ba24cffbfbe6e04989cc892c1bb40c856360 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -75,6 +75,7 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table // If set to empty system will generate table name using MD5 hash. // true means that the name and order of cols in each line are the same(only for influx protocol) bool tsSmlDataFormat = false; +int32_t tsSmlBatchSize = 10000; // query int32_t tsQueryPolicy = 1; @@ -306,6 +307,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "smlBatchSize", tsSmlBatchSize, 1, INT32_MAX, true) != 0) return -1; if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1; if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1; @@ -648,6 +650,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN); tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval; + tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32; tsMaxMemUsedByInsert = cfgGetItem(pCfg, "maxMemUsedByInsert")->i32; tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32; @@ -1021,6 +1024,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN); } else if (strcasecmp("smlDataFormat", name) == 0) { tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval; + } else if (strcasecmp("smlBatchSize", name) == 0) { + tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32; } else if (strcasecmp("shellActivityTimer", name) == 0) { tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32; } else if (strcasecmp("supportVnodes", name) == 0) {