Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
oceanbase
oceanbase
提交
5a6c820e
O
oceanbase
项目概览
oceanbase
/
oceanbase
8 个月 前同步成功
通知
260
Star
6084
Fork
1301
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
O
oceanbase
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
5a6c820e
编写于
4月 18, 2024
作者:
S
SanmuWangZJU
提交者:
ob-robot
4月 18, 2024
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[OBCDC] Support mark column value origin, expecially for column value that is filled not from redo
上级
58d8530e
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
169 addition
and
77 deletion
+169
-77
src/logservice/libobcdc/src/ob_cdc_lob_data_merger.cpp
src/logservice/libobcdc/src/ob_cdc_lob_data_merger.cpp
+1
-1
src/logservice/libobcdc/src/ob_cdc_udt.cpp
src/logservice/libobcdc/src/ob_cdc_udt.cpp
+7
-2
src/logservice/libobcdc/src/ob_log_binlog_record.cpp
src/logservice/libobcdc/src/ob_log_binlog_record.cpp
+17
-2
src/logservice/libobcdc/src/ob_log_binlog_record.h
src/logservice/libobcdc/src/ob_log_binlog_record.h
+6
-0
src/logservice/libobcdc/src/ob_log_formatter.cpp
src/logservice/libobcdc/src/ob_log_formatter.cpp
+68
-47
src/logservice/libobcdc/src/ob_log_formatter.h
src/logservice/libobcdc/src/ob_log_formatter.h
+3
-2
src/logservice/libobcdc/src/ob_log_part_trans_task.cpp
src/logservice/libobcdc/src/ob_log_part_trans_task.cpp
+40
-19
src/logservice/libobcdc/src/ob_log_part_trans_task.h
src/logservice/libobcdc/src/ob_log_part_trans_task.h
+11
-3
src/logservice/libobcdc/tests/ob_binlog_record_printer.cpp
src/logservice/libobcdc/tests/ob_binlog_record_printer.cpp
+16
-1
未找到文件。
src/logservice/libobcdc/src/ob_cdc_lob_data_merger.cpp
浏览文件 @
5a6c820e
...
...
@@ -10,7 +10,7 @@
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX OBLOG
#define USING_LOG_PREFIX OBLOG
_FORMATTER
#include "ob_cdc_lob_data_merger.h"
#include "ob_cdc_lob_aux_meta_storager.h" // ObCDCLobAuxMetaStorager
...
...
src/logservice/libobcdc/src/ob_cdc_udt.cpp
浏览文件 @
5a6c820e
...
...
@@ -10,7 +10,7 @@
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX OBLOG
#define USING_LOG_PREFIX OBLOG
_FORMATTER
#include "ob_cdc_udt.h"
#include "ob_log_utils.h"
...
...
@@ -218,7 +218,12 @@ int ObCDCUdtValueBuilder::build(
ColValue
&
cv
)
{
int
ret
=
OB_SUCCESS
;
if
(
column_schema_info
.
is_xmltype
())
{
if
(
OB_UNLIKELY
(
cv
.
is_col_nop_
))
{
LOG_DEBUG
(
"ignore nop col"
,
"tls_id"
,
dml_stmt_task
.
get_tls_id
(),
"table_id"
,
dml_stmt_task
.
get_table_id
(),
"column_id"
,
column_schema_info
.
get_column_id
());
}
else
if
(
column_schema_info
.
is_xmltype
())
{
if
(
OB_FAIL
(
build_xmltype
(
column_schema_info
,
tz_info_wrap
,
...
...
src/logservice/libobcdc/src/ob_log_binlog_record.cpp
浏览文件 @
5a6c820e
...
...
@@ -12,7 +12,7 @@
* Binlog Record
*/
#define USING_LOG_PREFIX OBLOG
#define USING_LOG_PREFIX OBLOG
_FORMATTER
#ifdef OB_USE_DRCMSG
#include <drcmsg/MD.h> // ITableMeta
...
...
@@ -244,15 +244,30 @@ int ObLogBR::put_old(IBinlogRecord *br, const bool is_changed)
// mark value of OldCol to empty string, use global unique empty string value
// value of unchanged OldCol as NULL
const
char
*
val
=
is_changed
?
COLUMN_VALUE_IS_EMPTY
:
COLUMN_VALUE_IS_NULL
;
// mark value of unchanged column is PADDING(column value should not be used)
VALUE_ORIGIN
origin
=
is_changed
?
VALUE_ORIGIN
::
REDO
:
VALUE_ORIGIN
::
PADDING
;
int64_t
pos
=
(
NULL
==
val
?
0
:
strlen
(
val
));
(
void
)
br
->
putOld
(
val
,
static_cast
<
int
>
(
pos
));
(
void
)
br
->
putOld
(
val
,
static_cast
<
int
>
(
pos
)
,
origin
);
}
return
ret
;
}
void
ObLogBR
::
mark_value_populated_by_cdc
(
IBinlogRecord
&
br
,
const
bool
is_new_col
,
const
char
*
reason
,
const
int
col_idx
)
{
if
(
is_new_col
)
{
// mark for new_cols
(
void
)
br
.
putNew
(
COLUMN_VALUE_IS_NULL
,
0
,
VALUE_ORIGIN
::
PADDING
);
}
else
{
// mark for old_cols
(
void
)
br
.
putOld
(
COLUMN_VALUE_IS_NULL
,
0
,
VALUE_ORIGIN
::
PADDING
);
}
LOG_DEBUG
(
"mark_value_populated_by_cdc"
,
K
(
is_new_col
),
K
(
col_idx
),
KCSTRING
(
reason
));
}
int
ObLogBR
::
get_record_type
(
int
&
record_type
)
{
int
ret
=
OB_SUCCESS
;
...
...
src/logservice/libobcdc/src/ob_log_binlog_record.h
浏览文件 @
5a6c820e
...
...
@@ -20,6 +20,7 @@
#else
#include <drcmsg/BR.h> // IBinlogRecord
#include <drcmsg/DRCMessageFactory.h> // createBinlogRecord
#include <drcmsg/ValueOrigin.h> // VALUE_ORIGIN
#endif
#include "lib/queue/ob_link.h" // ObLink
...
...
@@ -46,6 +47,11 @@ public:
public:
static
int
put_old
(
IBinlogRecord
*
br
,
const
bool
is_changed
);
static
void
mark_value_populated_by_cdc
(
IBinlogRecord
&
br
,
const
bool
is_new_col
=
false
,
const
char
*
reason
=
""
,
const
int
col_idx
=
0
);
public:
void
reset
();
...
...
src/logservice/libobcdc/src/ob_log_formatter.cpp
浏览文件 @
5a6c820e
...
...
@@ -34,7 +34,7 @@
#include "ob_cdc_lob_ctx.h" // ObLobDataGetCtx
#include "ob_cdc_lob_data_merger.h" // IObCDCLobDataMerger
#include "ob_cdc_lob_aux_meta_storager.h" // ObCDCLobAuxMetaStorager
#include "ob_cdc_lob_aux_table_parse.h" // ObCDCLobAuxMetaStorager
#include "ob_cdc_lob_aux_table_parse.h"
// ObCDCLobAuxMetaStorager
#include "ob_cdc_udt.h" // ObCDCUdtValueBuilder
#include "ob_log_trace_id.h" // ObLogTraceIdGuard
#include "ob_log_timezone_info_getter.h"
...
...
@@ -62,8 +62,9 @@ void ObLogFormatter::RowValue::reset()
(
void
)
memset
(
orig_default_value_
,
0
,
sizeof
(
orig_default_value_
));
(
void
)
memset
(
is_rowkey_
,
0
,
sizeof
(
is_rowkey_
));
(
void
)
memset
(
is_changed_
,
0
,
sizeof
(
is_changed_
));
(
void
)
memset
(
is_null_lob_
old_columns_
,
0
,
sizeof
(
is_null_lob_old
_columns_
));
(
void
)
memset
(
is_null_lob_
columns_
,
0
,
sizeof
(
is_null_lob
_columns_
));
(
void
)
memset
(
is_diff_
,
0
,
sizeof
(
is_diff_
));
(
void
)
memset
(
is_old_col_nop_
,
0
,
sizeof
(
is_old_col_nop_
));
}
int
ObLogFormatter
::
RowValue
::
init
(
const
int64_t
column_num
,
const
bool
contain_old_column
)
...
...
@@ -79,8 +80,9 @@ int ObLogFormatter::RowValue::init(const int64_t column_num, const bool contain_
(
void
)
memset
(
orig_default_value_
,
0
,
column_num
*
sizeof
(
orig_default_value_
[
0
]));
(
void
)
memset
(
is_rowkey_
,
0
,
column_num
*
sizeof
(
is_rowkey_
[
0
]));
(
void
)
memset
(
is_changed_
,
0
,
column_num
*
sizeof
(
is_changed_
[
0
]));
(
void
)
memset
(
is_null_lob_
old_columns_
,
0
,
column_num
*
sizeof
(
is_null_lob_old
_columns_
[
0
]));
(
void
)
memset
(
is_null_lob_
columns_
,
0
,
column_num
*
sizeof
(
is_null_lob
_columns_
[
0
]));
(
void
)
memset
(
is_diff_
,
0
,
column_num
*
sizeof
(
is_diff_
[
0
]));
(
void
)
memset
(
is_old_col_nop_
,
0
,
column_num
*
sizeof
(
is_old_col_nop_
[
0
]));
}
return
OB_SUCCESS
;
...
...
@@ -1334,13 +1336,18 @@ int ObLogFormatter::fill_normal_cols_(
}
else
if
(
OB_ENTRY_NOT_EXIST
==
ret
)
{
ret
=
OB_SUCCESS
;
rv
->
new_columns_
[
usr_column_idx
]
=
nullptr
;
rv
->
is_null_lob_columns_
[
usr_column_idx
]
=
true
;
LOG_INFO
(
"fill_normal_cols_ nullptr"
,
K
(
is_new_value
),
KPC
(
cv
),
K
(
lob_ctx_cols
));
}
}
rv
->
is_changed_
[
usr_column_idx
]
=
true
;
rv
->
is_changed_
[
usr_column_idx
]
=
(
1
!=
cv
->
is_col_nop_
);
// column is not changed if col_value is nop(may be in minimal mode)
}
else
{
if
(
!
cv
->
is_out_row_
)
{
rv
->
old_columns_
[
usr_column_idx
]
=
&
cv
->
string_value_
;
if
(
cv
->
is_col_nop_
)
{
rv
->
is_old_col_nop_
[
usr_column_idx
]
=
true
;
}
else
{
rv
->
old_columns_
[
usr_column_idx
]
=
&
cv
->
string_value_
;
}
}
else
{
ObLobDataGetCtx
*
lob_data_get_ctx
=
nullptr
;
ObString
*
old_col_str
=
nullptr
;
...
...
@@ -1356,9 +1363,9 @@ int ObLogFormatter::fill_normal_cols_(
if
(
lob_data_get_ctx
->
is_ext_info_log
())
{
if
(
cv
->
is_json
())
{
// old data isn't passed when data is partial json
// so need set is_null_lob_
old_
columns_
// so need set is_null_lob_columns_
rv
->
old_columns_
[
usr_column_idx
]
=
nullptr
;
rv
->
is_null_lob_
old_
columns_
[
usr_column_idx
]
=
true
;
rv
->
is_null_lob_columns_
[
usr_column_idx
]
=
true
;
}
else
{
ret
=
OB_ERR_UNEXPECTED
;
LOG_ERROR
(
"not support ext info log type"
,
KR
(
ret
),
K
(
is_new_value
),
KPC
(
lob_data_get_ctx
),
KPC
(
cv
));
...
...
@@ -1383,7 +1390,7 @@ int ObLogFormatter::fill_normal_cols_(
}
else
if
(
OB_ENTRY_NOT_EXIST
==
ret
)
{
ret
=
OB_SUCCESS
;
rv
->
old_columns_
[
usr_column_idx
]
=
nullptr
;
rv
->
is_null_lob_
old_
columns_
[
usr_column_idx
]
=
true
;
rv
->
is_null_lob_columns_
[
usr_column_idx
]
=
true
;
LOG_INFO
(
"fill_normal_cols_ nullptr"
,
K
(
usr_column_idx
),
K
(
is_new_value
),
KPC
(
cv
),
K
(
lob_ctx_cols
));
}
}
...
...
@@ -1449,7 +1456,7 @@ int ObLogFormatter::fill_rowkey_cols_(
}
rv
->
is_rowkey_
[
rowkey_usr_index
]
=
true
;
rv
->
is_changed_
[
rowkey_usr_index
]
=
true
;
rv
->
is_changed_
[
rowkey_usr_index
]
=
(
1
!=
cv_node
->
is_col_nop_
)
;
if
(
rv
->
contain_old_column_
&&
NULL
==
rv
->
old_columns_
[
rowkey_usr_index
])
{
rv
->
old_columns_
[
rowkey_usr_index
]
=
&
(
cv_node
->
string_value_
);
...
...
@@ -1756,28 +1763,42 @@ int ObLogFormatter::format_dml_delete_(IBinlogRecord *br_data, const RowValue *r
}
// Handling non-primary key values
else
{
ObString
*
str
=
nullptr
;
bool
need_populate_old_value_to_null_or_empty
=
false
;
if
(
row_value
->
contain_old_column_
)
{
// When full column logging, the non-rowkey column of oldCold is set to the corresponding value
// If the column value is not provided, then it is a new column and the corresponding original default value is set
ObString
*
str
=
row_value
->
old_columns_
[
i
];
if
(
NULL
==
str
)
{
str
=
row_value
->
orig_default_value_
[
i
];
}
str
=
row_value
->
old_columns_
[
i
];
if
(
OB_ISNULL
(
str
))
{
ret
=
OB_ERR_UNEXPECTED
;
LOG_ERROR
(
"old column value and original default value are all invalid"
,
KR
(
ret
),
K
(
i
),
"column_num"
,
row_value
->
column_num_
);
if
(
row_value
->
is_old_col_nop_
[
i
])
{
need_populate_old_value_to_null_or_empty
=
true
;
ObLogBR
::
mark_value_populated_by_cdc
(
*
br_data
,
false
/*is_new_col*/
,
"delete_op with nop_old_col_value"
,
i
);
}
else
if
(
row_value
->
is_null_lob_columns_
[
i
])
{
// check if is outrow lob old col
need_populate_old_value_to_null_or_empty
=
true
;
ObLogBR
::
mark_value_populated_by_cdc
(
*
br_data
,
false
/*is_new_col*/
,
"delete_op with null_lob_col_value"
,
i
);
}
else
{
if
(
OB_ISNULL
(
str
=
row_value
->
orig_default_value_
[
i
]))
{
ret
=
OB_ERR_UNEXPECTED
;
LOG_ERROR
(
"old column value and original default value are all invalid"
,
KR
(
ret
),
K
(
i
),
"column_num"
,
row_value
->
column_num_
);
}
else
{
br_data
->
putOld
(
str
->
ptr
(),
str
->
length
());
}
}
}
else
{
br_data
->
putOld
(
str
->
ptr
(),
str
->
length
());
}
}
else
{
// Non-rowkey columns of oldCold are set to no-change status for non-full column logging
bool
is_changed
=
false
;
ObLogBR
::
mark_value_populated_by_cdc
(
*
br_data
,
false
/*is_new_col*/
,
"delete_op not contains old_column"
,
i
);
}
if
(
OB_FAIL
(
ObLogBR
::
put_old
(
br_data
,
is_changed
)))
{
LOG_ERROR
(
"put_old fail"
,
KR
(
ret
),
K
(
br_data
),
K
(
is_changed
));
}
if
(
OB_SUCC
(
ret
))
{
LOG_DEBUG
(
"put_old_column_value for delete operation"
,
K
(
i
),
K
(
need_populate_old_value_to_null_or_empty
),
"value"
,
str
==
nullptr
?
"NULL"
:
to_cstring
(
*
str
),
"default_val"
,
row_value
->
orig_default_value_
[
i
]);
}
}
}
...
...
@@ -1796,6 +1817,7 @@ int ObLogFormatter::format_dml_insert_(IBinlogRecord *br_data, const RowValue *r
}
else
{
for
(
int64_t
i
=
0
;
OB_SUCCESS
==
ret
&&
i
<
row_value
->
column_num_
;
i
++
)
{
if
(
!
row_value
->
is_changed_
[
i
])
{
// use defualt value in case of format old data with new schema(e.g. add column)
ObString
*
str_val
=
row_value
->
orig_default_value_
[
i
];
if
(
OB_ISNULL
(
str_val
))
{
...
...
@@ -1833,27 +1855,33 @@ int ObLogFormatter::format_dml_update_(IBinlogRecord *br_data, const RowValue *r
LOG_ERROR
(
"invalid argument"
,
KR
(
ret
),
K
(
br_data
),
K
(
row_value
));
}
else
{
for
(
int
i
=
0
;
OB_SUCCESS
==
ret
&&
i
<
row_value
->
column_num_
;
i
++
)
{
// fill column value after update
if
(
!
row_value
->
is_changed_
[
i
])
{
if
(
row_value
->
contain_old_column_
)
{
// In the case of a full column log, for update, if a column is not updated, the new value is filled with the value in old_column
// If there is no corresponding value in the old column either, the original default value is filled
ObString
*
str_val
=
row_value
->
old_columns_
[
i
];
if
(
NULL
==
str_val
)
{
str_val
=
row_value
->
orig_default_value_
[
i
];
}
if
(
OB_ISNULL
(
str_val
))
{
ret
=
OB_ERR_UNEXPECTED
;
LOG_ERROR
(
"new column value, old column value and original default value "
"are all invalid"
,
KR
(
ret
),
K
(
i
),
"column_num"
,
row_value
->
column_num_
);
if
(
row_value
->
is_old_col_nop_
[
i
])
{
ObLogBR
::
mark_value_populated_by_cdc
(
*
br_data
,
true
/*is_new_col*/
,
"update_op unchanged_col with nop_old_col"
,
i
);
}
else
if
(
row_value
->
is_null_lob_columns_
[
i
])
{
ObLogBR
::
mark_value_populated_by_cdc
(
*
br_data
,
true
/*is_new_col*/
,
"update_op unchanged_col with null_lob"
,
i
);
}
else
if
(
OB_ISNULL
(
str_val
=
row_value
->
orig_default_value_
[
i
]))
{
ret
=
OB_ERR_UNEXPECTED
;
LOG_ERROR
(
"new column value, old column value and original default value "
"are all invalid"
,
KR
(
ret
),
K
(
i
),
"column_num"
,
row_value
->
column_num_
);
}
else
{
br_data
->
putNew
(
str_val
->
ptr
(),
str_val
->
length
());
}
}
else
{
br_data
->
putNew
(
str_val
->
ptr
(),
str_val
->
length
());
}
}
else
{
// Mark as unmodified when not a full column log
br_data
->
putNew
(
NULL
,
0
);
// e.g. updated columns not include outrow lob column
ObLogBR
::
mark_value_populated_by_cdc
(
*
br_data
,
true
/*is_new_col*/
,
"update_op unchanged_col without old_col_val"
,
i
);
}
}
else
{
ObString
*
str_val
=
row_value
->
new_columns_
[
i
];
...
...
@@ -1870,40 +1898,33 @@ int ObLogFormatter::format_dml_update_(IBinlogRecord *br_data, const RowValue *r
}
}
if
(
OB_SUCCESS
==
ret
)
{
if
(
OB_SUCC
(
ret
))
{
// fill column value before update
bool
is_changed
=
row_value
->
is_changed_
[
i
];
if
(
row_value
->
contain_old_column_
)
{
// For full column logging, the old value is always filled with the value in old_column for updates
// If there is no valid value in the old column, the original default value is filled
ObString
*
str_val
=
row_value
->
old_columns_
[
i
];
if
(
NULL
==
str_val
)
{
str_val
=
row_value
->
orig_default_value_
[
i
];
}
if
(
OB_ISNULL
(
str_val
))
{
if
(
row_value
->
is_
null_lob_old_columns
_
[
i
])
{
br_data
->
putOld
(
NULL
,
0
);
// NOTICE: LOB column doesn't have default value.
LOG_DEBUG
(
"old_column is invalid, may outrow lob updated to inrow"
,
K
(
i
),
K
(
row_value
)
);
}
else
{
if
(
row_value
->
is_
old_col_nop
_
[
i
])
{
ObLogBR
::
mark_value_populated_by_cdc
(
*
br_data
,
false
/*is_new_col*/
,
"update_op with nop_old_col"
,
i
);
}
else
if
(
row_value
->
is_null_lob_columns_
[
i
])
{
ObLogBR
::
mark_value_populated_by_cdc
(
*
br_data
,
false
/*is_new_col*/
,
"update_op with null_lob_col"
,
i
);
}
else
if
(
OB_ISNULL
(
str_val
=
row_value
->
orig_default_value_
[
i
]))
{
ret
=
OB_ERR_UNEXPECTED
;
LOG_ERROR
(
"old column value and original default value are all invalid"
,
KR
(
ret
),
K
(
i
),
"column_num"
,
row_value
->
column_num_
,
"is_changed"
,
row_value
->
is_changed_
[
i
]);
}
else
{
br_data
->
putOld
(
str_val
->
ptr
(),
str_val
->
length
());
}
}
else
{
br_data
->
putOld
(
str_val
->
ptr
(),
str_val
->
length
());
}
}
else
{
// When not full column logging, for update, the old value is filled with whether the corresponding column has been modified
if
(
row_value
->
is_rowkey_
[
i
])
{
is_changed
=
true
;
}
if
(
OB_FAIL
(
ObLogBR
::
put_old
(
br_data
,
is_changed
)))
{
LOG_ERROR
(
"put_old fail"
,
KR
(
ret
),
K
(
br_data
),
K
(
is_changed
));
}
ObLogBR
::
mark_value_populated_by_cdc
(
*
br_data
,
false
/*is_new_col*/
,
"update_op with unchanged old_col"
,
i
);
}
}
}
// end of for
...
...
src/logservice/libobcdc/src/ob_log_formatter.h
浏览文件 @
5a6c820e
...
...
@@ -137,8 +137,9 @@ private:
bool
is_rowkey_
[
common
::
OB_MAX_COLUMN_NUMBER
];
bool
is_changed_
[
common
::
OB_MAX_COLUMN_NUMBER
];
bool
is_null_lob_old_columns_
[
common
::
OB_MAX_COLUMN_NUMBER
];
bool
is_diff_
[
common
::
OB_MAX_COLUMN_NUMBER
];
bool
is_null_lob_columns_
[
common
::
OB_MAX_COLUMN_NUMBER
];
// lob column value not recorded in log
bool
is_diff_
[
common
::
OB_MAX_COLUMN_NUMBER
];
// is lob_diff column
bool
is_old_col_nop_
[
common
::
OB_MAX_COLUMN_NUMBER
];
// old column that marked nop in log(most likely happened in minimal mode)
// invoke before handle format stmt task
// incase of usage of column_num but row doesn't contain valid column and column_num is not set
...
...
src/logservice/libobcdc/src/ob_log_part_trans_task.cpp
浏览文件 @
5a6c820e
...
...
@@ -360,7 +360,7 @@ int MutatorRow::parse_columns_(
}
else
if
(
OB_FAIL
(
row_reader
.
read_row
(
col_data
,
col_data_size
,
nullptr
,
datum_row
)))
{
LOG_WARN
(
"Failed to read datum row"
,
KR
(
ret
),
K
(
tenant_id
),
K
(
table_id
),
K
(
is_parse_new_col
));
}
else
{
LOG_DEBUG
(
"prepare to handle datum_row"
,
K
(
datum_row
));
OBLOG_FORMATTER_LOG
(
DEBUG
,
"prepare to handle datum_row"
,
K
(
is_parse_new_col
)
,
K
(
datum_row
));
// Iterate through all Cells using Cell Reader
for
(
int64_t
column_stored_idx
=
0
;
OB_SUCC
(
ret
)
&&
column_stored_idx
<
datum_row
.
get_column_count
();
column_stored_idx
++
)
{
const
ObObj
*
value
=
NULL
;
...
...
@@ -371,8 +371,6 @@ int MutatorRow::parse_columns_(
if
(
OB_FAIL
(
deep_copy_encoded_column_value_
(
datum
)))
{
LOG_ERROR
(
"deep_copy_encoded_column_value_ failed"
,
KR
(
ret
),
K
(
tenant_id
),
K
(
table_id
),
K
(
column_stored_idx
),
K
(
datum
),
K
(
is_parse_new_col
));
}
else
if
(
datum
.
is_nop
())
{
LOG_DEBUG
(
"ignore nop datum"
,
K
(
column_stored_idx
),
K
(
datum
));
}
else
if
(
OB_FAIL
(
get_column_info_
(
tb_schema_info
,
all_ddl_operation_table_schema_info
,
...
...
@@ -381,6 +379,29 @@ int MutatorRow::parse_columns_(
column_id
,
column_schema_info
)))
{
LOG_ERROR
(
"get_column_info"
,
KR
(
ret
),
K_
(
table_id
),
K
(
column_stored_idx
));
}
else
if
(
datum
.
is_nop
())
{
OBLOG_FORMATTER_LOG
(
DEBUG
,
"handle nop datum"
,
K
(
column_stored_idx
),
K
(
is_parse_new_col
),
K
(
datum
));
if
(
OB_NOT_NULL
(
column_schema_info
)
&&
column_schema_info
->
is_usr_column
())
{
ColValue
*
cv_node
=
nullptr
;
if
(
OB_ISNULL
(
cv_node
=
static_cast
<
ColValue
*>
(
allocator_
.
alloc
(
sizeof
(
ColValue
)))))
{
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
LOG_ERROR
(
"allcate memory for ColValue failed"
,
KR
(
ret
),
"size"
,
sizeof
(
ColValue
));
}
else
if
(
OB_ISNULL
(
column_schema_info
))
{
ret
=
OB_ERR_UNEXPECTED
;
LOG_ERROR
(
"column_schema_info should be not null"
,
KR
(
ret
),
K
(
tenant_id
),
K
(
table_id
),
K
(
column_stored_idx
));
}
else
{
cv_node
->
reset
();
cv_node
->
column_id_
=
column_schema_info
->
get_column_id
();
cv_node
->
is_col_nop_
=
1
;
if
(
OB_FAIL
(
cols
.
add
(
cv_node
)))
{
LOG_ERROR
(
"add column_node into ColValueList failed"
,
KR
(
ret
),
KPC
(
cv_node
),
K
(
tenant_id
),
K
(
table_id
));
}
}
if
(
OB_FAIL
(
ret
)
&&
OB_NOT_NULL
(
cv_node
))
{
allocator_
.
free
(
cv_node
);
cv_node
=
nullptr
;
}
}
}
else
{
bool
ignore_column
=
false
;
if
(
OB_NOT_NULL
(
tb_schema_info
))
{
...
...
@@ -390,8 +411,8 @@ int MutatorRow::parse_columns_(
// if is hidden column of udt, is_usr_column is false, is_udt_column is true.
if
(
!
(
column_schema_info
->
is_usr_column
()
||
column_schema_info
->
is_udt_column
()))
{
// ignore non user columns
LOG_DEBUG
(
"ignore non user-required column"
,
K
(
tenant_id
),
K
(
table_id
),
K
(
column_stored_idx
),
K
(
column_schema_info
));
OBLOG_FORMATTER_LOG
(
DEBUG
,
"ignore non user-required column"
,
K
(
tenant_id
),
K
(
table_id
),
K
(
column_stored_idx
),
K
(
is_parse_new_col
),
K
(
column_schema_info
));
ignore_column
=
true
;
}
else
{
...
...
@@ -424,10 +445,10 @@ int MutatorRow::parse_columns_(
}
else
if
(
is_lob_storage
)
{
const
ObLobCommon
&
lob_common
=
datum
.
get_lob_data
();
is_out_row
=
!
lob_common
.
in_row_
;
LOG_DEBUG
(
"handle_lob_v2_data"
,
K
(
column_stored_idx
),
K
(
lob_common
),
K
(
obj
));
OBLOG_FORMATTER_LOG
(
DEBUG
,
"handle_lob_v2_data"
,
K
(
is_parse_new_col
)
,
K
(
column_stored_idx
),
K
(
lob_common
),
K
(
obj
));
if
(
!
is_out_row
)
{
LOG_DEBUG
(
"is_lob_storage in row"
,
K
(
column_id
),
K
(
is_lob_storage
),
K
(
is_parse_new_col
),
K
(
lob_common
),
K
(
obj
));
OBLOG_FORMATTER_LOG
(
DEBUG
,
"is_lob_storage in row"
,
K
(
column_id
),
K
(
is_lob_storage
),
K
(
is_parse_new_col
),
K
(
lob_common
),
K
(
obj
));
obj
.
set_string
(
obj
.
get_type
(),
lob_common
.
get_inrow_data_ptr
(),
lob_common
.
get_byte_size
(
datum
.
len_
));
}
else
{
const
ObLobData
&
lob_data
=
*
(
reinterpret_cast
<
const
ObLobData
*>
(
lob_common
.
buffer_
));
...
...
@@ -435,7 +456,7 @@ int MutatorRow::parse_columns_(
const
ObLobDataOutRowCtx
*
lob_data_out_row_ctx
=
reinterpret_cast
<
const
ObLobDataOutRowCtx
*>
(
lob_data
.
buffer_
);
LOG_DEBUG
(
"is_lob_storage out row"
,
K
(
column_id
),
K
(
is_lob_storage
),
K
(
is_parse_new_col
),
K
(
lob_common
),
OBLOG_FORMATTER_LOG
(
DEBUG
,
"is_lob_storage out row"
,
K
(
column_id
),
K
(
is_lob_storage
),
K
(
is_parse_new_col
),
K
(
lob_common
),
K
(
lob_data
),
K
(
obj
),
KPC
(
lob_data_out_row_ctx
));
if
(
is_parse_new_col
)
{
...
...
@@ -575,7 +596,7 @@ int MutatorRow::add_column_(
collation_type
=
column_schema_info
->
get_collation_type
();
}
LOG_DEBUG
(
"column_cast: "
,
OBLOG_FORMATTER_LOG
(
DEBUG
,
"column_cast: "
,
K
(
tenant_id
),
K
(
table_id
),
K
(
column_id
),
...
...
@@ -584,22 +605,22 @@ int MutatorRow::add_column_(
// If the LOB is larger than 2M, do not print the contents, but the address and length, in case of taking too long to print the log
if
(
value
->
is_lob
()
&&
value
->
get_string_len
()
>
2
*
_M_
)
{
LOG_DEBUG
(
"column_cast: "
,
"old_obj_ptr"
,
(
void
*
)
value
->
get_string_ptr
(),
OBLOG_FORMATTER_LOG
(
DEBUG
,
"column_cast: "
,
"old_obj_ptr"
,
(
void
*
)
value
->
get_string_ptr
(),
"old_obj_len"
,
value
->
get_string_len
(),
"new_obj_ptr"
,
(
void
*
)
cv_node
->
value_
.
get_string_ptr
(),
"new_obj_len"
,
cv_node
->
value_
.
get_string_len
());
}
else
if
(
value
->
is_json
()
&&
value
->
get_string_len
()
>
2
*
_M_
)
{
// Json may exceed 2M
LOG_DEBUG
(
"column_cast: "
,
"old_obj_ptr"
,
(
void
*
)
value
->
get_string_ptr
(),
OBLOG_FORMATTER_LOG
(
DEBUG
,
"column_cast: "
,
"old_obj_ptr"
,
(
void
*
)
value
->
get_string_ptr
(),
"old_obj_len"
,
value
->
get_string_len
(),
"new_obj_ptr"
,
(
void
*
)
cv_node
->
value_
.
get_string_ptr
(),
"new_obj_len"
,
cv_node
->
value_
.
get_string_len
());
}
else
if
(
value
->
is_geometry
()
&&
value
->
get_string_len
()
>
2
*
_M_
)
{
// geometry may exceed 2M
LOG_DEBUG
(
"column_cast: "
,
"old_obj_ptr"
,
(
void
*
)
value
->
get_string_ptr
(),
OBLOG_FORMATTER_LOG
(
DEBUG
,
"column_cast: "
,
"old_obj_ptr"
,
(
void
*
)
value
->
get_string_ptr
(),
"old_obj_len"
,
value
->
get_string_len
(),
"new_obj_ptr"
,
(
void
*
)
cv_node
->
value_
.
get_string_ptr
(),
"new_obj_len"
,
cv_node
->
value_
.
get_string_len
());
}
else
{
LOG_DEBUG
(
"column_cast: "
,
"old_obj"
,
*
value
,
"new_obj"
,
OBLOG_FORMATTER_LOG
(
DEBUG
,
"column_cast: "
,
"old_obj"
,
*
value
,
"new_obj"
,
cv_node
->
value_
);
}
...
...
@@ -684,8 +705,8 @@ int MutatorRow::parse_rowkey_(
const
ObObj
*
rowkey_objs
=
rowkey
.
get_obj_ptr
();
if
(
OB_UNLIKELY
(
rowkey_count
<=
0
)
||
OB_ISNULL
(
rowkey_objs
))
{
LOG_ERROR
(
"rowkey is invalid"
,
K
(
rowkey_count
),
K
(
rowkey_objs
),
K
(
rowkey
));
ret
=
OB_INVALID_ARGUMENT
;
LOG_ERROR
(
"rowkey is invalid"
,
KR
(
ret
),
K
(
rowkey_count
),
K
(
rowkey_objs
),
K
(
rowkey
));
}
else
{
for
(
int64_t
index
=
0
;
OB_SUCC
(
ret
)
&&
index
<
rowkey_count
;
index
++
)
{
// Column ID is invalid when Table Schema is not provided
...
...
@@ -712,7 +733,7 @@ int MutatorRow::parse_rowkey_(
K
(
tenant_id
),
K
(
table_id
),
K
(
rowkey_count
),
K
(
rowkey_info
),
KPC
(
tb_schema_info
));
}
else
if
(
!
column_schema_info
->
is_usr_column
())
{
// ignore hidden rowkey column
LOG_DEBUG
(
"ignore non user-required rowkey column"
,
KPC
(
column_schema_info
),
OBLOG_FORMATTER_LOG
(
DEBUG
,
"ignore non user-required rowkey column"
,
KPC
(
column_schema_info
),
K
(
tenant_id
),
K
(
table_id
),
K
(
column_id
));
ignore_column
=
true
;
...
...
@@ -769,7 +790,7 @@ int MutatorRow::parse_columns_(
}
else
if
(
OB_FAIL
(
row_reader
.
read_row
(
col_data
,
col_data_size
,
nullptr
,
datum_row
)))
{
LOG_ERROR
(
"Failed to read datum row"
,
KR
(
ret
));
}
else
{
LOG_DEBUG
(
"parse_columns_"
,
K
(
is_parse_new_col
),
K
(
datum_row
));
OBLOG_FORMATTER_LOG
(
DEBUG
,
"parse_columns_"
,
K
(
is_parse_new_col
),
K
(
datum_row
));
// Iterate through all Cells using Cell Reader
for
(
int64_t
i
=
0
;
OB_SUCC
(
ret
)
&&
i
<
datum_row
.
get_column_count
();
i
++
)
{
...
...
@@ -781,11 +802,11 @@ int MutatorRow::parse_columns_(
if
(
OB_FAIL
(
deep_copy_encoded_column_value_
(
datum
)))
{
LOG_ERROR
(
"deep_copy_encoded_column_value_ failed"
,
KR
(
ret
),
"column_stored_idx"
,
i
,
K
(
datum
),
K
(
is_parse_new_col
));
}
else
if
(
datum
.
is_nop
())
{
LOG_DEBUG
(
"ignore nop datum"
,
"column_stored_idx"
,
i
,
K
(
datum
));
OBLOG_FORMATTER_LOG
(
DEBUG
,
"ignore nop datum"
,
"column_stored_idx"
,
i
,
K
(
datum
));
}
else
if
(
OB_INVALID_ID
==
column_id
)
{
// Note: the column_id obtained here may be invalid
// For example a delete statement with only one cell and an invalid column_id in the cell
LOG_DEBUG
(
"cell column_id is invalid"
,
K
(
i
),
K
(
datum_row
),
K_
(
table_id
),
K_
(
rowkey
));
OBLOG_FORMATTER_LOG
(
DEBUG
,
"cell column_id is invalid"
,
K
(
i
),
K
(
datum_row
),
K_
(
table_id
),
K_
(
rowkey
));
}
else
{
if
(
OB_SUCC
(
ret
))
{
ObObjMeta
obj_meta
;
...
...
@@ -869,8 +890,8 @@ int MutatorRow::add_column_(
// NOTE: Allow obj2str_helper and column_schema to be empty
if
(
OB_ISNULL
(
cv_node
))
{
LOG_ERROR
(
"allocate memory for ColValue fail"
,
"size"
,
sizeof
(
ColValue
));
ret
=
OB_ALLOCATE_MEMORY_FAILED
;
LOG_ERROR
(
"allocate memory for ColValue fail"
,
KR
(
ret
),
"size"
,
sizeof
(
ColValue
));
}
else
{
cv_node
->
reset
();
cv_node
->
value_
=
*
value
;
...
...
src/logservice/libobcdc/src/ob_log_part_trans_task.h
浏览文件 @
5a6c820e
...
...
@@ -159,7 +159,14 @@ struct ColValue
uint64_t
column_id_
;
ObString
string_value_
;
// The value after converting Obj to a string
ColValue
*
next_
;
uint8_t
is_out_row_
:
1
;
// Column data is stored out row
union
{
uint8_t
column_flags_
;
struct
{
uint8_t
is_out_row_
:
1
;
// Column data is stored out row
uint8_t
is_col_nop_
:
1
;
// Column data is nop
uint8_t
reserve_fields_
:
6
;
// reserve fileds
};
};
// if this ColValue is group value
// then children_ store group hidden ColValue
...
...
@@ -171,7 +178,7 @@ struct ColValue
column_id_
=
common
::
OB_INVALID_ID
;
string_value_
.
reset
();
next_
=
NULL
;
is_out_row
_
=
0
;
column_flags
_
=
0
;
children_
.
reset
();
}
...
...
@@ -194,7 +201,8 @@ struct ColValue
K_
(
value
),
K_
(
column_id
),
K_
(
string_value
),
K_
(
is_out_row
));
K_
(
is_out_row
),
K_
(
is_col_nop
));
};
///////////////////////////////////////////////////////////////////////////////////
...
...
src/logservice/libobcdc/tests/ob_binlog_record_printer.cpp
浏览文件 @
5a6c820e
...
...
@@ -628,11 +628,19 @@ int ObBinlogRecordPrinter::output_data_file_column_data(IBinlogRecord *br,
// FIXME: does not check the value of the field until the length of the default value can be obtained
// ROW_PRINTF(ptr, size, pos, ri, "[C%ld] column_default_value:%s", column_index, default_val);
LOG_DEBUG
(
"output column"
,
K
(
index
),
K
(
new_cols_count
),
K
(
old_cols_count
));
if
(
OB_SUCC
(
ret
))
{
if
(
index
<
new_cols_count
)
{
VALUE_ORIGIN
newValueOrigin
=
new_cols
[
index
].
m_origin
;
const
bool
is_new_value_origin_redo
=
(
VALUE_ORIGIN
::
REDO
==
newValueOrigin
);
const
char
*
new_col_value
=
new_cols
[
index
].
buf
;
size_t
new_col_value_len
=
new_cols
[
index
].
buf_used_size
;
if
(
!
is_new_value_origin_redo
)
{
ROW_PRINTF
(
ptr
,
size
,
pos
,
ri
,
"C[%ld] column_value_new_origin: %s"
,
column_index
,
newValueOrigin
==
2
?
"PADDING"
:
"BACK_QUERY"
);
}
if
(
is_type_for_md5_printing
&&
enable_print_lob_md5
)
{
ROW_PRINTF
(
ptr
,
size
,
pos
,
ri
,
"[C%ld] column_value_new_md5:[%s](%ld)"
,
column_index
,
calc_md5_cstr
(
new_col_value
,
new_col_value_len
),
new_col_value_len
);
...
...
@@ -654,9 +662,16 @@ int ObBinlogRecordPrinter::output_data_file_column_data(IBinlogRecord *br,
}
if
(
OB_SUCCESS
==
ret
&&
index
<
old_cols_count
)
{
VALUE_ORIGIN
oldValueOrigin
=
old_cols
[
index
].
m_origin
;
const
bool
is_old_value_origin_redo
=
(
VALUE_ORIGIN
::
REDO
==
oldValueOrigin
);
const
char
*
old_col_value
=
old_cols
[
index
].
buf
;
size_t
old_col_value_len
=
old_cols
[
index
].
buf_used_size
;
if
(
!
is_old_value_origin_redo
)
{
ROW_PRINTF
(
ptr
,
size
,
pos
,
ri
,
"[C%ld] column_value_old_origin: %s"
,
column_index
,
oldValueOrigin
==
2
?
"PADDING"
:
"BACK_QUERY"
);
}
if
(
EMySQLFieldType
::
MYSQL_TYPE_BIT
==
ctype
)
{
ROW_PRINTF
(
ptr
,
size
,
pos
,
ri
,
"[C%ld] column_value_old_hex:"
,
column_index
);
pos
--
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录