NettyClientServerCommunicationSystemClientSide.java 17.2 KB
Newer Older
P
pjsousa@gmail.com 已提交
1 2
/**
 * Copyright (c) 2007-2009 Alysson Bessani, Eduardo Alchieri, Paulo Sousa, and the authors indicated in the @author tags
3
 *
P
pjsousa@gmail.com 已提交
4
 * This file is part of SMaRt.
5
 *
P
pjsousa@gmail.com 已提交
6 7 8 9
 * SMaRt is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
10
 *
P
pjsousa@gmail.com 已提交
11 12
 * SMaRt is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
P
pjsousa@gmail.com 已提交
14
 * GNU General Public License for more details.
15
 *
P
pjsousa@gmail.com 已提交
16 17 18 19 20 21 22 23 24 25 26 27
 * You should have received a copy of the GNU General Public License along with SMaRt.  If not, see <http://www.gnu.org/licenses/>.
 */
package navigators.smart.communication.client.netty;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.Signature;
import java.security.spec.InvalidKeySpecException;
28
import java.util.ArrayList;
P
pjsousa@gmail.com 已提交
29
import java.util.Enumeration;
30 31
import java.util.HashMap;
import java.util.Iterator;
B
bessani@gmail.com 已提交
32 33
import java.util.LinkedList;
import java.util.ListIterator;
P
pjsousa@gmail.com 已提交
34 35 36 37 38 39 40 41 42 43 44
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;

import navigators.smart.communication.client.CommunicationSystemClientSide;
import navigators.smart.communication.client.ReplyReceiver;
45
import navigators.smart.reconfiguration.ViewManager;
P
pjsousa@gmail.com 已提交
46
import navigators.smart.tom.core.messages.TOMMessage;
47
import navigators.smart.tom.util.Logger;
P
pjsousa@gmail.com 已提交
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
import navigators.smart.tom.util.TOMUtil;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

/**
 *
 * @author Paulo
 */
@ChannelPipelineCoverage("all")
public class NettyClientServerCommunicationSystemClientSide extends SimpleChannelUpstreamHandler implements CommunicationSystemClientSide {

68 69
    //private static final int MAGIC = 59;
    //private static final int CONNECT_TIMEOUT = 3000;
P
pjsousa@gmail.com 已提交
70 71 72
    private static final String PASSWORD = "newcs";
    //private static final int BENCHMARK_PERIOD = 10000;
    protected ReplyReceiver trr;
73 74 75
    //******* EDUARDO BEGIN **************//
    private ViewManager manager;
    //******* EDUARDO END **************//
76
    private HashMap sessionTable = new HashMap();
P
pjsousa@gmail.com 已提交
77 78 79 80 81
    private ReentrantReadWriteLock rl;
    private SecretKey authKey;
    //the signature engine used in the system
    private Signature signatureEngine;
    //private Storage st;
82 83 84
    //private int count = 0;
    private int signatureLength;
    private boolean closed = false;
P
pjsousa@gmail.com 已提交
85

86
    public NettyClientServerCommunicationSystemClientSide(ViewManager manager) {
87
        super();
P
pjsousa@gmail.com 已提交
88 89 90 91 92
        try {
            SecretKeyFactory fac = SecretKeyFactory.getInstance("PBEWithMD5AndDES");
            PBEKeySpec spec = new PBEKeySpec(PASSWORD.toCharArray());
            authKey = fac.generateSecret(spec);

93
            this.manager = manager;
P
pjsousa@gmail.com 已提交
94 95
            //this.st = new Storage(BENCHMARK_PERIOD);
            this.rl = new ReentrantReadWriteLock();
96 97 98 99 100 101
            Mac macDummy = Mac.getInstance(manager.getStaticConf().getHmacAlgorithm());
            signatureLength = TOMUtil.getSignatureSize(manager);


            int[] currV = manager.getCurrentViewProcesses();
            for (int i = 0; i < currV.length; i++) {
P
pjsousa@gmail.com 已提交
102 103 104 105 106 107 108 109 110
                try {
                    // Configure the client.
                    ClientBootstrap bootstrap = new ClientBootstrap(
                            new NioClientSocketChannelFactory(
                            Executors.newCachedThreadPool(),
                            Executors.newCachedThreadPool()));

                    bootstrap.setOption("tcpNoDelay", true);
                    bootstrap.setOption("keepAlive", true);
111
                    bootstrap.setOption("connectTimeoutMillis", 10000);
P
pjsousa@gmail.com 已提交
112 113

                    // Set up the default event pipeline.
114
                    bootstrap.setPipelineFactory(new NettyClientPipelineFactory(this, true, sessionTable,
115
                            authKey, macDummy.getMacLength(), manager, rl, signatureLength, new ReentrantLock()));
116 117

                    //******* EDUARDO BEGIN **************//
P
pjsousa@gmail.com 已提交
118 119

                    // Start the connection attempt.
120
                    ChannelFuture future = bootstrap.connect(manager.getRemoteAddress(currV[i]));
P
pjsousa@gmail.com 已提交
121 122

                    //creates MAC stuff
123
                    Mac macSend = Mac.getInstance(manager.getStaticConf().getHmacAlgorithm());
P
pjsousa@gmail.com 已提交
124
                    macSend.init(authKey);
125
                    Mac macReceive = Mac.getInstance(manager.getStaticConf().getHmacAlgorithm());
P
pjsousa@gmail.com 已提交
126
                    macReceive.init(authKey);
127
                    NettyClientServerSession cs = new NettyClientServerSession(future.getChannel(), macSend,
128
                            macReceive, currV[i], manager.getStaticConf().getRSAPublicKey(currV[i]), new ReentrantLock());
129 130
                    sessionTable.put(currV[i], cs);

131
                    System.out.println("Connecting to replica " + currV[i] + " at " + manager.getRemoteAddress(currV[i]));
132 133
                    //******* EDUARDO END **************//

P
pjsousa@gmail.com 已提交
134 135
                    future.awaitUninterruptibly();

136 137 138
                    if (!future.isSuccess()) {
                        System.err.println("Impossible to connect to " + currV[i]);
                    }
139

140 141 142 143
                } catch (java.lang.NullPointerException ex) {
                    //What the fuck is this??? This is not possible!!!
                    System.err.println("Deve resolver o problema, e acho que não trás outras implicações :-), "
                            + "mas temos que fazer os servidores armazenarem as view em um lugar default.");
144 145 146 147 148 149 150

                } catch (InvalidKeyException ex) {
                }
            }
        } catch (InvalidKeySpecException ex) {
        } catch (NoSuchAlgorithmException ex) {
        }
151
    }
152

B
bessani@gmail.com 已提交
153
    @Override
154 155 156 157 158
    public void updateConnections() {
        try {
            Mac macDummy = Mac.getInstance(manager.getStaticConf().getHmacAlgorithm());
            int[] currV = manager.getCurrentViewProcesses();

B
bessani@gmail.com 已提交
159
            //open connections with new servers
160
            for (int i = 0; i < currV.length; i++) {
161
                rl.readLock().lock();
162
                if (sessionTable.get(currV[i]) == null) {
163 164 165

                    rl.readLock().unlock();
                    rl.writeLock().lock();
166 167 168 169 170 171 172 173 174
                    try {
                        // Configure the client.
                        ClientBootstrap bootstrap = new ClientBootstrap(
                                new NioClientSocketChannelFactory(
                                Executors.newCachedThreadPool(),
                                Executors.newCachedThreadPool()));

                        bootstrap.setOption("tcpNoDelay", true);
                        bootstrap.setOption("keepAlive", true);
175
                        bootstrap.setOption("connectTimeoutMillis", 10000);
176 177

                        // Set up the default event pipeline.
B
bessani@gmail.com 已提交
178 179 180 181
                        bootstrap.setPipelineFactory(
                                new NettyClientPipelineFactory(this, true, sessionTable,
                                authKey, macDummy.getMacLength(), manager, rl, signatureLength,
                                new ReentrantLock()));
182 183 184 185


                        //******* EDUARDO BEGIN **************//
                        // Start the connection attempt.
186
                        ChannelFuture future = bootstrap.connect(manager.getRemoteAddress(currV[i]));
187 188 189 190 191 192 193 194 195

                        //creates MAC stuff
                        Mac macSend = Mac.getInstance(manager.getStaticConf().getHmacAlgorithm());
                        macSend.init(authKey);
                        Mac macReceive = Mac.getInstance(manager.getStaticConf().getHmacAlgorithm());
                        macReceive.init(authKey);
                        NettyClientServerSession cs = new NettyClientServerSession(future.getChannel(), macSend, macReceive, currV[i], manager.getStaticConf().getRSAPublicKey(currV[i]), new ReentrantLock());
                        sessionTable.put(currV[i], cs);

196
                        System.out.println("Connecting to replica " + currV[i] + " at " + manager.getRemoteAddress(currV[i]));
197 198 199 200 201 202
                        //******* EDUARDO END **************//

                        future.awaitUninterruptibly();

                    } catch (InvalidKeyException ex) {
                    }
203 204
                    rl.writeLock().unlock();
                } else rl.readLock().unlock();
205
            }
B
bessani@gmail.com 已提交
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
            
            //close connections with removed servers
            //ANB: This code need to be tested!!!
            ListIterator ids = new LinkedList(sessionTable.keySet()).listIterator();
            while (ids.hasNext()) {
                int id = (Integer) ids.next();

                boolean found = false;
                for (int v : currV) {
                    if (v == id) {
                        found = true;
                        break;
                    }
                }

                if (!found) {
                    NettyClientServerSession cs =
                            (NettyClientServerSession) sessionTable.remove(id);
                    cs.getChannel().close();
                }
            }

228 229 230 231
        } catch (NoSuchAlgorithmException ex) {
        }
    }

P
pjsousa@gmail.com 已提交
232
    @Override
233 234
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        //if (!(e.getCause() instanceof ClosedChannelException) && !(e.getCause() instanceof ConnectException)) {
235
        System.out.println("Excepção no  CS (client side)!");
236
        e.getCause().printStackTrace();
237
        //}
P
pjsousa@gmail.com 已提交
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
    }

    @Override
    public void messageReceived(
            ChannelHandlerContext ctx, MessageEvent e) {
        TOMMessage sm = (TOMMessage) e.getMessage();

        //delivers message to replyReceived callback
        trr.replyReceived(sm);
    }

    @Override
    public void channelConnected(
            ChannelHandlerContext ctx, ChannelStateEvent e) {
        System.out.println("Channel connected");
    }

    @Override
    public void channelClosed(
            ChannelHandlerContext ctx, ChannelStateEvent e) {
258 259 260
        if (this.closed) {
            return;
        }
261

P
pjsousa@gmail.com 已提交
262 263 264 265 266
        try {
            //sleeps 10 seconds before trying to reconnect
            Thread.sleep(10000);
        } catch (InterruptedException ex) {
        }
267

P
pjsousa@gmail.com 已提交
268
        rl.writeLock().lock();
269 270 271 272 273
        //Iterator sessions = sessionTable.values().iterator();

        ArrayList<NettyClientServerSession> sessions = new ArrayList<NettyClientServerSession>(sessionTable.values());
        for (NettyClientServerSession ncss : sessions) {

P
pjsousa@gmail.com 已提交
274 275
            if (ncss.getChannel() == ctx.getChannel()) {
                try {
276 277 278 279

                    //******* EDUARDO BEGIN **************//
                    Mac macDummy = Mac.getInstance(manager.getStaticConf().getHmacAlgorithm());
                    // Configure the client.
P
pjsousa@gmail.com 已提交
280 281
                    ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
                    // Set up the default event pipeline.
282
                    bootstrap.setPipelineFactory(new NettyClientPipelineFactory(this, true, sessionTable, authKey, macDummy.getMacLength(), manager, rl, TOMUtil.getSignatureSize(manager), new ReentrantLock()));
P
pjsousa@gmail.com 已提交
283
                    // Start the connection attempt.
284
                    if (manager.getRemoteAddress(ncss.getReplicaId()) != null) {
285

286 287
                        ChannelFuture future = bootstrap.connect(manager.getRemoteAddress(ncss.getReplicaId()));
                        //******* EDUARDO END **************//
288

289 290 291 292 293 294 295 296 297 298 299 300 301

                        //creates MAC stuff
                        Mac macSend = ncss.getMacSend();
                        Mac macReceive = ncss.getMacReceive();
                        NettyClientServerSession cs = new NettyClientServerSession(future.getChannel(), macSend, macReceive, ncss.getReplicaId(), manager.getStaticConf().getRSAPublicKey(ncss.getReplicaId()), new ReentrantLock());
                        sessionTable.remove(ncss.getReplicaId());
                        sessionTable.put(ncss.getReplicaId(), cs);
                        //System.out.println("RE-Connecting to replica "+ncss.getReplicaId()+" at " + conf.getRemoteAddress(ncss.getReplicaId()));

                    } else {
                        // This cleans an olde server from the session table
                        sessionTable.remove(ncss.getReplicaId());
                    }
P
pjsousa@gmail.com 已提交
302 303 304 305 306 307 308 309 310 311 312 313 314 315
                } catch (NoSuchAlgorithmException ex) {
                }
            }

        }

        //closes all other channels to avoid messages being sent to only a subset of the replicas
        /*Enumeration sessionElements = sessionTable.elements();
        while (sessionElements.hasMoreElements()){
        ((NettyClientServerSession) sessionElements.nextElement()).getChannel().close();
        }*/
        rl.writeLock().unlock();
    }

316
    @Override
P
pjsousa@gmail.com 已提交
317 318 319 320
    public void setReplyReceiver(ReplyReceiver trr) {
        this.trr = trr;
    }

321
    @Override
B
bessani@gmail.com 已提交
322
    public void send(boolean sign, int[] targets, TOMMessage sm) {
P
pjsousa@gmail.com 已提交
323
        if (sm.serializedMessage == null) {
B
bessani@gmail.com 已提交
324

P
pjsousa@gmail.com 已提交
325 326 327 328
            //serialize message
            DataOutputStream dos = null;
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
B
bessani@gmail.com 已提交
329 330 331 332
                dos = new DataOutputStream(baos);
                sm.wExternal(dos);
                dos.flush();
                sm.serializedMessage = baos.toByteArray();
P
pjsousa@gmail.com 已提交
333
            } catch (IOException ex) {
334
                Logger.println("Impossible to serialize message: " + sm);
P
pjsousa@gmail.com 已提交
335 336
            } finally {
                try {
B
bessani@gmail.com 已提交
337 338
                    dos.close();
                } catch (IOException ex) { }
P
pjsousa@gmail.com 已提交
339 340 341
            }
        }

B
bessani@gmail.com 已提交
342 343
        //Logger.println("Sending message with "+sm.serializedMessage.length+" bytes of content.");

P
pjsousa@gmail.com 已提交
344
        //produce signature
B
bessani@gmail.com 已提交
345 346 347
        if (sign && sm.serializedMessageSignature == null) {
            sm.serializedMessageSignature = signMessage(
                    manager.getStaticConf().getRSAPrivateKey(), sm.serializedMessage);
P
pjsousa@gmail.com 已提交
348 349
        }

350
        int sent = 0;
P
pjsousa@gmail.com 已提交
351 352
        for (int i = targets.length - 1; i >= 0; i--) {
            sm.destination = targets[i];
353

P
pjsousa@gmail.com 已提交
354
            rl.readLock().lock();
B
bessani@gmail.com 已提交
355
            Channel channel = ((NettyClientServerSession) sessionTable.get(targets[i])).getChannel();
P
pjsousa@gmail.com 已提交
356 357 358 359
            rl.readLock().unlock();
            if (channel.isConnected()) {
                sm.signed = sign;
                channel.write(sm);
360
                sent++;
P
pjsousa@gmail.com 已提交
361
            } else {
362
                Logger.println("Channel to " + targets[i] + " is not connected");
P
pjsousa@gmail.com 已提交
363
            }
364 365

        }
B
bessani@gmail.com 已提交
366

367
        if (sent < manager.getCurrentViewF() + 1) {
B
bessani@gmail.com 已提交
368
            //if less than f+1 servers are connected send an exception to the client
369
            throw new RuntimeException("Impossible to connect to servers!");
P
pjsousa@gmail.com 已提交
370 371 372 373 374 375 376 377 378 379
        }
    }

    public void sign(TOMMessage sm) {
        //serialize message
        DataOutputStream dos = null;
        byte[] data = null;
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            dos = new DataOutputStream(baos);
380
            sm.wExternal(dos);
P
pjsousa@gmail.com 已提交
381 382 383 384 385 386 387
            dos.flush();
            data = baos.toByteArray();
            sm.serializedMessage = data;
        } catch (IOException ex) {
        } finally {
            try {
                dos.close();
B
bessani@gmail.com 已提交
388
            } catch (IOException ex) { }
P
pjsousa@gmail.com 已提交
389 390
        }

391 392 393 394 395
        //******* EDUARDO BEGIN **************//
        //produce signature
        byte[] data2 = signMessage(manager.getStaticConf().getRSAPrivateKey(), data);
        //******* EDUARDO END **************//

P
pjsousa@gmail.com 已提交
396 397 398 399
        sm.serializedMessageSignature = data2;
    }

    public byte[] signMessage(PrivateKey key, byte[] message) {
400
        //long startTime = System.nanoTime();
P
pjsousa@gmail.com 已提交
401 402 403 404 405
        try {
            if (signatureEngine == null) {
                signatureEngine = Signature.getInstance("SHA1withRSA");
            }
            byte[] result = null;
406

P
pjsousa@gmail.com 已提交
407 408 409
            signatureEngine.initSign(key);
            signatureEngine.update(message);
            result = signatureEngine.sign();
410

P
pjsousa@gmail.com 已提交
411 412 413 414 415 416 417
            //st.store(System.nanoTime() - startTime);
            return result;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
418

419
    @Override
420 421
    public void close() {
        this.closed = true;
422 423 424 425 426 427
        //Iterator sessions = sessionTable.values().iterator();
        rl.readLock().lock();
        ArrayList<NettyClientServerSession> sessions = new ArrayList<NettyClientServerSession>(sessionTable.values());
        rl.readLock().unlock();
        for (NettyClientServerSession ncss : sessions) {

428 429 430
            ncss.getChannel().close();
        }
    }
P
pjsousa@gmail.com 已提交
431
}