未验证 提交 50865ef7 编写于 作者: L LiHongjian 提交者: GitHub

feat(tianmu) Supports conditional deletion of single table and multiple tables (#345)(#348) (#458)

Co-authored-by: Nmergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
上级 0be195ae
#
# DELETE FROM TABLE_NAME WHERE ...
#
use test;
CREATE TABLE `column_type_test` (
`c_tinyint` tinyint(4) DEFAULT NULL COMMENT 'tinyint',
`c_smallint` smallint(6) DEFAULT NULL COMMENT 'smallint',
`c_mediumint` mediumint(9) DEFAULT NULL COMMENT 'mediumint',
`c_int` int(11) DEFAULT NULL COMMENT 'int',
`c_bigint` bigint(20) DEFAULT NULL COMMENT 'bigint',
`c_float` float DEFAULT NULL COMMENT 'float',
`c_double` double DEFAULT NULL COMMENT 'double',
`c_decimal` decimal(10,5) DEFAULT NULL COMMENT 'decimal',
`c_date` date DEFAULT NULL COMMENT 'date',
`c_datetime` datetime DEFAULT NULL COMMENT 'datetime',
`c_timestamp` timestamp NULL DEFAULT NULL COMMENT 'timestamp',
`c_time` time DEFAULT NULL COMMENT 'time',
`c_char` char(10) DEFAULT NULL COMMENT 'char',
`c_varchar` varchar(10) DEFAULT NULL COMMENT 'varchar',
`c_blob` blob COMMENT 'blob',
`c_text` text COMMENT 'text',
`c_longblob` longblob COMMENT 'longblob'
) ENGINE=TIANMU;
#
# DELETE FROM TABLE_NAME WHERE ...for Large amount of data
#
load data infile '../../std_data/tianmu/bigdata4load' into table column_type_test fields terminated by '|';
select count(*) from column_type_test;
count(*)
100000
select max(c_int) from column_type_test;
max(c_int)
104
select min(c_int) from column_type_test;
min(c_int)
100
select avg(c_int) from column_type_test;
avg(c_int)
102.0000
select sum(c_int) from column_type_test;
sum(c_int)
10200000
show create table column_type_test;
Table Create Table
column_type_test CREATE TABLE `column_type_test` (
`c_tinyint` tinyint(4) DEFAULT NULL COMMENT 'tinyint',
`c_smallint` smallint(6) DEFAULT NULL COMMENT 'smallint',
`c_mediumint` mediumint(9) DEFAULT NULL COMMENT 'mediumint',
`c_int` int(11) DEFAULT NULL COMMENT 'int',
`c_bigint` bigint(20) DEFAULT NULL COMMENT 'bigint',
`c_float` float DEFAULT NULL COMMENT 'float',
`c_double` double DEFAULT NULL COMMENT 'double',
`c_decimal` decimal(10,5) DEFAULT NULL COMMENT 'decimal',
`c_date` date DEFAULT NULL COMMENT 'date',
`c_datetime` datetime DEFAULT NULL COMMENT 'datetime',
`c_timestamp` timestamp NULL DEFAULT NULL COMMENT 'timestamp',
`c_time` time DEFAULT NULL COMMENT 'time',
`c_char` char(10) DEFAULT NULL COMMENT 'char',
`c_varchar` varchar(10) DEFAULT NULL COMMENT 'varchar',
`c_blob` blob COMMENT 'blob',
`c_text` text COMMENT 'text',
`c_longblob` longblob COMMENT 'longblob'
) ENGINE=TIANMU DEFAULT CHARSET=latin1
select count(*) from column_type_test;
count(*)
100000
select max(c_int) from column_type_test;
max(c_int)
104
select min(c_int) from column_type_test;
min(c_int)
100
select avg(c_int) from column_type_test;
avg(c_int)
102.0000
select sum(c_int) from column_type_test;
sum(c_int)
10200000
drop table column_type_test;
drop table if exists t1,t2,t3,t11,t12;
CREATE TABLE t1 (a tinyint(3), b tinyint(5));
INSERT INTO t1 VALUES (1,1);
INSERT LOW_PRIORITY INTO t1 VALUES (1,2);
INSERT INTO t1 VALUES (1,3);
DELETE from t1 where a=1 limit 1;
DELETE LOW_PRIORITY from t1 where a=1;
INSERT INTO t1 VALUES (1,1);
DELETE from t1;
LOCK TABLE t1 write;
INSERT INTO t1 VALUES (1,2);
DELETE from t1;
UNLOCK TABLES;
INSERT INTO t1 VALUES (1,2);
SET AUTOCOMMIT=0;
DELETE from t1;
SET AUTOCOMMIT=1;
drop table t1;
create table t1 (
a bigint not null,
b bigint not null default 0,
c bigint not null default 0,
d bigint not null default 0,
e bigint not null default 0,
f bigint not null default 0,
g bigint not null default 0,
h bigint not null default 0,
i bigint not null default 0,
j bigint not null default 0,
primary key (a,b,c,d,e,f,g,h,i,j));
insert into t1 (a) values (2),(4),(6),(8),(10),(12),(14),(16),(18),(20),(22),(24),(26),(23);
delete from t1 where a=26;
drop table t1;
create table t1 (
a bigint not null,
b bigint not null default 0,
c bigint not null default 0,
d bigint not null default 0,
e bigint not null default 0,
f bigint not null default 0,
g bigint not null default 0,
h bigint not null default 0,
i bigint not null default 0,
j bigint not null default 0,
primary key (a,b,c,d,e,f,g,h,i,j));
insert into t1 (a) values (2),(4),(6),(8),(10),(12),(14),(16),(18),(20),(22),(24),(26),(23),(27);
delete from t1 where a=27;
drop table t1;
CREATE TABLE `t1` (
`i` int(10) NOT NULL default '0',
`i2` int(10) NOT NULL default '0',
PRIMARY KEY (`i`)
);
DELETE FROM t1 USING t1 WHERE post='1';
ERROR 42S22: Unknown column 'post' in 'where clause'
drop table t1;
CREATE TABLE t1 (
bool char(0) default NULL,
not_null varchar(20) binary NOT NULL default '',
misc integer not null,
PRIMARY KEY (not_null)
);
INSERT INTO t1 VALUES (NULL,'a',4), (NULL,'b',5), (NULL,'c',6), (NULL,'d',7);
select * from t1 where misc > 5 and bool is null;
bool not_null misc
NULL c 6
NULL d 7
delete from t1 where misc > 5 and bool is null;
select * from t1 where misc > 5 and bool is null;
bool not_null misc
select count(*) from t1;
count(*)
2
delete from t1 where 1 > 2;
select count(*) from t1;
count(*)
2
delete from t1 where 3 > 2;
select count(*) from t1;
count(*)
0
drop table t1;
create table t11 (a int NOT NULL, b int, primary key (a));
create table t12 (a int NOT NULL, b int, primary key (a));
create table t2 (a int NOT NULL, b int, primary key (a));
insert into t11 values (0, 10),(1, 11),(2, 12);
insert into t12 values (33, 10),(0, 11),(2, 12);
insert into t2 values (1, 21),(2, 12),(3, 23);
select * from t11;
a b
0 10
1 11
2 12
select * from t12;
a b
0 11
2 12
33 10
select * from t2;
a b
1 21
2 12
3 23
delete t11.*, t12.* from t11,t12 where t11.a = t12.a and t11.b <> (select b from t2 where t11.a < t2.a);
ERROR 21000: Subquery returns more than 1 row
select * from t11;
a b
0 10
1 11
2 12
select * from t12;
a b
0 11
2 12
33 10
delete ignore t11.*, t12.* from t11,t12 where t11.a = t12.a and t11.b <> (select b from t2 where t11.a < t2.a);
Warnings:
Warning 1242 Subquery returns more than 1 row
select * from t11;
a b
0 10
1 11
select * from t12;
a b
0 11
33 10
insert into t11 values (2, 12);
delete from t11 where t11.b <> (select b from t2 where t11.a < t2.a);
ERROR 21000: Subquery returns more than 1 row
select * from t11;
a b
0 10
1 11
2 12
delete ignore from t11 where t11.b <> (select b from t2 where t11.a < t2.a);
Warnings:
Warning 1242 Subquery returns more than 1 row
Warning 1242 Subquery returns more than 1 row
select * from t11;
a b
0 10
1 11
drop table t11, t12, t2;
# sql_safe_updates mode with multi-table DELETE
CREATE TABLE t1(a INTEGER PRIMARY KEY);
INSERT INTO t1 VALUES(10),(20);
CREATE TABLE t2(b INTEGER);
INSERT INTO t2 VALUES(10),(20);
SET SESSION sql_safe_updates=1;
EXPLAIN DELETE t2 FROM t1 JOIN t2 WHERE t1.a = 10;
id select_type table partitions type possible_keys key key_len ref rows filtered Extra
1 SIMPLE t1 NULL const PRIMARY PRIMARY 4 const 1 100.00 Using index
1 DELETE t2 NULL ALL NULL NULL NULL NULL 2 100.00 NULL
DELETE t2 FROM t1 JOIN t2 WHERE t1.a = 10;
ERROR HY000: You are using safe update mode and you tried to update a table without a WHERE that uses a KEY column.
SET SESSION sql_safe_updates=default;
DROP TABLE t1, t2;
create table t1 (a int, b int, unique key (a), key (b));
insert into t1 values (3, 3), (7, 7);
delete t1 from t1 where a = 3;
check table t1;
Table Op Msg_type Msg_text
test.t1 check status OK
select * from t1;
a b
7 7
drop table t1;
CREATE TABLE t1 ( a int PRIMARY KEY );
DELETE FROM t1 WHERE t1.a > 0 ORDER BY t1.a;
INSERT INTO t1 VALUES (0),(1),(2);
DELETE FROM t1 WHERE t1.a > 0 ORDER BY t1.a LIMIT 1;
SELECT * FROM t1;
a
0
2
DROP TABLE t1;
CREATE TABLE t1 (a int not null,b int not null);
CREATE TABLE t2 (a int not null, b int not null, primary key (a,b));
CREATE TABLE t3 (a int not null, b int not null, primary key (a,b));
insert into t1 values (1,1),(2,1),(1,3);
insert into t2 values (1,1),(2,2),(3,3);
insert into t3 values (1,1),(2,1),(1,3);
select * from t1,t2,t3 where t1.a=t2.a AND t2.b=t3.a and t1.b=t3.b;
a b a b a b
1 1 1 1 1 1
2 1 2 2 2 1
1 3 1 1 1 3
explain select * from t1,t2,t3 where t1.a=t2.a AND t2.b=t3.a and t1.b=t3.b;
id select_type table partitions type possible_keys key key_len ref rows filtered Extra
1 SIMPLE t1 NULL ALL NULL NULL NULL NULL 3 100.00 NULL
1 SIMPLE t2 NULL ref PRIMARY PRIMARY 4 test.t1.a 1 100.00 Using index
1 SIMPLE t3 NULL eq_ref PRIMARY PRIMARY 8 test.t2.b,test.t1.b 1 100.00 Using index
Warnings:
Note 1003 /* select#1 */ select `test`.`t1`.`a` AS `a`,`test`.`t1`.`b` AS `b`,`test`.`t2`.`a` AS `a`,`test`.`t2`.`b` AS `b`,`test`.`t3`.`a` AS `a`,`test`.`t3`.`b` AS `b` from `test`.`t1` join `test`.`t2` join `test`.`t3` where ((`test`.`t3`.`b` = `test`.`t1`.`b`) and (`test`.`t3`.`a` = `test`.`t2`.`b`) and (`test`.`t2`.`a` = `test`.`t1`.`a`))
delete t2.*,t3.* from t1,t2,t3 where t1.a=t2.a AND t2.b=t3.a and t1.b=t3.b;
select * from t3;
a b
drop table t1,t2,t3;
create table t1(a date not null);
insert ignore into t1 values (0);
Warnings:
Warning 1264 Out of range value for column 'a' at row 1
select * from t1 where a is null;
a
0000-00-00
delete from t1 where a is null;
select count(*) from t1;
count(*)
0
drop table t1;
#
# Adds some test cases for unicode. such as insert unicode chars, in values, table name, or etc.
#
CREATE TABLE `abc def` (i int)engine=tianmu;
INSERT INTO `abc def` VALUES (1);
delete from `abc def` where i=1;
drop table `abc def`;
CREATE TABLE t1 (`abc def1` int, `abc def2` int);
INSERT INTO t1 VALUES (1,1);
DELETE from t1 where `abc def1` = 1;
LOCK TABLE t1 write;
INSERT INTO t1 VALUES (1,2);
DELETE from t1 where `abc def1` = 1;
UNLOCK TABLES;
INSERT INTO t1 VALUES (1,2);
SET AUTOCOMMIT=0;
DELETE from t1 where `abc def2` = 2;
SET AUTOCOMMIT=1;
drop table t1;
#
# deleting rows from a temporary tables
#
CREATE TABLE t1 (c int not null, d char (10) not null);
insert into t1 values(1,""),(2,"a"),(3,"b");
CREATE TEMPORARY TABLE t1 (a int not null, b char (10) not null);
insert into t1 values(4,"e"),(5,"f"),(6,"g");
alter table t1 rename t2;
select * from t1;
c d
1
2 a
3 b
select * from t2;
a b
4 e
5 f
6 g
delete from t1;
drop table t1 , t2;
#
# Multi engine
#
drop table if exists tbIn,t1;
Warnings:
Note 1051 Unknown table 'test.tbIn'
Note 1051 Unknown table 'test.t1'
create table tbIn(c1 int,c2 varchar(255))engine=InnoDB;
insert into tbIn values(3,'hhhb');
insert into tbIn values(2,'hhhb');
insert into tbIn values(1,'hhhb');
create table t1(c1 int,c2 varchar(255))engine=tianmu;
insert into t1 values(3,'hhhb');
insert into t1 values(2,'hhhb');
insert into t1 values(1,'hhhb');
delete tbIn,t1 from tbIn,t1 where tbIn.c1=t1.c1 and tbIn.c1=1;
drop table tbIn,t1;
#--source include/have_tianmu.inc
--echo #
--echo # DELETE FROM TABLE_NAME WHERE ...
--echo #
use test;
CREATE TABLE `column_type_test` (
`c_tinyint` tinyint(4) DEFAULT NULL COMMENT 'tinyint',
`c_smallint` smallint(6) DEFAULT NULL COMMENT 'smallint',
`c_mediumint` mediumint(9) DEFAULT NULL COMMENT 'mediumint',
`c_int` int(11) DEFAULT NULL COMMENT 'int',
`c_bigint` bigint(20) DEFAULT NULL COMMENT 'bigint',
`c_float` float DEFAULT NULL COMMENT 'float',
`c_double` double DEFAULT NULL COMMENT 'double',
`c_decimal` decimal(10,5) DEFAULT NULL COMMENT 'decimal',
`c_date` date DEFAULT NULL COMMENT 'date',
`c_datetime` datetime DEFAULT NULL COMMENT 'datetime',
`c_timestamp` timestamp NULL DEFAULT NULL COMMENT 'timestamp',
`c_time` time DEFAULT NULL COMMENT 'time',
`c_char` char(10) DEFAULT NULL COMMENT 'char',
`c_varchar` varchar(10) DEFAULT NULL COMMENT 'varchar',
`c_blob` blob COMMENT 'blob',
`c_text` text COMMENT 'text',
`c_longblob` longblob COMMENT 'longblob'
) ENGINE=TIANMU;
--echo #
--echo # DELETE FROM TABLE_NAME WHERE ...for Large amount of data
--echo #
load data infile '../../std_data/tianmu/bigdata4load' into table column_type_test fields terminated by '|';
select count(*) from column_type_test;
select max(c_int) from column_type_test;
select min(c_int) from column_type_test;
select avg(c_int) from column_type_test;
select sum(c_int) from column_type_test;
#delete from column_type_test where c_tinyint=100; # Disabled: this delete is too slow on the large data set; uncomment it for local testing only.
show create table column_type_test;
select count(*) from column_type_test;
select max(c_int) from column_type_test;
select min(c_int) from column_type_test;
select avg(c_int) from column_type_test;
select sum(c_int) from column_type_test;
drop table column_type_test;
--disable_warnings
drop table if exists t1,t2,t3,t11,t12;
--enable_warnings
CREATE TABLE t1 (a tinyint(3), b tinyint(5));
INSERT INTO t1 VALUES (1,1);
INSERT LOW_PRIORITY INTO t1 VALUES (1,2);
INSERT INTO t1 VALUES (1,3);
DELETE from t1 where a=1 limit 1;
DELETE LOW_PRIORITY from t1 where a=1;
INSERT INTO t1 VALUES (1,1);
DELETE from t1;
LOCK TABLE t1 write;
INSERT INTO t1 VALUES (1,2);
DELETE from t1;
UNLOCK TABLES;
INSERT INTO t1 VALUES (1,2);
SET AUTOCOMMIT=0;
DELETE from t1;
SET AUTOCOMMIT=1;
drop table t1;
#
# Test of delete when the delete will cause a node to disappear and reappear
# (This assumes a block size of 1024)
#
create table t1 (
a bigint not null,
b bigint not null default 0,
c bigint not null default 0,
d bigint not null default 0,
e bigint not null default 0,
f bigint not null default 0,
g bigint not null default 0,
h bigint not null default 0,
i bigint not null default 0,
j bigint not null default 0,
primary key (a,b,c,d,e,f,g,h,i,j));
insert into t1 (a) values (2),(4),(6),(8),(10),(12),(14),(16),(18),(20),(22),(24),(26),(23);
delete from t1 where a=26;
drop table t1;
create table t1 (
a bigint not null,
b bigint not null default 0,
c bigint not null default 0,
d bigint not null default 0,
e bigint not null default 0,
f bigint not null default 0,
g bigint not null default 0,
h bigint not null default 0,
i bigint not null default 0,
j bigint not null default 0,
primary key (a,b,c,d,e,f,g,h,i,j));
insert into t1 (a) values (2),(4),(6),(8),(10),(12),(14),(16),(18),(20),(22),(24),(26),(23),(27);
delete from t1 where a=27;
drop table t1;
CREATE TABLE `t1` (
`i` int(10) NOT NULL default '0',
`i2` int(10) NOT NULL default '0',
PRIMARY KEY (`i`)
);
-- error 1054
DELETE FROM t1 USING t1 WHERE post='1';
drop table t1;
#
# CHAR(0) bug - not actually DELETE bug, but anyway...
#
CREATE TABLE t1 (
bool char(0) default NULL,
not_null varchar(20) binary NOT NULL default '',
misc integer not null,
PRIMARY KEY (not_null)
);
INSERT INTO t1 VALUES (NULL,'a',4), (NULL,'b',5), (NULL,'c',6), (NULL,'d',7);
select * from t1 where misc > 5 and bool is null;
delete from t1 where misc > 5 and bool is null;
select * from t1 where misc > 5 and bool is null;
select count(*) from t1;
delete from t1 where 1 > 2;
select count(*) from t1;
delete from t1 where 3 > 2;
select count(*) from t1;
drop table t1;
#
# IGNORE option
#
create table t11 (a int NOT NULL, b int, primary key (a));
create table t12 (a int NOT NULL, b int, primary key (a));
create table t2 (a int NOT NULL, b int, primary key (a));
insert into t11 values (0, 10),(1, 11),(2, 12);
insert into t12 values (33, 10),(0, 11),(2, 12);
insert into t2 values (1, 21),(2, 12),(3, 23);
select * from t11;
select * from t12;
select * from t2;
-- error 1242
delete t11.*, t12.* from t11,t12 where t11.a = t12.a and t11.b <> (select b from t2 where t11.a < t2.a);
select * from t11;
select * from t12;
delete ignore t11.*, t12.* from t11,t12 where t11.a = t12.a and t11.b <> (select b from t2 where t11.a < t2.a);
select * from t11;
select * from t12;
insert into t11 values (2, 12);
-- error 1242
delete from t11 where t11.b <> (select b from t2 where t11.a < t2.a);
select * from t11;
delete ignore from t11 where t11.b <> (select b from t2 where t11.a < t2.a);
select * from t11;
drop table t11, t12, t2;
--echo # sql_safe_updates mode with multi-table DELETE
CREATE TABLE t1(a INTEGER PRIMARY KEY);
INSERT INTO t1 VALUES(10),(20);
CREATE TABLE t2(b INTEGER);
INSERT INTO t2 VALUES(10),(20);
SET SESSION sql_safe_updates=1;
EXPLAIN DELETE t2 FROM t1 JOIN t2 WHERE t1.a = 10;
-- error ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE
DELETE t2 FROM t1 JOIN t2 WHERE t1.a = 10;
SET SESSION sql_safe_updates=default;
DROP TABLE t1, t2;
#
# deletion and KEYREAD
#
create table t1 (a int, b int, unique key (a), key (b));
insert into t1 values (3, 3), (7, 7);
delete t1 from t1 where a = 3;
check table t1;
select * from t1;
drop table t1;
#
# delete with ORDER BY containing a direct reference to the table
#
CREATE TABLE t1 ( a int PRIMARY KEY );
DELETE FROM t1 WHERE t1.a > 0 ORDER BY t1.a;
INSERT INTO t1 VALUES (0),(1),(2);
DELETE FROM t1 WHERE t1.a > 0 ORDER BY t1.a LIMIT 1;
SELECT * FROM t1;
DROP TABLE t1;
#
# Test of multi-delete where we are not scanning the first table
#
CREATE TABLE t1 (a int not null,b int not null);
CREATE TABLE t2 (a int not null, b int not null, primary key (a,b));
CREATE TABLE t3 (a int not null, b int not null, primary key (a,b));
insert into t1 values (1,1),(2,1),(1,3);
insert into t2 values (1,1),(2,2),(3,3);
insert into t3 values (1,1),(2,1),(1,3);
select * from t1,t2,t3 where t1.a=t2.a AND t2.b=t3.a and t1.b=t3.b;
explain select * from t1,t2,t3 where t1.a=t2.a AND t2.b=t3.a and t1.b=t3.b;
delete t2.*,t3.* from t1,t2,t3 where t1.a=t2.a AND t2.b=t3.a and t1.b=t3.b;
# This should be empty
select * from t3;
drop table t1,t2,t3;
#
# deleting '0000-00-00' values using IS NULL
#
create table t1(a date not null);
insert ignore into t1 values (0);
select * from t1 where a is null;
delete from t1 where a is null;
select count(*) from t1;
drop table t1;
--echo #
--echo # Adds some test cases for unicode. such as insert unicode chars, in values, table name, or etc.
--echo #
CREATE TABLE `abc def` (i int)engine=tianmu;
INSERT INTO `abc def` VALUES (1);
delete from `abc def` where i=1;
drop table `abc def`;
CREATE TABLE t1 (`abc def1` int, `abc def2` int);
INSERT INTO t1 VALUES (1,1);
DELETE from t1 where `abc def1` = 1;
LOCK TABLE t1 write;
INSERT INTO t1 VALUES (1,2);
DELETE from t1 where `abc def1` = 1;
UNLOCK TABLES;
INSERT INTO t1 VALUES (1,2);
SET AUTOCOMMIT=0;
DELETE from t1 where `abc def2` = 2;
SET AUTOCOMMIT=1;
drop table t1;
--echo #
--echo # deleting rows from a temporary tables
--echo #
CREATE TABLE t1 (c int not null, d char (10) not null);
insert into t1 values(1,""),(2,"a"),(3,"b");
CREATE TEMPORARY TABLE t1 (a int not null, b char (10) not null);
insert into t1 values(4,"e"),(5,"f"),(6,"g");
alter table t1 rename t2;
select * from t1;
select * from t2;
delete from t1;
drop table t1 , t2;
--echo #
--echo # Multi engine
--echo #
drop table if exists tbIn,t1;
create table tbIn(c1 int,c2 varchar(255))engine=InnoDB;
insert into tbIn values(3,'hhhb');
insert into tbIn values(2,'hhhb');
insert into tbIn values(1,'hhhb');
create table t1(c1 int,c2 varchar(255))engine=tianmu;
insert into t1 values(3,'hhhb');
insert into t1 values(2,'hhhb');
insert into t1 values(1,'hhhb');
delete tbIn,t1 from tbIn,t1 where tbIn.c1=t1.c1 and tbIn.c1=1;
drop table tbIn,t1;
\ No newline at end of file
delete : not support
update : not support
......@@ -42,7 +42,11 @@ constexpr const char *COL_FILTER_HIST_DIR = "hist";
constexpr const char *COL_KN_FILE = "KN";
constexpr const char *COL_META_FILE = "META";
constexpr const char *COL_DN_FILE = "DN";
constexpr size_t COL_DN_FILE_SIZE = 10 * 1024 * 1024; // given the pack size is 64K, we support up to 8G rows
/*
Size, in bytes, of the file that holds the DPN metadata.
A single DPN currently occupies 88 bytes, so the file holds 131072 DPNs; with
64K rows per pack, that caps a column at about 8.59 billion rows.
*/
constexpr size_t COL_DN_FILE_SIZE = 11 * 1024 * 1024;
constexpr const char *COL_DATA_FILE = "DATA";
constexpr const char *COL_VERSION_DIR = "v";
constexpr uint32_t COL_FILE_VERSION = 3;
......
......@@ -27,8 +27,30 @@
namespace Tianmu {
namespace core {
/*
The expected size, in bytes, of the DPN structure. It is pinned here so that
the layout of DPN is not changed by accident.
If the structure has to change, revisit PAGE_CNT and COL_DN_FILE_SIZE as well,
to avoid wasting space and to keep I/O efficient.
*/
static constexpr size_t DPN_SIZE = 88;
// make sure the struct is not modified by mistake
static_assert(sizeof(DPN) == 80, "Bad struct size of DPN");
static_assert(sizeof(DPN) == DPN_SIZE, "Bad struct size of DPN");
// Page size, in bytes, assumed for the operating system. It is hard-coded
// rather than queried at run time.
static constexpr size_t PAGE_SIZE = 4096;
// Number of pages by which the DPN array grows in one step.
static constexpr size_t PAGE_CNT = 11;
// Size, in bytes, of one growth step of the DPN array (11 * 4096 = 45056).
static constexpr size_t ALLOC_UNIT = PAGE_CNT * PAGE_SIZE;
// A growth step must hold a whole number of DPN objects, so that no partial
// DPN is left at the end of the step.
static_assert(ALLOC_UNIT % sizeof(DPN) == 0);
// Number of DPN slots added per growth step (512 while a DPN is 88 bytes).
static constexpr size_t DPN_INC_CNT = ALLOC_UNIT / sizeof(DPN);
ColumnShare::~ColumnShare() {
if (start != nullptr) {
......@@ -59,7 +81,7 @@ void ColumnShare::map_dpn() {
}
ASSERT(sb.st_size % sizeof(DPN) == 0);
cap = sb.st_size / sizeof(DPN);
capacity = sb.st_size / sizeof(DPN);
auto addr = ::mmap(0, common::COL_DN_FILE_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, dn_fd, 0);
if (addr == MAP_FAILED) {
......@@ -113,20 +135,20 @@ void ColumnShare::scan_dpn(common::TX_ID xid) {
fv.OpenReadOnly(m_path / common::COL_VERSION_DIR / xid.ToString());
fv.ReadExact(&hdr, sizeof(hdr));
ASSERT(hdr.np <= cap, "bad dpn index");
ASSERT(hdr.numOfPacks <= capacity, "bad dpn index");
if (hdr.np == 0) {
for (uint32_t i = 0; i < cap; i++) {
if (hdr.numOfPacks == 0) {
for (uint32_t i = 0; i < capacity; i++) {
start[i].reset();
}
return;
}
auto arr = std::make_unique<common::PACK_INDEX[]>(hdr.np);
fv.ReadExact(arr.get(), hdr.np * sizeof(common::PACK_INDEX));
auto end = arr.get() + hdr.np;
auto arr = std::make_unique<common::PACK_INDEX[]>(hdr.numOfPacks);
fv.ReadExact(arr.get(), hdr.numOfPacks * sizeof(common::PACK_INDEX));
auto end = arr.get() + hdr.numOfPacks;
for (uint32_t i = 0; i < cap; i++) {
for (uint32_t i = 0; i < capacity; i++) {
auto found = std::find(arr.get(), end, i);
if (found == end) {
start[i].reset();
......@@ -140,8 +162,8 @@ void ColumnShare::scan_dpn(common::TX_ID xid) {
TIANMU_LOG(LogCtl_Level::WARN, "uncommited pack found: %s %d", m_path.c_str(), i);
start[i].local = 0;
}
if (start[i].addr != DPN_INVALID_ADDR) {
segs.push_back({start[i].addr, start[i].len, i});
if (start[i].dataAddress != DPN_INVALID_ADDR) {
segs.push_back({start[i].dataAddress, start[i].dataLength, i});
} else {
}
}
......@@ -168,7 +190,7 @@ void ColumnShare::init_dpn(DPN &dpn, const common::TX_ID xid, const DPN *from) {
dpn = *from;
} else {
dpn.reset();
dpn.addr = DPN_INVALID_ADDR;
dpn.dataAddress = DPN_INVALID_ADDR;
if (pt == common::PackType::INT) {
dpn.min_i = common::PLUS_INF_64;
dpn.max_i = common::MINUS_INF_64;
......@@ -189,18 +211,9 @@ void ColumnShare::init_dpn(DPN &dpn, const common::TX_ID xid, const DPN *from) {
dpn.SetPackPtr(0);
}
// should get page size at run time with sysconf(_SC_PAGE_SIZE) but for
// efficiency...
static constexpr size_t PAGE_SIZE = 4096;
static constexpr size_t PAGE_CNT = 5;
static constexpr size_t ALLOC_UNIT = PAGE_CNT * PAGE_SIZE;
static_assert(ALLOC_UNIT % sizeof(DPN) == 0);
static constexpr size_t DPN_INC_CNT = ALLOC_UNIT / sizeof(DPN);
int ColumnShare::alloc_dpn(common::TX_ID xid, const DPN *from) {
for (uint32_t i = 0; i < cap; i++) {
for (uint32_t i = 0; i < capacity; i++) {
if (start[i].used == 1) {
if (!(start[i].xmax < ha_rcengine_->MinXID())) continue;
ha_rcengine_->cache.DropObject(PackCoordinate(owner->TabID(), col_id, i));
......@@ -210,17 +223,17 @@ int ColumnShare::alloc_dpn(common::TX_ID xid, const DPN *from) {
return i;
}
ASSERT((cap + DPN_INC_CNT) <= (common::COL_DN_FILE_SIZE / sizeof(DPN)),
ASSERT((capacity + DPN_INC_CNT) <= (common::COL_DN_FILE_SIZE / sizeof(DPN)),
"Failed to allocate new DN: " + m_path.string());
cap += DPN_INC_CNT;
capacity += DPN_INC_CNT;
// NOTICE:
// It is not portable to enlarge the file size after mmapping, but it seems to
// work well on Linux. Otherwise we'll need to remap the file, which in turn
// requires locking because other threads may read simultaneously.
ftruncate(dn_fd, cap * sizeof(DPN));
init_dpn(start[cap - 1], xid, from);
return cap - 1;
ftruncate(dn_fd, capacity * sizeof(DPN));
init_dpn(start[capacity - 1], xid, from);
return capacity - 1;
}
void ColumnShare::alloc_seg(DPN *dpn) {
......@@ -228,15 +241,15 @@ void ColumnShare::alloc_seg(DPN *dpn) {
uint64_t prev = 0;
for (auto it = segs.cbegin(); it != segs.cend(); ++it) {
if (it->offset - prev > dpn->len) {
segs.insert(it, {prev, dpn->len, i});
dpn->addr = prev;
if (it->offset - prev > dpn->dataLength) {
segs.insert(it, {prev, dpn->dataLength, i});
dpn->dataAddress = prev;
return;
}
prev = it->offset + it->len;
}
segs.push_back({prev, dpn->len, i});
dpn->addr = prev;
segs.push_back({prev, dpn->dataLength, i});
dpn->dataAddress = prev;
}
void ColumnShare::sync_dpns() {
......
......@@ -43,13 +43,15 @@ struct COL_META {
};
struct alignas(128) COL_VER_HDR_V3 {
uint64_t nr; // no. of records
uint64_t nn; // no. of nulls
uint64_t np; // no. of packs
uint64_t numOfRecords; // number of records
uint64_t numOfNulls; // number of nulls
uint64_t numOfPacks; // number of packs
uint64_t numOfDeleted; //number of deleted
uint64_t auto_inc_next;
int64_t min;
int64_t max;
uint32_t dict_ver; // dict file version name. 0 means n/a
uint32_t dict_ver; // dict file version name. 0 means n/a
uint32_t unique : 1;
uint32_t unique_updated : 1;
uint64_t natural_size;
......@@ -92,7 +94,7 @@ class ColumnShare final {
uint8_t pss;
common::PACK_INDEX GetPackIndex(DPN *dpn) const {
auto i = std::distance(start, dpn);
ASSERT(i >= 0 && size_t(i) < cap, "bad index " + std::to_string(i));
ASSERT(i >= 0 && size_t(i) < capacity, "bad index " + std::to_string(i));
return i;
}
......@@ -107,7 +109,7 @@ class ColumnShare final {
ColumnType ct;
int dn_fd{-1};
DPN *start;
size_t cap{0}; // current capacity of the dn array
size_t capacity{0}; // current capacity of the dn array
common::PackType pt;
uint32_t col_id;
......
......@@ -39,14 +39,19 @@ struct DPN final {
uint8_t synced : 1; // if the pack data in memory is up to date with the
// version on disk
uint8_t null_compressed : 1;
uint8_t delete_compressed : 1;
uint8_t data_compressed : 1;
uint8_t no_compress : 1;
uint8_t padding[3];
uint32_t base; // index of the DPN from which we copied, used by local pack
uint64_t addr; // data start address
uint64_t len; // data length
uint32_t nr; // number of records
uint32_t nn; // number of nulls
uint8_t padding[1]; // Memory aligned padding has no practical effect
uint32_t base; // index of the DPN from which we copied, used by local pack
uint32_t numOfRecords; // number of records
uint32_t numOfNulls; // number of nulls
uint32_t numOfDeleted; // number of deletes
uint64_t dataAddress; // data start address
uint64_t dataLength; // data length
common::TX_ID xmin; // creation trx id
common::TX_ID xmax; // delete trx id
union {
......@@ -75,10 +80,15 @@ struct DPN final {
bool CAS(uint64_t &expected, uint64_t desired) { return tagged_ptr.compare_exchange_weak(expected, desired); }
uint64_t GetPackPtr() const { return tagged_ptr.load(); }
void SetPackPtr(uint64_t v) { tagged_ptr.store(v); }
bool Trivial() const { return Uniform() || NullOnly(); }
/*
The delete bitmap is stored inside the pack itself, so a pack that contains
deleted records must be persisted and can never be treated as trivial.
*/
bool Trivial() const { return (Uniform() || NullOnly()) && numOfDeleted == 0; }
bool NotTrivial() const { return !Trivial(); }
bool Uniform() const { return nn == 0 && min_i == max_i; } // for packN, all records are the same and not null
bool NullOnly() const { return nr == nn; }
bool Uniform() const { return numOfNulls == 0 && min_i == max_i; } // for packN, all records are the same and not null
bool NullOnly() const { return numOfRecords == numOfNulls; }
bool IsLocal() const { return local == 1; }
void SetLocal(bool v) { local = v; }
......
......@@ -1548,7 +1548,7 @@ bool Engine::IsTIANMURoute(THD *thd, TABLE_LIST *table_list, SELECT_LEX *selects
// In this list we have all views, derived tables and their
// sources, so anyway we walk through all the source tables
// even though we seem to reject the control of views
if (!IsTIANMUTable(tl->table))
if (!IsTianmuTable(tl->table))
return false;
else
has_TIANMUTable = true;
......@@ -1590,7 +1590,7 @@ bool Engine::IsTIANMURoute(THD *thd, TABLE_LIST *table_list, SELECT_LEX *selects
return true;
}
bool Engine::IsTIANMUTable(TABLE *table) {
bool Engine::IsTianmuTable(TABLE *table) {
return table && table->s->db_type() == rcbase_hton; // table->db_type is always NULL
}
......
......@@ -146,7 +146,7 @@ class Engine final {
static AttributeTypeInfo GetCorrespondingATI(Field &field);
static AttributeTypeInfo GetAttrTypeInfo(const Field &field);
static common::CT GetCorrespondingType(const enum_field_types &eft);
static bool IsTIANMUTable(TABLE *table);
static bool IsTianmuTable(TABLE *table);
static bool ConvertToField(Field *field, types::RCDataType &rcitem, std::vector<uchar> *blob_buf);
static int Convert(int &is_null, my_decimal *value, types::RCDataType &rcitem, int output_scale = -1);
static int Convert(int &is_null, int64_t &value, types::RCDataType &rcitem, enum_field_types f_type);
......
......@@ -396,7 +396,7 @@ int Engine::Execute(THD *thd, LEX *lex, Query_result *result_output, SELECT_LEX_
try {
std::shared_ptr<RCTable> rct;
if (lex->sql_command == SQLCOM_INSERT_SELECT &&
Engine::IsTIANMUTable(((Query_tables_list *)lex)->query_tables->table)) {
Engine::IsTianmuTable(((Query_tables_list *)lex)->query_tables->table)) {
std::string table_path = Engine::GetTablePath(((Query_tables_list *)lex)->query_tables->table);
rct = current_txn_->GetTableByPathIfExists(table_path);
}
......
......@@ -21,11 +21,19 @@
#include "core/data_cache.h"
#include "core/tools.h"
#include "compress/bit_stream_compressor.h"
#include "compress/num_compressor.h"
#include "core/bin_tools.h"
#include "core/column_share.h"
#include "core/value.h"
#include "loader/value_cache.h"
#include "system/tianmu_file.h"
namespace Tianmu {
namespace core {
Pack::Pack(DPN *dpn, PackCoordinate pc, ColumnShare *s) : s(s), dpn(dpn) {
NULLS_SIZE = (1 << s->pss) / 8;
nulls = std::make_unique<uint32_t[]>(NULLS_SIZE / sizeof(uint32_t));
bitmapSize = (1 << s->pss) / 8;
nulls = std::make_unique<uint32_t[]>(bitmapSize / sizeof(uint32_t));
deletes = std::make_unique<uint32_t[]>(bitmapSize / sizeof(uint32_t));
// nulls MUST be initialized in the constructor, there are 3 cases in total:
// 1. All values are NULL. It is initialized here by InitNull();
// 2. All values are uniform. Then it would be all zeros already.
......@@ -38,9 +46,11 @@ Pack::Pack(DPN *dpn, PackCoordinate pc, ColumnShare *s) : s(s), dpn(dpn) {
Pack::Pack(const Pack &ap, const PackCoordinate &pc) : mm::TraceableObject(ap), s(ap.s), dpn(ap.dpn) {
m_coord.ID = COORD_TYPE::PACK;
m_coord.co.pack = pc;
NULLS_SIZE = ap.NULLS_SIZE;
nulls = std::make_unique<uint32_t[]>(NULLS_SIZE / sizeof(uint32_t));
std::memcpy(nulls.get(), ap.nulls.get(), NULLS_SIZE);
bitmapSize = ap.bitmapSize;
nulls = std::make_unique<uint32_t[]>(bitmapSize / sizeof(uint32_t));
deletes = std::make_unique<uint32_t[]>(bitmapSize / sizeof(uint32_t));
std::memcpy(nulls.get(), ap.nulls.get(), bitmapSize);
std::memcpy(deletes.get(), ap.deletes.get(), bitmapSize);
}
int64_t Pack::GetValInt([[maybe_unused]] int n) const {
......@@ -63,7 +73,41 @@ void Pack::Release() {
}
bool Pack::ShouldNotCompress() const {
return (dpn->nr < (1U << s->pss)) || (s->ColType().GetFmt() == common::PackFmt::NOCOMPRESS);
return (dpn->numOfRecords < (1U << s->pss)) || (s->ColType().GetFmt() == common::PackFmt::NOCOMPRESS);
}
// Compresses one of the pack's bitmaps (nulls or deletes) with
// compress::BitstreamCompressor.
//
// comp_buf      - out: a freshly allocated temporary buffer that receives the
//                 compressor output. When the compressor reports CPRS_ERR_BUF
//                 it is re-pointed at the words held by ptr_buf instead
//                 (constructed with `false` as third argument; presumably
//                 non-owning -- TODO confirm against mm::MMGuard).
// comp_buf_size - out: set to the byte size of the raw bitmap before the call
//                 to Compress, handed to Compress, and reset to the raw size
//                 on the CPRS_ERR_BUF path.
// ptr_buf       - the bitmap to compress, one bit per record of the pack.
// dpn_num1      - forwarded unchanged to Compress (presumably the number of
//                 set bits in the bitmap -- TODO confirm).
//
// Returns true when the compressor reports CPRS_SUCCESS and false when it
// reports CPRS_ERR_BUF. Any other result raises common::DatabaseException.
bool Pack::CompressedBitMap(mm::MMGuard<uchar> &comp_buf, uint &comp_buf_size,
                            std::unique_ptr<uint32_t[]> &ptr_buf, uint32_t &dpn_num1) {
  constexpr int kBitsPerByte = 8;
  // Added before dividing so that a partially used last byte is still counted.
  constexpr int kRoundUp = kBitsPerByte - 1;

  // dpn->numOfRecords is at most 65536, so the bitmap buffer stays small.
  comp_buf_size = (dpn->numOfRecords + kRoundUp) / kBitsPerByte;

  // Two bytes more than the bitmap are allocated; the very last one carries a
  // marker value that is checked after compression to detect a buffer overrun.
  comp_buf =
      mm::MMGuard<uchar>((uchar *)alloc((comp_buf_size + 2) * sizeof(char), mm::BLOCK_TYPE::BLOCK_TEMPORARY), *this);
  const uint sentinelPos = comp_buf_size + 1;
  comp_buf[sentinelPos] = 0xBA;

  compress::BitstreamCompressor compressor;
  const CprsErr status = compressor.Compress((char *)comp_buf.get(), comp_buf_size, (char *)ptr_buf.get(),
                                             dpn->numOfRecords, dpn_num1);

  if (comp_buf[sentinelPos] != 0xBA) {
    TIANMU_LOG(LogCtl_Level::ERROR, "buffer overrun by BitstreamCompressor (N f).");
    ASSERT(0, "ERROR: buffer overrun by BitstreamCompressor (N f).");
  }

  if (status == CprsErr::CPRS_SUCCESS) {
    return true;
  }

  if (status != CprsErr::CPRS_ERR_BUF) {
    throw common::DatabaseException("Compression of nulls or deletes failed for column " +
                                    std::to_string(pc_column(GetCoordinate().co.pack) + 1) + ", pack " +
                                    std::to_string(pc_dp(GetCoordinate().co.pack) + 1) + " (error " +
                                    std::to_string(static_cast<int>(status)) + ").");
  }

  // CPRS_ERR_BUF: hand back the uncompressed bitmap and its raw size.
  comp_buf = mm::MMGuard<uchar>((uchar *)ptr_buf.get(), *this, false);
  comp_buf_size = (dpn->numOfRecords + kRoundUp) / kBitsPerByte;
  return false;
}
} // namespace core
} // namespace Tianmu
......@@ -20,6 +20,7 @@
#include "core/dpn.h"
#include "core/tools.h"
#include "mm/mm_guard.h"
#include "mm/traceable_object.h"
#include "types/rc_data_types.h"
......@@ -50,35 +51,59 @@ class Pack : public mm::TraceableObject {
virtual void LoadDataFromFile(system::Stream *fcurfile) = 0;
virtual void Save() = 0;
virtual void UpdateValue(size_t i, const Value &v) = 0;
virtual void DeleteByRow(size_t i) = 0;
virtual int64_t GetValInt(int n) const;
virtual double GetValDouble(int n) const;
virtual types::BString GetValueBinary(int i) const;
void SetNull(int i) {
int mask = 1 << (i % 32);
ASSERT((nulls[i >> 5] & mask) == 0);
nulls[i >> 5] |= mask;
void SetNull(int locationInPack) {
int mask = 1 << (locationInPack % 32);
ASSERT((nulls[locationInPack >> 5] & mask) == 0);
nulls[locationInPack >> 5] |= mask;
}
void UnsetNull(int i) {
int mask = ~(1 << (i % 32));
ASSERT(IsNull(i), "already null!");
nulls[i >> 5] &= mask;
void UnsetNull(int locationInPack) {
int mask = ~(1 << (locationInPack % 32));
ASSERT(IsNull(locationInPack), "already null!");
nulls[locationInPack >> 5] &= mask;
}
bool IsNull(int i) const {
if (dpn->nn == dpn->nr) return true;
return ((nulls[i >> 5] & ((uint32_t)(1) << (i % 32))) != 0);
bool IsNull(int locationInPack) const {
if (dpn->numOfNulls == dpn->numOfRecords) return true;
return ((nulls[locationInPack >> 5] & ((uint32_t)(1) << (locationInPack % 32))) != 0);
}
bool NotNull(int i) const { return !IsNull(i); }
// Marks the row at locationInPack as deleted by setting its bit in the deletes bitmap.
// Asserts that the bit is not set yet.
void SetDeleted(int locationInPack) {
  const int wordIndex = locationInPack >> 5;   // 32 rows per bitmap word
  const int bit = 1 << (locationInPack % 32);  // position of the row inside that word
  ASSERT((deletes[wordIndex] & bit) == 0);
  deletes[wordIndex] |= bit;
}
// Clears the deleted bit of the row at locationInPack in the deletes bitmap.
// Asserts that IsDeleted() currently reports the row as deleted.
void UnsetDeleted(int locationInPack) {
  // Unsigned mask, built the same way IsDeleted() builds its bit.
  const uint32_t keepMask = ~(static_cast<uint32_t>(1) << (locationInPack % 32));
  // The assertion fires when the row is NOT deleted, so the message says exactly that.
  ASSERT(IsDeleted(locationInPack), "not deleted!");
  deletes[locationInPack >> 5] &= keepMask;
}
// Returns true if the row at locationInPack has been deleted, false otherwise.
bool IsDeleted(int locationInPack) const {
  // Deleted count equals record count: answer without consulting the bitmap.
  if (dpn->numOfDeleted == dpn->numOfRecords) return true;

  const uint32_t word = deletes[locationInPack >> 5];  // 32 rows per bitmap word
  const uint32_t bit = (uint32_t)(1) << (locationInPack % 32);
  return (word & bit) != 0;
}
bool NotNull(int locationInPack) const { return !IsNull(locationInPack); }
void InitNull() {
if (dpn->NullOnly()) {
for (uint i = 0; i < dpn->nn; i++) SetNull(i);
for (uint i = 0; i < dpn->numOfNulls; i++) SetNull(i);
}
}
PackCoordinate GetPackCoordinate() const { return m_coord.co.pack; }
void SetDPN(DPN *new_dpn) { dpn = new_dpn; }
//Compress bitmap
bool CompressedBitMap(mm::MMGuard<uchar> &comp_buf, uint &comp_buf_size, std::unique_ptr<uint32_t[]> &ptr_buf,
uint32_t &dpn_num1);
protected:
Pack(DPN *dpn, PackCoordinate pc, ColumnShare *s);
......@@ -88,6 +113,7 @@ class Pack : public mm::TraceableObject {
bool ShouldNotCompress() const;
bool IsModeNullsCompressed() const { return dpn->null_compressed; }
// True when the DPN flag delete_compressed is set for this pack.
bool IsModeDeletesCompressed() const { return dpn->delete_compressed; }
bool IsModeDataCompressed() const { return dpn->data_compressed; }
bool IsModeCompressionApplied() const { return IsModeDataCompressed() || IsModeNullsCompressed(); }
bool IsModeNoCompression() const { return dpn->no_compress; }
......@@ -110,12 +136,27 @@ class Pack : public mm::TraceableObject {
}
void ResetModeNullsCompressed() { dpn->null_compressed = 0; }
// Sets the DPN flag delete_compressed, after first calling ResetModeNoCompression().
void SetModeDeletesCompressed() {
ResetModeNoCompression();
dpn->delete_compressed = 1;
}
// Clears the DPN flag delete_compressed.
void ResetModeDeletesCompressed() { dpn->delete_compressed = 0; }
protected:
ColumnShare *s = nullptr;
size_t NULLS_SIZE;
size_t bitmapSize;
DPN *dpn = nullptr;
std::unique_ptr<uint32_t[]> nulls;
/*
Each bitmap is stored as an array of 32-bit unsigned words (uint32_t).
Every word records the state of 32 rows, using one bit (0 or 1) per row.
The total number of bits in a bitmap equals the total number of row positions in the pack,
and the position of a row in the pack corresponds one-to-one to the position of its bit in the bitmap.
Storing the flags this way effectively saves space.
*/
std::unique_ptr<uint32_t[]> nulls; //Null bitmap
std::unique_ptr<uint32_t[]> deletes; //deleted bitmap
};
} // namespace core
} // namespace Tianmu
......
此差异已折叠。
......@@ -48,13 +48,14 @@ class PackInt final : public Pack {
std::unique_ptr<Pack> Clone(const PackCoordinate &pc) const override;
void LoadDataFromFile(system::Stream *fcurfile) override;
void Save() override;
void UpdateValue(size_t i, const Value &v) override;
void UpdateValue(size_t locationInPack, const Value &v) override;
void DeleteByRow(size_t locationInPack) override;
void LoadValues(const loader::ValueCache *vc, const std::optional<common::double_int_t> &null_value);
int64_t GetValInt(int n) const override { return data[n]; }
double GetValDouble(int n) const override {
int64_t GetValInt(int locationInPack) const override { return data[locationInPack]; }
double GetValDouble(int locationInPack) const override {
ASSERT(is_real);
return data.pdouble[n];
return data.pdouble[locationInPack];
}
bool IsFixed() const { return !is_real; }
......@@ -66,24 +67,24 @@ class PackInt final : public Pack {
PackInt(const PackInt &apn, const PackCoordinate &pc);
void AppendValue(uint64_t v) {
dpn->nr++;
SetVal64(dpn->nr - 1, v);
dpn->numOfRecords++;
SetVal64(dpn->numOfRecords - 1, v);
}
void AppendNull() {
SetNull(dpn->nr);
dpn->nn++;
dpn->nr++;
SetNull(dpn->numOfRecords);
dpn->numOfNulls++;
dpn->numOfRecords++;
}
void SetValD(uint n, double v) {
dpn->synced = false;
ASSERT(n < dpn->nr);
ASSERT(n < dpn->numOfRecords);
ASSERT(is_real);
data.pdouble[n] = v;
}
void SetVal64(uint n, uint64_t v) {
dpn->synced = false;
ASSERT(n < dpn->nr);
ASSERT(n < dpn->numOfRecords);
switch (data.vt) {
case 8:
data.pint64[n] = v;
......@@ -107,8 +108,8 @@ class PackInt final : public Pack {
else
SetNull(n);
}
void UpdateValueFloat(size_t i, const Value &v);
void UpdateValueFixed(size_t i, const Value &v);
void UpdateValueFloat(size_t locationInPack, const Value &v);
void UpdateValueFixed(size_t locationInPack, const Value &v);
void ExpandOrShrink(uint64_t maxv, int64_t delta);
void SaveCompressed(system::Stream *fcurfile);
void SaveUncompressed(system::Stream *fcurfile);
......
此差异已折叠。
......@@ -42,10 +42,11 @@ class PackStr final : public Pack {
// overrides
std::unique_ptr<Pack> Clone(const PackCoordinate &pc) const override;
void LoadDataFromFile(system::Stream *fcurfile) override;
void UpdateValue(size_t i, const Value &v) override;
void UpdateValue(size_t locationInPack, const Value &v) override;
void DeleteByRow(size_t locationInPack) override;
void Save() override;
types::BString GetValueBinary(int i) const override;
types::BString GetValueBinary(int locationInPack) const override;
void LoadValues(const loader::ValueCache *vc);
bool IsTrie() const { return state_ == PackStrtate::PACK_TRIE; }
......@@ -66,27 +67,27 @@ class PackStr final : public Pack {
void Prepare(int no_nulls);
void AppendValue(const char *value, uint size) {
if (size == 0) {
SetPtrSize(dpn->nr, nullptr, 0);
SetPtrSize(dpn->numOfRecords, nullptr, 0);
} else {
SetPtrSize(dpn->nr, Put(value, size), size);
SetPtrSize(dpn->numOfRecords, Put(value, size), size);
data.sum_len += size;
}
dpn->nr++;
dpn->numOfRecords++;
}
size_t CalculateMaxLen() const;
types::BString GetStringValueTrie(int ono) const;
size_t GetSize(int ono) {
types::BString GetStringValueTrie(int locationInPack) const;
size_t GetSize(int locationInPack) {
if (data.len_mode == sizeof(ushort))
return data.lens16[ono];
return data.lens16[locationInPack];
else
return data.lens32[ono];
return data.lens32[locationInPack];
}
void SetSize(int ono, uint size) {
void SetSize(int locationInPack, uint size) {
if (data.len_mode == sizeof(uint16_t))
data.lens16[ono] = (ushort)size;
data.lens16[locationInPack] = (ushort)size;
else
data.lens32[ono] = (uint)size;
data.lens32[locationInPack] = (uint)size;
}
void SetMinS(const types::BString &s);
void SetMaxS(const types::BString &s);
......@@ -108,11 +109,11 @@ class PackStr final : public Pack {
}
};
char *GetPtr(int i) const { return data.index[i]; }
void SetPtr(int i, void *addr) { data.index[i] = reinterpret_cast<char *>(addr); }
void SetPtrSize(int i, void *addr, uint size) {
SetPtr(i, addr);
SetSize(i, size);
char *GetPtr(int locationInPack) const { return data.index[locationInPack]; }
void SetPtr(int locationInPack, void *addr) { data.index[locationInPack] = reinterpret_cast<char *>(addr); }
void SetPtrSize(int locationInPack, void *addr, uint size) {
SetPtr(locationInPack, addr);
SetSize(locationInPack, size);
}
enum class PackStrtate { PACK_ARRAY, PACK_TRIE };
......
......@@ -29,8 +29,13 @@
#include "core/transaction.h"
#include "core/value_set.h"
#include "util/thread_pool.h"
#include "vc/const_column.h"
#include "vc/const_expr_column.h"
#include "vc/expr_column.h"
#include "vc/in_set_column.h"
#include "vc/single_column.h"
#include "vc/subselect_column.h"
#include "vc/type_cast_column.h"
#include "vc/virtual_column.h"
namespace Tianmu {
......@@ -185,9 +190,9 @@ double ParameterizedFilter::EvaluateConditionNonJoinWeight(Descriptor &d, bool f
// Processing descriptor on PK firstly
if (d.IsleftIndexSearch()) eval = 0.001;
} else if (d.IsType_AttrAttr()) { // attr=attr on the same table
} else if (d.IsType_AttrAttr()) { // attr=attr on the same table
uint64_t no_obj = d.attr.vc->NumOfTuples(); // changed to uint64_t to prevent negative
// logarithm for common::NULL_VALUE_64
// logarithm for common::NULL_VALUE_64
if (!d.encoded)
return log(1 + double(2 * no_obj)) + 5; // +5 as a penalty for complex expression
else if (d.op == common::Operator::O_EQ) {
......@@ -963,9 +968,32 @@ void ParameterizedFilter::UpdateMultiIndex(bool count_only, int64_t limit) {
if (descriptors.Size() < 1) {
PrepareRoughMultiIndex();
FilterDeletedForSelectAll();
rough_mind->ClearLocalDescFilters();
return;
} else { /*Judge whether there is filtering logic of the current table.
If not, filter the data of the current table*/
auto &rcTables = table->GetTables();
int no_dims=0;
for (auto rcTable : rcTables) {
if(rcTable->TableType() == TType::TEMP_TABLE) continue;
bool isVald = false;
for (int i = 0; i < descriptors.Size(); i++) {
Descriptor &desc = descriptors[i];
if (desc.attr.vc &&
desc.attr.vc->GetVarMap().size() > 1 &&
desc.attr.vc->GetVarMap()[0].tabp == rcTable) {
isVald = true;
break;
}
}
if (!isVald) {
FilterDeletedByTable(rcTable, no_dims);
}
no_dims++;
}
}
SyntacticalDescriptorListPreprocessing();
bool empty_cannot_grow = true; // if false (e.g. outer joins), then do not
......@@ -1109,7 +1137,7 @@ void ParameterizedFilter::UpdateMultiIndex(bool count_only, int64_t limit) {
if (mind->GetFilter(i))
table->SetVCDistinctVals(i,
mind->GetFilter(i)->NumOfOnes()); // distinct values - not more than the
// number of rows after WHERE
// number of rows after WHERE
rough_mind->ClearLocalDescFilters();
// Some displays
......@@ -1232,6 +1260,7 @@ void ParameterizedFilter::UpdateMultiIndex(bool count_only, int64_t limit) {
join_or_delayed_present = true;
}
}
if (join_or_delayed_present) rough_mind->MakeDimensionSuspect(); // no common::RSValue::RS_ALL packs
mind->UpdateNumOfTuples();
}
......@@ -1426,5 +1455,48 @@ void ParameterizedFilter::TaskProcessPacks(MIUpdatingIterator *taskIterator, Tra
}
taskIterator->Commit(false);
}
// Runs an O_EQ_ALL descriptor over column 0 of rcTable, pack by pack, and commits the result
// into the multi-index.
//   rcTable - table whose first column drives the evaluation
//   no_dims - passed both to the Descriptor and to the SingleColumn
//             (NOTE(review): looks like the dimension index of rcTable - confirm)
// NOTE(review): presumably O_EQ_ALL is what routes EvaluatePack to the deleted-row filter;
// that dispatch is not visible from this function.
// Throws common::KilledException when the connection is killed during evaluation.
void ParameterizedFilter::FilterDeletedByTable(JustATable *rcTable, int no_dims) {
  Descriptor desc(table, no_dims);
  desc.op = common::Operator::O_EQ_ALL;
  desc.encoded = true;

  DimensionVector dims(mind->NumOfDimensions());
  desc.DimensionUsed(dims);
  mind->MarkInvolvedDimGroups(dims);  // create iterators on whole groups (important for
                                      // multidimensional updatable iterators)
  dims.SetAll();
  MIUpdatingIterator mit(mind, dims);
  desc.CopyDesCond(mit);

  // Use column 0 to filter the table data
  int firstColumn = 0;
  PhysicalColumn *phc = rcTable->GetColumn(firstColumn);

  // Automatic storage instead of new/delete: the column is destroyed even when an exception
  // leaves the loop below (the heap version leaked it). A null check is unnecessary, since a
  // failing `new` throws rather than returning nullptr.
  vcolumn::SingleColumn vc(phc, mind, 0, 0, rcTable, no_dims);

  vc.LockSourcePacks(mit);
  while (mit.IsValid()) {
    vc.EvaluatePack(mit, desc);
    // NOTE(review): the source packs are still locked when this throws - confirm that callers
    // or destructors release them.
    if (mind->m_conn->Killed()) throw common::KilledException();
  }
  vc.UnlockSourcePacks();
  mit.Commit();
}
// Calls FilterDeletedByTable for every table of `table` that is not a TEMP_TABLE, then
// refreshes the tuple count of the multi-index. Does nothing when `table` is null.
void ParameterizedFilter::FilterDeletedForSelectAll() {
  if (!table) return;

  auto &sourceTables = table->GetTables();
  int dimIndex = 0;
  for (auto current : sourceTables) {
    const bool isTempTable = (current->TableType() == TType::TEMP_TABLE);
    if (!isTempTable) {
      FilterDeletedByTable(current, dimIndex);
      // NOTE(review): the counter advances only for filtered tables, so a skipped temp table
      // does not consume an index - confirm this matches the dimension numbering.
      ++dimIndex;
    }
  }
  mind->UpdateNumOfTuples();
}
} // namespace core
} // namespace Tianmu
......@@ -22,12 +22,12 @@
#include "core/cq_term.h"
#include "core/joiner.h"
#include "core/multi_index.h"
#include "core/just_a_table.h"
namespace Tianmu {
namespace core {
class TempTable;
class RoughMultiIndex;
/*
A class defining multidimensional filter (by means of MultiIndex) on a set of
tables. It can store descriptors defining some restrictions on particular
......@@ -76,6 +76,9 @@ class ParameterizedFilter final {
Condition &GetConditions() { return descriptors; }
void TaskProcessPacks(MIUpdatingIterator *taskIterator, Transaction *ci, common::RSValue *rf, DimensionVector *dims,
int desc_number, int64_t limit, int one_dim);
void FilterDeletedByTable(JustATable *rcTable, int no_dims);
void FilterDeletedForSelectAll();
MultiIndex *mind;
bool mind_shallow_memory;
......
......@@ -983,7 +983,7 @@ int Query::Compile(CompiledQuery *compiled_query, SELECT_LEX *selects_list, SELE
TABLE_LIST *tables = sl->leaf_tables ? sl->leaf_tables : (TABLE_LIST *)sl->table_list.first;
for (TABLE_LIST *table_ptr = tables; table_ptr; table_ptr = table_ptr->next_leaf) {
if (!table_ptr->is_view_or_derived()) {
if (!Engine::IsTIANMUTable(table_ptr->table)) throw CompilationError();
if (!Engine::IsTianmuTable(table_ptr->table)) throw CompilationError();
std::string path = TablePath(table_ptr);
if (path2num.find(path) == path2num.end()) {
path2num[path] = NumOfTabs();
......
......@@ -91,6 +91,7 @@ void RCAttr::Create(const fs::path &dir, const AttributeTypeInfo &ati, uint8_t p
no_rows, // no_obj
no_rows, // no_nulls
no_pack, // no of packs
0, // no of deleted
0, // auto_inc next
0, // min
0, // max
......@@ -127,10 +128,10 @@ void RCAttr::Create(const fs::path &dir, const AttributeTypeInfo &ati, uint8_t p
DPN dpn;
dpn.reset();
dpn.used = 1;
dpn.nn = 1 << pss;
dpn.nr = 1 << pss;
dpn.numOfNulls = 1 << pss;
dpn.numOfRecords = 1 << pss;
dpn.xmax = common::MAX_XID;
dpn.addr = DPN_INVALID_ADDR;
dpn.dataAddress = DPN_INVALID_ADDR;
system::TianmuFile fdn;
fdn.OpenCreateEmpty(dir / common::COL_DN_FILE);
......@@ -139,8 +140,8 @@ void RCAttr::Create(const fs::path &dir, const AttributeTypeInfo &ati, uint8_t p
// the last one
auto left = no_rows % (1 << pss);
if (left != 0) {
dpn.nr = left;
dpn.nn = left;
dpn.numOfRecords = left;
dpn.numOfNulls = left;
}
fdn.WriteExact(&dpn, sizeof(dpn));
fdn.Flush();
......@@ -166,8 +167,8 @@ void RCAttr::LoadVersion(common::TX_ID xid) {
if (hdr.dict_ver != 0) {
m_dict = ha_rcengine_->cache.GetOrFetchObject<FTree>(FTreeCoordinate(m_tid, m_cid, hdr.dict_ver), this);
}
m_idx.resize(hdr.np);
fattr.ReadExact(&m_idx[0], sizeof(common::PACK_INDEX) * hdr.np);
m_idx.resize(hdr.numOfPacks);
fattr.ReadExact(&m_idx[0], sizeof(common::PACK_INDEX) * hdr.numOfPacks);
}
void RCAttr::Truncate() {
......@@ -290,11 +291,11 @@ bool RCAttr::SaveVersion() {
hdr.unique = IsUnique();
hdr.unique_updated = IsUniqueUpdated();
hdr.np = m_idx.size();
hdr.numOfPacks = m_idx.size();
hdr.compressed_size = std::accumulate(m_idx.begin(), m_idx.end(), size_t(0), [this](size_t sum, auto &pi) {
auto dpn = m_share->get_dpn_ptr(pi);
if (dpn->addr != DPN_INVALID_ADDR)
return sum + dpn->len;
if (dpn->dataAddress != DPN_INVALID_ADDR)
return sum + dpn->dataLength;
else
return sum;
});
......@@ -304,7 +305,7 @@ bool RCAttr::SaveVersion() {
system::TianmuFile fattr;
fattr.OpenCreate(fname);
fattr.WriteExact(&hdr, sizeof(hdr));
fattr.WriteExact(&m_idx[0], sizeof(decltype(m_idx)::value_type) * hdr.np);
fattr.WriteExact(&m_idx[0], sizeof(decltype(m_idx)::value_type) * hdr.numOfPacks);
if (tianmu_sysvar_sync_buffers) fattr.Flush();
......@@ -323,11 +324,14 @@ void RCAttr::PostCommit() {
ha_rcengine_->DeferRemove(Path() / common::COL_VERSION_DIR / m_version.ToString(), m_tid);
if (m_share->has_filter_bloom)
ha_rcengine_->DeferRemove(Path() / common::COL_FILTER_DIR / common::COL_FILTER_BLOOM_DIR / m_version.ToString(), m_tid);
ha_rcengine_->DeferRemove(Path() / common::COL_FILTER_DIR / common::COL_FILTER_BLOOM_DIR / m_version.ToString(),
m_tid);
if (m_share->has_filter_cmap)
ha_rcengine_->DeferRemove(Path() / common::COL_FILTER_DIR / common::COL_FILTER_CMAP_DIR / m_version.ToString(), m_tid);
ha_rcengine_->DeferRemove(Path() / common::COL_FILTER_DIR / common::COL_FILTER_CMAP_DIR / m_version.ToString(),
m_tid);
if (m_share->has_filter_hist)
ha_rcengine_->DeferRemove(Path() / common::COL_FILTER_DIR / common::COL_FILTER_HIST_DIR / m_version.ToString(), m_tid);
ha_rcengine_->DeferRemove(Path() / common::COL_FILTER_DIR / common::COL_FILTER_HIST_DIR / m_version.ToString(),
m_tid);
m_version = m_tx->GetID();
}
......@@ -357,7 +361,7 @@ PackOntologicalStatus RCAttr::GetPackOntologicalStatus(int pack_no) {
if (pack_no < 0 || dpn->NullOnly()) return PackOntologicalStatus::NULLS_ONLY;
if (GetPackType() == common::PackType::INT) {
if (dpn->min_i == dpn->max_i) {
if (dpn->nn == 0) return PackOntologicalStatus::UNIFORM;
if (dpn->numOfNulls == 0) return PackOntologicalStatus::UNIFORM;
return PackOntologicalStatus::UNIFORM_AND_NULLS;
}
}
......@@ -504,7 +508,7 @@ types::RCDataType &RCAttr::GetValueData(size_t obj, types::RCDataType &value, bo
int64_t RCAttr::GetNumOfNulls(int pack) {
LoadPackInfo();
if (pack == -1) return NumOfNulls();
return get_dpn(pack).nn;
return get_dpn(pack).numOfNulls;
}
size_t RCAttr::GetActualSize(int pack) {
......@@ -801,7 +805,7 @@ std::shared_ptr<FTree> RCAttr::Fetch([[maybe_unused]] const FTreeCoordinate &coo
}
void RCAttr::PreparePackForLoad() {
if (SizeOfPack() == 0 || get_last_dpn().nr == (1U << pss)) {
if (SizeOfPack() == 0 || get_last_dpn().numOfRecords == (1U << pss)) {
// just allocate a DPN but do not create dp for now
auto ret = m_share->alloc_dpn(m_tx->GetID());
m_idx.push_back(ret);
......@@ -831,8 +835,8 @@ void RCAttr::LoadData(loader::ValueCache *nvs, Transaction *conn_info) {
if (!get_dpn(pi).Trivial()) get_pack(pi)->Save();
hdr.nr += nvs->NumOfValues();
hdr.nn += (Type().NotNull() ? 0 : nvs->NumOfNulls());
hdr.numOfRecords += nvs->NumOfValues();
hdr.numOfNulls += (Type().NotNull() ? 0 : nvs->NumOfNulls());
hdr.natural_size += nvs->SumarizedSize();
}
......@@ -851,9 +855,9 @@ void RCAttr::LoadDataPackN(size_t pi, loader::ValueCache *nvs) {
size_t load_nulls = nv.has_value() ? 0 : nvs->NumOfNulls();
// nulls only
if (load_nulls == load_values && (dpn.nr == 0 || dpn.NullOnly())) {
dpn.nr += load_values;
dpn.nn += load_values;
if (load_nulls == load_values && (dpn.numOfRecords == 0 || dpn.NullOnly())) {
dpn.numOfRecords += load_values;
dpn.numOfNulls += load_values;
return;
}
......@@ -877,11 +881,11 @@ void RCAttr::LoadDataPackN(size_t pi, loader::ValueCache *nvs) {
// now dpn->sum has been updated
// uniform package
if ((dpn.nn + load_nulls) == 0 && load_min == load_max &&
(dpn.nr == 0 || (dpn.min_i == load_min && dpn.max_i == load_max))) {
if ((dpn.numOfNulls + load_nulls) == 0 && load_min == load_max &&
(dpn.numOfRecords == 0 || (dpn.min_i == load_min && dpn.max_i == load_max))) {
dpn.min_i = load_min;
dpn.max_i = load_max;
dpn.nr += load_values;
dpn.numOfRecords += load_values;
} else {
// new package (also in case of expanding so-far-uniform package)
if (dpn.Trivial()) {
......@@ -920,14 +924,14 @@ void RCAttr::LoadDataPackS(size_t pi, loader::ValueCache *nvs) {
auto cnt = nvs->NumOfValues();
// no need to store any values - uniform package
if (load_nulls == cnt && (dpn.nr == 0 || dpn.NullOnly())) {
dpn.nr += cnt;
dpn.nn += cnt;
if (load_nulls == cnt && (dpn.numOfRecords == 0 || dpn.NullOnly())) {
dpn.numOfRecords += cnt;
dpn.numOfNulls += cnt;
return;
}
// new package or expanding so-far-null package
if (dpn.nr == 0 || dpn.NullOnly()) {
if (dpn.numOfRecords == 0 || dpn.NullOnly()) {
auto sp = ha_rcengine_->cache.GetOrFetchObject<Pack>(get_pc(pi), this);
dpn.SetPackPtr(reinterpret_cast<unsigned long>(sp.get()) + tag_one);
}
......@@ -976,8 +980,68 @@ void RCAttr::UpdateData(uint64_t row, Value &v) {
dpn.synced = false;
// update global data
hdr.nn -= dpn_save.nn;
hdr.nn += dpn.nn;
hdr.numOfNulls -= dpn_save.numOfNulls;
hdr.numOfNulls += dpn.numOfNulls;
if (GetPackType() == common::PackType::INT) {
if (dpn.min_i < hdr.min) {
hdr.min = dpn.min_i;
} else {
// re-calculate the min
hdr.min = std::numeric_limits<int64_t>::max();
for (uint i = 0; i < m_idx.size(); i++) {
if (!get_dpn(i).NullOnly()) hdr.min = std::min(get_dpn(i).min_i, hdr.min);
}
}
if (dpn.max_i > hdr.max) {
hdr.max = dpn.max_i;
} else {
// re-calculate the max
hdr.max = std::numeric_limits<int64_t>::min();
for (uint i = 0; i < m_idx.size(); i++) {
if (!get_dpn(i).NullOnly()) hdr.max = std::max(get_dpn(i).max_i, hdr.max);
}
}
} else { // common::PackType::STR
}
}
bool RCAttr::IsDelete(int64_t row) {
if (row == common::NULL_VALUE_64) return true;
DEBUG_ASSERT(hdr.numOfRecords >= static_cast<uint64_t>(row));
auto pack = row2pack(row);
const auto &dpn = get_dpn(pack);
if (dpn.numOfDeleted == 0) return false;
if (dpn.numOfDeleted > 0) {
FunctionExecutor fe([this, pack]() { LockPackForUse(pack); }, [this, pack]() { UnlockPackFromUse(pack); });
return get_pack(pack)->IsDeleted(row2offset(row));
}
return false;
}
void RCAttr::DeleteData(uint64_t row) {
auto pn = row2pack(row);
FunctionExecutor fe([this, pn]() { LockPackForUse(pn); }, [this, pn]() { UnlockPackFromUse(pn); });
// primary key process
DeleteByPrimaryKey(row, ColId());
CopyPackForWrite(pn);
auto &dpn = get_dpn(pn);
auto dpn_save = dpn;
if (dpn.Trivial()) {
// need to create pack struct for previous trivial pack
ha_rcengine_->cache.GetOrFetchObject<Pack>(get_pc(pn), this);
}
get_pack(pn)->DeleteByRow(row2offset(row));
// update global data
hdr.numOfNulls -= dpn_save.numOfNulls;
hdr.numOfNulls += dpn.numOfNulls;
hdr.numOfDeleted++;
if (GetPackType() == common::PackType::INT) {
if (dpn.min_i < hdr.min) {
......@@ -1081,7 +1145,7 @@ types::BString RCAttr::MinS(Filter *f) {
(GetPackOntologicalStatus(b) == PackOntologicalStatus::UNIFORM_AND_NULLS && f->IsFull(b)))) {
CompareAndSetCurrentMin(DecodeValue_S(dpn.min_i), min, set);
it.NextPack();
} else if (!(dpn.NullOnly() || dpn.nr == 0)) {
} else if (!(dpn.NullOnly() || dpn.numOfRecords == 0)) {
while (it.IsValid() && b == (unsigned int)it.GetCurrPack()) {
int n = it.GetCurrInPack();
if (GetPackType() == common::PackType::STR && p->IsNull(n) == 0) {
......@@ -1114,7 +1178,7 @@ types::BString RCAttr::MaxS(Filter *f) {
(GetPackOntologicalStatus(b) == PackOntologicalStatus::UNIFORM ||
(GetPackOntologicalStatus(b) == PackOntologicalStatus::UNIFORM_AND_NULLS && f->IsFull(b)))) {
CompareAndSetCurrentMax(DecodeValue_S(dpn.min_i), max);
} else if (!(dpn.NullOnly() || dpn.nr == 0)) {
} else if (!(dpn.NullOnly() || dpn.numOfRecords == 0)) {
while (it.IsValid() && b == it.GetCurrPack()) {
int n = it.GetCurrInPack();
if (GetPackType() == common::PackType::STR && p->IsNull(n) == 0) {
......@@ -1234,8 +1298,8 @@ void RCAttr::UpdateIfIndex(uint64_t row, uint64_t col, const Value &v) {
auto vold = GetValueString(row);
std::string_view nkey(vnew.data(), vnew.length());
std::string_view okey(vold.val, vold.size());
common::ErrorCode rc = tab->UpdateIndex(current_txn_, nkey, okey, row);
if (rc == common::ErrorCode::DUPP_KEY || rc == common::ErrorCode::FAILED) {
common::ErrorCode returnCode = tab->UpdateIndex(current_txn_, nkey, okey, row);
if (returnCode == common::ErrorCode::DUPP_KEY || returnCode == common::ErrorCode::FAILED) {
TIANMU_LOG(LogCtl_Level::DEBUG, "Duplicate entry: %s for primary key", vnew.data());
throw common::DupKeyException("Duplicate entry: " + vnew + " for primary key");
}
......@@ -1244,12 +1308,40 @@ void RCAttr::UpdateIfIndex(uint64_t row, uint64_t col, const Value &v) {
int64_t vold = GetValueInt64(row);
std::string_view nkey(reinterpret_cast<const char *>(&vnew), sizeof(int64_t));
std::string_view okey(reinterpret_cast<const char *>(&vold), sizeof(int64_t));
common::ErrorCode rc = tab->UpdateIndex(current_txn_, nkey, okey, row);
if (rc == common::ErrorCode::DUPP_KEY || rc == common::ErrorCode::FAILED) {
common::ErrorCode returnCode = tab->UpdateIndex(current_txn_, nkey, okey, row);
if (returnCode == common::ErrorCode::DUPP_KEY || returnCode == common::ErrorCode::FAILED) {
TIANMU_LOG(LogCtl_Level::DEBUG, "Duplicate entry :%" PRId64 " for primary key", vnew);
throw common::DupKeyException("Duplicate entry: " + std::to_string(vnew) + " for primary key");
}
}
}
// Removes the index entry that belongs to `row` when column `col` is one of the key columns of
// the table index.
//   row - row whose current value forms the key to delete
//   col - column id checked against the index key columns
// Returns silently when the table has no index or `col` is not a key column.
// Throws common::Exception when DeleteIndex reports ErrorCode::FAILED.
void RCAttr::DeleteByPrimaryKey(uint64_t row, uint64_t col) {
  auto path = m_share->owner->Path();
  std::shared_ptr<index::RCTableIndex> tab = ha_rcengine_->GetTableIndex(path);
  // the table has no index at all
  if (!tab) return;

  // col is not primary key
  const auto &keycols = tab->KeyCols();
  if (std::find(keycols.begin(), keycols.end(), col) == keycols.end()) return;

  if (GetPackType() == common::PackType::STR) {
    auto currentValue = GetValueString(row);
    std::string_view currentRowKey(currentValue.val, currentValue.size());
    common::ErrorCode returnCode = tab->DeleteIndex(current_txn_, currentRowKey, row);
    if (returnCode == common::ErrorCode::FAILED) {
      // The value is addressed as (val, size()) above, i.e. length-delimited; hand "%s" a
      // NUL-terminated copy instead of the raw byte pointer.
      const std::string keyText = currentValue.ToString();
      TIANMU_LOG(LogCtl_Level::DEBUG, "Delete: %s for primary key", keyText.c_str());
      throw common::Exception("Delete: " + keyText + " for primary key");
    }
  } else {  // common::PackType::INT
    auto currentValue = GetValueInt64(row);
    // The key is the raw 8-byte representation of the integer value.
    std::string_view currentRowKey(reinterpret_cast<const char *>(&currentValue), sizeof(int64_t));
    common::ErrorCode returnCode = tab->DeleteIndex(current_txn_, currentRowKey, row);
    if (returnCode == common::ErrorCode::FAILED) {
      TIANMU_LOG(LogCtl_Level::DEBUG, "Delete: %" PRId64 " for primary key", currentValue);
      throw common::Exception("Delete: " + std::to_string(currentValue) + " for primary key");
    }
  }
}
} // namespace core
} // namespace Tianmu
......@@ -92,6 +92,9 @@ class RCAttr final : public mm::TraceableObject, public PhysicalColumn, public P
void UpdateData(uint64_t row, Value &v);
void UpdateIfIndex(uint64_t row, uint64_t col, const Value &v);
void Truncate();
void DeleteData(uint64_t row);
void DeleteByPrimaryKey(uint64_t row, uint64_t col);
bool IsDelete(int64_t row);
const types::RCDataType &ValuePrototype(bool lookup_to_num) const {
if ((Type().IsLookup() && lookup_to_num) || ATI::IsNumericType(TypeName())) return types::RCNum::NullValue();
......@@ -102,7 +105,7 @@ class RCAttr final : public mm::TraceableObject, public PhysicalColumn, public P
int64_t GetValueInt64(int64_t obj) const override {
if (obj == common::NULL_VALUE_64) return common::NULL_VALUE_64;
DEBUG_ASSERT(hdr.nr >= static_cast<uint64_t>(obj));
DEBUG_ASSERT(hdr.numOfRecords >= static_cast<uint64_t>(obj));
auto pack = row2pack(obj);
const auto &dpn = get_dpn(pack);
auto p = get_packN(pack);
......@@ -142,11 +145,11 @@ class RCAttr final : public mm::TraceableObject, public PhysicalColumn, public P
bool IsNull(int64_t obj) const override {
if (obj == common::NULL_VALUE_64) return true;
DEBUG_ASSERT(hdr.nr >= static_cast<uint64_t>(obj));
DEBUG_ASSERT(hdr.numOfRecords >= static_cast<uint64_t>(obj));
auto pack = row2pack(obj);
const auto &dpn = get_dpn(pack);
if (Type().NotNull() || dpn.nn == 0) return false;
if (Type().NotNull() || dpn.numOfNulls == 0) return false;
if (!dpn.Trivial()) {
DEBUG_ASSERT(get_pack(pack)->IsLocked()); // assuming the pack is already loaded and locked
......@@ -200,8 +203,9 @@ class RCAttr final : public mm::TraceableObject, public PhysicalColumn, public P
// ratio); may be slightly approximated
int64_t CompressedSize() const { return hdr.compressed_size; };
uint32_t ValueOfPackPower() const { return pss; }
uint64_t NumOfObj() const { return hdr.nr; }
uint64_t NumOfNulls() const { return hdr.nn; }
uint64_t NumOfObj() const { return hdr.numOfRecords; }
// Deleted-row counter kept in this column's header (hdr.numOfDeleted).
uint64_t NumOfDeleted() const { return hdr.numOfDeleted; }
uint64_t NumOfNulls() const { return hdr.numOfNulls; }
uint SizeOfPack() const { return m_idx.size(); }
int64_t GetMinInt64() const { return hdr.min; }
void SetMinInt64(int64_t a_imin) { hdr.min = a_imin; }
......@@ -224,8 +228,8 @@ class RCAttr final : public mm::TraceableObject, public PhysicalColumn, public P
types::RCDataType &GetValueData(size_t obj, types::RCDataType &value, bool lookup_to_num = false);
int64_t GetNumOfNulls(int pack) override;
bool IsRoughNullsOnly() const override { return hdr.nr == hdr.nn; }
size_t GetNumOfValues(int pack) const { return get_dpn(pack).nr; }
bool IsRoughNullsOnly() const override { return hdr.numOfRecords == hdr.numOfNulls; }
size_t GetNumOfValues(int pack) const { return get_dpn(pack).numOfRecords; }
int64_t GetSum(int pack, bool &nonnegative) override;
size_t GetActualSize(int pack);
int64_t GetMinInt64(int pack) override;
......@@ -341,6 +345,8 @@ class RCAttr final : public mm::TraceableObject, public PhysicalColumn, public P
PackStr *get_packS(size_t i) const { return reinterpret_cast<PackStr *>(get_pack(i)); }
DPN &get_last_dpn() { return *m_share->get_dpn_ptr(m_idx.back()); }
const DPN &get_last_dpn() const { return *m_share->get_dpn_ptr(m_idx.back()); }
void EvaluatePack_IsNoDelete(MIUpdatingIterator &mit, int dim);
void EvaluatePack_IsNull(MIUpdatingIterator &mit, int dim);
void EvaluatePack_NotNull(MIUpdatingIterator &mit, int dim);
void EvaluatePack_Like(MIUpdatingIterator &mit, int dim, Descriptor &d);
......
......@@ -504,6 +504,8 @@ RCTable::Iterator RCTable::Iterator::CreateEnd() { return RCTable::Iterator(); }
int64_t RCTable::NumOfObj() { return m_attrs[0]->NumOfObj(); }
// Returns the deleted-row count reported by the first attribute (m_attrs[0]), the same
// attribute NumOfObj() reads from.
int64_t RCTable::NumOfDeleted() { return m_attrs[0]->NumOfDeleted(); }
void RCTable::GetTable_S(types::BString &s, int64_t obj, int attr) {
DEBUG_ASSERT(static_cast<size_t>(attr) <= m_attrs.size());
DEBUG_ASSERT(static_cast<uint64_t>(obj) <= m_attrs[attr]->NumOfObj());
......@@ -722,6 +724,8 @@ int RCTable::Insert(TABLE *table) {
void RCTable::UpdateItem(uint64_t row, uint64_t col, Value &v) { m_attrs[col]->UpdateData(row, v); }
// Forwards the deletion of `row` to attribute `col` via RCAttr::DeleteData.
void RCTable::DeleteItem(uint64_t row, uint64_t col) { m_attrs[col]->DeleteData(row); }
uint64_t RCTable::ProceedNormal(system::IOParameters &iop) {
std::unique_ptr<system::Stream> fs;
if (iop.LocalLoad()) {
......
......@@ -74,6 +74,7 @@ class RCTable final : public JustATable {
size_t no_objs);
void Truncate();
void UpdateItem(uint64_t row, uint64_t col, Value &v);
void DeleteItem(uint64_t row, uint64_t col);
void LockPackInfoForUse(); // lock attribute data against memory manager
void UnlockPackInfoFromUse(); // return attribute data to memory manager
......@@ -96,8 +97,9 @@ class RCTable final : public JustATable {
void Rollback(common::TX_ID xid, bool = false);
void PostCommit();
// Data access & information
// Data access & information
int64_t NumOfObj() override;
int64_t NumOfDeleted();
int64_t NumOfValues() { return NumOfObj(); }
void GetTable_S(types::BString &s, int64_t obj, int attr) override;
......
......@@ -75,7 +75,7 @@ common::RSValue RCAttr::RoughCheck(int pack, Descriptor &d, bool additional_null
return common::RSValue::RS_NONE;
}
if (d.op == common::Operator::O_IS_NULL || d.op == common::Operator::O_NOT_NULL) {
if (dpn.nn == 0 && !additional_nulls_possible) {
if (dpn.numOfNulls == 0 && !additional_nulls_possible) {
if (d.op == common::Operator::O_IS_NULL)
return common::RSValue::RS_NONE;
else
......@@ -160,7 +160,7 @@ common::RSValue RCAttr::RoughCheck(int pack, Descriptor &d, bool additional_null
else if (res == common::RSValue::RS_NONE)
res = common::RSValue::RS_ALL;
}
if ((dpn.nn != 0 || additional_nulls_possible) && res == common::RSValue::RS_ALL) res = common::RSValue::RS_SOME;
if ((dpn.numOfNulls != 0 || additional_nulls_possible) && res == common::RSValue::RS_ALL) res = common::RSValue::RS_SOME;
return res;
} else if ((d.op == common::Operator::O_IN || d.op == common::Operator::O_NOT_IN) &&
GetPackType() == common::PackType::STR) {
......@@ -208,7 +208,7 @@ common::RSValue RCAttr::RoughCheck(int pack, Descriptor &d, bool additional_null
else if (res == common::RSValue::RS_NONE)
res = common::RSValue::RS_ALL;
}
if (res == common::RSValue::RS_ALL && (dpn.nn > 0 || additional_nulls_possible)) res = common::RSValue::RS_SOME;
if (res == common::RSValue::RS_ALL && (dpn.numOfNulls > 0 || additional_nulls_possible)) res = common::RSValue::RS_SOME;
return res;
} else if ((d.op == common::Operator::O_IN || d.op == common::Operator::O_NOT_IN) &&
(GetPackType() == common::PackType::INT)) {
......@@ -273,7 +273,7 @@ common::RSValue RCAttr::RoughCheck(int pack, Descriptor &d, bool additional_null
else if (res == common::RSValue::RS_NONE)
res = common::RSValue::RS_ALL;
}
if (res == common::RSValue::RS_ALL && (dpn.nn > 0 || additional_nulls_possible)) res = common::RSValue::RS_SOME;
if (res == common::RSValue::RS_ALL && (dpn.numOfNulls > 0 || additional_nulls_possible)) res = common::RSValue::RS_SOME;
return res;
}
} else if (GetPackType() == common::PackType::STR) { // Note: text operations as
......@@ -339,7 +339,7 @@ common::RSValue RCAttr::RoughCheck(int pack, Descriptor &d, bool additional_null
else if (res == common::RSValue::RS_NONE)
res = common::RSValue::RS_ALL;
}
if ((dpn.nn != 0 || additional_nulls_possible) && res == common::RSValue::RS_ALL) {
if ((dpn.numOfNulls != 0 || additional_nulls_possible) && res == common::RSValue::RS_ALL) {
res = common::RSValue::RS_SOME;
}
return res;
......@@ -367,7 +367,7 @@ common::RSValue RCAttr::RoughCheck(int pack, Descriptor &d, bool additional_null
res = common::RSValue::RS_ALL;
}
}
if (res == common::RSValue::RS_ALL && (dpn.nn != 0 || additional_nulls_possible)) {
if (res == common::RSValue::RS_ALL && (dpn.numOfNulls != 0 || additional_nulls_possible)) {
res = common::RSValue::RS_SOME;
}
return res;
......@@ -489,7 +489,7 @@ common::RSValue RCAttr::RoughCheck(int pack, Descriptor &d, bool additional_null
}
}
// take nulls into account
if ((dpn.nn != 0 || secDpn.nn != 0 || additional_nulls_possible) && res == common::RSValue::RS_ALL)
if ((dpn.numOfNulls != 0 || secDpn.numOfNulls != 0 || additional_nulls_possible) && res == common::RSValue::RS_ALL)
res = common::RSValue::RS_SOME;
return res;
}
......@@ -554,7 +554,7 @@ common::RSValue RCAttr::RoughCheckBetween(int pack, int64_t v1, int64_t v2) {
}
}
}
if (dpn.nn != 0 && res == common::RSValue::RS_ALL) {
if (dpn.numOfNulls != 0 && res == common::RSValue::RS_ALL) {
res = common::RSValue::RS_SOME;
}
return res;
......@@ -813,7 +813,7 @@ double RCAttr::RoughSelectivity() {
double width_sum = 0;
for (uint p = 0; p < SizeOfPack(); p++) { // minimum of nonempty packs
auto const &dpn(get_dpn(p));
if (dpn.nn == uint(dpn.nr) + 1) continue;
if (dpn.numOfNulls == uint(dpn.numOfRecords) + 1) continue;
if (dpn.min_i < global_min) global_min = dpn.min_i;
if (dpn.max_i > global_max) global_max = dpn.max_i;
width_sum += double(dpn.max_i) - double(dpn.min_i) + 1;
......@@ -868,7 +868,7 @@ void RCAttr::RoughStats(double &hist_density, int &trivial_packs, double &span)
LoadPackInfo();
for (uint pack = 0; pack < npack; pack++) {
auto const &dpn(get_dpn(pack));
if (dpn.NullOnly() || (GetPackType() == common::PackType::INT && dpn.nn == 0 && dpn.min_i == dpn.max_i))
if (dpn.NullOnly() || (GetPackType() == common::PackType::INT && dpn.numOfNulls == 0 && dpn.min_i == dpn.max_i))
trivial_packs++;
else {
if (GetPackType() == common::PackType::INT && !ATI::IsRealType(TypeName())) {
......@@ -893,7 +893,7 @@ void RCAttr::RoughStats(double &hist_density, int &trivial_packs, double &span)
int ones_found = 0, ones_needed = 0;
for (uint pack = 0; pack < npack; pack++) {
auto const &dpn(get_dpn(pack));
if (uint(dpn.nr + 1) != dpn.nn && dpn.min_i + 1 < dpn.max_i) {
if (uint(dpn.numOfRecords + 1) != dpn.numOfNulls && dpn.min_i + 1 < dpn.max_i) {
int loc_no_ones;
if (dpn.max_i - dpn.min_i > 1024)
loc_no_ones = 1024;
......@@ -930,11 +930,11 @@ void RCAttr::DisplayAttrStats(Filter *f) // filter is for # of objects
if (f)
cur_obj = f->NumOfOnes(pack);
else if (pack == npack - 1)
cur_obj = dpn.nr + 1;
cur_obj = dpn.numOfRecords + 1;
std::sprintf(line_buf, "%-7d %5ld %5d", pack, cur_obj, dpn.nn);
std::sprintf(line_buf, "%-7d %5ld %5d", pack, cur_obj, dpn.numOfNulls);
if (uint(dpn.nr + 1) != dpn.nn) {
if (uint(dpn.numOfRecords + 1) != dpn.numOfNulls) {
if (GetPackType() == common::PackType::INT && !ATI::IsRealType(TypeName())) {
int rsi_span = -1;
int rsi_ones = 0;
......
......@@ -82,6 +82,8 @@ void RCAttr::EvaluatePack(MIUpdatingIterator &mit, int dim, Descriptor &d) {
} else if (GetPackType() == common::PackType::INT &&
(d.op == common::Operator::O_IN || d.op == common::Operator::O_NOT_IN))
EvaluatePack_InNum(mit, dim, d);
else if (d.op == common::Operator::O_EQ_ALL)
EvaluatePack_IsNoDelete(mit, dim);
else
DEBUG_ASSERT(0); // case not implemented!
}
......@@ -145,7 +147,7 @@ common::ErrorCode RCAttr::EvaluateOnIndex_BetweenInt(MIUpdatingIterator &mit, in
++iter;
} else {
TIANMU_LOG(LogCtl_Level::ERROR, "GetCurKV valid! col:[%u]=%I64d, Path:%s", ColId(), pv1,
m_share->owner->Path().data());
m_share->owner->Path().data());
break;
}
}
......@@ -207,7 +209,7 @@ common::ErrorCode RCAttr::EvaluateOnIndex_BetweenString(MIUpdatingIterator &mit,
++iter;
} else {
TIANMU_LOG(LogCtl_Level::ERROR, "GetCurKV valid! col:[%u]=%s, Path:%s", ColId(), pv1.ToString().data(),
m_share->owner->Path().data());
m_share->owner->Path().data());
break;
}
}
......@@ -271,7 +273,7 @@ common::ErrorCode RCAttr::EvaluateOnIndex_BetweenString_UTF(MIUpdatingIterator &
++iter;
} else {
TIANMU_LOG(LogCtl_Level::ERROR, "GetCurKV valid! col:[%u]=%s, Path:%s", ColId(), pv1.ToString().data(),
m_share->owner->Path().data());
m_share->owner->Path().data());
break;
}
}
......@@ -287,24 +289,49 @@ common::ErrorCode RCAttr::EvaluateOnIndex_BetweenString_UTF(MIUpdatingIterator &
return rv;
}
// Removes from `mit` the rows of the current pack (along dimension `dim`) that
// are either NULL_VALUE_64 in the iterator or flagged as deleted in the pack,
// then moves the iterator past the current packrow.
void RCAttr::EvaluatePack_IsNoDelete(MIUpdatingIterator &mit, int dim) {
  MEASURE_FET("RCAttr::EvaluatePack_IsNoDelete(...)");
  auto pack_no = row2pack(mit[dim]);
  auto const &pack_dpn(get_dpn(pack_no));

  // No deletions recorded for this pack: keep every row and move on.
  const bool has_deleted_rows = pack_dpn.numOfDeleted > 0;
  if (!has_deleted_rows) {
    mit.NextPackrow();
    return;
  }

  // Every record in the pack is deleted: drop the whole pack at once.
  if (pack_dpn.numOfDeleted == pack_dpn.numOfRecords) {
    mit.ResetCurrentPack();
    mit.NextPackrow();
    return;
  }

  // Partially deleted pack: keep it locked while rows are checked one by one.
  FunctionExecutor fe([this, pack_no]() { LockPackForUse(pack_no); },
                      [this, pack_no]() { UnlockPackFromUse(pack_no); });
  do {
    // Short-circuit keeps IsDeleted() from being queried for null iterator positions.
    const bool keep_row =
        mit[dim] != common::NULL_VALUE_64 && !get_pack(pack_no)->IsDeleted(mit.GetCurInpack(dim));
    if (!keep_row) {
      mit.ResetCurrent();
    }
    ++mit;
  } while (mit.IsValid() && !mit.PackrowStarted());
}
void RCAttr::EvaluatePack_IsNull(MIUpdatingIterator &mit, int dim) {
MEASURE_FET("RCAttr::EvaluatePack_IsNull(...)");
int pack = mit.GetCurPackrow(dim);
if (pack == -1) {
mit.NextPackrow();
EvaluatePack_IsNoDelete(mit, dim);
return;
}
auto const &dpn(get_dpn(pack));
if (!dpn.Trivial() && dpn.nn != 0) { // nontrivial pack exists
if (!dpn.Trivial() && dpn.numOfNulls != 0) { // nontrivial pack exists
do {
if (mit[dim] != common::NULL_VALUE_64 && !get_pack(pack)->IsNull(mit.GetCurInpack(dim))) mit.ResetCurrent();
if ((mit[dim] != common::NULL_VALUE_64 && !get_pack(pack)->IsNull(mit.GetCurInpack(dim))) ||
get_pack(pack)->IsDeleted(mit.GetCurInpack(dim))) {
mit.ResetCurrent();
}
++mit;
} while (mit.IsValid() && !mit.PackrowStarted());
} else { // pack is trivial - uniform or null only
if (GetPackOntologicalStatus(pack) != PackOntologicalStatus::NULLS_ONLY) {
if (mit.NullsPossibleInPack(dim)) {
do {
if (mit[dim] != common::NULL_VALUE_64) mit.ResetCurrent();
if (mit[dim] != common::NULL_VALUE_64 || get_pack(pack)->IsDeleted(mit.GetCurInpack(dim))) mit.ResetCurrent();
++mit;
} while (mit.IsValid() && !mit.PackrowStarted());
} else
......@@ -323,7 +350,7 @@ void RCAttr::EvaluatePack_NotNull(MIUpdatingIterator &mit, int dim) {
return;
}
auto const &dpn(get_dpn(pack));
if (!dpn.Trivial() && dpn.nn != 0) {
if (!dpn.Trivial() && dpn.numOfNulls != 0) {
do {
if (mit[dim] == common::NULL_VALUE_64 || get_pack(pack)->IsNull(mit.GetCurInpack(dim))) mit.ResetCurrent();
++mit;
......@@ -613,7 +640,7 @@ void RCAttr::EvaluatePack_InNum(MIUpdatingIterator &mit, int dim, Descriptor &d)
res = multival_column->Contains(mit, GetValueData(mit[dim], *value, lookup_to_num));
if (not_in) res = !res;
if (res == true) {
if (dpn.nn != 0)
if (dpn.numOfNulls != 0)
EvaluatePack_NotNull(mit, dim);
else
mit.NextPackrow();
......@@ -801,15 +828,15 @@ void RCAttr::EvaluatePack_BetweenInt(MIUpdatingIterator &mit, int dim, Descripto
// Loop without it when packs are nearly full
if (tianmu_sysvar_filterevaluation_speedup && filter &&
filter->NumOfOnes(pack) > static_cast<uint>(1 << (mit.GetPower() - 1))) {
if (d.op == common::Operator::O_BETWEEN && !mit.NullsPossibleInPack(dim) && dpn.nn == 0) {
if (d.op == common::Operator::O_BETWEEN && !mit.NullsPossibleInPack(dim) && dpn.numOfNulls == 0) {
// easy and fast case - no "if"s
for (uint32_t n = 0; n < dpn.nr; n++) {
for (uint32_t n = 0; n < dpn.numOfRecords; n++) {
auto v = p->GetValInt(n);
if (pv1 > v || v > pv2) filter->Reset(pack, n);
}
} else {
// more general case
for (uint32_t n = 0; n < dpn.nr; n++) {
for (uint32_t n = 0; n < dpn.numOfRecords; n++) {
if (p->IsNull(n))
filter->Reset(pack, n);
else {
......@@ -822,7 +849,7 @@ void RCAttr::EvaluatePack_BetweenInt(MIUpdatingIterator &mit, int dim, Descripto
}
mit.NextPackrow();
} else {
if (d.op == common::Operator::O_BETWEEN && !mit.NullsPossibleInPack(dim) && dpn.nn == 0) {
if (d.op == common::Operator::O_BETWEEN && !mit.NullsPossibleInPack(dim) && dpn.numOfNulls == 0) {
// easy and fast case - no "if"s
do {
auto v = p->GetValInt(mit.GetCurInpack(dim));
......@@ -883,7 +910,7 @@ void RCAttr::EvaluatePack_BetweenReal(MIUpdatingIterator &mit, int dim, Descript
// Loop without it when packs are nearly full
if (tianmu_sysvar_filterevaluation_speedup && filter &&
filter->NumOfOnes(pack) > static_cast<uint>(1 << (mit.GetPower() - 1))) {
for (uint32_t n = 0; n < dpn.nr; n++) {
for (uint32_t n = 0; n < dpn.numOfRecords; n++) {
if (p->IsNull(n))
filter->Reset(pack, n);
else {
......@@ -928,7 +955,8 @@ void RCAttr::EvaluatePack_AttrAttr(MIUpdatingIterator &mit, int dim, Descriptor
return;
}
RCAttr *a2 = (RCAttr *)(((vcolumn::SingleColumn *)d.val1.vc)->GetPhysical());
if (get_dpn(pack).nn == get_dpn(pack).nr || a2->get_dpn(pack).nn == a2->get_dpn(pack).nr) {
if (get_dpn(pack).numOfNulls == get_dpn(pack).numOfRecords ||
a2->get_dpn(pack).numOfNulls == a2->get_dpn(pack).numOfRecords) {
mit.ResetCurrentPack(); // nulls only
mit.NextPackrow();
return;
......@@ -989,7 +1017,8 @@ void RCAttr::EvaluatePack_AttrAttrReal(MIUpdatingIterator &mit, int dim, Descrip
return;
}
RCAttr *a2 = (RCAttr *)(((vcolumn::SingleColumn *)d.val1.vc)->GetPhysical());
if (get_dpn(pack).nn == get_dpn(pack).nr || a2->get_dpn(pack).nn == a2->get_dpn(pack).nr) {
if (get_dpn(pack).numOfNulls == get_dpn(pack).numOfRecords ||
a2->get_dpn(pack).numOfNulls == a2->get_dpn(pack).numOfRecords) {
mit.ResetCurrentPack(); // nulls only
mit.NextPackrow();
return;
......@@ -1049,7 +1078,7 @@ bool RCAttr::IsDistinct(Filter *f) {
if (f == NULL) return (NumOfNulls() == 0); // no nulls at all, and is_unique => distinct
LoadPackInfo();
for (uint b = 0; b < SizeOfPack(); b++)
if (!f->IsEmpty(b) && get_dpn(b).nn > 0) // any null in nonempty pack?
if (!f->IsEmpty(b) && get_dpn(b).numOfNulls > 0) // any null in nonempty pack?
return false;
return true;
}
......@@ -1086,7 +1115,7 @@ uint64_t RCAttr::ApproxAnswerSize(Descriptor &d) {
return int64_t(res);
}
for (uint b = 0; b < SizeOfPack(); b++) {
if (get_dpn(b).min_i > val2 || get_dpn(b).max_i < val1 || get_dpn(b).nn == get_dpn(b).nr)
if (get_dpn(b).min_i > val2 || get_dpn(b).max_i < val1 || get_dpn(b).numOfNulls == get_dpn(b).numOfRecords)
continue; // pack irrelevant
span1 = get_dpn(b).max_i - get_dpn(b).min_i + 1;
if (span1 <= 0) // out of int64_t range
......@@ -1100,7 +1129,7 @@ uint64_t RCAttr::ApproxAnswerSize(Descriptor &d) {
else
span2 -= get_dpn(b).min_i;
span2 += 1;
res += (get_dpn(b).nr - get_dpn(b).nn) * double(span2) / span1; // supposing uniform distribution of values
res += (get_dpn(b).numOfRecords - get_dpn(b).numOfNulls) * double(span2) / span1; // supposing uniform distribution of values
}
} else { // double
double span1,
......@@ -1110,15 +1139,15 @@ uint64_t RCAttr::ApproxAnswerSize(Descriptor &d) {
for (uint b = 0; b < SizeOfPack(); b++) {
double d_min = get_dpn(b).min_d;
double d_max = get_dpn(b).max_d;
if (d_min > v_max || d_max < v_min || get_dpn(b).nn == get_dpn(b).nr) continue; // pack irrelevant
if (d_min > v_max || d_max < v_min || get_dpn(b).numOfNulls == get_dpn(b).numOfRecords) continue; // pack irrelevant
span1 = d_max - d_min;
span2 = std::min(v_max, d_max) - std::max(v_min, d_min);
if (span1 == 0)
res += get_dpn(b).nr - get_dpn(b).nn;
res += get_dpn(b).numOfRecords - get_dpn(b).numOfNulls;
else if (span2 == 0) // just one value
res += 1;
else
res += (get_dpn(b).nr - get_dpn(b).nn) * (span2 / span1); // supposing uniform distribution of values
res += (get_dpn(b).numOfRecords - get_dpn(b).numOfNulls) * (span2 / span1); // supposing uniform distribution of values
}
}
return int64_t(res);
......
......@@ -97,7 +97,7 @@ void RSIndex_Bloom::Update(common::PACK_INDEX pi, DPN &dpn, const PackStr *pack)
bloom_builder->StartBlock(0);
for (size_t i = 0; i < dpn.nr; i++)
for (size_t i = 0; i < dpn.numOfRecords; i++)
if (pack->NotNull(i)) bloom_builder->AddKey(Slice(pack->GetValueBinary(i).ToString()));
Slice block = bloom_builder->Finish();
......
......@@ -157,7 +157,7 @@ void RSIndex_CMap::Update(common::PACK_INDEX pi, DPN &dpn, const PackStr *pack)
// system::unlock;
}
ClearPack(pi);
for (size_t i = 0; i < dpn.nr; i++)
for (size_t i = 0; i < dpn.numOfRecords; i++)
if (pack->NotNull(i)) {
if (dpn.Trivial() || pack->IsNull(i)) continue;
types::BString str(pack->GetValueBinary(i));
......
......@@ -199,7 +199,7 @@ void RSIndex_Hist::Update(common::PACK_INDEX pi, DPN &dpn, const PackInt *pack)
interval_len = (dpn.max_d - dpn.min_d) / double(RSI_HIST_BITS);
}
for (size_t i = 0; i < dpn.nr; i++) {
for (size_t i = 0; i < dpn.numOfRecords; i++) {
if (pack->IsNull(i)) continue;
auto v = pack->GetValInt(i);
......
......@@ -44,7 +44,7 @@ namespace {
// Reports whether any table referenced by the statement is a Tianmu table.
//
// Walks the statement's global table list (`lex->query_tables`, linked through
// `next_global`) and returns true as soon as core::Engine::IsTianmuTable()
// accepts one of the tables; returns false when the list is exhausted.
bool AtLeastOneTIANMUTableInvolved(LEX *lex) {
  for (TABLE_LIST *table_list = lex->query_tables; table_list; table_list = table_list->next_global) {
    TABLE *table = table_list->table;
    // Only the renamed predicate is used; the stale IsTIANMUTable() duplicate check is dropped.
    if (core::Engine::IsTianmuTable(table)) return true;
  }
  return false;
}
......@@ -103,7 +103,7 @@ int TIANMU_LoadData(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void *ar
common::TIANMUError tianmu_error;
int ret = static_cast<int>(TIANMUEngineReturnValues::LD_Failed);
if (!core::Engine::IsTIANMUTable(table_list->table)) return static_cast<int>(TIANMUEngineReturnValues::LD_Continue);
if (!core::Engine::IsTianmuTable(table_list->table)) return static_cast<int>(TIANMUEngineReturnValues::LD_Continue);
try {
tianmu_error = ha_rcengine_->RunLoader(thd, ex, table_list, arg);
......
......@@ -534,9 +534,32 @@ int TianmuHandler::update_row(const uchar *old_data, uchar *new_data) {
*/
int TianmuHandler::delete_row([[maybe_unused]] const uchar *buf) {
DBUG_ENTER(__PRETTY_FUNCTION__);
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
}
int ret = HA_ERR_INTERNAL_ERROR;
auto org_bitmap = dbug_tmp_use_all_columns(table, table->write_set);
std::shared_ptr<void> defer(nullptr,
[org_bitmap, this](...) { dbug_tmp_restore_column_map(table->write_set, org_bitmap); });
try {
auto tab = current_txn_->GetTableByPath(m_table_name);
for (uint i = 0; i < table->s->fields; i++) {
tab->DeleteItem(current_position, i);
}
DBUG_RETURN(0);
} catch (common::DatabaseException &e) {
TIANMU_LOG(LogCtl_Level::ERROR, "Delete exception: %s.", e.what());
} catch (common::FileException &e) {
TIANMU_LOG(LogCtl_Level::ERROR, "Delete exception: %s.", e.what());
} catch (common::Exception &e) {
TIANMU_LOG(LogCtl_Level::ERROR, "Delete exception: %s.", e.what());
} catch (std::exception &e) {
TIANMU_LOG(LogCtl_Level::ERROR, "Delete exception: %s.", e.what());
} catch (...) {
TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught.");
}
DBUG_RETURN(ret);
}
/*
Used to delete all rows in a table. Both for cases of truncate and
for cases where the optimizer realizes that all rows will be
......@@ -562,6 +585,7 @@ int TianmuHandler::delete_all_rows() {
DBUG_RETURN(ret);
}
int TianmuHandler::rename_table(const char *from, const char *to) {
try {
ha_rcengine_->RenameTable(current_txn_, from, to, ha_thd());
......@@ -634,7 +658,7 @@ int TianmuHandler::info(uint flag) {
tab = current_txn_->GetTableByPath(m_table_name);
} else
tab = ha_rcengine_->GetTableRD(m_table_name);
stats.records = (ha_rows)tab->NumOfValues();
stats.records = (ha_rows)(tab->NumOfValues() - tab->NumOfDeleted());
stats.data_file_length = 0;
stats.mean_rec_length = 0;
if (stats.records > 0) {
......
......@@ -206,6 +206,23 @@ common::ErrorCode RCTableIndex::UpdateIndex(core::Transaction *tx, std::string_v
return rc;
}
// Removes the index entry for `currentRowKey` through the transaction's KV interface.
//
// The key is packed with rocksdb_key_->pack_key() and deleted from the column
// family returned by rocksdb_key_->get_cf(). `row` is accepted but not used here.
// Returns SUCCESS when the delete status is ok; otherwise logs an error and
// returns FAILED.
common::ErrorCode RCTableIndex::DeleteIndex(core::Transaction *tx, std::string_view &currentRowKey,
                                            uint64_t row) {
  StringWriter packed_value, packed_key;
  std::vector<std::string_view> key_fields;
  key_fields.emplace_back(currentRowKey);
  rocksdb_key_->pack_key(packed_key, key_fields, packed_value);

  const auto column_family = rocksdb_key_->get_cf();
  const auto delete_status =
      tx->KVTrans().Delete(column_family, {(const char *)packed_key.ptr(), packed_key.length()});
  if (delete_status.ok()) {
    return common::ErrorCode::SUCCESS;
  }

  TIANMU_LOG(LogCtl_Level::ERROR, "RockDb: delete key fail!");
  return common::ErrorCode::FAILED;
}
common::ErrorCode RCTableIndex::GetRowByKey(core::Transaction *tx, std::vector<std::string_view> &fields,
uint64_t &row) {
std::string value;
......
......@@ -56,6 +56,7 @@ class RCTableIndex final {
common::ErrorCode RenameIndexTable(const std::string &from, const std::string &to);
common::ErrorCode InsertIndex(core::Transaction *tx, std::vector<std::string_view> &fields, uint64_t row);
common::ErrorCode UpdateIndex(core::Transaction *tx, std::string_view &nkey, std::string_view &okey, uint64_t row);
common::ErrorCode DeleteIndex(core::Transaction *tx, std::string_view &currentRowKey, uint64_t row);
common::ErrorCode GetRowByKey(core::Transaction *tx, std::vector<std::string_view> &fields, uint64_t &row);
public:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册