提交 7328cd25 编写于 作者: V Vlad Ilyushchenko

cluster bits (issue #29)

上级 e9aff645
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -729,7 +729,6 @@ public class JournalWriter<T> extends Journal<T> {
Partition<T> partition = lastNonEmptyNonLag();
Partition<T> lag = getIrregularPartition();
Tx tx = new Tx();
tx.command = command;
tx.prevTxAddress = txLog.getTxAddress();
tx.journalMaxRowID = partition == null ? 0 : Rows.toRowID(partition.getPartitionIndex(), partition.size());
......
/*
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.exceptions;
public class ClusterLossException extends JournalNetworkException {
public ClusterLossException() {
super("Cluster already has a master. LOST.");
}
}
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -198,21 +198,49 @@ public class JournalClient {
intResponseConsumer.reset();
}
private void handshake() throws JournalNetworkException {
SocketChannel channel = config.openSocketChannel();
public boolean pingServer() {
try {
statsChannel = new StatsCollectingReadableByteChannel(channel.getRemoteAddress());
} catch (IOException e) {
throw new JournalNetworkException("Cannot get remote address", e);
openChannel();
sendProtocolVersion();
return true;
} catch (JournalNetworkException e) {
return false;
}
}
SslConfig sslConfig = config.getSslConfig();
if (sslConfig.isSecure()) {
this.channel = new SecureByteChannel(channel, sslConfig);
} else {
this.channel = channel;
boolean sendClusterWin(int instance) throws JournalNetworkException {
openChannel();
commandProducer.write(channel, Command.CLUSTER_WIN);
intResponseProducer.write(channel, instance);
stringResponseConsumer.reset();
stringResponseConsumer.read(channel);
if (!stringResponseConsumer.isComplete()) {
LOGGER.info("Received incomplete response from cluster member.");
return false;
}
return "OK".equals(stringResponseConsumer.getValue());
}
private void openChannel() throws JournalNetworkException {
if (this.channel == null) {
SocketChannel channel = config.openSocketChannel();
try {
statsChannel = new StatsCollectingReadableByteChannel(channel.getRemoteAddress());
} catch (IOException e) {
throw new JournalNetworkException("Cannot get remote address", e);
}
SslConfig sslConfig = config.getSslConfig();
if (sslConfig.isSecure()) {
this.channel = new SecureByteChannel(channel, sslConfig);
} else {
this.channel = channel;
}
}
}
private void handshake() throws JournalNetworkException {
openChannel();
sendProtocolVersion();
sendKeys();
checkAuthAndSendCredential();
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -19,6 +19,7 @@ package com.nfsdb.net;
import com.nfsdb.JournalKey;
import com.nfsdb.JournalWriter;
import com.nfsdb.concurrent.NamedDaemonThreadFactory;
import com.nfsdb.exceptions.ClusterLossException;
import com.nfsdb.exceptions.JournalDisconnectedChannelException;
import com.nfsdb.exceptions.JournalNetworkException;
import com.nfsdb.factory.JournalReaderFactory;
......@@ -161,6 +162,10 @@ public class JournalServer {
return channels.size();
}
public int getClusterInstance() {
return config.getClusterInstance();
}
int getWriterIndex(JournalKey key) {
for (int i = 0, sz = writers.size(); i < sz; i++) {
JournalKey jk = writers.get(i).getKey();
......@@ -232,6 +237,7 @@ public class JournalServer {
@Override
public void run() {
boolean haltServer = false;
try {
while (true) {
if (!running.get()) {
......@@ -241,6 +247,10 @@ public class JournalServer {
agent.process(holder.byteChannel);
} catch (JournalDisconnectedChannelException e) {
break;
} catch (ClusterLossException e) {
haltServer = true;
LOGGER.info(e.getMessage());
break;
} catch (JournalNetworkException e) {
if (running.get()) {
LOGGER.info("Client died: " + holder.socketAddress);
......@@ -263,6 +273,10 @@ public class JournalServer {
agent.close();
removeChannel(holder);
}
if (haltServer) {
halt();
}
}
}
}
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -20,6 +20,7 @@ import com.nfsdb.Journal;
import com.nfsdb.JournalKey;
import com.nfsdb.collections.DirectIntList;
import com.nfsdb.collections.Lists;
import com.nfsdb.exceptions.ClusterLossException;
import com.nfsdb.exceptions.JournalDisconnectedChannelException;
import com.nfsdb.exceptions.JournalException;
import com.nfsdb.exceptions.JournalNetworkException;
......@@ -99,6 +100,22 @@ public class JournalServerAgent {
commandConsumer.reset();
}
break;
case CLUSTER_WIN:
checkAuthorized(channel);
intResponseConsumer.read(channel);
if (intResponseConsumer.isComplete()) {
boolean loss = intResponseConsumer.getValue() > server.getClusterInstance();
intResponseConsumer.reset();
commandConsumer.reset();
if (loss) {
ok(channel);
throw new ClusterLossException();
} else {
error(channel, "WIN");
}
}
break;
case HANDSHAKE_COMPLETE:
if (authorized) {
ok(channel);
......@@ -184,7 +201,6 @@ public class JournalServerAgent {
} else {
error(channel, "Unsupported protocol version. Client: " + version + ", Server: " + Version.PROTOCOL_VERSION);
}
}
private <T> void createReader(int index, JournalKey<T> key) throws JournalException {
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -33,6 +33,7 @@ public class ServerConfig extends NetworkConfig {
private static final Logger LOGGER = Logger.getLogger(ServerConfig.class);
private long heartbeatFrequency = DEFAULT_HEARTBEAT_FREQUENCY;
private int eventBufferSize = RING_BUFFER_SIZE;
private int clusterInstance = 0;
public long getHeartbeatFrequency() {
return heartbeatFrequency;
......@@ -67,4 +68,12 @@ public class ServerConfig extends NetworkConfig {
throw new JournalNetworkException("Cannot open server socket", e);
}
}
public int getClusterInstance() {
return clusterInstance;
}
public void setClusterInstance(int clusterInstance) {
this.clusterInstance = clusterInstance;
}
}
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -28,6 +28,7 @@ public enum Command {
PROTOCOL_VERSION(0x08),
HANDSHAKE_COMPLETE(0x09),
AUTHORIZATION(0x0a),
CLUSTER_WIN(0x0b),
UNAUTHENTIC(0xFC),
UNKNOWN_CMD(0xFE);
......
/*
* Copyright (c) 2014-2015. Vlad Ilyushchenko
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -22,6 +22,7 @@ public class Tx {
public static final byte TX_NORMAL = 0;
public static final byte TX_FORCE = 1;
public static final byte TX_PRECOMMIT = 2;
// transient
public long address;
// 8
......
/*
* Copyright (c) 2014. Vlad Ilyushchenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nfsdb.net;
import com.nfsdb.net.config.ClientConfig;
import com.nfsdb.net.config.ServerConfig;
import com.nfsdb.test.tools.AbstractTest;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
public class ClusterTest extends AbstractTest {
@Test
public void testBasicClusterWin() throws Exception {
JournalServer server = new JournalServer(new ServerConfig() {{
setHostname("localhost");
setPort(7079);
setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(100));
setEnableMulticast(false);
}}, factory);
JournalClient client = new JournalClient(new ClientConfig() {{
setHostname("localhost");
setPort(7080);
}}, factory);
if (!client.pingServer()) {
server.start();
if (client.pingServer() && !client.sendClusterWin(server.getClusterInstance())) {
server.halt();
}
}
System.out.println(server.isRunning());
JournalServer server2 = new JournalServer(new ServerConfig() {{
setHostname("localhost");
setPort(7080);
setHeartbeatFrequency(TimeUnit.MILLISECONDS.toMillis(100));
setEnableMulticast(false);
}}, factory);
JournalClient client2 = new JournalClient(new ClientConfig() {{
setHostname("localhost");
setPort(7079);
}}, factory);
if (!client2.pingServer()) {
server2.start();
if (client2.pingServer() && !client2.sendClusterWin(server.getClusterInstance())) {
server.halt();
}
}
client2.halt();
System.out.println(server2.isRunning());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册