提交 67555655 编写于 作者: C cherrylzhao

Add Transactional recovery function for atomikos XA.

上级 1c322580
......@@ -55,4 +55,18 @@ public interface XATransactionManager extends ShardingTransactionManager {
* @return transaction manager
*/
TransactionManager getUnderlyingTransactionManager();
/**
* Register recovery resource.
* @param dataSourceName data source name
* @param xaDataSource XA data source
*/
void registerRecoveryResource(String dataSourceName, XADataSource xaDataSource);
/**
* Remove recovery resource.
* @param dataSourceName data source name
* @param xaDataSource XA data source
*/
void removeRecoveryResource(String dataSourceName, XADataSource xaDataSource);
}
......@@ -19,6 +19,7 @@ package io.shardingsphere.transaction.xa.convert.datasource;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.util.ReflectiveUtil;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import javax.sql.XAConnection;
......@@ -36,6 +37,7 @@ import java.util.logging.Logger;
* @author zhaojun
*/
@RequiredArgsConstructor
@Getter
public final class ShardingXADataSource implements XADataSource {
private final XADataSource xaDataSource;
......
......@@ -50,9 +50,11 @@ public final class XAShardingTransactionHandler extends ShardingTransactionHandl
private DatabaseType databaseType;
private final XATransactionManager xaTransactionManager = XATransactionManagerSPILoader.getInstance().getTransactionManager();
@Override
public ShardingTransactionManager getShardingTransactionManager() {
return XATransactionManagerSPILoader.getInstance().getTransactionManager();
return xaTransactionManager;
}
@Override
......@@ -62,21 +64,31 @@ public final class XAShardingTransactionHandler extends ShardingTransactionHandl
@Override
public void registerTransactionDataSource(final DatabaseType databaseType, final Map<String, DataSource> dataSourceMap) {
SHARDING_XA_DATA_SOURCE_MAP.clear();
removeTransactionDataSource();
this.databaseType = databaseType;
try {
for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
DataSourceParameter parameter = DataSourceSwapperRegistry.getSwapper(entry.getValue().getClass()).swap(entry.getValue());
ShardingXADataSource shardingXADataSource = ShardingXADataSourceUtil.createShardingXADataSource(databaseType, entry.getKey(), parameter);
SHARDING_XA_DATA_SOURCE_MAP.put(entry.getKey(), shardingXADataSource);
xaTransactionManager.registerRecoveryResource(entry.getKey(), shardingXADataSource.getXaDataSource());
}
} catch (final Exception ex) {
log.error("Failed to register transaction datasource of XAShardingTransactionHandler");
}
}
private void removeTransactionDataSource() {
if (!SHARDING_XA_DATA_SOURCE_MAP.isEmpty()) {
for (ShardingXADataSource each : SHARDING_XA_DATA_SOURCE_MAP.values()) {
xaTransactionManager.removeRecoveryResource(each.getDatasourceName(), each.getXaDataSource());
}
}
SHARDING_XA_DATA_SOURCE_MAP.clear();
}
@Override
public void synchronizeTransactionResource(final String datasourceName, final Connection connection, final Object... properties) throws SQLException {
public synchronized void synchronizeTransactionResource(final String datasourceName, final Connection connection, final Object... properties) throws SQLException {
try {
ShardingXADataSource shardingXADataSource = SHARDING_XA_DATA_SOURCE_MAP.get(datasourceName);
Preconditions.checkNotNull(shardingXADataSource, "Could not find ShardingXADataSource of `%s`", datasourceName);
......
......@@ -17,6 +17,9 @@
package io.shardingsphere.transaction.xa.manager;
import com.atomikos.datasource.xa.jdbc.JdbcTransactionalResource;
import com.atomikos.icatch.config.UserTransactionService;
import com.atomikos.icatch.config.UserTransactionServiceImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import io.shardingsphere.core.constant.DatabaseType;
import io.shardingsphere.core.exception.ShardingException;
......@@ -40,25 +43,17 @@ import javax.transaction.TransactionManager;
*/
public final class AtomikosTransactionManager implements XATransactionManager {
private final UserTransactionManager underlyingTransactionManager;
private final UserTransactionManager underlyingTransactionManager = new UserTransactionManager();
public AtomikosTransactionManager() {
underlyingTransactionManager = new UserTransactionManager();
init();
}
private final UserTransactionService userTransactionService = new UserTransactionServiceImp();
private void init() {
try {
underlyingTransactionManager.init();
} catch (SystemException ex) {
throw new ShardingException(ex);
}
public AtomikosTransactionManager() {
userTransactionService.init();
}
@Override
public void destroy() {
underlyingTransactionManager.setForceShutdown(true);
underlyingTransactionManager.close();
userTransactionService.shutdown(true);
}
@Override
......@@ -112,4 +107,15 @@ public final class AtomikosTransactionManager implements XATransactionManager {
public TransactionManager getUnderlyingTransactionManager() {
return underlyingTransactionManager;
}
@Override
public void registerRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) {
userTransactionService.registerResource(new JdbcTransactionalResource(dataSourceName, xaDataSource));
}
@Override
public void removeRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) {
userTransactionService.removeResource(new JdbcTransactionalResource(dataSourceName, xaDataSource));
}
}
com.atomikos.icatch.serial_jta_transactions = false
com.atomikos.icatch.automatic_resource_registration = false
com.atomikos.icatch.default_jta_timeout = 300000
com.atomikos.icatch.max_actives = 10000
com.atomikos.icatch.checkpoint_interval = 50000
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册