# LogMessage
[LogMessage.java](../../oblogclient-common/src/main/java/com/oceanbase/oms/logmessage/LogMessage.java) 将日志数据的结构定义为 LogMessage。在程序运行过程中,客户端会将接收到的日志数据转换成 LogMessage 对象,用户可以使用它们来定制自己的处理逻辑。
## 结构
在获取增量日志的链路中,数据先使用 [oblogmsg](https://github.com/oceanbase/oblogmsg) 进行序列化处理,之后经过传输组件最终到达客户端,再在客户端中进行反序列化,转为 LogMessage 结构。具体的字段信息可以参考 oblogmsg。
以下是 LogMessage 中常用的一些字段:
参数 |
获取方法 |
返回类型 |
参数说明 |
byteBuf |
getRawData |
byte[] |
客户端接收到的日志数据原始值。 |
srcType |
getDbType |
DbTypeEnum |
数据源类型,OceanBase 1.0 以前版本对应值 OB_05 ,1.0 及之后的版本对应 OB_MYSQL 和 OB_ORACLE 。 |
op |
getOpt |
DataMessage.Record.Type |
日志数据的类型,OceanBase 中主要涉及 BEGIN , COMMIT , INSERT , UPDATE , DELETE , DDL , HEARTBEAT 。 |
timestamp |
getTimestamp |
String |
日志数据对应的变动执行时间的时间戳。 |
dbName |
getDbName |
String |
日志数据对应的库名。需要注意得是,该值包含租户名,格式为 租户名.库名 。 |
tableName |
getTableName |
String |
日志数据对应的表名。 |
除此之外,可以通过 `getFieldList` 方法获取到 DML 和 DDL 的具体变动信息。以下是 Field 格式常用的字段:
参数 |
获取方法 |
返回类型 |
参数说明 |
primaryKey |
isPrimary |
boolean |
是否是主键或非空唯一键。 |
name |
getFieldname |
String |
字段名称。 |
type |
getType |
DataMessage.Record.Field.Type |
字段类型。 |
encoding |
getEncoding |
String |
字段编码。 |
value |
getValue |
ByteString |
字段值,ByteString 类型。 |
prev |
isPrev |
boolean |
新旧值标识,为 true 时表示该值为变更前的值,false 则为变更后的值。 |
## 使用
使用示例可以参考 [LogProxyClientTest.java](../../oblogclient-logproxy/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java)。
### 安全位点
LogMessage 提供了 `safeTimestamp` 来表示数据接收的安全位点,也就是说早于该秒级时间戳提交的 LogMessage 均已被客户端接收。
业务应用在进行数据消费时,一般还要维护一个数据处理的安全位点。在 LogMessage 中,该安全位点需要借助心跳的 `timestamp` 来实现。 LogMessage 在时间存储上有两套逻辑:
- 心跳类型:`timestamp` 字段值为安全位点对应的秒级时间戳。
- 其他类型:`timestamp` 字段值为数据变动的提交时间,而 `fileNameOffset` 字段对应最近一次心跳信息的 `timestamp`。由于 libobcdc 并不保证拉取到的数据变动是严格按照时间顺序的,因此对于 DDL、DML 类型的 LogMessage,应当使用 `fileNameOffset` 而非 `timestamp` 作为安全位点。
获取当前数据对应安全位点可以使用如下代码:
```java
long checkpoint;
if (DataMessage.Record.Type.HEARTBEAT.equals(message.getOpt())) {
checkpoint = Long.parseLong(message.getTimestamp());
} else {
checkpoint = message.getFileNameOffset();
}
```