Skip to main content

26.3 任务队列

📝 模块更新日志
  • 新特性
    •   任务队列支持配置延迟队列是否立即执行一次 4.9.1.56 ⏱️2024.03.21 #I9AF54
    •   任务队列支持配置特定的任务采用并行还是串行的执行方式 4.9.1.38 ⏱️2024.03.07 a732c72
    •   任务队列可传入自定义任务 TaskId 4.9.1.38 ⏱️2024.03.07 a732c72
    •   任务队列支持分配任务 TaskId,管道分类 ChannelOnExecuted 事件订阅 4.9.1.28 ⏱️2024.01.31 1c27434
    •   任务队列支持配置重试次数、重试间隔 4.9.1.24 ⏱️2023.01.18 1c27434
    •   任务队列支持配置 Concurrent 串行执行 4.9.1.22 ⏱️2024.01.14 #I8VXFV
    •   Enqueue/EnqueueAsync 支持 Cron 表达式 实例重载方法 4.8.4.10 ⏱️2023.01.09 #I69HM4
  • 突破性变化
    •   任务入队返回值类型,由 Guid 改为 object 类型 4.9.1.38 ⏱️2024.03.07 a732c72
  • 问题修复
    •   任务队列在主动停止主机时出现意外异常 4.9.7.83 ⏱️2025.06.10 74ed091
    •   任务队列在个别情况下出现出队同步阻塞问题 4.9.2.43 ⏱️2024.05.08 f595b47
    •   任务队列订阅任务执行结果处理程序中出现异常后重复触发问题 4.9.1.35 ⏱️2024.02.27 fa81c54
  • 其他更改
    •   任务队列管道容量,从 3000 增加到 12000,提升消费吞吐量 4.9.4.1 ⏱️2024.06.17 4d9feb3
    •   任务队列 concurrent 类型定义,由 object 调整为 bool? 4.9.1.57 ⏱️2024.03.22 cebb48d
版本说明

以下内容仅限 Furion 4.8.3 + 版本使用。

小知识

任务队列可取代旧版本定时任务的 SpareTime.DoIt()SpareTime.DoOnce 功能。

26.3.1 关于任务队列

任务队列常用于管理后台工作,通常这些后台工作在主线程响应之外,不会对主线程或当前线程响应阻塞。任务队列的一个显著特定就是它是一个队列,入队的顺序决定它出队执行的先后。

任务队列使用 Channel + Task + ThreadPool(线程池) 实现,入队/出队速度非常快,吞吐量极高,内存和 CPU 占用几乎忽略不计。

任务队列应用场景:对于可能需长时间运行的任务,或不是那么及时的需要立即反馈的任务。 比如发送邮件,发送短信等等。

26.3.2 与事件总线的区别

事件总线基于消息通讯,任务队列最显著的特点就是将操作依次加入队列,然后按照入队的顺序出队去执行。

前者(事件总线)是无序的,只有完全匹配的消息 Id 才会触发执行操作,否则处于 “静待” 状态。

而后者(任务队列)则是将可能耗时且一定会执行的操作放到队列中,之后依次出队执行

26.3.3 快速入门

任务队列使用非常简单,只需要注册 services.AddTaskQueue() 服务,之后通过依赖注入 ITaskQueue 服务或通过 TaskQueued 静态类使用即可,

1. 注册 TaskQueue 服务

services.AddTaskQueue();

2. 使用 ITaskQueue 服务

using Furion.TaskQueue;

namespace Your.Application;

public class YourService : IYourService
{
private readonly ITaskQueue _taskQueue;
public YourService(ITaskQueue taskQueue)
{
_taskQueue = taskQueue;
}

/// <summary>
/// 同步入队
/// </summary>
public void SyncTask()
{
_taskQueue.Enqueue(provider =>
{
Console.WriteLine("我是同步的");
});

// 如无需使用 provider 参数,可用 _ 替代
_taskQueue.Enqueue(_ => {});
}

/// <summary>
/// 同步入队,延迟 3 秒触发
/// </summary>
public void SyncTask2()
{
_taskQueue.Enqueue(provider =>
{
Console.WriteLine("我是同步的,但我延迟了 3 秒");
}, task => task.WithDelay(3000));

// 如无需使用 provider 参数,可用 _ 替代
_taskQueue.Enqueue(_ => {}, task => task.WithDelay(3000));
}

/// <summary>
/// 异步入队
/// </summary>
public async Task AsyncTask()
{
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
Console.WriteLine("我是异步的");
await ValueTask.CompletedTask;
});

// 如无需使用 provider 和 token 参数,可用 _ 替代
await _taskQueue.EnqueueAsync(async (_, _) => {});
}

/// <summary>
/// 异步入队,延迟 3 秒触发
/// </summary>
public async Task AsyncTask2()
{
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
Console.WriteLine("我是异步的,但我延迟了 3 秒");
await ValueTask.CompletedTask;
}, task => task.WithDelay(3000));

// 如无需使用 provider 和 token 参数,可用 _ 替代
await _taskQueue.EnqueueAsync(async (_, _) => {}, task => task.WithDelay(3000));
}

/// <summary>
/// 同步入队,支持 Cron 表达式延迟
/// </summary>
public void SyncTask3()
{
_taskQueue.Enqueue(provider =>
{
Console.WriteLine("Cron ...");
}, task => task.WithDelay("* * * * *"));

// 如无需使用 provider 参数,可用 _ 替代
_taskQueue.Enqueue(_ => {}, task => task.WithDelay("* * * * *", CronStringFormat.Default));
}

/// <summary>
/// 异步入队,支持 Cron 表达式延迟
/// </summary>
public async Task AsyncTask3()
{
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
Console.WriteLine("Cron ...");
await ValueTask.CompletedTask;
}, task => task.WithDelay("* * * * *"));

// 如无需使用 provider 和 token 参数,可用 _ 替代
await _taskQueue.EnqueueAsync(async (_, _) => {}, task => task.WithDelay("* * * * *", CronStringFormat.Default));
}

/// <summary>
/// 同步入队,延迟 3 秒触发,并立即执行一次(Furion 4.9.1.56+ 版本支持)
/// </summary>
public void SyncTask3()
{
_taskQueue.Enqueue(provider =>
{
Console.WriteLine("我是同步的,但我延迟了 3 秒");
}, task => task.WithDelay(3000).WithRunOnceIfDelaySet(true));
}

/// <summary>
/// 异步入队,延迟 3 秒触发,并立即执行一次(Furion 4.9.1.56+ 版本支持)
/// </summary>
public async Task AsyncTask3()
{
await _taskQueue.EnqueueAsync(async (_, _) =>
{
Console.WriteLine("我是异步的,但我延迟了 3 秒");
await ValueTask.CompletedTask;
}, task => task.WithDelay(3000).WithRunOnceIfDelaySet(true));
}
}
注意事项

框架内置了一套简单的错误策略机制,也就是如果任务执行失败会默认重试 3 次,每次间隔 1秒,该策略配置暂不对外公开(Furion 4.9.1.24+ 版本已提供配置)。

26.3.4 TaskQueued 静态类

框架还提供了 TaskQueued 静态类可在任何线程中操作,如:

// 同步入队
TaskQueued.Enqueue((provider) => {}, [configure]);

// 异步入队
await TaskQueued.EnqueueAsync(async (provider, token) => {}, [configure]);

26.3.5 在处理程序中使用服务

如果在任务队列处理程序中使用了外部的服务,如:

public class YourService : IYourService
{
private readonly ITaskQueue _taskQueue;
private readonly ILogger<YourService> _logger

public YourService(ITaskQueue taskQueue
, ILogger<YourService> logger)
{
_taskQueue = taskQueue;
_logger = logger;
}

public void SyncTask()
{
_taskQueue.Enqueue(provider =>
{
_logger.LogInformation("我使用了外部的 logger");
});
}
}

那么需要注意的是,如果使用的外部服务是 单例 服务,那么无需任何处理,但如果使用的服务属于 瞬时范围 作用域,那么需要创建作用域,如:

_taskQueue.Enqueue(provider =>
{
// Repository 注册为范围,需创建作用域
using var scoped = provider.CreateScope();
var repository = scoped.ServiceProvider.GetService<IRepository<User>>();

// Logger 注册为单例,可以直接使用
_logger.LogInformation("我使用了外部的 logger");
});

26.3.6 订阅执行任务意外异常

任务处理程序使用的是 Task 对象进行创建并执行,但可能存在一些意外且难以捕获的异常,这时候可以通过以下方式订阅:

services.AddTaskQueue(builder =>
{
// 订阅 TaskQueue 意外未捕获异常
builder.UnobservedTaskExceptionHandler = (obj, args) =>
{
// ....
};
});

26.3.7 并行与串行

版本说明

以下内容仅限 Furion 4.9.1.22 + 版本使用。

默认情况下,为了提升队列出队吞吐量,队列出队默认采用并行的方式执行,不会依次进行出队。如需配置依次出队(串行)方式,可通全局设置 Concurrent 属性:

services.AddTaskQueue(builder =>
{
builder.Concurrent = false; // 串行执行
});

Furion 4.9.1.38+ 版本开始,可以进行局部配置任务采用 并行 还是 串行 的执行方式,如:

// 不设置,默认采用全局的配置
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
// ...
}, task => task.WithDelay(3000));

// 设置并行执行
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
// ...
}, task => task.WithDelay(3000).WithConcurrent(true));

// 设置串行执行
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
// ...
}, task => task.WithDelay(3000).WithConcurrent(false));

26.3.8 任务标识 TaskId

版本说明

以下内容仅限 Furion 4.9.1.28 + 版本使用。

Furion 4.9.1.28 之前的版本,任务入队是没有返回值的,在之后的版本任务入队后会返回任务的唯一 TaskId,默认值是 Guid.NewGuid()

// Furion 4.9.1.28-(无返回值)
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
// ...
});

// Furion 4.9.1.28+(返回任务唯一 ID)
var taskId = await _taskQueue.EnqueueAsync(async (provider, token) =>
{
// ...
});

Furion 4.9.1.38+ 版本支持设置自定义 TaskId,如:

var taskId = await _taskQueue.EnqueueAsync(async (provider, token) =>
{
// ...
}, task => task.WithTaskId("唯一 id")); // object 类型

26.3.9 任务管道 Channel

版本说明

以下内容仅限 Furion 4.9.1.28 + 版本使用。

Furion 4.9.1.28+ 版本,任务入队支持设置 管道/批次,通过 .WithChannel(channel) 设置,后续可以根据这个 channel 进行任务执行结果订阅进而触发回调操作。

var taskId = await _taskQueue.EnqueueAsync(async (provider, token) =>
{
// ...
}, task => task.WithChannel("批次1"));

26.3.10 异常重试配置

版本说明

以下内容仅限 Furion 4.9.1.24 + 版本使用。

Furion 4.9.1.24 之前的版本并未公开任务队列执行异常后重试配置,该版本之后可自定义配置:

services.AddTaskQueue(builder =>
{
builder.NumRetries = 3; // 默认重试 3 次
builder.RetryTimeout = 1000; // 每次重试间隔 1000ms
});

26.3.11 任务执行结果订阅

版本说明

以下内容仅限 Furion 4.9.1.28 + 版本使用。

Furion 4.9.1.28+ 版本,新增了 OnExecuted 事件订阅器,可以订阅事件执行成功还是失败,如:

// 订阅任务事件
_taskQueue.OnExecuted += (sender, args) =>
{
Console.WriteLine($"任务 {args.TaskId} 管道 {args.Channel},执行结果:{args.Status},异常:{args.Exception}");
}

26.3.11.1 重复订阅问题

由于 ITaskQueue 注册为单例,所以如果在非单例服务或者特定代码范围内订阅,请务必订阅处理之后移除事件,避免重复多播订阅(除非需要多播订阅)

// 订阅局部或类全局事件方法
void TaskSubscribe(object sender, TaskHandlerEventArgs args)
{
Console.WriteLine($"任务 {args.TaskId} 管道 {args.Channel},执行结果:{args.Status},异常:{args.Exception}");
}

// 订阅任务事件
_taskQueue.OnExecuted += TaskSubscribe;

// 其他操作。。。。

// 移除订阅事件
_taskQueue.OnExecuted -= TaskSubscribe;

如果是在类构造函数中订阅,那么需要实现 IDisposable 接口并在其中移除订阅。

public class SomeService : ISomeService, IScoped, IDisposable
{
private readonly ITaskQueue _taskQueue;
public SomeService(ITaskQueue taskQueue)
{
_taskQueue = taskQueue;

_taskQueue.OnExecuted += Subscribe;
}

void Subscribe(object sender, TaskHandlerEventArgs args)
{
Console.WriteLine($"任务 {args.TaskId} 管道 {args.Channel},执行结果:{args.Status},异常:{args.Exception}");
}

public void Dispose()
{
_taskQueue.OnExecuted -= Subscribe;
}
}

26.3.11.2 订阅最佳推荐

  • Startup.cs 中订阅

由于在非单例的服务中存在重复订阅的问题,所以推荐在 Startup.csConfigure 中统一订阅,如:

Startup.cs
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
// 解析任务队列服务
var taskQueue = app.ApplicationServices.GetRequiredService<ITaskQueue>();

// 订阅
taskQueue.OnExecuted += (sender, args) =>
{
Console.WriteLine($"任务 {args.TaskId} 管道 {args.Channel},执行结果:{args.Status},异常:{args.Exception}");
};

// ....
}
  • 在单例服务中订阅
public class SomeService : ISomeService, ISingleton
{
private readonly ITaskQueue _taskQueue;
public SomeService(ITaskQueue taskQueue)
{
_taskQueue = taskQueue;

_taskQueue.OnExecuted += (sender, args) =>
{
Console.WriteLine($"任务 {args.TaskId} 管道 {args.Channel},执行结果:{args.Status},异常:{args.Exception}");
};
}

// ...
}

之后再将 ISomeService 注入到其他类中使用即可。

26.3.12 反馈与建议

与我们交流

给 Furion 提 Issue