提交 edbb3536 编写于 作者: B Boyang Jerry Peng 提交者: xiaolong.ran

Fix instability in Pulsar Function window integration test (#5337)

* fix instability with tumbling window test

* cleaning up

* cleaning up

(cherry picked from commit 094ebf7a)
上级 a4a8eeb3
......@@ -1016,7 +1016,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
.build();
@Cleanup
Reader reader = client.newReader().startMessageId(MessageId.earliest)
Reader<byte[]> reader = client.newReader().startMessageId(MessageId.earliest)
.topic(outputTopicName)
.create();
......@@ -1031,12 +1031,18 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
int i = 0;
while (reader.hasMessageAvailable()) {
if (i >= expectedResults.length) {
while (true) {
if (i > expectedResults.length) {
Assertions.fail("More results than expected");
}
String result = new String(reader.readNext().getData()).split(":")[0];
log.info("i: {} result: {}", i, result);
Message<byte[]> msg = reader.readNext(30, TimeUnit.SECONDS);
if (msg == null) {
break;
}
String msgStr = new String(msg.getData());
log.info("i: {} RECV: {}", i, msgStr);
String result = msgStr.split(":")[0];
assertThat(result).contains(expectedResults[i]);
i++;
}
......@@ -1706,18 +1712,25 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
private static void getFunctionInfoNotFound(String functionName) throws Exception {
try {
pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"get",
"--tenant", "public",
"--namespace", "default",
"--name", functionName);
fail("Command should have exited with non-zero");
} catch (ContainerExecException e) {
assertTrue(e.getResult().getStderr().contains("Reason: Function " + functionName + " doesn't exist"));
}
retryStrategically(aVoid -> {
try {
pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
"get",
"--tenant", "public",
"--namespace", "default",
"--name", functionName);
} catch (ContainerExecException e) {
if (e.getResult().getStderr().contains("Reason: Function " + functionName + " doesn't exist")) {
return true;
}
} catch (Exception e) {
}
return false;
}, 5, 100, true);
}
private static void checkSubscriptionsCleanup(String topic) throws Exception {
......
......@@ -44,13 +44,29 @@ public class PulsarTestSuite extends PulsarClusterTestBase implements ITest {
return "pulsar-test-suite";
}
public static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
public static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis) throws Exception {
retryStrategically(predicate, retryCount, intSleepTimeInMillis, false);
}
public static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis, boolean throwException)
throws Exception {
for (int i = 0; i < retryCount; i++) {
if (predicate.test(null) || i == (retryCount - 1)) {
break;
if (throwException) {
if (i == (retryCount - 1)) {
throw new RuntimeException("Action was not successful after " + retryCount + " retries");
}
if (predicate.test(null)) {
break;
}
} else {
if (predicate.test(null) || i == (retryCount - 1)) {
break;
}
}
Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册