diff --git a/docs/formats/logmessage-cn.md b/docs/formats/logmessage-cn.md
new file mode 100644
index 0000000000000000000000000000000000000000..590f52fbfeb657da984c6616cf252edb33f26671
--- /dev/null
+++ b/docs/formats/logmessage-cn.md
@@ -0,0 +1,139 @@
+# LogMessage
+
+[LogMessage.java](../../oblogclient-common/src/main/java/com/oceanbase/oms/logmessage/LogMessage.java) 将日志数据的结构定义为 LogMessage。在程序运行过程中,客户端会将接收到的日志数据转换成 LogMessage 对象,用户可以使用它们来定制自己的处理逻辑。
+
+## 结构
+
+在获取增量日志的链路中,数据先使用 [oblogmsg](https://github.com/oceanbase/oblogmsg) 进行序列化处理,之后经过传输组件最终到达客户端,再在客户端中进行反序列化,转为 LogMessage 结构。具体的字段信息可以参考 oblogmsg。
+
+以下是 LogMessage 中常用的一些字段:
+
+| 参数 | 获取方法 | 返回类型 | 参数说明 |
+| --- | --- | --- | --- |
+| byteBuf | getRawData | byte[] | 客户端接收到的日志数据原始值。 |
+| srcType | getDbType | DbTypeEnum | 数据源类型,OceanBase 1.0 以前版本对应值 `OB_05`,1.0 及之后的版本对应 `OB_MYSQL` 和 `OB_ORACLE`。 |
+| op | getOpt | DataMessage.Record.Type | 日志数据的类型,OceanBase 中主要涉及 `BEGIN`, `COMMIT`, `INSERT`, `UPDATE`, `DELETE`, `DDL`, `HEARTBEAT`。 |
+| timestamp | getTimestamp | String | 日志数据对应的变动执行时间的时间戳。 |
+| dbName | getDbName | String | 日志数据对应的库名。需要注意的是,该值包含租户名,格式为 `租户名.库名`。 |
+| tableName | getTableName | String | 日志数据对应的表名。 |
+
+除此之外,可以通过 `getFieldList` 方法获取到 DML 和 DDL 的具体变动信息。以下是 Field 格式常用的字段:
+
+| 参数 | 获取方法 | 返回类型 | 参数说明 |
+| --- | --- | --- | --- |
+| primaryKey | isPrimary | boolean | 是否是主键或非空唯一键。 |
+| name | getFieldname | String | 字段名称。 |
+| type | getType | DataMessage.Record.Field.Type | 字段类型。 |
+| encoding | getEncoding | String | 字段编码。 |
+| value | getValue | ByteString | 字段值,ByteString 类型。 |
+| prev | isPrev | boolean | 新旧值标识,为 true 时表示该值为变更前的值,false 则为变更后的值。 |
+
+## 使用
+
+使用示例可以参考 [LogProxyClientTest.java](../../oblogclient-logproxy/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java)。
+
+### 安全位点
+
+LogMessage 提供了 `safeTimestamp` 来表示数据接收的安全位点,也就是说早于该秒级时间戳提交的 LogMessage 均已被客户端接收。
+
+业务应用在进行数据消费时,一般还要维护一个数据处理的安全位点。在 LogMessage 中,该安全位点需要借助心跳的 `timestamp` 来实现。 LogMessage 在时间存储上有两套逻辑:
+- 心跳类型:`timestamp` 字段值为安全位点对应的秒级时间戳。
+- 其他类型:`timestamp` 字段值为数据变动的提交时间,而 `fileNameOffset` 字段对应最近一次心跳信息的 `timestamp`。由于 libobcdc 并不保证拉取到的数据变动是严格按照时间顺序的,因此对于 DDL、DML 类型的 LogMessage,应当使用 `fileNameOffset` 而非 `timestamp` 作为安全位点。
+
+获取当前数据对应安全位点可以使用如下代码:
+
+```java
+long checkpoint;
+if (DataMessage.Record.Type.HEARTBEAT.equals(message.getOpt())) {
+    checkpoint = Long.parseLong(message.getTimestamp());
+} else {
+    checkpoint = message.getFileNameOffset();
+}
+```
+
+
+
diff --git a/docs/formats/logmessage.md b/docs/formats/logmessage.md
index 461f75d6e40238dabe657ea87f60a7d04cc704a5..34b6d0cc7a2cd924bfc62ee88e16295e005d86c0 100644
--- a/docs/formats/logmessage.md
+++ b/docs/formats/logmessage.md
@@ -1,43 +1,136 @@
 # LogMessage
 
-LogMessage is a struct to store log messages, see the [class file](../../common/src/main/java/com/oceanbase/oms/logmessage/LogMessage.java) for its definition.
+[LogMessage.java](../../oblogclient-common/src/main/java/com/oceanbase/oms/logmessage/LogMessage.java) defines `LogMessage`, the structure used for log records. At runtime, the client converts the received log data into `LogMessage` objects, which users can consume in their own processing logic.
 
-## LogMessage Struct
+## Struct
 
-A LogMessage object mainly has the following fields (getter):
+When log data is fetched from OceanBase, it is first serialized with [oblogmsg](https://github.com/oceanbase/oblogmsg), then transported to the client, where it is deserialized into a `LogMessage` struct. For details on specific fields, please refer to oblogmsg.
 
-- *RawData*: Byte array that contains all details of the log message.
-- *DbType*: Type of source database, here we only use `OCEANBASE1`, which means OceanBase 1.0 or higher version.
-- *Opt*: Operation type, here should be one of `BEGIN`, `COMMIT`, `INSERT`, `UPDATE`, `DELETE`, `DDL`, `HEARTBEAT`.
-- *DbName*: Database name, here it is in format of `tenant_name.database_name`.
-- *TableName*: Table name.
-- *Timestamp*: Timestamp in seconds.
-- *OB10UniqueId*: Transaction id (string) of log message, only recorded in `BEGIN` or DML (`INSERT`, `UPDATE`, `DELETE`).
-- *FieldList*: A list of row fields.
+The following are the common fields of LogMessage:
 
-## Field List in LogMessage
+| Field | Getter | Type | Description |
+| --- | --- | --- | --- |
+| byteBuf | getRawData | byte[] | The original log data as a byte array. |
+| srcType | getDbType | DbTypeEnum | Type of the data source. OceanBase versions before 1.0 correspond to `OB_05`; versions 1.0 and later correspond to `OB_MYSQL` and `OB_ORACLE`. |
+| op | getOpt | DataMessage.Record.Type | Type of the log record. For OceanBase, the main types are `BEGIN`, `COMMIT`, `INSERT`, `UPDATE`, `DELETE`, `DDL`, `HEARTBEAT`. |
+| timestamp | getTimestamp | String | Timestamp of when the data change was executed. |
+| dbName | getDbName | String | Database name of the log record. Note that this value includes the tenant name, in the format `tenant_name.db_name`. |
+| tableName | getTableName | String | Table name of the log record. |
-The item in *FieldList* of LogMessage is of type `DataMessage.Record.Field`, and one Field corresponding to a column of one row. A Field struct mainly contains fields as following:
+The fields changed by a DML or DDL record can be obtained through the `getFieldList` method. The following are the commonly used fields of the Field struct:
 
-- *length*: The length of `value` field.
-- *primaryKey*: Flag of whether the column is the primary key.
-- *name*: Column name.
-- *type*: Type of the column, raw value is the const in `LogMessageTypeCode`.
-- *flag*: Flag of whether the Field is generated/parsed.
-- *encoding*: Encoding of the column.
-- *value*: Column value.
-- *prev*: Flag of whether the Field is the old value of the column.
+| Field | Getter | Type | Description |
+| --- | --- | --- | --- |
+| primaryKey | isPrimary | boolean | Whether this field is a primary key or a non-null unique key. |
+| name | getFieldname | String | Field name. |
+| type | getType | DataMessage.Record.Field.Type | Field type. |
+| encoding | getEncoding | String | Field encoding. |
+| value | getValue | ByteString | Field value, as a `ByteString`. |
+| prev | isPrev | boolean | Whether this is an old value: `true` if the field holds the value before the change, `false` if it holds the value after the change. |
-Note that the Field struct here contains the type information, which is different from MySQL binlog. The value of a Field is of `ByteString` type, which could be used as a byte array or a string, both of which can easily cast to other types.
+## Usage
 
-The content of Field list in the LogMessage is related to the operation type:
+For a usage example, please refer to [LogProxyClientTest.java](../../oblogclient-logproxy/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java).
 
- - `BEGIN`、`COMMIT`、`HEARTBEAT`:null
- - `DDL`: One Field with ddl sql in value field.
- - `INSERT`: The column value list of the new row.
- - `UPDATE`: Both the old and new column values of the row. The list should be [field_0_old, field_0_new, field_1_old, field_1_new, ...].
- - `DELETE`: The column value list of the old row.
+### Safe Checkpoint
 
-## Usage
+LogMessage provides `safeTimestamp` to indicate the safe checkpoint for data reception: every LogMessage committed earlier than this timestamp (in seconds) has already been received by the client.
+
+When an application consumes data, it generally also maintains a safe checkpoint for data processing. For LogMessage, this checkpoint is derived from the HEARTBEAT `timestamp`. LogMessage stores time in two ways:
+- HEARTBEAT type: the value of the `timestamp` field is the timestamp (in seconds) corresponding to the safe checkpoint.
+- Other types: the value of the `timestamp` field is the execution time of the data change, and the `fileNameOffset` field corresponds to the `timestamp` of the latest HEARTBEAT. Because `libobcdc` does not guarantee that fetched data changes arrive in strict timestamp order, `fileNameOffset` rather than `timestamp` should be used as the safe checkpoint for DDL and DML LogMessages.
+
+The following code can be used to obtain the safe checkpoint corresponding to the current data:
 
-You can see which projects use `logproxy-client` [here](https://github.com/oceanbase/oblogclient/network/dependents?package_id=UGFja2FnZS0yODMzMjE5Nzc1).
+```java
+long checkpoint;
+if (DataMessage.Record.Type.HEARTBEAT.equals(message.getOpt())) {
+    checkpoint = Long.parseLong(message.getTimestamp());
+} else {
+    checkpoint = message.getFileNameOffset();
+}
+```
diff --git a/docs/quickstart/logproxy-client-tutorial-cn.md b/docs/quickstart/logproxy-client-tutorial-cn.md
index 5ab1d46ecb86d3bdb19bcec487049d61f05ce70f..23de6f70a92ada546c5ce98b857142adef83a4cb 100644
--- a/docs/quickstart/logproxy-client-tutorial-cn.md
+++ b/docs/quickstart/logproxy-client-tutorial-cn.md
@@ -252,6 +252,6 @@ LogProxy 使用 `ClientConf` 中的 `clientId` 来区分不同的连接,若想
 
 ## 问题排查
 
-当 LogProxy 与客户端之间的连接建立成功后,LogProxy 将会开始向客户端发送日志数据,这里的日志数据主要有心跳和数据变动两类。也就是说,及时数据库在监听范围内没有变动,LogProxy 客户端也应当能收到心跳类型的数据。
+当 LogProxy 与客户端之间的连接建立成功后,LogProxy 将会开始向客户端发送日志数据,这里的日志数据主要有心跳和数据变动两类。也就是说,即使数据库在监听范围内没有变动,LogProxy 客户端也应当能收到心跳类型的数据。
 
 如果 LogProxy 客户端启动后,没有报错信息出现,也没有收到任何数据,这时候为了确定问题出现的原因,需要查看 LogProxy 对应的 LogReader 子进程的状态,相关的信息在 LogProxy 部署目录的 `run/{clientId}/`。