RMQSink.java 3.4 KB
Newer Older
J
jfeher 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/

16
package eu.stratosphere.streaming.rabbitmq;
J
jfeher 已提交
17 18 19

import java.io.IOException;

20 21 22
import org.apache.commons.lang3.SerializationUtils;

import eu.stratosphere.api.java.tuple.Tuple;
J
jfeher 已提交
23
import eu.stratosphere.api.java.tuple.Tuple1;
24
import eu.stratosphere.streaming.api.SinkFunction;
J
jfeher 已提交
25 26 27 28 29 30 31 32 33 34

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

/**
 * Source for sending messages to a RabbitMQ queue. The source currently only
 * support string messages. Other types will be added soon.
 * 
 */
35
public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN>{
J
jfeher 已提交
36 37 38 39 40 41 42 43
	private static final long serialVersionUID = 1L;
	
	private String QUEUE_NAME;
	private String HOST_NAME;
	private transient ConnectionFactory factory;
	private transient Connection connection;
	private transient Channel channel;
	
44
	//TODO Should the user implement the constructor?
J
jfeher 已提交
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
	public RMQSink(String HOST_NAME, String QUEUE_NAME) {
		this.HOST_NAME = HOST_NAME;
		this.QUEUE_NAME = QUEUE_NAME;
	}
	
	public void initializeConnection(){
		factory = new ConnectionFactory();
		factory.setHost(HOST_NAME);
		try {
			connection = factory.newConnection();
			channel = connection.createChannel();
			
			
		    
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
		
	}
66
	/*@Override
J
jfeher 已提交
67 68 69 70 71 72 73
	public void invoke(Tuple1<String> tuple) {
		
		initializeConnection();
		
		try {
			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
			String message = tuple.f0;
74 75 76
			byte[] msg = SerializationUtils.serialize(tuple.f0);
		    //channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
			channel.basicPublish("", QUEUE_NAME, null, msg);
J
jfeher 已提交
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
		} catch (IOException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}
	    
		
		try {
			channel.close();
			connection.close();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	    
		
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
	}*/

	@Override
	public void invoke(IN tuple) {
		// TODO Auto-generated method stub
		initializeConnection();
		
		try {
			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
			//String message = tuple.f0;
			byte[] msg = serialize(tuple);
		    //channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
			channel.basicPublish("", QUEUE_NAME, null, msg);
		} catch (IOException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}
	    
		
		try {
			channel.close();
			connection.close();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
J
jfeher 已提交
118
	}
119 120
	
	public abstract byte[] serialize(Tuple t);
J
jfeher 已提交
121 122

}