ServerCommunicationSystem.java 5.4 KB
Newer Older
B
bessani@gmail.com 已提交
1
/**
P
pjsousa@gmail.com 已提交
2
 * Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
3
 *
P
pjsousa@gmail.com 已提交
4
 * This file is part of SMaRt.
5
 *
P
pjsousa@gmail.com 已提交
6 7 8 9
 * SMaRt is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
10
 *
P
pjsousa@gmail.com 已提交
11 12
 * SMaRt is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
P
pjsousa@gmail.com 已提交
14
 * GNU General Public License for more details.
15
 *
P
pjsousa@gmail.com 已提交
16 17 18 19 20 21
 * You should have received a copy of the GNU General Public License along with SMaRt.  If not, see <http://www.gnu.org/licenses/>.
 */
package navigators.smart.communication;

import java.util.concurrent.LinkedBlockingQueue;

B
bessani@gmail.com 已提交
22
import java.util.concurrent.TimeUnit;
P
pjsousa@gmail.com 已提交
23 24 25 26 27
import navigators.smart.communication.client.CommunicationSystemServerSide;
import navigators.smart.communication.client.CommunicationSystemServerSideFactory;
import navigators.smart.communication.client.RequestReceiver;
import navigators.smart.communication.server.ServersCommunicationLayer;
import navigators.smart.paxosatwar.roles.Acceptor;
28 29
import navigators.smart.reconfiguration.ReconfigurationManager;
import navigators.smart.tom.ServiceReplica;
P
pjsousa@gmail.com 已提交
30 31 32 33 34 35 36 37 38 39
import navigators.smart.tom.core.TOMLayer;
import navigators.smart.tom.core.messages.TOMMessage;
import navigators.smart.tom.util.Logger;

/**
 *
 * @author alysson
 */
public class ServerCommunicationSystem extends Thread {

B
bessani@gmail.com 已提交
40
    public final long MESSAGE_WAIT_TIME = 100;
41
    private LinkedBlockingQueue<SystemMessage> inQueue = null;//new LinkedBlockingQueue<SystemMessage>(IN_QUEUE_SIZE);
P
pjsousa@gmail.com 已提交
42 43 44
    protected MessageHandler messageHandler = new MessageHandler();
    private ServersCommunicationLayer serversConn;
    private CommunicationSystemServerSide clientsConn;
45 46
    private ReconfigurationManager manager;

P
pjsousa@gmail.com 已提交
47 48 49
    /**
     * Creates a new instance of ServerCommunicationSystem
     */
50
    public ServerCommunicationSystem(ReconfigurationManager manager, ServiceReplica replica) throws Exception {
P
pjsousa@gmail.com 已提交
51 52
        super("Server CS");

53 54 55
        this.manager = manager;

        inQueue = new LinkedBlockingQueue<SystemMessage>(manager.getStaticConf().getInQueueSize());
P
pjsousa@gmail.com 已提交
56

57 58
        //create a new conf, with updated port number for servers
        //TOMConfiguration serversConf = new TOMConfiguration(conf.getProcessId(),
B
bessani@gmail.com 已提交
59
        //      Configuration.getHomeDir(), "hosts.config");
P
pjsousa@gmail.com 已提交
60

61
        //serversConf.increasePortNumber();
P
pjsousa@gmail.com 已提交
62

B
bessani@gmail.com 已提交
63
        serversConn = new ServersCommunicationLayer(manager, inQueue, replica);
P
pjsousa@gmail.com 已提交
64

65
        //******* EDUARDO BEGIN **************//
B
bessani@gmail.com 已提交
66
        if (manager.isInCurrentView() || manager.isInInitView()) {
67 68 69
            clientsConn = CommunicationSystemServerSideFactory.getCommunicationSystemServerSide(manager);
        }
        //******* EDUARDO END **************//
70
        //start();
P
pjsousa@gmail.com 已提交
71 72
    }

73 74 75 76 77
    //******* EDUARDO BEGIN **************//
    public void joinViewReceived() {
        serversConn.joinViewReceived();
    }

B
bessani@gmail.com 已提交
78
    public void updateServersConnections() {
79
        this.serversConn.updateConnections();
B
bessani@gmail.com 已提交
80
        if (clientsConn == null) {
81 82 83 84 85 86
            clientsConn = CommunicationSystemServerSideFactory.getCommunicationSystemServerSide(manager);
        }

    }

    //******* EDUARDO END **************//
P
pjsousa@gmail.com 已提交
87 88 89 90 91 92 93 94 95
    public void setAcceptor(Acceptor acceptor) {
        messageHandler.setAcceptor(acceptor);
    }

    public void setTOMLayer(TOMLayer tomLayer) {
        messageHandler.setTOMLayer(tomLayer);
    }

    public void setRequestReceiver(RequestReceiver requestReceiver) {
B
bessani@gmail.com 已提交
96
        if (clientsConn == null) {
97 98
            clientsConn = CommunicationSystemServerSideFactory.getCommunicationSystemServerSide(manager);
        }
P
pjsousa@gmail.com 已提交
99 100 101 102
        clientsConn.setRequestReceiver(requestReceiver);
    }

    /**
B
bessani@gmail.com 已提交
103
     * Thread method responsible for receiving messages sent by other servers.
P
pjsousa@gmail.com 已提交
104 105 106
     */
    @Override
    public void run() {
B
bessani@gmail.com 已提交
107
        long count = 0;
P
pjsousa@gmail.com 已提交
108 109
        while (true) {
            try {
110
                if (count % 1000 == 0 && count > 0) {
B
bessani@gmail.com 已提交
111 112 113 114 115 116 117 118 119 120 121 122 123 124
                    Logger.println("(ServerCommunicationSystem.run) After " + count + " messages, inQueue size=" + inQueue.size());
                }

                SystemMessage sm = inQueue.poll(MESSAGE_WAIT_TIME, TimeUnit.MILLISECONDS);

                if (sm != null) {
                    Logger.println("<-------receiving---------- " + sm);
                    messageHandler.processData(sm);
                    count++;
                } else {                
                    messageHandler.verifyPending();               
                }
            } catch (InterruptedException e) {
                e.printStackTrace(System.err);
P
pjsousa@gmail.com 已提交
125 126 127 128 129
            }
        }
    }

    /**
B
bessani@gmail.com 已提交
130 131 132
     * Send a message to target processes. If the message is an instance of 
     * TOMMessage, it is sent to the clients, otherwise it is set to the
     * servers.
P
pjsousa@gmail.com 已提交
133 134 135 136 137
     *
     * @param targets the target receivers of the message
     * @param sm the message to be sent
     */
    public void send(int[] targets, SystemMessage sm) {
B
bessani@gmail.com 已提交
138 139
        if (sm instanceof TOMMessage) {
            clientsConn.send(targets, (TOMMessage) sm, false);
P
pjsousa@gmail.com 已提交
140
        } else {
B
bessani@gmail.com 已提交
141
            Logger.println("--------sending----------> " + sm);
P
pjsousa@gmail.com 已提交
142 143 144 145 146 147 148 149 150
            serversConn.send(targets, sm);
        }
    }

    @Override
    public String toString() {
        return serversConn.toString();
    }
}