diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 171f06c2575af50ca7b514c70b9b30ddb601b46b..bb60624145f47545769b3a606a25db93268ac1ce 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -167,7 +167,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c taosThreadMutexInit(&pObj->mutex, NULL); pObj->id = taosAddRef(clientConnRefPool, pObj); - pObj->schemalessType = 0; + pObj->schemalessType = 1; tscDebug("connObj created, 0x%" PRIx64, pObj->id); return pObj; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 68bfa99eadfde94f1b86e90e21d41ec419a2f457..bbf9eba363a398c243439f811e9046c796fa027c 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -306,19 +306,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { const char* errStr = taos_errstr(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error: %s", info->id, errStr); + taosMsleep(100); } taos_free_result(res); -// if (code == TSDB_CODE_MND_FIELD_ALREADY_EXIST || code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) { - if (code == TSDB_CODE_MND_TAG_ALREADY_EXIST) { - TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); - code = taos_errno(res2); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - taosMsleep(500); - } break; } case SCHEMA_ACTION_ADD_TAG: { @@ -330,19 +321,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { const char* errStr = taos_errstr(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); + taosMsleep(100); } taos_free_result(res); -// if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) { - if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST) { - TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); - code = taos_errno(res2); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - taosMsleep(500); - } break; } case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: { @@ -353,19 +335,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); + taosMsleep(100); } taos_free_result(res); -// if (code == TSDB_CODE_MND_INVALID_COLUMN_LENGTH || code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) { - if (code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) { - TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); - code = taos_errno(res2); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - taosMsleep(500); - } break; } case SCHEMA_ACTION_CHANGE_TAG_SIZE: { @@ -376,19 +349,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); + taosMsleep(100); } taos_free_result(res); -// if (code == TSDB_CODE_MND_INVALID_TAG_LENGTH || code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) { - if (code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) { - TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); - code = taos_errno(res2); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - taosMsleep(500); - } break; } case SCHEMA_ACTION_CREATE_STABLE: { @@ -428,18 +392,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); + taosMsleep(100); } taos_free_result(res); - if (code == TSDB_CODE_MND_STB_ALREADY_EXIST) { - TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); - code = taos_errno(res2); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - taosMsleep(500); - } break; } @@ -473,6 +429,21 @@ static int32_t smlProcessSchemaAction(SSmlHandle* info, SSchema* schemaField, SH return TSDB_CODE_SUCCESS; } +static int32_t smlCheckMeta(SSchema* schema, int32_t length, SArray* cols){ + SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + for(uint16_t i = 0; i < length; i++){ + taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES); + } + + for(int32_t i = 0; i < taosArrayGetSize(cols); i++){ + SSmlKv* kv = (SSmlKv*)taosArrayGetP(cols, i); + if(taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL){ + return -1; + } + } + return 0; +} + static int32_t smlModifyDBSchemas(SSmlHandle* info) { int32_t code = 0; SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp); @@ -483,6 +454,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { while (tableMetaSml) { SSmlSTableMeta* sTableData = *tableMetaSml; STableMeta *pTableMeta = NULL; + bool needCheckMeta = false; // for multi thread size_t superTableLen = 0; void *superTable = taosHashGetKey(tableMetaSml, &superTableLen); @@ -533,6 +505,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { if (code != TSDB_CODE_SUCCESS) { goto end; } + needCheckMeta = true; } else { uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code)); goto end; @@ -544,6 +517,20 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, (char*)superTable); goto end; } + + if(needCheckMeta){ + code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags, sTableData->tags); + if (code != TSDB_CODE_SUCCESS) { + uError("SML:0x%"PRIx64" check tag failed. super table name %s", info->id, (char*)superTable); + goto end; + } + code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols); + if (code != TSDB_CODE_SUCCESS) { + uError("SML:0x%"PRIx64" check cols failed. super table name %s", info->id, (char*)superTable); + goto end; + } + } + sTableData->tableMeta = pTableMeta; tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, tableMetaSml); @@ -2368,6 +2355,7 @@ static void smlInsertCallback(void* param, void* res, int32_t code) { SRequestObj *pRequest = (SRequestObj *)res; SSmlHandle* info = (SSmlHandle *)param; + uDebug("SML:0x%"PRIx64" result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf); // lock if(code != TSDB_CODE_SUCCESS){ taosThreadSpinLock(&info->params->lock); @@ -2496,8 +2484,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr end: taosThreadSpinDestroy(¶ms.lock); tsem_destroy(¶ms.sem); - ((STscObj *)taos)->schemalessType = 0; - uDebug("result:%s", request->msgBuf); +// ((STscObj *)taos)->schemalessType = 0; + ((STscObj *)taos)->schemalessType = 1; + uDebug("resultend:%s", request->msgBuf); return (TAOS_RES*)request; } diff --git a/source/client/test/CMakeLists.txt b/source/client/test/CMakeLists.txt index bb9f0ed23ee9ec5fb1b961f8d15370764814102b..03d2b4813446e97e7427f98356d7279bae355412 100644 --- a/source/client/test/CMakeLists.txt +++ b/source/client/test/CMakeLists.txt @@ -41,7 +41,7 @@ TARGET_INCLUDE_DIRECTORIES( PRIVATE "${TD_SOURCE_DIR}/source/client/inc" ) -#add_test( -# NAME smlTest -# COMMAND smlTest -#) +add_test( + NAME smlTest + COMMAND smlTest +) diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 49e26a818f3d23bfe26076a9d71384cb83447c79..4ad73a6424b89aec1ab81eb1b29e7fb0e3dbe248 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -499,6 +499,7 @@ TEST(testCase, smlGetTimestampLen_Test) { ASSERT_EQ(len, 3); } +/* TEST(testCase, smlProcess_influx_Test) { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(taos, nullptr); @@ -1259,4 +1260,4 @@ TEST(testCase, sml_16368_Test) { pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_MICRO_SECONDS); ASSERT_EQ(taos_errno(pRes), 0); taos_free_result(pRes); -} +}*/ diff --git a/source/common/src/systable.c b/source/common/src/systable.c index bb00c28c1338725d3c9203d769f0cd433ede536f..08977abd61aeefc64ffca9702c6a04b7cd529858 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -91,7 +91,7 @@ static const SSysDbTableSchema userDBSchema[] = { {.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "single_stable_model", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, +// {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, {.name = "retension", .bytes = 60 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, // {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index c20459829ece38be56beb6d6fcf9b27f1684577a..b4237466d4ca84442e0a0e4ef0c5b1dbcbf0da69 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1503,8 +1503,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)statusB, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false); +// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); +// colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false); char *p = buildRetension(pDb->cfg.pRetensions); diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 054912d540dac2bac00011ec5d2caffbb5ca4a6a..e7f4b05bc8bd3168ce8b41a211ed852dc2b3d1f7 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -801,7 +801,8 @@ SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOpti ((SDatabaseOptions*)pOptions)->pRetentions = pVal; break; case DB_OPTION_SCHEMALESS: - ((SDatabaseOptions*)pOptions)->schemaless = taosStr2Int8(((SToken*)pVal)->z, NULL, 10); +// ((SDatabaseOptions*)pOptions)->schemaless = taosStr2Int8(((SToken*)pVal)->z, NULL, 10); + ((SDatabaseOptions*)pOptions)->schemaless = 1; break; default: break; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 54c8a1821809ffe867addf788027b54123c1a989..c721491489f298b2f95940bf3b3ef9a3de0cb412 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1297,11 +1297,12 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { } static int32_t checkSchemalessDb(SInsertParseContext* pCxt, char* pDbName) { - SDbCfgInfo pInfo = {0}; - char fullName[TSDB_TABLE_FNAME_LEN]; - snprintf(fullName, sizeof(fullName), "%d.%s", pCxt->pComCxt->acctId, pDbName); - CHECK_CODE(getDBCfg(pCxt, fullName, &pInfo)); - return pInfo.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; +// SDbCfgInfo pInfo = {0}; +// char fullName[TSDB_TABLE_FNAME_LEN]; +// snprintf(fullName, sizeof(fullName), "%d.%s", pCxt->pComCxt->acctId, pDbName); +// CHECK_CODE(getDBCfg(pCxt, fullName, &pInfo)); +// return pInfo.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; } // tb_name @@ -2119,9 +2120,11 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS isOrdered = false; } if (index < 0) { + uError("smlBoundColumnData. index:%d", index); return TSDB_CODE_SML_INVALID_DATA; } if (pColList->cols[index].valStat == VAL_STAT_HAS) { + uError("smlBoundColumnData. already set. index:%d", index); return TSDB_CODE_SML_INVALID_DATA; } lastColIdx = index; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8c61c372fe325e65e2c49eaa168a65f1c9424e46..84bded8a58cf75313dcee0c90ffb1dbe09fa6e2a 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2762,15 +2762,16 @@ static int32_t checkTableSchema(STranslateContext* pCxt, SCreateTableStmt* pStmt } static int32_t checkSchemalessDb(STranslateContext* pCxt, const char* pDbName) { - if (0 != pCxt->pParseCxt->schemalessType) { - return TSDB_CODE_SUCCESS; - } - SDbCfgInfo info = {0}; - int32_t code = getDBCfg(pCxt, pDbName, &info); - if (TSDB_CODE_SUCCESS == code) { - code = info.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; - } - return code; +// if (0 != pCxt->pParseCxt->schemalessType) { +// return TSDB_CODE_SUCCESS; +// } +// SDbCfgInfo info = {0}; +// int32_t code = getDBCfg(pCxt, pDbName, &info); +// if (TSDB_CODE_SUCCESS == code) { +// code = info.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; +// } +// return code; + return TSDB_CODE_SUCCESS; } static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) { @@ -5048,6 +5049,9 @@ static int32_t buildModifyVnodeArray(STranslateContext* pCxt, SAlterTableStmt* p static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) { SAlterTableStmt* pStmt = (SAlterTableStmt*)pQuery->pRoot; int32_t code = checkSchemalessDb(pCxt, pStmt->dbName); + if (TSDB_CODE_SUCCESS != code) { + return code; + } STableMeta* pTableMeta = NULL; code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta); if (TSDB_CODE_SUCCESS != code) {