提交 124cfef1 编写于 作者: C chegar

6855335: Several changes in the SCTP implementation.

Reviewed-by: michaelm
上级 e58b0432
......@@ -27,6 +27,7 @@ SUNWprivate_1.1 {
global:
Java_sun_nio_ch_SctpNet_socket0;
Java_sun_nio_ch_SctpNet_bindx;
Java_sun_nio_ch_SctpNet_branch0;
Java_sun_nio_ch_SctpNet_getLocalAddresses0;
Java_sun_nio_ch_SctpNet_getRemoteAddresses0;
Java_sun_nio_ch_SctpNet_getPrimAddrOption0;
......
......@@ -26,6 +26,7 @@ package sun.nio.ch;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.InetSocketAddress;
import java.io.FileDescriptor;
import java.io.IOException;
......@@ -122,6 +123,8 @@ public class SctpChannelImpl extends SctpChannel
private Association association;
private Set<SocketAddress> remoteAddresses = Collections.EMPTY_SET;
/* -- End of fields protected by stateLock -- */
private SctpResultContainer commUpResultContainer; /* null */
......@@ -142,18 +145,32 @@ public class SctpChannelImpl extends SctpChannel
*/
public SctpChannelImpl(SelectorProvider provider, FileDescriptor fd)
throws IOException {
this(provider, fd, null);
}
/**
* Constructor for sockets obtained from branching
*/
public SctpChannelImpl(SelectorProvider provider,
FileDescriptor fd,
Association association)
throws IOException {
super(provider);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
this.state = ChannelState.CONNECTED;
port = (Net.localAddress(fd)).getPort();
/* Receive COMM_UP */
ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
try {
receive(buf, null, null, true);
} finally {
Util.releaseTemporaryDirectBuffer(buf);
if (association != null) { /* branched */
this.association = association;
} else { /* obtained from server channel */
/* Receive COMM_UP */
ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
try {
receive(buf, null, null, true);
} finally {
Util.releaseTemporaryDirectBuffer(buf);
}
}
}
......@@ -391,6 +408,12 @@ public class SctpChannelImpl extends SctpChannel
} finally {
Util.releaseTemporaryDirectBuffer(buf);
}
/* cache remote addresses */
try {
remoteAddresses = getRemoteAddresses();
} catch (IOException unused) { /* swallow exception */ }
return true;
}
} else {
......@@ -414,6 +437,7 @@ public class SctpChannelImpl extends SctpChannel
int maxOutStreams,
int maxInStreams)
throws IOException {
ensureOpenAndUnconnected();
return setOption(SCTP_INIT_MAXSTREAMS, InitMaxStreams.
create(maxInStreams, maxOutStreams)).connect(endpoint);
......@@ -512,6 +536,12 @@ public class SctpChannelImpl extends SctpChannel
} finally {
Util.releaseTemporaryDirectBuffer(buf);
}
/* cache remote addresses */
try {
remoteAddresses = getRemoteAddresses();
} catch (IOException unused) { /* swallow exception */ }
return true;
}
}
......@@ -966,7 +996,7 @@ public class SctpChannelImpl extends SctpChannel
int pos = src.position();
int lim = src.limit();
assert (pos <= lim && streamNumber > 0);
assert (pos <= lim && streamNumber >= 0);
int rem = (pos <= lim ? lim - pos : 0);
if (src instanceof DirectBuffer)
......@@ -1043,10 +1073,15 @@ public class SctpChannelImpl extends SctpChannel
synchronized (stateLock) {
if (!isOpen())
throw new ClosedChannelException();
if (!isConnected())
if (!isConnected() || isShutdown)
return Collections.EMPTY_SET;
return SctpNet.getRemoteAddresses(fdVal, 0/*unused*/);
try {
return SctpNet.getRemoteAddresses(fdVal, 0/*unused*/);
} catch (SocketException unused) {
/* an open connected channel should always have remote addresses */
return remoteAddresses;
}
}
}
......
......@@ -26,6 +26,7 @@ package sun.nio.ch;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.InetSocketAddress;
import java.io.FileDescriptor;
import java.io.IOException;
......@@ -398,8 +399,8 @@ public class SctpMultiChannelImpl extends SctpMultiChannel
if (!isOpen())
throw new ClosedChannelException();
SctpNet.setSocketOption(fdVal, name, value,
association.associationID());
int assocId = association == null ? 0 : association.associationID();
SctpNet.setSocketOption(fdVal, name, value, assocId);
}
return this;
}
......@@ -414,12 +415,15 @@ public class SctpMultiChannelImpl extends SctpMultiChannel
throw new UnsupportedOperationException("'" + name + "' not supported");
synchronized (stateLock) {
checkAssociation(association);
if (association != null && (name.equals(SCTP_PRIMARY_ADDR) ||
name.equals(SCTP_SET_PEER_PRIMARY_ADDR))) {
checkAssociation(association);
}
if (!isOpen())
throw new ClosedChannelException();
return (T)SctpNet.getSocketOption(fdVal, name,
association.associationID());
int assocId = association == null ? 0 : association.associationID();
return (T)SctpNet.getSocketOption(fdVal, name, assocId);
}
}
......@@ -626,15 +630,19 @@ public class SctpMultiChannelImpl extends SctpMultiChannel
case ASSOCIATION_CHANGED :
result = absHandler.handleNotification(
resultContainer.getAssociationChanged(), attachment);
break;
case PEER_ADDRESS_CHANGED :
result = absHandler.handleNotification(
resultContainer.getPeerAddressChanged(), attachment);
break;
case SEND_FAILED :
result = absHandler.handleNotification(
resultContainer.getSendFailed(), attachment);
break;
case SHUTDOWN :
result = absHandler.handleNotification(
resultContainer.getShutdown(), attachment);
break;
default :
/* implementation specific handlers */
result = absHandler.handleNotification(
......@@ -836,7 +844,7 @@ public class SctpMultiChannelImpl extends SctpMultiChannel
int ppid = messageInfo.payloadProtocolID();
int pos = src.position();
int lim = src.limit();
assert (pos <= lim && streamNumber > 0);
assert (pos <= lim && streamNumber >= 0);
int rem = (pos <= lim ? lim - pos : 0);
if (src instanceof DirectBuffer)
......@@ -914,7 +922,13 @@ public class SctpMultiChannelImpl extends SctpMultiChannel
if (!isOpen())
throw new ClosedChannelException();
return SctpNet.getRemoteAddresses(fdVal, association.associationID());
try {
return SctpNet.getRemoteAddresses(fdVal, association.associationID());
} catch (SocketException se) {
/* a valid association should always have remote addresses */
Set<SocketAddress> addrs = associationMap.get(association);
return addrs != null ? addrs : Collections.EMPTY_SET;
}
}
}
......@@ -922,7 +936,16 @@ public class SctpMultiChannelImpl extends SctpMultiChannel
public SctpChannel branch(Association association)
throws IOException {
synchronized (stateLock) {
return null; //TODO: implement
checkAssociation(association);
if (!isOpen())
throw new ClosedChannelException();
FileDescriptor bFd = SctpNet.branch(fdVal,
association.associationID());
/* successfully branched, we can now remove it from assoc list */
removeAssociation(association);
return new SctpChannelImpl(provider(), bFd, association);
}
}
......
......@@ -232,6 +232,11 @@ public class SctpNet {
shutdown0(fd, assocId);
}
static FileDescriptor branch(int fd, int assocId) throws IOException {
int nativefd = branch0(fd, assocId);
return IOUtil.newFD(nativefd);
}
/* Native Methods */
static native int socket0(boolean oneToOne) throws IOException;
......@@ -248,6 +253,8 @@ public class SctpNet {
static native SocketAddress[] getRemoteAddresses0(int fd, int assocId)
throws IOException;
static native int branch0(int fd, int assocId) throws IOException;
static native void setPrimAddrOption0(int fd, int assocId, InetAddress ia,
int port) throws IOException;
......
......@@ -121,6 +121,8 @@ public class SctpResultContainer {
case SHUTDOWN: sb.append("SHUTDOWN"); break;
default : sb.append("Unknown result type");
}
return sb.append(", Value: ").append(value.toString()).toString();
sb.append(", Value: ");
sb.append((value == null) ? "null" : value.toString());
return sb.toString();
}
}
......@@ -407,7 +407,7 @@ public class SctpServerChannelImpl extends SctpServerChannel
if (!isOpen())
throw new ClosedChannelException();
if (!isBound())
return null;
return Collections.EMPTY_SET;
return SctpNet.getLocalAddresses(fdVal);
}
......
......@@ -63,6 +63,8 @@ typedef int sctp_freeladdrs_func(void* addrs);
typedef int sctp_getpaddrs_func(int sock, sctp_assoc_t id, void **addrs);
typedef int sctp_freepaddrs_func(void *addrs);
typedef int sctp_bindx_func(int sock, void *addrs, int addrcnt, int flags);
typedef int sctp_peeloff_func(int sock, sctp_assoc_t id);
#else /* __linux__ */
......@@ -315,6 +317,8 @@ typedef int sctp_freeladdrs_func(struct sockaddr *addrs);
typedef int sctp_getpaddrs_func(int sd, sctp_assoc_t id, struct sockaddr **addrs);
typedef int sctp_freepaddrs_func(struct sockaddr *addrs);
typedef int sctp_bindx_func(int sd, struct sockaddr *addrs, int addrcnt, int flags);
typedef int sctp_peeloff_func(int sock, sctp_assoc_t id);
#endif /* __linux__ */
......@@ -323,6 +327,7 @@ sctp_freeladdrs_func* nio_sctp_freeladdrs;
sctp_getpaddrs_func* nio_sctp_getpaddrs;
sctp_freepaddrs_func* nio_sctp_freepaddrs;
sctp_bindx_func* nio_sctp_bindx;
sctp_peeloff_func* nio_sctp_peeloff;
jboolean loadSocketExtensionFuncs(JNIEnv* env);
......
......@@ -254,7 +254,6 @@ void handleSendFailed
if (remaining > 0) {
if ((rv = recvmsg(fd, msg, 0)) < 0) {
fprintf(stdout, "\nNative: handleSFN: recvmsg failed: errno = %d ", errno);
handleSocketError(env, errno);
return;
}
......@@ -269,7 +268,7 @@ void handleSendFailed
/* create SctpSendFailed */
resultObj = (*env)->NewObject(env, ssf_class, ssf_ctrID, ssf->ssf_assoc_id,
sri->sinfo_stream, ssf->ssf_error, isaObj, bufferObj);
isaObj, bufferObj, ssf->ssf_error, sri->sinfo_stream);
CHECK_NULL(resultObj);
(*env)->SetObjectField(env, resultContainerObj, src_valueID, resultObj);
(*env)->SetIntField(env, resultContainerObj, src_typeID,
......
......@@ -96,6 +96,13 @@ jboolean loadSocketExtensionFuncs
return JNI_FALSE;
}
if ((nio_sctp_peeloff = (sctp_peeloff_func*)
dlsym(RTLD_DEFAULT, "sctp_peeloff")) == NULL) {
JNU_ThrowByName(env, "java/lang/UnsupportedOperationException",
dlerror());
return JNI_FALSE;
}
funcsLoaded = JNI_TRUE;
return JNI_TRUE;
}
......@@ -440,12 +447,10 @@ JNIEXPORT int JNICALL Java_sun_nio_ch_SctpNet_getIntOption0
JNIEXPORT jobject JNICALL Java_sun_nio_ch_SctpNet_getPrimAddrOption0
(JNIEnv *env, jclass klass, jint fd, jint assocId) {
struct sctp_setprim prim;
struct sockaddr_storage ss;
int ss_len = sizeof(ss);
unsigned int prim_len = sizeof(prim);
struct sockaddr* sap = (struct sockaddr*)&prim.ssp_addr;
prim.ssp_assoc_id = assocId;
prim.ssp_addr = ss;
if (getsockopt(fd, IPPROTO_SCTP, SCTP_PRIMARY_ADDR, &prim, &prim_len) < 0) {
JNU_ThrowByNameWithLastError(env, JNU_JAVANETPKG "SocketException",
......@@ -453,7 +458,7 @@ JNIEXPORT jobject JNICALL Java_sun_nio_ch_SctpNet_getPrimAddrOption0
return NULL;
}
return SockAddrToInetSocketAddress(env, (struct sockaddr*)&ss);
return SockAddrToInetSocketAddress(env, sap);
}
/*
......@@ -464,16 +469,15 @@ JNIEXPORT jobject JNICALL Java_sun_nio_ch_SctpNet_getPrimAddrOption0
JNIEXPORT void JNICALL Java_sun_nio_ch_SctpNet_setPrimAddrOption0
(JNIEnv *env, jclass klass, jint fd, jint assocId, jobject iaObj, jint port) {
struct sctp_setprim prim;
struct sockaddr_storage ss;
int ss_len = sizeof(ss);
struct sockaddr* sap = (struct sockaddr*)&prim.ssp_addr;
int sap_len;
if (NET_InetAddressToSockaddr(env, iaObj, port, (struct sockaddr *)&ss,
&ss_len, JNI_TRUE) != 0) {
if (NET_InetAddressToSockaddr(env, iaObj, port, sap,
&sap_len, JNI_TRUE) != 0) {
return;
}
prim.ssp_assoc_id = assocId;
prim.ssp_addr = ss;
if (setsockopt(fd, IPPROTO_SCTP, SCTP_PRIMARY_ADDR, &prim, sizeof(prim)) < 0) {
JNU_ThrowByNameWithLastError(env, JNU_JAVANETPKG "SocketException",
......@@ -607,3 +611,17 @@ JNIEXPORT void JNICALL Java_sun_nio_ch_SctpNet_shutdown0
}
}
/*
* Class: sun_nio_ch_SctpNet
* Method: branch
* Signature: (II)I
*/
JNIEXPORT int JNICALL Java_sun_nio_ch_SctpNet_branch0
(JNIEnv *env, jclass klass, jint fd, jint assocId) {
int newfd = 0;
if ((newfd = nio_sctp_peeloff(fd, assocId)) < 0) {
handleSocketError(env, errno);
}
return newfd;
}
......@@ -30,8 +30,8 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException;
......@@ -48,47 +48,29 @@ import static java.lang.System.err;
* getRemoteAddresses and association.
*/
public class Connect {
final CountDownLatch finishedLatch = new CountDownLatch(1);
void test(String[] args) {
SocketAddress address = null;
Server server = null;
if (!Util.isSCTPSupported()) {
out.println("SCTP protocol is not supported");
out.println("Test cannot be run");
return;
}
if (args.length == 2) {
/* requested to connect to a specific address */
try {
int port = Integer.valueOf(args[1]);
address = new InetSocketAddress(args[0], port);
} catch (NumberFormatException nfe) {
err.println(nfe);
}
} else {
/* start server on local machine, default */
try {
server = new Server();
server.start();
address = server.address();
debug("Server started and listening on " + address);
} catch (IOException ioe) {
ioe.printStackTrace();
return;
}
}
doTest(address);
doTest();
}
void doTest(SocketAddress addr) {
void doTest() {
SctpChannel channel = null;
final SocketAddress peerAddress = addr;
SctpServerChannel ssc = null;
try {
/* Create a server channel to connect to */
ssc = SctpServerChannel.open().bind(null);
Set<SocketAddress> addrs = ssc.getAllLocalAddresses();
if (addrs.isEmpty())
debug("addrs should not be empty");
final SocketAddress peerAddress = (InetSocketAddress) addrs.iterator().next();
channel = SctpChannel.open();
/* TEST 0.5 Verify default values for new/unconnected channel */
......@@ -118,6 +100,9 @@ public class Connect {
"finishConnect should have returned true");
}
ssc.accept();
ssc.close();
/* TEST 1.5 Verify after connect */
check(!channel.getRemoteAddresses().isEmpty(),
"empty set for connected channel");
......@@ -136,6 +121,16 @@ public class Connect {
unexpected(ioe);
}
/* TEST 2.5: Verify AlreadyConnectedException thrown */
try {
channel.connect(peerAddress, 5, 5);
fail("should have thrown AlreadyConnectedException");
} catch (AlreadyConnectedException unused) {
pass();
} catch (IOException ioe) {
unexpected(ioe);
}
/* TEST 3: UnresolvedAddressException */
channel.close();
channel = SctpChannel.open();
......@@ -200,9 +195,10 @@ public class Connect {
} catch (IOException ioe) {
unexpected(ioe);
} finally {
finishedLatch.countDown();
try { if (channel != null) channel.close(); }
catch (IOException e) { unexpected(e);}
catch (IOException unused) {}
try { if (ssc != null) ssc.close(); }
catch (IOException unused) {}
}
}
......@@ -219,47 +215,6 @@ public class Connect {
}
}
class Server implements Runnable
{
final InetSocketAddress serverAddr;
private SctpServerChannel ssc;
public Server() throws IOException {
ssc = SctpServerChannel.open().bind(null);
java.util.Set<SocketAddress> addrs = ssc.getAllLocalAddresses();
if (addrs.isEmpty())
debug("addrs should not be empty");
serverAddr = (InetSocketAddress) addrs.iterator().next();
}
public void start() {
(new Thread(this, "Server-" + serverAddr.getPort())).start();
}
public InetSocketAddress address() {
return serverAddr;
}
@Override
public void run() {
SctpChannel sc = null;
try {
sc = ssc.accept();
finishedLatch.await();
} catch (IOException ioe) {
unexpected(ioe);
} catch (InterruptedException ie) {
unexpected(ie);
} finally {
try { if (ssc != null) ssc.close(); }
catch (IOException ioe) { unexpected(ioe); }
try { if (sc != null) sc.close(); }
catch (IOException ioe) { unexpected(ioe); }
}
}
}
//--------------------- Infrastructure ---------------------------
boolean debug = true;
volatile int passed = 0, failed = 0;
......
......@@ -151,6 +151,16 @@ public class Shutdown {
} catch (IOException ioe) {
unexpected(ioe);
}
/* TEST 6: getRemoteAddresses */
debug("Test 6: getRemoteAddresses");
try {
java.util.Set<SocketAddress> remoteAddrs = channel.getRemoteAddresses();
check(remoteAddrs.isEmpty(),
"A shutdown channel should not have remote addresses");
} catch (IOException ioe) {
unexpected(ioe);
}
} catch (IOException ioe) {
unexpected(ioe);
} catch (InterruptedException ie) {
......
......@@ -29,15 +29,24 @@
import java.io.IOException;
import java.util.Set;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Arrays;
import java.util.Iterator;
import java.nio.channels.ClosedChannelException;
import com.sun.nio.sctp.SctpChannel;
import com.sun.nio.sctp.SctpServerChannel;
import com.sun.nio.sctp.SctpSocketOption;
import java.security.AccessController;
import sun.security.action.GetPropertyAction;
import static com.sun.nio.sctp.SctpStandardSocketOption.*;
import static java.lang.System.out;
public class SocketOptionTests {
final String osName = AccessController.doPrivileged(
new GetPropertyAction("os.name"));
<T> void checkOption(SctpChannel sc, SctpSocketOption<T> name,
T expectedValue) throws IOException {
T value = sc.getOption(name);
......@@ -92,13 +101,6 @@ public class SocketOptionTests {
optionalSupport(sc, SCTP_EXPLICIT_COMPLETE, true);
optionalSupport(sc, SCTP_FRAGMENT_INTERLEAVE, 1);
//TODO: SCTP_PRIMARY_ADDR
//sc.bind(null);
//connect
//InetSocketAddress addr = new InetSocketAddress(0);
//sc.setOption(SCTP_PRIMARY_ADDR, addr);
sc.setOption(SCTP_NODELAY, true);
checkOption(sc, SCTP_NODELAY, true);
sc.setOption(SO_SNDBUF, 16*1024);
......@@ -107,6 +109,8 @@ public class SocketOptionTests {
sc.setOption(SO_LINGER, 2000);
checkOption(sc, SO_LINGER, 2000);
/* SCTP_PRIMARY_ADDR */
sctpPrimaryAddr();
/* NullPointerException */
try {
......@@ -135,6 +139,60 @@ public class SocketOptionTests {
}
}
/* SCTP_PRIMARY_ADDR */
void sctpPrimaryAddr() throws IOException {
SocketAddress addrToSet = null;;
System.out.println("TESTING SCTP_PRIMARY_ADDR");
SctpChannel sc = SctpChannel.open();
SctpServerChannel ssc = SctpServerChannel.open().bind(null);
Set<SocketAddress> addrs = ssc.getAllLocalAddresses();
if (addrs.isEmpty())
debug("addrs should not be empty");
debug("Listening on " + addrs);
InetSocketAddress serverAddr = (InetSocketAddress) addrs.iterator().next();
debug("connecting to " + serverAddr);
sc.connect(serverAddr);
SctpChannel peerChannel = ssc.accept();
ssc.close();
Set<SocketAddress> peerAddrs = peerChannel.getAllLocalAddresses();
debug("Peer local Addresses: ");
for (Iterator<SocketAddress> it = peerAddrs.iterator(); it.hasNext(); ) {
InetSocketAddress addr = (InetSocketAddress)it.next();
debug("\t" + addr);
addrToSet = addr; // any of the peer addresses will do!
}
/* retrieval of SCTP_PRIMARY_ADDR is not supported on Solaris */
if ("SunOS".equals(osName)) {
/* For now do not set this option. There is a bug on Solaris 10 pre Update 5
* where setting this option returns Invalid argument */
//debug("Set SCTP_PRIMARY_ADDR with " + addrToSet);
//sc.setOption(SCTP_PRIMARY_ADDR, addrToSet);
return;
} else { /* Linux */
SocketAddress primaryAddr = sc.getOption(SCTP_PRIMARY_ADDR);
System.out.println("SCTP_PRIMARY_ADDR returned: " + primaryAddr);
/* Verify that this is one of the peer addresses */
boolean found = false;
addrToSet = primaryAddr; // may not have more than one addr
for (Iterator<SocketAddress> it = peerAddrs.iterator(); it.hasNext(); ) {
InetSocketAddress addr = (InetSocketAddress)it.next();
if (addr.equals(primaryAddr)) {
found = true;
}
addrToSet = addr;
}
check(found, "SCTP_PRIMARY_ADDR returned bogus address!");
sc.setOption(SCTP_PRIMARY_ADDR, addrToSet);
System.out.println("SCTP_PRIMARY_ADDR set to: " + addrToSet);
primaryAddr = sc.getOption(SCTP_PRIMARY_ADDR);
System.out.println("SCTP_PRIMARY_ADDR returned: " + primaryAddr);
check(addrToSet.equals(primaryAddr),"SCTP_PRIMARY_ADDR not set correctly");
}
}
//--------------------- Infrastructure ---------------------------
boolean debug = true;
volatile int passed = 0, failed = 0;
......
/*
* Copyright 2009 Sun Microsystems, Inc. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
* have any questions.
*/
/* @test
* @bug 4927640
* @summary Tests the SCTP protocol implementation
* @author chegar
*/
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.io.IOException;
import java.util.Set;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.nio.ByteBuffer;
import com.sun.nio.sctp.AbstractNotificationHandler;
import com.sun.nio.sctp.Association;
import com.sun.nio.sctp.AssociationChangeNotification;
import com.sun.nio.sctp.AssociationChangeNotification.AssocChangeEvent;
import com.sun.nio.sctp.HandlerResult;
import com.sun.nio.sctp.InvalidStreamException;
import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.SctpChannel;
import com.sun.nio.sctp.SctpMultiChannel;
import com.sun.nio.sctp.ShutdownNotification;
import static java.lang.System.out;
import static java.lang.System.err;
public class Branch {
/* Latches used to synchronize between the client and server so that
* connections without any IO may not be closed without being accepted */
final CountDownLatch clientFinishedLatch = new CountDownLatch(1);
final CountDownLatch serverFinishedLatch = new CountDownLatch(1);
void test(String[] args) {
SocketAddress address = null;
Server server = null;
if (!Util.isSCTPSupported()) {
out.println("SCTP protocol is not supported");
out.println("Test cannot be run");
return;
}
if (args.length == 2) {
/* requested to connecct to a specific address */
try {
int port = Integer.valueOf(args[1]);
address = new InetSocketAddress(args[0], port);
} catch (NumberFormatException nfe) {
err.println(nfe);
}
} else {
/* start server on local machine, default */
try {
server = new Server();
server.start();
address = server.address();
debug("Server started and listening on " + address);
} catch (IOException ioe) {
ioe.printStackTrace();
return;
}
}
doTest(address);
}
void doTest(SocketAddress peerAddress) {
SctpMultiChannel channel = null;
ByteBuffer buffer = ByteBuffer.allocate(Util.LARGE_BUFFER);
MessageInfo info = MessageInfo.createOutgoing(null, 0);
try {
channel = SctpMultiChannel.open();
/* setup an association implicitly by sending a small message */
int streamNumber = 0;
debug("sending to " + peerAddress + " on stream number: " + streamNumber);
info = MessageInfo.createOutgoing(peerAddress, streamNumber);
buffer.put(Util.SMALL_MESSAGE.getBytes("ISO-8859-1"));
buffer.flip();
int position = buffer.position();
int remaining = buffer.remaining();
debug("sending small message: " + buffer);
int sent = channel.send(buffer, info);
check(sent == remaining, "sent should be equal to remaining");
check(buffer.position() == (position + sent),
"buffers position should have been incremented by sent");
/* Receive the COMM_UP */
buffer.clear();
BranchNotificationHandler handler = new BranchNotificationHandler();
channel.configureBlocking(false);
info = channel.receive(buffer, null, handler);
check(handler.receivedCommUp(), "COMM_UP no received");
Set<Association> associations = channel.associations();
check(!associations.isEmpty(),"There should be some associations");
Association bassoc = associations.iterator().next();
/* TEST 1: branch */
SctpChannel bchannel = channel.branch(bassoc);
check(!bchannel.getAllLocalAddresses().isEmpty(),
"branched channel should be bound");
check(!bchannel.getRemoteAddresses().isEmpty(),
"branched channel should be connected");
check(channel.associations().isEmpty(),
"there should be no associations since the only one was branched off");
buffer.clear();
info = bchannel.receive(buffer, null, null);
buffer.flip();
check(info != null, "info is null");
check(info.streamNumber() == streamNumber,
"message not sent on the correct stream");
check(info.bytes() == Util.SMALL_MESSAGE.getBytes("ISO-8859-1").
length, "bytes received not equal to message length");
check(info.bytes() == buffer.remaining(), "bytes != remaining");
check(Util.compare(buffer, Util.SMALL_MESSAGE),
"received message not the same as sent message");
} catch (IOException ioe) {
unexpected(ioe);
} finally {
clientFinishedLatch.countDown();
try { serverFinishedLatch.await(10L, TimeUnit.SECONDS); }
catch (InterruptedException ie) { unexpected(ie); }
if (channel != null) {
try { channel.close(); }
catch (IOException e) { unexpected (e);}
}
}
}
class Server implements Runnable
{
final InetSocketAddress serverAddr;
private SctpMultiChannel serverChannel;
public Server() throws IOException {
serverChannel = SctpMultiChannel.open().bind(null);
java.util.Set<SocketAddress> addrs = serverChannel.getAllLocalAddresses();
if (addrs.isEmpty())
debug("addrs should not be empty");
serverAddr = (InetSocketAddress) addrs.iterator().next();
}
public void start() {
(new Thread(this, "Server-" + serverAddr.getPort())).start();
}
public InetSocketAddress address() {
return serverAddr;
}
@Override
public void run() {
ByteBuffer buffer = ByteBuffer.allocateDirect(Util.LARGE_BUFFER);
try {
MessageInfo info;
/* receive a small message */
do {
info = serverChannel.receive(buffer, null, null);
if (info == null) {
fail("Server: unexpected null from receive");
return;
}
} while (!info.isComplete());
buffer.flip();
check(info != null, "info is null");
check(info.streamNumber() == 0,
"message not sent on the correct stream");
check(info.bytes() == Util.SMALL_MESSAGE.getBytes("ISO-8859-1").
length, "bytes received not equal to message length");
check(info.bytes() == buffer.remaining(), "bytes != remaining");
check(Util.compare(buffer, Util.SMALL_MESSAGE),
"received message not the same as sent message");
check(info != null, "info is null");
Set<Association> assocs = serverChannel.associations();
check(assocs.size() == 1, "there should be only one association");
/* echo the message */
debug("Server: echoing first message");
buffer.flip();
int bytes = serverChannel.send(buffer, info);
debug("Server: sent " + bytes + "bytes");
clientFinishedLatch.await(10L, TimeUnit.SECONDS);
serverFinishedLatch.countDown();
} catch (IOException ioe) {
unexpected(ioe);
} catch (InterruptedException ie) {
unexpected(ie);
} finally {
try { if (serverChannel != null) serverChannel.close(); }
catch (IOException unused) {}
}
}
}
class BranchNotificationHandler extends AbstractNotificationHandler<Object>
{
boolean receivedCommUp; // false
boolean receivedCommUp() {
return receivedCommUp;
}
@Override
public HandlerResult handleNotification(
AssociationChangeNotification notification, Object attachment) {
AssocChangeEvent event = notification.event();
debug("AssociationChangeNotification");
debug(" Association: " + notification.association());
debug(" Event: " + event);
if (event.equals(AssocChangeEvent.COMM_UP))
receivedCommUp = true;
return HandlerResult.RETURN;
}
/* A ShutdownNotification handler is provided to ensure that no
* shutdown notification are being handled since we don't expect
* to receive them. This is not part of branch testing, it just
* fits here to test another bug. */
@Override
public HandlerResult handleNotification(
ShutdownNotification notification, Object attachment) {
debug("ShutdownNotification");
debug(" Association: " + notification.association());
fail("Shutdown should not be received");
return HandlerResult.RETURN;
}
}
//--------------------- Infrastructure ---------------------------
boolean debug = true;
volatile int passed = 0, failed = 0;
void pass() {passed++;}
void fail() {failed++; Thread.dumpStack();}
void fail(String msg) {System.err.println(msg); fail();}
void unexpected(Throwable t) {failed++; t.printStackTrace();}
void check(boolean cond) {if (cond) pass(); else fail();}
void check(boolean cond, String failMessage) {if (cond) pass(); else fail(failMessage);}
void debug(String message) {if(debug) { System.out.println(message); } }
public static void main(String[] args) throws Throwable {
Class<?> k = new Object(){}.getClass().getEnclosingClass();
try {k.getMethod("instanceMain",String[].class)
.invoke( k.newInstance(), (Object) args);}
catch (Throwable e) {throw e.getCause();}}
public void instanceMain(String[] args) throws Throwable {
try {test(args);} catch (Throwable t) {unexpected(t);}
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
if (failed > 0) throw new AssertionError("Some tests failed");}
}
/*
* Copyright 2009 Sun Microsystems, Inc. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
* CA 95054 USA or visit www.sun.com if you need additional information or
* have any questions.
*/
/* @test
* @bug 4927640
* @summary Tests the SCTP protocol implementation
* @author chegar
*/
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.Set;
import java.util.List;
import java.util.Arrays;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import com.sun.nio.sctp.AbstractNotificationHandler;
import com.sun.nio.sctp.Association;
import com.sun.nio.sctp.AssociationChangeNotification;
import com.sun.nio.sctp.AssociationChangeNotification.AssocChangeEvent;
import com.sun.nio.sctp.HandlerResult;
import com.sun.nio.sctp.MessageInfo;
import com.sun.nio.sctp.SctpChannel;
import com.sun.nio.sctp.SctpMultiChannel;
import com.sun.nio.sctp.SctpServerChannel;
import com.sun.nio.sctp.SctpSocketOption;
import java.security.AccessController;
import sun.security.action.GetPropertyAction;
import static com.sun.nio.sctp.SctpStandardSocketOption.*;
import static java.lang.System.out;
public class SocketOptionTests {
final String osName = AccessController.doPrivileged(
new GetPropertyAction("os.name"));
<T> void checkOption(SctpMultiChannel smc, SctpSocketOption<T> name,
T expectedValue) throws IOException {
T value = smc.getOption(name, null);
check(value.equals(expectedValue), name + ": value (" + value +
") not as expected (" + expectedValue + ")");
}
<T> void optionalSupport(SctpMultiChannel smc, SctpSocketOption<T> name,
T value) {
try {
smc.setOption(name, value, null);
checkOption(smc, name, value);
} catch (IOException e) {
/* Informational only, not all options have native support */
out.println(name + " not supported. " + e);
}
}
void test(String[] args) {
if (!Util.isSCTPSupported()) {
out.println("SCTP protocol is not supported");
out.println("Test cannot be run");
return;
}
try {
SctpMultiChannel smc = SctpMultiChannel.open();
/* check supported options */
Set<SctpSocketOption<?>> options = smc.supportedOptions();
List<? extends SctpSocketOption<?>> expected = Arrays.<SctpSocketOption<?>>asList(
SCTP_DISABLE_FRAGMENTS, SCTP_EXPLICIT_COMPLETE,
SCTP_FRAGMENT_INTERLEAVE, SCTP_INIT_MAXSTREAMS,
SCTP_NODELAY, SCTP_PRIMARY_ADDR, SCTP_SET_PEER_PRIMARY_ADDR,
SO_SNDBUF, SO_RCVBUF, SO_LINGER);
for (SctpSocketOption opt: expected) {
if (!options.contains(opt))
fail(opt.name() + " should be supported");
}
InitMaxStreams streams = InitMaxStreams.create(1024, 1024);
smc.setOption(SCTP_INIT_MAXSTREAMS, streams, null);
checkOption(smc, SCTP_INIT_MAXSTREAMS, streams);
streams = smc.getOption(SCTP_INIT_MAXSTREAMS, null);
check(streams.maxInStreams() == 1024, "Max in streams: value: "
+ streams.maxInStreams() + ", expected 1024 ");
check(streams.maxOutStreams() == 1024, "Max out streams: value: "
+ streams.maxOutStreams() + ", expected 1024 ");
optionalSupport(smc, SCTP_DISABLE_FRAGMENTS, true);
optionalSupport(smc, SCTP_EXPLICIT_COMPLETE, true);
optionalSupport(smc, SCTP_FRAGMENT_INTERLEAVE, 1);
smc.setOption(SCTP_NODELAY, true, null);
checkOption(smc, SCTP_NODELAY, true);
smc.setOption(SO_SNDBUF, 16*1024, null);
smc.setOption(SO_RCVBUF, 16*1024, null);
checkOption(smc, SO_LINGER, -1); /* default should be negative */
/* Setting SO_LINGER not support for one-to-many on Solaris */
if (!"SunOS".equals(osName)) {
smc.setOption(SO_LINGER, 2000, null);
checkOption(smc, SO_LINGER, 2000);
}
/* SCTP_PRIMARY_ADDR */
sctpPrimaryAddr();
/* NullPointerException */
try {
smc.setOption(null, "value", null);
fail("NullPointerException not thrown for setOption");
} catch (NullPointerException unused) {
pass();
}
try {
smc.getOption(null, null);
fail("NullPointerException not thrown for getOption");
} catch (NullPointerException unused) {
pass();
}
/* ClosedChannelException */
smc.close();
try {
smc.setOption(SCTP_INIT_MAXSTREAMS, streams, null);
fail("ClosedChannelException not thrown");
} catch (ClosedChannelException unused) {
pass();
}
} catch (IOException ioe) {
unexpected(ioe);
}
}
/* SCTP_PRIMARY_ADDR */
void sctpPrimaryAddr() throws IOException {
SocketAddress addrToSet = null;
ByteBuffer buffer = ByteBuffer.allocate(Util.SMALL_BUFFER);
System.out.println("TESTING SCTP_PRIMARY_ADDR");
/* create listening channel */
SctpServerChannel ssc = SctpServerChannel.open().bind(null);
Set<SocketAddress> addrs = ssc.getAllLocalAddresses();
if (addrs.isEmpty())
debug("addrs should not be empty");
InetSocketAddress serverAddr = (InetSocketAddress) addrs.iterator().next();
/* setup an association implicitly by sending a small message */
int streamNumber = 0;
debug("sending to " + serverAddr + " on stream number: " + streamNumber);
MessageInfo info = MessageInfo.createOutgoing(serverAddr, streamNumber);
buffer.put(Util.SMALL_MESSAGE.getBytes("ISO-8859-1"));
buffer.flip();
debug("sending small message: " + buffer);
SctpMultiChannel smc = SctpMultiChannel.open();
int sent = smc.send(buffer, info);
/* Receive the COMM_UP */
buffer.clear();
SOTNotificationHandler handler = new SOTNotificationHandler();
smc.configureBlocking(false);
info = smc.receive(buffer, null, handler);
check(handler.receivedCommUp(), "COMM_UP no received");
Set<Association> associations = smc.associations();
check(!associations.isEmpty(),"There should be some associations");
Association assoc = associations.iterator().next();
SctpChannel peerChannel = ssc.accept();
ssc.close();
Set<SocketAddress> peerAddrs = peerChannel.getAllLocalAddresses();
debug("Peer local Addresses: ");
for (Iterator<SocketAddress> it = peerAddrs.iterator(); it.hasNext(); ) {
InetSocketAddress addr = (InetSocketAddress)it.next();
debug("\t" + addr);
addrToSet = addr; // any of the peer addresses will do!
}
/* retrieval of SCTP_PRIMARY_ADDR is not supported on Solaris */
if ("SunOS".equals(osName)) {
/* For now do not set this option. There is a bug on Solaris 10 pre Update 5
* where setting this option returns Invalid argument */
//debug("Set SCTP_PRIMARY_ADDR with " + addrToSet);
//smc.setOption(SCTP_PRIMARY_ADDR, addrToSet, assoc);
return;
} else { /* Linux */
SocketAddress primaryAddr = smc.getOption(SCTP_PRIMARY_ADDR, assoc);
System.out.println("SCTP_PRIMARY_ADDR returned: " + primaryAddr);
/* Verify that this is one of the peer addresses */
boolean found = false;
addrToSet = primaryAddr; // may not have more than one addr
for (Iterator<SocketAddress> it = peerAddrs.iterator(); it.hasNext(); ) {
InetSocketAddress addr = (InetSocketAddress)it.next();
if (addr.equals(primaryAddr)) {
found = true;
}
addrToSet = addr;
}
check(found, "SCTP_PRIMARY_ADDR returned bogus address!");
smc.setOption(SCTP_PRIMARY_ADDR, addrToSet, assoc);
System.out.println("SCTP_PRIMARY_ADDR set to: " + addrToSet);
primaryAddr = smc.getOption(SCTP_PRIMARY_ADDR, assoc);
System.out.println("SCTP_PRIMARY_ADDR returned: " + primaryAddr);
check(addrToSet.equals(primaryAddr),"SCTP_PRIMARY_ADDR not set correctly");
}
}
class SOTNotificationHandler extends AbstractNotificationHandler<Object>
{
boolean receivedCommUp; // false
boolean receivedCommUp() {
return receivedCommUp;
}
@Override
public HandlerResult handleNotification(
AssociationChangeNotification notification, Object attachment) {
AssocChangeEvent event = notification.event();
debug("AssociationChangeNotification");
debug(" Association: " + notification.association());
debug(" Event: " + event);
if (event.equals(AssocChangeEvent.COMM_UP))
receivedCommUp = true;
return HandlerResult.RETURN;
}
}
//--------------------- Infrastructure ---------------------------
boolean debug = true;
volatile int passed = 0, failed = 0;
void pass() {passed++;}
void fail() {failed++; Thread.dumpStack();}
void fail(String msg) {System.err.println(msg); fail();}
void unexpected(Throwable t) {failed++; t.printStackTrace();}
void check(boolean cond) {if (cond) pass(); else fail();}
void check(boolean cond, String failMessage) {if (cond) pass(); else fail(failMessage);}
void debug(String message) {if(debug) { System.out.println(message); } }
public static void main(String[] args) throws Throwable {
Class<?> k = new Object(){}.getClass().getEnclosingClass();
try {k.getMethod("instanceMain",String[].class)
.invoke( k.newInstance(), (Object) args);}
catch (Throwable e) {throw e.getCause();}}
public void instanceMain(String[] args) throws Throwable {
try {test(args);} catch (Throwable t) {unexpected(t);}
System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
if (failed > 0) throw new AssertionError("Some tests failed");}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册