12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
namespace TPLBufferBlockTest
{
    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    /// <summary>
    /// Console sample that pushes random byte buffers through a TPL Dataflow
    /// <see cref="BufferBlock{T}"/> from a parallel producer to an asynchronous
    /// consumer. Also contains a small stand-alone <see cref="ConcurrentQueue{T}"/> demo.
    /// </summary>
    internal class Program
    {
        /// <summary>Size in bytes of every buffer the producer posts.</summary>
        private const int BufferSize = 1024;

        /// <summary>Index of the byte the consumer prints from each received buffer.</summary>
        private const int SampleIndex = 10;

        /// <summary>Number of integers (0 .. count - 1) enqueued by the queue demo.</summary>
        private const int QueueItemCount = 10000;

        /// <summary>
        /// Posts <paramref name="messageCount"/> buffers of random bytes to
        /// <paramref name="target"/> from parallel workers.
        /// </summary>
        /// <param name="target">Block that receives the buffers. It is not completed here; the caller owns its lifetime.</param>
        /// <param name="messageCount">How many buffers to post. Defaults to <see cref="int.MaxValue"/>, the original behavior.</param>
        /// <exception cref="ArgumentNullException"><paramref name="target"/> is null.</exception>
        /// <remarks>
        /// NOTE: with the default count and an unbounded target, buffers queue up for as long as
        /// the consumer is slower than the producers, so memory use can grow without limit.
        /// </remarks>
        static void Produce(ITargetBlock<byte[]> target, int messageCount = int.MaxValue)
        {
            if (target == null)
            {
                throw new ArgumentNullException(nameof(target));
            }

            // System.Random is not thread-safe, so each worker gets its own instance via the
            // thread-local overload. The Guid-based seed keeps workers that start at the same
            // instant from sharing a time-derived seed.
            Parallel.For(
                0,
                messageCount,
                () => new Random(Guid.NewGuid().GetHashCode()),
                (index, loopState, random) =>
                {
                    var buffer = new byte[BufferSize];
                    random.NextBytes(buffer);

                    // Post returns false when the target declines the item; as in the
                    // original sample, the result is not inspected.
                    target.Post(buffer);
                    return random;
                },
                random => { });
        }

        /// <summary>
        /// Receives buffers from <paramref name="source"/> until it completes, writing the byte
        /// at <see cref="SampleIndex"/> of each buffer to the console.
        /// </summary>
        /// <param name="source">Block to drain.</param>
        /// <returns>A task that finishes once the source has completed and been drained.</returns>
        static async Task ConsumeAsync(ISourceBlock<byte[]> source)
        {
            while (await source.OutputAvailableAsync())
            {
                byte[] data = await source.ReceiveAsync();

                // Buffers too short to contain the sampled byte are skipped instead of
                // failing the whole consumer with an IndexOutOfRangeException.
                if (data != null && data.Length > SampleIndex)
                {
                    Console.WriteLine(data[SampleIndex]);
                }
            }

            // OutputAvailableAsync reports false for a faulted source as well; awaiting
            // Completion surfaces that failure instead of treating it as a normal finish.
            await source.Completion;
        }

        /// <summary>
        /// Entry point: starts the consumer, runs the producer, signals completion and waits
        /// for the consumer to drain the buffer.
        /// </summary>
        static async Task Main()
        {
            var buffer = new BufferBlock<byte[]>();
            Task consumerTask = ConsumeAsync(buffer);

            try
            {
                Produce(buffer);
            }
            finally
            {
                // Without Complete() the consumer's OutputAvailableAsync never returns false,
                // so the await below would never finish. Completing in finally also releases
                // the consumer when the producer throws.
                buffer.Complete();
            }

            await consumerTask;
            Console.WriteLine("Processed");
            Console.ReadLine();
        }

        /// <summary>
        /// Fills a <see cref="ConcurrentQueue{T}"/> with 0 .. <see cref="QueueItemCount"/> - 1,
        /// drains it from four concurrent actions and prints the combined sum next to the
        /// expected value. Not called from <see cref="Main"/>.
        /// </summary>
        static void ConcurrentQeueTest()
        {
            // Sum of 0 .. n-1 is n * (n - 1) / 2, which is 49995000 for n = 10000.
            const int ExpectedSum = QueueItemCount * (QueueItemCount - 1) / 2;

            var queue = new ConcurrentQueue<int>();

            // Populate the queue.
            for (int i = 0; i < QueueItemCount; i++)
            {
                queue.Enqueue(i);
            }

            // Peek at the first element; it must be the first value enqueued.
            if (!queue.TryPeek(out int head))
            {
                Console.WriteLine("CQ: TryPeek failed when it should have succeeded");
            }
            else if (head != 0)
            {
                Console.WriteLine("CQ: Expected TryPeek result of 0, got {0}", head);
            }

            int outerSum = 0;

            // Each consumer sums what it dequeues locally and publishes the subtotal with a
            // single atomic add, so the shared total is touched once per consumer.
            Action drain = () =>
            {
                int localSum = 0;
                while (queue.TryDequeue(out int value))
                {
                    localSum += value;
                }

                Interlocked.Add(ref outerSum, localSum);
            };

            // Start 4 concurrent consuming actions.
            Parallel.Invoke(drain, drain, drain, drain);

            Console.WriteLine("outerSum = {0}, should be {1}", outerSum, ExpectedSum);
        }
    }
}
|