# LogMessage [LogMessage.java](../../oblogclient-common/src/main/java/com/oceanbase/oms/logmessage/LogMessage.java) 将日志数据的结构定义为 LogMessage。在程序运行过程中,客户端会将接收到的日志数据转换成 LogMessage 对象,用户可以使用它们来定制自己的处理逻辑。 ## 结构 在获取增量日志的链路中,数据先使用 [oblogmsg](https://github.com/oceanbase/oblogmsg) 进行序列化处理,之后经过传输组件最终到达客户端,再在客户端中进行反序列化,转为 LogMessage 结构。具体的字段信息可以参考 oblogmsg。 以下是 LogMessage 中常用的一些字段:
参数 获取方法 返回类型 参数说明
byteBuf getRawData byte[] 客户端接收到的日志数据原始值。
srcType getDbType DbTypeEnum 数据源类型,OceanBase 1.0 以前版本对应值 OB_05,1.0 及之后的版本对应 OB_MYSQLOB_ORACLE
op getOpt DataMessage.Record.Type 日志数据的类型,OceanBase 中主要涉及 BEGIN, COMMIT, INSERT, UPDATE, DELETE, DDL, HEARTBEAT
timestamp getTimestamp String 日志数据对应的变动执行时间的时间戳。
dbName getDbName String 日志数据对应的库名。需要注意得是,该值包含租户名,格式为 租户名.库名
tableName getTableName String 日志数据对应的表名。
除此之外,可以通过 `getFieldList` 方法获取到 DML 和 DDL 的具体变动信息。以下是 Field 格式常用的字段:
参数 获取方法 返回类型 参数说明
primaryKey isPrimary boolean 是否是主键或非空唯一键。
name getFieldname String 字段名称。
type getType DataMessage.Record.Field.Type 字段类型。
encoding getEncoding String 字段编码。
value getValue ByteString 字段值,ByteString 类型。
prev isPrev boolean 新旧值标识,为 true 时表示该值为变更前的值,false 则为变更后的值。
## 使用 使用示例可以参考 [LogProxyClientTest.java](../../oblogclient-logproxy/src/test/java/com/oceanbase/clogproxy/client/LogProxyClientTest.java)。 ### 安全位点 LogMessage 提供了 `safeTimestamp` 来表示数据接收的安全位点,也就是说早于该秒级时间戳提交的 LogMessage 均已被客户端接收。 业务应用在进行数据消费时,一般还要维护一个数据处理的安全位点。在 LogMessage 中,该安全位点需要借助心跳的 `timestamp` 来实现。 LogMessage 在时间存储上有两套逻辑: - 心跳类型:`timestamp` 字段值为安全位点对应的秒级时间戳。 - 其他类型:`timestamp` 字段值为数据变动的提交时间,而 `fileNameOffset` 字段对应最近一次心跳信息的 `timestamp`。由于 libobcdc 并不保证拉取到的数据变动是严格按照时间顺序的,因此对于 DDL、DML 类型的 LogMessage,应当使用 `fileNameOffset` 而非 `timestamp` 作为安全位点。 获取当前数据对应安全位点可以使用如下代码: ```java long checkpoint; if (DataMessage.Record.Type.HEARTBEAT.equals(message.getOpt())) { checkpoint = Long.parseLong(message.getTimestamp()); } else { checkpoint = message.getFileNameOffset(); } ```