Program.cs 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. namespace TPLBufferBlockTest
  2. {
  3. using System;
  4. using System.Threading.Tasks;
  5. using System.Threading.Tasks.Dataflow;
  6. using System.Collections.Concurrent;
  7. internal class Program
  8. {
  9. static void Produce(ITargetBlock<byte[]> target)
  10. {
  11. var rand = new Random();
  12. Parallel.For(0, int.MaxValue, a =>
  13. {
  14. var buffer = new byte[1024];
  15. rand.NextBytes(buffer);
  16. target.Post(buffer);
  17. });
  18. //target.Complete();
  19. }
  20. static async Task ConsumeAsync(ISourceBlock<byte[]> source)
  21. {
  22. int bytesProcessed = 0;
  23. while (await source.OutputAvailableAsync())
  24. {
  25. byte[] data = await source.ReceiveAsync();
  26. Console.WriteLine(data[10]);
  27. //byte[] data = await source.ReceiveAsync();
  28. //bytesProcessed += data.Length;
  29. }
  30. //return bytesProcessed;
  31. }
  32. static async Task Main()
  33. {
  34. //using (FileStream fileStream = new FileStream(@"D:\ISAS\Data\SysAttachment\InspectionReports\" + Guid.NewGuid().ToString() + ".xlsx", FileMode.CreateNew))
  35. //{
  36. // //excelDatas.Add(excelData);
  37. // //excelWrite.WriteExcel(fileStream, excelDatas);
  38. // fileStream.Write(new byte[1024]);
  39. // fileStream.Seek(0, SeekOrigin.Begin);
  40. //}
  41. //ConcurrentQeueTest();
  42. //return;
  43. var buffer = new BufferBlock<byte[]>();
  44. var consumerTask = ConsumeAsync(buffer);
  45. Produce(buffer);
  46. await consumerTask;
  47. Console.WriteLine("Processed");
  48. Console.ReadLine();
  49. //Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
  50. }
  51. static void ConcurrentQeueTest()
  52. {
  53. ConcurrentQueue<int> cq = new ConcurrentQueue<int>();
  54. // Populate the queue.
  55. for (int i = 0; i < 10000; i++)
  56. {
  57. cq.Enqueue(i);
  58. }
  59. // Peek at the first element.
  60. int result;
  61. if (!cq.TryPeek(out result))
  62. {
  63. Console.WriteLine("CQ: TryPeek failed when it should have succeeded");
  64. }
  65. else if (result != 0)
  66. {
  67. Console.WriteLine("CQ: Expected TryPeek result of 0, got {0}", result);
  68. }
  69. int outerSum = 0;
  70. // An action to consume the ConcurrentQueue.
  71. Action action = () =>
  72. {
  73. int localSum = 0;
  74. int localValue;
  75. while (cq.TryDequeue(out localValue))
  76. localSum += localValue;
  77. Interlocked.Add(ref outerSum, localSum);
  78. };
  79. // Start 4 concurrent consuming actions.
  80. Parallel.Invoke(action, action, action, action);
  81. Console.WriteLine("outerSum = {0}, should be 49995000", outerSum);
  82. }
  83. }
  84. }