未验证 提交 0f417eb5 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #13743 from taosdata/feature/TD-14761

fix:disable schemaless database parameters & fix insert error in multi thread in schemaless
......@@ -167,7 +167,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c
taosThreadMutexInit(&pObj->mutex, NULL);
pObj->id = taosAddRef(clientConnRefPool, pObj);
pObj->schemalessType = 0;
pObj->schemalessType = 1;
tscDebug("connObj created, 0x%" PRIx64, pObj->id);
return pObj;
......
......@@ -306,19 +306,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
const char* errStr = taos_errstr(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" apply schema action. error: %s", info->id, errStr);
taosMsleep(100);
}
taos_free_result(res);
// if (code == TSDB_CODE_MND_FIELD_ALREADY_EXIST || code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) {
if (code == TSDB_CODE_MND_TAG_ALREADY_EXIST) {
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
taosMsleep(500);
}
break;
}
case SCHEMA_ACTION_ADD_TAG: {
......@@ -330,19 +321,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
const char* errStr = taos_errstr(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
taosMsleep(100);
}
taos_free_result(res);
// if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) {
if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST) {
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
taosMsleep(500);
}
break;
}
case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
......@@ -353,19 +335,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
taosMsleep(100);
}
taos_free_result(res);
// if (code == TSDB_CODE_MND_INVALID_COLUMN_LENGTH || code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
if (code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) {
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
taosMsleep(500);
}
break;
}
case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
......@@ -376,19 +349,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
taosMsleep(100);
}
taos_free_result(res);
// if (code == TSDB_CODE_MND_INVALID_TAG_LENGTH || code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
if (code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) {
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
taosMsleep(500);
}
break;
}
case SCHEMA_ACTION_CREATE_STABLE: {
......@@ -428,18 +392,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res));
taosMsleep(100);
}
taos_free_result(res);
if (code == TSDB_CODE_MND_STB_ALREADY_EXIST) {
TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE");
code = taos_errno(res2);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2));
}
taos_free_result(res2);
taosMsleep(500);
}
break;
}
......@@ -473,6 +429,21 @@ static int32_t smlProcessSchemaAction(SSmlHandle* info, SSchema* schemaField, SH
return TSDB_CODE_SUCCESS;
}
static int32_t smlCheckMeta(SSchema* schema, int32_t length, SArray* cols){
SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
for(uint16_t i = 0; i < length; i++){
taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
}
for(int32_t i = 0; i < taosArrayGetSize(cols); i++){
SSmlKv* kv = (SSmlKv*)taosArrayGetP(cols, i);
if(taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL){
return -1;
}
}
return 0;
}
static int32_t smlModifyDBSchemas(SSmlHandle* info) {
int32_t code = 0;
SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
......@@ -483,6 +454,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
while (tableMetaSml) {
SSmlSTableMeta* sTableData = *tableMetaSml;
STableMeta *pTableMeta = NULL;
bool needCheckMeta = false; // for multi thread
size_t superTableLen = 0;
void *superTable = taosHashGetKey(tableMetaSml, &superTableLen);
......@@ -533,6 +505,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
needCheckMeta = true;
} else {
uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code));
goto end;
......@@ -544,6 +517,20 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) {
uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, (char*)superTable);
goto end;
}
if(needCheckMeta){
code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags, sTableData->tags);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" check tag failed. super table name %s", info->id, (char*)superTable);
goto end;
}
code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%"PRIx64" check cols failed. super table name %s", info->id, (char*)superTable);
goto end;
}
}
sTableData->tableMeta = pTableMeta;
tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, tableMetaSml);
......@@ -2368,6 +2355,7 @@ static void smlInsertCallback(void* param, void* res, int32_t code) {
SRequestObj *pRequest = (SRequestObj *)res;
SSmlHandle* info = (SSmlHandle *)param;
uDebug("SML:0x%"PRIx64" result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf);
// lock
if(code != TSDB_CODE_SUCCESS){
taosThreadSpinLock(&info->params->lock);
......@@ -2496,8 +2484,9 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
end:
taosThreadSpinDestroy(&params.lock);
tsem_destroy(&params.sem);
((STscObj *)taos)->schemalessType = 0;
uDebug("result:%s", request->msgBuf);
// ((STscObj *)taos)->schemalessType = 0;
((STscObj *)taos)->schemalessType = 1;
uDebug("resultend:%s", request->msgBuf);
return (TAOS_RES*)request;
}
......@@ -41,7 +41,7 @@ TARGET_INCLUDE_DIRECTORIES(
PRIVATE "${TD_SOURCE_DIR}/source/client/inc"
)
#add_test(
# NAME smlTest
# COMMAND smlTest
#)
add_test(
NAME smlTest
COMMAND smlTest
)
......@@ -499,6 +499,7 @@ TEST(testCase, smlGetTimestampLen_Test) {
ASSERT_EQ(len, 3);
}
/*
TEST(testCase, smlProcess_influx_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
......@@ -1259,4 +1260,4 @@ TEST(testCase, sml_16368_Test) {
pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_MICRO_SECONDS);
ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes);
}
}*/
......@@ -91,7 +91,7 @@ static const SSysDbTableSchema userDBSchema[] = {
{.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "single_stable_model", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL},
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL},
// {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL},
{.name = "retension", .bytes = 60 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
// {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update
......
......@@ -1503,8 +1503,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)statusB, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false);
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false);
char *p = buildRetension(pDb->cfg.pRetensions);
......
......@@ -801,7 +801,8 @@ SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOpti
((SDatabaseOptions*)pOptions)->pRetentions = pVal;
break;
case DB_OPTION_SCHEMALESS:
((SDatabaseOptions*)pOptions)->schemaless = taosStr2Int8(((SToken*)pVal)->z, NULL, 10);
// ((SDatabaseOptions*)pOptions)->schemaless = taosStr2Int8(((SToken*)pVal)->z, NULL, 10);
((SDatabaseOptions*)pOptions)->schemaless = 1;
break;
default:
break;
......
......@@ -1297,11 +1297,12 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
}
static int32_t checkSchemalessDb(SInsertParseContext* pCxt, char* pDbName) {
SDbCfgInfo pInfo = {0};
char fullName[TSDB_TABLE_FNAME_LEN];
snprintf(fullName, sizeof(fullName), "%d.%s", pCxt->pComCxt->acctId, pDbName);
CHECK_CODE(getDBCfg(pCxt, fullName, &pInfo));
return pInfo.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS;
// SDbCfgInfo pInfo = {0};
// char fullName[TSDB_TABLE_FNAME_LEN];
// snprintf(fullName, sizeof(fullName), "%d.%s", pCxt->pComCxt->acctId, pDbName);
// CHECK_CODE(getDBCfg(pCxt, fullName, &pInfo));
// return pInfo.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS;
}
// tb_name
......@@ -2119,9 +2120,11 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS
isOrdered = false;
}
if (index < 0) {
uError("smlBoundColumnData. index:%d", index);
return TSDB_CODE_SML_INVALID_DATA;
}
if (pColList->cols[index].valStat == VAL_STAT_HAS) {
uError("smlBoundColumnData. already set. index:%d", index);
return TSDB_CODE_SML_INVALID_DATA;
}
lastColIdx = index;
......
......@@ -2762,15 +2762,16 @@ static int32_t checkTableSchema(STranslateContext* pCxt, SCreateTableStmt* pStmt
}
static int32_t checkSchemalessDb(STranslateContext* pCxt, const char* pDbName) {
if (0 != pCxt->pParseCxt->schemalessType) {
return TSDB_CODE_SUCCESS;
}
SDbCfgInfo info = {0};
int32_t code = getDBCfg(pCxt, pDbName, &info);
if (TSDB_CODE_SUCCESS == code) {
code = info.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS;
}
return code;
// if (0 != pCxt->pParseCxt->schemalessType) {
// return TSDB_CODE_SUCCESS;
// }
// SDbCfgInfo info = {0};
// int32_t code = getDBCfg(pCxt, pDbName, &info);
// if (TSDB_CODE_SUCCESS == code) {
// code = info.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS;
// }
// return code;
return TSDB_CODE_SUCCESS;
}
static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
......@@ -5048,6 +5049,9 @@ static int32_t buildModifyVnodeArray(STranslateContext* pCxt, SAlterTableStmt* p
static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) {
SAlterTableStmt* pStmt = (SAlterTableStmt*)pQuery->pRoot;
int32_t code = checkSchemalessDb(pCxt, pStmt->dbName);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
STableMeta* pTableMeta = NULL;
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
if (TSDB_CODE_SUCCESS != code) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册