День 1801. #TipsAndTricks
10 Крутых Трюков в C#. Продолжение
1-2
3-4
5. Конвейеры для потоковой обработки
Pipelines — библиотека потоковой обработки в .NET для эффективной обработки больших потоков данных с низкими задержками. Пространство имён System.IO.Pipelines предоставляет абстракции для чтения и записи данных в потоковом режиме, минимизируя при этом выделение памяти и копирование.
async Task ProcessStream(
Stream stream)
{
var pipe = new Pipe();
var write = FillPipe(stream, pipe.Writer);
var read = ReadPipe(pipe.Reader);
await Task.WhenAll(write, read);
}
async Task FillPipe(
Stream stream,
PipeWriter writer
)
{
const int minSize = 512;
while (true)
{
var memory = writer.GetMemory(minSize);
int bytes = await stream.ReadAsync(memory);
if (bytes == 0)
break;
writer.Advance(bytes);
var result = await writer.FlushAsync();
if (result.IsCompleted)
break;
}
writer.Complete();
}
async Task ReadPipe(
PipeReader reader)
{
while (true)
{
var result = await reader.ReadAsync();
var buffer = result.Buffer;
foreach (var seg in buffer)
{
// Обработка данных частями
Console.WriteLine(
$"Прочитано {seg.Length} байт");
}
reader.AdvanceTo(buffer.End);
if (result.IsCompleted)
break;
}
reader.Complete();
}
// использование
using var stream =
new MemoryStream(
Encoding.UTF8.GetBytes("Hello, Pipelines!"));
await ProcessStream(stream);
Вывод:
Прочитано 17 байт
Как это работает и почему это полезно:
В примере выше метод ProcessStream создаёт новый экземпляр Pipe и запускает две задачи: для записи данных в канал (FillPipe) и для чтения данных из канала (ReadPipe). Task.WhenAll используется для ожидания завершения обеих задач. FillPipe считывает данные из входного потока и записывает их в PipeWriter. Он делает это в цикле, получая память от PipeWriter и считывая данные из потока в память. Метод PipeWriter.FlushAsync вызывается, чтобы сигнализировать о том, что данные доступны для чтения, и цикл продолжается до тех пор, пока поток не будет исчерпан или канал не будет закрыт. Метод ReadPipeAsync ожидает PipeReader.ReadAsync, который возвращает ReadResult, содержащий буфер ReadOnlySequence<byte>. Буфер обрабатывается частями, а метод PipeReader.AdvanceTo вызывается, чтобы сигнализировать о том, что данные были использованы. Цикл продолжается до тех пор, пока конвейер не будет завершён.
Использование конвейеров для потоковой обработки может обеспечить значительный выигрыш в производительности, особенно в сценариях, где требуется обработка с малой задержкой и минимальное выделение памяти. Абстракции и дизайн библиотеки упрощают обработку сложных сценариев потоковой передачи, таких как сетевое взаимодействие или файловый ввод-вывод, с эффективным управлением ресурсами и оптимальной производительностью.
Продолжение следует…
Источник: https://maherz.medium.com/10-mind-blowing-c-hacks-95fa629cfcef
>>Click here to continue<<