动态延时任务总结
之所以延时任务就是为了将更新数据库的时间延时到下一个文件片更新之后,然后延时任务刷新,避免反复请求数据库。
背景
文件上传是一个很古老的业务了,业界在设计的时候都会规划三个功能:秒传、分片上传、断点续传。
MD5密码杂凑算法
在介绍这三点之前,需要先了解MD5算法。这是一种密码杂凑算法,对于给定的任何输入,都会输出128位固定长度的16进制数字。既然长度固定,就会有重复的风险。国产算法SM3就是MD5的改良版,它优化了加密流程,也将输出的比特数增加到256位。
MD5重复的问题不是本文的重点,因此在接下来的叙述中不会考虑MD5重复的问题。
针对于MD5算法,我们任意输入文件名不同但文件内容相同的文件,都能得到相同的MD5值。这就是秒传的原理。如果数据库中已经存在客户端传过来的MD5值,那就说明以前有人已经上传过这份文件了,只需要新增一条用户和文件的对应关系即可。可以通俗的理解为:MD5值就是文件的身份证。
表结构设计
很明显,用户和文件之间是多对多的关系。
我思考过两种表结构:
- 只建立用户表和文件表,在文件表里关联用户表主键。
- 建立用户表、文件表、用户文件关联表。
考虑到网盘的场景,文件内容相同的文件虽然文件名不同,但它们实际上还是同一份文件。
采用第一种方案
缺点:
MD5不能做主键,放着天然的主键属性不要,得让业务去生成主键。假如说有一个文件程序员的自我修养.txt
,A用户上传完成后,在文件表增加一条程序员的自我修养.txt->A
的一行数据。B把它更名为自我修养.txt
上传,虽然文件名不同,MD5值却是相同的。但是文件表里肯定要再插入一条数据自我修养.txt->B
的记录。MD5做主键就会冲突。但是这不是什么要紧的事。
优点:
相比于第二种方案,可以少建一张表,且避免了回表查询。
用第二种方案
缺点:
- MD5还是不能做主键。在这种方案里,如果文件表用MD5做主键,同一份文件只有一行数据,A对这个文件的文件名做了修改,在B用户看来,文件名是被同步修改的。B用户肯定会觉得碰上鬼打墙了,啥都没干文件名就被改了。
- 要多建一张表,且会导致回表查询。查询某个用户上传的全部文件。如果是建立关联表的话,得先查关联表,拿到文件ID列表再去做回表查询。但如果是在文件表加用户字段,只需要查询
where userID = xxx
就可以把用户上传的全部文件查出来。
优点:
相比于第一种方案并没有什么优点。
三个功能
- 秒传:发现数据库里已经存在相同MD5值的文件时,去文件表新增一行数据,判定此次上传完毕。
- 分片上传:将文件划分为多个子部分,分别上传每个子部分,是断点续传的实现原理。
- 断点续传:如果上一次上传被中断需要重新上传,不需要从头开始上传,而是从上一次已上传的最新部分开始上传。
总结
虽然由于场景限制导致MD5这一天然的主键属性不能作为主键,这是产品特性决定的,同一份文件可能在不同用户的视角下名字不一样。但是MD5这个属性可以建立非唯一索引。
总体来看,流程比较简单,但是深究起来其中还是有不少可以挖掘的业务问题。
没上传完怎么办?
上传后的文件片怎么处理?
服务端需要将上传的那些文件片按照一定的规则(年-月)保存在临时目录中。出于需要实现断点续传的考虑,临时目录不能立马删掉。得保存一段时间。
- 后续由延时任务将它删除吗?我认为并不需要,临时目录的删除并不需要很高的时间精准度,犯不着用延时任务增加系统复杂度。
- 那把删除临时目录的任务发送到MQ?也没有必要。在MQ不出现消息积压的情况下,几秒钟内消息就会被消费。破坏了断点续传。
我认为最好的办法就是在月初统一删除上一个月的临时目录。比如第一级文件夹是xxxx年,第二级是xx月,里面保存了所有在xx月上传的文件片(
当然根据需要也可以精确到天,增加三级目录xx天)
。虽然会导致极端场景下断点续传失效(月底的晚上开始上传),但没必要为了极端场景做非常多优化。这些优化本身的成本可能比极端场景发生后的损失还大。技术总是服务于业务。(那我现在写的就是极端场景下恶意上传打垮服务器的解决方案,为什么要考虑它的解决方案?不考虑解决方案我怎么写简历?)
有人恶意攻击怎么办
前面说到文件片按照一定的规则(年-月)保存在临时目录中,月初才清理。
且文件片只有在全部上传完毕后才会合并,并更新用户已使用空间。
假如现在硬盘资源有限,有人从月初开始攻击服务器,具体表现为上传文件时不上传完。那么这些上传的文件碎片就会白白占据服务器硬盘资源而上传它们的用户的已使用空间却不会增加。久而久之用户上传的文件片已经远远超过他们的额度,服务器硬盘资源会被打垮。怎么解决?
解决方案
每个文件片上传时都去数据库里增加用户已使用空间吗?这会不会给数据库带来压力呢?数据库是整个服务端的最后方的组件,将数据持久化至硬盘,因此数据库本身的QPS完全比不上缓存。所以不能随随便便就请求数据库。
那就引入缓存?
将用户的已使用空间保存在redis里面?那是不是要考虑和数据库的一致性呢?redis虽然本身有备份机制,**
但他的备份机制并不能保证完全不丢数据。如果需要保证和数据库的一致性,**那和直接更新数据库没有任何区别,因为这些请求全部是写请求。
还有一个问题。用户上传到一半手动取消上传了,从用户角度考虑,用户肯定不希望在查看已使用空间时看到取消上传的文件还占据着已使用空间。这就带来一个新的问题:
服务器视角下的已使用大小是已上传完毕的文件总大小加上未上传完毕的总大小。用户视角下的已使用大小是他已经上传完毕的那些文件的总大小(实际使用中发现百度网盘、夸克网盘等等都是这样做的)。这就需要我们准备两套计算已使用大小的方案。
在用户表里用两个字段分别表示已上传完毕文件总大小以及未上传完毕总大小。前者只在文件片合并和删除文件时更新即可,后者则是每有一个文件片上传都需要更新。
服务器在判断是否超出限额时需要将两者相加。
在 Redis 中记录用户的这两个字段。每个文件片更新时更新未上传完毕总大小。然后保证和数据库的一致性。
但是这样做和直接更新数据库还是没有区别。有多少文件片被上传就要更新多少次数据库。能不能减少更新次数?
注意到即使更新无数次数据库,每一次更新都可以覆盖前一次更新。
举个例子
第一次更新是将未上传完大小100M
加上这次文件片的大小5M
得到105M
;第二次更新是将未上传完大小105M
加上这次文件片的大小5M
得到110M
。更新了两次数据库。如果在第二次更新时加上5M * 2 = 10M
,再把第一次更新删除,也就是100M + 10M = 110M
。这样做只更新了一次数据库。
因此考虑引入延时任务,每个文件片上传后,在上一个延时任务执行之前刷新延时任务的执行时间,将它接着向后延。让最后一次更新覆盖之前所有更新,做到只更新一次数据库。
延时任务
存储延时任务的结构
列表
插入:O(1)
查询:O(logN)
实现:我们可以利用列表去存即将触发的任务信息,通过遍历的方式去取到大于当前时间的任务,并且触发。
优点:实现简单
缺点:但需要对所有任务进行遍历,查出很多无效数据,极其低效。
大顶堆
删除:O(logN)
查询:O(1)
实现:我们也可以利用大顶堆的性质,每次都取堆顶元素,如果堆顶元素大于当前时间,那么就取最大元素。其余元素会利用大顶堆的性质,继续浮出最大的元素,然后继续比较。
优点:查询快,只会查到快到时间的任务,实现简单。
缺点:需要维护自身堆的性质,cpu压力高,无法抗住高并发。
B+树
查询:O(logN)
B+树(B-plus tree)是一种自平衡的树数据结构,它能够保持数据有序,允许插入、删除和查找操作在对数时间内完成。B+树特别适合于磁盘或其他直接存取辅助设备的存储系统,因为它能够最大化地减少I/O操作次数。
跳表
查询:O(logN)
跳表(Skip List)是一种基于有序链表的高效数据结构,它通过在链表的基础上增加多级索引来实现快速的查找操作。跳表允许在对数时间内完成搜索、插入和删除操作,且插入和删除操作不需要频繁调整数据结构。
小总结
总的来说,列表和大顶堆由于自身的性质,并不适合这样的场景。对于扫表+触发的模式,其实本质是需要一个能高速范围查询的数据结构。
B+树和跳表都是高效的能范围查询数据结构,但它们各自适用于不同的场景。B+树更适合于磁盘存储和范围查询,而跳表则更适合于内存中的快速查找和分布式环境。
存储数据库分析
我们举出基于内存的数据库的代表Redis和基于磁盘的数据库进行分析。
Redis VS MySQL
1.Redis的底层是跳表,而MySQL的底层是B+树。就范围查询而言,两者不分伯仲。
2.但Redis没有事务概念,内部实现是单线程,没有锁竞争,再加上IO多路复用的特性和极其高效的数据结构实现,就注定单机qps要远超过mysql。
3.mysql在这个场景下的优势则是有持久化能力,不容易丢数据,Redis可能在RDB和AOF的过程中有丢数据的可能性。
因此,mysql和redis都有可能是作为存储任务的数据库,需要区分场景。
综合考虑下我选用Redis。虽然Redis有丢数据的风险无法完美保证延时任务不丢,但是在本文中描述的延时任务是不断更新的,是动态的。比如某条数据现在是10M,然后执行RDB备份。再然后更新延时任务变成11M,假设此时断电宕机,11M这个数据没保存下来,但是10M这个数据已经保存了啊。虽然丢了一点数据,但是不多。系统停机维护时把延时任务和临时目录全删了,再把数据库中记录用户未上传完大小的这个字段置为0。一切又回到最初的起点。
问题又来了,既然这里可以接受丢数据,那为什么不把未上传完大小这个字段放在 redis ,然后不保证和 mysql 的一致性?或者说只有
redis 里放未上传完大小,mysql 表字段里只有用户已上传完大小。都是相同的丢数据风险。为什么这里可以接受上面不可以接受?其实这个需求就不是个合理的需求。只是我需要通过这个需求找到一个可以写简历的亮点。恰好对任务调度比较熟悉,所以需要通过这个不合理的需求引入任务调度。
如果用mysql存储动态延时任务,那就要经常更新mysql。其实也可以。但是mysql扛不住比较大的QPS,速度不如redis。综合考虑我还是用redis存延时任务。
缓存一致性问题
只要涉及到了缓存和数据库就一定会有缓存一致性问题,延时任务也不例外。
这里用的策略是先更新缓存,再更新数据库。
数据库的更新取决于延时任务被执行的时间。
通过延时任务的执行来保证 redis 和 mysql 的最终一致性。
延时任务的具体思路
文件片被上传时,用一个redis结构保存延时任务。
1 | class Member { |
定义zset中的member
zset 中 member 的整体结构设计。
同一个zset桶中,同一个用户的不同延时任务的时间戳可能相同,这需要member中保存userID和md5作为辨别。
对上图的更正:
- HashMap结构中的V保存时间戳与userID的或运算结果与Zset桶的编号
- zset中的score保存时间戳与UserId的或运算结果。因为在某个zset桶中,不同用户上传的文件片延时任务的时间戳可能相同。但如果只是时间戳+userID的话太长了,不能作为 score。因此改为或运算结果(时间戳是Long类型,占低41位,高23位是0。userID的高23位有1有0,也就是说高23位的或运算结果就是userid的高23位。在时间戳相同的情况下,区分度完全由Userid决定,这就避免了不同用户上传的文件片的延时任务在zset桶中具有相关的score)。同时对于单个zset而言,score有不有序不重要,重要的是能通过score定义到唯一的member。但是这也意味着某个zset桶中可能存在同一个用户的不同延时任务。
再次更正:
既然 score 用 timestamp 与 userID 的或运算结果,而且区分度取决于 userID。那为什么不直接用 userID 作为 score?
最终结果:
- HashMap结构中的V保存Zset桶的编号
- zset 桶中的score 是 userID
1 | create table user_info |
用户表相关字段
某文件第一个文件分片上传完后,创建出一个任务。根据当前时间戳 ,得到目标 zset 桶编号(如23-54-1), 以 userID 作为 score,任务作为member
加入到zset
中。再用一个 map 记录md5+userid 和zset 桶编号(23-54-1)的映射。
后续的文件分片上传时,因为任务需要更新,。所以需要设计了以下几个步骤:
- 从 hashmap 中根据 userID+md5 拿到zset 桶编号。
- 从 zset 桶中用
ZRANGEBYSOCRE
取出这个 userID 对应的任务列表(都是这个用户上传的文件片的任务且还没有被调度),并从中找到当前 md5 对应的任务。如果找不到说明任务被调度了,创建新任务。 - 更新任务信息
- 根据当前 timestamp 算出存储任务的桶,如果还是这个桶,就通过
ZADD
命令把任务列表放回桶(23-54-1) ,跳过第五步;如果不是,就计算出新的桶编号,如:(23-59-3),并执行第五步。 - 将任务列表中的目标任务删除,任务列表重新插入 zset 桶(23-54-1),目标任务加入到 zset 桶(23-59-4)中。
- 更新HashMap中记录的时间戳与桶编号
第5,6步中对 redis 的操作不是原子性的,可能会有这样的问题吗?
- 调度器先扫描了 zset 桶(23-54-1),然后任务列表才被重新放回桶中,导致任务要晚一天执行。
并不会有,延时任务在业务的设定下是延时23个小时。调度器调度的任务是23个小时之前的,不会调度现在新增的任务
为什么 不用 文件id作为key呢?
主要是考虑到在文件被完全上传完之前,确定id没有意义。除非客户端在上传一个文件时携带id,且保证发生断点续传时,第二次上传携带的id和第一次相同。但是满足这个条件的id的生成规则肯定和雪花算法不同,因为雪花算法生成的id和时间有关,但当前场景下需要时间无关。注意。这里的意思并不是说一个文件被重复上传时客户端生成的id前后都一样,而是在一次上传流程中,如果因为应用重启等原因出现了中断,中断后的上传请求中的文件id和中断前保持相同。重复上传时的id则必须不同。
文件只在完全被上传后才会为它生成 id,否则用 md5+userid表示
而且这篇文章是针对于服务端的,我假设客户端上传时不携带id信息。另外,我认为服务端一个重要的原则就是不要相信客户端传来的数据,能做检验的数据一定要做检验。客户端传来的数据都是可以被篡改的。https虽然安全,但如果出现了中间服务器,https报文就变成明文了。
后续文件片上传时,去 map 里根据md5+userid 拿到保存任务的 zset 编号,根据当前时间所属分钟 + 5s 是否等于该 zset 所属的分钟得到
boolean 类型变量 x。 去这个 zset 里根据md5+userid 找到这个文件的任务。如果 x == true, 修改该任务。否则将该任务删除,移至下一分钟的
zset。
当所有文件片正常上传完触发文件片合并,然后数据库更新use_space_finished
字段。
之后根据map
通过用户id + 文件md5值
获取 zset 编号,获取该zset
中对应任务的unfinishedDBSize
和unfinishedSize
。
执行use_space_unfinished
= use_space_unfinished
- unfinishedDBSize
- unfinishedSize
。这是补偿用户的操作。
因为延时任务肯定会被执行的(use_space_unfinished
= use_space_unfinished
+ unfinishedDBSize
),先减unfinishedDBSize
和后减unfinishedDBSize
没区别。至于unfinishedSize
?只要这个文件的延时任务没有被执行过,它就是0。即使执行过也没关系,见3.4 考虑断点续传,用户只上传了一部分就宕机了,然后恢复机器继续上传。但是此时这个文件在zset里的任务已经更新到数据库了。后续的一部分文件上传是什么流程?
(延时任务能不能成功执行至关重要!!!最终一致性全靠它保证!!!)
总结
一个文件片上传完后,有六个步骤:
- 如果没有这个文件 md5 + userid 的临时目录,那么创建任务,放入当前实际的 Zset,在 map 里记录 Zset 和 MD5+
- 后续的文件片上传根据
文件md5值+用户id
去map
里找到保存上一个文件片延时任务的 zset,在根据 md5 + userid 去 Zset
里找到延时任务 - 修改任务,根据当前时间 + 5s 决定任务修改后是否移至另一 Zset 。是考虑到调度器的调度时机。
- 写
map
,更新文件md5值+用户id
对应的 Zset 编号。 - 执行延时任务,
use_space_unfinished
=use_space_unfinished
+unfinishedDBSize
- 文件上传完成,
use_space_unfinished
=use_space_unfinished
-unfinishedDBSize
-unfinishedSize
第五步和第六步的执行顺序没有要求!谁先执行都可以!
延时任务执行调度
当任务量很大时,任务的执行调度也需要花心思设计。
调度器扫描
集群模式下的调度器每分钟执行一次,扫描上一个小时的该分钟内所有的 zset 桶。注意由于是集群模式,需要先获得目标桶的锁才能扫描,抢锁用 redission 看门狗实现。如果调度器执行过程中挂了,里面这些任务就得晚一天执行了。
如果某个调度器获得了一个桶的锁却挂了,有没有一种核验机制能够确定这一分钟内的桶哪些被扫描执行了哪些没被扫描执行呢?
有,另起一个定时任务,检查上一分钟的那些桶的里有没有数据。如果有,调度一遍。
原因见下文。刚被调度了的桶再想有数据新增,得等 1 个小时。
所以上一分钟的那些桶里如果还有数据,一定是调度器有问题,得看日志排查。
任务扫描这里有个问题:扫描出来的任务在下发后要从桶中删掉,桶里新增的任务也会被删,怎么办?
由于任务调度的那些桶对应的时间比当前时间早一个小时,新增的任务都放在当前时间的那些桶里了,当前扫描的桶里的任务都是 23 小时前增加的。
比如现在时间是 2023-12-21 20:40, 某个调度器扫描了(21-40-1)这个桶,这个桶里的任务都是2023-12-20 21:40 放进去的,现在新增的任务进的桶是(20-40-1) - (20-40-n)。
(21-40-1)这个桶再想有数据新增,得等 1 个小时。
除非生产者的机器时钟有问题,否则绝对不会有数据新增。
这也就意味着一天 24 个小时,每时每刻 23 个小时里的桶有数据,1 个小时里的桶没数据。如果不能理解可以这样理解:24 个小时都没数据,零点开始运行,第零个小时里的桶有数据。扫描器扫描的是第一个小时,无数据………到了 23 点,第 23 个小时的桶开始有数据,扫描器开始扫描第零个小时的桶,有数据。
当然延时 23 个小时有点太长了,也可以根据真实业务场景用分钟代替小时,每 23 分钟一循环。
任务下发
扫描出来的任务通过带回调函数的 send 方法发给 Kafka,配置好 ack。所有任务都发给 Kafka 后,删除这个桶里全部数据。
有些任务实在发不出去,也不能留在桶里,因为调度器只要正常调度了,桶里的数据就要删掉,Kafka 的问题是 Kafka 的问题,和调度器没关系。
发不出去的任务在回调函数中设置存入数据库里的死信表里等运维人员处理。
如果任务发不出去,数据库也挂了怎么办?
消息发送前先写日志。数据库挂了先恢复,再根据日志里的记录和表中数据比对,比对不上的做数据恢复。
任务调度的更多细节具体参考
一些A&&Q
延时任务重复问题
一个文件多个分片上传,会造成该文件有多个延时任务。也就说一个分片一个延时任务。如何解决重复的问题?
只有第一个分片会创建延时任务,后续文件片根据 MD5 + userid
会找到延时任务删除、修改、新增。
用户开多个客户端同时上传,怎么保证延时任务更新正确?
zset
中的延时任务只和某一个文件有关。开多个客户端上传多个文件,就会出现多个延时任务,彼此互不干扰,并不会有什么问题。
开多个客户端上传同一份文件呢?
那就让他传。本质上和开多个客户端上传多个文件没有区别。只是服务器上会出现两份相同的文件。
而且这种情况出现概率非常小。
用户开多个客户端同时上传,怎么保证不会超过限额?
网盘产品里,用户一但登录了,大概率会开第二个客户端也登录,比如手机登录了,上传一个文件,网页接着登录,下载该文件。这期间,用户可能会多次刷新网页或者打开网盘的别的页面。而每个页面都需要显示用户已使用空间和总空间。
于是可以考虑在用户登录后缓存一些数据。在 Redis 中保存每个用户的总空间total_Space
,已上传完毕大小finishedFileSize
和未上传完毕大小unfinishedFileSize
。
文件片上传前,判断下unfinishedFileSize
+ finishedFileSize
是否大于total_Space
。如果大于,就返回错误信息,直接告诉用户别的客户端上传的文件有点大,超过了限额,当前文件不能上传。
如果上面的校验没问题,那就让unfinishedFileSize
加上文件总大小。
文件上传完毕,从unfinishedFileSize
中减去该文件的大小。
显示给用户看的已使用空间就是finishedFileSize
。只把finishedFileSize
返回即可。
考虑断点续传,用户只上传了一部分就宕机了,然后恢复机器继续上传。但是此时这个文件在zset里的任务已经更新到数据库了。后续的一部分文件上传是什么流程?
举个场景例子并给出解决方案:
某用户在数据库中use_space_unfinished
字段是0。zset
里unfinishedDBSize
字段是10M。
然后用户宕机,延时任务没有被刷新而是被触发,导致用户在数据库中的use_space_unfinished
= use_space_unfinished
+ 10M
= 0M
+ 10M
= 10M
。任务触发后被删除。
用户重启客户端发生断点续传,下一个大小为1M
的文件片被上传了。于是写zset
,新增一个延时任务,unfinishedDBSize
字段是1M
。
假如这个大小为1M
的文件片是最后一个文件片,上传完就触发文件片合并流程。在合并的流程中,计算出真实合并了11M
大小的文件片,于是修改use_space_unfinished
字段,use_space_unfinished
= use_space_unfinished
- 11M
= -1M
。
但是延时任务还没执行,等到延时任务执行后,use_space_unfinished
= use_space_unfinished
+ 1M
= -1M
+ 1M
= 0M
。保证了最终一致性。
为什么不用md5+userid 作为文件表主键呢?
因为用户可以一份文件上传多次,会出现主键重复问题
为什么延时任务不设计成一个用户一个任务而是一个文件一个任务?
一个用户一个任务是更优的方案,可以显著降低任务的数量。这是一个优化点。
在一个用户一个客户端的情况下,一个用户一个任务和一个文件一个任务是一样的。
一个用户开多个客户端上传或者单客户端并行上传多文件,可以降低任务的数量。