/** * Copyright 1999-2015 dangdang.com. *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *

*/ package com.dangdang.ddframe.rdb.transaction.soft.bed.sync; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import com.dangdang.ddframe.rdb.sharding.executor.event.DMLExecutionEvent; import com.dangdang.ddframe.rdb.sharding.executor.event.DMLExecutionEventListener; import com.dangdang.ddframe.rdb.transaction.soft.bed.BEDSoftTransactionManager; import com.dangdang.ddframe.rdb.transaction.soft.api.SoftTransactionManagerFactory; import com.dangdang.ddframe.rdb.transaction.soft.api.SoftTransactionType; import com.dangdang.ddframe.rdb.transaction.soft.api.config.SoftTransactionConfiguration; import com.dangdang.ddframe.rdb.transaction.soft.storage.TransactionLogStorage; import com.dangdang.ddframe.rdb.transaction.soft.storage.TransactionLogStorageFactory; import com.dangdang.ddframe.rdb.transaction.soft.storage.TransactionLog; import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import lombok.extern.slf4j.Slf4j; /** * 最大努力送达型事务监听器. * * @author zhangliang */ @Slf4j public final class BestEffortsDeliveryListener implements DMLExecutionEventListener { @Subscribe @AllowConcurrentEvents public void listen(final DMLExecutionEvent event) { if (!isProcessContinuously()) { return; } SoftTransactionConfiguration transactionConfig = SoftTransactionManagerFactory.getCurrentTransactionConfiguration().get(); TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorageFactory(transactionConfig); BEDSoftTransactionManager transactionManager = (BEDSoftTransactionManager) SoftTransactionManagerFactory.getCurrentTransactionManager().get(); switch (event.getEventExecutionType()) { case BEFORE_EXECUTE: transactionLogStorage.add(new TransactionLog( event.getId(), transactionManager.getTransactionId(), transactionManager.getTransactionType(), event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0)); return; case EXECUTE_SUCCESS: transactionLogStorage.remove(event.getId()); return; case EXECUTE_FAILURE: boolean deliverySuccess = false; for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) { if (deliverySuccess) { return; } boolean isNewConnection = false; Connection conn = null; PreparedStatement preparedStatement = null; try { conn = transactionManager.getConnection().getConnection(event.getDataSource()); if (!isValidConnection(conn)) { conn = transactionManager.getConnection(); isNewConnection = true; } preparedStatement = conn.prepareStatement(event.getSql()); for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) { preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex)); } preparedStatement.executeUpdate(); deliverySuccess = true; transactionLogStorage.remove(event.getId()); } catch (final SQLException ex) { log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex); } finally { close(isNewConnection, conn, preparedStatement); } } return; default: throw new UnsupportedOperationException(event.getEventExecutionType().toString()); } } private boolean isProcessContinuously() { return SoftTransactionManagerFactory.getCurrentTransactionManager().isPresent() && SoftTransactionType.BestEffortsDelivery == SoftTransactionManagerFactory.getCurrentTransactionManager().get().getTransactionType(); } private boolean isValidConnection(final Connection conn) { try (PreparedStatement preparedStatement = conn.prepareStatement("SELECT 1")) { try (ResultSet rs = preparedStatement.executeQuery()) { return rs.next() && 1 == rs.getInt("1"); } } catch (final SQLException ex) { return false; } } private void close(final boolean isNewConnection, final Connection conn, final PreparedStatement preparedStatement) { if (null != preparedStatement) { try { preparedStatement.close(); } catch (final SQLException ex) { log.error("PreparedStatement closed error:", ex); } } if (isNewConnection && null != conn) { try { conn.close(); } catch (final SQLException ex) { log.error("Connection closed error:", ex); } } } @Override public String getName() { return getClass().getName(); } }