未验证 提交 ac10b006 编写于 作者: B Boyang Jerry Peng 提交者: GitHub

Fix: predicate pushdown for Pulsar SQL NPE (#4744)

* Fix: predicate pushdown for Pulsar SQL NPE

* fix unit test
上级 23dd38f3
......@@ -64,7 +64,7 @@ class OpFindNewest implements ReadEntryCallback {
switch (state) {
case checkFirst:
if (!condition.apply(entry)) {
callback.findEntryComplete(null, OpFindNewest.this.ctx);
callback.findEntryComplete(startPosition, OpFindNewest.this.ctx);
return;
} else {
lastMatchedPosition = position;
......
......@@ -1645,7 +1645,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
ledger.addEntry("not-expired".getBytes(Encoding));
ledger.addEntry("not-expired".getBytes(Encoding));
assertNull(
assertEquals(c1.readPosition,
c1.findNewestMatching(entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))));
}
......@@ -2108,7 +2108,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
ledger.addEntry(getEntryPublishTime("retained1"));
Position firstPosition = ledger.addEntry(getEntryPublishTime("retained1"));
// space apart message publish times
Thread.sleep(100);
ledger.addEntry(getEntryPublishTime("retained2"));
......@@ -2135,6 +2135,9 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
PositionImpl found = (PositionImpl) findPositionFromAllEntries(c1, timestamp);
assertEquals(found.getLedgerId(), ledgerId);
assertEquals(found.getEntryId(), expectedEntryId);
found = (PositionImpl) findPositionFromAllEntries(c1, 0);
assertEquals(found, firstPosition);
}
@Test(timeOut = 20000)
......
......@@ -154,7 +154,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
future = findMessage(result, c1, beginTimestamp);
future.get();
assertEquals(result.exception, null);
assertEquals(result.position, null);
assertEquals(result.position, c1.getFirstPosition());
result.reset();
future = findMessage(result, c1, endTimestamp);
......
......@@ -137,6 +137,13 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-jdbc</artifactId>
<version>${presto.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
......
......@@ -37,6 +37,7 @@ public class PrestoWorkerContainer extends PulsarContainer<PrestoWorkerContainer
-1,
PRESTO_HTTP_PORT,
"/v1/node");
}
@Override
......@@ -50,4 +51,8 @@ public class PrestoWorkerContainer extends PulsarContainer<PrestoWorkerContainer
);
}
}
public String getUrl() {
return String.format("%s:%s", getContainerIpAddress(), getMappedPort(PrestoWorkerContainer.PRESTO_HTTP_PORT));
}
}
......@@ -32,6 +32,15 @@ import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.LinkedList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@Slf4j
......@@ -84,7 +93,12 @@ public class TestBasicPresto extends PulsarTestSuite {
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
final String stocksTopic = "stocks";
String stocksTopic;
if (isBatched) {
stocksTopic = "stocks_batched";
} else {
stocksTopic = "stocks_nonbatched";
}
@Cleanup
Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class))
......@@ -96,6 +110,7 @@ public class TestBasicPresto extends PulsarTestSuite {
final Stock stock = new Stock(i,"STOCK_" + i , 100.0 + i * 10);
producer.send(stock);
}
producer.flush();
result = execQuery("show schemas in pulsar;");
assertThat(result.getExitCode()).isEqualTo(0);
......@@ -105,7 +120,7 @@ public class TestBasicPresto extends PulsarTestSuite {
assertThat(result.getExitCode()).isEqualTo(0);
assertThat(result.getStdout()).contains("stocks");
ContainerExecResult containerExecResult = execQuery("select * from pulsar.\"public/default\".stocks order by entryid;");
ContainerExecResult containerExecResult = execQuery(String.format("select * from pulsar.\"public/default\".%s order by entryid;", stocksTopic));
assertThat(containerExecResult.getExitCode()).isEqualTo(0);
log.info("select sql query output \n{}", containerExecResult.getStdout());
String[] split = containerExecResult.getStdout().split("\n");
......@@ -119,6 +134,67 @@ public class TestBasicPresto extends PulsarTestSuite {
assertThat(split2).contains("\"" + (100.0 + i * 10) + "\"");
}
// test predicate pushdown
String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl());
Connection connection = DriverManager.getConnection(url, "test", null);
String query = String.format("select * from pulsar" +
".\"public/default\".%s order by __publish_time__", stocksTopic);
log.info("Executing query: {}", query);
ResultSet res = connection.createStatement().executeQuery(query);
List<Timestamp> timestamps = new LinkedList<>();
while (res.next()) {
printCurrent(res);
timestamps.add(res.getTimestamp("__publish_time__"));
}
assertThat(timestamps.size()).isGreaterThan(NUM_OF_STOCKS - 2);
query = String.format("select * from pulsar" +
".\"public/default\".%s where __publish_time__ > timestamp '%s' order by __publish_time__", stocksTopic, timestamps.get(timestamps.size() / 2));
log.info("Executing query: {}", query);
res = connection.createStatement().executeQuery(query);
List<Timestamp> returnedTimestamps = new LinkedList<>();
while (res.next()) {
printCurrent(res);
returnedTimestamps.add(res.getTimestamp("__publish_time__"));
}
assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size() / 2);
// Try with a predicate that has a earlier time than any entry
// Should return all rows
query = String.format("select * from pulsar" +
".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 0);
log.info("Executing query: {}", query);
res = connection.createStatement().executeQuery(query);
returnedTimestamps = new LinkedList<>();
while (res.next()) {
printCurrent(res);
returnedTimestamps.add(res.getTimestamp("__publish_time__"));
}
assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size());
// Try with a predicate that has a latter time than any entry
// Should return no rows
query = String.format("select * from pulsar" +
".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 99999999999L);
log.info("Executing query: {}", query);
res = connection.createStatement().executeQuery(query);
returnedTimestamps = new LinkedList<>();
while (res.next()) {
printCurrent(res);
returnedTimestamps.add(res.getTimestamp("__publish_time__"));
}
assertThat(returnedTimestamps.size()).isEqualTo(0);
}
@AfterSuite
......@@ -137,4 +213,16 @@ public class TestBasicPresto extends PulsarTestSuite {
}
private static void printCurrent(ResultSet rs) throws SQLException {
ResultSetMetaData rsmd = rs.getMetaData();
int columnsNumber = rsmd.getColumnCount();
for (int i = 1; i <= columnsNumber; i++) {
if (i > 1) System.out.print(", ");
String columnValue = rs.getString(i);
System.out.print(columnValue + " " + rsmd.getColumnName(i));
}
System.out.println("");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册