Reactive Extensions学习

今天抽空学习了一下Reactive Extensions库,感觉还是比较容易上手的,顺手练习了一下,写了个ReadAsync的扩展。 

    static void Main(string[] args)
    {
        var bufferSize 
= 1000;
        var buffer 
= new byte[bufferSize];

        var stream 
= File.OpenRead(@"r:\1.txt");

        stream.ReadAsync(buffer).Subscribe(
                        onNext: i 
=> { Console.Write(Encoding.Default.GetString(buffer, 0, i)); },
                        onCompleted: () 
=> Console.WriteLine("completed")
                    );

        Console.ReadLine();
    }

    
public static IObservable<int> ReadAsync(this Stream stream, byte[] buffer)
    {
        var bufferLen 
= buffer.Length;
        var readAsync 
= Observable.FromAsyncPattern<byte[], intintint>(stream.BeginRead, stream.EndRead);

        
return Observable.Defer(() => readAsync(buffer, 0, bufferLen)).Repeat().TakeWhile(i => i > 0)

        // return Observable.While(() => stream.Position != stream.Length, Observable.Defer(() => readAsync(buffer, 0, bufferLen)));
    }

    
public static IObservable<string> ReadLinesAsync(this Stream stream, byte[] buffer)
    {
        
return Observable.CreateWithDisposable<string>(observer =>
        {
            var blocks 
= stream.ReadAsync(buffer).Select(i => Encoding.Default.GetString(buffer, 0, i));
            var lastLine 
= string.Empty;

            
return blocks.Subscribe(data =>
            {
                
if (data.Contains("\r\n"))
                {
                    var lines 
= data.Split(new string[] { "\r\n" }, StringSplitOptions.None);

                    observer.OnNext(lastLine 
+ lines[0]);
                    
foreach (var line in lines.Skip(1).Take(lines.Length - 2))
                    {
                        observer.OnNext(line);
                    }

                    lastLine 
= lines[lines.Length - 1];
                }
                
else
                {
                    lastLine 
+= data;
                }
            },
            observer.OnError,
            () 
=>
            {
                observer.OnNext(lastLine);
                observer.OnCompleted();
            });
        });
    }

当然,到C# 5.0后,在编译器的原生语法糖支持下,实现这个扩展更加简单和直观,不过Reactive Extensions是异步的基础,并且在一些高级应用方便语法糖仍然不能取代它。先熟悉下这个框架还是有必要的。

由于目前还只熟悉了皮毛,有许多东西还不会,等后续详细了解后再专门写篇文章介绍它。对这个库感兴趣的朋友可以访问一下下面几个链接:

Reactive Extensions for .NET (Rx)

Rx: Curing your asynchronous programming blues

Introducing the Reactive Framework

原文地址:https://www.cnblogs.com/TianFang/p/2011967.html