
Commit 410ec8e

[manage_s3] Download objects metadata concurrently
Using `concurrent.futures.ThreadPoolExecutor`. This speeds up rebuilding the `whl/test` index from 300 sec to 90 sec on my laptop.
1 parent 21ffba1 · commit 410ec8e

File tree

1 file changed: +21 -12 lines
s3_management/manage.py

Lines changed: 21 additions & 12 deletions
```diff
@@ -2,6 +2,7 @@

 import argparse
 import base64
+import concurrent.futures
 import dataclasses
 import functools
 import time
@@ -17,6 +18,7 @@


 S3 = boto3.resource('s3')
+CLIENT = boto3.client('s3')
 BUCKET = S3.Bucket('pytorch')

 ACCEPTED_FILE_EXTENSIONS = ("whl", "zip", "tar.gz")
@@ -359,8 +361,8 @@ def save_pep503_htmls(self) -> None:

     @classmethod
     def from_S3(cls: Type[S3IndexType], prefix: str) -> S3IndexType:
-        objects = []
         prefix = prefix.rstrip("/")
+        obj_names = []
         for obj in BUCKET.objects.filter(Prefix=prefix):
             is_acceptable = any([path.dirname(obj.key) == prefix] + [
                 match(
@@ -371,18 +373,25 @@ def from_S3(cls: Type[S3IndexType], prefix: str) -> S3IndexType:
             ]) and obj.key.endswith(ACCEPTED_FILE_EXTENSIONS)
             if not is_acceptable:
                 continue
+            obj_names.append(obj.key)
+        objects = []
+        def fetch_metadata(key: str) :
+            return CLIENT.head_object(Bucket=BUCKET.name, Key=key, ChecksumMode="Enabled")
+
+        with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
             # Add PEP 503-compatible hashes to URLs to allow clients to avoid spurious downloads, if possible.
-            response = obj.meta.client.head_object(Bucket=BUCKET.name, Key=obj.key, ChecksumMode="ENABLED")
-            sha256 = (_b64 := response.get("ChecksumSHA256")) and base64.b64decode(_b64).hex()
-            # For older files, rely on checksum-sha256 metadata that can be added to the file later
-            if sha256 is None:
-                sha256 = response.get("Metadata", {}).get("checksum-sha256")
-            sanitized_key = obj.key.replace("+", "%2B")
-            s3_object = S3Object(
-                key=sanitized_key,
-                checksum=sha256,
-            )
-            objects.append(s3_object)
+            for obj_key, future in {key: executor.submit(fetch_metadata, key) for key in obj_names}.items():
+                response = future.result()
+                sha256 = (_b64 := response.get("ChecksumSHA256")) and base64.b64decode(_b64).hex()
+                # For older files, rely on checksum-sha256 metadata that can be added to the file later
+                if sha256 is None:
+                    sha256 = response.get("Metadata", {}).get("checksum-sha256")
+                sanitized_key = obj_key.replace("+", "%2B")
+                s3_object = S3Object(
+                    key=sanitized_key,
+                    checksum=sha256,
+                )
+                objects.append(s3_object)
         return cls(objects, prefix)


```
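For context, the change boils down to fanning the `head_object` metadata requests out across a small thread pool instead of issuing them one at a time inside the listing loop. Below is a minimal, self-contained sketch of that pattern; the bucket name, key list, and the `collect_checksums` helper are illustrative placeholders, not part of manage.py.

```python
# Minimal sketch of the pattern used in this commit: issue S3 head_object
# requests concurrently from a small thread pool instead of one at a time.
# The bucket name and helper names below are placeholders.
import base64
import concurrent.futures
from typing import Dict, List, Optional

import boto3

CLIENT = boto3.client("s3")
BUCKET_NAME = "my-bucket"  # placeholder, not the real bucket


def fetch_metadata(key: str) -> dict:
    # head_object returns the object's metadata (including its checksum when
    # ChecksumMode is enabled) without downloading the object body.
    return CLIENT.head_object(Bucket=BUCKET_NAME, Key=key, ChecksumMode="ENABLED")


def collect_checksums(keys: List[str]) -> Dict[str, Optional[str]]:
    checksums: Dict[str, Optional[str]] = {}
    # head_object is network-bound, so a handful of worker threads hides most
    # of the per-request round-trip latency.
    with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
        futures = {key: executor.submit(fetch_metadata, key) for key in keys}
        for key, future in futures.items():
            response = future.result()
            b64 = response.get("ChecksumSHA256")
            checksums[key] = base64.b64decode(b64).hex() if b64 else None
    return checksums
```

Submitting every key up front via the dict comprehension keeps all workers busy; the subsequent `future.result()` calls simply block in submission order, which is fine here because each result is independent and only collected into a mapping.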