提交 ac58ccbc 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] replaced synchronized methods with concurrent collections

上级 a10202fb
......@@ -15,8 +15,8 @@
package eu.stratosphere.streaming.faulttolerance;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.streamrecord.UID;
......@@ -27,7 +27,7 @@ public class AtLeastOnceFaultToleranceBuffer extends FaultToleranceBuffer {
public AtLeastOnceFaultToleranceBuffer(int[] numberOfChannels, int componentInstanceID) {
super(numberOfChannels, componentInstanceID);
this.ackCounter = new HashMap<UID, Integer>();
this.ackCounter = new ConcurrentHashMap<UID, Integer>();
}
......
......@@ -16,8 +16,8 @@
package eu.stratosphere.streaming.faulttolerance;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.streamrecord.UID;
......@@ -29,7 +29,7 @@ public class ExactlyOnceFaultToleranceBuffer extends FaultToleranceBuffer {
public ExactlyOnceFaultToleranceBuffer(int[] numberOfChannels, int sourceInstanceID) {
super(numberOfChannels, sourceInstanceID);
this.ackCounter = new HashMap<UID, int[]>();
this.ackCounter = new ConcurrentHashMap<UID, int[]>();
this.initialAckCounts = new int[numberOfEffectiveChannels.length + 1];
for (int i = 0; i < numberOfEffectiveChannels.length; i++) {
this.initialAckCounts[i + 1] = numberOfEffectiveChannels[i];
......@@ -85,7 +85,7 @@ public class ExactlyOnceFaultToleranceBuffer extends FaultToleranceBuffer {
acks[0]++;
StreamRecord newRecord = addToChannel(id, channel);
if (acks[0] == numberOfEffectiveChannels.length) {
remove(id);
}
......
......@@ -15,14 +15,14 @@
package eu.stratosphere.streaming.faulttolerance;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
......@@ -55,12 +55,12 @@ public abstract class FaultToleranceBuffer {
this.componentInstanceID = componentInstanceID;
this.timeOfLastUpdate = System.currentTimeMillis();
this.recordBuffer = new HashMap<UID, StreamRecord>();
this.recordsByTime = new TreeMap<Long, Set<UID>>();
this.recordTimestamps = new HashMap<UID, Long>();
this.recordBuffer = new ConcurrentHashMap<UID, StreamRecord>();
this.recordsByTime = new ConcurrentSkipListMap<Long, Set<UID>>();
this.recordTimestamps = new ConcurrentHashMap<UID, Long>();
}
public synchronized void add(StreamRecord streamRecord) {
public void add(StreamRecord streamRecord) {
StreamRecord record = streamRecord.copy();
UID id = record.getId();
......@@ -73,7 +73,7 @@ public abstract class FaultToleranceBuffer {
log.trace("Record added to buffer: " + id);
}
public synchronized void add(StreamRecord streamRecord, int channel) {
public void add(StreamRecord streamRecord, int channel) {
StreamRecord record = streamRecord.copy();
......@@ -125,7 +125,7 @@ public abstract class FaultToleranceBuffer {
}
public synchronized StreamRecord remove(UID uid) {
public StreamRecord remove(UID uid) {
if (removeFromAckCounter(uid)) {
......
......@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.streamrecord.UID;
import eu.stratosphere.streaming.util.PerformanceTracker;
/**
* An object to provide fault tolerance for Stratosphere stream processing. It
......@@ -41,6 +42,7 @@ public class FaultToleranceUtil {
boolean exactlyOnce;
private FaultToleranceBuffer buffer;
public PerformanceTracker counter;
/**
* Creates fault tolerance buffer object for the given output channels and
......@@ -69,6 +71,9 @@ public class FaultToleranceUtil {
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
}
counter = new PerformanceTracker("pc", 1000, 1000, 3000, "C:/temp/strato/buffer/Buffer"
+ sourceInstanceID + "-1.csv");
}
/**
......@@ -81,7 +86,7 @@ public class FaultToleranceUtil {
public void addRecord(StreamRecord streamRecord) {
buffer.add(streamRecord);
counter.track(this.buffer.recordBuffer.size());
}
public void addRecord(StreamRecord streamRecord, int output) {
......@@ -103,6 +108,7 @@ public class FaultToleranceUtil {
// TODO: find a place to call timeoutRecords
public void ackRecord(UID recordID, int channel) {
buffer.ack(recordID, channel);
}
/**
......
#!/bin/bash
toDir=$1
echo COPYING:
if [ -d "${toDir}" ] ; then
ssh strato@dell150.ilab.sztaki.hu '
for j in {101..142} 144 145;
do
echo -n $j,
for i in $(ssh dell$j "ls stratosphere-distrib/log/counter/");
do scp strato@dell$j:stratosphere-distrib/log/counter/$i stratosphere-distrib/log/all_tests/counter/$i-$j.csv;
done
......@@ -20,7 +19,6 @@ if [ -d "${toDir}" ] ; then
done
done
'
echo 150
scp strato@dell150.ilab.sztaki.hu:stratosphere-distrib/log/all_tests/counter/* $toDir/counter/
scp strato@dell150.ilab.sztaki.hu:stratosphere-distrib/log/all_tests/timer/* $toDir/timer/
else
......
#!/bin/bash
echo REMOVING:
ssh strato@dell150.ilab.sztaki.hu '
for j in {101..142} 144 145;
do
echo -n $j,
$(ssh dell$j "rm stratosphere-distrib/log/counter/*");
$(ssh dell$j "rm stratosphere-distrib/log/timer/*");
$(ssh dell$j 'rm stratosphere-distrib/log/counter/*');
$(ssh dell$j 'rm stratosphere-distrib/log/timer/*');
done
echo 150
rm stratosphere-distrib/log/counter/*
rm stratosphere-distrib/log/timer/*
rm stratosphere-distrib/log/all_tests/counter/*
rm stratosphere-distrib/log/all_tests/timer/*
'
\ No newline at end of file
'
#!/bin/bash
toDir=$1
testParams=$2
if [ -d "${toDir}" ] ; then
echo "removing files"
./remove-files.sh
paramsWithSpace="${testParams//_/ }"
rm -r $toDir/$testParams/*;
mkdir $toDir/$testParams;
mkdir $toDir/$testParams/counter;
mkdir $toDir/$testParams/timer;
ssh -n strato@dell150.ilab.sztaki.hu "./stratosphere-distrib/bin/stratosphere run -j ./stratosphere-distrib/lib/stratosphere-streaming-0.5-SNAPSHOT.jar -c eu.stratosphere.streaming.examples.wordcount.WordCountLocal -a cluster ${paramsWithSpace}"
echo "job finished"
echo "copying"
./copy-files.sh $toDir/$testParams
else
echo "USAGE:"
echo "run <directory> <test params separated by _>"
fi
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册