欢迎投稿

今日深度:

借助Redis完成延时任务,

借助Redis完成延时任务,


借助Redis完成延时任务
背景
相信我们或多或少的会遇到类似下面这样的需求:

第三方给了一批数据给我们处理,我们处理好之后就通知他们处理结果。

大概就是下面这个图说的。

本来在处理完数据之后,我们就会马上把处理结果返回给对方,但是对方要求我们处理速度不能过快,要有一种人为处理的效果。

换句话就是说,就算是处理好了,也要晚一点再执行通知操作。

这就是一个典型的延时任务。

延时,那还不简单,执行完之后,让它Sleep一下就好了,这样就达到目标了。

Sleep一下确定是最容易实现的一种方案,但是试想一下,数据的数量不断的增加,这样Sleep真的好吗?答案是否定的。

延时队列,是处理这个场景最为妥当的方案。

RabbitMQ,RocketMQ,Cmq等都可以直接或间接的达到相应的效果。

如果不具备队列条件,又要怎么处理呢?还可以借助Redis来完成这项工作。

MQ不一定每个公司都会用,但Redis应该80%以上的都会用吧。

处理方案
Redis这边,可用的方案有两种,下面分别来介绍一下。

1 键的过期时间

在设置缓存的时候,我们比较多情况下都会设置一个缓存的过期时间,这个时间过期后,会重新去数据源拿数据回来。

可以基于这个过期时间结合Redis的keyspace notifications共同完成。

keyspace notifications里面包含了非常多的事件,这里只关注EXPIRE,这个是和过期有关的。

只要订阅了__keyevent@0__:expired这个主题,当有key过期的时候,就会收到对应的信息。

注:主题@后面的0,指的是db 0.

要想使用这个特性,必不可少的一步是修改Redis默认的配置,把notify-keyspace-events设置成Ex。

Event notification

Redis can notify Pub/Sub clients about events happening in the key space.

This feature is documented at http://redis.io/topics/notifications

.........

By default all notifications are disabled because most users don't need

this feature and the feature has some overhead. Note that if you don't

specify at least one of K or E, no events will be delivered.

notify-keyspace-events "Ex"
其中 E 指的是键事件通知,x 指的是过期事件。

根据这个特性,重新调整一下流程图:

应该也比较好懂,下面通过简单的代码来实现一下这种方案。

首先是处理完数据及往Redis写数据。

public async Task DoTaskAsync()
{

// 数据处理
// ...

// 后续操作要延时,把Id记录下来
var taskId = new Random().Next(1, 10000);
// 要延迟的时间
int sec = new Random().Next(1, 5);

// 可以加个重试机制,预防单次执行失败。
await RedisHelper.SetAsync($"task:{taskId}", "1", sec);

}
还需要回传结果的后台任务,这个任务就是去订阅上面说的键过期事件,然后回传结果。

这里可以借助BackgroundService来订阅处理。

public class SubscribeTaskBgTask : BackgroundService
{

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
    stoppingToken.ThrowIfCancellationRequested();
    var keyPrefix = "task:";
    RedisHelper.Subscribe(
        ("__keyevent@0__:expired", arg =>
            {
                var msg = arg.Body;
                Console.WriteLine($"recive {msg}");
                if (msg.StartsWith(keyPrefix))
                {
                    // 取到任务Id
                    var val = msg.Substring(keyPrefix.Length);
                    Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");
                    
                    // 回传处理结果给第三方,这里可以考虑这个并发锁,避免多实例都处理了这个任务。
                    // ....
                }
            }
        ));

    return Task.CompletedTask;
}

}
这里有一个要注意的地方,要在key里面包含任务的Id,因为订阅处理的时候,只能拿到一个key,后续能做的操作也只是基于这个key。

上面的例子,是用了task:任务Id的形式,所以在订阅处理的时候,只处理以task:开头的那些key。

效果如下:

这种方案,直观上是非常简单的,不过这种方案会遇到一个小问题。

当一个key过期后,并不一定会马上收到通知,这个也是会有一定的延时的,取决于Redis的内部机制。

Redis Keyspace Notifications文档的最后一段也提到了这个问题。

所以用这种方案的时候,要考虑一下,你的延时是不是要及时~~

2 有序集合

有序集合是Redis中一种十分有用的数据结构,它的本质其实就是集合加了一个排序的功能,每个集合里面的元素还会有一个分值的属性。

它提供了一个可以获取指定分值范围内的元素,这个也就是我们的出发点。

在这个场景下,什么东西可能作为这个分值呢?现在只有一个处理任务的Id还有一个延迟的时间,Id肯定不行,那么也只能是延迟时间来作这个分值了。

延迟1秒,5秒,1分钟,这个都是比较大粒度的时间,这里要转化一下,用时间戳来代替这些延迟的时间。

假设现在的时间戳是 1584171520, 要延迟5秒执行,那么执行任务的时间就是 1584171525,在当前时间戳的基础上加个5秒,就是最终要执行的了。

到时有序集合中存的元素就会是这样的

任务Id-1 1584171525
任务Id-2 1584171528
任务Id-3 1584171530
接下来就是要怎么取出这些任务的问题了!

把当前时间戳当成是取数的最大分值,0作为最小分值,这个时候取出的元素就是应该要执行回传的任务了。

根据这个方案,重新调整一下流程图:

交代清楚了思路,再来点代码,加深一下理解。

首先还是处理完数据后往Redis写数据。

public async Task DoTaskAsync()
{

// 数据处理
// ...

// 后续操作要延时,把Id记录下来
var taskId = new Random().Next(1, 10000);

var cacheKey = "task:delay";
int sec = new Random().Next(1, 5);

// 要执行这个任务的时间戳
var time = DateTimeOffset.Now.AddSeconds(sec).ToUnixTimeSeconds();

await RedisHelper.ZAddAsync(cacheKey, (time, taskId));
Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} done {taskId} here - {sec}");

}
后面就是轮训有序集合里面的元素了,这里同样是借助BackgroundService来处理。

public class SubscribeTaskBgTask : BackgroundService
{

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    stoppingToken.ThrowIfCancellationRequested();
    var cacheKey = "task:delay";
    while (true)
    {
        // 先取,后删,不具备原子性,可考虑用lua脚本来保证原子性。
        var vals = await RedisHelper.ZRangeByScoreAsync(cacheKey, -1, DateTimeOffset.Now.ToUnixTimeSeconds(), 1, 0);

        if (vals != null && vals.Length > 0)
        {
            var val = vals[0];

            var rmCount = await RedisHelper.ZRemAsync(cacheKey, vals);

            if (rmCount > 0)
            {
                // 要把这个元素先删除成功了,再执行任务,不然会重复
                Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");
                
                // 回传处理结果给第三方,这里可以考虑这个并发锁,避免多实例都处理了这个任务。
                // ....
            }
        }
        else
        {
            // 没有数据,休眠500ms,避免CPU空转
            await Task.Delay(500);
        }
    }
}

}
效果如下:

参考文章
https://redis.io/topics/notifications

https://zhuanlan.zhihu.com/p/87113913

如果您认为这篇文章还不错或者有所收获,可以点击右下角的【推荐】按钮,因为你的支持是我继续写作,分享的最大动力!
作者:Catcher ( 黄文清 )
来源:http://catcher1994.cnblogs.com/

www.htsjk.Com true http://www.htsjk.com/redis/42458.html NewsArticle 借助Redis完成延时任务, 借助Redis完成延时任务 背景 相信我们或多或少的会遇到类似下面这样的需求: 第三方给了一批数据给我们处理,我们处理好之后就通知他们处理结果。 大概就...
评论暂时关闭