diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index f0ccec3c886686d858ad03ce61826accacda614a..fe22a30d36d31ddc8fa1f5dedef76068f975cf1e 100644 --- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -182,18 +182,29 @@ public abstract class JdbcAbstractSink implements Sink { // bind each record value for (Record record : swapList) { String action = record.getProperties().get(ACTION); - if (action != null && action.equals(DELETE)) { - bindValue(deleteStatment, record, action); - count += 1; - deleteStatment.execute(); - } else if (action != null && action.equals(UPDATE)) { - bindValue(updateStatment, record, action); - count += 1; - updateStatment.execute(); - } else if (action != null && action.equals(INSERT)){ - bindValue(insertStatement, record, action); - count += 1; - insertStatement.execute(); + if (action == null) { + action = INSERT; + } + switch (action) { + case DELETE: + bindValue(deleteStatment, record, action); + count += 1; + deleteStatment.execute(); + break; + case UPDATE: + bindValue(updateStatment, record, action); + count += 1; + updateStatment.execute(); + break; + case INSERT: + bindValue(insertStatement, record, action); + count += 1; + insertStatement.execute(); + break; + default: + String msg = String.format("Unsupported action %s, can be one of %s, or not set which indicate %s", + action, Arrays.asList(INSERT, UPDATE, DELETE), INSERT); + throw new IllegalArgumentException(msg); } } connection.commit(); diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java index ce4aae43a6cb1723c0e62c4eb1caad6dfe91d446..84c4406e6831227308c8f9b4a5092fe5e566cd8c 100644 --- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java +++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java @@ -19,9 +19,13 @@ package org.apache.pulsar.io.jdbc; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; @@ -40,6 +44,7 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -95,7 +100,6 @@ public class JdbcSinkTest { // change batchSize to 1, to flush on each write. conf.put("batchSize", 1); - jdbcSink = new JdbcAutoSchemaSink(); jdbcSink = new JdbcAutoSchemaSink(); // open should success @@ -109,8 +113,7 @@ public class JdbcSinkTest { jdbcSink.close(); } - @Test - public void TestOpenAndWriteSink() throws Exception { + private void testOpenAndWriteSink(Map actionProperties) throws Exception { Message insertMessage = mock(MessageImpl.class); GenericSchema genericAvroSchema; // prepare a foo Record @@ -121,19 +124,16 @@ public class JdbcSinkTest { AvroSchema schema = AvroSchema.of(SchemaDefinition.builder().withPojo(Foo.class).build()); byte[] insertBytes = schema.encode(insertObj); - + CompletableFuture future = new CompletableFuture<>(); Record insertRecord = PulsarRecord.builder() .message(insertMessage) .topicName("fake_topic_name") - .ackFunction(() -> {}) + .ackFunction(() -> future.complete(null)) .build(); genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo()); - - Map insertProperties = Maps.newHashMap(); - insertProperties.put("ACTION", "INSERT"); when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes)); - when(insertMessage.getProperties()).thenReturn(insertProperties); + when(insertMessage.getProperties()).thenReturn(actionProperties); log.info("foo:{}, Message.getValue: {}, record.getValue: {}", insertObj.toString(), insertMessage.getValue().toString(), @@ -143,7 +143,7 @@ public class JdbcSinkTest { jdbcSink.write(insertRecord); log.info("executed write"); // sleep to wait backend flush complete - Thread.sleep(1000); + future.get(1, TimeUnit.SECONDS); // value has been written to db, read it out and verify. String querySql = "SELECT * FROM " + tableName + " WHERE field3=3"; @@ -156,6 +156,25 @@ public class JdbcSinkTest { } + @Test + public void TestInsertAction() throws Exception { + testOpenAndWriteSink(ImmutableMap.of("ACTION", "INSERT")); + } + + @Test + public void TestNoAction() throws Exception { + testOpenAndWriteSink(ImmutableMap.of()); + } + + @Test + public void TestUnknownAction() throws Exception { + Record recordRecord = mock(Record.class); + when(recordRecord.getProperties()).thenReturn(ImmutableMap.of("ACTION", "UNKNOWN")); + CompletableFuture future = new CompletableFuture<>(); + doAnswer(a -> future.complete(null)).when(recordRecord).fail(); + jdbcSink.write(recordRecord); + future.get(1, TimeUnit.SECONDS); + } @Test public void TestUpdateAction() throws Exception { @@ -169,10 +188,11 @@ public class JdbcSinkTest { byte[] updateBytes = schema.encode(updateObj); Message updateMessage = mock(MessageImpl.class); + CompletableFuture future = new CompletableFuture<>(); Record updateRecord = PulsarRecord.builder() .message(updateMessage) .topicName("fake_topic_name") - .ackFunction(() -> {}) + .ackFunction(() -> future.complete(null)) .build(); GenericSchema updateGenericAvroSchema; @@ -188,8 +208,7 @@ public class JdbcSinkTest { updateRecord.getValue().toString()); jdbcSink.write(updateRecord); - - Thread.sleep(1000); + future.get(1, TimeUnit.SECONDS); // value has been written to db, read it out and verify. String updateQuerySql = "SELECT * FROM " + tableName + " WHERE field3=4"; @@ -210,10 +229,11 @@ public class JdbcSinkTest { byte[] deleteBytes = schema.encode(deleteObj); Message deleteMessage = mock(MessageImpl.class); + CompletableFuture future = new CompletableFuture<>(); Record deleteRecord = PulsarRecord.builder() .message(deleteMessage) .topicName("fake_topic_name") - .ackFunction(() -> {}) + .ackFunction(() -> future.complete(null)) .build(); GenericSchema deleteGenericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo()); @@ -228,8 +248,7 @@ public class JdbcSinkTest { deleteRecord.getValue().toString()); jdbcSink.write(deleteRecord); - - Thread.sleep(1000); + future.get(1, TimeUnit.SECONDS); // value has been written to db, read it out and verify. String deleteQuerySql = "SELECT * FROM " + tableName + " WHERE field3=5";