StreamCollectorManager.java 3.4 KB
Newer Older
G
ghermann 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/

16
package eu.stratosphere.streaming.api;
G
ghermann 已提交
17 18 19 20 21

import java.util.ArrayList;
import java.util.List;

import eu.stratosphere.api.java.tuple.Tuple;
22
import eu.stratosphere.nephele.io.RecordWriter;
G
ghermann 已提交
23
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
24
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
G
ghermann 已提交
25 26
import eu.stratosphere.util.Collector;

M
Márton Balassi 已提交
27
public class StreamCollectorManager<T extends Tuple> implements Collector<T> {
G
ghermann 已提交
28 29 30

	ArrayList<StreamCollector<Tuple>> notPartitionedCollectors;
	ArrayList<StreamCollector<Tuple>[]> partitionedCollectors;
31 32
	List<RecordWriter<StreamRecord>> partitionedOutputs;
	List<RecordWriter<StreamRecord>> notPartitionedOutputs;
G
ghermann 已提交
33
	int keyPostition;
34

G
ghermann 已提交
35
	// TODO consider channelID
M
Márton Balassi 已提交
36 37 38
	public StreamCollectorManager(List<Integer> batchSizesOfNotPartitioned,
			List<Integer> batchSizesOfPartitioned, List<Integer> parallelismOfOutput,
			int keyPosition, long batchTimeout, int channelID,
G
ghermann 已提交
39
			SerializationDelegate<Tuple> serializationDelegate,
40 41
			List<RecordWriter<StreamRecord>> partitionedOutputs,
			List<RecordWriter<StreamRecord>> notPartitionedOutputs) {
G
ghermann 已提交
42 43

		notPartitionedCollectors = new ArrayList<StreamCollector<Tuple>>(
44
				batchSizesOfNotPartitioned.size());
G
ghermann 已提交
45
		partitionedCollectors = new ArrayList<StreamCollector<Tuple>[]>(
46
				batchSizesOfPartitioned.size());
G
ghermann 已提交
47 48

		this.keyPostition = keyPosition;
49 50

		for (int i = 0; i < batchSizesOfNotPartitioned.size(); i++) {
M
Márton Balassi 已提交
51 52 53
			notPartitionedCollectors.add(new StreamCollector<Tuple>(batchSizesOfNotPartitioned
					.get(i), batchTimeout, channelID, serializationDelegate, notPartitionedOutputs
					.get(i)));
G
ghermann 已提交
54 55
		}

56 57
		for (int i = 0; i < batchSizesOfPartitioned.size(); i++) {
			StreamCollector<Tuple>[] collectors = new StreamCollector[parallelismOfOutput.get(i)];
G
ghermann 已提交
58
			for (int j = 0; j < collectors.length; j++) {
59
				collectors[j] = new StreamCollector<Tuple>(batchSizesOfPartitioned.get(i),
M
Márton Balassi 已提交
60
						batchTimeout, channelID, serializationDelegate, partitionedOutputs.get(i));
G
ghermann 已提交
61 62 63 64
			}
			partitionedCollectors.add(collectors);
		}
	}
65

G
ghermann 已提交
66 67
	// TODO copy here instead of copying inside every StreamCollector
	@Override
M
Márton Balassi 已提交
68 69
	public void collect(T tuple) {
		T copiedTuple = StreamRecord.copyTuple(tuple);
M
Márton Balassi 已提交
70

G
ghermann 已提交
71
		for (StreamCollector<Tuple> collector : notPartitionedCollectors) {
M
Márton Balassi 已提交
72
			collector.collect(copiedTuple);
G
ghermann 已提交
73
		}
74

M
Márton Balassi 已提交
75
		int partitionHash = Math.abs(copiedTuple.getField(keyPostition).hashCode());
76

G
ghermann 已提交
77
		for (StreamCollector<Tuple>[] collectors : partitionedCollectors) {
M
Márton Balassi 已提交
78
			collectors[partitionHash % collectors.length].collect(copiedTuple);
G
ghermann 已提交
79 80 81 82 83 84 85 86
		}
	}

	@Override
	public void close() {

	}
}