提交 2e2ac8cc 编写于 作者: V Vlad Ilyushchenko

feat: on Linux platform server will use recvmmsg() for influx protocol

上级 1cf87c7a
......@@ -27,6 +27,7 @@ package io.questdb;
import io.questdb.cairo.CairoEngine;
import io.questdb.cutlass.http.HttpServer;
import io.questdb.cutlass.line.udp.GenericLineProtoReceiver;
import io.questdb.cutlass.line.udp.LinuxLineProtoReceiver;
import io.questdb.cutlass.pgwire.PGWireServer;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
......@@ -136,12 +137,23 @@ public class ServerMain {
cairoEngine
);
final Closeable lineProtocolReceiver = GenericLineProtoReceiver.create(
configuration.getLineUdpReceiverConfiguration(),
workerPool,
log,
cairoEngine
);
final Closeable lineProtocolReceiver;
if (Os.type == Os.LINUX_AMD64 || Os.type == Os.LINUX_ARM64) {
lineProtocolReceiver = LinuxLineProtoReceiver.create(
configuration.getLineUdpReceiverConfiguration(),
workerPool,
log,
cairoEngine
);
} else {
lineProtocolReceiver = GenericLineProtoReceiver.create(
configuration.getLineUdpReceiverConfiguration(),
workerPool,
log,
cairoEngine
);
}
workerPool.start(log);
......
......@@ -24,6 +24,7 @@
package io.questdb.cutlass.line.udp;
import io.questdb.WorkerPoolAwareConfiguration;
import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.CairoException;
import io.questdb.cutlass.line.CairoLineProtoParser;
......@@ -35,12 +36,13 @@ import io.questdb.mp.WorkerPool;
import io.questdb.network.Net;
import io.questdb.network.NetworkFacade;
import io.questdb.std.Misc;
import org.jetbrains.annotations.Nullable;
import java.io.Closeable;
public class LinuxLineProtoReceiver implements Closeable, Job {
private static final Log LOG = LogFactory.getLog(LinuxLineProtoReceiver.class);
private static final WorkerPoolAwareConfiguration.ServerFactory<LinuxLineProtoReceiver, LineUdpReceiverConfiguration> CREATE0 = LinuxLineProtoReceiver::create0;
private final int msgCount;
private final LineProtoLexer lexer;
private final CairoLineProtoParser parser;
......@@ -54,7 +56,6 @@ public class LinuxLineProtoReceiver implements Closeable, Job {
LineUdpReceiverConfiguration configuration,
CairoEngine engine,
WorkerPool workerPool
) {
nf = configuration.getNetworkFacade();
......@@ -82,14 +83,16 @@ public class LinuxLineProtoReceiver implements Closeable, Job {
parser = new CairoLineProtoParser(engine, configuration.getCairoSecurityContext());
lexer.withParser(parser);
LOG.info().
$("started [fd=").$(fd).
$(", bind=").$(configuration.getBindIPv4Address()).
$(", group=").$(configuration.getGroupIPv4Address()).
$(", port=").$(configuration.getPort()).
$(", batch=").$(msgCount).
$(", commitRate=").$(commitRate).
$(']').$();
LOG.info()
.$("receiving multicast from ")
.$ip(configuration.getGroupIPv4Address())
.$(':')
.$(configuration.getPort())
.$(" via ")
.$ip(configuration.getBindIPv4Address())
.$(" [fd=").$(fd)
.$(", commitRate=").$(commitRate)
.$(']').$();
workerPool.assign(this);
......@@ -109,6 +112,22 @@ public class LinuxLineProtoReceiver implements Closeable, Job {
}
}
@Nullable
public static LinuxLineProtoReceiver create(
LineUdpReceiverConfiguration configuration,
WorkerPool sharedWorkerPool,
Log log,
CairoEngine cairoEngine
) {
return WorkerPoolAwareConfiguration.create(
configuration,
sharedWorkerPool,
log,
cairoEngine,
CREATE0
);
}
@Override
public void close() {
if (fd > -1) {
......@@ -159,4 +178,8 @@ public class LinuxLineProtoReceiver implements Closeable, Job {
parser.commitAll();
return ran;
}
private static LinuxLineProtoReceiver create0(LineUdpReceiverConfiguration configuration, CairoEngine cairoEngine, WorkerPool workerPool, boolean local) {
return new LinuxLineProtoReceiver(configuration, cairoEngine, workerPool);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册