提交 24ec8bb6 编写于 作者: A Atsushi Eno

Ongoing Discovery duplex (UDP) implementation (not working yet).

上级 5e32d425
......@@ -90,18 +90,43 @@ namespace System.ServiceModel.Discovery
static readonly Random rnd = new Random ();
UdpClient GetSenderClient (Message message)
{
if (RemoteAddress != null)
return client;
var rmp = message.Properties [RemoteEndpointMessageProperty.Name] as RemoteEndpointMessageProperty;
if (rmp == null)
throw new ArgumentException ("This duplex channel from the channel listener cannot send messages without RemoteEndpointMessageProperty");
var cli = new UdpClient ();
Console.Error.WriteLine ("Target: " + rmp.Address + ":" + rmp.Port);
cli.Connect (IPAddress.Parse (rmp.Address), rmp.Port);
return cli;
}
public void Send (Message message, TimeSpan timeout)
{
if (State != CommunicationState.Opened)
throw new InvalidOperationException ("The UDP channel must be opened before sending a message.");
var cli = GetSenderClient (message);
try {
SendCore (cli, message, timeout);
} finally {
if (cli != client)
cli.Close ();
}
}
void SendCore (UdpClient cli, Message message, TimeSpan timeout)
{
var ms = new MemoryStream ();
message_encoder.WriteMessage (message, ms);
// It seems .NET sends the same Message a couple of times so that the receivers don't miss it. So, do the same hack.
for (int i = 0; i < 6; i++) {
// FIXME: use MaxAnnouncementDelay. It is fixed now.
Thread.Sleep (rnd.Next (50, 500));
client.Send (ms.GetBuffer (), (int) ms.Length);
cli.Send (ms.GetBuffer (), (int) ms.Length);
}
}
......@@ -129,6 +154,9 @@ namespace System.ServiceModel.Discovery
ThrowIfDisposedOrNotOpen ();
msg = null;
if (client == null) // could be invoked while being closed.
return false;
byte [] bytes = null;
IPEndPoint ip = new IPEndPoint (IPAddress.Any, 0);
var ar = client.BeginReceive (delegate (IAsyncResult result) {
......@@ -152,6 +180,9 @@ namespace System.ServiceModel.Discovery
if (message_ids.Count >= binding_element.TransportSettings.DuplicateMessageHistoryLength)
message_ids.Dequeue ();
}
msg.Properties.Add ("Via", LocalAddress.Uri);
msg.Properties.Add ("Encoder", message_encoder);
msg.Properties.Add (RemoteEndpointMessageProperty.Name, new RemoteEndpointMessageProperty (ip.Address.ToString (), ip.Port));
return true;
}
......@@ -159,9 +190,7 @@ namespace System.ServiceModel.Discovery
protected override void OnAbort ()
{
if (client != null)
client.Close ();
client = null;
OnClose (TimeSpan.Zero);
}
Action<TimeSpan> open_delegate, close_delegate;
......
......@@ -82,6 +82,11 @@ namespace System.ServiceModel.Discovery
MessageContracts11.FindResponse IDiscoveryProxyContract11.EndFind (IAsyncResult result)
{
OnEndFind (result);
return CreateFindResponse11 ();
}
MessageContracts11.FindResponse CreateFindResponse11 ()
{
var l = new MessageContracts11.FindResponse11 ();
foreach (var edm in find_context.Endpoints)
l.Add (new EndpointDiscoveryMetadata11 (edm));
......@@ -112,6 +117,11 @@ namespace System.ServiceModel.Discovery
MessageContractsApril2005.FindResponse IDiscoveryProxyContractApril2005.EndFind (IAsyncResult result)
{
OnEndFind (result);
return CreateFindResponseApril2005 ();
}
MessageContractsApril2005.FindResponse CreateFindResponseApril2005 ()
{
var l = new MessageContractsApril2005.FindResponseApril2005 ();
foreach (var edm in find_context.Endpoints)
l.Add (new EndpointDiscoveryMetadataApril2005 (edm));
......@@ -142,6 +152,11 @@ namespace System.ServiceModel.Discovery
MessageContractsCD1.FindResponse IDiscoveryProxyContractCD1.EndFind (IAsyncResult result)
{
OnEndFind (result);
return CreateFindResponseCD1 ();
}
MessageContractsCD1.FindResponse CreateFindResponseCD1 ()
{
var l = new MessageContractsCD1.FindResponseCD1 ();
foreach (var edm in find_context.Endpoints)
l.Add (new EndpointDiscoveryMetadataCD1 (edm));
......@@ -163,13 +178,15 @@ namespace System.ServiceModel.Discovery
// IDiscoveryTargetContract11
IAsyncResult IDiscoveryTargetContract11.BeginFind (MessageContracts11.FindRequest message, AsyncCallback callback, object state)
{
find_context = new DefaultFindRequestContext (message.Body.ToFindCriteria ());
return OnBeginFind (new DefaultFindRequestContext (message.Body.ToFindCriteria ()), callback, state);
}
void IDiscoveryTargetContract11.EndFind (IAsyncResult result)
{
OnEndFind (result);
throw new NotImplementedException ();
var cb = OperationContext.Current.GetCallbackChannel<IDiscoveryTargetCallbackContract11> ();
cb.ReplyFind (CreateFindResponse11 ());
}
IAsyncResult IDiscoveryTargetContract11.BeginReplyFind (MessageContracts11.FindResponse message, AsyncCallback callback, object state)
......@@ -221,12 +238,15 @@ namespace System.ServiceModel.Discovery
// IDiscoveryTargetContractApril2005
IAsyncResult IDiscoveryTargetContractApril2005.BeginFind (MessageContractsApril2005.FindRequest message, AsyncCallback callback, object state)
{
find_context = new DefaultFindRequestContext (message.Body.ToFindCriteria ());
return OnBeginFind (new DefaultFindRequestContext (message.Body.ToFindCriteria ()), callback, state);
}
void IDiscoveryTargetContractApril2005.EndFind (IAsyncResult result)
{
OnEndFind (result);
var cb = OperationContext.Current.GetCallbackChannel<IDiscoveryTargetCallbackContractApril2005> ();
cb.ReplyFind (CreateFindResponseApril2005 ());
}
IAsyncResult IDiscoveryTargetContractApril2005.BeginReplyFind (MessageContractsApril2005.FindResponse message, AsyncCallback callback, object state)
......@@ -278,12 +298,15 @@ namespace System.ServiceModel.Discovery
// IDiscoveryTargetContractCD1
IAsyncResult IDiscoveryTargetContractCD1.BeginFind (MessageContractsCD1.FindRequest message, AsyncCallback callback, object state)
{
find_context = new DefaultFindRequestContext (message.Body.ToFindCriteria ());
return OnBeginFind (new DefaultFindRequestContext (message.Body.ToFindCriteria ()), callback, state);
}
void IDiscoveryTargetContractCD1.EndFind (IAsyncResult result)
{
OnEndFind (result);
var cb = OperationContext.Current.GetCallbackChannel<IDiscoveryTargetCallbackContractCD1> ();
cb.ReplyFind (CreateFindResponseCD1 ());
}
IAsyncResult IDiscoveryTargetContractCD1.BeginReplyFind (MessageContractsCD1.FindResponse message, AsyncCallback callback, object state)
......@@ -369,6 +392,12 @@ namespace System.ServiceModel.Discovery
default:
break;
}
var oc = OperationContext.Current;
var rmp = oc.IncomingMessageProperties [RemoteEndpointMessageProperty.Name] as RemoteEndpointMessageProperty;
if (rmp != null)
// FIXME: use appropriate port. Client does not listen at the sending port.
oc.OutgoingMessageProperties.Add (RemoteEndpointMessageProperty.Name, new RemoteEndpointMessageProperty (rmp.Address, rmp.Port));
}
protected override IAsyncResult OnBeginResolve (ResolveCriteria resolveCriteria, AsyncCallback callback, object state)
......
......@@ -71,8 +71,6 @@ namespace MonoTests.System.ServiceModel.Discovery
host.Open ();
// It does not start announcement very soon, so wait for a while.
Thread.Sleep (1000);
foreach (var edm in host.Extensions.Find<DiscoveryServiceExtension> ().PublishedEndpoints)
TextWriter.Null.WriteLine ("Published Endpoint: " + edm.Address);
try {
// actual client, with DiscoveryClientBindingElement
var be = new DiscoveryClientBindingElement () { DiscoveryEndpointProvider = new SimpleDiscoveryEndpointProvider (dEndpoint) };
......@@ -133,8 +131,6 @@ namespace MonoTests.System.ServiceModel.Discovery
host.Open ();
// It does not start announcement very soon, so wait for a while.
Thread.Sleep (1000);
foreach (var edm in host.Extensions.Find<DiscoveryServiceExtension> ().PublishedEndpoints)
TextWriter.Null.WriteLine ("Published Endpoint: " + edm.Address);
try {
// actual client, with DiscoveryClientBindingElement
var be = new DiscoveryClientBindingElement () { DiscoveryEndpointProvider = new SimpleDiscoveryEndpointProvider (dEndpoint) };
......@@ -173,6 +169,64 @@ namespace MonoTests.System.ServiceModel.Discovery
}
}
[Test]
public void UseCase3 ()
{
RunCodeUnderDiscoveryHost3 (new Uri ("http://localhost:37564"), new Uri ("http://localhost:4989"), UseCase3Core);
AssertTcpPortOpen (4989);
AssertTcpPortOpen (37564);
}
void UseCase3Core (Uri serviceUri, AnnouncementEndpoint aEndpoint, DiscoveryEndpoint dEndpoint)
{
// actual service, announcing to 4989
var host = new ServiceHost (typeof (TestService));
var sdb = new ServiceDiscoveryBehavior ();
sdb.AnnouncementEndpoints.Add (aEndpoint);
host.Description.Behaviors.Add (sdb);
host.AddServiceEndpoint (typeof (ITestService), new BasicHttpBinding (), serviceUri);
host.Open ();
// It does not start announcement very soon, so wait for a while.
Thread.Sleep (1000);
try {
// actual client, with DiscoveryClientBindingElement
var be = new DiscoveryClientBindingElement () { DiscoveryEndpointProvider = new SimpleDiscoveryEndpointProvider (dEndpoint) };
var clientBinding = new CustomBinding (new BasicHttpBinding ());
clientBinding.Elements.Insert (0, be);
var cf = new ChannelFactory<ITestService> (clientBinding, DiscoveryClientBindingElement.DiscoveryEndpointAddress);
var ch = cf.CreateChannel ();
Assert.AreEqual ("TEST", ch.Echo ("TEST"), "#1");
cf.Close ();
} finally {
host.Close ();
}
}
void RunCodeUnderDiscoveryHost3 (Uri serviceUri, Uri aHostUri, Action<Uri,AnnouncementEndpoint,DiscoveryEndpoint> action)
{
// announcement service
var abinding = new CustomBinding (new HttpTransportBindingElement ());
var aAddress = new EndpointAddress (aHostUri);
var aEndpoint = new AnnouncementEndpoint (abinding, aAddress);
// discovery service
var dEndpoint = new UdpDiscoveryEndpoint (DiscoveryVersion.WSDiscovery11, new Uri ("soap.udp://239.255.255.250:3802/"));
// Without this, .NET rejects the host as if it had no service.
dEndpoint.IsSystemEndpoint = false;
// it internally hosts an AnnouncementService
using (var inst = new AnnouncementBoundDiscoveryService (aEndpoint)) {
var host = new ServiceHost (inst);
host.AddServiceEndpoint (dEndpoint);
try {
host.Open ();
action (serviceUri, aEndpoint, dEndpoint);
} finally {
host.Close ();
}
}
}
class SimpleDiscoveryEndpointProvider : DiscoveryEndpointProvider
{
public SimpleDiscoveryEndpointProvider (DiscoveryEndpoint endpoint)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册