UPD: switched to pagination. Resume with nextKey is dropped.

xpk 2024-07-16 13:14:33 +08:00
parent 77fc514056
commit 1afeac32f0
Signed by: xpk
GPG Key ID: CD4FF6793F09AB86


@@ -1,9 +1,9 @@
 #!/usr/bin/python3
 import boto3
-import pickle
-import os
+# import pickle
+# import os
 import concurrent.futures
-from pprint import pprint
+# from pprint import pprint
 def restoreObject(bucket, key, versionId, myCount):
@@ -30,49 +30,60 @@ bucketName = 's3-emr-hbase'
 concurrency = 15
 client = boto3.client('s3')
 count = 0
-try:
-    with open('nextKey.pickle', 'rb') as file:
-        nextKey = pickle.load(file)
-        response = client.list_object_versions(
-            Bucket=bucketName,
-            MaxKeys=10,
-            KeyMarker=nextKey
-        )
-except IOError:
-    print("No position.pickle file. Start from beginning.")
-    response = client.list_object_versions(
-        Bucket=bucketName,
-        MaxKeys=10
-    )
-
-nextKey = response.get("NextKeyMarker")
-with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
-    for i in response["Versions"]:
-        count += 1
-        # print(count, i.get("Key"), i.get("StorageClass"))
-        if i.get("StorageClass") == "DEEP_ARCHIVE":
-            executor.submit(restoreObject, bucketName, i.get("Key"), i.get("VersionId"), count)
-
-print("NextKey", nextKey)
-while nextKey is not None:
-    response = client.list_object_versions(
-        Bucket=bucketName,
-        MaxKeys=300,
-        KeyMarker=nextKey
-    )
-    nextKey = response.get("NextKeyMarker")
-    if nextKey is not None:
-        with open('nextKey.pickle', 'wb') as file:
-            pickle.dump(nextKey, file)
-    else:
-        os.remove('nextKey.pickle')
-    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
-        for i in response["Versions"]:
-            count += 1
-            # print(count, i.get("Key"), i.get("StorageClass"))
-            if i.get("StorageClass") == "DEEP_ARCHIVE":
-                executor.submit(restoreObject, bucketName, i.get("Key"), i.get("VersionId"), count)
+pagesize = 1000
+paginator = client.get_paginator('list_object_versions')
+result = paginator.paginate(Bucket=bucketName, PaginationConfig={'PageSize': pagesize})
+for page in result:
+    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
+        for i in page["Versions"]:
+            count += 1
+            if i.get("StorageClass") == "DEEP_ARCHIVE":
+                executor.submit(restoreObject, bucketName, i.get("Key"), i.get("VersionId"), count)
+#
+# try:
+#     with open('nextKey.pickle', 'rb') as file:
+#         nextKey = pickle.load(file)
+#         response = client.list_object_versions(
+#             Bucket=bucketName,
+#             MaxKeys=10,
+#             KeyMarker=nextKey
+#         )
+# except IOError:
+#     print("No position.pickle file. Start from beginning.")
+#     response = client.list_object_versions(
+#         Bucket=bucketName,
+#         MaxKeys=10
+#     )
+#
+# nextKey = response.get("NextKeyMarker")
+# with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
+#     for i in response["Versions"]:
+#         count += 1
+#         # print(count, i.get("Key"), i.get("StorageClass"))
+#         if i.get("StorageClass") == "DEEP_ARCHIVE":
+#             executor.submit(restoreObject, bucketName, i.get("Key"), i.get("VersionId"), count)
+#
+# print("NextKey", nextKey)
+# while nextKey is not None:
+#     response = client.list_object_versions(
+#         Bucket=bucketName,
+#         MaxKeys=300,
+#         KeyMarker=nextKey
+#     )
+#     nextKey = response.get("NextKeyMarker")
+#     if nextKey is not None:
+#         with open('nextKey.pickle', 'wb') as file:
+#             pickle.dump(nextKey, file)
+#     else:
+#         os.remove('nextKey.pickle')
+#     with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
+#         for i in response["Versions"]:
+#             count += 1
+#             # print(count, i.get("Key"), i.get("StorageClass"))
+#             if i.get("StorageClass") == "DEEP_ARCHIVE":
+#                 executor.submit(restoreObject, bucketName, i.get("Key"), i.get("VersionId"), count)
 print('Total objects', count, sep=" ")
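
Note on the new paging loop: in boto3, a page that contains only delete markers can come back without a "Versions" key at all, in which case page["Versions"] raises KeyError. A minimal defensive variant of the loop, assuming the same names as the script and changing nothing else, might look like:

for page in result:
    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
        # "Versions" may be absent on pages holding only DeleteMarkers; default to an empty list
        for i in page.get("Versions", []):
            count += 1
            if i.get("StorageClass") == "DEEP_ARCHIVE":
                executor.submit(restoreObject, bucketName, i.get("Key"), i.get("VersionId"), count)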
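
The restoreObject helper sits above both hunks, so its body is not shown in this diff. A hypothetical sketch of such a helper built on boto3's restore_object call; the Days value, retrieval Tier, and error handling below are illustrative assumptions, not taken from the repository:

import boto3
import botocore.exceptions

client = boto3.client('s3')

def restoreObject(bucket, key, versionId, myCount):
    # Ask S3 to stage one DEEP_ARCHIVE object version for retrieval.
    try:
        client.restore_object(
            Bucket=bucket,
            Key=key,
            VersionId=versionId,
            RestoreRequest={'Days': 7, 'GlacierJobParameters': {'Tier': 'Bulk'}}  # assumed values
        )
        print(myCount, 'restore requested:', key)
    except botocore.exceptions.ClientError as e:
        # A request may already be pending for this version; skip it rather than abort.
        if e.response['Error']['Code'] == 'RestoreAlreadyInProgress':
            print(myCount, 'restore already in progress:', key)
        else:
            raise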