Rx .Net 简介

1 定义:

Reactive Extensions是一个遵循函数式编程的类库,它引用【观察者模式】以及【迭代器模式】对可观察对象产生的数据进行异步消费。

使用Rx需要引用System.Reactive.Core的Nuget程序包(.Net Core)

2 核心:

2个核心接口:IObservable<T>、IObserver<T>

其中IObservable代表观察源,而IObserver是观察者(“鼠标点击”是观察源,“点击后刷新”是观察者)

2.1 IObservable接口

IObservable只有一个方法,就是【触发事件】

[NullableContextAttribute(1)]
public interface IObservable<[NullableAttribute(2)] out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

2.2 IObserver接口

[NullableContextAttribute(1)]
public interface IObserver<[NullableAttribute(2)] in T>
{
  
    void OnCompleted();

    void OnError(Exception error);

    void OnNext(T value);
}

void OnNext<T>(T value), 序列里有新的值的时候会调用这个

void OnCompleted(), 序列结束的时候调用这个

void OnError(Exception ex), 发生错误的时候调用这个

3 获取事件源的方式

3.1 获取事件源

 /// <summary>
 /// 获取返回一个简单值的事件源
 /// </summary>
 /// <returns></returns>
 private static IObservable<int> GetSimpleObservable()
 {
     return Observable.Return(42);
 }

 /// <summary>
 /// 返回抛出一个异常的事件源
 /// </summary>
 /// <returns></returns>
 private static IObservable<int> GetThrowObservable()
 {
     return Observable.Throw<int>(new ArgumentException("Error in observable"));
 }

 /// <summary>
 /// 返回一个空的事件源
 /// </summary>
 /// <returns></returns>
 private static IObservable<int> GetEmptyObservable()
 {
     return Observable.Empty<int>();
 }

 /// <summary>
 /// 返回一个任务事件源
 /// </summary>
 /// <returns></returns>
 private static IObservable<int> GetTaskObservable()
 {
     return GetTask().ToObservable();
 }

 private static async Task<int> GetTask()
 {
     return 42;
 }

 /// <summary>
 /// 返回一个范围的事件源序列
 /// </summary>
 /// <returns></returns>
 private static IObservable<int> GetRangeObservable()
 {
     return Observable.Range(2, 10);
 }

 /// <summary>
 /// 返回一个定时器事件源序列
 /// </summary>
 /// <returns></returns>
 private static IObservable<long> GetIntervalObservable()
 {
     return Observable.Interval(TimeSpan.FromMilliseconds(200));
 }

 /// <summary>
 /// 创建一个基本事件源序列
 /// </summary>
 /// <returns></returns>
 private static IObservable<int> GetCreateObservable()
 {
     return Observable.Create<int>(observer =>
     {
         observer.OnNext(1);
         observer.OnNext(2);
         observer.OnNext(3);
         observer.OnNext(4);
         observer.OnCompleted();
         return Disposable.Empty;
     });
 }

 /// <summary>
 /// 创建一个类似for循环的事件源序列
 /// </summary>
 /// <returns></returns>
 private static IObservable<int> GetGenerateObservable()
 {
     // 类似for循环
     return Observable.Generate(
         1,              // 初始值
         x => x < 10,    // 循环停止条件
         x => x + 1,     // 循环一次+1
         x => x + 20     // 循环中执行的操作
     );
 }

3.2 事件调用

最简单的用法应该是

class Program
{
    static void Main(string[] args)
    {
        var numbers = Observable.Range(2, 10);
        var observer = new MyConsoleObserver<int>();
        numbers.Subscribe(observer);
        Console.ReadLine();
    }
}

/// <summary>
/// 自定义观察者对象
/// </summary>
/// <typeparam name="T"></typeparam>
public class MyConsoleObserver<T> : IObserver<T>
{
    public void OnNext(T value)
    {
        Console.WriteLine("接收到 value {0}", value);
    }
    public void OnError(Exception error)
    {
        Console.WriteLine("出现异常! {0}", error);
    }
    public void OnCompleted()
    {
        Console.WriteLine("关闭观察行为");
    }
}

但是上述方法没有办法使用【多播】的特性,要想使用多播就必须使用 ISubject 接口了;通过 ISubject 接口就可以为事件源绑定多个相应。

代码如下:

static async Task Main(string[] args)
{

    // 定义事件流
    var sequence = GetGenerateObservable();
    // 对事件流进行筛选
    sequence = sequence.Where(a => a < 23);

    // 注册观察者
    ISubject<int> subject = new ReplaySubject<int>();//申明Subject
    subject.Subscribe((temperature) => Console.WriteLine($"当前温度:{temperature}"));//订阅subject
    subject.Subscribe((temperature) => Console.WriteLine($"嘟嘟嘟,当前水温:{temperature}"));//订阅subject
    
    // 触发
    sequence.Subscribe(subject);
    Console.ReadKey();
}
private static IObservable<int> GetGenerateObservable()
{
    // 类似for循环
    return Observable.Generate(
        1,              // 初始值
        x => x < 10,    // 循环停止条件
        x => x + 1,     // 循环一次+1
        x => x + 20     // 循环中执行的操作
    );
}

4 制造观察者 ISubject

ISubject接口可以用来生成观察者,其实通过定义可以知道他既是观察者也是观察源

public interface ISubject<T> : ISubject<T, T>, IObserver<T>, IObservable<T>

但是我们还是主要用它来生成观察者信息

比如上面例子中的 ReplaySubject 就是 ISubject 的一种实现,ISubject有多种实现,区别如下

  1. Subject - 向所有观察者广播每个通知
  2. AsyncSubject - 当可观察序列完成后有且仅发送一个通知
  3. ReplaySubject - 缓存指定通知以对后续订阅的观察者进行重放
  4. BehaviorSubject - 推送默认值或最新值给观察者

 通过ISubject可以实现定义观察者,并实现多播,上面已经有例子

5 观察源的温度

  1. Cold Observable:当有观察者订阅时才发送通知,且每个观察者【独享】一份完整的观察者序列。在被订阅 Subscribe 后运行,也就是说,observables序列仅在subscribe函数被调用后才会推送数据。
  2. Hot Observable:不管有无观察者订阅都会发送通知,且所有观察者【共享】同一份观察者序列。Hot Observables在被订阅之前就已经开始产生数据,例如鼠标移动事件。

5.1 冷观察源

var clock = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(a => a + 1);
clock.Subscribe(x => Console.WriteLine($"a:{x}"));
Thread.Sleep(2000);
clock.Subscribe(x => Console.WriteLine($"c:{x}"));
Console.ReadKey();

输出:

 可以看到c的输出虽然晚了2秒,但是他还是拿着完整的一份观察者序列的;

5.2 热观察源

var clock = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(a => a + 1).Publish();
clock.Connect();
clock.Subscribe(x => Console.WriteLine($"a:{x}"));
Thread.Sleep(2000);
clock.Subscribe(x => Console.WriteLine($"c:{x}"));
Console.ReadKey();

输出:

 可以看到c在延迟后,没有办法获取1的数据,只能获取和a同步的数据,也就是c和a是公用的观察者序列。

其中publish是将冷序列转换为热序列,connect方法是触发使生成观察者序列

可以稍微在connect和注册观察者之间稍微加一个sleep如下

var clock = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(a => a + 1).Publish();
clock.Connect();
Thread.Sleep(2000);
clock.Subscribe(x => Console.WriteLine($"a:{x}"));
Thread.Sleep(2000);
clock.Subscribe(x => Console.WriteLine($"c:{x}"));
Console.ReadKey();

输出:

 可以看到由于connect和注册没在一起,a也获取不到完整的序列了。

5.3 RefCount方法

还有一种特殊的RefCount方法需要注意

var clock = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(a => a + 1).Publish().RefCount();
Thread.Sleep(2000);
clock.Subscribe(x => Console.WriteLine($"a:{x}"));
Thread.Sleep(2000);
clock.Subscribe(x => Console.WriteLine($"c:{x}"));
Console.ReadKey();

输出:

 可以看出虽然sleep了2s,但是a依然获取到完整的观察者序列,可见RefCount是默认看到第一个观察者后才会生产观察者序列的。

6 Rx的调度 Scheduler

在Rx中,使用Scheduler来控制并发。而对于Scheduler我们可以理解为程序调度,通过Scheduler来规定在什么时间什么地点执行什么事情。Rx提供了以下几种Scheduler:

  1. NewThreadScheduler:即在新线程上执行
  2. ThreadPoolScheduler:即在线程池中执行
  3. TaskPoolScheduler:与ThreadPoolScheduler类似
  4. CurrentThreadScheduler:在当前线程执行
  5. ImmediateScheduler:在当前线程立即执行
  6. EventLoopScheduler:创建一个后台线程按序执行所有操作

使用方式如下:

Observable.Return(42, NewThreadScheduler.Default);

总结

其实Rx .Net就是对观察者模式的linq封装,使用起来会比event更方便一些,因为可以对事件源进行合并,筛选,聚合等操作。

这个和事件总线还不一样,事件总线是将事件进行集中处理的功能。

参考:

https://www.cnblogs.com/Leo_wl/p/10400983.html

https://segmentfault.com/a/1190000011052037

原文地址:https://www.cnblogs.com/gamov/p/13143436.html