案例研究:CopyToAsync

返回该系列目录《基于Task的异步模式--全面介绍》


把一个流拷贝到另一个流是有用且常见的操作。Stream.CopyTo 方法在.Net 4中就已经加入来满足要求这个功能的场景,例如在一个指定的URL处下载数据:

public static byte[] DownloadData(string url)
{
    using(var request = WebRequest.Create(url))
    using(var response = request.GetResponse())
    using(var responseStream = response.GetResponseStream())
    using(var result = new MemoryStream())
    {
        responseStream.CopyTo(result);
        return result.ToArray();
    }
}

为了提高响应能力和伸缩性,我们想使用基于TAP模式来实现上面的功能。可以尝试按下面的来做:

public static async Task<byte[]> DownloadDataAsync(string url)
{
    using(var request = WebRequest.Create(url))
    {
        return await Task.Run(() =>
        {
            using(var response = request.GetResponse())
            using(var responseStream = response.GetResponseStream())
            using(var result = new MemoryStream())
            {
                responseStream.CopyTo(result);
                return result.ToArray();
            }
        }
    }
}

此实现如果用于UI线程会提升响应能力,因为它脱离了从网络流下载数据任务的调用线程以及把该网络流复制到最终将下载的数据转成一个数组的内存流。然而,该实现对伸缩性没有效果,因为它在等待数据下载的过程中,仍旧执行同步I/O和阻塞线程池线程。反之,我们想要的是下面的功能代码:

public static async Task<byte[]> DownloadDataAsync(string url)
{
    using(var request = WebRequest.Create(url))
    using(var response = await request.GetResponseAsync())
    using(var responseStream = response.GetResponseStream())
    using(var result = new MemoryStream())
    {
        await responseStream.CopyToAsync(result);
        return result.ToArray();
    }
}

不幸的是,在.Net 4中缺少异步的CopyToAsync方法,只有Stream类有一个同步的CopyTo方法。现在我们就自己提供一个实现:

public static void CopyTo(this Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int bytesRead;
    while((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0)
    {
        destination.Write(buffer, 0, bytesRead);
    }
}

为了提供一个异步的CopyTo实现,我们可以利用编译器实现TAP的能力,稍微地修改这个实现:

public static async Task CopyToAsync(this Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int bytesRead;
    while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
    {
        await destination.WriteAsync(buffer, 0, bytesRead);
    }
}

这里我们将返回类型从void改成了Task,将Read和Write分别换成了ReadAsync和WriteAsync,并且在ReadAsync和WriteAsync的调用前加了与上下文相关的await关键字前缀。.Net 4 中不存在ReadAsycn和WriteAsync,但是可以通过基于Task.Factory.FromAsync实现,关于这个描述在上一篇随笔中的“Tasks和APM”章节讲过:

public static Task<int> ReadAsync(
    this Stream source, byte [] buffer, int offset, int count)
{
    return Task<int>.Factory.FromAsync(source.BeginRead, source.EndRead, 
        buffer, offset, count, null);
}

public static Task WriteAsync(
    this Stream destination, byte [] buffer, int offset, int count)
{
    return Task.Factory.FromAsync(
        destination.BeginWrite, destination.EndWrite, 
        buffer, offset, count, null);
}

有了这些方法,我们可以成功地实现CopyToAsync方法。我们也可以通过添加一个CancellationToken到方法中以支持撤销请求,该CancellationToken将会在复制过程中的每次读写之后被监控到(如果ReadAsync和WriteAsync支持撤销,那么也可以将CancellationToken线程化到那些调用中):

public static async Task CopyToAsync(
    this Stream source, Stream destination, 
    CancellationToken cancellationToken)
{
    var buffer = new byte[0x1000];
    int bytesRead;
    while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
    {
        await destination.WriteAsync(buffer, 0, bytesRead);
        cancellationToken.ThrowIfCancellationRequested();
    }
}

【注意这种撤销在同步的CopyTo实现中也是有用的,传入的CancellationToken会启用撤销。实现会依赖一个从该方法返回的可取消的对象,但实现接收到那么对象已经太晚了,因为同步调用完成时,已经没有留下要取消的东西了。】

我们也加入了进度通知的支持,包括至今已经复制了多少数据:

public static async Task CopyToAsync(
    this Stream source, Stream destination, 
    CancellationToken cancellationToken,
    IProgress<long> progress)
{
    var buffer = new byte[0x1000];
    int bytesRead;
    long totalRead = 0;
    while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
    {
        await destination.WriteAsync(buffer, 0, bytesRead);
        cancellationToken.ThrowIfCancellationRequested();
        totalRead += bytesRead;
        progress.Report(totalRead);
    }
}

有了该方法,我们现在可以完全实现我们的DownloadDataAsync方法了,包括加入撤销和进度支持:

public static async Task<byte[]> DownloadDataAsync(
    string url,
    CancellationToken cancellationToken,
    IProgress<long> progress)
{
    using(var request = WebRequest.Create(url))
    using(var response = await request.GetResponseAsync())
    using(var responseStream = response.GetResponseStream())
    using(var result = new MemoryStream())
    {
        await responseStream.CopyToAsync(
            result, cancellationToken, progress);
        return result.ToArray();
    }
}

给我们的CopyToAsync方法做进一步的优化也是可能的。比如,如果我们要使用两个buffer而不是一个,就可以在读取下一片数据时写入之前读取的数据,因此如果读取和写入都使用了异步了I/O就会产生交叉延迟:

public static async Task CopyToAsync(this Stream source, Stream destination)
{
    int i = 0;
    var buffers = new [] { new byte[0x1000], new byte[0x1000] };
    Task writeTask = null;
    while(true)
    {
        var readTask = source.ReadAsync(buffers[i], 0, buffers[i].Length))>0;
        if (writeTask != null) await Task.WhenAll(readTask, writeTask);
        int bytesRead = await readTask;
        if (bytesRead == 0) break;
        writeTask = destination.WriteAsync(buffers[i], 0, bytesRead);
        i ^= 1; // swap buffers
    }
}

消除不必要的上下文转换是另一个优化。正如之前提到的,默认await一个Task开始执行的时候,会传输回到当前的SynchronizationContext。在CopyToAsynch实现的情况下,使用这样的转换时没必要的,因为我们没有操作任何UI状态。我们可以发挥Task.ConfigureAwait的优势类关闭这个自动的转换。为了简化,上面的原始异步的实现修改如下:

public static Task CopyToAsync(this Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int bytesRead;
    while((bytesRead = await
        source.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) > 0)
    {
        await destination.WriteAsync(buffer, 0, bytesRead)
            .ConfigureAwait(false);
    }
}                                                                                           
返回该系列目录《基于Task的异步模式--全面介绍》

原文地址:https://www.cnblogs.com/farb/p/4906828.html