LOGO OA教程 ERP教程 模切知识交流 PMS教程 CRM教程 开发文档 其他文档  
 
网站管理员

.NET 异步并发操作,只保留最后一次操作

freeflydom
2024年9月5日 11:35 本文热度 2592

在我们业务操作时,难免会有多次操作,比如:界面搜索框,输入内容时时查询数据库/后台数据。多次触发搜索,我们期望什么结果呢?绝大部分情况,应该是只需要最后一次操作的结果,其它操作应该无效。

自定义等待的任务类

1. 可等待的任务类 AwaitableTask:

/// <summary>
    /// 可等待的任务
    /// </summary>
    public class AwaitableTask
    {
        /// <summary>
        /// 获取任务是否为不可执行状态
        /// </summary>
        public bool NotExecutable { get; private set; }


        /// <summary>
        /// 设置任务不可执行
        /// </summary>
        public void SetNotExecutable()
        {
            NotExecutable = true;
        }


        /// <summary>
        /// 获取任务是否有效
        /// 注:对无效任务,可以不做处理。减少并发操作导致的干扰
        /// </summary>
        public bool IsInvalid { get; private set; } = true;


        /// <summary>
        /// 标记任务无效
        /// </summary>
        public void MarkTaskValid()
        {
            IsInvalid = false;
        }


        #region Task


        private readonly Task _task;
        /// <summary>
        /// 初始化可等待的任务。
        /// </summary>
        /// <param name="task"></param>
        public AwaitableTask(Task task) => _task = task;


        /// <summary>
        /// 获取任务是否已完成
        /// </summary>
        public bool IsCompleted => _task.IsCompleted;


        /// <summary>
        /// 任务的Id
        /// </summary>
        public int TaskId => _task.Id;


        /// <summary>
        /// 开始任务
        /// </summary>
        public void Start() => _task.Start();


        /// <summary>
        /// 同步执行开始任务
        /// </summary>
        public void RunSynchronously() => _task.RunSynchronously();


        #endregion


        #region TaskAwaiter


        /// <summary>
        /// 获取任务等待器
        /// </summary>
        /// <returns></returns>
        public TaskAwaiter GetAwaiter() => new TaskAwaiter(this);


        /// <summary>Provides an object that waits for the completion of an asynchronous task. </summary>
        [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
        public struct TaskAwaiter : INotifyCompletion
        {
            private readonly AwaitableTask _task;


            /// <summary>
            /// 任务等待器
            /// </summary>
            /// <param name="awaitableTask"></param>
            public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask;


            /// <summary>
            /// 任务是否完成.
            /// </summary>
            public bool IsCompleted => _task._task.IsCompleted;


            /// <inheritdoc />
            public void OnCompleted(Action continuation)
            {
                var This = this;
                _task._task.ContinueWith(t =>
                {
                    if (!This._task.NotExecutable) continuation?.Invoke();
                });
            }
            /// <summary>
            /// 获取任务结果
            /// </summary>
            public void GetResult() => _task._task.Wait();
        }


        #endregion


    }

无效的操作可以分为以下俩种:

已经进行中的操作,后续结果应标记为无效

还没开始的操作,后续不执行


自定义任务类型 AwaitableTask中,添加俩个字段NotExecutable、IsInvalid:

/// <summary>
    /// 获取任务是否为不可执行状态
    /// </summary>
    public bool NotExecutable { get; private set; }
    /// <summary>
    /// 获取任务是否有效
    /// 注:对无效任务,可以不做处理。减少并发操作导致的干扰
    /// </summary>
    public bool IsInvalid { get; private set; } = true;


2. 有返回结果的可等待任务类 AwaitableTask<TResult>:

/// <summary>
    /// 可等待的任务
    /// </summary>
    /// <typeparam name="TResult"></typeparam>
    public class AwaitableTask<TResult> : AwaitableTask
    {
        private readonly Task<TResult> _task;
        /// <summary>
        /// 初始化可等待的任务
        /// </summary>
        /// <param name="task">需要执行的任务</param>
        public AwaitableTask(Task<TResult> task) : base(task) => _task = task;


        #region TaskAwaiter


        /// <summary>
        /// 获取任务等待器
        /// </summary>
        /// <returns></returns>
        public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);


        /// <summary>
        /// 任务等待器
        /// </summary>
        [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
        public new struct TaskAwaiter : INotifyCompletion
        {
            private readonly AwaitableTask<TResult> _task;


            /// <summary>
            /// 初始化任务等待器
            /// </summary>
            /// <param name="awaitableTask"></param>
            public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;


            /// <summary>
            /// 任务是否已完成。
            /// </summary>
            public bool IsCompleted => _task._task.IsCompleted;


            /// <inheritdoc />
            public void OnCompleted(Action continuation)
            {
                var This = this;
                _task._task.ContinueWith(t =>
                {
                    if (!This._task.NotExecutable) continuation?.Invoke();
                });
            }


            /// <summary>
            /// 获取任务结果。
            /// </summary>
            /// <returns></returns>
            public TResult GetResult() => _task._task.Result;
        }


        #endregion
    }


添加任务等待器,同步等待结果返回:

/// <summary>
    /// 获取任务等待器
    /// </summary>
    /// <returns></returns>
    public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);


    /// <summary>
    /// 任务等待器
    /// </summary>
    [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
    public new struct TaskAwaiter : INotifyCompletion
    {
        private readonly AwaitableTask<TResult> _task;


        /// <summary>
        /// 初始化任务等待器
        /// </summary>
        /// <param name="awaitableTask"></param>
        public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;


        /// <summary>
        /// 任务是否已完成。
        /// </summary>
        public bool IsCompleted => _task._task.IsCompleted;


        /// <inheritdoc />
        public void OnCompleted(Action continuation)
        {
            var This = this;
            _task._task.ContinueWith(t =>
            {
                if (!This._task.NotExecutable) continuation?.Invoke();
            });
        }


        /// <summary>
        /// 获取任务结果。
        /// </summary>
        /// <returns></returns>
        public TResult GetResult() => _task._task.Result;
    }


异步任务队列

/// <summary>
    /// 异步任务队列
    /// </summary>
    public class AsyncTaskQueue : IDisposable
    {
        /// <summary>
        /// 异步任务队列
        /// </summary>
        public AsyncTaskQueue()
        {
            _autoResetEvent = new AutoResetEvent(false);
            _thread = new Thread(InternalRunning) { IsBackground = true };
            _thread.Start();
        }


        #region 执行


        /// <summary>
        /// 执行异步操作
        /// </summary>
        /// <typeparam name="T">返回结果类型</typeparam>
        /// <param name="func">异步操作</param>
        /// <returns>isInvalid:异步操作是否有效;result:异步操作结果</returns>
        public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func)
        {
            var task = GetExecutableTask(func);
            var result = await await task;
            if (!task.IsInvalid)
            {
                result = default(T);
            }
            return (task.IsInvalid, result);
        }


        /// <summary>
        /// 执行异步操作
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="func"></param>
        /// <returns></returns>
        public async Task<bool> ExecuteAsync<T>(Func<Task> func)
        {
            var task = GetExecutableTask(func);
            await await task;
            return task.IsInvalid;
        }


        #endregion


        #region 添加任务


        /// <summary>
        /// 获取待执行任务
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        private AwaitableTask GetExecutableTask(Action action)
        {
            var awaitableTask = new AwaitableTask(new Task(action));
            AddPenddingTaskToQueue(awaitableTask);
            return awaitableTask;
        }


        /// <summary>
        /// 获取待执行任务
        /// </summary>
        /// <typeparam name="TResult"></typeparam>
        /// <param name="function"></param>
        /// <returns></returns>
        private AwaitableTask<TResult> GetExecutableTask<TResult>(Func<TResult> function)
        {
            var awaitableTask = new AwaitableTask<TResult>(new Task<TResult>(function));
            AddPenddingTaskToQueue(awaitableTask);
            return awaitableTask;
        }


        /// <summary>
        /// 添加待执行任务到队列
        /// </summary>
        /// <param name="task"></param>
        /// <returns></returns>
        private void AddPenddingTaskToQueue(AwaitableTask task)
        {
            //添加队列,加锁。
            lock (_queue)
            {
                _queue.Enqueue(task);
                //开始执行任务
                _autoResetEvent.Set();
            }
        }


        #endregion


        #region 内部运行


        private void InternalRunning()
        {
            while (!_isDisposed)
            {
                if (_queue.Count == 0)
                {
                    //等待后续任务
                    _autoResetEvent.WaitOne();
                }
                while (TryGetNextTask(out var task))
                {
                    //如已从队列中删除
                    if (task.NotExecutable) continue;


                    if (UseSingleThread)
                    {
                        task.RunSynchronously();
                    }
                    else
                    {
                        task.Start();
                    }
                }
            }
        }
        /// <summary>
        /// 上一次异步操作
        /// </summary>
        private AwaitableTask _lastDoingTask;
        private bool TryGetNextTask(out AwaitableTask task)
        {
            task = null;
            while (_queue.Count > 0)
            {
                //获取并从队列中移除任务
                if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0))
                {
                    //设置进行中的异步操作无效
                    _lastDoingTask?.MarkTaskValid();
                    _lastDoingTask = task;
                    return true;
                }
                //并发操作,设置任务不可执行
                task.SetNotExecutable();
            }
            return false;
        }


        #endregion


        #region dispose


        /// <inheritdoc />
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }


        /// <summary>
        /// 析构任务队列
        /// </summary>
        ~AsyncTaskQueue() => Dispose(false);


        private void Dispose(bool disposing)
        {
            if (_isDisposed) return;
            if (disposing)
            {
                _autoResetEvent.Dispose();
            }
            _thread = null;
            _autoResetEvent = null;
            _isDisposed = true;
        }


        #endregion


        #region 属性及字段


        /// <summary>
        /// 是否使用单线程完成任务.
        /// </summary>
        public bool UseSingleThread { get; set; } = true;


        /// <summary>
        /// 自动取消以前的任务。
        /// </summary>
        public bool AutoCancelPreviousTask { get; set; } = false;


        private bool _isDisposed;
        private readonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>();
        private Thread _thread;
        private AutoResetEvent _autoResetEvent;


        #endregion

 

添加异步任务队列类,用于任务的管理,如添加、执行、筛选等:

1. 自动取消之前的任务 AutoCancelPreviousTask

内部使用线程,循环获取当前任务列表,如果当前任务被标记NotExecutable不可执行,则跳过。

NotExecutable是何时标记的?

获取任务时,标记所有获取的任务为NotExecutable。直到任务列表中为空,那么只执行最后获取的一个任务。

2. 标记已经进行的任务无效 MarkTaskValid

当前进行的任务,无法中止,那么标记为无效即可。

/// <summary>
    /// 上一次异步操作
    /// </summary>
    private AwaitableTask _lastDoingTask;
    private bool TryGetNextTask(out AwaitableTask task)
    {
        task = null;
        while (_queue.Count > 0)
        {
            //获取并从队列中移除任务
            if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0))
            {
                //设置进行中的异步操作无效
                _lastDoingTask?.MarkTaskValid();
                _lastDoingTask = task;
                return true;
            }
            //并发操作,设置任务不可执行
            task.SetNotExecutable();
        }
        return false;
    }


后续执行完后,根据此标记,设置操作结果为空。

/// <summary>
    /// 执行异步操作
    /// </summary>
    /// <typeparam name="T">返回结果类型</typeparam>
    /// <param name="func">异步操作</param>
    /// <returns>isInvalid:异步操作是否有效;result:异步操作结果</returns>
    public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func)
    {
        var task = GetExecutableTask(func);
        var result = await await task;
        if (!task.IsInvalid)
        {
            result = default(T);
        }
        return (task.IsInvalid, result);
    }


实践测试

启动9个并发任务,测试实际的任务队列并发操作管理:

public MainWindow()
    {
        InitializeComponent();
        _asyncTaskQueue = new AsyncTaskQueue
        {
            AutoCancelPreviousTask = true,
            UseSingleThread = true
        };
    }
    private AsyncTaskQueue _asyncTaskQueue;
    private void ButtonBase_OnClick(object sender, RoutedEventArgs e)
    {
        // 快速启动9个任务
        for (var i = 1; i < 10; i++)
        {
            Test(_asyncTaskQueue, i);
        }
    }
    public static async void Test(AsyncTaskQueue taskQueue, int num)
    {
        var result = await taskQueue.ExecuteAsync(async () =>
        {
            Debug.WriteLine("输入:" + num);
            // 长时间耗时任务
            await Task.Delay(TimeSpan.FromSeconds(5));
            return num * 100;
        });
        Debug.WriteLine($"{num}输出的:" + result);
    }

测试结果如下:

一共9次操作,只有最后一次操作结果,才是有效的。其它8次操作,一次是无效的,7次操作被取消不执行。

固定时间间隔只保留最后操作

实际业务过程中,大部分场景并不是瞬间丢过来N个任务,而是比如100ms内有20个操作触发。

这类高并发操作,不止是上方做一个队列获取首尾任务,还需要加个延时(时间间隔),减少并发操作的执行,可以基于上方TryGetNextTask方法操作:

/// <summary>
        /// 获取下一任务
        /// </summary>
        /// <returns></returns>
        private async Task<AwaitableTask> TryGetNextTask()
        {
            //获取并从队列中移除任务
            if (_queue.TryDequeue(out var task))
            {
                //设置进行中的异步操作无效
                foreach (var lastDoingTask in _lastDoingTasks)
                {
                    lastDoingTask.MarkTaskInvalid();
                }
                _lastDoingTasks.Add(task);
                //如果有中间任务,则立即取消
                if (_queue.Count != 0)
                {
                    task.SetNotExecutable();
                }
                else
                {
                    //如果没有中间任务,则设置延迟时间再确认是否有中间任务
                    await Task.Delay(_delay).ConfigureAwait(false);
                    if (_queue.Count != 0)
                    {
                        task.SetNotExecutable();
                    }
                }
                return task;
            }

设置延时取任务后,就能实现在固定的时间内最多只会触发一次,并且是最后一次操作。


作者:唐宋元明清2188

出处:http://www.cnblogs.com/kybs0/

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文连接,否则保留追究法律责任的权利。



该文章在 2024/9/5 11:36:52 编辑过
关键字查询
相关文章
点晴ERP是一款针对中小制造业的专业生产管理软件系统,系统成熟度和易用性得到了国内大量中小企业的青睐。
点晴PMS码头管理系统主要针对港口码头集装箱与散货日常运作、调度、堆场、车队、财务费用、相关报表等业务管理,结合码头的业务特点,围绕调度、堆场作业而开发的。集技术的先进性、管理的有效性于一体,是物流码头及其他港口类企业的高效ERP管理信息系统。
点晴WMS仓储管理系统提供了货物产品管理,销售管理,采购管理,仓储管理,仓库管理,保质期管理,货位管理,库位管理,生产管理,WMS管理系统,标签打印,条形码,二维码管理,批号管理软件。
点晴免费OA是一款软件和通用服务都免费,不限功能、不限时间、不限用户的免费OA协同办公管理系统。
Copyright 2010-2025 ClickSun All Rights Reserved