BestEffortsDeliveryListener.java 6.0 KB
Newer Older
H
fix #58  
haocao 已提交
1
/*
 * Copyright 1999-2015 dangdang.com.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

18
package io.shardingjdbc.transaction.bed.sync;
19

20 21
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
22 23 24 25
import io.shardingjdbc.core.executor.event.DMLExecutionEvent;
import io.shardingjdbc.transaction.api.SoftTransactionManager;
import io.shardingjdbc.transaction.api.config.SoftTransactionConfiguration;
import io.shardingjdbc.transaction.bed.BEDSoftTransaction;
26
import io.shardingjdbc.transaction.constants.SoftTransactionType;
27 28 29
import io.shardingjdbc.transaction.storage.TransactionLog;
import io.shardingjdbc.transaction.storage.TransactionLogStorage;
import io.shardingjdbc.transaction.storage.TransactionLogStorageFactory;
30 31
import lombok.extern.slf4j.Slf4j;

H
fix #58  
haocao 已提交
32 33 34 35 36
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

37
/**
38
 * Best efforts delivery B.A.S.E transaction listener.
39 40
 * 
 * @author zhangliang
41
 * @author maxiaoguang
42 43
 */
@Slf4j
T
terrymanu 已提交
44
public final class BestEffortsDeliveryListener {
45
    
46 47 48 49 50
    /**
     * Listen event.
     * 
     * @param event dml execution event
     */
51 52
    @Subscribe
    @AllowConcurrentEvents
T
terrymanu 已提交
53
    public void listen(final DMLExecutionEvent event) {
54 55 56
        if (!isProcessContinuously()) {
            return;
        }
H
fix #58  
haocao 已提交
57
        SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
H
fix #69  
haocao 已提交
58
        TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
H
fix #58  
haocao 已提交
59
        BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
60
        switch (event.getEventExecutionType()) {
H
fix #58  
haocao 已提交
61
            case BEFORE_EXECUTE:
62
                //TODO for batch SQL need split to 2-level records
T
terrymanu 已提交
63
                transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(), 
64
                        event.getDataSource(), event.getSqlUnit().getSql(), event.getParameters(), System.currentTimeMillis(), 0));
65 66
                return;
            case EXECUTE_SUCCESS: 
T
terrymanu 已提交
67
                transactionLogStorage.remove(event.getId());
68 69 70 71 72 73 74 75 76
                return;
            case EXECUTE_FAILURE: 
                boolean deliverySuccess = false;
                for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {
                    if (deliverySuccess) {
                        return;
                    }
                    boolean isNewConnection = false;
                    Connection conn = null;
T
terrymanu 已提交
77
                    PreparedStatement preparedStatement = null;
78
                    try {
79
                        conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource());
80
                        if (!isValidConnection(conn)) {
T
terrymanu 已提交
81
                            bedSoftTransaction.getConnection().release(conn);
82
                            conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource());
83 84
                            isNewConnection = true;
                        }
85
                        preparedStatement = conn.prepareStatement(event.getSqlUnit().getSql());
86
                        //TODO for batch event need split to 2-level records
87
                        for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
T
terrymanu 已提交
88
                            preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
89
                        }
T
terrymanu 已提交
90
                        preparedStatement.executeUpdate();
91
                        deliverySuccess = true;
T
terrymanu 已提交
92
                        transactionLogStorage.remove(event.getId());
93 94 95
                    } catch (final SQLException ex) {
                        log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
                    } finally {
T
terrymanu 已提交
96
                        close(isNewConnection, conn, preparedStatement);
97 98 99 100 101 102 103 104 105
                    }
                }
                return;
            default: 
                throw new UnsupportedOperationException(event.getEventExecutionType().toString());
        }
    }
    
    private boolean isProcessContinuously() {
H
fix #58  
haocao 已提交
106
        return SoftTransactionManager.getCurrentTransaction().isPresent()
107
                && SoftTransactionType.BestEffortsDelivery == SoftTransactionManager.getCurrentTransaction().get().getTransactionType();
108 109 110
    }
    
    private boolean isValidConnection(final Connection conn) {
T
terrymanu 已提交
111 112
        try (PreparedStatement preparedStatement = conn.prepareStatement("SELECT 1")) {
            try (ResultSet rs = preparedStatement.executeQuery()) {
113 114 115 116 117 118 119
                return rs.next() && 1 == rs.getInt("1");
            }
        } catch (final SQLException ex) {
            return false;
        }
    }
    
T
terrymanu 已提交
120 121
    private void close(final boolean isNewConnection, final Connection conn, final PreparedStatement preparedStatement) {
        if (null != preparedStatement) {
122
            try {
T
terrymanu 已提交
123
                preparedStatement.close();
124
            } catch (final SQLException ex) {
T
terrymanu 已提交
125
                log.error("PreparedStatement closed error:", ex);
126 127 128 129 130 131
            }
        }
        if (isNewConnection && null != conn) {
            try {
                conn.close();
            } catch (final SQLException ex) {
T
terrymanu 已提交
132
                log.error("Connection closed error:", ex);
133 134 135 136
            }
        }
    }
}