2024-07-15 10:31:26 +08:00

94 lines
3.0 KiB
C#

namespace TPLBufferBlockTest
{
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Collections.Concurrent;
internal class Program
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
Parallel.For(0, int.MaxValue, a =>
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
});
//target.Complete();
}
static async Task ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
Console.WriteLine(data[10]);
//byte[] data = await source.ReceiveAsync();
//bytesProcessed += data.Length;
}
//return bytesProcessed;
}
static async Task Main()
{
//using (FileStream fileStream = new FileStream(@"D:\ISAS\Data\SysAttachment\InspectionReports\" + Guid.NewGuid().ToString() + ".xlsx", FileMode.CreateNew))
//{
// //excelDatas.Add(excelData);
// //excelWrite.WriteExcel(fileStream, excelDatas);
// fileStream.Write(new byte[1024]);
// fileStream.Seek(0, SeekOrigin.Begin);
//}
//ConcurrentQeueTest();
//return;
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
await consumerTask;
Console.WriteLine("Processed");
Console.ReadLine();
//Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
static void ConcurrentQeueTest()
{
ConcurrentQueue<int> cq = new ConcurrentQueue<int>();
// Populate the queue.
for (int i = 0; i < 10000; i++)
{
cq.Enqueue(i);
}
// Peek at the first element.
int result;
if (!cq.TryPeek(out result))
{
Console.WriteLine("CQ: TryPeek failed when it should have succeeded");
}
else if (result != 0)
{
Console.WriteLine("CQ: Expected TryPeek result of 0, got {0}", result);
}
int outerSum = 0;
// An action to consume the ConcurrentQueue.
Action action = () =>
{
int localSum = 0;
int localValue;
while (cq.TryDequeue(out localValue))
localSum += localValue;
Interlocked.Add(ref outerSum, localSum);
};
// Start 4 concurrent consuming actions.
Parallel.Invoke(action, action, action, action);
Console.WriteLine("outerSum = {0}, should be 49995000", outerSum);
}
}
}