提交 a2a8685b 编写于 作者: W whomhim

feat( #506 ): TM 的一些相关信息保存在 redis 上

上级 cc696fa6
......@@ -5,7 +5,7 @@ import com.codingapi.txlcn.protocol.message.Connection;
import com.codingapi.txlcn.protocol.message.separate.TmNodeMessage;
import com.codingapi.txlcn.tm.config.TmConfig;
import com.codingapi.txlcn.tm.node.TmNode;
import com.codingapi.txlcn.tm.repository.redis.RedisTmNodeRepository;
import com.codingapi.txlcn.tm.repository.TmNodeRepository;
import com.codingapi.txlcn.tm.util.NetUtil;
import lombok.Data;
import lombok.EqualsAndHashCode;
......@@ -35,11 +35,11 @@ public class OtherTmNodeEvent extends TmNodeMessage {
public void handle(ApplicationContext springContext, Protocoler protocoler, Connection connection) throws Exception {
log.info("OtherTmNodeEvent request msg =>{}", messageId);
super.handle(springContext, protocoler, connection);
RedisTmNodeRepository redisTmNodeRepository = springContext.getBean(RedisTmNodeRepository.class);
TmNodeRepository tmNodeRepository = springContext.getBean(TmNodeRepository.class);
TmConfig tmConfig = springContext.getBean(TmConfig.class);
String hostAddress = Objects.requireNonNull(NetUtil.getLocalhost()).getHostAddress();
String tmId = String.format("%s:%s", hostAddress, tmConfig.getPort());
TmNode tmNode = new TmNode(tmId, hostAddress, tmConfig.getPort(), redisTmNodeRepository);
TmNode tmNode = new TmNode(tmId, hostAddress, tmConfig.getPort(), tmNodeRepository);
this.otherNodeList = tmNode.getBesidesNodeList(otherNodeList);
protocoler.sendMsg(connection.getUniqueKey(), this);
log.info("OtherTmNodeEvent.send =>[tmId:{}]", tmId);
......
......@@ -2,6 +2,7 @@ package com.codingapi.txlcn.tm.id;
import com.alibaba.fastjson.JSON;
import com.codingapi.txlcn.tm.repository.TmNodeInfo;
import com.codingapi.txlcn.tm.util.NetUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
......@@ -10,7 +11,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import javax.annotation.PreDestroy;
import java.util.concurrent.TimeUnit;
......@@ -19,7 +20,7 @@ import static com.codingapi.txlcn.tm.constant.CommonConstant.TX_MANAGER;
/**
* 雪花算法初始器
* 初始化snowflake的 dataCenterId 和 workerId
* 初始化 snowflake 的 dataCenterId 和 workerId
* <p>
* 1.系统启动时生成默认 dataCenterId 和 workerId,并尝试作为 key 存储到 redis
* 2.如果存储成功,设置 redis 过期时间为24h,把当前 dataCenterId 和 workerId 传入 snowflake
......@@ -47,9 +48,9 @@ public class SnowflakeInitiator {
*/
public static SnowflakeVo snowflakeVo;
private final StringRedisTemplate redisTemplate;
private final RedisTemplate<String, Object> redisTemplate;
public SnowflakeInitiator(StringRedisTemplate redisTemplate) {
public SnowflakeInitiator(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
......@@ -78,7 +79,8 @@ public class SnowflakeInitiator {
String hostAddress = NetUtil.getLocalhost().getHostAddress();
String hostAndPort = String.format("%s:%s", hostAddress, port);
Boolean isNotHasKey = !redisTemplate.hasKey(snowflakeRedisKey);
Boolean isSetKey = redisTemplate.opsForValue().setIfAbsent(snowflakeRedisKey, hostAndPort,
TmNodeInfo tmNodeInfo = new TmNodeInfo(snowflakeRedisKey, hostAndPort, 0);
Boolean isSetKey = redisTemplate.opsForValue().setIfAbsent(snowflakeRedisKey, tmNodeInfo,
LockExpire + randomDigits(), TimeUnit.SECONDS);
if (isNotHasKey && isSetKey) {
log.info("snowflake setIfAbsent key:{}", JSON.toJSONString(snowflakeVo));
......
......@@ -6,7 +6,7 @@ import com.codingapi.txlcn.protocol.message.separate.AbsMessage;
import com.codingapi.txlcn.tm.config.TmConfig;
import com.codingapi.txlcn.tm.node.TmNode;
import com.codingapi.txlcn.tm.reporter.TmManagerReporter;
import com.codingapi.txlcn.tm.repository.redis.RedisTmNodeRepository;
import com.codingapi.txlcn.tm.repository.TmNodeRepository;
import com.codingapi.txlcn.tm.util.NetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -30,7 +30,7 @@ public class LoadBalancerInterceptor implements EventInterceptor {
private TmManagerReporter tmManagerReporter;
@Autowired
private RedisTmNodeRepository redisTmNodeRepository;
private TmNodeRepository tmNodeRepository;
@Autowired
private TmConfig tmConfig;
......@@ -64,7 +64,7 @@ public class LoadBalancerInterceptor implements EventInterceptor {
log.debug("=> LoadBalancerInterceptor requestMsgToOtherTm");
String hostAddress = Objects.requireNonNull(NetUtil.getLocalhost()).getHostAddress();
String tmId = String.format("%s:%s", hostAddress, tmConfig.getPort());
TmNode tmNode = new TmNode(tmId, hostAddress, tmConfig.getPort(), redisTmNodeRepository);
TmNode tmNode = new TmNode(tmId, hostAddress, tmConfig.getPort(), tmNodeRepository);
// 其他 node 节点
List<InetSocketAddress> otherNodeList = tmNode.getOtherNodeList();
......
package com.codingapi.txlcn.tm.node;
import com.codingapi.txlcn.protocol.ProtocolServer;
import com.codingapi.txlcn.tm.repository.redis.RedisTmNodeRepository;
import com.codingapi.txlcn.tm.repository.TmNodeRepository;
import com.codingapi.txlcn.tm.util.NetUtil;
import lombok.Data;
import lombok.experimental.Accessors;
......@@ -37,22 +37,22 @@ public class TmNode {
*/
private int port;
private RedisTmNodeRepository redisTmNodeRepository;
private TmNodeRepository tmNodeRepository;
public TmNode(String hostAndPort, String nodeIp, int port, RedisTmNodeRepository redisTmNodeRepository) {
public TmNode(String hostAndPort, String nodeIp, int port, TmNodeRepository tmNodeRepository) {
this.id = hostAndPort;
this.nodeIp = nodeIp;
this.port = port;
this.redisTmNodeRepository = redisTmNodeRepository;
this.tmNodeRepository = tmNodeRepository;
}
/**
* @return 获得除此 TM 节点以外 TM 节点的 IP 及端口
*/
public List<InetSocketAddress> getOtherNodeList() {
return redisTmNodeRepository.keys(TX_MANAGE_KEY).stream()
return tmNodeRepository.keys(TX_MANAGE_KEY).stream()
.filter(Objects::nonNull)
.map(tmKey -> redisTmNodeRepository.getTmNodeAddress(tmKey))
.map(tmKey -> tmNodeRepository.getTmNodeInfo(tmKey).getHostAndPort())
.filter(s -> !s.equals(id))
.map(NetUtil::addressFormat)
.filter(Objects::nonNull)
......@@ -65,9 +65,9 @@ public class TmNode {
* @return 获得传入集合以外的 TM 节点的 IP 及端口
*/
public List<InetSocketAddress> getBesidesNodeList(List<InetSocketAddress> iNetSocketAddressList) {
return redisTmNodeRepository.keys(TX_MANAGE_KEY).stream()
return tmNodeRepository.keys(TX_MANAGE_KEY).stream()
.filter(Objects::nonNull)
.map(tmKey -> redisTmNodeRepository.getTmNodeAddress(tmKey))
.map(tmKey -> tmNodeRepository.getTmNodeInfo(tmKey).getHostAndPort())
.filter(s -> !s.equals(id))
.map(NetUtil::addressFormat)
.filter(Objects::nonNull)
......
package com.codingapi.txlcn.tm.id;
package com.codingapi.txlcn.tm.node;
import com.codingapi.txlcn.protocol.message.Connection;
import com.codingapi.txlcn.tm.id.SnowflakeInitiator;
import com.codingapi.txlcn.tm.reporter.TmManagerReporter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Collection;
/**
* @author WhomHim
* @description 30秒重新设置过期时间一次,相当于心跳一样告诉其他 TM “我还存活”
* @date Create in 2020-8-17 22:34:28
*/
@Component
public class SnowflakeSchedule {
public class TmNodeSchedule {
@Autowired
private SnowflakeInitiator snowflakeInitiator;
@Autowired
private TmManagerReporter tmManagerReporter;
@Scheduled(cron = "0/30 * * * * ?")
private void snowflakeInitiatorResetExpire() {
Collection<Connection> connections = tmManagerReporter.getConnections();
snowflakeInitiator.resetExpire();
}
......
......@@ -7,6 +7,7 @@ import com.codingapi.txlcn.protocol.message.Message;
import com.codingapi.txlcn.protocol.message.separate.AbsMessage;
import com.codingapi.txlcn.tm.config.TmConfig;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.springframework.util.Assert;
import java.net.InetSocketAddress;
......@@ -19,6 +20,7 @@ import java.util.stream.Collectors;
* @author whohim
*/
@AllArgsConstructor
@Data
public class TmManagerReporter {
private Protocoler protocoler;
......@@ -101,5 +103,4 @@ public class TmManagerReporter {
}
}
package com.codingapi.txlcn.tm.repository;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author WhomHim
* @description
* @date Create in 2020-9-13 23:07:11
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TmNodeInfo implements Serializable {
private String tmId;
/**
* TM 节点的 ip 地址及端口
*/
private String hostAndPort;
/**
* TM 的连接数
*/
private int connection;
}
package com.codingapi.txlcn.tm.repository;
import java.util.List;
/**
* @author WhomHim
* @description
* @date Create in 2020-9-13 23:12:57
*/
public interface TmNodeRepository {
List<String> keys(String pattern);
TmNodeInfo getTmNodeInfo(String key);
void create(String tmId, String hostAndPort, int connection);
}
package com.codingapi.txlcn.tm.repository.redis;
import com.codingapi.txlcn.tm.repository.TransactionGroup;
import com.codingapi.txlcn.tm.repository.TmNodeRepository;
import com.codingapi.txlcn.tm.repository.TransactionGroupRepository;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
......@@ -10,7 +10,6 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
......@@ -24,21 +23,27 @@ public class RedisRepositoryConfiguration {
@Bean
@ConditionalOnMissingBean
public TransactionGroupRepository transactionGroupRepository(RedisTemplate<String, TransactionGroup> redisTemplate){
public TransactionGroupRepository transactionGroupRepository(RedisTemplate<String, Object> redisTemplate) {
return new RedisTransactionGroupRepository(redisTemplate);
}
@Bean
@ConditionalOnMissingBean
public TmNodeRepository tmNodeRepository(RedisTemplate<String, Object> tmRedisTemplate) {
return new RedisTmNodeRepository(tmRedisTemplate);
}
@Bean
@ConditionalOnMissingBean
public RedisTemplate<String, TransactionGroup> redisTemplate(RedisConnectionFactory factory){
RedisTemplate<String, TransactionGroup> redisTemplate = new RedisTemplate<>();
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(factory);
Jackson2JsonRedisSerializer<TransactionGroup> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(TransactionGroup.class);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(objectMapper.getPolymorphicTypeValidator(),ObjectMapper.DefaultTyping.NON_FINAL);
objectMapper.activateDefaultTyping(objectMapper.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
......@@ -51,19 +56,4 @@ public class RedisRepositoryConfiguration {
return redisTemplate;
}
@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
stringRedisTemplate.setConnectionFactory(factory);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
stringRedisTemplate.setKeySerializer(stringRedisSerializer);
stringRedisTemplate.setHashKeySerializer(stringRedisSerializer);
stringRedisTemplate.setValueSerializer(stringRedisSerializer);
stringRedisTemplate.setHashValueSerializer(stringRedisSerializer);
stringRedisTemplate.afterPropertiesSet();
return stringRedisTemplate;
}
}
package com.codingapi.txlcn.tm.repository.redis;
import com.codingapi.txlcn.tm.repository.TmNodeInfo;
import com.codingapi.txlcn.tm.repository.TmNodeRepository;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.data.redis.core.ValueOperations;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
......@@ -20,17 +22,17 @@ import java.util.function.Consumer;
* @date Create in 2020/9/3 17:49
*/
@Slf4j
@Component
public class RedisTmNodeRepository {
@AllArgsConstructor
public class RedisTmNodeRepository implements TmNodeRepository {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private RedisTemplate<String, Object> redisTemplate;
/**
* 获取符合条件的key
*
* @param pattern 表达式
*/
@Override
public List<String> keys(String pattern) {
List<String> keys = new ArrayList<>();
this.scan(pattern, item -> {
......@@ -48,7 +50,7 @@ public class RedisTmNodeRepository {
* @param consumer 对迭代到的key进行操作
*/
private void scan(String pattern, Consumer<byte[]> consumer) {
this.stringRedisTemplate.execute((RedisConnection connection) -> {
this.redisTemplate.execute((RedisConnection connection) -> {
try (Cursor<byte[]> cursor = connection
.scan(ScanOptions.scanOptions()
.count(Long.MAX_VALUE)
......@@ -65,9 +67,17 @@ public class RedisTmNodeRepository {
/**
* 获取 Tm 节点的ip地址
*
* @param key Tm 全局唯一 ID
*/
public String getTmNodeAddress(String key) {
return stringRedisTemplate.opsForValue().get(key);
public TmNodeInfo getTmNodeInfo(String key) {
return (TmNodeInfo) redisTemplate.opsForValue().get(key);
}
@Override
public void create(String tmId, String hostAndPort, int connection) {
ValueOperations<String, Object> operations = redisTemplate.opsForValue();
TmNodeInfo tmNodeInfo = new TmNodeInfo(tmId, hostAndPort, connection);
operations.set(tmId, tmNodeInfo);
}
}
......@@ -16,11 +16,11 @@ import org.springframework.data.redis.core.ValueOperations;
@AllArgsConstructor
public class RedisTransactionGroupRepository implements TransactionGroupRepository {
private RedisTemplate<String, TransactionGroup> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Override
public void create(String groupId, String uniqueKey,String moduleName) throws Exception {
ValueOperations<String,TransactionGroup> operations = redisTemplate.opsForValue();
ValueOperations<String,Object> operations = redisTemplate.opsForValue();
TransactionGroup transactionGroup = new TransactionGroup(groupId,uniqueKey,moduleName, TransactionInfo.TransactionType.REQUEST);
operations.set(groupId,transactionGroup);
......@@ -29,8 +29,8 @@ public class RedisTransactionGroupRepository implements TransactionGroupReposito
@Override
public void join(String groupId, String uniqueKey, String moduleName) throws Exception {
ValueOperations<String,TransactionGroup> operations = redisTemplate.opsForValue();
TransactionGroup transactionGroup = operations.get(groupId);
ValueOperations<String,Object> operations = redisTemplate.opsForValue();
TransactionGroup transactionGroup = (TransactionGroup) operations.get(groupId);
if(transactionGroup!=null){
transactionGroup.add(uniqueKey,moduleName, TransactionInfo.TransactionType.JOIN);
operations.set(groupId,transactionGroup);
......@@ -41,8 +41,8 @@ public class RedisTransactionGroupRepository implements TransactionGroupReposito
@Override
public TransactionGroup notify(String groupId, boolean success) throws Exception {
ValueOperations<String,TransactionGroup> operations = redisTemplate.opsForValue();
TransactionGroup transactionGroup = operations.get(groupId);
ValueOperations<String,Object> operations = redisTemplate.opsForValue();
TransactionGroup transactionGroup = (TransactionGroup) operations.get(groupId);
if(transactionGroup==null){
return null;
}
......
......@@ -2,7 +2,7 @@ package com.codingapi.txlcn.tm.runner;
import com.codingapi.txlcn.protocol.ProtocolServer;
import com.codingapi.txlcn.tm.node.TmNode;
import com.codingapi.txlcn.tm.repository.redis.RedisTmNodeRepository;
import com.codingapi.txlcn.tm.repository.TmNodeRepository;
import com.codingapi.txlcn.tm.util.NetUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
......@@ -31,7 +31,7 @@ public class TmNodeServerRunner {
private ScheduledExecutorService scheduledExecutorService;
@Autowired
private RedisTmNodeRepository redisTmNodeRepository;
private TmNodeRepository tmNodeRepository;
public TmNodeServerRunner(ProtocolServer protocolServer) {
this.protocolServer = protocolServer;
......@@ -46,7 +46,7 @@ public class TmNodeServerRunner {
try {
InetAddress localhost = NetUtil.getLocalhost();
String hostAddress = Objects.requireNonNull(localhost).getHostAddress();
TmNode tmNode = new TmNode(String.format("%s:%s", hostAddress, port), hostAddress, port, redisTmNodeRepository);
TmNode tmNode = new TmNode(String.format("%s:%s", hostAddress, port), hostAddress, port, tmNodeRepository);
scheduledExecutorService.scheduleAtFixedRate(
() -> tmNode.connectToOtherNode(protocolServer), 0, 30, TimeUnit.SECONDS);
} catch (Exception e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册