在我们业务操作时,难免会有多次操作,比如:界面搜索框,输入内容时时查询数据库/后台数据。多次触发搜索,我们期望什么结果呢?绝大部分情况,应该是只需要最后一次操作的结果,其它操作应该无效。
自定义等待的任务类
1. 可等待的任务类 AwaitableTask:
public class AwaitableTask
{
public bool NotExecutable { get; private set; }
public void SetNotExecutable()
{
NotExecutable = true;
}
public bool IsInvalid { get; private set; } = true;
public void MarkTaskValid()
{
IsInvalid = false;
}
#region Task
private readonly Task _task;
public AwaitableTask(Task task) => _task = task;
public bool IsCompleted => _task.IsCompleted;
public int TaskId => _task.Id;
public void Start() => _task.Start();
public void RunSynchronously() => _task.RunSynchronously();
#endregion
#region TaskAwaiter
public TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
[HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
public struct TaskAwaiter : INotifyCompletion
{
private readonly AwaitableTask _task;
public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask;
public bool IsCompleted => _task._task.IsCompleted;
public void OnCompleted(Action continuation)
{
var This = this;
_task._task.ContinueWith(t =>
{
if (!This._task.NotExecutable) continuation?.Invoke();
});
}
public void GetResult() => _task._task.Wait();
}
#endregion
}
无效的操作可以分为以下俩种:
已经进行中的操作,后续结果应标记为无效
还没开始的操作,后续不执行
自定义任务类型 AwaitableTask中,添加俩个字段NotExecutable、IsInvalid:
public bool NotExecutable { get; private set; }
public bool IsInvalid { get; private set; } = true;
2. 有返回结果的可等待任务类 AwaitableTask<TResult>:
public class AwaitableTask<TResult> : AwaitableTask
{
private readonly Task<TResult> _task;
public AwaitableTask(Task<TResult> task) : base(task) => _task = task;
#region TaskAwaiter
public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
[HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
public new struct TaskAwaiter : INotifyCompletion
{
private readonly AwaitableTask<TResult> _task;
public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;
public bool IsCompleted => _task._task.IsCompleted;
public void OnCompleted(Action continuation)
{
var This = this;
_task._task.ContinueWith(t =>
{
if (!This._task.NotExecutable) continuation?.Invoke();
});
}
public TResult GetResult() => _task._task.Result;
}
#endregion
}
添加任务等待器,同步等待结果返回:
public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
[HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
public new struct TaskAwaiter : INotifyCompletion
{
private readonly AwaitableTask<TResult> _task;
public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;
public bool IsCompleted => _task._task.IsCompleted;
public void OnCompleted(Action continuation)
{
var This = this;
_task._task.ContinueWith(t =>
{
if (!This._task.NotExecutable) continuation?.Invoke();
});
}
public TResult GetResult() => _task._task.Result;
}
异步任务队列
public class AsyncTaskQueue : IDisposable
{
public AsyncTaskQueue()
{
_autoResetEvent = new AutoResetEvent(false);
_thread = new Thread(InternalRunning) { IsBackground = true };
_thread.Start();
}
#region 执行
public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func)
{
var task = GetExecutableTask(func);
var result = await await task;
if (!task.IsInvalid)
{
result = default(T);
}
return (task.IsInvalid, result);
}
public async Task<bool> ExecuteAsync<T>(Func<Task> func)
{
var task = GetExecutableTask(func);
await await task;
return task.IsInvalid;
}
#endregion
#region 添加任务
private AwaitableTask GetExecutableTask(Action action)
{
var awaitableTask = new AwaitableTask(new Task(action));
AddPenddingTaskToQueue(awaitableTask);
return awaitableTask;
}
private AwaitableTask<TResult> GetExecutableTask<TResult>(Func<TResult> function)
{
var awaitableTask = new AwaitableTask<TResult>(new Task<TResult>(function));
AddPenddingTaskToQueue(awaitableTask);
return awaitableTask;
}
private void AddPenddingTaskToQueue(AwaitableTask task)
{
lock (_queue)
{
_queue.Enqueue(task);
_autoResetEvent.Set();
}
}
#endregion
#region 内部运行
private void InternalRunning()
{
while (!_isDisposed)
{
if (_queue.Count == 0)
{
_autoResetEvent.WaitOne();
}
while (TryGetNextTask(out var task))
{
if (task.NotExecutable) continue;
if (UseSingleThread)
{
task.RunSynchronously();
}
else
{
task.Start();
}
}
}
}
private AwaitableTask _lastDoingTask;
private bool TryGetNextTask(out AwaitableTask task)
{
task = null;
while (_queue.Count > 0)
{
if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0))
{
_lastDoingTask?.MarkTaskValid();
_lastDoingTask = task;
return true;
}
task.SetNotExecutable();
}
return false;
}
#endregion
#region dispose
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
~AsyncTaskQueue() => Dispose(false);
private void Dispose(bool disposing)
{
if (_isDisposed) return;
if (disposing)
{
_autoResetEvent.Dispose();
}
_thread = null;
_autoResetEvent = null;
_isDisposed = true;
}
#endregion
#region 属性及字段
public bool UseSingleThread { get; set; } = true;
public bool AutoCancelPreviousTask { get; set; } = false;
private bool _isDisposed;
private readonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>();
private Thread _thread;
private AutoResetEvent _autoResetEvent;
#endregion
添加异步任务队列类,用于任务的管理,如添加、执行、筛选等:
1. 自动取消之前的任务 AutoCancelPreviousTask
内部使用线程,循环获取当前任务列表,如果当前任务被标记NotExecutable不可执行,则跳过。
NotExecutable是何时标记的?
获取任务时,标记所有获取的任务为NotExecutable。直到任务列表中为空,那么只执行最后获取的一个任务。
2. 标记已经进行的任务无效 MarkTaskValid
当前进行的任务,无法中止,那么标记为无效即可。
private AwaitableTask _lastDoingTask;
private bool TryGetNextTask(out AwaitableTask task)
{
task = null;
while (_queue.Count > 0)
{
if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0))
{
_lastDoingTask?.MarkTaskValid();
_lastDoingTask = task;
return true;
}
task.SetNotExecutable();
}
return false;
}
后续执行完后,根据此标记,设置操作结果为空。
public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func)
{
var task = GetExecutableTask(func);
var result = await await task;
if (!task.IsInvalid)
{
result = default(T);
}
return (task.IsInvalid, result);
}
实践测试
启动9个并发任务,测试实际的任务队列并发操作管理:
public MainWindow()
{
InitializeComponent();
_asyncTaskQueue = new AsyncTaskQueue
{
AutoCancelPreviousTask = true,
UseSingleThread = true
};
}
private AsyncTaskQueue _asyncTaskQueue;
private void ButtonBase_OnClick(object sender, RoutedEventArgs e)
{
for (var i = 1; i < 10; i++)
{
Test(_asyncTaskQueue, i);
}
}
public static async void Test(AsyncTaskQueue taskQueue, int num)
{
var result = await taskQueue.ExecuteAsync(async () =>
{
Debug.WriteLine("输入:" + num);
await Task.Delay(TimeSpan.FromSeconds(5));
return num * 100;
});
Debug.WriteLine($"{num}输出的:" + result);
}
测试结果如下:

一共9次操作,只有最后一次操作结果,才是有效的。其它8次操作,一次是无效的,7次操作被取消不执行。
固定时间间隔只保留最后操作
实际业务过程中,大部分场景并不是瞬间丢过来N个任务,而是比如100ms内有20个操作触发。
这类高并发操作,不止是上方做一个队列获取首尾任务,还需要加个延时(时间间隔),减少并发操作的执行,可以基于上方TryGetNextTask方法操作:
private async Task<AwaitableTask> TryGetNextTask()
{
if (_queue.TryDequeue(out var task))
{
foreach (var lastDoingTask in _lastDoingTasks)
{
lastDoingTask.MarkTaskInvalid();
}
_lastDoingTasks.Add(task);
if (_queue.Count != 0)
{
task.SetNotExecutable();
}
else
{
await Task.Delay(_delay).ConfigureAwait(false);
if (_queue.Count != 0)
{
task.SetNotExecutable();
}
}
return task;
}
设置延时取任务后,就能实现在固定的时间内最多只会触发一次,并且是最后一次操作。
作者:唐宋元明清2188
出处:http://www.cnblogs.com/kybs0/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文连接,否则保留追究法律责任的权利。
该文章在 2024/9/5 11:36:52 编辑过