0%

如何快速遍历对象存储Bucket中的海量对象

对象存储由于其价格低廉、结构简单易用、可扩展性强等特点被大量应用,几乎已经成为云存储的代名词,例如AWS S3,Google Cloud、阿里云OSS等。

与传统的文件存储不同,对象存储是一种基于HTTP的存储服务,需要通过例如GetObjectListObjectsPutObjectDeleteObjectHeadObject 等接口完成类似文件系统read,write,readdir,getattr的操作。而HTTP接口的网络时延很大,一般在几十到几百毫秒之间,所以其存储服务是一种带宽型的业务。

这其中,ListObject是一种可以列举Bucket中对象的接口,类似文件系统的readdir

虽然在通常的业务中不常用,但是在例如对象存储文件网关、或者GCSFuse、Mountpoint这种对象存储POSIX客户端中被大量使用。在AI场景中也有大量的遍历数据集文件的操作(例如把数据从对象存储拉到例如高性能的并行文件系统中做训练/推理),ListObject速度就成为一个问题:

例如一个ListObject请求是100ms,配置每次请求的对象个数为最大1000,对于一个1亿文件的Bucket,就需要166min,大概2.7个小时

这还仅仅是遍历一遍的接口用时,如果加上其他的业务逻辑处理,那时间就更多了。

任务并行化

一个很简单的想法是将任务并行化,正如AWS这个博客中提到的:五行俱下 – 如何在短时间里遍历 Amazon S3 亿级对象桶(原理篇) | 亚马逊AWS官方博客 ,具体的工具代码库在:GitHub - aws-samples/s3-fast-list: Concurrently list Amazon S3 bucket

其基本思想是将整个 S3 存储桶的 Object Key 空间切分,然后用并发的任务取List每个部分。对于一个良好组织的Bucket,这当然很容易。但很多时候遇到的情况是,Bucket过于庞大和复杂,根本无法很清楚地了解下面Object Key是如何分布的,特定prefix下面有多少对象。

启发式的并发List算法

针对这一个问题,Google Cloud Storage提出了一个启发式的方法,在牺牲一些冗余的API请求的前提下实现了更好的并发List。

具体的代码仓库是:GoogleCloudPlatform/dataflux-client-python,是Google Pytorch数据集的客户端库,可以快速列出和下载GCS中的数据,主要用于机器学习应用。根据他们的评测结果,针对海量对象的Bucket,该算法能够大大提升List速度,且使用时无需针对存储桶的Object Key进行手动空间切分:

File Count VM Core Count List Time Without Dataflux List Time With Dataflux
17944239 Obj 48 Core 1630.75s 79.55s
5000000 Obj 48 Core 289.95s 23.43s
1999002 Obj 48 Core 117.61s 12.45s
578411 Obj 48 Core 30.70s 9.39s
10013 Obj 48 Core 2.35s 6.06s

ListWorker实例

其基本实现是利用Python多进程机制创建多个ListWorker实例执行任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

class ListingController(object):
...
def run(self) -> list[tuple[str, int]]:
processes = []
results: set[tuple[str, int]] = set()
for i in range(self.max_parallelism):
p = multiprocessing.Process(
target=run_list_worker,
...
)
processes.append(p)
p.start()
...

而每个ListWoker定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class ListWorker(object):
def __init__():
...
self.send_work_stealing_needed_queue = send_work_stealing_needed_queue
self.heartbeat_queue = heartbeat_queue
self.direct_work_available_queue = direct_work_available_queue
self.idle_queue = idle_queue
self.unidle_queue = unidle_queue
self.results_queue = results_queue
self.metadata_queue = metadata_queue
self.error_queue = error_queue
self.max_results = 5000
self.results: set[tuple[str, int]] = set()

self.start_range = start_range
self.end_range = end_range
self.default_alph = "ab"
...

使用各种队列来协调worker之间的任务分配和状态同步,完成当前任务后,它会尝试从队列中窃取(steal) 新的任务:

  • idle队列:等待需要steal一些工作来做的队列
  • unidle队列:如果worker成功steal了,就推送到unidle队列
  • heartbeat队列:正常工作队列
  • direct_work_available:可用的steal任务队列
  • send_work_stealing_needed:需要更多的工作的队列
    start_range,end_range 是范围切分的首尾。

ListWorker流程

ListWorker工作的主要流程是:

  1. 执行ListObject操作,个数为max_results(默认5000)
  2. 判断返回结果是否达到max_results,并记录下来
  3. 如果没达到(==说明已经到头了,不用继续List了,本次任务可以结束了==),则继续从别的worker那里steal任务来做
  4. 如果达到max_results了(==说明还可以继续List==),且有别的worker可以执行steal任务,则继续划分剩余空间,第一部分还是自己干,剩下的生成steal任务交给别人干

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
while True:
try:
blobs = self.client.bucket(self.bucket).list_blobs(**list_blob_args)
self.api_call_count += 1
i = 0
self.heartbeat_queue.put(self.name)
for blob in blobs:
...
i += 1
self.results.add((blob.name, blob.size))
self.start_range = remove_prefix(blob.name, self.prefix)
if i == self.max_results:
has_results = True
break
...
except Exception as e:
...

if has_results:
# 达到max_results了,检查send_work_stealing队列,这个队列
# 不空说明有别的worker执行stealing任务,否则就自己干,跳过
try:
self.send_work_stealing_needed_queue.get_nowait()
except queue.Empty:
continue

# 继续划分空间,为两部分,将第二部分当成steal_range,生成steal任务
split_points = self.splitter.split_range(
self.start_range, self.end_range, 1)
steal_range = (split_points[0], self.end_range)
self.direct_work_available_queue.put(steal_range)
self.end_range = split_points[0]
self.max_results = 5000
else:
if len(self.results) > 0:
self.results_queue.put(self.results)
self.results = set()
if not self.wait_for_work():
return

wait_for_work的处理逻辑:当走到这里时,说明这个worker 自己的任务已经做完了,可以steal一些任务来做了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  def wait_for_work(self) -> bool:
self.send_work_stealing_needed_queue.put(self.name)
self.idle_queue.put(self.name)
logging.debug(f"Process {self.name} waiting for work...")

...
if new_range[0] is None:
logging.debug(f"Process {self.name} didn't receive work")
...
return False
self.start_range = new_range[0]
self.end_range = new_range[1]
logging.debug(f"Process {self.name} got new range [{self.start_range},"
f" {self.end_range}]")
return True

一言以蔽之,算法的核心就是生成Worker放到多个Process中去并行执行List操作,通过不断动态拆分Object Key空间来生成不同的steal任务给worker去steal。

Object Key Range划分

根据上一节的分析,我们可以发现算法中很重要的部分就是Object Key空间的动态划分,其核心逻辑是:

  1. 初始化字母表集合
  2. 根据List结果中的start_rangeend_range重新构建字母表集合,并将字符串映射为int,便于分割
  3. 根据数字的起始位置划分范围,并划分为不同的分割点
  4. 将分割点转化成字符串

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
   def split_range(
self,
start_range: str,
end_range: str,
num_splits: int,
) -> Sequence[str]:

if len(end_range) != 0 and start_range >= end_range:
return []

if self.is_range_equal_with_padding(start_range, end_range):
return []

# 加入当前字母表
self.add_characters_to_alphabet(start_range + end_range)

# 转化为int range
min_int_range = self.string_to_minimal_int_range(
start_range, end_range, num_splits)

# 将int range分割点转化为字符串分割点
split_points = self.generate_splits(
GenerateSplitsOpts(min_int_range, num_splits, start_range,
end_range))
return split_points

这里需要注意的是,开始时的start_rangeend_range可以都为””,然后根据List的结果动态更新字母表,产生新的分割点自动赋值给新的start_rangeend_range。如此,可以将整个Object Key空间不断地动态划分,并交给不同的ListWorker执行List任务,且自动产生steal任务以供空闲的worker执行。