提交 4815a0f9 编写于 作者: V Vlad Ilyushchenko

Misc: fixed similar network errors in Windows. Removed duplicate NetFacade....

Misc: fixed similar network errors in Windows. Removed duplicate NetFacade. Replaced usages of Os.errno() with interface.
上级 2e079f90
......@@ -182,27 +182,22 @@ JNIEXPORT jint JNICALL Java_com_questdb_network_Net_configureNonBlocking
return ioctlsocket((SOCKET) fd, FIONBIO, &mode);
}
jint convert_error(int n) {
SaveLastError();
JNIEXPORT jint JNICALL Java_com_questdb_network_Net_recv
(JNIEnv *e, jclass cl, jlong fd, jlong addr, jint len) {
const int n = recv((SOCKET) fd, (char *) addr, len, 0);
if (n > 0) {
return (jint) n;
return n;
}
switch (n) {
case 0:
return com_questdb_network_Net_EPEERDISCONNECT;
default:
if (WSAGetLastError() == WSAEWOULDBLOCK) {
return com_questdb_network_Net_ERETRY;
} else {
return com_questdb_network_Net_EOTHERDISCONNECT;
}
if (n == 0) {
return com_questdb_network_Net_EOTHERDISCONNECT;
}
}
JNIEXPORT jint JNICALL Java_com_questdb_network_Net_recv
(JNIEnv *e, jclass cl, jlong fd, jlong addr, jint len) {
return convert_error(recv((SOCKET) fd, (char *) addr, len, 0));
if (WSAGetLastError() == WSAEWOULDBLOCK) {
return com_questdb_network_Net_ERETRY;
}
return com_questdb_network_Net_EOTHERDISCONNECT;
}
JNIEXPORT jboolean JNICALL Java_com_questdb_network_Net_isDead
......@@ -213,7 +208,16 @@ JNIEXPORT jboolean JNICALL Java_com_questdb_network_Net_isDead
JNIEXPORT jint JNICALL Java_com_questdb_network_Net_send
(JNIEnv *e, jclass cl, jlong fd, jlong addr, jint len) {
return convert_error(send((SOCKET) fd, (const char *) addr, len, 0));
const int n = send((SOCKET) fd, (const char *) addr, len, 0);
if (n > -1) {
return n;
}
if (WSAGetLastError() == WSAEWOULDBLOCK) {
return com_questdb_network_Net_ERETRY;
}
return com_questdb_network_Net_EOTHERDISCONNECT;
}
JNIEXPORT jint JNICALL Java_com_questdb_network_Net_sendTo
......
......@@ -311,7 +311,7 @@ public class HttpResponseSink implements Closeable, Mutable {
int n = nf.send(fd, flushBuf + sent, flushBufSize - sent);
if (n < 0) {
// disconnected
LOG.info().$("disconnected [errno=").$(Os.errno()).$(']').$();
LOG.info().$("disconnected [errno=").$(nf.errno()).$(']').$();
throw PeerDisconnectedException.INSTANCE;
}
if (n == 0) {
......
......@@ -32,9 +32,8 @@ import com.questdb.cutlass.line.LineProtoLexer;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.mp.Job;
import com.questdb.network.NetworkFacade;
import com.questdb.std.Misc;
import com.questdb.std.NetFacade;
import com.questdb.std.Os;
import com.questdb.std.Unsafe;
import com.questdb.std.str.DirectByteCharSequence;
......@@ -46,7 +45,7 @@ public class GenericLineProtoReceiver implements Closeable, Job {
private final DirectByteCharSequence byteSequence = new DirectByteCharSequence();
private final LineProtoLexer lexer;
private final CairoLineProtoParser parser;
private final NetFacade nf;
private final NetworkFacade nf;
private final int bufLen;
private long fd;
private int commitRate;
......@@ -55,26 +54,26 @@ public class GenericLineProtoReceiver implements Closeable, Job {
public GenericLineProtoReceiver(ReceiverConfiguration receiverCfg, CairoConfiguration cairoCfg, ResourcePool<TableWriter> writerPool) {
nf = receiverCfg.getNetFacade();
nf = receiverCfg.getNetworkFacade();
fd = nf.socketUdp();
if (fd < 0) {
int errno = Os.errno();
int errno = nf.errno();
LOG.error().$("cannot open UDP socket [errno=").$(errno).$(']').$();
throw CairoException.instance(errno).put("Cannot open UDP socket");
}
try {
if (!nf.bindUdp(fd, receiverCfg.getPort())) {
int errno = Os.errno();
int errno = nf.errno();
LOG.error().$("cannot bind socket [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(']').$();
throw CairoException.instance(Os.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort());
throw CairoException.instance(nf.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort());
}
if (!nf.join(fd, receiverCfg.getBindIPv4Address(), receiverCfg.getGroupIPv4Address())) {
int errno = Os.errno();
int errno = nf.errno();
LOG.error().$("cannot join group [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", group=").$(receiverCfg.getGroupIPv4Address()).$(']').$();
throw CairoException.instance(Os.errno()).put("Cannot join group ").put(receiverCfg.getGroupIPv4Address()).put(" [bindTo=").put(receiverCfg.getBindIPv4Address()).put(']');
throw CairoException.instance(nf.errno()).put("Cannot join group ").put(receiverCfg.getGroupIPv4Address()).put(" [bindTo=").put(receiverCfg.getBindIPv4Address()).put(']');
}
} catch (CairoException e) {
close();
......@@ -100,7 +99,7 @@ public class GenericLineProtoReceiver implements Closeable, Job {
public void close() {
if (fd > -1) {
if (nf.close(fd) != 0) {
LOG.error().$("failed to close [fd=").$(fd).$(", errno=").$(Os.errno()).$(']').$();
LOG.error().$("failed to close [fd=").$(fd).$(", errno=").$(nf.errno()).$(']').$();
} else {
LOG.info().$("closed [fd=").$(fd).$(']').$();
}
......
......@@ -33,9 +33,8 @@ import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.mp.Job;
import com.questdb.network.Net;
import com.questdb.network.NetworkFacade;
import com.questdb.std.Misc;
import com.questdb.std.NetFacade;
import com.questdb.std.Os;
import java.io.Closeable;
......@@ -45,7 +44,7 @@ public class LinuxLineProtoReceiver implements Closeable, Job {
private final int msgCount;
private final LineProtoLexer lexer;
private final CairoLineProtoParser parser;
private final NetFacade nf;
private final NetworkFacade nf;
private long fd;
private long msgVec;
private int commitRate;
......@@ -53,26 +52,26 @@ public class LinuxLineProtoReceiver implements Closeable, Job {
public LinuxLineProtoReceiver(ReceiverConfiguration receiverCfg, CairoConfiguration cairoCfg, ResourcePool<TableWriter> writerPool) {
nf = receiverCfg.getNetFacade();
nf = receiverCfg.getNetworkFacade();
fd = nf.socketUdp();
if (fd < 0) {
int errno = Os.errno();
int errno = nf.errno();
LOG.error().$("cannot open UDP socket [errno=").$(errno).$(']').$();
throw CairoException.instance(errno).put("Cannot open UDP socket");
}
try {
if (!nf.bindUdp(fd, receiverCfg.getPort())) {
int errno = Os.errno();
int errno = nf.errno();
LOG.error().$("cannot bind socket [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", port=").$(receiverCfg.getPort()).$(']').$();
throw CairoException.instance(Os.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort());
throw CairoException.instance(nf.errno()).put("Cannot bind to ").put(receiverCfg.getBindIPv4Address()).put(':').put(receiverCfg.getPort());
}
if (!nf.join(fd, receiverCfg.getBindIPv4Address(), receiverCfg.getGroupIPv4Address())) {
int errno = Os.errno();
int errno = nf.errno();
LOG.error().$("cannot join group [errno=").$(errno).$(", fd=").$(fd).$(", bind=").$(receiverCfg.getBindIPv4Address()).$(", group=").$(receiverCfg.getGroupIPv4Address()).$(']').$();
throw CairoException.instance(Os.errno()).put("Cannot join group ").put(receiverCfg.getGroupIPv4Address()).put(" [bindTo=").put(receiverCfg.getBindIPv4Address()).put(']');
throw CairoException.instance(nf.errno()).put("Cannot join group ").put(receiverCfg.getGroupIPv4Address()).put(" [bindTo=").put(receiverCfg.getBindIPv4Address()).put(']');
}
} catch (CairoException e) {
close();
......@@ -98,7 +97,7 @@ public class LinuxLineProtoReceiver implements Closeable, Job {
public void close() {
if (fd > -1) {
if (nf.close(fd) != 0) {
LOG.error().$("failed to close [fd=").$(fd).$(", errno=").$(Os.errno()).$(']').$();
LOG.error().$("failed to close [fd=").$(fd).$(", errno=").$(nf.errno()).$(']').$();
} else {
LOG.info().$("closed [fd=").$(fd).$(']').$();
}
......
......@@ -23,7 +23,7 @@
package com.questdb.cutlass.line.udp;
import com.questdb.std.NetFacade;
import com.questdb.network.NetworkFacade;
public interface ReceiverConfiguration {
......@@ -37,7 +37,7 @@ public interface ReceiverConfiguration {
int getMsgCount();
NetFacade getNetFacade();
NetworkFacade getNetworkFacade();
int getPort();
......
......@@ -274,7 +274,7 @@ public class LogRollingFileWriter extends SynchronizedJob implements Closeable,
buildUniquePath();
this.fd = ff.openAppend(path.$());
if (this.fd == -1) {
throw new LogError("[" + Os.errno() + "] Cannot open file for append: " + path);
throw new LogError("[" + ff.errno() + "] Cannot open file for append: " + path);
}
this.currentSize = ff.length(fd);
}
......
......@@ -26,7 +26,6 @@ package com.questdb.network;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.std.Files;
import com.questdb.std.Os;
import com.questdb.std.Unsafe;
import java.io.Closeable;
......@@ -84,7 +83,7 @@ public final class Epoll implements Closeable {
Unsafe.getUnsafe().putLong(events + EpollAccessor.DATA_OFFSET, 0);
if (epf.epollCtl(epollFd, EpollAccessor.EPOLL_CTL_ADD, sfd, events) != 0) {
throw NetworkError.instance(Os.errno(), "epoll_ctl");
throw NetworkError.instance(epf.errno(), "epoll_ctl");
}
}
......
......@@ -31,4 +31,6 @@ public interface EpollFacade {
int epollWait(long epfd, long eventPtr, int eventCount, int timeout);
NetworkFacade getNetworkFacade();
int errno();
}
......@@ -23,6 +23,8 @@
package com.questdb.network;
import com.questdb.std.Os;
public class EpollFacadeImpl implements EpollFacade {
public static final EpollFacadeImpl INSTANCE = new EpollFacadeImpl();
......@@ -45,4 +47,9 @@ public class EpollFacadeImpl implements EpollFacade {
public NetworkFacade getNetworkFacade() {
return NetworkFacadeImpl.INSTANCE;
}
@Override
public int errno() {
return Os.errno();
}
}
......@@ -84,4 +84,17 @@ public interface NetworkFacade {
int setReusePort(long fd);
int setTcpNoDelay(long fd, boolean noDelay);
int setRcvBuf(long fd, int size);
void freeMsgHeaders(long msgVec);
long getMMsgBuf(long msg);
long getMMsgBufLen(long msg);
long msgHeaders(int msgBufferSize, int msgCount);
@SuppressWarnings("SpellCheckingInspection")
int recvmmsg(long fd, long msgVec, int msgCount);
}
......@@ -138,7 +138,7 @@ public class NetworkFacadeImpl implements NetworkFacade {
@Override
public boolean join(long fd, int bindIPv4, int groupIPv4) {
return Net.join(fd, bindIPv4, bindIPv4);
return Net.join(fd, bindIPv4, groupIPv4);
}
@Override
......@@ -175,4 +175,34 @@ public class NetworkFacadeImpl implements NetworkFacade {
public int setTcpNoDelay(long fd, boolean noDelay) {
return Net.setTcpNoDelay(fd, noDelay);
}
@Override
public int setRcvBuf(long fd, int size) {
return Net.setRcvBuf(fd, size);
}
@Override
public void freeMsgHeaders(long msgVec) {
Net.freeMsgHeaders(msgVec);
}
@Override
public long getMMsgBuf(long msg) {
return Net.getMMsgBuf(msg);
}
@Override
public long getMMsgBufLen(long msg) {
return Net.getMMsgBufLen(msg);
}
@Override
public long msgHeaders(int msgBufferSize, int msgCount) {
return Net.msgHeaders(msgBufferSize, msgCount);
}
@Override
public int recvmmsg(long fd, long msgVec, int msgCount) {
return Net.recvmmsg(fd, msgVec, msgCount);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.std;
public interface NetFacade {
boolean bindTcp(long fd, CharSequence IPv4Address, int port);
boolean bindUdp(long fd, int port);
int close(long fd);
void freeMsgHeaders(long msgVec);
long getMMsgBuf(long msg);
long getMMsgBufLen(long msg);
boolean join(long fd, CharSequence bindIPv4Address, CharSequence groupIPv4Address);
long msgHeaders(int msgBufferSize, int msgCount);
int recv(long fd, long buf, int bufLen);
@SuppressWarnings("SpellCheckingInspection")
int recvmmsg(long fd, long msgVec, int msgCount);
int setRcvBuf(long fd, int size);
long socketUdp();
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.std;
import com.questdb.network.Net;
public class NetFacadeImpl implements NetFacade {
public static final NetFacadeImpl INSTANCE = new NetFacadeImpl();
@Override
public boolean bindTcp(long fd, CharSequence IPv4Address, int port) {
return Net.bindTcp(fd, IPv4Address, port);
}
@Override
public int close(long fd) {
return Net.close(fd);
}
@Override
public void freeMsgHeaders(long msgVec) {
Net.freeMsgHeaders(msgVec);
}
@Override
public long getMMsgBuf(long msg) {
return Net.getMMsgBuf(msg);
}
@Override
public long getMMsgBufLen(long msg) {
return Net.getMMsgBufLen(msg);
}
@Override
public boolean join(long fd, CharSequence bindIPv4Address, CharSequence groupIPv4Address) {
return Net.join(fd, bindIPv4Address, groupIPv4Address);
}
@Override
public long msgHeaders(int msgBufferSize, int msgCount) {
return Net.msgHeaders(msgBufferSize, msgCount);
}
@Override
public int recv(long fd, long buf, int bufLen) {
return Net.recv(fd, buf, bufLen);
}
@Override
public int recvmmsg(long fd, long msgVec, int msgCount) {
return Net.recvmmsg(fd, msgVec, msgCount);
}
@Override
public int setRcvBuf(long fd, int size) {
return Net.setRcvBuf(fd, size);
}
@Override
public long socketUdp() {
return Net.socketUdp();
}
@Override
public boolean bindUdp(long fd, int port) {
return Net.bindUdp(fd, port);
}
}
......@@ -29,8 +29,11 @@ import com.questdb.mp.Job;
import com.questdb.mp.SOCountDownLatch;
import com.questdb.mp.Worker;
import com.questdb.network.Net;
import com.questdb.network.NetworkFacade;
import com.questdb.network.NetworkFacadeImpl;
import com.questdb.std.*;
import com.questdb.std.Misc;
import com.questdb.std.ObjHashSet;
import com.questdb.std.Os;
import com.questdb.std.str.StringSink;
import com.questdb.test.tools.TestUtils;
import org.junit.Assert;
......@@ -123,7 +126,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
private void assertCannotBindSocket(ReceiverFactory factory) throws Exception {
TestUtils.assertMemoryLeak(() -> {
NetFacade nf = new NetFacadeImpl() {
NetworkFacade nf = new NetworkFacadeImpl() {
@Override
public boolean bindUdp(long fd, int port) {
return false;
......@@ -131,7 +134,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
};
ReceiverConfiguration receiverCfg = new TestReceiverConfiguration() {
@Override
public NetFacade getNetFacade() {
public NetworkFacade getNetworkFacade() {
return nf;
}
};
......@@ -141,7 +144,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
private void assertCannotJoin(ReceiverFactory factory) throws Exception {
TestUtils.assertMemoryLeak(() -> {
NetFacade nf = new NetFacadeImpl() {
NetworkFacade nf = new NetworkFacadeImpl() {
@Override
public boolean join(long fd, CharSequence bindIPv4Address, CharSequence groupIPv4Address) {
return false;
......@@ -150,7 +153,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
};
ReceiverConfiguration receiverCfg = new TestReceiverConfiguration() {
@Override
public NetFacade getNetFacade() {
public NetworkFacade getNetworkFacade() {
return nf;
}
};
......@@ -161,7 +164,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
private void assertCannotOpenSocket(ReceiverFactory factory) throws Exception {
TestUtils.assertMemoryLeak(() -> {
NetFacade nf = new NetFacadeImpl() {
NetworkFacade nf = new NetworkFacadeImpl() {
@Override
public long socketUdp() {
return -1;
......@@ -169,7 +172,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
};
ReceiverConfiguration receiverCfg = new TestReceiverConfiguration() {
@Override
public NetFacade getNetFacade() {
public NetworkFacade getNetworkFacade() {
return nf;
}
};
......@@ -178,7 +181,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
}
private void assertCannotSetReceiveBuffer(ReceiverFactory factory) throws Exception {
NetFacade nf = new NetFacadeImpl() {
NetworkFacade nf = new NetworkFacadeImpl() {
@Override
public int setRcvBuf(long fd, int size) {
return -1;
......@@ -192,7 +195,7 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
}
@Override
public NetFacade getNetFacade() {
public NetworkFacade getNetworkFacade() {
return nf;
}
};
......@@ -325,8 +328,8 @@ public class LinuxLineProtoReceiverTest extends AbstractCairoTest {
}
@Override
public NetFacade getNetFacade() {
return NetFacadeImpl.INSTANCE;
public NetworkFacade getNetworkFacade() {
return NetworkFacadeImpl.INSTANCE;
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册