提交 84364ddc 编写于 作者: V vzhikserg 提交者: Matteo Merli

Convert anonymous classes to lambda (#4703)

* Convert anonymous functions to lambda

* Replacing lambda with anonymous implementation, because lambda cannot be mocked
上级 8a3b3af6
......@@ -1294,31 +1294,25 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
final Position position = ledger.addEntry("entry-0".getBytes());
Executor executor = Executors.newCachedThreadPool();
final CountDownLatch counter = new CountDownLatch(2);
executor.execute(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < N; i++) {
c1.markDelete(position);
}
counter.countDown();
} catch (Exception e) {
e.printStackTrace();
executor.execute(() -> {
try {
for (int i = 0; i < N; i++) {
c1.markDelete(position);
}
counter.countDown();
} catch (Exception e) {
e.printStackTrace();
}
});
executor.execute(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < N; i++) {
ledger.openCursor("cursor-" + i);
}
counter.countDown();
} catch (Exception e) {
e.printStackTrace();
executor.execute(() -> {
try {
for (int i = 0; i < N; i++) {
ledger.openCursor("cursor-" + i);
}
counter.countDown();
} catch (Exception e) {
e.printStackTrace();
}
});
......@@ -1832,12 +1826,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ManagedLedgerImpl newVersionLedger = (ManagedLedgerImpl) factory.open("backward_test_ledger", conf);
List<LedgerInfo> mlInfo = newVersionLedger.getLedgersInfoAsList();
assertTrue(mlInfo.stream().allMatch(new Predicate<LedgerInfo>() {
@Override
public boolean test(LedgerInfo ledgerInfo) {
return ledgerInfo.hasTimestamp();
}
}));
assertTrue(mlInfo.stream().allMatch(ledgerInfo -> ledgerInfo.hasTimestamp()));
}
@Test
......@@ -2288,15 +2277,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
responseException1.set(exception);
}
}, ctxStr);
ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
@Override
public void createComplete(int rc, LedgerHandle lh, Object ctx) {
}
}, Collections.emptyMap());
retryStrategically((test) -> {
return responseException1.get() != null;
}, 5, 1000);
ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {}, Collections.emptyMap());
retryStrategically((test) -> responseException1.get() != null, 5, 1000);
assertNotNull(responseException1.get());
assertEquals(responseException1.get().getMessage(), BKException.getMessage(BKException.Code.TimeoutException));
......
......@@ -35,15 +35,12 @@ public class CallbackMutexTest {
salary.add(1000);
// No thread competition here
// We will test thread competition in unlock()
new Thread(new Runnable() {
@Override
public void run() {
cbm.lock();
if (salary.value() == 1000)
salary.add(2000);
cbm.unlock();
Assert.assertEquals(salary.value(), 3000);
}
new Thread(() -> {
cbm.lock();
if (salary.value() == 1000)
salary.add(2000);
cbm.unlock();
Assert.assertEquals(salary.value(), 3000);
}).start();
}
......
......@@ -106,11 +106,7 @@ public class MockZooKeeper extends ZooKeeper {
private MockZooKeeper(String quorum) throws Exception {
// This constructor is never called
super(quorum, 1, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
super(quorum, 1, event -> {});
assert false;
}
......
......@@ -99,14 +99,11 @@ public class LeaderElectionService {
log.warn("Election node {} is deleted, attempting re-election...", event.getPath());
if (event.getPath().equals(ELECTION_ROOT)) {
log.info("This should call elect again...");
executor.execute(new Runnable() {
@Override
public void run() {
// If the node is deleted, attempt the re-election
log.info("Broker [{}] is calling re-election from the thread",
pulsar.getSafeWebServiceAddress());
elect();
}
executor.execute(() -> {
// If the node is deleted, attempt the re-election
log.info("Broker [{}] is calling re-election from the thread",
pulsar.getSafeWebServiceAddress());
elect();
});
}
break;
......@@ -148,12 +145,7 @@ public class LeaderElectionService {
log.warn(
"Got exception [{}] while creating election node because it already exists. Attempting re-election...",
nee.getMessage());
executor.execute(new Runnable() {
@Override
public void run() {
elect();
}
});
executor.execute(this::elect);
} catch (Exception e) {
// Kill the broker because this broker's session with zookeeper might be stale. Killing the broker will
// make sure that we get the fresh zookeeper session.
......
......@@ -275,12 +275,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
// register listener to capture zk-latency
zkStatsListener = new EventListner() {
@Override
public void recordLatency(EventType eventType, long latencyMs) {
pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs);
}
};
zkStatsListener = (eventType, latencyMs) -> pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs);
this.delayedDeliveryTrackerFactory = DelayedDeliveryTrackerLoader
.loadDelayedDeliveryTrackerFactory(pulsar.getConfiguration());
......
......@@ -671,13 +671,10 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
final CyclicBarrier barrier = new CyclicBarrier(numConsumersThreads + 1);
for (int i = 0; i < numConsumersThreads; i++) {
executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
barrier.await();
consumer.receive();
return null;
}
executor.submit((Callable<Void>) () -> {
barrier.await();
consumer.receive();
return null;
});
}
......@@ -712,13 +709,10 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
barrier.reset();
for (int i = 0; i < numConsumersThreads; i++) {
executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
barrier.await();
consumer.receive();
return null;
}
executor.submit((Callable<Void>) () -> {
barrier.await();
consumer.receive();
return null;
});
}
barrier.await();
......@@ -742,13 +736,10 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
barrier.reset();
for (int i = 0; i < numConsumersThreads; i++) {
executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
barrier.await();
consumer.receive();
return null;
}
executor.submit((Callable<Void>) () -> {
barrier.await();
consumer.receive();
return null;
});
}
barrier.await();
......
......@@ -136,11 +136,7 @@ public class PulsarFunctionLocalRunTest {
// delete all function temp files
File dir = new File(System.getProperty("java.io.tmpdir"));
File[] foundFiles = dir.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith("function");
}
});
File[] foundFiles = dir.listFiles((ignoredDir, name) -> name.startsWith("function"));
for (File file : foundFiles) {
file.delete();
......
......@@ -123,11 +123,7 @@ public class PulsarFunctionPublishTest {
// delete all function temp files
File dir = new File(System.getProperty("java.io.tmpdir"));
File[] foundFiles = dir.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith("function");
}
});
File[] foundFiles = dir.listFiles((ignoredDir, name) -> name.startsWith("function"));
for (File file : foundFiles) {
file.delete();
......@@ -376,11 +372,7 @@ public class PulsarFunctionPublishTest {
// make sure all temp files are deleted
File dir = new File(System.getProperty("java.io.tmpdir"));
File[] foundFiles = dir.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith("function");
}
});
File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
}
......
......@@ -146,11 +146,7 @@ public class PulsarFunctionStateTest {
// delete all function temp files
File dir = new File(System.getProperty("java.io.tmpdir"));
File[] foundFiles = dir.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith("function");
}
});
File[] foundFiles = dir.listFiles((ignoredDir, name) -> name.startsWith("function"));
for (File file : foundFiles) {
file.delete();
......@@ -411,11 +407,7 @@ public class PulsarFunctionStateTest {
// make sure all temp files are deleted
File dir = new File(System.getProperty("java.io.tmpdir"));
File[] foundFiles = dir.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith("function");
}
});
File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
}
......
......@@ -149,11 +149,7 @@ public class PulsarFunctionE2ETest {
// delete all function temp files
File dir = new File(System.getProperty("java.io.tmpdir"));
File[] foundFiles = dir.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith("function");
}
});
File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
for (File file : foundFiles) {
file.delete();
......@@ -162,7 +158,7 @@ public class PulsarFunctionE2ETest {
log.info("--- Setting up method {} ---", method.getName());
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager::nextFreePort);
bkEnsemble.start();
String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;
......@@ -492,11 +488,7 @@ public class PulsarFunctionE2ETest {
// make sure all temp files are deleted
File dir = new File(System.getProperty("java.io.tmpdir"));
File[] foundFiles = dir.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith("function");
}
});
File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
}
......@@ -726,11 +718,7 @@ public class PulsarFunctionE2ETest {
// make sure all temp files are deleted
File dir = new File(System.getProperty("java.io.tmpdir"));
File[] foundFiles = dir.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith("function");
}
});
File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
}
......@@ -869,11 +857,7 @@ public class PulsarFunctionE2ETest {
// make sure all temp files are deleted
File dir = new File(System.getProperty("java.io.tmpdir"));
File[] foundFiles = dir.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith("function");
}
});
File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
}
......@@ -1221,11 +1205,7 @@ public class PulsarFunctionE2ETest {
// make sure all temp files are deleted
File dir = new File(System.getProperty("java.io.tmpdir"));
File[] foundFiles = dir.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith("function");
}
});
File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
}
......
......@@ -124,12 +124,9 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NarClassLoader extends URLClassLoader {
private static final FileFilter JAR_FILTER = new FileFilter() {
@Override
public boolean accept(File pathname) {
final String nameToTest = pathname.getName().toLowerCase();
return nameToTest.endsWith(".jar") && pathname.isFile();
}
private static final FileFilter JAR_FILTER = pathname -> {
final String nameToTest = pathname.getName().toLowerCase();
return nameToTest.endsWith(".jar") && pathname.isFile();
};
/**
......
......@@ -239,12 +239,7 @@ public class RateLimiter implements AutoCloseable{
}
protected ScheduledFuture<?> createTask() {
return executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
renew();
}
}, this.rateTime, this.rateTime, this.timeUnit);
return executorService.scheduleAtFixedRate(this::renew, this.rateTime, this.rateTime, this.timeUnit);
}
synchronized void renew() {
......
......@@ -376,11 +376,7 @@ public class ConcurrentLongHashMapTest {
public void testComputeIfAbsent() {
ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16, 1);
AtomicInteger counter = new AtomicInteger();
LongFunction<Integer> provider = new LongFunction<Integer>() {
public Integer apply(long key) {
return counter.getAndIncrement();
}
};
LongFunction<Integer> provider = key -> counter.getAndIncrement();
assertEquals(map.computeIfAbsent(0, provider).intValue(), 0);
assertEquals(map.get(0).intValue(), 0);
......
......@@ -77,14 +77,11 @@ public abstract class ComponentStatsManager implements AutoCloseable {
this.collectorRegistry = collectorRegistry;
this.metricsLabels = metricsLabels;
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
reset();
} catch (Exception e) {
log.error("Failed to reset metrics for 1min window", e);
}
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
reset();
} catch (Exception e) {
log.error("Failed to reset metrics for 1min window", e);
}
}, 1, 1, TimeUnit.MINUTES);
}
......
......@@ -88,12 +88,7 @@ public class ContextImplTest {
client,
new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null);
context.setCurrentMessageContext(new Record<String>() {
@Override
public String getValue() {
return null;
}
});
context.setCurrentMessageContext((Record<String>) () -> null);
}
@Test(expectedExceptions = IllegalStateException.class)
......
......@@ -53,13 +53,7 @@ public abstract class CanalAbstractSource<V> extends PushSource<V> {
private static final String DESTINATION = "destination";
protected final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("[{}] parse events has an error", t.getName(), e);
}
};
protected final Thread.UncaughtExceptionHandler handler = (t, e) -> log.error("[{}] parse events has an error", t.getName(), e);
@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
......@@ -82,14 +76,7 @@ public abstract class CanalAbstractSource<V> extends PushSource<V> {
protected void start() {
Objects.requireNonNull(connector, "connector is null");
thread = new Thread(new Runnable() {
@Override
public void run() {
process();
}
});
thread = new Thread(this::process);
thread.setName("canal source thread");
thread.setUncaughtExceptionHandler(handler);
running = true;
......
......@@ -72,12 +72,7 @@ public class PollingZooKeeperConfigurationProvider extends
try {
agentNodeCache = new NodeCache(client, basePath + "/" + getAgentName());
agentNodeCache.start();
agentNodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
refreshConfiguration();
}
});
agentNodeCache.getListenable().addListener(() -> refreshConfiguration());
} catch (Exception e) {
client.close();
throw e;
......
......@@ -46,13 +46,7 @@ public abstract class AbstractSource<V> extends PushSource<V> {
protected volatile boolean running = false;
protected final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("[{}] parse events has an error", t.getName(), e);
}
};
protected final Thread.UncaughtExceptionHandler handler = (t, e) -> log.error("[{}] parse events has an error", t.getName(), e);
@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
......@@ -69,14 +63,7 @@ public abstract class AbstractSource<V> extends PushSource<V> {
public abstract V extractValue(String message);
protected void start() {
thread = new Thread(new Runnable() {
@Override
public void run() {
process();
}
});
thread = new Thread(this::process);
thread.setName("flume source thread");
thread.setUncaughtExceptionHandler(handler);
running = true;
......
......@@ -163,12 +163,7 @@ public abstract class AbstractHdfsConnector {
protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
try {
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
return FileSystem.get(config);
}
});
return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config));
} catch (InterruptedException e) {
throw new IOException("Unable to create file system: " + e.getMessage());
}
......
......@@ -163,12 +163,7 @@ public abstract class AbstractHdfsConnector {
protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
try {
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
return FileSystem.get(config);
}
});
return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config));
} catch (InterruptedException e) {
throw new IOException("Unable to create file system: " + e.getMessage());
}
......
......@@ -124,10 +124,8 @@ public class JdbcSinkTest {
Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
.message(insertMessage)
.topicName("fake_topic_name").ackFunction(new Runnable(){
public void run(){
}
})
.topicName("fake_topic_name")
.ackFunction(() -> {})
.build();
genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
......@@ -173,10 +171,8 @@ public class JdbcSinkTest {
Message<GenericRecord> updateMessage = mock(MessageImpl.class);
Record<GenericRecord> updateRecord = PulsarRecord.<GenericRecord>builder()
.message(updateMessage)
.topicName("fake_topic_name").ackFunction(new Runnable(){
public void run(){
}
})
.topicName("fake_topic_name")
.ackFunction(() -> {})
.build();
GenericSchema<GenericRecord> updateGenericAvroSchema;
......@@ -216,10 +212,8 @@ public class JdbcSinkTest {
Message<GenericRecord> deleteMessage = mock(MessageImpl.class);
Record<GenericRecord> deleteRecord = PulsarRecord.<GenericRecord>builder()
.message(deleteMessage)
.topicName("fake_topic_name").ackFunction(new Runnable(){
public void run(){
}
})
.topicName("fake_topic_name")
.ackFunction(() -> {})
.build();
GenericSchema<GenericRecord> deleteGenericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
......
......@@ -186,33 +186,27 @@ public class PulsarMetadata implements ConnectorMetadata {
ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
tableMetaData.getColumns().forEach(new Consumer<ColumnMetadata>() {
@Override
public void accept(ColumnMetadata columnMetadata) {
PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) columnMetadata;
PulsarColumnHandle pulsarColumnHandle = new PulsarColumnHandle(
connectorId,
pulsarColumnMetadata.getNameWithCase(),
pulsarColumnMetadata.getType(),
pulsarColumnMetadata.isHidden(),
pulsarColumnMetadata.isInternal(),
pulsarColumnMetadata.getFieldNames(),
pulsarColumnMetadata.getPositionIndices());
columnHandles.put(
columnMetadata.getName(),
pulsarColumnHandle);
}
tableMetaData.getColumns().forEach(columnMetadata -> {
PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) columnMetadata;
PulsarColumnHandle pulsarColumnHandle = new PulsarColumnHandle(
connectorId,
pulsarColumnMetadata.getNameWithCase(),
pulsarColumnMetadata.getType(),
pulsarColumnMetadata.isHidden(),
pulsarColumnMetadata.isInternal(),
pulsarColumnMetadata.getFieldNames(),
pulsarColumnMetadata.getPositionIndices());
columnHandles.put(
columnMetadata.getName(),
pulsarColumnHandle);
});
PulsarInternalColumn.getInternalFields().stream().forEach(new Consumer<PulsarInternalColumn>() {
@Override
public void accept(PulsarInternalColumn pulsarInternalColumn) {
PulsarColumnHandle pulsarColumnHandle = pulsarInternalColumn.getColumnHandle(connectorId, false);
columnHandles.put(pulsarColumnHandle.getName(), pulsarColumnHandle);
}
PulsarInternalColumn.getInternalFields().forEach(pulsarInternalColumn -> {
PulsarColumnHandle pulsarColumnHandle = pulsarInternalColumn.getColumnHandle(connectorId, false);
columnHandles.put(pulsarColumnHandle.getName(), pulsarColumnHandle);
});
return columnHandles.build();
......@@ -314,12 +308,8 @@ public class PulsarMetadata implements ConnectorMetadata {
builder.addAll(getColumns(null, schema, new HashSet<>(), new Stack<>(), new Stack<>()));
if (withInternalColumns) {
PulsarInternalColumn.getInternalFields().stream().forEach(new Consumer<PulsarInternalColumn>() {
@Override
public void accept(PulsarInternalColumn pulsarInternalColumn) {
builder.add(pulsarInternalColumn.getColumnMetadata(false));
}
});
PulsarInternalColumn.getInternalFields().forEach(
pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false)));
}
return new ConnectorTableMetadata(schemaTableName, builder.build());
......
......@@ -292,12 +292,10 @@ public class PerformanceConsumer {
long start = System.nanoTime();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
printAggregatedThroughput(start);
printAggregatedStats();
}
});
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
printAggregatedThroughput(start);
printAggregatedStats();
}));
long oldTime = System.nanoTime();
......
......@@ -336,12 +336,10 @@ public class PerformanceProducer {
long start = System.nanoTime();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
printAggregatedThroughput(start);
printAggregatedStats();
}
});
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
printAggregatedThroughput(start);
printAggregatedStats();
}));
Collections.shuffle(producers);
AtomicBoolean isDone = new AtomicBoolean();
......
......@@ -74,12 +74,7 @@ public class ClientCnxnAspect {
// zkResponse event shouldn't be blocked and it should be processed
// async
if (eventProcessExecutor != null && !eventProcessExecutor.isShutdown()) {
eventProcessExecutor.submit(new Runnable() {
@Override
public void run() {
processEvent(joinPoint);
}
});
eventProcessExecutor.submit(() -> processEvent(joinPoint));
}
}
......
......@@ -204,12 +204,7 @@ public abstract class ZooKeeperCache implements Watcher {
private boolean exists(final String path, Watcher watcher) throws KeeperException, InterruptedException {
try {
return existsCache.get(path, new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return zkSession.get().exists(path, watcher) != null;
}
});
return existsCache.get(path, () -> zkSession.get().exists(path, watcher) != null);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof KeeperException) {
......@@ -386,12 +381,9 @@ public abstract class ZooKeeperCache implements Watcher {
public Set<String> getChildren(final String path, final Watcher watcher)
throws KeeperException, InterruptedException {
try {
return childrenCache.get(path, new Callable<Set<String>>() {
@Override
public Set<String> call() throws Exception {
LOG.debug("Fetching children at {}", path);
return Sets.newTreeSet(checkNotNull(zkSession.get()).getChildren(path, watcher));
}
return childrenCache.get(path, () -> {
LOG.debug("Fetching children at {}", path);
return Sets.newTreeSet(checkNotNull(zkSession.get()).getChildren(path, watcher));
});
} catch (ExecutionException e) {
Throwable cause = e.getCause();
......
......@@ -80,15 +80,12 @@ public class AdminMultiHostTest {
// Because zookeeper session timeout is 30ms and ticktime is 2ms, so we need wait more than 32ms
private void waitBrokerDown(PulsarAdmin admin, int expectBrokers, int timeout)
throws InterruptedException, ExecutionException, TimeoutException {
FutureTask<Boolean> futureTask = new FutureTask<>(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
while (admin.brokers().getActiveBrokers(clusterName).size() != expectBrokers) {
admin.brokers().healthcheck();
TimeUnit.MILLISECONDS.sleep(1000);
}
return true;
FutureTask<Boolean> futureTask = new FutureTask<>(() -> {
while (admin.brokers().getActiveBrokers(clusterName).size() != expectBrokers) {
admin.brokers().healthcheck();
TimeUnit.MILLISECONDS.sleep(1000);
}
return true;
});
new Thread(futureTask).start();
futureTask.get(timeout, TimeUnit.SECONDS);
......
......@@ -44,7 +44,7 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
@Test(dataProvider = "ServiceUrls")
public void testReceivedMessage(String serviceUrl) throws Exception {
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData();
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
Set<String> set = new HashSet<>();
set.add(TOPIC);
......@@ -80,8 +80,8 @@ public class SparkStreamingPulsarReceiverTest extends PulsarTestSuite {
}
@Test(dataProvider = "ServiceUrls")
public void testDefaultSettingsOfReceiver(String serviceUrl) throws Exception {
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData();
public void testDefaultSettingsOfReceiver(String serviceUrl) {
ConsumerConfigurationData<byte[]> consConf = new ConsumerConfigurationData<>();
Set<String> set = new HashSet<>();
set.add(TOPIC);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册