提交 d345ac88 编写于 作者: S Soby Chacko

Enable custom binder health check impelementation

Currently, KafkaBinderHealthIndicator is not customizable and included by default
when Spring Boot actuator is on the classpath. Fix this by allowing the application
to provide a custom implementation. A new marker interface called KafkaBinderHealth
can be used by the applicaiton to provide a custom HealthIndicator implementation, in
which case, the binder's default implementation will be excluded.

Tests and docs changes.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1180
上级 31b91f47
......@@ -969,3 +969,28 @@ public AdminClientConfigCustomizer adminClientConfigCustomizer() {
};
}
```
[[custom-kafka-binder-health-indicator]]
=== Custom Kafka Binder Health Indicator
Kafka binder activates a default health indicator when Spring Boot actuator is on the classpath.
This health indicator checks the health of the binder and any communication issues with the Kafka broker.
If an application wants to disable this default health check implementation and include a custom implementation, then it can provide an implementation for `KafkaBinderHealth` interface.
`KafkaBinderHealth` is a marker interface that extends from `HealthIndicator`.
In the custom implementation, it must provide an implementation for the `health()` method.
The custom implementation must be present in the application configuration as a bean.
When the binder discovers the custom implementation, it will use that instead of the default implementation.
Here is an example of such a custom implementation bean in the application.
```
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
return new KafkaBinderHealth() {
@Override
public Health health() {
// custom implementation details.
}
};
}
```
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import org.springframework.boot.actuate.health.HealthIndicator;
/**
* Marker interface used for custom KafkaBinderHealth indicator implementations.
*
* @author Soby Chacko
* @since 3.2.2
*/
public interface KafkaBinderHealth extends HealthIndicator {
}
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -35,7 +35,6 @@ import org.apache.kafka.common.PartitionInfo;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.actuate.health.StatusAggregator;
import org.springframework.kafka.core.ConsumerFactory;
......@@ -55,7 +54,7 @@ import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
* @author Chukwubuikem Ume-Ugwa
* @author Taras Danylchuk
*/
public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBean {
public class KafkaBinderHealthIndicator implements KafkaBinderHealth, DisposableBean {
private static final int DEFAULT_TIMEOUT = 60;
......@@ -73,7 +72,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBe
private boolean considerDownWhenAnyPartitionHasNoLeader;
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
ConsumerFactory<?, ?> consumerFactory) {
ConsumerFactory<?, ?> consumerFactory) {
this.binder = binder;
this.consumerFactory = consumerFactory;
}
......@@ -219,7 +218,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBe
}
@Override
public void destroy() throws Exception {
public void destroy() {
executor.shutdown();
}
......
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -24,6 +24,8 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
......@@ -38,11 +40,13 @@ import org.springframework.util.ObjectUtils;
*
* @author Oleg Zhurakousky
* @author Chukwubuikem Ume-Ugwa
* @author Soby Chacko
*/
@Configuration
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
@ConditionalOnEnabledHealthIndicator("binders")
@ConditionalOnMissingBean(KafkaBinderHealth.class)
public class KafkaBinderHealthIndicatorConfiguration {
@Bean
......
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.integration2;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
/**
* @author Soby Chacko
*/
public class KafkaBinderCustomHealthCheckTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10);
@Test
public void testCustomHealthIndicatorIsActivated() {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(
CustomHealthCheckApplication.class).web(WebApplicationType.NONE).run(
"--spring.cloud.stream.kafka.binder.brokers="
+ embeddedKafka.getEmbeddedKafka().getBrokersAsString());
final KafkaBinderHealth kafkaBinderHealth = applicationContext.getBean(KafkaBinderHealth.class);
assertThat(kafkaBinderHealth).isInstanceOf(CustomHealthIndicator.class);
assertThatThrownBy(() -> applicationContext.getBean(KafkaBinderHealthIndicator.class)).isInstanceOf(NoSuchBeanDefinitionException.class);
applicationContext.close();
}
@SpringBootApplication
static class CustomHealthCheckApplication {
@Bean
public CustomHealthIndicator kafkaBinderHealthIndicator() {
return new CustomHealthIndicator();
}
}
static class CustomHealthIndicator implements KafkaBinderHealth {
@Override
public Health health() {
return null;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册