提交 c6289968 编写于 作者: 武汉红喜's avatar 武汉红喜

whatsmars-mq-otter

上级 54b7ae18
......@@ -22,6 +22,8 @@
<module>whatsmars-mq-rocketmq-broker</module>
<module>whatsmars-mq-rocketmq-tools</module>
<module>whatsmars-mq-rocketmq-spring</module>
<module>whatsmars-mq-otter</module>
<module>whatsmars-mq-sdk</module>
</modules>
<properties>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>whatsmars-mq</artifactId>
<groupId>org.hongxi</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>whatsmars-mq-otter</artifactId>
<properties>
<otter.version>4.2.16</otter.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>shared.etl</artifactId>
<version>${otter.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>node.extend</artifactId>
<version>${otter.version}</version>
</dependency>
<dependency>
<groupId>org.hongxi</groupId>
<artifactId>whatsmars-mq-sdk</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.hongxi.whatsmars.otter.extend;
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.node.extend.processor.AbstractEventProcessor;
import com.alibaba.otter.shared.etl.model.EventColumn;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.EventType;
import org.hongxi.whatsmars.common.rocketmq.RocketMQTemplate;
import org.hongxi.whatsmars.otter.extend.support.Column;
import org.hongxi.whatsmars.otter.extend.support.OtterData;
import org.springframework.util.StringUtils;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Created by shenhongxi on 2018/12/11.
*/
public class MQEventProcessor extends AbstractEventProcessor {
private static final String COLUMN_ORDER_ID = "order_id";
@Override
public boolean process(EventData eventData) {
EventType eventType = eventData.getEventType();
if (eventType != EventType.INSERT
&& eventType != EventType.DELETE
&& eventType != EventType.UPDATE
) return false;
OtterData otterData = new OtterData();
String scheme = eventData.getSchemaName();
String tableName = eventData.getTableName();
long tableId = eventData.getTableId();
otterData.setScheme(scheme);
otterData.setTableName(tableName);
otterData.setEventType(eventType);
// 主键
String id = "";
for(EventColumn eventColumn : eventData.getKeys()){
if ("id".equals(eventColumn.getColumnName())) {
id = eventColumn.getColumnValue();
}
}
otterData.setId(id);
String msgKey = id;
if(StringUtils.isEmpty(msgKey)){
msgKey = tableId + "";
}
// 列
List<Column> columns = new ArrayList<>();
for(EventColumn eventColumn : eventData.getColumns()){
Column column = doColumn(eventColumn);
if (column != null) {
columns.add(column);
if ("order_id".equals(column.getName())) {
msgKey = column.getValue();
}
}
}
otterData.setColumns(columns);
// topic 分库分表时不用每个库一个topic,请手动指定这里的topic
String topic = "otter-" + scheme;
String tags = tableName;
Pattern pattern = Pattern.compile("\\d+$");
Matcher matcher = pattern.matcher(tags);
if (matcher.find()) {
String suffix = matcher.group();
tags = tags.substring(0, tags.length() - suffix.length());
}
RocketMQTemplate.sendOrderly("otter-producer", topic, tags, msgKey, JSON.toJSONString(otterData));
return false;
}
private Column doColumn(EventColumn column) {
if (column.getColumnType() != Types.BLOB && column.getColumnType() != Types.CLOB) {
Column col = new Column();
col.setName(column.getColumnName());
col.setValue(column.getColumnValue());
col.setUpdated(column.isUpdate());
return col;
}
return null;
}
}
package org.hongxi.whatsmars.otter.extend.support;
import lombok.Data;
/**
* Created by shenhongxi on 2018/11/23.
*/
@Data
public class Column {
private String name;
private String value;
private boolean updated;
}
package org.hongxi.whatsmars.otter.extend.support;
import com.alibaba.otter.shared.etl.model.EventType;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
* Created by shenhongxi on 2018/11/23.
*/
@Data
public class OtterData implements Serializable {
private String scheme;
private EventType eventType;
private String tableName;
private String id;
private List<Column> columns;
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>whatsmars-mq</artifactId>
<groupId>org.hongxi</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>whatsmars-mq-sdk</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.hongxi.whatsmars.common.rocketmq;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Created by shenhongxi on 2018/12/11.
*/
public class RocketMQTemplate {
private static final InternalLogger log = ClientLogger.getLog();
private static final String DEFAULT_PRODUCER_GROUP= "common-producer";
private static Map<String, DefaultMQProducer> producerMap = new HashMap<>();
public static DefaultMQProducer getProducer() throws MQClientException {
return getProducer(4, DEFAULT_PRODUCER_GROUP);
}
public static DefaultMQProducer getProducer(int queueNum) throws MQClientException {
return getProducer(queueNum, DEFAULT_PRODUCER_GROUP);
}
public static DefaultMQProducer getProducer(String producerGroup) throws MQClientException {
return getProducer(4, producerGroup);
}
public static DefaultMQProducer getProducer(int queueNum, String producerGroup) throws MQClientException {
if (queueNum < 1) throw new IllegalArgumentException("queueNum must >= 1");
if (StringUtils.isBlank(producerGroup)) throw new IllegalArgumentException("producerGroup cannot be null");
String producerKey = producerGroup + queueNum;
if (producerMap.get(producerKey) == null) {
synchronized (producerMap) {
if (producerMap.get(producerKey) == null) {
DefaultMQProducer producer = new DefaultMQProducer(DEFAULT_PRODUCER_GROUP);
producer.setDefaultTopicQueueNums(queueNum);
producer.start();
producerMap.put(producerKey, producer);
}
}
}
return producerMap.get(producerKey);
}
public static void send(String producerGroup, String topic, String tags, String keys, String body) {
try {
getProducer(producerGroup).send(new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)));
} catch (Exception e) {
log.error("send error, producerGroup:{}, topic:{}, tags:{}, keys:{}, body:{}",
producerGroup, topic, tags, keys, body);
}
}
public static void sendOrderly(String producerGroup, String topic, String tags, String keys, String body) {
try {
getProducer(producerGroup).send(new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)),
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
long id = NumberUtils.toLong(String.valueOf(arg));
int index = (int) (id % mqs.size());
return mqs.get(index);
}
}, keys);
} catch (Exception e) {
log.error("send error, producerGroup:{}, topic:{}, tags:{}, keys:{}, body:{}",
producerGroup, topic, tags, keys, body, e);
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册