提交 6f7e3112 编写于 作者: V Vlad Ilyushchenko

RELEASE 3.2.0

Removed tests for legacy code (it will be deleted soon too, after all parts have been migrated)
上级 bd533f40
......@@ -52,7 +52,7 @@
<dependency>
<groupId>org.questdb</groupId>
<artifactId>questdb-core</artifactId>
<version>3.1.4-SNAPSHOT</version>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
......
......@@ -31,7 +31,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<version>3.1.4-SNAPSHOT</version>
<version>3.2.0</version>
<groupId>org.questdb</groupId>
<artifactId>questdb-core</artifactId>
<packaging>jar</packaging>
......@@ -196,7 +196,7 @@
</execution>
</executions>
<configuration>
<executable>gpg2</executable>
<executable>gpg</executable>
</configuration>
</plugin>
<plugin>
......@@ -226,33 +226,12 @@
</dependency>
<dependency>
<groupId>com.intellij</groupId>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>12.0</version>
<version>17.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
......
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb;
import com.questdb.std.microtime.MicrosecondClockImpl;
import com.questdb.store.Journal;
import com.questdb.store.factory.FactoryEventListener;
import com.questdb.test.tools.AbstractTest;
import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.LockSupport;
public class FactoryEventLoggerTest extends AbstractTest {
@Test
@Ignore
public void testThroughput() throws Exception {
final FactoryEventLogger logger = new FactoryEventLogger(getFactory(), 1000, 1000, MicrosecondClockImpl.INSTANCE);
final int count = 1000;
final CountDownLatch done = new CountDownLatch(1);
final CyclicBarrier barrier = new CyclicBarrier(2);
final FactoryEventListener listener = getFactory().getEventListener();
new Thread(() -> {
try (Journal r = getFactory().reader("$mon_factory")) {
barrier.await();
int i = 0;
while (i < count) {
if (logger.run()) {
i++;
} else {
r.refresh();
if (r.size() == count) {
break;
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
done.countDown();
}
}).start();
barrier.await();
int i = 0;
while (i < count) {
if (listener.onEvent((byte) 1, 1, "test", (short) 1, (short) 0, (short) 5)) {
i++;
} else {
LockSupport.parkNanos(1);
}
}
done.await();
logger.close();
try (Journal r = getFactory().reader("$mon_factory")) {
System.out.println(r.size());
}
}
}
\ No newline at end of file
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.model.Quote;
import com.questdb.net.ha.comsumer.JournalClientStateConsumer;
import com.questdb.net.ha.comsumer.JournalDeltaConsumer;
import com.questdb.net.ha.model.IndexedJournal;
import com.questdb.net.ha.producer.JournalClientStateProducer;
import com.questdb.net.ha.producer.JournalDeltaProducer;
import com.questdb.std.ex.JournalException;
import com.questdb.std.ex.JournalNetworkException;
import com.questdb.store.Journal;
import com.questdb.store.JournalWriter;
import com.questdb.test.tools.AbstractTest;
import com.questdb.test.tools.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
public abstract class AbstractJournalTest extends AbstractTest {
JournalWriter<Quote> origin;
JournalWriter<Quote> master;
JournalWriter<Quote> slave;
private MockByteChannel channel;
private JournalDeltaProducer journalDeltaProducer;
private JournalDeltaConsumer journalDeltaConsumer;
private JournalClientStateProducer journalClientStateProducer;
private JournalClientStateConsumer journalClientStateConsumer;
private Journal<Quote> masterReader;
@Before
public void setUp() throws Exception {
origin = getFactory().writer(Quote.class, "origin");
slave = getFactory().writer(Quote.class, "slave");
master = getFactory().writer(Quote.class, "master");
journalClientStateProducer = new JournalClientStateProducer();
journalClientStateConsumer = new JournalClientStateConsumer();
this.masterReader = getFactory().reader(Quote.class, "master");
journalDeltaProducer = new JournalDeltaProducer(masterReader);
journalDeltaConsumer = new JournalDeltaConsumer(slave);
channel = new MockByteChannel();
}
@After
public void tearDown() throws Exception {
origin.close();
slave.close();
master.close();
masterReader.close();
journalDeltaProducer.free();
journalDeltaConsumer.free();
super.tearDown();
}
void executeSequence(boolean expectContent) throws JournalNetworkException, JournalException {
slave.refresh();
journalClientStateProducer.write(channel, new IndexedJournal(0, slave));
journalClientStateConsumer.read(channel);
journalDeltaProducer.configure(journalClientStateConsumer.getValue().getTxn(), journalClientStateConsumer.getValue().getTxPin());
Assert.assertEquals(expectContent, journalDeltaProducer.hasContent());
if (expectContent) {
journalDeltaProducer.write(channel);
journalDeltaConsumer.read(channel);
TestUtils.assertEquals(master, slave);
}
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.model.Quote;
import com.questdb.net.ha.config.ClientConfig;
import com.questdb.net.ha.config.ServerConfig;
import com.questdb.net.ha.config.ServerNode;
import com.questdb.net.ha.krb.SSOCredentialProvider;
import com.questdb.std.NumericException;
import com.questdb.std.ex.FatalError;
import com.questdb.std.ex.JournalException;
import com.questdb.std.ex.JournalNetworkException;
import com.questdb.store.Journal;
import com.questdb.store.JournalListener;
import com.questdb.store.JournalWriter;
import com.questdb.test.tools.AbstractTest;
import com.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class AuthorizationTest extends AbstractTest {
private final ClientConfig local = new ClientConfig("localhost") {{
addNode(new ServerNode(1, "xyz"));
addNode(new ServerNode(2, "localhost"));
}};
@Test
public void testClientAndServerSuccessfulAuth() throws Exception {
JournalServer server = new JournalServer(
new ServerConfig() {{
setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(100));
setEnableMultiCast(false);
}}
, getFactory()
,
(token, requestedKeys) -> "SECRET".equals(new String(token)));
JournalClient client = new JournalClient(local, getFactory(), "SECRET"::getBytes);
beginSync(server, client);
}
@Test
public void testClientWithoutAuthProvider() throws Exception {
JournalServer server = new JournalServer(
new ServerConfig() {{
setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(500));
setEnableMultiCast(false);
}}
, getFactory()
,
(token, requestedKeys) -> "SECRET".equals(new String(token)));
server.start();
try {
final AtomicInteger authErrors = new AtomicInteger();
final CountDownLatch error = new CountDownLatch(1);
JournalClient client = new JournalClient(local, getFactory(), null, evt -> {
switch (evt) {
case JournalClientEvents.EVT_AUTH_CONFIG_ERROR:
authErrors.incrementAndGet();
break;
case JournalClientEvents.EVT_TERMINATED:
error.countDown();
break;
default:
break;
}
});
client.start();
Assert.assertTrue(error.await(5, TimeUnit.SECONDS));
Assert.assertFalse(client.isRunning());
} finally {
server.halt();
}
}
@Test
public void testClientWrongAuth() throws Exception {
JournalServer server = new JournalServer(
new ServerConfig() {{
setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(500));
setEnableMultiCast(false);
}}
, getFactory()
,
(token, requestedKeys) -> "SECRET".equals(new String(token)));
final AtomicInteger authErrorCount = new AtomicInteger();
final CountDownLatch serverError = new CountDownLatch(1);
JournalClient client = new JournalClient(
local,
getFactory(),
"NON_SECRET"::getBytes,
evt -> {
switch (evt) {
case JournalClientEvents.EVT_AUTH_ERROR:
authErrorCount.incrementAndGet();
break;
case JournalClientEvents.EVT_TERMINATED:
serverError.countDown();
break;
default:
break;
}
});
server.start();
try {
client.start();
Assert.assertTrue(serverError.await(5, TimeUnit.SECONDS));
Assert.assertFalse(client.isRunning());
Assert.assertEquals(1, authErrorCount.get());
} finally {
server.halt();
}
}
@Test
public void testExceptionInCredentialProvider() throws Exception {
JournalServer server = new JournalServer(
new ServerConfig() {{
setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(500));
setEnableMultiCast(false);
}}
, getFactory()
,
(token, requestedKeys) -> "SECRET".equals(new String(token)));
final AtomicInteger authErrorCount = new AtomicInteger();
final CountDownLatch terminated = new CountDownLatch(1);
JournalClient client = new JournalClient(local, getFactory(), new SSOCredentialProvider("HOST/test"),
evt -> {
switch (evt) {
case JournalClientEvents.EVT_AUTH_CONFIG_ERROR:
authErrorCount.incrementAndGet();
break;
case JournalClientEvents.EVT_TERMINATED:
terminated.countDown();
break;
default:
break;
}
});
server.start();
try {
client.start();
Assert.assertTrue(terminated.await(5, TimeUnit.SECONDS));
Assert.assertEquals(1, authErrorCount.get());
Assert.assertFalse(client.isRunning());
} finally {
server.halt();
}
}
@Test
public void testServerAuthException() throws Exception {
JournalServer server = new JournalServer(
new ServerConfig() {{
setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(500));
setEnableMultiCast(false);
}}
, getFactory()
,
(token, requestedKeys) -> {
throw new FatalError("BANG!");
});
final AtomicInteger authErrorCount = new AtomicInteger();
final CountDownLatch serverError = new CountDownLatch(1);
JournalClient client = new JournalClient(local, getFactory(), "SECRET"::getBytes, evt -> {
switch (evt) {
case JournalClientEvents.EVT_AUTH_ERROR:
authErrorCount.incrementAndGet();
break;
case JournalClientEvents.EVT_TERMINATED:
serverError.countDown();
break;
default:
break;
}
});
server.start();
try {
client.start();
Assert.assertTrue(serverError.await(5, TimeUnit.SECONDS));
Assert.assertFalse(client.isRunning());
Assert.assertEquals(1, authErrorCount.get());
} finally {
server.halt();
}
}
private void beginSync(JournalServer server, JournalClient client) throws JournalException, JournalNetworkException, InterruptedException, NumericException {
int size = 100000;
try (JournalWriter<Quote> remote = getFactory().writer(Quote.class, "remote", 2 * size)) {
server.publish(remote);
server.start();
try {
final CountDownLatch latch = new CountDownLatch(1);
client.subscribe(Quote.class, "remote", "local", 2 * size, new JournalListener() {
@Override
public void onCommit() {
latch.countDown();
}
@Override
public void onEvent(int event) {
}
});
client.start();
try {
TestUtils.generateQuoteData(remote, size);
latch.await();
try (Journal<Quote> local = getFactory().reader(Quote.class, "local")) {
TestUtils.assertDataEquals(remote, local);
}
} finally {
client.halt();
}
} finally {
server.halt(0, TimeUnit.SECONDS);
}
}
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.model.Quote;
import com.questdb.net.ha.config.ClientConfig;
import com.questdb.store.JournalWriter;
import com.questdb.test.tools.AbstractTest;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Ignore
public class ClientRecoveryTest extends AbstractTest {
@Test
public void testClientWriterRelease() throws Exception {
final CountDownLatch serverError = new CountDownLatch(1);
JournalClient client = new JournalClient(new ClientConfig("localhost"), getFactory(), null, evt -> {
if (evt == JournalClientEvents.EVT_TERMINATED) {
serverError.countDown();
}
});
client.subscribe(Quote.class);
client.start();
Assert.assertTrue(serverError.await(5, TimeUnit.SECONDS));
Assert.assertFalse(client.isRunning());
// should be able to get writer after client failure.
try (JournalWriter<Quote> w = getFactory().writer(Quote.class)) {
Assert.assertNotNull(w);
}
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.model.Quote;
import com.questdb.net.ha.config.ClientConfig;
import com.questdb.net.ha.config.ServerConfig;
import com.questdb.net.ha.config.ServerNode;
import com.questdb.store.JournalListener;
import com.questdb.store.JournalWriter;
import com.questdb.test.tools.AbstractTest;
import com.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class DataLossTest extends AbstractTest {
@Test
@Ignore
public void testDiscardFile() throws Exception {
// create master journal
try (JournalWriter<Quote> master = getFactory().writer(Quote.class, "master")) {
TestUtils.generateQuoteData(master, 300, master.getMaxTimestamp());
master.commit();
// publish master out
JournalServer server = new JournalServer(
new ServerConfig() {{
addNode(new ServerNode(0, "localhost"));
setEnableMultiCast(false);
setHeartbeatFrequency(50);
}}
, getFactory());
server.publish(master);
server.start();
final AtomicInteger counter = new AtomicInteger();
final AtomicInteger doNotExpect = new AtomicInteger();
// equalize slave
JournalClient client = new JournalClient(new ClientConfig("localhost") {{
setEnableMultiCast(false);
}}, getFactory());
client.subscribe(Quote.class, "master", "slave", new JournalListener() {
@Override
public void onCommit() {
counter.incrementAndGet();
}
@Override
public void onEvent(int event) {
}
});
client.start();
TestUtils.assertCounter(counter, 1, 10, TimeUnit.SECONDS);
// stop client to be able to add to slave manually
client.halt();
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
// add more data to slave
try (JournalWriter<Quote> slave = getFactory().writer(Quote.class, "slave")) {
TestUtils.generateQuoteData(slave, 200, slave.getMaxTimestamp());
slave.commit();
}
// synchronise slave again
client = new JournalClient(new ClientConfig("localhost"), getFactory());
client.subscribe(Quote.class, "master", "slave", new JournalListener() {
@Override
public void onCommit() {
doNotExpect.incrementAndGet();
}
@Override
public void onEvent(int event) {
counter.incrementAndGet();
}
});
client.start();
TestUtils.assertCounter(counter, 2, 180, TimeUnit.SECONDS);
client.halt();
Assert.assertEquals(0, doNotExpect.get());
// assert that slave journal is closed
try (JournalWriter w = getFactory().writer(Quote.class, "slave")) {
Assert.assertNotNull(w);
}
server.halt();
}
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.net.ha.comsumer.FixedColumnDeltaConsumer;
import com.questdb.net.ha.producer.FixedColumnDeltaProducer;
import com.questdb.std.ex.JournalException;
import com.questdb.store.FixedColumn;
import com.questdb.store.JournalMode;
import com.questdb.store.MemoryFile;
import org.junit.*;
import org.junit.rules.TemporaryFolder;
import java.io.File;
public class FixedColumnTest {
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private MemoryFile file;
private MemoryFile file2;
private MockByteChannel channel;
@After
public void cleanup() {
file.delete();
file2.delete();
}
@Before
public void setUp() throws JournalException {
file = new MemoryFile(new File(temporaryFolder.getRoot(), "col.d"), 22, JournalMode.APPEND, false);
// it is important to keep bit hint small, so that file2 has small buffers. This would made test go via both pathways.
// large number will result in tests not covering all of execution path.
file2 = new MemoryFile(new File(temporaryFolder.getRoot(), "col2.d"), 18, JournalMode.APPEND, false);
channel = new MockByteChannel();
}
@After
public void tearDown() {
file.close();
file2.close();
}
@Test
public void testConsumerEqualToProducer() {
FixedColumn col1 = new FixedColumn(file, 4);
FixedColumn col2 = new FixedColumn(file2, 4);
FixedColumnDeltaProducer producer = new FixedColumnDeltaProducer(col1);
int max = 1500000;
for (int i = 0; i < max; i++) {
col1.putInt(max - i);
col1.commit();
}
for (int i = 0; i < max; i++) {
col2.putInt(max - i);
col2.commit();
}
producer.configure(col2.size(), col1.size());
// hasNext() can be true, because of compulsory header
// however, if column doesn't have data, hasContent() must be false.
Assert.assertFalse(producer.hasContent());
Assert.assertEquals(col1.size(), col2.size());
for (int i = 0; i < max; i++) {
Assert.assertEquals(max - i, col2.getInt(i));
}
}
@Test
public void testConsumerLargerThanProducer() {
FixedColumn col1 = new FixedColumn(file, 4);
FixedColumn col2 = new FixedColumn(file2, 4);
FixedColumnDeltaProducer producer = new FixedColumnDeltaProducer(col1);
int max = 1500000;
for (int i = 0; i < max - 500000; i++) {
col1.putInt(max - i);
col1.commit();
}
for (int i = 0; i < max; i++) {
col2.putInt(max - i);
col2.commit();
}
producer.configure(col2.size(), col1.size());
Assert.assertFalse(producer.hasContent());
}
@Test
public void testConsumerReset() throws Exception {
FixedColumn col1 = new FixedColumn(file, 4);
FixedColumn col2 = new FixedColumn(file2, 4);
FixedColumnDeltaProducer producer = new FixedColumnDeltaProducer(col1);
ChannelConsumer consumer = new FixedColumnDeltaConsumer(col2);
int max = 1500000;
for (int i = 0; i < max; i++) {
col1.putInt(max - i);
col1.commit();
}
for (int i = 0; i < max - 500000; i++) {
col2.putInt(max - i);
col2.commit();
}
producer.configure(col2.size(), col1.size());
Assert.assertTrue(producer.hasContent());
producer.write(channel);
consumer.read(channel);
col2.commit();
Assert.assertEquals(col1.size(), col2.size());
for (int i = 0; i < 10000; i++) {
col1.putInt(max + 10000 - i);
col1.commit();
}
producer.configure(col2.size(), col1.size());
Assert.assertTrue(producer.hasContent());
producer.write(channel);
consumer.read(channel);
col2.commit();
Assert.assertEquals(col1.size(), col2.size());
for (int i = 0; i < max; i++) {
Assert.assertEquals(max - i, col2.getInt(i));
}
for (int i = max; i < max + 10000; i++) {
Assert.assertEquals(max + max + 10000 - i, col2.getInt(i));
}
}
@Test
public void testConsumerSmallerThanProducer() throws Exception {
FixedColumn col1 = new FixedColumn(file, 4);
FixedColumn col2 = new FixedColumn(file2, 4);
FixedColumnDeltaProducer producer = new FixedColumnDeltaProducer(col1);
ChannelConsumer consumer = new FixedColumnDeltaConsumer(col2);
int max = 1500000;
for (int i = 0; i < max; i++) {
col1.putInt(max - i);
col1.commit();
}
for (int i = 0; i < max - 500000; i++) {
col2.putInt(max - i);
col2.commit();
}
producer.configure(col2.size(), col1.size());
Assert.assertTrue(producer.hasContent());
producer.write(channel);
consumer.read(channel);
col2.commit();
Assert.assertEquals(col1.size(), col2.size());
for (int i = 0; i < max; i++) {
Assert.assertEquals(max - i, col2.getInt(i));
}
}
@Test
public void testEmptyConsumerAndPopulatedProducer() throws Exception {
FixedColumn col1 = new FixedColumn(file, 4);
FixedColumn col2 = new FixedColumn(file2, 4);
FixedColumnDeltaProducer producer = new FixedColumnDeltaProducer(col1);
ChannelConsumer consumer = new FixedColumnDeltaConsumer(col2);
int max = 1500000;
for (int i = 0; i < max; i++) {
col1.putInt(max - i);
col1.commit();
}
producer.configure(col2.size(), col1.size());
// hasNext() can be true, because of compulsory header
// however, if column doesn't have data, hasContent() must be false.
Assert.assertTrue(producer.hasContent());
producer.write(channel);
consumer.read(channel);
col2.commit();
Assert.assertEquals(col1.size(), col2.size());
for (int i = 0; i < max; i++) {
Assert.assertEquals(max - i, col2.getInt(i));
}
}
@Test
public void testEmptyConsumerAndProducer() {
FixedColumn col1 = new FixedColumn(file, 4);
FixedColumn col2 = new FixedColumn(file2, 4);
FixedColumnDeltaProducer producer = new FixedColumnDeltaProducer(col1);
producer.configure(col2.size(), col1.size());
// hasNext() can be true, because of compulsory header
// however, if column doesn't have data, hasContent() must be false.
Assert.assertFalse(producer.hasContent());
Assert.assertEquals(col1.size(), col2.size());
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.net.ha.protocol.commands.IntResponseConsumer;
import com.questdb.net.ha.protocol.commands.IntResponseProducer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class IntTest {
private MockByteChannel channel;
@Before
public void setUp() {
channel = new MockByteChannel();
}
@Test
public void testInt() throws Exception {
IntResponseProducer producer = new IntResponseProducer();
IntResponseConsumer consumer = new IntResponseConsumer();
producer.write(channel, 155);
Assert.assertEquals(155, consumer.getValue(channel));
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2019 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.net.ha.bridge.JournalEventBridge;
import com.questdb.net.ha.bridge.JournalEventHandler;
import com.questdb.net.ha.bridge.JournalEventProcessor;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.*;
@Ignore
public class JournalEventBridgeTest {
@Test
public void testStartStop() {
JournalEventBridge bridge = new JournalEventBridge(2, TimeUnit.SECONDS);
for (int i = 0; i < 10000; i++) {
bridge.publish(10, System.currentTimeMillis());
}
}
@Test
public void testTwoPublishersThreeConsumers() throws Exception {
ExecutorService service = Executors.newCachedThreadPool();
final JournalEventBridge bridge = new JournalEventBridge(50, TimeUnit.MILLISECONDS);
final Future[] publishers = new Future[2];
final Handler[] consumers = new Handler[3];
final int batchSize = 1000;
final CyclicBarrier barrier = new CyclicBarrier(publishers.length + consumers.length);
final CountDownLatch latch = new CountDownLatch(publishers.length + consumers.length);
for (int i = 0; i < publishers.length; i++) {
final int index = i;
publishers[i] = service.submit(() -> {
int count = 0;
try {
barrier.await();
for (int k = 0; k < batchSize; k++) {
long ts = System.nanoTime();
bridge.publish(index, ts);
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
return count;
});
}
for (int i = 0; i < consumers.length; i++) {
final JournalEventProcessor processor = new JournalEventProcessor(bridge);
final Handler handler = new Handler(i);
consumers[i] = handler;
service.submit(() -> {
try {
barrier.await();
while (true) {
if (!processor.process(handler, true)) {
break;
}
}
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
// service.submit(new Runnable() {
// @Override
// public void run() {
// try {
// barrier.await();
// for (int i = 0; i < 1000; i++) {
// Sequence sequence = bridge.createAgentSequence();
// LockSupport.parkNanos(TimeUnit.MICROSECONDS.toNanos(10));
// bridge.removeAgentSequence(sequence);
// }
// } catch (InterruptedException | BrokenBarrierException e) {
// e.printStackTrace();
// } finally {
// latch.countDown();
// }
// }
// });
latch.await();
for (Future f : publishers) {
Assert.assertEquals(batchSize, f.get());
}
Assert.assertEquals(batchSize, consumers[0].getCounter());
Assert.assertEquals(batchSize, consumers[1].getCounter());
Assert.assertEquals(0, consumers[2].getCounter());
}
private class Handler implements JournalEventHandler {
private final int index;
private int counter;
private Handler(int index) {
this.index = index;
}
public int getCounter() {
return counter;
}
@Override
public void handle(int journalIndex) {
if (journalIndex == index) {
counter++;
}
}
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.std.time.DateFormatUtils;
import com.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class JournalLagTest extends AbstractJournalTest {
@Before
@Override
public void setUp() throws Exception {
super.setUp();
TestUtils.generateQuoteData(origin, 500, DateFormatUtils.parseDateTime("2013-02-01T00:00:00.000Z"), 100);
TestUtils.generateQuoteData(origin, 500, DateFormatUtils.parseDateTime("2013-02-01T01:00:00.000Z"), 100);
TestUtils.generateQuoteData(origin, 500, DateFormatUtils.parseDateTime("2013-02-01T13:00:00.000Z"), 100);
TestUtils.generateQuoteData(origin, 500, DateFormatUtils.parseDateTime("2013-05-01T00:00:00.000Z"), 100);
}
@Test
public void testLagDetach() throws Exception {
master.append(origin.query().all().asResultSet().subset(0, 500));
master.mergeAppend(origin.query().all().asResultSet().subset(500, 600));
master.commit();
executeSequence(true);
master.removeIrregularPartition();
master.commit();
executeSequence(true);
}
@Test
public void testLagOnlyPropagation() throws Exception {
master.append(origin.query().all().asResultSet().subset(0, 500));
master.mergeAppend(origin.query().all().asResultSet().subset(500, 600));
master.commit();
String lagName = master.getIrregularPartition().getName();
executeSequence(true);
master.mergeAppend(origin.query().all().asResultSet().subset(600, 700));
master.commit();
Assert.assertEquals(lagName, master.getIrregularPartition().getName());
executeSequence(true);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.model.Quote;
import com.questdb.model.Trade;
import com.questdb.net.ha.comsumer.HugeBufferConsumer;
import com.questdb.net.ha.comsumer.JournalDeltaConsumer;
import com.questdb.net.ha.config.NetworkConfig;
import com.questdb.net.ha.config.ServerConfig;
import com.questdb.net.ha.model.Command;
import com.questdb.net.ha.model.IndexedJournal;
import com.questdb.net.ha.model.IndexedJournalKey;
import com.questdb.net.ha.producer.JournalClientStateProducer;
import com.questdb.net.ha.protocol.CommandConsumer;
import com.questdb.net.ha.protocol.CommandProducer;
import com.questdb.net.ha.protocol.commands.CharSequenceResponseConsumer;
import com.questdb.net.ha.protocol.commands.IntResponseConsumer;
import com.questdb.net.ha.protocol.commands.SetKeyRequestProducer;
import com.questdb.std.Chars;
import com.questdb.store.Journal;
import com.questdb.store.JournalWriter;
import com.questdb.test.tools.AbstractTest;
import com.questdb.test.tools.TestUtils;
import org.junit.*;
import org.junit.rules.TemporaryFolder;
import java.net.InetSocketAddress;
public class JournalServerAgentTest extends AbstractTest {
@Rule
public final TemporaryFolder temp = new TemporaryFolder();
private final CommandProducer commandProducer = new CommandProducer();
private final CommandConsumer commandConsumer = new CommandConsumer();
private final SetKeyRequestProducer setKeyRequestProducer = new SetKeyRequestProducer();
private final CharSequenceResponseConsumer charSequenceResponseConsumer = new CharSequenceResponseConsumer();
private final JournalClientStateProducer journalClientStateProducer = new JournalClientStateProducer();
private final IntResponseConsumer intResponseConsumer = new IntResponseConsumer();
private MockByteChannel channel;
private JournalWriter<Quote> quoteWriter;
private JournalWriter<Trade> tradeWriter;
private JournalServer server;
private JournalServerAgent agent;
private HugeBufferConsumer hugeBufferConsumer;
@Before
public void setUp() throws Exception {
channel = new MockByteChannel();
quoteWriter = getFactory().writer(Quote.class);
tradeWriter = getFactory().writer(Trade.class);
ServerConfig config = new ServerConfig() {{
setHeartbeatFrequency(100);
setEnableMultiCast(false);
}};
server = new JournalServer(config, getFactory());
server.publish(quoteWriter);
agent = new JournalServerAgent(server, new InetSocketAddress(NetworkConfig.DEFAULT_DATA_PORT), null);
hugeBufferConsumer = new HugeBufferConsumer(temp.newFile());
}
@After
public void tearDown() {
quoteWriter.close();
tradeWriter.close();
server.halt();
agent.close();
hugeBufferConsumer.free();
}
@Test
public void testIncrementalInteraction() throws Exception {
try (JournalWriter<Quote> origin = getFactory().writer(Quote.class, "origin")) {
TestUtils.generateQuoteData(origin, 200);
server.start();
try (JournalWriter<Quote> quoteClientWriter = getFactory().writer(Quote.class, "client")) {
JournalDeltaConsumer quoteDeltaConsumer = new JournalDeltaConsumer(quoteClientWriter);
// send quote journal key
commandProducer.write(channel, Command.ADD_KEY_CMD);
setKeyRequestProducer.write(channel, new IndexedJournalKey(0, quoteWriter.getMetadata().getKey()));
agent.process(channel);
charSequenceResponseConsumer.read(channel);
TestUtils.assertEquals("OK", charSequenceResponseConsumer.getValue());
hugeBufferConsumer.read(channel);
// send quote state
commandProducer.write(channel, Command.DELTA_REQUEST_CMD);
journalClientStateProducer.write(channel, new IndexedJournal(0, quoteClientWriter));
agent.process(channel);
charSequenceResponseConsumer.read(channel);
TestUtils.assertEquals("OK", charSequenceResponseConsumer.getValue());
quoteWriter.append(origin.query().all().asResultSet().subset(0, 100));
quoteWriter.commit();
commandProducer.write(channel, Command.CLIENT_READY_CMD);
agent.process(channel);
commandConsumer.read(channel);
Assert.assertEquals(Command.JOURNAL_DELTA_CMD, commandConsumer.getCommand());
Assert.assertEquals(0, intResponseConsumer.getValue(channel));
quoteDeltaConsumer.read(channel);
Assert.assertEquals(100, quoteClientWriter.size());
commandConsumer.read(channel);
Assert.assertEquals(Command.SERVER_READY_CMD, commandConsumer.getCommand());
quoteWriter.append(origin.query().all().asResultSet().subset(100, 200));
quoteWriter.commit();
// send quote state
commandProducer.write(channel, Command.DELTA_REQUEST_CMD);
journalClientStateProducer.write(channel, new IndexedJournal(0, quoteClientWriter));
agent.process(channel);
charSequenceResponseConsumer.read(channel);
TestUtils.assertEquals("OK", charSequenceResponseConsumer.getValue());
commandProducer.write(channel, Command.CLIENT_READY_CMD);
agent.process(channel);
commandConsumer.read(channel);
Assert.assertEquals(Command.JOURNAL_DELTA_CMD, commandConsumer.getCommand());
Assert.assertEquals(0, intResponseConsumer.getValue(channel));
quoteDeltaConsumer.read(channel);
Assert.assertEquals(200, quoteClientWriter.size());
commandConsumer.read(channel);
Assert.assertEquals(Command.SERVER_READY_CMD, commandConsumer.getCommand());
}
}
}
@Test
public void testJournalIndexCorrectness() throws Exception {
server.publish(tradeWriter);
server.start();
try (Journal<Quote> quoteClientWriter = getFactory().writer(Quote.class, "client")) {
// send quote journal key
// commandProducer.write(channel, Command.ADD_KEY_CMD);
// setKeyRequestProducer.write(channel, new IndexedJournalKey(3, quoteWriter.getKey()));
// agent.process(channel);
// charSequenceResponseConsumer.reset();
// charSequenceResponseConsumer.read(channel);
// Assert.assertTrue(charSequenceResponseConsumer.isComplete());
// Assert.assertEquals("Journal index is too large. Max 1", charSequenceResponseConsumer.getValue());
commandProducer.write(channel, Command.ADD_KEY_CMD);
setKeyRequestProducer.write(channel, new IndexedJournalKey(0, quoteWriter.getMetadata().getKey()));
agent.process(channel);
charSequenceResponseConsumer.read(channel);
TestUtils.assertEquals("OK", charSequenceResponseConsumer.getValue());
hugeBufferConsumer.read(channel);
commandProducer.write(channel, Command.DELTA_REQUEST_CMD);
journalClientStateProducer.write(channel, new IndexedJournal(1, quoteClientWriter));
agent.process(channel);
charSequenceResponseConsumer.read(channel);
TestUtils.assertEquals("Journal index does not match key request", charSequenceResponseConsumer.getValue());
commandProducer.write(channel, Command.DELTA_REQUEST_CMD);
journalClientStateProducer.write(channel, new IndexedJournal(0, quoteClientWriter));
agent.process(channel);
charSequenceResponseConsumer.read(channel);
TestUtils.assertEquals("OK", charSequenceResponseConsumer.getValue());
}
}
@Test
public void testSetKeyRequestResponse() throws Exception {
commandProducer.write(channel, Command.ADD_KEY_CMD);
setKeyRequestProducer.write(channel, new IndexedJournalKey(0, quoteWriter.getMetadata().getKey()));
agent.process(channel);
charSequenceResponseConsumer.read(channel);
TestUtils.assertEquals("OK", charSequenceResponseConsumer.getValue());
hugeBufferConsumer.read(channel);
commandProducer.write(channel, Command.ADD_KEY_CMD);
setKeyRequestProducer.write(channel, new IndexedJournalKey(0, tradeWriter.getMetadata().getKey()));
agent.process(channel);
charSequenceResponseConsumer.read(channel);
Assert.assertTrue(Chars.startsWith(charSequenceResponseConsumer.getValue(), "Requested key not exported"));
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.model.Quote;
import com.questdb.net.ha.comsumer.JournalClientStateConsumer;
import com.questdb.net.ha.comsumer.JournalSymbolTableConsumer;
import com.questdb.net.ha.model.IndexedJournal;
import com.questdb.net.ha.producer.JournalClientStateProducer;
import com.questdb.net.ha.producer.JournalSymbolTableProducer;
import com.questdb.std.ex.JournalNetworkException;
import com.questdb.store.JournalWriter;
import com.questdb.test.tools.AbstractTest;
import com.questdb.test.tools.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class JournalSymbolTableTest extends AbstractTest {
private final JournalClientStateProducer journalClientStateProducer = new JournalClientStateProducer();
private final JournalClientStateConsumer journalClientStateConsumer = new JournalClientStateConsumer();
private JournalWriter<Quote> origin;
private JournalWriter<Quote> master;
private JournalWriter<Quote> slave;
private MockByteChannel channel;
private JournalSymbolTableProducer journalSymbolTableProducer;
private JournalSymbolTableConsumer journalSymbolTableConsumer;
@Before
public void setUp() throws Exception {
origin = getFactory().writer(Quote.class, "origin");
master = getFactory().writer(Quote.class, "master");
slave = getFactory().writer(Quote.class, "slave");
channel = new MockByteChannel();
journalSymbolTableProducer = new JournalSymbolTableProducer(master);
journalSymbolTableConsumer = new JournalSymbolTableConsumer(slave);
origin.append(new Quote().setSym("AB").setEx("EX1").setMode("M1"));
origin.append(new Quote().setSym("CD").setEx("EX2").setMode("M2"));
origin.append(new Quote().setSym("EF").setEx("EX3").setMode("M2"));
origin.append(new Quote().setSym("GH").setEx("EX3").setMode("M3"));
}
@After
public void tearDown() {
origin.close();
master.close();
slave.close();
}
@Test
public void testConsumerEqualToProducer() throws Exception {
master.append(origin);
master.commit(false, 101L, 10);
slave.append(origin);
slave.commit(false, 101L, 10);
executeSequence(false);
}
@Test
public void testConsumerLargerThanProducer() throws Exception {
slave.append(origin);
slave.commit(false, 101L, 10);
master.append(origin.query().all().asResultSet().subset(0, 3));
master.commit(false, 101L, 10);
executeSequence(false);
}
@Test
public void testConsumerSmallerThanProducer() throws Exception {
master.append(origin.query().all().asResultSet().subset(0, 2));
master.commit(false, 101L, 10);
master.append(origin.query().all().asResultSet().subset(2, 4));
master.commit(false, 102L, 20);
slave.append(origin.query().all().asResultSet().subset(0, 2));
slave.commit(false, 101L, 10);
executeSequence(true);
}
@Test
public void testEmptyConsumerAndPopulatedProducer() throws Exception {
master.append(origin);
master.commit();
executeSequence(true);
}
@Test
public void testEmptyConsumerAndProducer() throws Exception {
executeSequence(false);
}
private void executeSequence(boolean expectContent) throws JournalNetworkException {
journalClientStateProducer.write(channel, new IndexedJournal(0, slave));
journalClientStateConsumer.read(channel);
journalSymbolTableProducer.configure(master.find(journalClientStateConsumer.getValue().getTxn(), journalClientStateConsumer.getValue().getTxPin()));
Assert.assertEquals(expectContent, journalSymbolTableProducer.hasContent());
if (expectContent) {
journalSymbolTableProducer.write(channel);
journalSymbolTableConsumer.read(channel);
TestUtils.compareSymbolTables(master, slave);
}
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.model.Quote;
import com.questdb.std.time.DateFormatUtils;
import com.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class JournalTest extends AbstractJournalTest {
@Before
@Override
public void setUp() throws Exception {
super.setUp();
TestUtils.generateQuoteData(origin, 1000);
}
@Test
public void testConsumerEqualToProducer() throws Exception {
master.append(origin);
master.commit(false, 101L, 10);
slave.append(origin);
slave.commit(false, 101L, 10);
executeSequence(false);
}
@Test
public void testConsumerLargerThanProducer() throws Exception {
master.append(origin.query().all().asResultSet().subset(0, 550));
master.commit(false, 101L, 10);
slave.append(origin);
slave.commit(false, 101L, 10);
executeSequence(false);
}
@Test
public void testConsumerPartitionEdge() throws Exception {
origin.truncate();
TestUtils.generateQuoteData(origin, 500, DateFormatUtils.parseDateTime("2013-10-01T00:00:00.000Z"));
TestUtils.generateQuoteData(origin, 500, DateFormatUtils.parseDateTime("2013-11-01T00:00:00.000Z"));
TestUtils.generateQuoteData(origin, 500, DateFormatUtils.parseDateTime("2013-12-01T00:00:00.000Z"));
master.append(origin.query().all().asResultSet().subset(0, 500));
master.commit(false, 101L, 10);
master.append(origin.query().all().asResultSet().subset(500, 1500));
master.commit(false, 102L, 20);
slave.append(origin.query().all().asResultSet().subset(0, 500));
slave.commit(false, 101L, 10);
Assert.assertEquals(1, slave.getPartitionCount());
executeSequence(true);
}
@Test
public void testConsumerReset() throws Exception {
master.append(origin.query().all().asResultSet().subset(0, 200));
master.commit(false, 101L, 10);
master.append(origin.query().all().asResultSet().subset(200, 550));
master.commit(false, 102L, 20);
slave.append(origin.query().all().asResultSet().subset(0, 200));
slave.commit(false, 101L, 10);
executeSequence(true);
master.append(origin.query().all().asResultSet().subset(550, 1000));
master.commit(false, 103L, 30);
executeSequence(true);
}
@Test
public void testConsumerSmallerThanProducer() throws Exception {
master.append(origin.query().all().asResultSet().subset(0, 655));
master.commit(false, 101L, 10);
master.append(origin.query().all().asResultSet().subset(655, (int) origin.size()));
master.commit(false, 102L, 20);
slave.append(origin.query().all().asResultSet().subset(0, 655));
slave.commit(false, 101L, 10);
Assert.assertEquals(655, slave.size());
executeSequence(true);
}
@Test
public void testEmptyConsumerAndPopulatedProducer() throws Exception {
master.append(origin);
master.commit();
executeSequence(true);
Assert.assertEquals(1000, slave.size());
}
@Test
public void testEmptyConsumerAndProducer() throws Exception {
executeSequence(false);
}
@Test
public void testEmptyPartitionAdd() throws Exception {
master.append(origin);
master.getAppendPartition(DateFormatUtils.parseDateTime("2013-12-01T00:00:00.000Z"));
master.append(new Quote().setTimestamp(DateFormatUtils.parseDateTime("2014-01-01T00:00:00.000Z")));
master.commit();
executeSequence(true);
Assert.assertEquals(master.getPartitionCount(), slave.getPartitionCount());
}
@Test
public void testLagConsumerSmallerThanProducer() throws Exception {
master.append(origin.query().all().asResultSet().subset(0, 350));
master.mergeAppend(origin.query().all().asResultSet().subset(350, 600));
master.commit();
executeSequence(true);
}
@Test
public void testLagReplace() throws Exception {
master.append(origin.query().all().asResultSet().subset(0, 350));
master.mergeAppend(origin.query().all().asResultSet().subset(350, 600));
master.commit(false, 101L, 10);
slave.append(origin.query().all().asResultSet().subset(0, 350));
slave.mergeAppend(origin.query().all().asResultSet().subset(350, 600));
slave.commit(false, 101L, 10);
master.mergeAppend(origin.query().all().asResultSet().subset(600, 1000));
master.commit(false, 102L, 20);
executeSequence(true);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.model.RDFNode;
import com.questdb.net.ha.comsumer.JournalClientStateConsumer;
import com.questdb.net.ha.comsumer.JournalSymbolTableConsumer;
import com.questdb.net.ha.model.IndexedJournal;
import com.questdb.net.ha.producer.JournalClientStateProducer;
import com.questdb.net.ha.producer.JournalSymbolTableProducer;
import com.questdb.std.ex.JournalNetworkException;
import com.questdb.store.JournalWriter;
import com.questdb.test.tools.AbstractTest;
import com.questdb.test.tools.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class LinkedJournalSymbolTableTest extends AbstractTest {
private final JournalClientStateProducer journalClientStateProducer = new JournalClientStateProducer();
private final JournalClientStateConsumer journalClientStateConsumer = new JournalClientStateConsumer();
private JournalWriter<RDFNode> origin;
private JournalWriter<RDFNode> master;
private JournalWriter<RDFNode> slave;
private MockByteChannel channel;
private JournalSymbolTableProducer journalSymbolTableProducer;
private JournalSymbolTableConsumer journalSymbolTableConsumer;
@Before
public void setUp() throws Exception {
origin = getFactory().writer(RDFNode.class, "origin");
master = getFactory().writer(RDFNode.class, "master");
slave = getFactory().writer(RDFNode.class, "slave");
channel = new MockByteChannel();
journalSymbolTableProducer = new JournalSymbolTableProducer(master);
journalSymbolTableConsumer = new JournalSymbolTableConsumer(slave);
origin.append(new RDFNode().setObj("O1").setSubj("S1"));
origin.append(new RDFNode().setObj("O2").setSubj("S1"));
origin.append(new RDFNode().setObj("O3").setSubj("S2"));
origin.append(new RDFNode().setObj("S2").setSubj("S1"));
}
@After
public void tearDown() {
origin.close();
master.close();
slave.close();
}
@Test
public void testSameAsSymbolTable() throws Exception {
master.append(origin.query().all().asResultSet().subset(0, 2));
master.commit(false, 101L, 10);
master.append(origin.query().all().asResultSet().subset(2, 4));
master.commit(false, 102L, 20);
slave.append(origin.query().all().asResultSet().subset(0, 2));
slave.commit(false, 101L, 10);
executeSequence(true);
}
private void executeSequence(boolean expectContent) throws JournalNetworkException {
journalClientStateProducer.write(channel, new IndexedJournal(0, slave));
journalClientStateConsumer.read(channel);
// journalSymbolTableProducer.configure(journalClientStateConsumer.getValue());
journalSymbolTableProducer.configure(master.find(journalClientStateConsumer.getValue().getTxn(), journalClientStateConsumer.getValue().getTxPin()));
Assert.assertEquals(expectContent, journalSymbolTableProducer.hasContent());
if (expectContent) {
journalSymbolTableProducer.write(channel);
journalSymbolTableConsumer.read(channel);
TestUtils.compareSymbolTables(master, slave);
}
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.model.Quote;
import com.questdb.net.ha.comsumer.HugeBufferConsumer;
import com.questdb.net.ha.producer.HugeBufferProducer;
import com.questdb.store.JournalWriter;
import com.questdb.store.factory.configuration.JournalConfiguration;
import com.questdb.store.factory.configuration.JournalMetadata;
import com.questdb.test.tools.AbstractTest;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
public class MetadataReplicationTest extends AbstractTest {
@Test
public void testReplication() {
try (JournalWriter w = getFactory().writer(Quote.class)) {
MockByteChannel channel = new MockByteChannel();
HugeBufferProducer p = new HugeBufferProducer(new File(w.getLocation(), JournalConfiguration.FILE_NAME));
HugeBufferConsumer c = new HugeBufferConsumer(new File(w.getLocation(), "_remote"));
p.write(channel);
c.read(channel);
try (JournalWriter w2 = getFactory().writer(
new JournalMetadata<>(c.getHb(), "xyz")
)) {
Assert.assertTrue(w.getMetadata().isCompatible(w2.getMetadata(), false));
}
p.free();
c.free();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
class MockByteChannel extends ByteArrayOutputStream implements ByteChannel {
private int offset = 0;
private int cutoffIndex = -1;
private boolean interrupted = false;
@Override
public boolean isOpen() {
return true;
}
@Override
public int read(ByteBuffer dst) {
if (offset == buf.length) {
return -1;
}
if (interrupted) {
interrupted = false;
cutoffIndex = -1;
return 0;
}
// calculate cutoff point on first read
// this is to simulate non-blocking socket mode, where there is
// suddenly nothing to read from socket
int oldOffset = offset;
while (dst.remaining() > 0 && offset < buf.length) {
// if we reached cutoff point - stop filling in buffer
// subsequent call to read() method would return 0
if (offset == cutoffIndex) {
interrupted = true;
break;
}
dst.put(buf[offset++]);
}
return offset - oldOffset;
}
@Override
public String toString() {
return "MockByteChannel{" +
"offset=" + offset +
'}';
}
@Override
public void close() {
}
@Override
public int write(ByteBuffer src) {
int result = src.remaining();
while (src.remaining() > 0) {
write(src.get());
}
return result;
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.net.ha.config.ClientConfig;
import com.questdb.net.ha.config.ServerConfig;
import com.questdb.net.ha.config.ServerNode;
import com.questdb.net.ha.mcast.AbstractOnDemandSender;
import com.questdb.net.ha.mcast.OnDemandAddressPoller;
import com.questdb.net.ha.mcast.OnDemandAddressSender;
import com.questdb.std.ex.JournalNetworkException;
import com.questdb.test.tools.AbstractTest;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.net.Inet6Address;
import java.net.InterfaceAddress;
import java.net.SocketException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Ignore
public class MulticastTest extends AbstractTest {
private boolean multicastDisabled;
public MulticastTest() throws JournalNetworkException, SocketException {
multicastDisabled = isMulticastDisabled();
}
@Test
public void testAllNics() throws Exception {
if (multicastDisabled) {
return;
}
assertMulticast();
}
@Test
public void testDefaultNICBehaviour() throws Exception {
if (multicastDisabled) {
return;
}
assertMulticast();
}
@Test
public void testIPV4Forced() throws Exception {
if (multicastDisabled) {
return;
}
System.setProperty("java.net.preferIPv4Stack", "true");
assertMulticast();
}
@Test
public void testIPv6() throws Exception {
if (multicastDisabled || !hasIPv6()) {
return;
}
JournalServer server = new JournalServer(new ServerConfig() {{
addNode(new ServerNode(0, "[0:0:0:0:0:0:0:0]"));
setHeartbeatFrequency(100);
}}, getFactory(), null, 0);
final CountDownLatch connected = new CountDownLatch(1);
JournalClient client = new JournalClient(new ClientConfig(), getFactory(), null, evt -> {
if (evt == JournalClientEvents.EVT_CONNECTED) {
connected.countDown();
}
});
server.start();
client.start();
connected.await(3, TimeUnit.SECONDS);
client.halt();
server.halt();
}
@Test
public void testLocalhostBehaviour() throws Exception {
if (multicastDisabled) {
return;
}
assertMulticast();
}
private static boolean isMulticastDisabled() throws JournalNetworkException, SocketException {
return !new ServerConfig().getMultiCastInterface(0).supportsMulticast();
}
private static boolean hasIPv6() throws JournalNetworkException {
List<InterfaceAddress> ifs = new ServerConfig().getMultiCastInterface(0).getInterfaceAddresses();
for (int i = 0; i < ifs.size(); i++) {
if (ifs.get(i).getAddress() instanceof Inet6Address) {
return true;
}
}
return false;
}
private void assertMulticast() throws JournalNetworkException, InterruptedException {
AbstractOnDemandSender sender = new OnDemandAddressSender(new ServerConfig(), 120, 150, 0);
sender.start();
Thread.sleep(1000L);
OnDemandAddressPoller poller = new OnDemandAddressPoller(new ClientConfig(), 150, 120);
ServerNode address = poller.poll(2, 500, TimeUnit.MILLISECONDS);
Assert.assertNotNull(address);
sender.halt();
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.model.Quote;
import com.questdb.net.ha.comsumer.JournalClientStateConsumer;
import com.questdb.net.ha.comsumer.JournalSymbolTableConsumer;
import com.questdb.net.ha.comsumer.PartitionDeltaConsumer;
import com.questdb.net.ha.model.IndexedJournal;
import com.questdb.net.ha.producer.JournalClientStateProducer;
import com.questdb.net.ha.producer.JournalSymbolTableProducer;
import com.questdb.net.ha.producer.PartitionDeltaProducer;
import com.questdb.std.ex.JournalNetworkException;
import com.questdb.std.time.DateFormatUtils;
import com.questdb.store.JournalWriter;
import com.questdb.store.Partition;
import com.questdb.test.tools.AbstractTest;
import com.questdb.test.tools.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class PartitionTest extends AbstractTest {
private static final long timestamp = DateFormatUtils.parseDateTimeQuiet("2013-12-12T00:00:00.000Z");
private JournalWriter<Quote> origin;
private JournalWriter<Quote> master;
private JournalWriter<Quote> slave;
private PartitionDeltaProducer producer;
private PartitionDeltaConsumer consumer;
private MockByteChannel channel;
private Partition<Quote> masterPartition;
private Partition<Quote> slavePartition;
@Before
public void setUp() throws Exception {
origin = getFactory().writer(Quote.class, "origin");
master = getFactory().writer(Quote.class, "master");
slave = getFactory().writer(Quote.class, "slave");
masterPartition = master.getAppendPartition(timestamp);
slavePartition = slave.getAppendPartition(timestamp);
producer = new PartitionDeltaProducer(masterPartition);
consumer = new PartitionDeltaConsumer(slavePartition);
channel = new MockByteChannel();
TestUtils.generateQuoteData(origin, 1000, timestamp);
}
@After
public void tearDown() {
origin.close();
master.close();
slave.close();
}
@Test
public void testConsumerEqualToProducer() throws Exception {
master.append(origin);
slave.append(origin);
Assert.assertEquals(1000, masterPartition.size());
Assert.assertEquals(1000, slavePartition.size());
producer.configure(slave.size());
Assert.assertFalse(producer.hasContent());
}
@Test
public void testConsumerLargerThanProducer() throws Exception {
master.append(origin.query().all().asResultSet().subset(0, 700));
slave.append(origin);
producer.configure(slave.size());
Assert.assertFalse(producer.hasContent());
}
@Test
public void testConsumerReset() throws Exception {
master.append(origin);
slave.append(origin.query().all().asResultSet().subset(0, 600));
producer.configure(slave.size());
Assert.assertTrue(producer.hasContent());
syncSymbolTables();
producer.write(channel);
consumer.read(channel);
comparePartitions();
TestUtils.generateQuoteData(master, 200, DateFormatUtils.parseDateTime("2014-01-01T00:00:00.000Z"));
producer.configure(slave.size());
producer.write(channel);
consumer.read(channel);
comparePartitions();
}
@Test
public void testConsumerSmallerThanProducer() throws Exception {
master.append(origin);
slave.append(origin.query().all().asResultSet().subset(0, 700));
Assert.assertEquals(1000, masterPartition.size());
Assert.assertEquals(700, slavePartition.size());
producer.configure(slave.size());
Assert.assertTrue(producer.hasContent());
producer.write(channel);
consumer.read(channel);
comparePartitions();
}
@Test
public void testEmptyConsumerAndPopulatedProducer() throws Exception {
master.append(origin);
producer.configure(slave.size());
Assert.assertTrue(producer.hasContent());
syncSymbolTables();
producer.write(channel);
consumer.read(channel);
comparePartitions();
}
@Test
public void testEmptyConsumerAndProducer() throws Exception {
producer.configure(slave.size());
Assert.assertFalse(producer.hasContent());
}
private void comparePartitions() {
Assert.assertEquals(masterPartition.size(), slavePartition.size());
for (int i = 0; i < slavePartition.size(); i++) {
Assert.assertEquals(masterPartition.read(i), slavePartition.read(i));
}
}
private void syncSymbolTables() throws JournalNetworkException {
JournalClientStateProducer sp = new JournalClientStateProducer();
JournalClientStateConsumer sc = new JournalClientStateConsumer();
sp.write(channel, new IndexedJournal(0, slave));
sc.read(channel);
JournalSymbolTableProducer p = new JournalSymbolTableProducer(master);
JournalSymbolTableConsumer c = new JournalSymbolTableConsumer(slave);
p.configure(master.find(sc.getValue().getTxn(), sc.getValue().getTxPin()));
p.write(channel);
c.read(channel);
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.model.Quote;
import com.questdb.net.ha.config.ClientConfig;
import com.questdb.net.ha.config.ServerConfig;
import com.questdb.std.ex.JournalException;
import com.questdb.store.Journal;
import com.questdb.store.JournalListener;
import com.questdb.store.JournalRuntimeException;
import com.questdb.store.JournalWriter;
import com.questdb.test.tools.AbstractTest;
import com.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class ReconnectTest extends AbstractTest {
@Test
@Ignore
public void testServerRestart() throws Exception {
final int size = 10000;
try (JournalWriter<Quote> remote = getFactory().writer(Quote.class, "remote", 2 * size)) {
// start server #1
JournalServer server = newServer();
server.publish(remote);
server.start();
final CountDownLatch connectedLatch = new CountDownLatch(1);
JournalClient client = new JournalClient(
new ClientConfig("localhost") {{
getReconnectPolicy().setLoginRetryCount(3);
getReconnectPolicy().setRetryCount(5);
getReconnectPolicy().setSleepBetweenRetriesMillis(TimeUnit.SECONDS.toMillis(1));
}}, getFactory(), null,
evt -> {
if (evt == JournalClientEvents.EVT_CONNECTED) {
connectedLatch.countDown();
}
}
);
// subscribe client, waiting for complete set of data
// when data arrives client triggers latch
final CountDownLatch latch = new CountDownLatch(1);
// create empty "local"
getFactory().writer(Quote.class, "local").close();
try (final Journal<Quote> local = getFactory().reader(Quote.class, "local")) {
client.subscribe(Quote.class, "remote", "local", 2 * size, new JournalListener() {
@Override
public void onCommit() {
try {
if (local.refresh() && local.size() == 2 * size) {
latch.countDown();
}
} catch (JournalException e) {
throw new JournalRuntimeException(e);
}
}
@Override
public void onEvent(int event) {
}
});
client.start();
Assert.assertTrue(connectedLatch.await(5, TimeUnit.SECONDS));
// generate first batch
TestUtils.generateQuoteData(remote, size, System.currentTimeMillis(), 1);
remote.commit();
// stop server
server.halt();
// start server #2
server = newServer();
server.publish(remote);
server.start();
// generate second batch
TestUtils.generateQuoteData(remote, size, System.currentTimeMillis() + 2 * size, 1);
remote.commit();
// wait for client to get full set
latch.await();
// stop client and server
client.halt();
server.halt();
// assert client state
TestUtils.assertDataEquals(remote, local);
}
}
}
private JournalServer newServer() {
return new JournalServer(new ServerConfig() {{
setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(100));
setEnableMultiCast(false);
}}, getFactory());
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.model.Quote;
import com.questdb.net.ha.config.ClientConfig;
import com.questdb.net.ha.config.ServerConfig;
import com.questdb.store.Journal;
import com.questdb.store.JournalWriter;
import com.questdb.store.factory.Factory;
import com.questdb.store.factory.configuration.JournalConfigurationBuilder;
import com.questdb.test.tools.FactoryContainer;
import com.questdb.test.tools.TestUtils;
import org.junit.*;
import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class SSLTest {
@Rule
public final FactoryContainer factoryContainer = new FactoryContainer(new JournalConfigurationBuilder() {{
$(Quote.class).recordCountHint(2000)
.$sym("sym").valueCountHint(20)
.$sym("mode")
.$sym("ex")
;
}});
@After
public void tearDown() {
Assert.assertEquals(0, getFactory().getBusyWriterCount());
Assert.assertEquals(0, getFactory().getBusyReaderCount());
}
@Test
@Ignore
public void testAuthBothCertsMissing() throws Exception {
try (JournalWriter<Quote> remote = getFactory().writer(Quote.class, "remote")) {
JournalServer server = new JournalServer(new ServerConfig() {{
setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(500));
getSslConfig().setSecure(true);
getSslConfig().setRequireClientAuth(true);
try (InputStream is = this.getClass().getResourceAsStream("/keystore/singlekey.ks")) {
getSslConfig().setKeyStore(is, "changeit");
}
setEnableMultiCast(false);
setHeartbeatFrequency(50);
}}, getFactory());
try {
final AtomicInteger serverErrorCount = new AtomicInteger();
final CountDownLatch terminated = new CountDownLatch(1);
JournalClient client = new JournalClient(new ClientConfig("localhost") {{
getSslConfig().setSecure(true);
try (InputStream is = this.getClass().getResourceAsStream("/keystore/singlekey.ks")) {
getSslConfig().setTrustStore(is, "changeit");
}
}}, getFactory(), null, evt -> {
switch (evt) {
case JournalClientEvents.EVT_SERVER_ERROR:
serverErrorCount.incrementAndGet();
break;
case JournalClientEvents.EVT_TERMINATED:
terminated.countDown();
break;
default:
break;
}
});
server.publish(remote);
server.start();
client.subscribe(Quote.class, "remote", "local");
client.start();
Assert.assertTrue(terminated.await(5, TimeUnit.SECONDS));
Assert.assertEquals(0, server.getConnectedClients());
Assert.assertFalse(client.isRunning());
Assert.assertEquals(1, serverErrorCount.get());
} finally {
server.halt();
}
}
}
@Test
public void testClientAuth() throws Exception {
int size = 2000;
try (JournalWriter<Quote> remote = getFactory().writer(Quote.class, "remote")) {
JournalServer server = new JournalServer(new ServerConfig() {{
setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(500));
getSslConfig().setSecure(true);
getSslConfig().setRequireClientAuth(true);
try (InputStream is = this.getClass().getResourceAsStream("/keystore/singlekey.ks")) {
getSslConfig().setKeyStore(is, "changeit");
}
try (InputStream is = this.getClass().getResourceAsStream("/keystore/singlekey.ks")) {
getSslConfig().setTrustStore(is, "changeit");
}
setEnableMultiCast(false);
setHeartbeatFrequency(50);
}}, getFactory());
try {
JournalClient client = new JournalClient(new ClientConfig("localhost") {{
getSslConfig().setSecure(true);
try (InputStream is = this.getClass().getResourceAsStream("/keystore/singlekey.ks")) {
getSslConfig().setKeyStore(is, "changeit");
}
try (InputStream is = this.getClass().getResourceAsStream("/keystore/singlekey.ks")) {
getSslConfig().setTrustStore(is, "changeit");
}
}}, getFactory());
server.publish(remote);
server.start();
client.subscribe(Quote.class, "remote", "local");
client.start();
TestUtils.generateQuoteData(remote, size);
Thread.sleep(1000);
client.halt();
try (Journal<Quote> local = getFactory().reader(Quote.class, "local")) {
TestUtils.assertDataEquals(remote, local);
}
} finally {
server.halt();
}
}
}
@Test
public void testNoCertTrustAllSSL() throws Exception {
int size = 2000;
try (JournalWriter<Quote> remote = getFactory().writer(Quote.class, "remote")) {
JournalServer server = new JournalServer(new ServerConfig() {{
setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(500));
getSslConfig().setSecure(true);
try (InputStream is = this.getClass().getResourceAsStream("/keystore/singlekey.ks")) {
getSslConfig().setKeyStore(is, "changeit");
}
setEnableMultiCast(false);
setHeartbeatFrequency(50);
}}, getFactory());
try {
JournalClient client = new JournalClient(new ClientConfig("localhost") {{
getSslConfig().setSecure(true);
getSslConfig().setTrustAll(true);
}}, getFactory());
server.publish(remote);
server.start();
client.subscribe(Quote.class, "remote", "local");
client.start();
TestUtils.generateQuoteData(remote, size);
Thread.sleep(1000);
client.halt();
} finally {
server.halt();
}
try (Journal<Quote> local = getFactory().reader(Quote.class, "local")) {
TestUtils.assertDataEquals(remote, local);
}
}
}
@Test
public void testNonAuthClientTrustMissing() throws Exception {
try (JournalWriter<Quote> remote = getFactory().writer(Quote.class, "remote")) {
JournalServer server = new JournalServer(new ServerConfig() {{
setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(500));
getSslConfig().setSecure(true);
try (InputStream is = this.getClass().getResourceAsStream("/keystore/singlekey.ks")) {
getSslConfig().setKeyStore(is, "changeit");
}
setEnableMultiCast(false);
setHeartbeatFrequency(50);
}}, getFactory());
try {
final AtomicInteger serverErrorCount = new AtomicInteger();
final CountDownLatch terminated = new CountDownLatch(1);
JournalClient client = new JournalClient(new ClientConfig("localhost") {{
getSslConfig().setSecure(true);
}}, getFactory(), null, evt -> {
switch (evt) {
case JournalClientEvents.EVT_SERVER_ERROR:
serverErrorCount.incrementAndGet();
break;
case JournalClientEvents.EVT_TERMINATED:
terminated.countDown();
break;
default:
break;
}
});
server.publish(remote);
server.start();
client.subscribe(Quote.class, "remote", "local");
client.subscribe(Quote.class, "remote", "local");
client.start();
Assert.assertTrue(terminated.await(5, TimeUnit.SECONDS));
// Assert.assertEquals(0, server.getConnectedClients());
Assert.assertFalse(client.isRunning());
Assert.assertEquals(1, serverErrorCount.get());
client.halt();
} finally {
server.halt();
}
}
}
@Test
public void testServerTrustMissing() throws Exception {
try (JournalWriter<Quote> remote = getFactory().writer(Quote.class, "remote")) {
JournalServer server = new JournalServer(new ServerConfig() {{
setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(500));
getSslConfig().setSecure(true);
getSslConfig().setRequireClientAuth(true);
try (InputStream is = this.getClass().getResourceAsStream("/keystore/singlekey.ks")) {
getSslConfig().setKeyStore(is, "changeit");
}
setEnableMultiCast(false);
setHeartbeatFrequency(50);
}}, getFactory());
try {
final AtomicInteger serverErrorCount = new AtomicInteger();
final CountDownLatch terminated = new CountDownLatch(1);
JournalClient client = new JournalClient(new ClientConfig("localhost") {{
getSslConfig().setSecure(true);
try (InputStream is = this.getClass().getResourceAsStream("/keystore/singlekey.ks")) {
getSslConfig().setTrustStore(is, "changeit");
}
try (InputStream is = this.getClass().getResourceAsStream("/keystore/singlekey.ks")) {
getSslConfig().setKeyStore(is, "changeit");
}
}}, getFactory(), null, evt -> {
switch (evt) {
case JournalClientEvents.EVT_SERVER_ERROR:
serverErrorCount.incrementAndGet();
break;
case JournalClientEvents.EVT_TERMINATED:
terminated.countDown();
break;
default:
break;
}
});
server.publish(remote);
server.start();
client.subscribe(Quote.class, "remote", "local");
client.start();
Assert.assertTrue(terminated.await(5, TimeUnit.SECONDS));
Assert.assertFalse(client.isRunning());
Assert.assertEquals(1, serverErrorCount.get());
client.halt();
} finally {
server.halt();
}
}
}
@Test
public void testSingleKeySSL() throws Exception {
int size = 1000;
try (JournalWriter<Quote> remote = getFactory().writer(Quote.class, "remote")) {
JournalServer server = new JournalServer(new ServerConfig() {{
setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(500));
getSslConfig().setSecure(true);
try (InputStream is = this.getClass().getResourceAsStream("/keystore/singlekey.ks")) {
getSslConfig().setKeyStore(is, "changeit");
}
setEnableMultiCast(false);
setHeartbeatFrequency(50);
}}, getFactory());
try {
JournalClient client = new JournalClient(new ClientConfig("localhost") {{
setTcpNoDelay(false);
try (InputStream is = this.getClass().getResourceAsStream("/keystore/singlekey.ks")) {
getSslConfig().setTrustStore(is, "changeit");
}
getSslConfig().setSecure(true);
}}, getFactory());
server.publish(remote);
server.start();
client.subscribe(Quote.class, "remote", "local");
client.start();
TestUtils.generateQuoteData(remote, size);
Thread.sleep(500);
client.halt();
try (Journal<Quote> local = getFactory().reader(Quote.class, "local")) {
TestUtils.assertDataEquals(remote, local);
}
} finally {
server.halt();
}
}
}
private Factory getFactory() {
return factoryContainer.getFactory();
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.net.ha;
import com.questdb.model.Quote;
import com.questdb.net.ha.config.ClientConfig;
import com.questdb.net.ha.config.ServerConfig;
import com.questdb.std.Rnd;
import com.questdb.std.ex.JournalException;
import com.questdb.store.*;
import com.questdb.store.factory.configuration.Constants;
import com.questdb.test.tools.AbstractTest;
import com.questdb.test.tools.TestData;
import com.questdb.test.tools.TestUtils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Ignore
public class ScenarioTest extends AbstractTest {
private final ServerConfig serverConfig = new ServerConfig() {{
setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(300));
setEnableMultiCast(false);
}};
private final ClientConfig clientConfig = new ClientConfig("localhost");
@Test
public void testLagTrickle() throws Exception {
// prepare test data
try (JournalWriter<Quote> origin = getFactory().writer(Quote.class, "origin")) {
TestData.appendQuoteData2(origin);
try (final JournalWriter<Quote> randomOrigin = getFactory().writer(new JournalKey<>(Quote.class, "origin-rnd", PartitionBy.NONE, Constants.NULL_RECORD_HINT, false))) {
randomOrigin.append(origin.query().all().asResultSet().shuffle(new Rnd()));
try (final JournalWriter<Quote> remote = getFactory().writer(Quote.class, "remote")) {
try (final Journal<Quote> remoteReader = getFactory().reader(Quote.class, "remote")) {
// create empty journal
getFactory().writer(Quote.class, "local").close();
// setup local where data should be trickling from client
try (final Journal<Quote> local = getFactory().reader(Quote.class, "local")) {
Assert.assertEquals(0, local.size());
JournalServer server = new JournalServer(serverConfig, getFactory());
JournalClient client = new JournalClient(clientConfig, getFactory());
server.publish(remote);
server.start();
final AtomicInteger errors = new AtomicInteger();
final CountDownLatch ready = new CountDownLatch(1);
client.subscribe(Quote.class, "remote", "local", new JournalListener() {
@Override
public void onCommit() {
try {
if (local.refresh() && local.size() == 33) {
ready.countDown();
}
} catch (JournalException e) {
errors.incrementAndGet();
e.printStackTrace();
}
}
@Override
public void onEvent(int event) {
if (event != JournalEvents.EVT_JNL_SUBSCRIBED) {
errors.incrementAndGet();
}
}
});
client.start();
int n = 0;
while (n < 400) {
lagIteration(randomOrigin, remote, n, n + 10);
n += 10;
}
Assert.assertTrue(ready.await(10, TimeUnit.SECONDS));
server.halt();
client.halt();
local.refresh();
remoteReader.refresh();
TestUtils.assertEquals(remoteReader, local);
Assert.assertEquals(0, errors.get());
}
}
}
}
}
}
@Test
public void testSingleJournalTrickle() throws Exception {
JournalServer server = new JournalServer(serverConfig, getFactory());
JournalClient client = new JournalClient(clientConfig, getFactory());
// prepare test data
try (JournalWriter<Quote> origin = getFactory().writer(Quote.class, "origin")) {
TestData.appendQuoteData1(origin);
Assert.assertEquals(100, origin.size());
// setup remote we will be trickling test data into
try (JournalWriter<Quote> remote = getFactory().writer(Quote.class, "remote")) {
getFactory().writer(Quote.class, "local").close();
// setup local where data should be trickling from client
try (Journal<Quote> local = getFactory().reader(Quote.class, "local")) {
Assert.assertEquals(0, local.size());
server.publish(remote);
server.start();
client.subscribe(Quote.class, "remote", "local");
client.start();
try {
iteration("2013-02-10T10:03:20.000Z\tALDW\t0.32885755937534\t0.5741201360255567\t1836077773\t693649102\tFast trading\tSK\n" +
"2013-02-10T10:06:40.000Z\tAMD\t0.16781047061245025\t0.4831627617900026\t1423050407\t141794980\tFast trading\tGR\n" +
"2013-02-10T10:07:30.000Z\tHSBA.L\t0.04724340267969518\t0.5988337212476811\t178180342\t1522085049\tFast trading\tSK\n",
origin, remote, local, 0, 10
);
iteration("2013-02-10T10:15:50.000Z\tALDW\t0.7976166367363274\t0.06448758069572669\t1436005581\t1897226585\tFast trading\tGR\n" +
"2013-02-10T10:15:00.000Z\tAMD\t0.6789043827286667\t0.771921575501964\t580589771\t1159590077\tFast trading\tLXE\n" +
"2013-02-10T10:14:10.000Z\tHSBA.L\t0.984512894941384\t0.2664006899723862\t1288300070\t838312365\tFast trading\tLXE\n",
origin, remote, local, 10, 20
);
iteration("2013-02-10T10:24:10.000Z\tALDW\t0.26008876203627374\t0.04354393444455451\t25334630\t1835685418\tFast trading\tGR\n" +
"2013-02-10T10:23:20.000Z\tAMD\t0.9757637204046299\t0.7654386171943978\t23937995\t992860510\tFast trading\tLXE\n" +
"2013-02-10T10:21:40.000Z\tHSBA.L\t0.5630111081489209\t0.4222995146933318\t1534594684\t1153925552\tFast trading\tLN\n",
origin, remote, local, 20, 30
);
} finally {
client.halt();
server.halt();
}
}
}
}
}
private static void iteration(String expected, Journal<Quote> origin, JournalWriter<Quote> remote, Journal<Quote> local, int lo, int hi) throws Exception {
remote.append(origin.query().all().asResultSet().subset(lo, hi));
remote.commit();
int count = 0;
do {
Thread.sleep(100);
if (count++ > 10) {
Assert.fail("Refresh is too slow!");
}
}
while (!local.refresh());
TestUtils.assertEquals(expected, local.query().head().withKeys().asResultSet());
}
private void lagIteration(final Journal<Quote> origin, final JournalWriter<Quote> remote, final int lo, final int hi) throws JournalException {
remote.mergeAppend(new ArrayList<Quote>() {{
for (Quote q : origin.query().all().asResultSet().subset(lo, hi).sort("timestamp")) {
add(q);
}
}});
remote.commit();
}
}
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2018 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
package com.questdb.ql.analytic;
import com.questdb.ql.AbstractAllTypeTest;
import org.junit.Test;
public class MiscAnalyticFunctionTest extends AbstractAllTypeTest {
@Test
public void testAnalyticAndAggregates() throws Exception {
assertThat("0\tBZ\t2016-05-01T10:40:00.000Z\n" +
"1\tXX\t2016-05-01T10:37:00.000Z\n" +
"2\tKK\t2016-05-01T10:32:00.000Z\n" +
"3\tAX\t2016-05-01T10:38:00.000Z\n",
"select dense_rank() x over(), str, max(timestamp) from 'abc'");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册