提交 87d699d7 编写于 作者: G Gyula Fora

[streaming] Time trigger preNotify fix

上级 561eaf04
......@@ -107,9 +107,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
LinkedList<Object> fakeElements = new LinkedList<Object>();
// check if there is more then one window border missed
// use > here. In case >= would fit, the regular call will do the job.
while (timestamp.getTimestamp(datapoint) > startTime + granularity) {
while (timestamp.getTimestamp(datapoint) >= startTime + granularity) {
startTime += granularity;
fakeElements.add(startTime-1);
fakeElements.add(startTime - 1);
}
return (Object[]) fakeElements.toArray();
}
......@@ -146,7 +146,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
// start time is excluded, but end time is included: >=
if (System.currentTimeMillis() >= startTime + granularity) {
startTime += granularity;
callback.sendFakeElement(startTime-1);
callback.sendFakeElement(startTime - 1);
}
}
......
......@@ -50,8 +50,7 @@ public class TimeTriggerPolicyTest {
// test different granularity
for (long granularity = 0; granularity < 31; granularity++) {
// create policy
TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity,
timeStamp);
TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity, timeStamp);
// remember window border
// Remark: This might NOT work in case the timeStamp uses
......@@ -101,11 +100,10 @@ public class TimeTriggerPolicyTest {
};
// create policy
TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5,
timeStamp);
TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5, timeStamp);
// expected result
Long[][] result = { {}, {}, { 4L, 9L, 14L }, { 24L } };
Long[][] result = { {}, {}, { 4L, 9L, 14L, 19L }, { 24L } };
// call policy
for (int i = 0; i < times.length; i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册