提交 cccb0988 编写于 作者: V Vlad Ilyushchenko

FIX: bugfix Number.appendHex, this was producing single digits for small...

FIX: bugfix Number.appendHex, this was producing single digits for small numbers. Test for straw man WireParser that would allow to stress test network behaviour
上级 daf96492
......@@ -28,13 +28,8 @@ import com.questdb.log.LogFactory;
import com.questdb.network.*;
import com.questdb.std.*;
import com.questdb.std.str.DirectByteCharSequence;
import com.questdb.std.str.StdoutSink;
import java.io.IOException;
public class HttpConnectionContext implements IOContext, Locality, Mutable {
// todo: remove or comment out eventually
public static final StdoutSink stdOutSink = new StdoutSink();
private static final Log LOG = LogFactory.getLog(HttpConnectionContext.class);
private final HttpHeaderParser headerParser;
private final long recvBuffer;
......@@ -51,6 +46,20 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
private long fd;
private HttpRequestProcessor resumeProcessor = null;
@Override
public void close() {
this.fd = -1;
csPool.clear();
multipartContentParser.close();
multipartContentHeaderParser.close();
responseSink.close();
headerParser.close();
localValueMap.close();
Unsafe.free(recvBuffer, recvBufferSize);
Unsafe.free(sendBuffer, configuration.getSendBufferSize());
LOG.debug().$("closed").$();
}
public HttpConnectionContext(HttpServerConfiguration configuration) {
this.configuration = configuration;
this.nf = configuration.getDispatcherConfiguration().getNetworkFacade();
......@@ -66,18 +75,6 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
LOG.debug().$("new").$();
}
public static void dump(long recvBuffer, int read) {
for (int i = 0; i < read; i++) {
Numbers.appendHex(stdOutSink, Unsafe.getUnsafe().getByte(recvBuffer + i) & 0xff);
}
stdOutSink.put('\n');
try {
stdOutSink.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void clear() {
LOG.debug().$("clear").$();
......@@ -90,16 +87,8 @@ public class HttpConnectionContext implements IOContext, Locality, Mutable {
}
@Override
public void close() {
csPool.clear();
multipartContentParser.close();
multipartContentHeaderParser.close();
responseSink.close();
headerParser.close();
localValueMap.close();
Unsafe.free(recvBuffer, recvBufferSize);
Unsafe.free(sendBuffer, configuration.getSendBufferSize());
LOG.debug().$("closed").$();
public boolean invalid() {
return this.fd == -1;
}
@Override
......
......@@ -221,6 +221,7 @@ public class HttpServer implements Closeable {
// it is possible that context is in transit (on a queue somewhere)
// and server shutdown is performed by a non-worker thread
// in this case we just close context
context.of(-1);
if (thread instanceof HttpServerWorker) {
((HttpServerWorker) thread).contextPool.push(context);
} else {
......
......@@ -26,6 +26,10 @@ package com.questdb.cutlass.pgwire;
import com.questdb.network.NetworkFacade;
public interface WireParserConfiguration {
default boolean getDumpNetworkTraffic() {
return false;
}
NetworkFacade getNetworkFacade();
int getRecvBufferSize();
......
......@@ -142,6 +142,14 @@ public abstract class AbstractIODispatcher<C extends IOContext> extends Synchron
return false;
}
@Override
public void disconnect(C context) {
final long cursor = disconnectPubSeq.nextBully();
assert cursor > -1;
disconnectQueue.get(cursor).context = context;
disconnectPubSeq.done(cursor);
}
protected void accept(long timestamp) {
while (true) {
// this accept is greedy, rather than to rely on epoll(or similar) to
......@@ -199,29 +207,12 @@ public abstract class AbstractIODispatcher<C extends IOContext> extends Synchron
}
protected void logSuccess(IODispatcherConfiguration configuration) {
LOG.info()
.$("listening on ")
.$(configuration.getBindIPv4Address()).$(':').$(configuration.getBindPort())
.$(" [fd=").$(serverFd).$(']').$();
}
protected abstract void pendingAdded(int index);
@Override
public void disconnect(C context) {
final long cursor = disconnectPubSeq.nextBully();
assert cursor > -1;
disconnectQueue.get(cursor).context = context;
disconnectPubSeq.done(cursor);
}
private void disconnectContext(IOEvent<C> event) {
doDisconnect(event.context);
}
protected void doDisconnect(C context) {
if (context == null) {
if (context == null || context.invalid()) {
return;
}
final long fd = context.getFd();
......@@ -234,6 +225,15 @@ public abstract class AbstractIODispatcher<C extends IOContext> extends Synchron
connectionCount.decrementAndGet();
}
protected void logSuccess(IODispatcherConfiguration configuration) {
LOG.info()
.$("listening on ")
.$(configuration.getBindIPv4Address()).$(':').$(configuration.getBindPort())
.$(" [fd=").$(serverFd).$(']').$();
}
protected abstract void pendingAdded(int index);
protected void processDisconnects() {
disconnectSubSeq.consumeAll(disconnectQueue, this.disconnectContextRef);
}
......
......@@ -30,4 +30,6 @@ public interface IOContext extends Closeable {
void close();
long getFd();
boolean invalid();
}
......@@ -25,6 +25,9 @@ package com.questdb.network;
import com.questdb.std.*;
import com.questdb.std.str.CharSink;
import com.questdb.std.str.StdoutSink;
import java.io.IOException;
public final class Net {
......@@ -88,6 +91,20 @@ public final class Net {
public native static long connect(long fd, long sockaddr);
public static void dump(long buffer, int len) {
if (len > 0) {
for (int i = 0; i < len; i++) {
Numbers.appendHex(StdoutSink.INSTANCE, Unsafe.getUnsafe().getByte(buffer + i) & 0xff);
}
StdoutSink.INSTANCE.put('\n');
try {
StdoutSink.INSTANCE.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static native void freeMsgHeaders(long msgHeaders);
public native static void freeSockAddr(long sockaddr);
......
......@@ -231,11 +231,13 @@ public final class Numbers {
}
int c;
if (i < 0x10) {
sink.put('0');
sink.put(hexDigits[i]);
} else if (i < 0x100) { // two
sink.put(hexDigits[i / 0x10]);
sink.put(hexDigits[i % 0x10]);
} else if (i < 0x1000) { // three
sink.put('0');
sink.put(hexDigits[i / 0x100]);
sink.put(hexDigits[(c = i % 0x100) / 0x10]);
sink.put(hexDigits[c % 0x10]);
......@@ -245,6 +247,7 @@ public final class Numbers {
sink.put(hexDigits[(c = c % 0x100) / 0x10]);
sink.put(hexDigits[c % 0x10]);
} else if (i < 0x100000) { // five
sink.put('0');
sink.put(hexDigits[i / 0x10000]);
sink.put(hexDigits[(c = i % 0x10000) / 0x1000]);
sink.put(hexDigits[(c = c % 0x1000) / 0x100]);
......@@ -258,6 +261,7 @@ public final class Numbers {
sink.put(hexDigits[(c = c % 0x100) / 0x10]);
sink.put(hexDigits[c % 0x10]);
} else if (i < 0x10000000) { // seven
sink.put('0');
sink.put(hexDigits[i / 0x1000000]);
sink.put(hexDigits[(c = i % 0x1000000) / 0x100000]);
sink.put(hexDigits[(c = c % 0x100000) / 0x10000]);
......
......@@ -27,7 +27,10 @@ import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
public class StdoutSink extends FlexBufferSink {
public final class StdoutSink extends FlexBufferSink {
public static final StdoutSink INSTANCE = new StdoutSink();
private final FileOutputStream out;
public StdoutSink() {
......
......@@ -337,7 +337,7 @@ public class IODispatcherTest {
"Transfer-Encoding: chunked\r\n" +
"Content-Type: text/plain; charset=utf-8\r\n" +
"\r\n" +
"442\r\n" +
"0442\r\n" +
"+---------------------------------------------------------------------------------------------------------------+\r\n" +
"| Location: | fhv_tripdata_2017-02.csv | Pattern | Locale | Errors |\r\n" +
"| Partition by | NONE | | | |\r\n" +
......@@ -352,7 +352,7 @@ public class IODispatcherTest {
"| 4 | 0 |\r\n" +
"+---------------------------------------------------------------------------------------------------------------+\r\n" +
"\r\n" +
"0\r\n" +
"00\r\n" +
"\r\n").getBytes();
......@@ -501,7 +501,7 @@ public class IODispatcherTest {
"Transfer-Encoding: chunked\r\n" +
"Content-Type: text/plain; charset=utf-8\r\n" +
"\r\n" +
"442\r\n" +
"0442\r\n" +
"+---------------------------------------------------------------------------------------------------------------+\r\n" +
"| Location: | fhv_tripdata_2017-02.csv | Pattern | Locale | Errors |\r\n" +
"| Partition by | NONE | | | |\r\n" +
......@@ -516,7 +516,7 @@ public class IODispatcherTest {
"| 4 | 0 |\r\n" +
"+---------------------------------------------------------------------------------------------------------------+\r\n" +
"\r\n" +
"0\r\n" +
"00\r\n" +
"\r\n").getBytes();
......@@ -640,7 +640,7 @@ public class IODispatcherTest {
"Transfer-Encoding: chunked\r\n" +
"Content-Type: text/plain; charset=utf-8\r\n" +
"\r\n" +
"442\r\n" +
"0442\r\n" +
"+---------------------------------------------------------------------------------------------------------------+\r\n" +
"| Location: | fhv_tripdata_2017-02.csv | Pattern | Locale | Errors |\r\n" +
"| Partition by | NONE | | | |\r\n" +
......@@ -655,7 +655,7 @@ public class IODispatcherTest {
"| 4 | 0 |\r\n" +
"+---------------------------------------------------------------------------------------------------------------+\r\n" +
"\r\n" +
"0\r\n" +
"00\r\n" +
"\r\n").getBytes();
......@@ -753,9 +753,9 @@ public class IODispatcherTest {
"Content-Type: application/json; charset=utf-8\r\n" +
"Keep-Alive: timeout=5, max=10000\r\n" +
"\r\n" +
"224\r\n" +
"0224\r\n" +
"{\"query\":\"x where i = ('EHNRX')\",\"columns\":[{\"name\":\"a\",\"type\":\"BYTE\"},{\"name\":\"b\",\"type\":\"SHORT\"},{\"name\":\"c\",\"type\":\"INT\"},{\"name\":\"d\",\"type\":\"LONG\"},{\"name\":\"e\",\"type\":\"DATE\"},{\"name\":\"f\",\"type\":\"TIMESTAMP\"},{\"name\":\"g\",\"type\":\"FLOAT\"},{\"name\":\"h\",\"type\":\"DOUBLE\"},{\"name\":\"i\",\"type\":\"STRING\"},{\"name\":\"j\",\"type\":\"SYMBOL\"},{\"name\":\"k\",\"type\":\"BOOLEAN\"},{\"name\":\"l\",\"type\":\"BINARY\"}],\"dataset\":[[80,24814,-727724771,8920866532787660373,\"-169665660-01-09T01:58:28.119Z\",\"-51129-02-11T06:38:29.397464Z\",null,null,\"EHNRX\",\"ZSX\",false,[]]],\"count\":1}\r\n" +
"0\r\n" +
"00\r\n" +
"\r\n").getBytes();
sendAndReceive(
......@@ -907,7 +907,7 @@ public class IODispatcherTest {
",24045,-2102123220,-7175695171900374773,\"-242871073-08-17T14:45:16.399Z\",\"125517-01-13T08:03:16.581566Z\",0.20179749,0.4293443705\r\n" +
"25\r\n" +
",\"USIMY\",\"XUU\",false,[]]],\"count\":30}\r\n" +
"0\r\n" +
"00\r\n" +
"\r\n").getBytes();
sendAndReceive(nf, request, expectedResponse, 10, 100L);
......@@ -980,7 +980,7 @@ public class IODispatcherTest {
"\r\n" +
"4d\r\n" +
"{\"query\":\"x where2 i = ('EHNRX')\",\"error\":\"unexpected token: i\",\"position\":9}\r\n" +
"0\r\n" +
"00\r\n" +
"\r\n").getBytes();
sendAndReceive(
......@@ -1222,10 +1222,10 @@ public class IODispatcherTest {
"Transfer-Encoding: chunked\r\n" +
"Content-Type: text/html; charset=utf-8\r\n" +
"\r\n" +
"b\r\n" +
"0b\r\n" +
"Not Found\r\n" +
"\r\n" +
"0\r\n" +
"00\r\n" +
"\r\n";
......@@ -1385,10 +1385,10 @@ public class IODispatcherTest {
"Transfer-Encoding: chunked\r\n" +
"Content-Type: text/html; charset=utf-8\r\n" +
"\r\n" +
"b\r\n" +
"0b\r\n" +
"Not Found\r\n" +
"\r\n" +
"0\r\n" +
"00\r\n" +
"\r\n";
......@@ -1611,10 +1611,10 @@ public class IODispatcherTest {
"Transfer-Encoding: chunked\r\n" +
"Content-Type: text/html; charset=utf-8\r\n" +
"\r\n" +
"4\r\n" +
"04\r\n" +
"OK\r\n" +
"\r\n" +
"0\r\n" +
"00\r\n" +
"\r\n";
TestUtils.assertMemoryLeak(() -> {
......@@ -1720,7 +1720,7 @@ public class IODispatcherTest {
Assert.assertEquals(len, Net.send(fd, buffer, len));
// read response we expect
StringSink sink2 = new StringSink();
final int expectedLen = 158;
final int expectedLen = expectedResponse.length();
int read = 0;
while (read < expectedLen) {
int n = Net.recv(fd, buffer, len);
......@@ -2396,6 +2396,11 @@ public class IODispatcherTest {
public long getFd() {
return fd;
}
@Override
public boolean invalid() {
return false;
}
}
class Status {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册