提交 64e6c03e 编写于 作者: V Vlad Ilyushchenko

NET: Fixed issue with HTTP context. It wasn't reading socket fully, which...

NET: Fixed issue with HTTP context. It wasn't reading the socket fully, which caused some peer disconnects to go unnoticed. Rewrote the rolling file writer test; other tests will be migrated to a similar, deterministic test approach.
上级 34f5fea3
......@@ -23,6 +23,8 @@
package com.questdb.cutlass.http;
import com.questdb.log.Log;
import com.questdb.log.LogFactory;
import com.questdb.network.IOContext;
import com.questdb.network.IODispatcher;
import com.questdb.network.IOOperation;
......@@ -33,6 +35,8 @@ import com.questdb.std.Unsafe;
import com.questdb.std.str.DirectByteCharSequence;
public class HttpConnectionContext implements IOContext {
private static final Log LOG = LogFactory.getLog(HttpConnectionContext.class);
private final HttpHeaderParser headerParser;
private final long recvBuffer;
private final int recvBufferSize;
......@@ -74,6 +78,13 @@ public class HttpConnectionContext implements IOContext {
return headerParser;
}
/**
 * Resets all per-request parsing state so this context can be reused
 * for the next request on the same connection.
 */
public void clear() {
    this.headerParser.clear();
    // fixed: the multipart parser was cleared twice; once is sufficient
    this.multipartContentParser.clear();
    this.csPool.clear();
}
public void handleClientOperation(int operation, NetworkFacade nf, IODispatcher<HttpConnectionContext> dispatcher, HttpRequestProcessorSelector selector) {
switch (operation) {
case IOOperation.READ:
......@@ -85,6 +96,16 @@ public class HttpConnectionContext implements IOContext {
}
}
/**
 * Probes the socket for leftover input before completing the request.
 * A clean request leaves nothing behind; any pending byte (n > 0) or a
 * peer disconnect (n < 0) means the connection must be torn down,
 * otherwise the disconnect could go unnoticed.
 */
private void checkRemainingInputAndCompleteRequest(NetworkFacade nf, IODispatcher<HttpConnectionContext> dispatcher, long fd, HttpRequestProcessor processor) {
    final int n = nf.recv(fd, recvBuffer, 1);
    if (n == 0) {
        // socket fully drained; hand over to the processor
        processor.onRequestComplete(this, dispatcher);
    } else {
        // trailing bytes or peer hangup: schedule disconnect
        dispatcher.registerChannel(this, IOOperation.DISCONNECT);
    }
}
private void handleClientRecv(
NetworkFacade nf,
IODispatcher<HttpConnectionContext> dispatcher,
......@@ -99,7 +120,9 @@ public class HttpConnectionContext implements IOContext {
while (headerParser.isIncomplete()) {
// read headers
read = nf.recv(fd, recvBuffer, recvBufferSize);
LOG.debug().$("recv [count=").$(read).$(']').$();
if (read < 0) {
LOG.debug().$("done").$();
// peer disconnect
dispatcher.registerChannel(this, IOOperation.CLEANUP);
return;
......@@ -127,7 +150,8 @@ public class HttpConnectionContext implements IOContext {
dispatcher.registerChannel(this, IOOperation.READ);
} else if (multipartProcessor) {
processor.onHeadersReady(this, dispatcher);
processor.onHeadersReady(this);
HttpMultipartContentListener multipartListener = (HttpMultipartContentListener) processor;
long bufferEnd = recvBuffer + read;
......@@ -148,10 +172,10 @@ public class HttpConnectionContext implements IOContext {
}
} while (!multipartContentParser.parse(recvBuffer, recvBuffer + read, multipartListener));
}
checkRemainingInputAndCompleteRequest(nf, dispatcher, fd, processor);
} else {
processor.onHeadersReady(this, dispatcher);
// todo: processor will decide what to do next
headerParser.clear();
processor.onHeadersReady(this);
checkRemainingInputAndCompleteRequest(nf, dispatcher, fd, processor);
}
} catch (HttpException e) {
e.printStackTrace();
......
......@@ -25,7 +25,8 @@ package com.questdb.cutlass.http;
import com.questdb.network.IODispatcher;
// NOTE: @FunctionalInterface removed — the interface now declares two
// abstract methods, which makes the annotation a compile error
// (a functional interface must have exactly one abstract method).
public interface HttpRequestProcessor {
    /**
     * Invoked once the request headers have been fully parsed and are
     * available via the connection context.
     */
    void onHeadersReady(HttpConnectionContext connectionContext);

    /**
     * Invoked after the request input has been fully consumed; the
     * implementation decides the next IO operation to register.
     */
    void onRequestComplete(HttpConnectionContext connectionContext, IODispatcher<HttpConnectionContext> dispatcher);
}
......@@ -227,10 +227,8 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
private boolean processRegistrations(long timestamp) {
long cursor;
boolean useful = false;
int offset = 0;
while ((cursor = interestSubSeq.next()) > -1) {
useful = true;
IOEvent<C> evt = interestQueue.get(cursor);
C context = evt.context;
int operation = evt.operation;
......@@ -275,7 +273,10 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
pending.set(r, context);
}
return useful;
if (offset > 0) {
LOG.debug().$("reg").$();
}
return offset > 0;
}
private void publishOperation(int operation, C context) {
......@@ -295,6 +296,7 @@ public class IODispatcherLinux<C extends IOContext> extends SynchronizedJob impl
int offset = 0;
if (n > 0) {
// check all activated FDs
LOG.debug().$("epoll [n=").$(n).$(']').$();
for (int i = 0; i < n; i++) {
epoll.setOffset(offset);
offset += EpollAccessor.SIZEOF_EVENT;
......
......@@ -35,7 +35,6 @@ import com.questdb.std.Unsafe;
import com.questdb.std.str.StringSink;
import com.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
......@@ -156,7 +155,14 @@ public class IODispatcherTest {
}
}
)) {
HttpRequestProcessorSelector selector = url -> (connectionContext, dispatcher1) -> {
HttpRequestProcessorSelector selector = url -> new HttpRequestProcessor() {
@Override
public void onHeadersReady(HttpConnectionContext connectionContext) {
}
@Override
public void onRequestComplete(HttpConnectionContext connectionContext, IODispatcher<HttpConnectionContext> dispatcher1) {
}
};
AtomicBoolean serverRunning = new AtomicBoolean(true);
......@@ -241,7 +247,14 @@ public class IODispatcherTest {
}
}
)) {
HttpRequestProcessorSelector selector = url -> (connectionContext, dispatcher1) -> {
HttpRequestProcessorSelector selector = url -> new HttpRequestProcessor() {
@Override
public void onHeadersReady(HttpConnectionContext connectionContext) {
}
@Override
public void onRequestComplete(HttpConnectionContext connectionContext, IODispatcher<HttpConnectionContext> dispatcher) {
}
};
AtomicBoolean serverRunning = new AtomicBoolean(true);
......@@ -339,20 +352,25 @@ public class IODispatcherTest {
)) {
StringSink sink = new StringSink();
HttpRequestProcessorSelector selector = url -> (context, dispatcher1) -> {
HttpHeaders headers = context.getHeaders();
sink.put(headers.getMethodLine());
sink.put("\r\n");
ObjList<CharSequence> headerNames = headers.getHeaderNames();
for (int i = 0, n = headerNames.size(); i < n; i++) {
sink.put(headerNames.getQuick(i)).put(':');
sink.put(headers.getHeader(headerNames.getQuick(i)));
final HttpRequestProcessorSelector selector = url -> new HttpRequestProcessor() {
@Override
public void onHeadersReady(HttpConnectionContext context) {
HttpHeaders headers = context.getHeaders();
sink.put(headers.getMethodLine());
sink.put("\r\n");
ObjList<CharSequence> headerNames = headers.getHeaderNames();
for (int i = 0, n = headerNames.size(); i < n; i++) {
sink.put(headerNames.getQuick(i)).put(':');
sink.put(headers.getHeader(headerNames.getQuick(i)));
sink.put("\r\n");
}
sink.put("\r\n");
}
sink.put("\r\n");
dispatcher1.registerChannel(context, IOOperation.READ);
@Override
public void onRequestComplete(HttpConnectionContext context, IODispatcher<HttpConnectionContext> dispatcher1) {
dispatcher1.registerChannel(context, IOOperation.READ);
}
};
AtomicBoolean serverRunning = new AtomicBoolean(true);
......@@ -462,20 +480,25 @@ public class IODispatcherTest {
)) {
StringSink sink = new StringSink();
HttpRequestProcessorSelector selector = url -> (context, dispatcher1) -> {
HttpHeaders headers = context.getHeaders();
sink.put(headers.getMethodLine());
sink.put("\r\n");
ObjList<CharSequence> headerNames = headers.getHeaderNames();
for (int i = 0, n = headerNames.size(); i < n; i++) {
sink.put(headerNames.getQuick(i)).put(':');
sink.put(headers.getHeader(headerNames.getQuick(i)));
HttpRequestProcessorSelector selector = url -> new HttpRequestProcessor() {
@Override
public void onHeadersReady(HttpConnectionContext connectionContext) {
HttpHeaders headers = connectionContext.getHeaders();
sink.put(headers.getMethodLine());
sink.put("\r\n");
ObjList<CharSequence> headerNames = headers.getHeaderNames();
for (int i = 0, n = headerNames.size(); i < n; i++) {
sink.put(headerNames.getQuick(i)).put(':');
sink.put(headers.getHeader(headerNames.getQuick(i)));
sink.put("\r\n");
}
sink.put("\r\n");
}
sink.put("\r\n");
dispatcher1.registerChannel(context, IOOperation.READ);
@Override
public void onRequestComplete(HttpConnectionContext connectionContext, IODispatcher<HttpConnectionContext> dispatcher1) {
dispatcher1.registerChannel(connectionContext, IOOperation.READ);
}
};
AtomicBoolean serverRunning = new AtomicBoolean(true);
......@@ -540,7 +563,6 @@ public class IODispatcherTest {
}
@Test
@Ignore
// this test is ignore for the time being because it is unstable on OSX and I
// have not figured out the reason yet. I would like to see if this test
// runs any different on Linux, just to narrow the problem down to either
......@@ -572,7 +594,10 @@ public class IODispatcherTest {
"Cookie:textwrapon=false; textautoformat=false; wysiwyg=textarea\r\n" +
"\r\n";
int N = 1000;
final int N = 1000;
final int serverThreadCount = 2;
final int senderCount = 2;
TestUtils.assertMemoryLeak(() -> {
HttpServerConfiguration httpServerConfiguration = new DefaultHttpServerConfiguration();
......@@ -592,7 +617,7 @@ public class IODispatcherTest {
pubSeq.then(subSeq).then(pubSeq);
AtomicBoolean serverRunning = new AtomicBoolean(true);
int serverThreadCount = 1;
CountDownLatch serverHaltLatch = new CountDownLatch(serverThreadCount);
for (int j = 0; j < serverThreadCount; j++) {
new Thread(() -> {
......@@ -600,44 +625,56 @@ public class IODispatcherTest {
final long responseBuf = Unsafe.malloc(32);
Unsafe.getUnsafe().putByte(responseBuf, (byte) 'A');
HttpRequestProcessorSelector selector = url -> (context, dispatcher1) -> {
HttpHeaders headers = context.getHeaders();
sink.clear();
sink.put(headers.getMethodLine());
sink.put("\r\n");
ObjList<CharSequence> headerNames = headers.getHeaderNames();
for (int i = 0, n = headerNames.size(); i < n; i++) {
sink.put(headerNames.getQuick(i)).put(':');
sink.put(headers.getHeader(headerNames.getQuick(i)));
final HttpRequestProcessor processor = new HttpRequestProcessor() {
@Override
public void onHeadersReady(HttpConnectionContext context) {
HttpHeaders headers = context.getHeaders();
sink.clear();
sink.put(headers.getMethodLine());
sink.put("\r\n");
ObjList<CharSequence> headerNames = headers.getHeaderNames();
for (int i = 0, n = headerNames.size(); i < n; i++) {
sink.put(headerNames.getQuick(i)).put(':');
sink.put(headers.getHeader(headerNames.getQuick(i)));
sink.put("\r\n");
}
sink.put("\r\n");
}
sink.put("\r\n");
boolean result;
try {
TestUtils.assertEquals(expected, sink);
result = true;
} catch (Exception e) {
result = false;
}
boolean result;
try {
TestUtils.assertEquals(expected, sink);
result = true;
} catch (Exception e) {
result = false;
}
while (true) {
long cursor = pubSeq.next();
if (cursor < 0) {
continue;
while (true) {
long cursor = pubSeq.next();
if (cursor < 0) {
continue;
}
queue.get(cursor).valid = result;
pubSeq.done(cursor);
break;
}
queue.get(cursor).valid = result;
pubSeq.done(cursor);
break;
}
requestsReceived.incrementAndGet();
requestsReceived.incrementAndGet();
nf.send(context.getFd(), responseBuf, 1);
nf.send(context.getFd(), responseBuf, 1);
}
dispatcher1.registerChannel(context, IOOperation.READ);
@Override
public void onRequestComplete(HttpConnectionContext connectionContext, IODispatcher<HttpConnectionContext> dispatcher) {
connectionContext.clear();
// there is interesting situation here, its possible that header is fully
// read and there are either more bytes or disconnect lingering
dispatcher.registerChannel(connectionContext, IOOperation.READ);
}
};
HttpRequestProcessorSelector selector = url -> processor;
while (serverRunning.get()) {
dispatcher.run();
dispatcher.processIOQueue(
......@@ -650,36 +687,38 @@ public class IODispatcherTest {
}).start();
}
new Thread(() -> {
for (int i = 0; i < N; i++) {
long fd = Net.socketTcp(true);
for (int j = 0; j < senderCount; j++) {
new Thread(() -> {
long sockAddr = Net.sockaddr("127.0.0.1", 9001);
try {
long sockAddr = Net.sockaddr("127.0.0.1", 9001);
try {
Assert.assertTrue(fd > -1);
Assert.assertEquals(0, Net.connect(fd, sockAddr));
int len = request.length();
long buffer = TestUtils.toMemory(request);
for (int i = 0; i < N; i++) {
LOG.info().$("i=").$(i).$();
long fd = Net.socketTcp(true);
try {
Assert.assertEquals(len, Net.send(fd, buffer, len));
Assert.assertEquals("fd=" + fd + ", i=" + i, 1, Net.recv(fd, buffer, 1));
Assert.assertEquals('A', Unsafe.getUnsafe().getByte(buffer));
Assert.assertTrue(fd > -1);
Assert.assertEquals(0, Net.connect(fd, sockAddr));
int len = request.length();
long buffer = TestUtils.toMemory(request);
try {
Assert.assertEquals(len, Net.send(fd, buffer, len));
Assert.assertEquals("fd=" + fd + ", i=" + i, 1, Net.recv(fd, buffer, 1));
Assert.assertEquals('A', Unsafe.getUnsafe().getByte(buffer));
} finally {
Unsafe.free(buffer, len);
}
} finally {
Unsafe.free(buffer, len);
Net.close(fd);
}
} finally {
Net.freeSockAddr(sockAddr);
}
} finally {
Net.close(fd);
Net.freeSockAddr(sockAddr);
}
}
}).start();
}).start();
}
int receiveCount = 0;
while (receiveCount < N) {
while (receiveCount < N * senderCount) {
long cursor = subSeq.next();
if (cursor < 0) {
continue;
......@@ -693,7 +732,7 @@ public class IODispatcherTest {
serverRunning.set(false);
serverHaltLatch.await();
}
Assert.assertEquals(N, requestsReceived.get());
Assert.assertEquals(N * senderCount, requestsReceived.get());
});
}
......
......@@ -23,6 +23,10 @@
package com.questdb.log;
import com.questdb.mp.RingQueue;
import com.questdb.mp.SCSequence;
import com.questdb.mp.SOCountDownLatch;
import com.questdb.mp.SPSequence;
import com.questdb.std.*;
import com.questdb.std.microtime.DateFormatUtils;
import com.questdb.std.microtime.MicrosecondClock;
......@@ -36,6 +40,8 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
public class LogFactoryTest {
......@@ -258,40 +264,73 @@ public class LogFactoryTest {
String logFile = base + "mylog-${date:yyyy-MM-dd}.log";
String expectedLogFile = base + "mylog-2015-05-03.log";
try (LogFactory factory = new LogFactory()) {
final MicrosecondClock clock = new TestMicrosecondClock(DateFormatUtils.parseDateTime("2015-05-03T10:35:00.000Z"), 1);
final MicrosecondClock clock = new TestMicrosecondClock(DateFormatUtils.parseDateTime("2015-05-03T10:35:00.000Z"), 1);
try (Path path = new Path()) {
// create rogue file that would be in a way of logger rolling existing files
path.of(base);
Assert.assertTrue(com.questdb.std.Files.touch(path.concat("mylog-2015-05-03.log.2").$()));
}
try (Path path = new Path()) {
// create rogue file that would be in a way of logger rolling existing files
path.of(base);
Assert.assertTrue(com.questdb.std.Files.touch(path.concat("mylog-2015-05-03.log.2").$()));
}
factory.add(new LogWriterConfig(LogLevel.LOG_LEVEL_INFO, (ring, seq, level) -> {
LogRollingFileWriter w = new LogRollingFileWriter(FilesFacadeImpl.INSTANCE, clock, ring, seq, level);
w.setLocation(logFile);
// 1Mb log file limit, we will create 4 of them
w.setRollSize("1m");
w.setBufferSize("1k");
w.setRollEvery("day");
return w;
}));
RingQueue<LogRecordSink> queue = new RingQueue<>(() -> new LogRecordSink(1024), 1024);
factory.bind();
factory.startThread();
SPSequence pubSeq = new SPSequence(queue.getCapacity());
SCSequence subSeq = new SCSequence();
pubSeq.then(subSeq).then(pubSeq);
try {
Log logger = factory.create("x");
for (int i = 0; i < 1000000; i++) {
logger.xinfo().$("test ").$(' ').$(i).$();
try (final LogRollingFileWriter writer = new LogRollingFileWriter(
FilesFacadeImpl.INSTANCE,
clock,
queue,
subSeq,
LogLevel.LOG_LEVEL_INFO
)) {
writer.setLocation(logFile);
writer.setRollSize("1m");
writer.setBufferSize("64k");
writer.bindProperties();
AtomicBoolean running = new AtomicBoolean(true);
SOCountDownLatch halted = new SOCountDownLatch();
halted.setCount(1);
new Thread(() -> {
while (running.get()) {
writer.runSerially();
}
// logger is async, we need to let it finish writing
Thread.sleep(4000);
while (writer.runSerially()) ;
} finally {
factory.haltThread();
halted.countDown();
}).start();
// now publish
int published = 0;
int toPublish = 1_000_000;
while (published < toPublish) {
long cursor = pubSeq.next();
if (cursor < 0) {
LockSupport.parkNanos(1);
continue;
}
final long available = pubSeq.available();
while (cursor < available && published < toPublish) {
LogRecordSink sink = queue.get(cursor++);
sink.setLevel(LogLevel.LOG_LEVEL_INFO);
sink.put("test");
published++;
}
pubSeq.done(cursor - 1);
}
running.set(false);
halted.await();
}
assertFileLength(expectedLogFile);
assertFileLength(expectedLogFile + ".1");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册