C# 简单实现线程池

  • C# 简单实现线程池已关闭评论
  • 10 次浏览
  • A+
所属分类:.NET技术
摘要

使用 .NET 6 开发,实现对线程数量、任务队列、非核心线程以及核心线程活跃时间的管理。

使用 .NET 6 开发,实现对线程数量、任务队列、非核心线程以及核心线程活跃时间的管理。

namespace CustomThreadPool;

/// <summary>
/// A small thread pool that runs <see cref="WorkTask"/> instances on dedicated threads.
/// Core threads stay alive for a keep-alive period while idle; non-core threads are
/// started only while the queue is above its configured limit and exit as soon as the
/// queue is empty.
/// </summary>
public class ThreadPoolExecutor
{
    /// <summary>
    /// Pending tasks. The queue object is also the monitor that guards the queue itself
    /// and both running-thread counters.
    /// </summary>
    private readonly Queue<WorkTask> tasks = new Queue<WorkTask>();

    /// <summary>
    /// Maximum number of core threads.
    /// </summary>
    private readonly int coreThreadCount;

    /// <summary>
    /// Maximum number of non-core threads (total thread count minus core thread count).
    /// </summary>
    private readonly int noneCoreThreadCount;

    /// <summary>
    /// Number of core threads currently running. Guarded by the <see cref="tasks"/> lock.
    /// </summary>
    private int runCoreThreadCount;

    /// <summary>
    /// Number of non-core threads currently running. Guarded by the <see cref="tasks"/> lock.
    /// </summary>
    private int runNoneCoreThreadCount;

    /// <summary>
    /// Queue length above which non-core threads are started.
    /// </summary>
    private readonly int maxQueueCount;

    /// <summary>
    /// How long, in milliseconds, an idle core thread waits for work before exiting.
    /// </summary>
    private readonly int keepAliveTimeout;

    /// <summary>
    /// Whether worker threads are created as background threads.
    /// </summary>
    private readonly bool isBackground;

    private ThreadPoolExecutor() { }

    /// <summary>
    /// Creates a pool. No thread is started until the first task is queued.
    /// </summary>
    /// <param name="CoreThreadCount">Maximum number of core threads; must be at least 1.</param>
    /// <param name="TotalThreadCount">Maximum number of core plus non-core threads; must not be smaller than <paramref name="CoreThreadCount"/>.</param>
    /// <param name="IsBackground">Whether worker threads are background threads.</param>
    /// <param name="QueueCount">Queue length above which non-core threads are started; must not be negative.</param>
    /// <param name="KeepAliveTimeout">Idle time in milliseconds after which a core thread exits; must not be negative.</param>
    /// <exception cref="ArgumentOutOfRangeException">A numeric argument is below its minimum.</exception>
    /// <exception cref="ArgumentException"><paramref name="TotalThreadCount"/> is smaller than <paramref name="CoreThreadCount"/>.</exception>
    public ThreadPoolExecutor(int CoreThreadCount = 5, int TotalThreadCount = 10, bool IsBackground = true, int QueueCount = 200, int KeepAliveTimeout = 0)
    {
        if (CoreThreadCount < 1) throw new ArgumentOutOfRangeException(nameof(CoreThreadCount), CoreThreadCount, null);
        // Equality is accepted (a pool without non-core threads), so the message says so.
        if (TotalThreadCount < CoreThreadCount) throw new ArgumentException($"{nameof(TotalThreadCount)}:{TotalThreadCount} must be greater than or equal to {nameof(CoreThreadCount)}:{CoreThreadCount}");
        if (QueueCount < 0) throw new ArgumentOutOfRangeException(nameof(QueueCount), QueueCount, null);
        if (KeepAliveTimeout < 0) throw new ArgumentOutOfRangeException(nameof(KeepAliveTimeout), KeepAliveTimeout, null);

        coreThreadCount = CoreThreadCount;
        noneCoreThreadCount = TotalThreadCount - CoreThreadCount;
        keepAliveTimeout = KeepAliveTimeout;
        maxQueueCount = QueueCount;
        isBackground = IsBackground;
    }

    /// <summary>
    /// Enqueues a task and, if needed, starts a worker thread for it.
    /// The task is always enqueued: the queue limit is only the threshold at which
    /// non-core threads are started, it does not reject tasks.
    /// </summary>
    /// <param name="task">The task to run.</param>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
    public void QueueTask(WorkTask task)
    {
        if (task == null) throw new ArgumentNullException(nameof(task));

        lock (tasks)
        {
            tasks.Enqueue(task);

            bool queueOverLimit = tasks.Count > maxQueueCount;
            if (queueOverLimit && runNoneCoreThreadCount < noneCoreThreadCount)
            {
                ++runNoneCoreThreadCount;
                Run(false);
            }
            else if (runCoreThreadCount < coreThreadCount)
            {
                // Also reached when the queue is over its limit but no non-core thread
                // may be started; otherwise a pool with QueueCount 0 and no non-core
                // threads would never start a worker and its tasks would never run.
                ++runCoreThreadCount;
                Run(true);
            }

            // Wake one idle core thread that is blocked waiting for work.
            Monitor.Pulse(tasks);
        }
    }

    /// <summary>
    /// Creates and starts one worker thread.
    /// </summary>
    /// <param name="isCore">True for a core thread, false for a non-core thread.</param>
    private void Run(bool isCore)
    {
        Thread thread = new(() => Execute(isCore))
        {
            Name = Guid.NewGuid().ToString("D"),
            IsBackground = isBackground
        };
        thread.Start();
    }

    /// <summary>
    /// Worker loop: runs tasks until <see cref="TakeTask"/> tells the thread to exit.
    /// </summary>
    private void Execute(bool isCore)
    {
        while (TakeTask(isCore) is WorkTask item)
        {
            item.Runsynchronous();
        }
    }

    /// <summary>
    /// Returns the next task, or null when the calling worker should exit.
    /// The matching running-thread counter is decremented before null is returned.
    /// </summary>
    private WorkTask? TakeTask(bool isCore)
    {
        lock (tasks)
        {
            // Monotonic tick count: unaffected by wall-clock adjustments.
            long idleSince = Environment.TickCount64;

            while (tasks.Count == 0)
            {
                if (!isCore)
                {
                    // Non-core threads never linger once the queue is drained.
                    --runNoneCoreThreadCount;
                    return null;
                }

                long remaining = keepAliveTimeout - (Environment.TickCount64 - idleSince);
                if (remaining <= 0)
                {
                    --runCoreThreadCount;
                    return null;
                }

                // Releases the lock while blocked, so idle threads do not spin on it.
                Monitor.Wait(tasks, (int)remaining);
            }

            return tasks.Dequeue();
        }
    }
}

namespace CustomThreadPool;

/// <summary>
/// A unit of work that can be queued on a <see cref="ThreadPoolExecutor"/>.
/// </summary>
public class WorkTask
{
    /// <summary>
    /// Shared factory used by <see cref="Start()"/> and for creating started tasks.
    /// </summary>
    public static WorkTaskFactory Factory { get; private set; } = new WorkTaskFactory();

    /// <summary>
    /// Raised when the task has finished running, whether or not it faulted.
    /// </summary>
    public event Action<WorkTask>? TaskCompleted;

    /// <summary>
    /// Counter from which task ids are drawn.
    /// </summary>
    private static int _id = 0;

    /// <summary>
    /// Work to run when the task was created without a state argument.
    /// </summary>
    private readonly Action? action;

    /// <summary>
    /// Work to run when the task was created with a state argument.
    /// </summary>
    private readonly Action<object?>? actionWithParameter;

    /// <summary>
    /// Event that is signaled when the task has finished.
    /// </summary>
    public AutoResetEvent WaitHandle { get; protected set; } = new AutoResetEvent(false);

    /// <summary>
    /// Argument passed to the work delegate.
    /// </summary>
    public object? State { get; protected set; }

    /// <summary>
    /// Wraps the exception thrown by the work delegate, if any.
    /// </summary>
    public WorkTaskException? Exception { get; protected set; }

    /// <summary>
    /// True once the task has finished running.
    /// </summary>
    public bool IsCompleted { get; protected set; } = false;

    /// <summary>
    /// True if the work delegate threw an exception.
    /// </summary>
    public bool IsFaulted { get; protected set; } = false;

    /// <summary>
    /// Current lifecycle state of the task.
    /// </summary>
    public WorkTaskStatus Status { get; protected set; } = WorkTaskStatus.Created;

    /// <summary>
    /// Identifier of this task. Assigned once at construction so that repeated reads
    /// return the same value.
    /// </summary>
    public int Id { get; } = Interlocked.Increment(ref _id);

    protected WorkTask() { }

    /// <summary>
    /// Raises <see cref="TaskCompleted"/>.
    /// </summary>
    protected void OnTaskCompleted(WorkTask sender)
    {
        TaskCompleted?.Invoke(sender);
    }

    /// <summary>
    /// Creates a task that runs <paramref name="action"/>.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public WorkTask(Action action)
    {
        this.action = action ?? throw new ArgumentNullException(nameof(action));
    }

    /// <summary>
    /// Creates a task that runs <paramref name="action"/> with <paramref name="state"/> as its argument.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public WorkTask(Action<object?> action, object state)
    {
        actionWithParameter = action ?? throw new ArgumentNullException(nameof(action));
        this.State = state;
    }

    /// <summary>
    /// Runs the task on the calling thread. Does nothing unless the task is still in
    /// the <see cref="WorkTaskStatus.Created"/> state. An exception thrown by the work
    /// delegate is captured in <see cref="Exception"/> rather than propagated.
    /// </summary>
    public virtual void Runsynchronous()
    {
        if (Status != WorkTaskStatus.Created) return;
        Status = WorkTaskStatus.Running;
        try
        {
            action?.Invoke();
            actionWithParameter?.Invoke(State);
        }
        catch (Exception ex)
        {
            Exception = new WorkTaskException(ex.Message, ex);
            IsFaulted = true;
        }
        finally
        {
            SignalCompleted();
        }
    }

    /// <summary>
    /// Publishes the completed state, raises <see cref="TaskCompleted"/> and signals
    /// <see cref="WaitHandle"/>.
    /// </summary>
    protected void SignalCompleted()
    {
        // State first, so handlers and released waiters observe a completed task.
        Status = WorkTaskStatus.RanToCompleted;
        IsCompleted = true;
        try
        {
            OnTaskCompleted(this);
        }
        finally
        {
            // Signal even if a handler throws; otherwise waiters would block forever.
            WaitHandle.Set();
        }
    }

    /// <summary>
    /// Blocks until the task has finished. Returns immediately if it already has.
    /// </summary>
    protected void WaitForCompletion()
    {
        if (IsCompleted) return;
        WaitHandle.WaitOne();
        // An auto-reset event releases a single waiter per Set; pass the signal on so
        // that any other thread waiting on this task is released as well.
        WaitHandle.Set();
    }

    /// <summary>
    /// Queues this task on the executor of <see cref="Factory"/>. Nothing is queued
    /// when that factory has no executor.
    /// </summary>
    public void Start()
    {
        Factory.ThreadPoolExcutor?.QueueTask(this);
    }

    /// <summary>
    /// Queues this task on the given executor.
    /// </summary>
    /// <param name="executor">The pool that should run the task.</param>
    public void Start(ThreadPoolExecutor executor)
    {
        executor.QueueTask(this);
    }

    /// <summary>
    /// Blocks until every task in <paramref name="tasks"/> has finished.
    /// Tasks that have already finished are not waited on again.
    /// </summary>
    /// <param name="tasks">The tasks to wait for.</param>
    /// <returns>Always true: the wait has no timeout.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tasks"/> is null.</exception>
    public static bool WaitAll(WorkTask[] tasks)
    {
        ArgumentNullException.ThrowIfNull(tasks);
        foreach (var task in tasks)
        {
            task.WaitForCompletion();
        }
        return true;
    }

    /// <summary>
    /// Blocks until any one of the tasks in <paramref name="tasks"/> has finished.
    /// </summary>
    /// <param name="tasks">The tasks to wait on. When none has finished yet, the wait is
    /// delegated to <c>System.Threading.WaitHandle.WaitAny</c>, whose limit on the number
    /// of handles applies.</param>
    /// <returns>Index of a finished task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tasks"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="tasks"/> is empty.</exception>
    public static int WaitAny(WorkTask[] tasks)
    {
        ArgumentNullException.ThrowIfNull(tasks);
        if (tasks.Length == 0) throw new ArgumentException("At least one task is required.", nameof(tasks));

        // A finished task may have had its signal consumed by an earlier wait, so
        // check the completion flags before blocking on the handles.
        int finished = Array.FindIndex(tasks, t => t.IsCompleted);
        if (finished >= 0) return finished;

        // Fully qualified: inside this class the simple name WaitHandle is the property.
        var handles = Array.ConvertAll<WorkTask, System.Threading.WaitHandle>(tasks, t => t.WaitHandle);
        int index = System.Threading.WaitHandle.WaitAny(handles);
        // WaitAny consumed the auto-reset signal; restore it for other waiters.
        tasks[index].WaitHandle.Set();
        return index;
    }
}

/// <summary>
/// A task that produces a result.
/// </summary>
/// <typeparam name="TResult">Type of the value produced by the task.</typeparam>
public class WorkTask<TResult> : WorkTask
{
    private readonly Func<TResult>? func;
    private readonly Func<object?, TResult>? funcWithParameter;

    protected TResult? _result = default(TResult);

    /// <summary>
    /// The value produced by the task. Blocks until the task has finished.
    /// Holds the default value if the task faulted.
    /// </summary>
    public TResult? Result
    {
        get
        {
            WaitForCompletion();
            return _result;
        }
    }

    /// <summary>
    /// Creates a task that runs <paramref name="func"/>.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="func"/> is null.</exception>
    public WorkTask(Func<TResult> func)
    {
        this.func = func ?? throw new ArgumentNullException(nameof(func));
    }

    /// <summary>
    /// Creates a task that runs <paramref name="func"/> with <paramref name="state"/> as its argument.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="func"/> is null.</exception>
    public WorkTask(Func<object?, TResult> func, object? state)
    {
        this.funcWithParameter = func ?? throw new ArgumentNullException(nameof(func));
        this.State = state;
    }

    /// <summary>
    /// Runs the task on the calling thread and stores its result. Does nothing unless
    /// the task is still in the <see cref="WorkTaskStatus.Created"/> state.
    /// </summary>
    public override void Runsynchronous()
    {
        if (Status != WorkTaskStatus.Created) return;
        Status = WorkTaskStatus.Running;
        try
        {
            if (func != null) _result = func();
            if (funcWithParameter != null) _result = funcWithParameter(State);
        }
        catch (Exception ex)
        {
            Exception = new WorkTaskException(ex.Message, ex);
            IsFaulted = true;
        }
        finally
        {
            SignalCompleted();
        }
    }
}

/// <summary>
/// Wraps an exception thrown by a task's work delegate.
/// </summary>
public class WorkTaskException : Exception
{
    public WorkTaskException()
    {
    }

    public WorkTaskException(string Message)
        : base(Message)
    {
    }

    public WorkTaskException(string Message, Exception InnerException)
        : base(Message, InnerException)
    {
    }
}

/// <summary>
/// Lifecycle states of a <see cref="WorkTask"/>.
/// </summary>
public enum WorkTaskStatus
{
    /// <summary>
    /// Created but not yet run.
    /// </summary>
    Created = 0,
    /// <summary>
    /// Currently running.
    /// </summary>
    Running = 1,
    /// <summary>
    /// Finished running.
    /// </summary>
    RanToCompleted = 2,
}

namespace CustomThreadPool;

/// <summary>
/// Creates tasks and queues them on a <see cref="ThreadPoolExecutor"/> in one step.
/// </summary>
public class WorkTaskFactory
{
    /// <summary>
    /// Executor used when a call does not supply its own.
    /// </summary>
    public ThreadPoolExecutor? ThreadPoolExcutor { get; private set; }

    /// <summary>
    /// Creates a factory whose default executor is <paramref name="excutor"/>.
    /// </summary>
    public WorkTaskFactory(ThreadPoolExecutor excutor)
    {
        ThreadPoolExcutor = excutor;
    }

    /// <summary>
    /// Creates a factory with a default executor of 5 core threads and 10 threads in total.
    /// </summary>
    public WorkTaskFactory()
    : this(new ThreadPoolExecutor(5, 10))
    {
    }

    /// <summary>
    /// Creates a task for <paramref name="action"/> and queues it.
    /// </summary>
    /// <param name="action">The work to run.</param>
    /// <param name="executor">Executor to use for this call only; when null, the factory's
    /// default executor is used. The factory's default executor is not modified.</param>
    /// <returns>The created task. It is not queued if no executor is available.</returns>
    public WorkTask StartNew(Action action, ThreadPoolExecutor? executor = null)
    {
        WorkTask task = new WorkTask(action);
        // Resolve into a local: assigning the property here would silently redirect
        // every later caller of this (possibly shared) factory to the same executor.
        ThreadPoolExecutor? target = executor ?? ThreadPoolExcutor;
        target?.QueueTask(task);
        return task;
    }

    /// <summary>
    /// Creates a result-producing task for <paramref name="func"/> and queues it.
    /// </summary>
    /// <typeparam name="TResult">Type of the value produced by the task.</typeparam>
    /// <param name="func">The work to run.</param>
    /// <param name="state">Argument passed to <paramref name="func"/>.</param>
    /// <param name="executor">Executor to use for this call only; when null, the factory's
    /// default executor is used. The factory's default executor is not modified.</param>
    /// <returns>The created task. It is not queued if no executor is available.</returns>
    public WorkTask<TResult> StartNew<TResult>(Func<object?, TResult> func, object? state, ThreadPoolExecutor? executor = null)
    {
        WorkTask<TResult> task = new WorkTask<TResult>(func, state);
        ThreadPoolExecutor? target = executor ?? ThreadPoolExcutor;
        target?.QueueTask(task);
        return task;
    }
}

namespace CustomThreadPool;
using System.Threading;
using System.Text;
using System;
using System.Diagnostics;
using System.Reflection.Emit;

/// <summary>
/// Demo: queues five slow tasks plus one quick task on a custom pool and prints the results.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        const int taskCount = 5;
        var pool = new ThreadPoolExecutor(5, 6, QueueCount: 5, KeepAliveTimeout: 2000);

        // Each slow task receives its own index as state.
        var slowTasks = new WorkTask<int?>[taskCount];
        int next = 0;
        while (next < taskCount)
        {
            slowTasks[next] = WorkTask.Factory.StartNew(arg => Action(arg), state: next, executor: pool);
            next++;
        }

        WorkTask<int> quickTask = WorkTask.Factory.StartNew(
            _ =>
            {
                Thread.Sleep(100);
                Console.WriteLine("start thread");
                return 100;
            },
            state: null,
            executor: pool);

        Console.WriteLine("start main");
        WorkTask.WaitAll(slowTasks);

        Console.WriteLine(quickTask.Result);
        Console.WriteLine(slowTasks.Sum(item => item.Result));
    }

    /// <summary>
    /// Sleeps for two seconds, prints the current managed thread id and the argument,
    /// and returns the argument plus one (or null when the argument is null).
    /// </summary>
    private static int? Action(object? t)
    {
        Thread.Sleep(2000);
        Console.WriteLine($"Task Id:{Environment.CurrentManagedThreadId},Parameter:{t}");

        if (t == null)
        {
            return default(int?);
        }

        return (int)t + 1;
    }
}

调用结果

C# 简单实现线程池