未验证 提交 92d0465c 编写于 作者: 静夜思朝颜's avatar 静夜思朝颜 提交者: GitHub

Simplify the Zabbix UT, reduce use time (#6362)

上级 75f03acb
......@@ -106,7 +106,7 @@ public class ZabbixProtocolDecoder extends ByteToMessageDecoder {
/**
* Close connection if protocol error
*/
private void errorProtocol(ChannelHandlerContext context, ByteBuf byteBuf, String reason, Throwable ex) throws InterruptedException {
protected void errorProtocol(ChannelHandlerContext context, ByteBuf byteBuf, String reason, Throwable ex) throws InterruptedException {
log.warn("Receive message is not Zabbix protocol, reason: {}", reason, ex);
// Skip all content
byteBuf.skipBytes(byteBuf.readableBytes());
......
......@@ -23,47 +23,43 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SocketChannel;
import java.net.SocketTimeoutException;
import lombok.SneakyThrows;
import lombok.Getter;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixErrorProtocolException;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixProtocolDecoder;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixProtocolEncoder;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixProtocolHandler;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixServer;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixProtocolType;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixRequest;
import org.junit.After;
import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixResponse;
import org.junit.Assert;
import org.junit.Before;
import org.mockito.stubbing.Answer;
import org.powermock.reflect.Whitebox;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.mockito.Spy;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public abstract class ZabbixBaseTest {
private static final String TCP_HOST = "0.0.0.0";
private static final int TCP_PORT = 10051;
protected ZabbixServer zabbixServer;
protected SocketClient socketClient;
@Spy
private ChannelHandlerContext channelHandlerContext;
private List<Object> requests;
private List<Object> responses;
private ZabbixProtocolEncoderWrapper encoder;
private ZabbixProtocolDecoderWrapper decoder;
private ZabbixProtocolHandler handler;
protected ZabbixMetrics zabbixMetrics;
/**
......@@ -72,40 +68,31 @@ public abstract class ZabbixBaseTest {
protected abstract ZabbixMetrics buildZabbixMetrics() throws Exception;
@Before
public void setupService() throws Throwable {
// Startup server
ZabbixModuleConfig config = new ZabbixModuleConfig();
config.setPort(TCP_PORT);
config.setHost(TCP_HOST);
public void setupMetrics() throws Throwable {
zabbixMetrics = buildZabbixMetrics();
zabbixServer = new ZabbixServerWrapper(config, zabbixMetrics);
zabbixServer.start();
}
@After
public void cleanup() {
zabbixServer.stop();
requests = new ArrayList<>();
responses = new ArrayList<>();
encoder = new ZabbixProtocolEncoderWrapper();
decoder = new ZabbixProtocolDecoderWrapper();
handler = new ZabbixProtocolHandler(zabbixMetrics);
when(channelHandlerContext.writeAndFlush(any())).thenAnswer(invocationOnMock -> {
responses.add(invocationOnMock.getArgument(0));
return null;
});
ByteBufAllocator allocator = mock(ByteBufAllocator.class);
when(allocator.buffer(anyInt())).thenAnswer(invocationOnMock -> Unpooled.buffer(invocationOnMock.getArgument(0)));
when(channelHandlerContext.alloc()).thenReturn(allocator);
}
/**
* Verify request error protocol
*/
public void assertWriteErrorProtocol(byte[] data) throws Throwable {
startupSocketClient();
try {
socketClient.socket.getOutputStream().write(data);
for (int i = 0; i < 10; i++) {
// No response
if (socketClient.socket.getInputStream().available() == 0 && socketClient.socket.getInputStream().read() == -1) {
return ;
}
TimeUnit.MILLISECONDS.sleep(500);
}
throw new IllegalStateException("Could not detect protocol error");
} finally {
stopSocketClient();
ZabbixProtocolDecoderWrapper decoder = new ZabbixProtocolDecoderWrapper();
decoder.decode(null, Unpooled.wrappedBuffer(data), null);
if (!decoder.isProtocolError()) {
throw new IllegalStateException("Could not detect need more input error");
}
}
......@@ -113,33 +100,46 @@ public abstract class ZabbixBaseTest {
* Assert need more input to server
*/
public void assertNeedMoreInput(byte[] data) throws Throwable {
startupSocketClient();
try {
socketClient.socket.getOutputStream().write(data);
try {
for (int i = 0; i < 10; i++) {
// No response
if (socketClient.socket.getInputStream().available() == 0 && socketClient.socket.getInputStream().read() == -1) {
return ;
}
TimeUnit.MILLISECONDS.sleep(100);
ZabbixProtocolDecoder decoder = spy(new ZabbixProtocolDecoder());
if (decoder.decodeToPayload(null, Unpooled.wrappedBuffer(data)) != null) {
throw new IllegalStateException("Could not detect need more input error");
}
}
/**
* Verify Active checks item names
*/
public void assertZabbixActiveChecksResponse(int inx, String... itemNames) throws Exception {
ZabbixResponse response = (ZabbixResponse) responses.get(inx);
// Active Checks
Assert.assertEquals(itemNames.length, response.getActiveChecks().size());
for (String itemName : itemNames) {
boolean found = false;
for (final ZabbixResponse.ActiveChecks checks : response.getActiveChecks()) {
if (Objects.equals(checks.getKey(), itemName)) {
Assert.assertTrue(checks.getDelay() > 0);
Assert.assertTrue(checks.getLastlogsize() >= 0);
Assert.assertTrue(checks.getMtime() >= 0);
found = true;
}
} catch (SocketTimeoutException e) {
// Read timeout mean need more content
return;
}
throw new IllegalStateException("Could not detect need more input error");
} finally {
stopSocketClient();
if (!found) {
throw new AssertionError("Could not found " + itemName + " in Active Checks response");
}
}
encoder.encode(channelHandlerContext, response, null);
String respBody = decoder.decodeToPayload(channelHandlerContext, (ByteBuf) responses.get(inx + 1));
assertZabbixActiveChecksResponseWithEncoded(respBody, itemNames);
}
/**
* Verify Active checks item names
* Verify Active checks item names with encoded
*/
public void assertZabbixActiveChecksResponse(String body, String... itemNames) {
private void assertZabbixActiveChecksResponseWithEncoded(String body, String... itemNames) {
Assert.assertNotNull(body);
JsonElement bodyRoot = new Gson().fromJson(body, JsonElement.class);
JsonObject rootObject = bodyRoot.getAsJsonObject();
......@@ -173,16 +173,29 @@ public abstract class ZabbixBaseTest {
/**
* Verify Zabbix agent data response
*/
public void assertZabbixAgentDataResponse(String body) {
public void assertZabbixAgentDataResponse(int inx) throws Exception {
ZabbixResponse response = (ZabbixResponse) responses.get(inx);
// Agent data info
Assert.assertTrue(StringUtil.isNotEmpty(response.getAgentData().getInfo()));
encoder.encode(channelHandlerContext, response, null);
String respBody = decoder.decodeToPayload(channelHandlerContext, (ByteBuf) responses.get(inx + 1));
assertZabbixAgentDataResponseWithEncoded(respBody);
}
/**
* Verify Zabbix agent data response with encoded
*/
public void assertZabbixAgentDataResponseWithEncoded(String body) {
Assert.assertNotNull(body);
JsonElement bodyRoot = new Gson().fromJson(body, JsonElement.class);
JsonObject rootObject = bodyRoot.getAsJsonObject();
// Basic response status
Assert.assertEquals("success", rootObject.get("response").getAsString());
// Agent data info
// Agent data
Assert.assertNotNull(rootObject.get("info"));
Assert.assertTrue(StringUtil.isNotEmpty(rootObject.get("info").getAsString()));
}
/**
......@@ -229,159 +242,67 @@ public abstract class ZabbixBaseTest {
* Verify zabbix request basic info
*/
private ZabbixRequest assertZabbixRequestBasic(int inx, ZabbixProtocolType protocolType) {
List<ZabbixRequest> requests = socketClient.requests;
Assert.assertNotNull(requests);
Assert.assertTrue(requests.size() > inx);
ZabbixRequest request = requests.get(inx);
ZabbixRequest request = (ZabbixRequest) requests.get(inx);
Assert.assertEquals(protocolType, request.getType());
return request;
}
/**
* Startup a new socket client to server
*/
protected void startupSocketClient() throws Throwable {
socketClient = Optional.ofNullable(this.socketClient).orElseGet(SocketClient::new);
socketClient.startup();
public byte[] buildZabbixRequestData(String content) {
// Build header
byte[] payload = content.getBytes();
int payloadLength = payload.length;
byte[] header = new byte[] {
'Z', 'B', 'X', 'D', '\1',
(byte) (payloadLength & 0xFF),
(byte) (payloadLength >> 8 & 0xFF),
(byte) (payloadLength >> 16 & 0xFF),
(byte) (payloadLength >> 24 & 0xFF),
'\0', '\0', '\0', '\0'};
byte[] packet = new byte[header.length + payloadLength];
System.arraycopy(header, 0, packet, 0, header.length);
System.arraycopy(payload, 0, packet, header.length, payloadLength);
return packet;
}
/**
* Close the client
*/
protected void stopSocketClient() {
Optional.ofNullable(socketClient).ifPresent(SocketClient::stop);
socketClient = null;
}
/**
* Connect to receiver server
*/
protected static class SocketClient {
private ZabbixProtocolHandler protocolHandler;
private Throwable spyHandlerException;
private Socket socket;
private List<ZabbixRequest> requests;
private void startup() throws Throwable {
if (socket != null) {
return;
}
socket = new Socket();
socket.setSoTimeout(2000);
socket.connect(new InetSocketAddress(TCP_HOST, TCP_PORT));
public void writeZabbixMessage(String message) throws Exception {
ArrayList<Object> data = new ArrayList<>();
decoder.decode(channelHandlerContext, Unpooled.wrappedBuffer(buildZabbixRequestData(message)), data);
requests.add(data.get(0));
// Waiting for connection
while (!socket.isConnected() || (protocolHandler == null && spyHandlerException == null)) {
TimeUnit.SECONDS.sleep(1);
}
if (spyHandlerException != null) {
throw spyHandlerException;
}
// Intercept message received
requests = new ArrayList<>();
doAnswer((Answer<Object>) invocationOnMock -> {
requests.add(invocationOnMock.getArgument(1));
return invocationOnMock.callRealMethod();
}).when(protocolHandler).channelRead0(any(), any());
}
@SneakyThrows
private void stop() {
if (socket != null && socket.isConnected()) {
socket.close();
}
}
public void writeZabbixMessage(String message) throws IOException {
this.socket.getOutputStream().write(buildZabbixRequestData(message));
}
public static byte[] buildZabbixRequestData(String content) {
// Build header
byte[] payload = content.getBytes();
int payloadLength = payload.length;
byte[] header = new byte[] {
'Z', 'B', 'X', 'D', '\1',
(byte) (payloadLength & 0xFF),
(byte) (payloadLength >> 8 & 0xFF),
(byte) (payloadLength >> 16 & 0xFF),
(byte) (payloadLength >> 24 & 0xFF),
'\0', '\0', '\0', '\0'};
byte[] packet = new byte[header.length + payloadLength];
System.arraycopy(header, 0, packet, 0, header.length);
System.arraycopy(payload, 0, packet, header.length, payloadLength);
return packet;
}
handler.channelRead0(channelHandlerContext, (ZabbixRequest) data.get(0));
}
/**
* Finding and spy the Zabbix handler
*/
private void spyHandler(SocketChannel channel) {
Object tailContext = Whitebox.getInternalState(channel.pipeline(), "tail");
Object handlerContext = Whitebox.getInternalState(tailContext, "prev");
ZabbixProtocolHandler handler = spyHandler(handlerContext, ZabbixProtocolHandler.class);
if (handler == null) {
throw new IllegalStateException("Unnable to find Zabbix protocol handler");
}
protocolHandler = handler;
}
@Getter
private class ZabbixProtocolDecoderWrapper extends ZabbixProtocolDecoder {
private boolean protocolError;
private <T> T spyHandler(Object handlerContext, Class<T> handlerCls) {
if (handlerContext == null || handlerContext.getClass().getSimpleName().contains("HeadContext")) {
return null;
}
Object handler = Whitebox.getInternalState(handlerContext, "handler");
if (handler.getClass().equals(handlerCls)) {
Object realHandler = spy(handler);
Whitebox.setInternalState(handlerContext, "handler", realHandler);
return (T) realHandler;
} else {
return spyHandler(Whitebox.getInternalState(handlerContext, "prev"), handlerCls);
}
}
private byte[] readAllContent(InputStream inputStream) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(512);
byte[] buffer = new byte[512];
int len;
while ((len = inputStream.read(buffer)) > 0) {
outputStream.write(buffer, 0, len);
if (len != buffer.length) {
break;
}
}
return outputStream.toByteArray();
@Override
public void decode(final ChannelHandlerContext channelHandlerContext,
final ByteBuf byteBuf,
final List<Object> list) throws Exception {
super.decode(channelHandlerContext, byteBuf, list);
}
public String waitAndGetResponsePayload() throws InterruptedException, IOException, ZabbixErrorProtocolException {
ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class);
ByteBuf byteBuf = Unpooled.copiedBuffer(readAllContent(socket.getInputStream()));
return new ZabbixProtocolDecoder().decodeToPayload(channelHandlerContext, byteBuf);
@Override
protected void errorProtocol(final ChannelHandlerContext context,
final ByteBuf byteBuf,
final String reason,
final Throwable ex) throws InterruptedException {
protocolError = true;
}
}
/**
* Zabbix binder wrapper, support spy Zabbix message received data
*/
private class ZabbixServerWrapper extends ZabbixServer {
public ZabbixServerWrapper(ZabbixModuleConfig config, ZabbixMetrics zabbixMetrics) {
super(config, zabbixMetrics);
}
@Getter
private class ZabbixProtocolEncoderWrapper extends ZabbixProtocolEncoder {
@Override
public void initChannel(SocketChannel channel) {
super.initChannel(channel);
try {
socketClient.spyHandler(channel);
} catch (Throwable e) {
socketClient.spyHandlerException = e;
}
public void encode(final ChannelHandlerContext channelHandlerContext,
final ZabbixResponse zabbixResponse,
final List<Object> list) throws Exception {
super.encode(channelHandlerContext, zabbixResponse, list);
}
}
......
......@@ -60,7 +60,7 @@ public class ZabbixMetricsTest extends ZabbixBaseTest {
private List<AcceptableValue> values = new ArrayList<>();
@Override
public void setupService() throws Throwable {
public void setupMetrics() throws Throwable {
moduleProvider = Mockito.mock(CoreModuleProvider.class);
moduleManager = Mockito.mock(ModuleManager.class);
......@@ -82,7 +82,7 @@ public class ZabbixMetricsTest extends ZabbixBaseTest {
map.put("avgHistogram", AvgHistogramFunction.class);
map.put("avgHistogramPercentile", AvgHistogramPercentileFunction.class);
Whitebox.setInternalState(meterSystem, "functionRegister", map);
super.setupService();
super.setupMetrics();
}
@Override
......@@ -100,23 +100,20 @@ public class ZabbixMetricsTest extends ZabbixBaseTest {
@Test
public void testReceiveMetrics() throws Throwable {
startupSocketClient();
// Verify Active Checks
socketClient.writeZabbixMessage("{\"request\":\"active checks\",\"host\":\"test-01\"}");
String activeChecksRespData = socketClient.waitAndGetResponsePayload();
writeZabbixMessage("{\"request\":\"active checks\",\"host\":\"test-01\"}");
assertZabbixActiveChecksRequest(0, "test-01");
assertZabbixActiveChecksResponse(activeChecksRespData, "system.cpu.load[all,avg1]", "system.cpu.load[all,avg5]", "system.cpu.load[all,avg15]", "agent.hostname");
assertZabbixActiveChecksResponse(0, "system.cpu.load[all,avg1]", "system.cpu.load[all,avg5]", "system.cpu.load[all,avg15]", "agent.hostname");
// Verify Agent data
socketClient.writeZabbixMessage("{\"request\":\"agent data\",\"session\":\"f32425dc61971760bf791f731931a92e\",\"data\":[" +
writeZabbixMessage("{\"request\":\"agent data\",\"session\":\"f32425dc61971760bf791f731931a92e\",\"data\":[" +
"{\"host\":\"test-01\",\"key\":\"system.cpu.load[all,avg1]\",\"value\":\"1.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}," +
"{\"host\":\"test-01\",\"key\":\"system.cpu.load[all,avg5]\",\"value\":\"2.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}," +
"{\"host\":\"test-01\",\"key\":\"system.cpu.load[all,avg15]\",\"value\":\"3.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}," +
"{\"host\":\"test-01\",\"key\":\"agent.hostname\",\"value\":\"test-01-hostname\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}" +
"],\"clock\":1609588568,\"ns\":102244476}");
String agentDataRespData = socketClient.waitAndGetResponsePayload();
assertZabbixAgentDataRequest(1, "test-01", "system.cpu.load[all,avg1]", "system.cpu.load[all,avg5]", "system.cpu.load[all,avg15]", "agent.hostname");
assertZabbixAgentDataResponse(agentDataRespData);
assertZabbixAgentDataResponse(2);
// Verify meter system received data
Assert.assertEquals(1, values.size());
......@@ -130,6 +127,5 @@ public class ZabbixMetricsTest extends ZabbixBaseTest {
Assert.assertEquals(1, avgLabeledFunction.getCount().get("avg1"), 0.0);
Assert.assertEquals(1, avgLabeledFunction.getCount().get("avg5"), 0.0);
Assert.assertEquals(1, avgLabeledFunction.getCount().get("avg15"), 0.0);
stopSocketClient();
}
}
......@@ -47,20 +47,15 @@ public class ZabbixProtocolHandlerTest extends ZabbixBaseTest {
*/
@Test
public void testReceive() throws Throwable {
startupSocketClient();
// Verify Active Checks
socketClient.writeZabbixMessage("{\"request\":\"active checks\",\"host\":\"zabbix-test-agent\"}");
String activeChecksRespData = socketClient.waitAndGetResponsePayload();
writeZabbixMessage("{\"request\":\"active checks\",\"host\":\"zabbix-test-agent\"}");
assertZabbixActiveChecksRequest(0, "zabbix-test-agent");
assertZabbixActiveChecksResponse(activeChecksRespData, "system.cpu.load[all,avg15]");
assertZabbixActiveChecksResponse(0, "system.cpu.load[all,avg15]");
// Verify Agent data
socketClient.writeZabbixMessage("{\"request\":\"agent data\",\"session\":\"f32425dc61971760bf791f731931a92e\",\"data\":[{\"host\":\"zabbix-test-agent\",\"key\":\"system.cpu.load[all,avg15]\",\"value\":\"1.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}],\"clock\":1609588568,\"ns\":102244476}");
String agentDataRespData = socketClient.waitAndGetResponsePayload();
writeZabbixMessage("{\"request\":\"agent data\",\"session\":\"f32425dc61971760bf791f731931a92e\",\"data\":[{\"host\":\"zabbix-test-agent\",\"key\":\"system.cpu.load[all,avg15]\",\"value\":\"1.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}],\"clock\":1609588568,\"ns\":102244476}");
assertZabbixAgentDataRequest(1, "zabbix-test-agent", "system.cpu.load[all,avg15]");
assertZabbixAgentDataResponse(agentDataRespData);
stopSocketClient();
assertZabbixAgentDataResponse(2);
}
/**
......@@ -69,7 +64,7 @@ public class ZabbixProtocolHandlerTest extends ZabbixBaseTest {
@Test
public void testErrorProtocol() throws Throwable {
// Simple header
for (int i = 1; i < 5; i++) {
for (int i = 1; i < 4; i++) {
assertNeedMoreInput(new byte[i]);
}
......@@ -80,10 +75,13 @@ public class ZabbixProtocolHandlerTest extends ZabbixBaseTest {
assertWriteErrorProtocol(new byte[] {'Z', 'B', 'X', 'D', 2, 0, 0, 0, 0});
assertWriteErrorProtocol(new byte[] {'Z', 'B', 'X', 'D', 2, 1, 0, 0, 0});
// Need more content
assertNeedMoreInput(new byte[] {'Z', 'B', 'X', 'D', 1, 5, 0, 0, 0, 1, 1, 1});
// Empty data
assertWriteErrorProtocol(SocketClient.buildZabbixRequestData(""));
assertWriteErrorProtocol(SocketClient.buildZabbixRequestData("{}"));
assertWriteErrorProtocol(SocketClient.buildZabbixRequestData("{\"test\": 1}"));
assertWriteErrorProtocol(buildZabbixRequestData(""));
assertWriteErrorProtocol(buildZabbixRequestData("{}"));
assertWriteErrorProtocol(buildZabbixRequestData("{\"test\": 1}"));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册