From d8356d8c898e42b75e6ba90f4ab3773c6d2386e1 Mon Sep 17 00:00:00 2001 From: Yi Tang Date: Fri, 2 Aug 2019 12:10:05 +0800 Subject: [PATCH] [pulsar-io-jdbc] not set action as insert (#4862) ### Motivation jdbc sink treat all record as INSERT before #4358 , now it requires an indispensable action property which seems to be a break change, and we can deal records without any action property as INSERT. ### Modifications treat action not set as INSERT action like before. --- .../pulsar/io/jdbc/JdbcAbstractSink.java | 35 ++++++++----- .../apache/pulsar/io/jdbc/JdbcSinkTest.java | 51 +++++++++++++------ 2 files changed, 58 insertions(+), 28 deletions(-) diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index f0ccec3c886..fe22a30d36d 100644 --- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -182,18 +182,29 @@ public abstract class JdbcAbstractSink implements Sink { // bind each record value for (Record record : swapList) { String action = record.getProperties().get(ACTION); - if (action != null && action.equals(DELETE)) { - bindValue(deleteStatment, record, action); - count += 1; - deleteStatment.execute(); - } else if (action != null && action.equals(UPDATE)) { - bindValue(updateStatment, record, action); - count += 1; - updateStatment.execute(); - } else if (action != null && action.equals(INSERT)){ - bindValue(insertStatement, record, action); - count += 1; - insertStatement.execute(); + if (action == null) { + action = INSERT; + } + switch (action) { + case DELETE: + bindValue(deleteStatment, record, action); + count += 1; + deleteStatment.execute(); + break; + case UPDATE: + bindValue(updateStatment, record, action); + count += 1; + updateStatment.execute(); + break; + case INSERT: + bindValue(insertStatement, record, action); + count += 1; + insertStatement.execute(); + break; + default: + String msg = String.format("Unsupported action %s, can be one of %s, or not set which indicate %s", + action, Arrays.asList(INSERT, UPDATE, DELETE), INSERT); + throw new IllegalArgumentException(msg); } } connection.commit(); diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java index ce4aae43a6c..84c4406e683 100644 --- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java +++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java @@ -19,9 +19,13 @@ package org.apache.pulsar.io.jdbc; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; @@ -40,6 +44,7 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -95,7 +100,6 @@ public class JdbcSinkTest { // change batchSize to 1, to flush on each write. conf.put("batchSize", 1); - jdbcSink = new JdbcAutoSchemaSink(); jdbcSink = new JdbcAutoSchemaSink(); // open should success @@ -109,8 +113,7 @@ public class JdbcSinkTest { jdbcSink.close(); } - @Test - public void TestOpenAndWriteSink() throws Exception { + private void testOpenAndWriteSink(Map actionProperties) throws Exception { Message insertMessage = mock(MessageImpl.class); GenericSchema genericAvroSchema; // prepare a foo Record @@ -121,19 +124,16 @@ public class JdbcSinkTest { AvroSchema schema = AvroSchema.of(SchemaDefinition.builder().withPojo(Foo.class).build()); byte[] insertBytes = schema.encode(insertObj); - + CompletableFuture future = new CompletableFuture<>(); Record insertRecord = PulsarRecord.builder() .message(insertMessage) .topicName("fake_topic_name") - .ackFunction(() -> {}) + .ackFunction(() -> future.complete(null)) .build(); genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo()); - - Map insertProperties = Maps.newHashMap(); - insertProperties.put("ACTION", "INSERT"); when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes)); - when(insertMessage.getProperties()).thenReturn(insertProperties); + when(insertMessage.getProperties()).thenReturn(actionProperties); log.info("foo:{}, Message.getValue: {}, record.getValue: {}", insertObj.toString(), insertMessage.getValue().toString(), @@ -143,7 +143,7 @@ public class JdbcSinkTest { jdbcSink.write(insertRecord); log.info("executed write"); // sleep to wait backend flush complete - Thread.sleep(1000); + future.get(1, TimeUnit.SECONDS); // value has been written to db, read it out and verify. String querySql = "SELECT * FROM " + tableName + " WHERE field3=3"; @@ -156,6 +156,25 @@ public class JdbcSinkTest { } + @Test + public void TestInsertAction() throws Exception { + testOpenAndWriteSink(ImmutableMap.of("ACTION", "INSERT")); + } + + @Test + public void TestNoAction() throws Exception { + testOpenAndWriteSink(ImmutableMap.of()); + } + + @Test + public void TestUnknownAction() throws Exception { + Record recordRecord = mock(Record.class); + when(recordRecord.getProperties()).thenReturn(ImmutableMap.of("ACTION", "UNKNOWN")); + CompletableFuture future = new CompletableFuture<>(); + doAnswer(a -> future.complete(null)).when(recordRecord).fail(); + jdbcSink.write(recordRecord); + future.get(1, TimeUnit.SECONDS); + } @Test public void TestUpdateAction() throws Exception { @@ -169,10 +188,11 @@ public class JdbcSinkTest { byte[] updateBytes = schema.encode(updateObj); Message updateMessage = mock(MessageImpl.class); + CompletableFuture future = new CompletableFuture<>(); Record updateRecord = PulsarRecord.builder() .message(updateMessage) .topicName("fake_topic_name") - .ackFunction(() -> {}) + .ackFunction(() -> future.complete(null)) .build(); GenericSchema updateGenericAvroSchema; @@ -188,8 +208,7 @@ public class JdbcSinkTest { updateRecord.getValue().toString()); jdbcSink.write(updateRecord); - - Thread.sleep(1000); + future.get(1, TimeUnit.SECONDS); // value has been written to db, read it out and verify. String updateQuerySql = "SELECT * FROM " + tableName + " WHERE field3=4"; @@ -210,10 +229,11 @@ public class JdbcSinkTest { byte[] deleteBytes = schema.encode(deleteObj); Message deleteMessage = mock(MessageImpl.class); + CompletableFuture future = new CompletableFuture<>(); Record deleteRecord = PulsarRecord.builder() .message(deleteMessage) .topicName("fake_topic_name") - .ackFunction(() -> {}) + .ackFunction(() -> future.complete(null)) .build(); GenericSchema deleteGenericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo()); @@ -228,8 +248,7 @@ public class JdbcSinkTest { deleteRecord.getValue().toString()); jdbcSink.write(deleteRecord); - - Thread.sleep(1000); + future.get(1, TimeUnit.SECONDS); // value has been written to db, read it out and verify. String deleteQuerySql = "SELECT * FROM " + tableName + " WHERE field3=5"; -- GitLab