[IO]System.IO.Pipelines

时间:2019-10-22 22:24:14   收藏:0   阅读:116

  System.IO.Pipelines 是一个新库,旨在使在 .NET 中执行高性能 I/O 更加容易。 该库的目标为适用于所有 .NET 实现的 .NET Standard。

System.IO.Pipelines 解决什么问题

  System.IO.Pipelines 已构建为:

下面的代码是典型的 TCP 服务器,它从客户机接收行分隔的消息(由 ‘\n‘ 分隔):

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);
    
    // Process a single line from the buffer
    ProcessLine(buffer);
}

前面的代码有几个问题:

要解决上述问题,需要进行以下更改:

 

下面的代码解决了其中一些问题:

 

技术分享图片
 1 async Task ProcessLinesAsync(NetworkStream stream)
 2 {
 3     byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
 4     var bytesBuffered = 0;
 5     var bytesConsumed = 0;
 6 
 7     while (true)
 8     {
 9         // Calculate the amount of bytes remaining in the buffer.
10         var bytesRemaining = buffer.Length - bytesBuffered;
11 
12         if (bytesRemaining == 0)
13         {
14             // Double the buffer size and copy the previously buffered data into the new buffer.
15             var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
16             Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
17             // Return the old buffer to the pool.
18             ArrayPool<byte>.Shared.Return(buffer);
19             buffer = newBuffer;
20             bytesRemaining = buffer.Length - bytesBuffered;
21         }
22 
23         var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
24         if (bytesRead == 0)
25         {
26             // EOF
27             break;
28         }
29 
30         // Keep track of the amount of buffered bytes.
31         bytesBuffered += bytesRead;
32         var linePosition = -1;
33 
34         do
35         {
36             // Look for a EOL in the buffered data.
37             linePosition = Array.IndexOf(buffer, (byte)\n, bytesConsumed,
38                                          bytesBuffered - bytesConsumed);
39 
40             if (linePosition >= 0)
41             {
42                 // Calculate the length of the line based on the offset.
43                 var lineLength = linePosition - bytesConsumed;
44 
45                 // Process the line.
46                 ProcessLine(buffer, bytesConsumed, lineLength);
47 
48                 // Move the bytesConsumed to skip past the line consumed (including \n).
49                 bytesConsumed += lineLength + 1;
50             }
51         }
52         while (linePosition >= 0);
53     }
54 }
View Code

Pipe

 Pipe 类可用于创建 PipeWriter/PipeReader 对。 写入 PipeWriter 的所有数据都可用于 PipeReader

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Pipe基本用法

技术分享图片
 1 async Task ProcessLinesAsync(Socket socket)
 2 {
 3     var pipe = new Pipe();
 4     Task writing = FillPipeAsync(socket, pipe.Writer);
 5     Task reading = ReadPipeAsync(pipe.Reader);
 6 
 7     await Task.WhenAll(reading, writing);
 8 }
 9 
10 async Task FillPipeAsync(Socket socket, PipeWriter writer)
11 {
12     const int minimumBufferSize = 512;
13 
14     while (true)
15     {
16         // Allocate at least 512 bytes from the PipeWriter.
17         Memory<byte> memory = writer.GetMemory(minimumBufferSize);
18         try
19         {
20             int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
21             if (bytesRead == 0)
22             {
23                 break;
24             }
25             // Tell the PipeWriter how much was read from the Socket.
26             writer.Advance(bytesRead);
27         }
28         catch (Exception ex)
29         {
30             LogError(ex);
31             break;
32         }
33 
34         // Make the data available to the PipeReader.
35         FlushResult result = await writer.FlushAsync();
36 
37         if (result.IsCompleted)
38         {
39             break;
40         }
41     }
42 
43      // By completing PipeWriter, tell the PipeReader that there‘s no more data coming.
44     await writer.CompleteAsync();
45 }
46 
47 async Task ReadPipeAsync(PipeReader reader)
48 {
49     while (true)
50     {
51         ReadResult result = await reader.ReadAsync();
52         ReadOnlySequence<byte> buffer = result.Buffer;
53 
54         while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
55         {
56             // Process the line.
57             ProcessLine(line);
58         }
59 
60         // Tell the PipeReader how much of the buffer has been consumed.
61         reader.AdvanceTo(buffer.Start, buffer.End);
62 
63         // Stop reading if there‘s no more data coming.
64         if (result.IsCompleted)
65         {
66             break;
67         }
68     }
69 
70     // Mark the PipeReader as complete.
71     await reader.CompleteAsync();
72 }
73 
74 bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
75 {
76     // Look for a EOL in the buffer.
77     SequencePosition? position = buffer.PositionOf((byte)\n);
78 
79     if (position == null)
80     {
81         line = default;
82         return false;
83     }
84 
85     // Skip the line + the \n.
86     line = buffer.Slice(0, position.Value);
87     buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
88     return true;
89 }
View Code

有两个循环:

没有分配显式缓冲区。 所有缓冲区管理都委托给 PipeReader 和 PipeWriter 实现。 委派缓冲区管理使使用代码更容易集中关注业务逻辑。

在第一个循环中:

在第二个循环中,PipeReader 使用由 PipeWriter 写入的缓冲区。 缓冲区来自套接字。 对 PipeReader.ReadAsync 的调用:

找到行尾 (EOL) 分隔符并分析该行后:

读取器和编写器循环通过调用 Complete 结束。 Complete 使基础管道释放其分配的内存。

反压和流量控制

理想情况下,读取和分析可协同工作:

通常,分析所花费的时间比仅从网络复制数据块所用时间更长:

为了获得最佳性能,需要在频繁暂停和分配更多内存之间取得平衡。

为解决上述问题,Pipe 提供了两个设置来控制数据流:

技术分享图片

PipeWriter.FlushAsync

使用两个值可防止快速循环,如果只使用一个值,则可能发生这种循环。

原文:https://www.cnblogs.com/amytal/p/11723026.html

评论(0
© 2014 bubuko.com 版权所有 - 联系我们:wmxa8@hotmail.com
打开技术之扣,分享程序人生!