diff --git a/mysql-test/suite/tianmu/r/issue1876.result b/mysql-test/suite/tianmu/r/issue1876.result new file mode 100644 index 0000000000000000000000000000000000000000..2ca5ecba695ae76fb0ffdbce6954abc87a7db897 --- /dev/null +++ b/mysql-test/suite/tianmu/r/issue1876.result @@ -0,0 +1,57 @@ +include/master-slave.inc +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +[connection master] +create table t1 (b int not null default 1, c varchar(60) default '\\')engine=tianmu; +insert into t1 values(1, 'AAAAAAAA'); +insert into t1 values(2, 'BBBBBBBB'); +SELECT * from t1 INTO OUTFILE '1876_tmp_dat'; +create table t2 like t1; +load data infile '1876_tmp_dat' into table t2; +CREATE TABLE `column_type_test1` ( +`c_tinyint` tinyint(4) DEFAULT NULL COMMENT 'tinyint', +`c_smallint` smallint(6) DEFAULT NULL COMMENT 'smallint', +`c_mediumint` mediumint(9) DEFAULT NULL COMMENT 'mediumint', +`c_int` int(11) DEFAULT NULL COMMENT 'int', +`c_bigint` bigint(20) DEFAULT NULL COMMENT 'bigint', +`c_float` float DEFAULT NULL COMMENT 'float', +`c_double` double DEFAULT NULL COMMENT 'double', +`c_decimal` decimal(10,5) DEFAULT NULL COMMENT 'decimal', +`c_date` date DEFAULT NULL COMMENT 'date', +`c_datetime` datetime DEFAULT NULL COMMENT 'datetime', +`c_timestamp` timestamp NULL DEFAULT NULL COMMENT 'timestamp', +`c_time` time DEFAULT NULL COMMENT 'time', +`c_char` char(10) DEFAULT NULL COMMENT 'char', +`c_varchar` varchar(10) DEFAULT NULL COMMENT 'varchar', +`c_blob` blob COMMENT 'blob', +`c_text` text COMMENT 'text', +`c_longblob` longblob COMMENT 'longblob' +) engine=tianmu; +insert into column_type_test1 values(100, 100, 100, 100, 100, 5.2, 10.88, 100.08300, '2016-02-25', '2016-02-25 10:20:01', '2007-04-23 08:12:49', '10:20:01', 'stonedb', 'hello', null, 'bcdefghijklmn', null); +insert into column_type_test1 values(101, 101, 101, 101, 101, 5.2, 10.88, 101.08300, '2016-02-25', '2016-02-25 10:20:01', '1985-08-11 09:10:25', '10:20:01', 'stoneatom', 'hello', null, 'bcdefghijklmn', null); +SELECT * from column_type_test1 INTO OUTFILE '1876_tmp1_dat'; +create table column_type_test2 like column_type_test1; +load data infile '1876_tmp1_dat' into table column_type_test2; +create table user_t1(id int, department varchar(10)) engine=tianmu; +SELECT * from user_t1 INTO OUTFILE '1876_tmp2_dat'; +create table user_t2 like user_t1; +load data infile '1876_tmp2_dat' into table user_t2; +SHOW STATUS LIKE 'Slave_running'; +Variable_name Value +Slave_running ON +select * from t2; +b c +1 AAAAAAAA +2 BBBBBBBB +select * from column_type_test2; +c_tinyint c_smallint c_mediumint c_int c_bigint c_float c_double c_decimal c_date c_datetime c_timestamp c_time c_char c_varchar c_blob c_text c_longblob +100 100 100 100 100 5.2 10.88 100.08300 2016-02-25 2016-02-25 10:20:01 2007-04-23 08:12:49 10:20:01 stonedb hello NULL bcdefghijklmn NULL +101 101 101 101 101 5.2 10.88 101.08300 2016-02-25 2016-02-25 10:20:01 1985-08-11 09:10:25 10:20:01 stoneatom hello NULL bcdefghijklmn NULL +checksum table user_t2; +Table Checksum +test.user_t2 536836232 +drop table t1, t2; +drop table column_type_test1, column_type_test2; +drop table user_t1, user_t2; +include/rpl_end.inc diff --git a/mysql-test/suite/tianmu/t/issue1876-master.opt b/mysql-test/suite/tianmu/t/issue1876-master.opt new file mode 100644 index 0000000000000000000000000000000000000000..55fa0a48deb968a43f749c600ff6ccfe0838f892 --- /dev/null +++ b/mysql-test/suite/tianmu/t/issue1876-master.opt @@ -0,0 +1,5 @@ +--testcase-timeout=40 +--secure-file-priv="" +--tianmu_insert_delayed=off +--log-bin=bin +--binlog_format=row diff --git a/mysql-test/suite/tianmu/t/issue1876.test b/mysql-test/suite/tianmu/t/issue1876.test new file mode 100644 index 0000000000000000000000000000000000000000..0e756c3c6aedbd65a26d65a793aed94b9a833d36 --- /dev/null +++ b/mysql-test/suite/tianmu/t/issue1876.test @@ -0,0 +1,66 @@ +--source include/have_tianmu.inc +--source include/master-slave.inc + +connection master; +create table t1 (b int not null default 1, c varchar(60) default '\\')engine=tianmu; +insert into t1 values(1, 'AAAAAAAA'); +insert into t1 values(2, 'BBBBBBBB'); +SELECT * from t1 INTO OUTFILE '1876_tmp_dat'; +create table t2 like t1; +load data infile '1876_tmp_dat' into table t2; + +CREATE TABLE `column_type_test1` ( + `c_tinyint` tinyint(4) DEFAULT NULL COMMENT 'tinyint', + `c_smallint` smallint(6) DEFAULT NULL COMMENT 'smallint', + `c_mediumint` mediumint(9) DEFAULT NULL COMMENT 'mediumint', + `c_int` int(11) DEFAULT NULL COMMENT 'int', + `c_bigint` bigint(20) DEFAULT NULL COMMENT 'bigint', + `c_float` float DEFAULT NULL COMMENT 'float', + `c_double` double DEFAULT NULL COMMENT 'double', + `c_decimal` decimal(10,5) DEFAULT NULL COMMENT 'decimal', + `c_date` date DEFAULT NULL COMMENT 'date', + `c_datetime` datetime DEFAULT NULL COMMENT 'datetime', + `c_timestamp` timestamp NULL DEFAULT NULL COMMENT 'timestamp', + `c_time` time DEFAULT NULL COMMENT 'time', + `c_char` char(10) DEFAULT NULL COMMENT 'char', + `c_varchar` varchar(10) DEFAULT NULL COMMENT 'varchar', + `c_blob` blob COMMENT 'blob', + `c_text` text COMMENT 'text', + `c_longblob` longblob COMMENT 'longblob' +) engine=tianmu; +insert into column_type_test1 values(100, 100, 100, 100, 100, 5.2, 10.88, 100.08300, '2016-02-25', '2016-02-25 10:20:01', '2007-04-23 08:12:49', '10:20:01', 'stonedb', 'hello', null, 'bcdefghijklmn', null); +insert into column_type_test1 values(101, 101, 101, 101, 101, 5.2, 10.88, 101.08300, '2016-02-25', '2016-02-25 10:20:01', '1985-08-11 09:10:25', '10:20:01', 'stoneatom', 'hello', null, 'bcdefghijklmn', null); +SELECT * from column_type_test1 INTO OUTFILE '1876_tmp1_dat'; +create table column_type_test2 like column_type_test1; +load data infile '1876_tmp1_dat' into table column_type_test2; + +create table user_t1(id int, department varchar(10)) engine=tianmu; +--disable_query_log +let $i = 0; +while($i < 70000) +{ + eval insert into user_t1 values($i, 'stonedb'); + inc $i; +} +--enable_query_log +SELECT * from user_t1 INTO OUTFILE '1876_tmp2_dat'; +create table user_t2 like user_t1; +load data infile '1876_tmp2_dat' into table user_t2; + +--sync_slave_with_master + +connection slave; +# check the rpl is running normally +SHOW STATUS LIKE 'Slave_running'; + +# the data in table t2 in slave is the same to that's in master, means the binlog is written correctly +select * from t2; +select * from column_type_test2; +checksum table user_t2; + +connection master; +drop table t1, t2; +drop table column_type_test1, column_type_test2; +drop table user_t1, user_t2; +--sync_slave_with_master +--source include/rpl_end.inc diff --git a/sql/binlog.cc b/sql/binlog.cc index 5017b573d6ca569923d21ac2bc47514c0d8aa455..6d8a4550e4d50441f02faecaad9685f1e2b5ef6d 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -8802,12 +8802,14 @@ TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all) DBUG_RETURN(RESULT_ABORTED); } } - else if (real_trans && xid && trn_ctx->rw_ha_count(trx_scope) > 1 && - !trn_ctx->no_2pc(trx_scope)) + else if (thd->tianmu_need_xid || (real_trans && xid && trn_ctx->rw_ha_count(trx_scope) > 1 && + !trn_ctx->no_2pc(trx_scope))) { Xid_log_event end_evt(thd, xid); if (cache_mngr->trx_cache.finalize(thd, &end_evt)) DBUG_RETURN(RESULT_ABORTED); + // used for tianmu only + thd->tianmu_need_xid= false; } else { diff --git a/sql/sql_class.h b/sql/sql_class.h index bb9204de0fc3e37fe305ebc3e7012f331c6b7714..8eb6a1a48dd39910ccba738c5c36cd4a184809af 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1492,6 +1492,8 @@ private: { assert(0); return Query_arena::is_conventional(); } public: + /* Used only for tianmu to wirte xid log event */ + bool tianmu_need_xid; MDL_context mdl_context; /* diff --git a/storage/tianmu/core/tianmu_table.cpp b/storage/tianmu/core/tianmu_table.cpp index 386f0c536765db7b6e839d82b5a87e99e925752c..8e95efa29563160c9070ed0ae179a7caf59a6dc8 100644 --- a/storage/tianmu/core/tianmu_table.cpp +++ b/storage/tianmu/core/tianmu_table.cpp @@ -1118,11 +1118,20 @@ uint64_t TianmuTable::ProceedNormal(system::IOParameters &iop) { auto no_loaded_rows = parser.GetNoRow(); - if (no_loaded_rows > 0 && mysql_bin_log.is_open()) - if (binlog_load_query_log_event(iop) != 0) { - TIANMU_LOG(LogCtl_Level::ERROR, "Write load binlog fail!"); - throw common::FormatException("Write load binlog fail!"); + if (no_loaded_rows > 0 && mysql_bin_log.is_open()) { + LOAD_FILE_INFO *lf_info = (LOAD_FILE_INFO *)iop.GetLogInfo(); + THD *thd = lf_info->thd; + if (thd->is_current_stmt_binlog_format_row()) { // if binlog format is row + if (binlog_flush_pending_rows_event(iop, true, iop.GetTable()->file->has_transactions()) != 0) { + TIANMU_LOG(LogCtl_Level::ERROR, "Write row binlog fail!"); + throw common::FormatException("Write row binlog fail!"); + } + } else if (binlog_load_query_log_event(iop) != 0) { + TIANMU_LOG(LogCtl_Level::ERROR, "Write statement binlog fail!"); + throw common::FormatException("Write statement binlog fail!"); } + } + timer.Print(__PRETTY_FUNCTION__); no_rejected_rows = parser.GetNumOfRejectedRows(); @@ -1137,6 +1146,31 @@ uint64_t TianmuTable::ProceedNormal(system::IOParameters &iop) { return no_loaded_rows; } +int TianmuTable::binlog_flush_pending_rows_event(system::IOParameters &iop, bool stmt_end, bool is_transactional) { + DBUG_ENTER(__PRETTY_FUNCTION__); + /* + We shall flush the pending event even if we are not in row-based + mode: it might be the case that we left row-based mode before + flushing anything (e.g., if we have explicitly locked tables). + */ + if (!mysql_bin_log.is_open()) + DBUG_RETURN(0); + + LOAD_FILE_INFO *lf_info = (LOAD_FILE_INFO *)iop.GetLogInfo(); + THD *thd = lf_info->thd; + thd->tianmu_need_xid = true; + int error = 0; + + if (Rows_log_event *pending = thd->binlog_get_pending_rows_event(is_transactional)) { + if (stmt_end) { + pending->set_flags(Rows_log_event::STMT_END_F); + thd->clear_binlog_table_maps(); + } + error = mysql_bin_log.flush_and_set_pending_rows_event(thd, 0, is_transactional); + } + DBUG_RETURN(error); +} + int TianmuTable::binlog_load_query_log_event(system::IOParameters &iop) { char *load_data_query, *end, *fname_start, *fname_end, *p = nullptr; size_t pl = 0; diff --git a/storage/tianmu/core/tianmu_table.h b/storage/tianmu/core/tianmu_table.h index 368532dc968f5607461d4ac1df8cecb2d4fe2bcc..34b37f472a5b41c038c88c55b5362bf193c3d1f0 100644 --- a/storage/tianmu/core/tianmu_table.h +++ b/storage/tianmu/core/tianmu_table.h @@ -159,6 +159,7 @@ class TianmuTable final : public JustATable { uint64_t ProceedNormal(system::IOParameters &iop); uint64_t ProcessDelayed(system::IOParameters &iop); void Field2VC(Field *f, loader::ValueCache &vc, size_t col); + int binlog_flush_pending_rows_event(system::IOParameters &iop, bool stmt_end, bool is_transactional); int binlog_load_query_log_event(system::IOParameters &iop); int binlog_insert2load_log_event(system::IOParameters &iop); int binlog_insert2load_block(std::vector &vcs, uint load_obj, system::IOParameters &iop); diff --git a/storage/tianmu/loader/load_parser.cpp b/storage/tianmu/loader/load_parser.cpp index e83c118892d24be57c20073e4f069775a59be1da..7222e703c567cd6c72e008f55418644887b28269 100644 --- a/storage/tianmu/loader/load_parser.cpp +++ b/storage/tianmu/loader/load_parser.cpp @@ -73,10 +73,25 @@ uint LoadParser::GetPackrow(uint no_of_rows, std::vector &value_buff value_buffers.emplace_back(pack_size_, init_capacity); } + THD *thd = io_param_.GetTHD(); + TABLE *table = io_param_.GetTable(); + bool is_transactional = table->file->has_transactions(); + bool need_rows_binlog = false; + if (mysql_bin_log.is_open() && thd->is_current_stmt_binlog_format_row()) { + need_rows_binlog = true; + const bool has_trans = thd->lex->sql_command == SQLCOM_CREATE_TABLE || is_transactional; + bool need_binlog_rows_query = thd->variables.binlog_rows_query_log_events; + /* write table map event */ + [[maybe_unused]] int err = thd->binlog_write_table_map(table, has_trans, need_binlog_rows_query); + } + uint no_of_rows_returned; for (no_of_rows_returned = 0; no_of_rows_returned < no_of_rows; no_of_rows_returned++) { if (!MakeRow(value_buffers)) break; + /* write row after one row is ready */ + if (need_rows_binlog) + [[maybe_unused]] int err = thd->binlog_write_row(table, is_transactional, table->record[0], NULL); } last_pack_size_.clear();