tpl demo
时间:2014-03-21 12:03:19
收藏:0
阅读:393
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace
TDFDemo
{
class Program
{
static
void Main(string[] args)
{
new Program().Run();
}
private
async void Run()
{
var bus = new Bus();
// Inline
handler
var s1 = bus.Subscribe<Message>(message => Console.WriteLine("Inline Handler 1: {0}", message.Content));
// Inline handler
factory
var s2 = bus.Subscribe<Message>(() => new
MessageHandler().Handle);
// Automatic
handler subscription
var s3 = bus.Subscribe<Message,
MessageHandler>();
for (int i = 0; i
< 10;
i++)
{
await bus.SendAsync(new
Message("Message
" + i));
}
// Unsubscribe
the second handler
bus.Unsubscribe(s2);
Thread.Sleep(1000);
// Cancellation
support
Console.WriteLine("\nSecond
Burst:");
var tokenSource = new
CancellationTokenSource();
var token = tokenSource.Token;
for (int i = 0; i
< 10;
i++)
{
await bus.SendAsync(new
Message("Message
" + i), token);
if (i == 5)
{
tokenSource.Cancel();
break;
}
}
Console.ReadLine();
}
}
public
class Message
{
public
DateTime TimeStamp { get; private set;
}
public
string Content { get; private set;
}
public
Message(string content)
{
Content
= content;
TimeStamp
= DateTime.UtcNow;
}
}
public
interface IHandle<TMessage>
{
void Handle(TMessage
message);
}
public
class MessageHandler :
IHandle<Message>
{
public
void Handle(Message message)
{
Console.WriteLine("Message
Handler Received: {0}", message.Content);
}
}
public
class Bus
{
private
readonly BroadcastBlock<object>
broadcast =
new BroadcastBlock<object>(message => message);
private
readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
= new ConcurrentDictionary<Guid, IDisposable>();
public
Task<bool> SendAsync<TMessage>(TMessage message,
CancellationToken cancellationToken)
{
return
broadcast.SendAsync(message, cancellationToken);
}
public
Guid Subscribe<TMessage>(Action<TMessage>
handlerAction)
{
var handler = new
ActionBlock<object>(
message
=> handlerAction((TMessage)message),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 }
);
var subscription = broadcast.LinkTo(
handler,
new DataflowLinkOptions { PropagateCompletion = true },
message
=> message is TMessage
);
return
AddSubscription(subscription);
}
public
void Unsubscribe(Guid subscriptionId)
{
IDisposable subscription;
if (subscriptions.TryRemove(subscriptionId, out subscription))
{
subscription.Dispose();
}
}
private
Guid AddSubscription(IDisposable subscription)
{
var subscriptionId = Guid.NewGuid();
subscriptions.TryAdd(subscriptionId, subscription);
return
subscriptionId;
}
}
public
static class BusExtensions
{
public
static Task<bool>
SendAsync<TMessage>(this
Bus bus,
TMessage message)
{
return
bus.SendAsync<TMessage>(message, CancellationToken.None);
}
public
static Guid Subscribe<TMessage>(this
Bus bus,
Func<Action<TMessage>> handlerActionFactory)
{
return
bus.Subscribe<TMessage>(message => handlerActionFactory().Invoke(message));
}
public
static Guid Subscribe<TMessage, THandler>(this
Bus bus)
where THandler : IHandle<TMessage>,
new()
{
return
bus.Subscribe<TMessage>(message => new
THandler().Handle(message));
}
}
}
原文:http://www.cnblogs.com/zeroone/p/3615133.html
评论(0)