让IObservable支持await操作

C# 4.5中引入了await关键字,可以让大大简化异步编程操作。在Reactive Extensions v2.0中实现了让IObservable支持await操作,通过它我们可以简化observable的异步编程:

    static async Task Samples()
    {
        var xs = Observable.Range(010, ThreadPoolScheduler.Instance);

        Console.WriteLine("Last  = " + await xs);
        Console.WriteLine("First = " + await xs.FirstAsync());
        Console.WriteLine("Third = " + await xs.ElementAt(3));
        Console.WriteLine("All   = " + string.Join("", await xs.ToList()));

        try
        {
            Console.WriteLine("Error = " + await xs.Select(x => 1 / (5 - x)));
        }
        catch (DivideByZeroException)
        {
            Console.WriteLine("Yups, we failed!");
        }
    }

 但是,RX v2.0的await扩展目前只支持.net 4.5,如果我们要在.net 4.0或以下的版本中则无法使用。因此,我手动写了个扩展函数,让IObservable支持await操作。这样一来我们也可以在.net 4.0的observable的异步编程中使用await了。

    static class ObservableExtend
    {
        public static ObservableAwaiter<T> GetAwaiter<T>(this IObservable<T> stream)
        {
            return new ObservableAwaiter<T>(stream);
        }
    }

    class ObservableAwaiter<T> : System.Runtime.CompilerServices.INotifyCompletion
    {
        IObservable<T> stream;
        T result;
        Exception err;

        public ObservableAwaiter(IObservable<T> stream)
        {
            this.stream = stream;
        }

        public bool IsCompleted { get { return false; } }

        public void OnCompleted(Action continuation)
        {
            stream.Subscribe(item => this.result = item,
                err => { this.err = err; continuation(); },
                () => continuation());
        }

        public T GetResult()
        {
            if (err != null)
                throw err;

            return result;
        }
    }

由于目前.net 4.5还没有正式版,这里我使用的环境是VisualStudio 2012 RC,不保证正式版本中GetAwaiter约束是否和现在一致,不过估计也差不多。

用了一段时间后,感觉IObservable和await关键字配合起来非常好用,前面我也写过两个简单的扩展函数,感兴趣的朋友可以看一下。

原文地址:https://www.cnblogs.com/TianFang/p/2545680.html