diff --git a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolDecoder.java b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolDecoder.java index e2d41d50803511530613d589e1d4e82c6e8fe68c..6fd100e998f88841a1c2e567ceed561beed6b524 100644 --- a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolDecoder.java +++ b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolDecoder.java @@ -106,7 +106,7 @@ public class ZabbixProtocolDecoder extends ByteToMessageDecoder { /** * Close connection if protocol error */ - private void errorProtocol(ChannelHandlerContext context, ByteBuf byteBuf, String reason, Throwable ex) throws InterruptedException { + protected void errorProtocol(ChannelHandlerContext context, ByteBuf byteBuf, String reason, Throwable ex) throws InterruptedException { log.warn("Receive message is not Zabbix protocol, reason: {}", reason, ex); // Skip all content byteBuf.skipBytes(byteBuf.readableBytes()); diff --git a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixBaseTest.java b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixBaseTest.java index f73314a21ae78cbf9808739117be179a03ed3167..be5e073cb093aa7c92f5c255d0c03a3df68d8d59 100644 --- a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixBaseTest.java +++ b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixBaseTest.java @@ -23,47 +23,43 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.socket.SocketChannel; -import java.net.SocketTimeoutException; -import lombok.SneakyThrows; +import lombok.Getter; import org.apache.commons.lang3.math.NumberUtils; import org.apache.skywalking.apm.util.StringUtil; -import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixErrorProtocolException; import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixProtocolDecoder; +import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixProtocolEncoder; import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixProtocolHandler; -import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.ZabbixServer; import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixProtocolType; import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixRequest; -import org.junit.After; +import org.apache.skywalking.oap.server.receiver.zabbix.provider.protocol.bean.ZabbixResponse; import org.junit.Assert; import org.junit.Before; -import org.mockito.stubbing.Answer; -import org.powermock.reflect.Whitebox; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.net.Socket; +import org.mockito.Spy; + import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.TimeUnit; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public abstract class ZabbixBaseTest { - private static final String TCP_HOST = "0.0.0.0"; - private static final int TCP_PORT = 10051; - protected ZabbixServer zabbixServer; - protected SocketClient socketClient; + @Spy + private ChannelHandlerContext channelHandlerContext; + + private List requests; + private List responses; + + private ZabbixProtocolEncoderWrapper encoder; + private ZabbixProtocolDecoderWrapper decoder; + private ZabbixProtocolHandler handler; protected ZabbixMetrics zabbixMetrics; /** @@ -72,40 +68,31 @@ public abstract class ZabbixBaseTest { protected abstract ZabbixMetrics buildZabbixMetrics() throws Exception; @Before - public void setupService() throws Throwable { - // Startup server - ZabbixModuleConfig config = new ZabbixModuleConfig(); - config.setPort(TCP_PORT); - config.setHost(TCP_HOST); + public void setupMetrics() throws Throwable { zabbixMetrics = buildZabbixMetrics(); - zabbixServer = new ZabbixServerWrapper(config, zabbixMetrics); - zabbixServer.start(); - } - - @After - public void cleanup() { - zabbixServer.stop(); + requests = new ArrayList<>(); + responses = new ArrayList<>(); + + encoder = new ZabbixProtocolEncoderWrapper(); + decoder = new ZabbixProtocolDecoderWrapper(); + handler = new ZabbixProtocolHandler(zabbixMetrics); + when(channelHandlerContext.writeAndFlush(any())).thenAnswer(invocationOnMock -> { + responses.add(invocationOnMock.getArgument(0)); + return null; + }); + ByteBufAllocator allocator = mock(ByteBufAllocator.class); + when(allocator.buffer(anyInt())).thenAnswer(invocationOnMock -> Unpooled.buffer(invocationOnMock.getArgument(0))); + when(channelHandlerContext.alloc()).thenReturn(allocator); } /** * Verify request error protocol */ public void assertWriteErrorProtocol(byte[] data) throws Throwable { - startupSocketClient(); - try { - socketClient.socket.getOutputStream().write(data); - - for (int i = 0; i < 10; i++) { - // No response - if (socketClient.socket.getInputStream().available() == 0 && socketClient.socket.getInputStream().read() == -1) { - return ; - } - TimeUnit.MILLISECONDS.sleep(500); - } - - throw new IllegalStateException("Could not detect protocol error"); - } finally { - stopSocketClient(); + ZabbixProtocolDecoderWrapper decoder = new ZabbixProtocolDecoderWrapper(); + decoder.decode(null, Unpooled.wrappedBuffer(data), null); + if (!decoder.isProtocolError()) { + throw new IllegalStateException("Could not detect need more input error"); } } @@ -113,33 +100,46 @@ public abstract class ZabbixBaseTest { * Assert need more input to server */ public void assertNeedMoreInput(byte[] data) throws Throwable { - startupSocketClient(); - try { - socketClient.socket.getOutputStream().write(data); - - try { - for (int i = 0; i < 10; i++) { - // No response - if (socketClient.socket.getInputStream().available() == 0 && socketClient.socket.getInputStream().read() == -1) { - return ; - } - TimeUnit.MILLISECONDS.sleep(100); + ZabbixProtocolDecoder decoder = spy(new ZabbixProtocolDecoder()); + if (decoder.decodeToPayload(null, Unpooled.wrappedBuffer(data)) != null) { + throw new IllegalStateException("Could not detect need more input error"); + } + } + + /** + * Verify Active checks item names + */ + public void assertZabbixActiveChecksResponse(int inx, String... itemNames) throws Exception { + ZabbixResponse response = (ZabbixResponse) responses.get(inx); + + // Active Checks + Assert.assertEquals(itemNames.length, response.getActiveChecks().size()); + for (String itemName : itemNames) { + boolean found = false; + + for (final ZabbixResponse.ActiveChecks checks : response.getActiveChecks()) { + if (Objects.equals(checks.getKey(), itemName)) { + Assert.assertTrue(checks.getDelay() > 0); + Assert.assertTrue(checks.getLastlogsize() >= 0); + Assert.assertTrue(checks.getMtime() >= 0); + found = true; } - } catch (SocketTimeoutException e) { - // Read timeout mean need more content - return; } - throw new IllegalStateException("Could not detect need more input error"); - } finally { - stopSocketClient(); + if (!found) { + throw new AssertionError("Could not found " + itemName + " in Active Checks response"); + } } + + encoder.encode(channelHandlerContext, response, null); + String respBody = decoder.decodeToPayload(channelHandlerContext, (ByteBuf) responses.get(inx + 1)); + assertZabbixActiveChecksResponseWithEncoded(respBody, itemNames); } /** - * Verify Active checks item names + * Verify Active checks item names with encoded */ - public void assertZabbixActiveChecksResponse(String body, String... itemNames) { + private void assertZabbixActiveChecksResponseWithEncoded(String body, String... itemNames) { Assert.assertNotNull(body); JsonElement bodyRoot = new Gson().fromJson(body, JsonElement.class); JsonObject rootObject = bodyRoot.getAsJsonObject(); @@ -173,16 +173,29 @@ public abstract class ZabbixBaseTest { /** * Verify Zabbix agent data response */ - public void assertZabbixAgentDataResponse(String body) { + public void assertZabbixAgentDataResponse(int inx) throws Exception { + ZabbixResponse response = (ZabbixResponse) responses.get(inx); + + // Agent data info + Assert.assertTrue(StringUtil.isNotEmpty(response.getAgentData().getInfo())); + + encoder.encode(channelHandlerContext, response, null); + String respBody = decoder.decodeToPayload(channelHandlerContext, (ByteBuf) responses.get(inx + 1)); + assertZabbixAgentDataResponseWithEncoded(respBody); + } + + /** + * Verify Zabbix agent data response with encoded + */ + public void assertZabbixAgentDataResponseWithEncoded(String body) { Assert.assertNotNull(body); JsonElement bodyRoot = new Gson().fromJson(body, JsonElement.class); JsonObject rootObject = bodyRoot.getAsJsonObject(); // Basic response status Assert.assertEquals("success", rootObject.get("response").getAsString()); - // Agent data info + // Agent data Assert.assertNotNull(rootObject.get("info")); - Assert.assertTrue(StringUtil.isNotEmpty(rootObject.get("info").getAsString())); } /** @@ -229,159 +242,67 @@ public abstract class ZabbixBaseTest { * Verify zabbix request basic info */ private ZabbixRequest assertZabbixRequestBasic(int inx, ZabbixProtocolType protocolType) { - List requests = socketClient.requests; Assert.assertNotNull(requests); Assert.assertTrue(requests.size() > inx); - ZabbixRequest request = requests.get(inx); + ZabbixRequest request = (ZabbixRequest) requests.get(inx); Assert.assertEquals(protocolType, request.getType()); return request; } - /** - * Startup a new socket client to server - */ - protected void startupSocketClient() throws Throwable { - socketClient = Optional.ofNullable(this.socketClient).orElseGet(SocketClient::new); - socketClient.startup(); + public byte[] buildZabbixRequestData(String content) { + // Build header + byte[] payload = content.getBytes(); + int payloadLength = payload.length; + byte[] header = new byte[] { + 'Z', 'B', 'X', 'D', '\1', + (byte) (payloadLength & 0xFF), + (byte) (payloadLength >> 8 & 0xFF), + (byte) (payloadLength >> 16 & 0xFF), + (byte) (payloadLength >> 24 & 0xFF), + '\0', '\0', '\0', '\0'}; + + byte[] packet = new byte[header.length + payloadLength]; + System.arraycopy(header, 0, packet, 0, header.length); + System.arraycopy(payload, 0, packet, header.length, payloadLength); + + return packet; } - /** - * Close the client - */ - protected void stopSocketClient() { - Optional.ofNullable(socketClient).ifPresent(SocketClient::stop); - socketClient = null; - } - - /** - * Connect to receiver server - */ - protected static class SocketClient { - private ZabbixProtocolHandler protocolHandler; - private Throwable spyHandlerException; - private Socket socket; - private List requests; - - private void startup() throws Throwable { - if (socket != null) { - return; - } - socket = new Socket(); - socket.setSoTimeout(2000); - socket.connect(new InetSocketAddress(TCP_HOST, TCP_PORT)); + public void writeZabbixMessage(String message) throws Exception { + ArrayList data = new ArrayList<>(); + decoder.decode(channelHandlerContext, Unpooled.wrappedBuffer(buildZabbixRequestData(message)), data); + requests.add(data.get(0)); - // Waiting for connection - while (!socket.isConnected() || (protocolHandler == null && spyHandlerException == null)) { - TimeUnit.SECONDS.sleep(1); - } - - if (spyHandlerException != null) { - throw spyHandlerException; - } - - // Intercept message received - requests = new ArrayList<>(); - doAnswer((Answer) invocationOnMock -> { - requests.add(invocationOnMock.getArgument(1)); - return invocationOnMock.callRealMethod(); - }).when(protocolHandler).channelRead0(any(), any()); - } - - @SneakyThrows - private void stop() { - if (socket != null && socket.isConnected()) { - socket.close(); - } - } - - public void writeZabbixMessage(String message) throws IOException { - this.socket.getOutputStream().write(buildZabbixRequestData(message)); - } - - public static byte[] buildZabbixRequestData(String content) { - // Build header - byte[] payload = content.getBytes(); - int payloadLength = payload.length; - byte[] header = new byte[] { - 'Z', 'B', 'X', 'D', '\1', - (byte) (payloadLength & 0xFF), - (byte) (payloadLength >> 8 & 0xFF), - (byte) (payloadLength >> 16 & 0xFF), - (byte) (payloadLength >> 24 & 0xFF), - '\0', '\0', '\0', '\0'}; - - byte[] packet = new byte[header.length + payloadLength]; - System.arraycopy(header, 0, packet, 0, header.length); - System.arraycopy(payload, 0, packet, header.length, payloadLength); - - return packet; - } + handler.channelRead0(channelHandlerContext, (ZabbixRequest) data.get(0)); + } - /** - * Finding and spy the Zabbix handler - */ - private void spyHandler(SocketChannel channel) { - Object tailContext = Whitebox.getInternalState(channel.pipeline(), "tail"); - Object handlerContext = Whitebox.getInternalState(tailContext, "prev"); - ZabbixProtocolHandler handler = spyHandler(handlerContext, ZabbixProtocolHandler.class); - if (handler == null) { - throw new IllegalStateException("Unnable to find Zabbix protocol handler"); - } - protocolHandler = handler; - } + @Getter + private class ZabbixProtocolDecoderWrapper extends ZabbixProtocolDecoder { + private boolean protocolError; - private T spyHandler(Object handlerContext, Class handlerCls) { - if (handlerContext == null || handlerContext.getClass().getSimpleName().contains("HeadContext")) { - return null; - } - Object handler = Whitebox.getInternalState(handlerContext, "handler"); - if (handler.getClass().equals(handlerCls)) { - Object realHandler = spy(handler); - Whitebox.setInternalState(handlerContext, "handler", realHandler); - return (T) realHandler; - } else { - return spyHandler(Whitebox.getInternalState(handlerContext, "prev"), handlerCls); - } - } - - private byte[] readAllContent(InputStream inputStream) throws IOException { - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(512); - byte[] buffer = new byte[512]; - int len; - while ((len = inputStream.read(buffer)) > 0) { - outputStream.write(buffer, 0, len); - if (len != buffer.length) { - break; - } - } - return outputStream.toByteArray(); + @Override + public void decode(final ChannelHandlerContext channelHandlerContext, + final ByteBuf byteBuf, + final List list) throws Exception { + super.decode(channelHandlerContext, byteBuf, list); } - public String waitAndGetResponsePayload() throws InterruptedException, IOException, ZabbixErrorProtocolException { - ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class); - ByteBuf byteBuf = Unpooled.copiedBuffer(readAllContent(socket.getInputStream())); - return new ZabbixProtocolDecoder().decodeToPayload(channelHandlerContext, byteBuf); + @Override + protected void errorProtocol(final ChannelHandlerContext context, + final ByteBuf byteBuf, + final String reason, + final Throwable ex) throws InterruptedException { + protocolError = true; } } - /** - * Zabbix binder wrapper, support spy Zabbix message received data - */ - private class ZabbixServerWrapper extends ZabbixServer { - - public ZabbixServerWrapper(ZabbixModuleConfig config, ZabbixMetrics zabbixMetrics) { - super(config, zabbixMetrics); - } - + @Getter + private class ZabbixProtocolEncoderWrapper extends ZabbixProtocolEncoder { @Override - public void initChannel(SocketChannel channel) { - super.initChannel(channel); - - try { - socketClient.spyHandler(channel); - } catch (Throwable e) { - socketClient.spyHandlerException = e; - } + public void encode(final ChannelHandlerContext channelHandlerContext, + final ZabbixResponse zabbixResponse, + final List list) throws Exception { + super.encode(channelHandlerContext, zabbixResponse, list); } } diff --git a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixMetricsTest.java b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixMetricsTest.java index 6bd9354a6d34d5805e4dbf99f0b72186cdba3352..6766e718f0a03afeea508fd0db8ef1f203ff9acd 100644 --- a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixMetricsTest.java +++ b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/ZabbixMetricsTest.java @@ -60,7 +60,7 @@ public class ZabbixMetricsTest extends ZabbixBaseTest { private List values = new ArrayList<>(); @Override - public void setupService() throws Throwable { + public void setupMetrics() throws Throwable { moduleProvider = Mockito.mock(CoreModuleProvider.class); moduleManager = Mockito.mock(ModuleManager.class); @@ -82,7 +82,7 @@ public class ZabbixMetricsTest extends ZabbixBaseTest { map.put("avgHistogram", AvgHistogramFunction.class); map.put("avgHistogramPercentile", AvgHistogramPercentileFunction.class); Whitebox.setInternalState(meterSystem, "functionRegister", map); - super.setupService(); + super.setupMetrics(); } @Override @@ -100,23 +100,20 @@ public class ZabbixMetricsTest extends ZabbixBaseTest { @Test public void testReceiveMetrics() throws Throwable { - startupSocketClient(); // Verify Active Checks - socketClient.writeZabbixMessage("{\"request\":\"active checks\",\"host\":\"test-01\"}"); - String activeChecksRespData = socketClient.waitAndGetResponsePayload(); + writeZabbixMessage("{\"request\":\"active checks\",\"host\":\"test-01\"}"); assertZabbixActiveChecksRequest(0, "test-01"); - assertZabbixActiveChecksResponse(activeChecksRespData, "system.cpu.load[all,avg1]", "system.cpu.load[all,avg5]", "system.cpu.load[all,avg15]", "agent.hostname"); + assertZabbixActiveChecksResponse(0, "system.cpu.load[all,avg1]", "system.cpu.load[all,avg5]", "system.cpu.load[all,avg15]", "agent.hostname"); // Verify Agent data - socketClient.writeZabbixMessage("{\"request\":\"agent data\",\"session\":\"f32425dc61971760bf791f731931a92e\",\"data\":[" + + writeZabbixMessage("{\"request\":\"agent data\",\"session\":\"f32425dc61971760bf791f731931a92e\",\"data\":[" + "{\"host\":\"test-01\",\"key\":\"system.cpu.load[all,avg1]\",\"value\":\"1.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}," + "{\"host\":\"test-01\",\"key\":\"system.cpu.load[all,avg5]\",\"value\":\"2.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}," + "{\"host\":\"test-01\",\"key\":\"system.cpu.load[all,avg15]\",\"value\":\"3.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}," + "{\"host\":\"test-01\",\"key\":\"agent.hostname\",\"value\":\"test-01-hostname\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}" + "],\"clock\":1609588568,\"ns\":102244476}"); - String agentDataRespData = socketClient.waitAndGetResponsePayload(); assertZabbixAgentDataRequest(1, "test-01", "system.cpu.load[all,avg1]", "system.cpu.load[all,avg5]", "system.cpu.load[all,avg15]", "agent.hostname"); - assertZabbixAgentDataResponse(agentDataRespData); + assertZabbixAgentDataResponse(2); // Verify meter system received data Assert.assertEquals(1, values.size()); @@ -130,6 +127,5 @@ public class ZabbixMetricsTest extends ZabbixBaseTest { Assert.assertEquals(1, avgLabeledFunction.getCount().get("avg1"), 0.0); Assert.assertEquals(1, avgLabeledFunction.getCount().get("avg5"), 0.0); Assert.assertEquals(1, avgLabeledFunction.getCount().get("avg15"), 0.0); - stopSocketClient(); } } diff --git a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolHandlerTest.java b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolHandlerTest.java index 749699f8c9d4ce582488469965115f70fb4e5bc1..a92c142fcbd44d60e39e49f87c3d119da9e4125d 100644 --- a/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolHandlerTest.java +++ b/oap-server/server-receiver-plugin/skywalking-zabbix-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zabbix/provider/protocol/ZabbixProtocolHandlerTest.java @@ -47,20 +47,15 @@ public class ZabbixProtocolHandlerTest extends ZabbixBaseTest { */ @Test public void testReceive() throws Throwable { - startupSocketClient(); // Verify Active Checks - socketClient.writeZabbixMessage("{\"request\":\"active checks\",\"host\":\"zabbix-test-agent\"}"); - String activeChecksRespData = socketClient.waitAndGetResponsePayload(); + writeZabbixMessage("{\"request\":\"active checks\",\"host\":\"zabbix-test-agent\"}"); assertZabbixActiveChecksRequest(0, "zabbix-test-agent"); - assertZabbixActiveChecksResponse(activeChecksRespData, "system.cpu.load[all,avg15]"); + assertZabbixActiveChecksResponse(0, "system.cpu.load[all,avg15]"); // Verify Agent data - socketClient.writeZabbixMessage("{\"request\":\"agent data\",\"session\":\"f32425dc61971760bf791f731931a92e\",\"data\":[{\"host\":\"zabbix-test-agent\",\"key\":\"system.cpu.load[all,avg15]\",\"value\":\"1.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}],\"clock\":1609588568,\"ns\":102244476}"); - String agentDataRespData = socketClient.waitAndGetResponsePayload(); + writeZabbixMessage("{\"request\":\"agent data\",\"session\":\"f32425dc61971760bf791f731931a92e\",\"data\":[{\"host\":\"zabbix-test-agent\",\"key\":\"system.cpu.load[all,avg15]\",\"value\":\"1.123\",\"id\":2,\"clock\":1609588563,\"ns\":87682907}],\"clock\":1609588568,\"ns\":102244476}"); assertZabbixAgentDataRequest(1, "zabbix-test-agent", "system.cpu.load[all,avg15]"); - assertZabbixAgentDataResponse(agentDataRespData); - - stopSocketClient(); + assertZabbixAgentDataResponse(2); } /** @@ -69,7 +64,7 @@ public class ZabbixProtocolHandlerTest extends ZabbixBaseTest { @Test public void testErrorProtocol() throws Throwable { // Simple header - for (int i = 1; i < 5; i++) { + for (int i = 1; i < 4; i++) { assertNeedMoreInput(new byte[i]); } @@ -80,10 +75,13 @@ public class ZabbixProtocolHandlerTest extends ZabbixBaseTest { assertWriteErrorProtocol(new byte[] {'Z', 'B', 'X', 'D', 2, 0, 0, 0, 0}); assertWriteErrorProtocol(new byte[] {'Z', 'B', 'X', 'D', 2, 1, 0, 0, 0}); + // Need more content + assertNeedMoreInput(new byte[] {'Z', 'B', 'X', 'D', 1, 5, 0, 0, 0, 1, 1, 1}); + // Empty data - assertWriteErrorProtocol(SocketClient.buildZabbixRequestData("")); - assertWriteErrorProtocol(SocketClient.buildZabbixRequestData("{}")); - assertWriteErrorProtocol(SocketClient.buildZabbixRequestData("{\"test\": 1}")); + assertWriteErrorProtocol(buildZabbixRequestData("")); + assertWriteErrorProtocol(buildZabbixRequestData("{}")); + assertWriteErrorProtocol(buildZabbixRequestData("{\"test\": 1}")); } }