提交 0d4cad40 编写于 作者: O obdev 提交者: wangzelin.wzl

fix information_schema table priv problem

上级 d6d35f67
......@@ -107,7 +107,8 @@ int ObInfoSchemaPartitionsTable::add_partitions(
const ObDatabaseSchema& database_schema, ObObj* cells, const int64_t col_count)
{
int ret = OB_SUCCESS;
ObSArray<const ObTableSchema*> table_schemas;
ObSArray<const ObTableSchema *> table_schemas;
bool priv_passed = true;
if (OB_ISNULL(schema_guard_) || OB_ISNULL(cells)) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "schema manager or cells should not be null", K(ret));
......@@ -122,8 +123,21 @@ int ObInfoSchemaPartitionsTable::add_partitions(
SERVER_LOG(WARN, "table schema not exist", K(ret));
} else if (table_schema->is_index_table() || table_schema->is_vir_table()) {
continue;
} else if (OB_FAIL(add_partitions(*table_schema, database_schema.get_database_name_str(), cells, col_count))) {
SERVER_LOG(WARN, "failed to add table constraint of table schema", "table_schema", *table_schema, K(ret));
} else if (OB_FAIL(check_priv("table_acc",
database_schema.get_database_name_str(),
table_schema->get_table_name_str(),
tenant_id_,
priv_passed))) {
SERVER_LOG(WARN, "failed to check priv", K(ret), K(database_schema.get_database_name_str()),
K(table_schema->get_table_name_str()));
} else if (!priv_passed) {
continue;
} else if (OB_FAIL(add_partitions(*table_schema,
database_schema.get_database_name_str(),
cells,
col_count))){
SERVER_LOG(WARN, "failed to add table constraint of table schema",
"table_schema", *table_schema, K(ret));
}
}
}
......
......@@ -106,7 +106,8 @@ int ObInfoSchemaReferentialConstraintsTable::add_fk_constraints_in_db(
const share::schema::ObDatabaseSchema& database_schema, common::ObObj* cells, const int64_t col_count)
{
int ret = OB_SUCCESS;
ObSArray<const ObTableSchema*> table_schemas;
ObSArray<const ObTableSchema *> table_schemas;
bool priv_passed = true;
if (OB_ISNULL(schema_guard_)) {
ret = OB_ERR_UNEXPECTED;
......@@ -122,6 +123,15 @@ int ObInfoSchemaReferentialConstraintsTable::add_fk_constraints_in_db(
SERVER_LOG(WARN, "table schema not exist", K(ret));
} else if (table_schema->is_index_table() || table_schema->is_view_table()) {
continue;
} else if (OB_FAIL(check_priv("table_acc",
database_schema.get_database_name_str(),
table_schema->get_table_name_str(),
tenant_id_,
priv_passed))) {
SERVER_LOG(WARN, "failed to check priv", K(ret), K(database_schema.get_database_name_str()),
K(table_schema->get_table_name_str()));
} else if (!priv_passed) {
continue;
} else if (OB_FAIL(add_fk_constraints_in_table(
*table_schema, database_schema.get_database_name_str(), cells, col_count))) {
SERVER_LOG(WARN, "failed to add fk constraints of table schema", "table_schema", *table_schema, K(ret));
......
......@@ -372,7 +372,7 @@ int ObInnerTableSchema::views_schema(ObTableSchema &table_schema)
table_schema.set_create_mem_version(1);
if (OB_SUCC(ret)) {
if (OB_FAIL(table_schema.set_view_definition(R"__(select /*+ READ_CONSISTENCY(WEAK) */ 'def' AS TABLE_CATALOG, d.database_name as TABLE_SCHEMA, t.table_name as TABLE_NAME, t.view_definition as VIEW_DEFINITION, 'NONE' as CHECK_OPTION, case t.view_is_updatable when 1 then 'YES' else 'NO' end as IS_UPDATABLE, 'NONE' as DEFINER, 'NONE' AS SECURITY_TYPE, case t.collation_type when 45 then 'utf8mb4' else 'NONE' end AS CHARACTER_SET_CLIENT, case t.collation_type when 45 then 'utf8mb4_general_ci' else 'NONE' end AS COLLATION_CONNECTION from oceanbase.__all_virtual_table as t join oceanbase.__all_virtual_database as d on t.tenant_id = effective_tenant_id() and d.tenant_id = effective_tenant_id() and t.database_id = d.database_id where (t.table_type = 1 or t.table_type = 4) and d.in_recyclebin = 0 and d.database_name != '__recyclebin' and d.database_name != 'information_schema' and d.database_name != 'oceanbase' )__"))) {
if (OB_FAIL(table_schema.set_view_definition(R"__(select /*+ READ_CONSISTENCY(WEAK) */ 'def' AS TABLE_CATALOG, d.database_name as TABLE_SCHEMA, t.table_name as TABLE_NAME, t.view_definition as VIEW_DEFINITION, 'NONE' as CHECK_OPTION, case t.view_is_updatable when 1 then 'YES' else 'NO' end as IS_UPDATABLE, 'NONE' as DEFINER, 'NONE' AS SECURITY_TYPE, case t.collation_type when 45 then 'utf8mb4' else 'NONE' end AS CHARACTER_SET_CLIENT, case t.collation_type when 45 then 'utf8mb4_general_ci' else 'NONE' end AS COLLATION_CONNECTION from oceanbase.__all_virtual_table as t join oceanbase.__all_virtual_database as d on t.tenant_id = effective_tenant_id() and d.tenant_id = effective_tenant_id() and t.database_id = d.database_id where (t.table_type = 1 or t.table_type = 4) and d.in_recyclebin = 0 and d.database_name != '__recyclebin' and d.database_name != 'information_schema' and d.database_name != 'oceanbase' and 0 = sys_privilege_check('table_acc', effective_tenant_id(), d.database_name, t.table_name) )__"))) {
LOG_ERROR("fail to set view_definition", K(ret));
}
}
......@@ -424,7 +424,7 @@ int ObInnerTableSchema::tables_schema(ObTableSchema &table_schema)
table_schema.set_create_mem_version(1);
if (OB_SUCC(ret)) {
if (OB_FAIL(table_schema.set_view_definition(R"__(select /*+ READ_CONSISTENCY(WEAK), use_merge(b, c, d, e)*/ 'def' as TABLE_CATALOG, b.database_name as TABLE_SCHEMA, a.table_name as TABLE_NAME, case when a.database_id & 0xFFFFFFFFFF = 2 then 'SYSTEM VIEW' when (a.table_type = 1 or a.table_type = 4) then 'VIEW' when a.table_type = 2 then 'SYSTEM TABLE' when a.table_type = 1 then 'INDEX' else 'BASE TABLE' end as TABLE_TYPE, NULL as ENGINE, NULL as VERSION, NULL as ROW_FORMAT, sum(c.row_count) as TABLE_ROWS, case when sum(c.row_count) = 0 then 0 else sum(c.data_size)/sum(c.row_count) end as AVG_ROW_LENGTH, sum(c.data_size) as DATA_LENGTH, NULL as MAX_DATA_LENGTH, NULL as INDEX_LENGTH, NULL as DATA_FREE, NULL as AUTO_INCREMENT, a.gmt_create as CREATE_TIME, a.gmt_modified as UPDATE_TIME, NULL as CHECK_TIME, d.collation as TABLE_COLLATION, cast(NULL as unsigned) as CHECKSUM, NULL as CREATE_OPTIONS, a.comment as TABLE_COMMENT from oceanbase.__all_virtual_table a inner join oceanbase.__all_virtual_database b on a.database_id = b.database_id left join oceanbase.__all_virtual_tenant_partition_meta_table c on a.table_id = c.table_id and c.tenant_id = effective_tenant_id() and a.tenant_id = c.tenant_id and c.role = 1 inner join oceanbase.__all_collation d on a.collation_type = d.id where a.tenant_id = effective_tenant_id() and b.tenant_id = effective_tenant_id() and a.table_type != 5 and b.database_name != '__recyclebin' and b.in_recyclebin = 0 group by a.table_id, b.database_name, a.table_name, a.table_type, a.gmt_create, a.gmt_modified, d.collation, a.comment )__"))) {
if (OB_FAIL(table_schema.set_view_definition(R"__(select /*+ READ_CONSISTENCY(WEAK), use_merge(b, c, d, e)*/ 'def' as TABLE_CATALOG, b.database_name as TABLE_SCHEMA, a.table_name as TABLE_NAME, case when a.database_id & 0xFFFFFFFFFF = 2 then 'SYSTEM VIEW' when (a.table_type = 1 or a.table_type = 4) then 'VIEW' when a.table_type = 2 then 'SYSTEM TABLE' when a.table_type = 1 then 'INDEX' else 'BASE TABLE' end as TABLE_TYPE, NULL as ENGINE, NULL as VERSION, NULL as ROW_FORMAT, sum(c.row_count) as TABLE_ROWS, case when sum(c.row_count) = 0 then 0 else sum(c.data_size)/sum(c.row_count) end as AVG_ROW_LENGTH, sum(c.data_size) as DATA_LENGTH, NULL as MAX_DATA_LENGTH, NULL as INDEX_LENGTH, NULL as DATA_FREE, NULL as AUTO_INCREMENT, a.gmt_create as CREATE_TIME, a.gmt_modified as UPDATE_TIME, NULL as CHECK_TIME, d.collation as TABLE_COLLATION, cast(NULL as unsigned) as CHECKSUM, NULL as CREATE_OPTIONS, a.comment as TABLE_COMMENT from oceanbase.__all_virtual_table a inner join oceanbase.__all_virtual_database b on a.database_id = b.database_id left join oceanbase.__all_virtual_tenant_partition_meta_table c on a.table_id = c.table_id and c.tenant_id = effective_tenant_id() and a.tenant_id = c.tenant_id and c.role = 1 inner join oceanbase.__all_collation d on a.collation_type = d.id where a.tenant_id = effective_tenant_id() and b.tenant_id = effective_tenant_id() and a.table_type != 5 and b.database_name != '__recyclebin' and b.in_recyclebin = 0 and 0 = sys_privilege_check('table_acc', effective_tenant_id(), b.database_name, a.table_name) group by a.table_id, b.database_name, a.table_name, a.table_type, a.gmt_create, a.gmt_modified, d.collation, a.comment )__"))) {
LOG_ERROR("fail to set view_definition", K(ret));
}
}
......
......@@ -757,7 +757,7 @@ int ObInnerTableSchema::columns_schema(ObTableSchema &table_schema)
PRIVILEGES,
COLUMN_COMMENT,
GENERATION_EXPRESSION
FROM OCEANBASE.__ALL_VIRTUAL_INFORMATION_COLUMNS)__"))) {
FROM OCEANBASE.__ALL_VIRTUAL_INFORMATION_COLUMNS where 0 = sys_privilege_check('table_acc', effective_tenant_id(), table_schema, table_name))__"))) {
LOG_ERROR("fail to set view_definition", K(ret));
}
}
......
......@@ -10111,7 +10111,7 @@ def_table_schema(
case t.collation_type when 45 then 'utf8mb4' else 'NONE' end AS CHARACTER_SET_CLIENT,
case t.collation_type when 45 then 'utf8mb4_general_ci' else 'NONE' end AS COLLATION_CONNECTION
from oceanbase.__all_virtual_table as t join oceanbase.__all_virtual_database as d on t.tenant_id = effective_tenant_id() and d.tenant_id = effective_tenant_id() and t.database_id = d.database_id
where (t.table_type = 1 or t.table_type = 4) and d.in_recyclebin = 0 and d.database_name != '__recyclebin' and d.database_name != 'information_schema' and d.database_name != 'oceanbase'
where (t.table_type = 1 or t.table_type = 4) and d.in_recyclebin = 0 and d.database_name != '__recyclebin' and d.database_name != 'information_schema' and d.database_name != 'oceanbase' and 0 = sys_privilege_check('table_acc', effective_tenant_id(), d.database_name, t.table_name)
""".replace("\n", " "),
......@@ -10154,7 +10154,7 @@ def_table_schema(
inner join oceanbase.__all_virtual_database b on a.database_id = b.database_id
left join oceanbase.__all_virtual_tenant_partition_meta_table c on a.table_id = c.table_id and c.tenant_id = effective_tenant_id() and a.tenant_id = c.tenant_id and c.role = 1
inner join oceanbase.__all_collation d on a.collation_type = d.id
where a.tenant_id = effective_tenant_id() and b.tenant_id = effective_tenant_id() and a.table_type != 5 and b.database_name != '__recyclebin' and b.in_recyclebin = 0
where a.tenant_id = effective_tenant_id() and b.tenant_id = effective_tenant_id() and a.table_type != 5 and b.database_name != '__recyclebin' and b.in_recyclebin = 0 and 0 = sys_privilege_check('table_acc', effective_tenant_id(), b.database_name, a.table_name)
group by a.table_id, b.database_name, a.table_name, a.table_type, a.gmt_create, a.gmt_modified, d.collation, a.comment
""".replace("\n", " "),
......@@ -12022,7 +12022,7 @@ def_table_schema(
PRIVILEGES,
COLUMN_COMMENT,
GENERATION_EXPRESSION
FROM OCEANBASE.__ALL_VIRTUAL_INFORMATION_COLUMNS""",
FROM OCEANBASE.__ALL_VIRTUAL_INFORMATION_COLUMNS where 0 = sys_privilege_check('table_acc', effective_tenant_id(), table_schema, table_name)""",
in_tenant_space = True,
normal_columns = [ ],
)
......
......@@ -499,5 +499,42 @@ int ObVirtualTableIterator::close()
return ret;
}
} // namespace common
} // namespace oceanbase
// level_str support: db_acc, table_acc
// reference: ob_expr_sys_privilege_check.cpp:calc_resultN
int ObVirtualTableIterator::check_priv(const ObString &level_str,
const ObString &db_name,
const ObString &table_name,
int64_t tenant_id,
bool &passed)
{
int ret = OB_SUCCESS;
share::schema::ObSessionPrivInfo session_priv;
CK (OB_NOT_NULL(session_) && OB_NOT_NULL(schema_guard_));
OX (session_->get_session_priv_info(session_priv));
// bool allow_show = true;
if (OB_SUCC(ret)) {
//tenant_id in table is static casted to int64_t,
//and use statis_cast<uint64_t> for retrieving(same with schema_service)
// schema拆分后,普通租户schema表的tenant_id为0,此时鉴权取session_priv.tenant_id_
if (session_priv.tenant_id_ != static_cast<uint64_t>(tenant_id)
&& OB_INVALID_TENANT_ID != tenant_id) {
//not current tenant's row
} else if (0 == level_str.case_compare("db_acc")) {
if (OB_FAIL(schema_guard_->check_db_show(session_priv, db_name, passed))) {
LOG_WARN("Check db show failed", K(ret));
}
} else if (0 == level_str.case_compare("table_acc")) {
//if (OB_FAIL(priv_mgr.check_table_show(session_priv,
if (OB_FAIL(schema_guard_->check_table_show(session_priv, db_name, table_name, passed))) {
LOG_WARN("Check table show failed", K(ret));
}
} else {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Check priv level error", K(ret));
}
}
return ret;
}
}// common
}// oceanbase
......@@ -102,6 +102,8 @@ public:
{
return common::ObQueryFlag::Reverse == scan_flag_.scan_order_;
}
virtual int check_priv(const ObString &level_str, const ObString &db_name,
const ObString &table_name, int64_t tenant_id, bool &passed);
VIRTUAL_TO_STRING_KV(K_(output_column_ids));
private:
......
......@@ -265,6 +265,7 @@ TEST_F(TestInformationSchemaService, partitions)
partitions_iter.set_schema_guard(&schema_guard);
partitions_iter.set_tenant_id(real_tenant_id);
partitions_iter.set_allocator(&allocator_);
partitions_iter.set_session(&session_info_);
add_output_column_ids(partitions_iter, share::ObInnerTableSchema::partitions_schema, table);
partitions_iter.set_table_schema(&table);
share::FakePartPropertyGetter property_getter;
......@@ -288,6 +289,7 @@ TEST_F(TestInformationSchemaService, partitions)
partitions_iter.set_schema_guard(&schema_guard);
partitions_iter.set_tenant_id(real_tenant_id);
partitions_iter.set_allocator(&allocator_);
partitions_iter.set_session(&session_info_);
partitions_iter.set_table_schema(&null_table_schema);
row = NULL;
partitions_iter.set_reserved_column_cnt(25);
......@@ -310,6 +312,7 @@ TEST_F(TestInformationSchemaService, partitions)
partitions_iter.set_schema_guard(&schema_guard);
partitions_iter.set_tenant_id(real_tenant_id);
partitions_iter.set_allocator(&allocator_);
partitions_iter.set_session(&session_info_);
partitions_iter.set_table_schema(&table_schema_1);
row = NULL;
partitions_iter.set_reserved_column_cnt(25);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册