未验证 提交 05a9367f 编写于 作者: A Alex Pelagenko 提交者: GitHub

fix(pgwire): fix pg wire network send error (#3681)

上级 c9c85f11
......@@ -158,6 +158,17 @@ public class NetworkSqlExecutionCircuitBreaker implements SqlExecutionCircuitBre
}
}
public void statefulThrowExceptionIfTimeout() {
// Same as statefulThrowExceptionIfTripped but does not check the connection state.
// Useful to check timeout before trying to send something on the connection.
if (testCount < throttle) {
testCount++;
} else {
testCount = 0;
testTimeout();
}
}
@Override
public void statefulThrowExceptionIfTrippedNoThrottle() {
testCount = 0;
......
......@@ -30,8 +30,8 @@ import io.questdb.cairo.*;
import io.questdb.cairo.pool.WriterSource;
import io.questdb.cairo.security.DenyAllSecurityContext;
import io.questdb.cairo.security.SecurityContextFactory;
import io.questdb.cairo.sql.*;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.*;
import io.questdb.cutlass.auth.Authenticator;
import io.questdb.cutlass.auth.AuthenticatorException;
import io.questdb.cutlass.text.TextLoader;
......@@ -2453,6 +2453,21 @@ public class PGConnectionContext extends IOContext<PGConnectionContext> implemen
responseAsciiSink.reset();
}
private void sendAndResetWait() throws PeerDisconnectedException {
// This is simplified waited send for very limited use cases where introducing another state is an overkill.
// This method busy waits to send buffer.
while (true) {
try {
doSend(0, (int) (sendBufferPtr - sendBuffer));
break;
} catch (PeerIsSlowToReadException e) {
Os.sleep(1);
circuitBreaker.statefulThrowExceptionIfTimeout();
}
}
responseAsciiSink.reset();
}
// This method is currently unused. it's used for the COPY sub-protocol, which is currently not implemented.
// It's left here so when we add the sub-protocol later we won't need to reimplemented it.
// We could keep it just in git history, but chances are nobody would recall to search for it there
......@@ -2557,6 +2572,9 @@ public class PGConnectionContext extends IOContext<PGConnectionContext> implemen
}
prepareCommandComplete(true);
} else {
if (sendBufferLimit - sendBufferPtr < PROTOCOL_TAIL_COMMAND_LENGTH) {
sendAndResetWait();
}
prepareSuspended();
// Prevents re-sending current record row when buffer is sent fully.
resumeProcessor = null;
......@@ -2703,9 +2721,13 @@ public class PGConnectionContext extends IOContext<PGConnectionContext> implemen
}
}
void prepareReadyForQuery() {
void prepareReadyForQuery() throws PeerDisconnectedException {
if (sendRNQ) {
LOG.debug().$("RNQ sent").$();
if (sendBufferLimit - sendBufferPtr < PROTOCOL_TAIL_COMMAND_LENGTH) {
sendAndResetWait();
}
responseAsciiSink.put(MESSAGE_TYPE_READY_FOR_QUERY);
responseAsciiSink.putNetworkInt(Integer.BYTES + Byte.BYTES);
switch (transactionState) {
......
......@@ -77,9 +77,6 @@
</goals>
</execution>
</executions>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</plugin>
</plugins>
<pluginManagement>
......
......@@ -44,12 +44,14 @@ import io.questdb.network.DefaultIODispatcherConfiguration;
import io.questdb.network.IODispatcherConfiguration;
import io.questdb.std.Chars;
import io.questdb.std.Files;
import io.questdb.std.Os;
import io.questdb.std.str.Path;
import org.junit.*;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class Table2IlpTest {
private static final int ILP_PORT = 9909;
......@@ -64,6 +66,25 @@ public class Table2IlpTest {
private static SqlExecutionContextImpl sqlExecutionContext;
private static WorkerPool workerPool;
public static void assertEventually(Runnable assertion, int timeoutSeconds) {
long maxSleepingTimeMillis = 1000;
long nextSleepingTimeMillis = 10;
long startTime = System.nanoTime();
long deadline = startTime + TimeUnit.SECONDS.toNanos(timeoutSeconds);
for (; ; ) {
try {
assertion.run();
return;
} catch (AssertionError error) {
if (System.nanoTime() >= deadline) {
throw error;
}
}
Os.sleep(nextSleepingTimeMillis);
nextSleepingTimeMillis = Math.min(maxSleepingTimeMillis, nextSleepingTimeMillis << 1);
}
}
public static void createTestPath(CharSequence root) {
try (Path path = new Path().of(root).$()) {
if (Files.exists(path)) {
......@@ -92,7 +113,6 @@ public class Table2IlpTest {
engine = new CairoEngine(configuration);
}
@BeforeClass
public static void setUpStatic() throws SqlException {
setCairoStatic();
......@@ -106,6 +126,11 @@ public class Table2IlpTest {
null);
bindVariableService.clear();
final PGWireConfiguration conf = new DefaultPGWireConfiguration() {
@Override
public int getSendBufferSize() {
return 512;
}
@Override
public int getWorkerCount() {
return 3;
......@@ -166,7 +191,7 @@ public class Table2IlpTest {
@Test
public void copyAllColumnTypes() throws SqlException, InterruptedException {
String tableNameSrc = "src";
createTable(tableNameSrc, 20000);
createTable(tableNameSrc, 40_000);
String tableNameDst = "dst";
createTable(tableNameDst, 1);
......@@ -192,7 +217,13 @@ public class Table2IlpTest {
new Table2IlpCopier().copyTable(params);
done.await();
TestUtils.assertEquals(engine, sqlExecutionContext, tableNameSrc, tableNameDst);
assertEventually(() -> {
try {
TestUtils.assertEquals(engine, sqlExecutionContext, tableNameSrc, tableNameDst);
} catch (SqlException e) {
throw new RuntimeException(e);
}
}, 60);
}
@Test
......@@ -224,7 +255,13 @@ public class Table2IlpTest {
Assert.assertEquals(10568 - 189, rowsSent);
done.await();
TestUtils.assertEquals(engine, sqlExecutionContext, sourceQuery, tableNameDst);
assertEventually(() -> {
try {
TestUtils.assertEquals(engine, sqlExecutionContext, sourceQuery, tableNameDst);
} catch (SqlException e) {
throw new RuntimeException(e);
}
}, 60);
}
@Before
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册