对象存储由于其价格低廉、结构简单易用、可扩展性强等特点被大量应用,几乎已经成为云存储的代名词,例如AWS S3,Google Cloud、阿里云OSS等。
与传统的文件存储不同,对象存储是一种基于HTTP的存储服务,需要通过例如GetObject
、ListObjects
、PutObject
、DeleteObject
、HeadObject
等接口完成类似文件系统read
,write
,readdir
,getattr
的操作。而HTTP接口的网络时延很大,一般在几十到几百毫秒之间,所以其存储服务是一种带宽型的业务。
这其中,ListObject
是一种可以列举Bucket中对象的接口,类似文件系统的readdir
:
虽然在通常的业务中不常用,但是在例如对象存储文件网关、或者GCSFuse、Mountpoint这种对象存储POSIX客户端中被大量使用。在AI场景中也有大量的遍历数据集文件的操作(例如把数据从对象存储拉到例如高性能的并行文件系统中做训练/推理),ListObject速度就成为一个问题:
例如一个ListObject请求是100ms,配置每次请求的对象个数为最大1000,对于一个1亿文件的Bucket,就需要166min,大概2.7个小时
这还仅仅是遍历一遍的接口用时,如果加上其他的业务逻辑处理,那时间就更多了。
任务并行化
一个很简单的想法是将任务并行化,正如AWS这个博客中提到的:五行俱下 – 如何在短时间里遍历 Amazon S3 亿级对象桶(原理篇) | 亚马逊AWS官方博客 ,具体的工具代码库在:GitHub - aws-samples/s3-fast-list: Concurrently list Amazon S3 bucket
其基本思想是将整个 S3 存储桶的 Object Key 空间切分,然后用并发的任务取List每个部分。对于一个良好组织的Bucket,这当然很容易。但很多时候遇到的情况是,Bucket过于庞大和复杂,根本无法很清楚地了解下面Object Key是如何分布的,特定prefix下面有多少对象。
启发式的并发List算法
针对这一个问题,Google Cloud Storage提出了一个启发式的方法,在牺牲一些冗余的API请求的前提下实现了更好的并发List。
具体的代码仓库是:GoogleCloudPlatform/dataflux-client-python,是Google Pytorch数据集的客户端库,可以快速列出和下载GCS中的数据,主要用于机器学习应用。根据他们的评测结果,针对海量对象的Bucket,该算法能够大大提升List速度,且使用时无需针对存储桶的Object Key进行手动空间切分:
File Count | VM Core Count | List Time Without Dataflux | List Time With Dataflux |
---|---|---|---|
17944239 Obj | 48 Core | 1630.75s | 79.55s |
5000000 Obj | 48 Core | 289.95s | 23.43s |
1999002 Obj | 48 Core | 117.61s | 12.45s |
578411 Obj | 48 Core | 30.70s | 9.39s |
10013 Obj | 48 Core | 2.35s | 6.06s |
ListWorker实例
其基本实现是利用Python多进程机制创建多个ListWorker
实例执行任务:
1 |
|
而每个ListWoker
定义如下:
1 | class ListWorker(object): |
使用各种队列来协调worker之间的任务分配和状态同步,完成当前任务后,它会尝试从队列中窃取(steal) 新的任务:
- idle队列:等待需要steal一些工作来做的队列
- unidle队列:如果worker成功steal了,就推送到unidle队列
- heartbeat队列:正常工作队列
- direct_work_available:可用的steal任务队列
- send_work_stealing_needed:需要更多的工作的队列
start_range
,end_range
是范围切分的首尾。
ListWorker流程
ListWorker工作的主要流程是:
- 执行ListObject操作,个数为max_results(默认5000)
- 判断返回结果是否达到max_results,并记录下来
- 如果没达到(==说明已经到头了,不用继续List了,本次任务可以结束了==),则继续从别的worker那里steal任务来做
- 如果达到max_results了(==说明还可以继续List==),且有别的worker可以执行steal任务,则继续划分剩余空间,第一部分还是自己干,剩下的生成steal任务交给别人干
核心代码如下:
1 | while True: |
wait_for_work的处理逻辑
:当走到这里时,说明这个worker
自己的任务已经做完了,可以steal一些任务来做了:
1 | def wait_for_work(self) -> bool: |
一言以蔽之,算法的核心就是生成Worker放到多个Process中去并行执行List操作,通过不断动态拆分Object Key空间来生成不同的steal任务给worker去steal。
Object Key Range划分
根据上一节的分析,我们可以发现算法中很重要的部分就是Object Key空间的动态划分,其核心逻辑是:
- 初始化字母表集合
- 根据List结果中的
start_range
和end_range
重新构建字母表集合,并将字符串映射为int
,便于分割 - 根据数字的起始位置划分范围,并划分为不同的分割点
- 将分割点转化成字符串
核心代码如下:
1 | def split_range( |
这里需要注意的是,开始时的start_range
和end_range
可以都为””,然后根据List的结果动态更新字母表,产生新的分割点自动赋值给新的start_range
和end_range
。如此,可以将整个Object Key空间不断地动态划分,并交给不同的ListWorker执行List任务,且自动产生steal任务以供空闲的worker执行。