คำชี้แจงปัญหา − ใช้ไลบรารี Boto3 ใน Python เพื่ออัปโหลดวัตถุไปยัง S3 เช่น วิธีอัปโหลด test.zip ลงใน Bucket_1 ของ S3
แนวทาง/อัลกอริทึมในการแก้ปัญหานี้
ขั้นตอนที่ 1 - นำเข้าข้อยกเว้น boto3 และ botocore เพื่อจัดการกับข้อยกเว้น
ขั้นตอนที่ 2 − จาก pathlib , นำเข้า PurePosixPath เพื่อดึงชื่อไฟล์จากเส้นทาง
ขั้นตอนที่ 3 − s3_path และ เส้นทางไฟล์ คือพารามิเตอร์สองตัวในฟังก์ชัน upload_object_into_s3
ขั้นตอนที่ 4 − ตรวจสอบ s3_path ถูกส่งผ่านในรูปแบบ AWS เป็น s3://bucket_name/key และ เส้นทางไฟล์ เป็นเส้นทางท้องถิ่น C://users/filename
ขั้นตอนที่ 5 − สร้างเซสชัน AWS โดยใช้ไลบรารี boto3
ขั้นตอนที่ 6 − สร้างทรัพยากร AWS สำหรับ S3
ขั้นตอนที่ 7 - แยกเส้นทาง S3 และดำเนินการเพื่อแยกชื่อที่ฝากข้อมูลรากและเส้นทางคีย์
ขั้นตอนที่ 8 − รับชื่อไฟล์สำหรับพาธไฟล์ที่สมบูรณ์และเพิ่มลงในพาธคีย์ S3
ขั้นตอนที่ 9 − ตอนนี้ใช้ฟังก์ชัน upload_fileobj เพื่ออัปโหลดไฟล์ในเครื่องไปยัง S3
ขั้นตอนที่ 10 − ใช้ฟังก์ชัน รอจนกว่าจะมีอยู่ เพื่อรอจนกว่าการดำเนินการจะเสร็จสิ้น
ขั้นตอนที่ 11 − จัดการข้อยกเว้นตามรหัสตอบกลับเพื่อตรวจสอบว่าไฟล์ถูกอัปโหลดหรือไม่
ขั้นตอนที่ 12 − จัดการข้อยกเว้นทั่วไปหากมีข้อผิดพลาดขณะอัปโหลดไฟล์
ตัวอย่าง
ใช้รหัสต่อไปนี้เพื่ออัปโหลดไฟล์ไปยัง AWS S3 -
import boto3 from botocore.exceptions import ClientError from pathlib import PurePosixPath def upload_object_into_s3(s3_path, filepath): if 's3://' in filepath: print('SourcePath is not a valid path.' + filepath) raise Exception('SourcePath is not a valid path.') elif s3_path.find('s3://') == -1: print('DestinationPath is not a s3 path.' + s3_path) raise Exception('DestinationPath is not a valid path.') session = boto3.session.Session() s3_resource = session.resource('s3') tokens = s3_path.split('/') target_key = "" if len(tokens) > 3: for tokn in range(3, len(tokens)): if tokn == 3: target_key += tokens[tokn] else: target_key += "/" + tokens[tokn] target_bucket_name = tokens[2] file_name = PurePosixPath(filepath).name if target_key != '': target_key.strip() key_path = target_key + "/" + file_name else: key_path = file_name print(("key_path: " + key_path, 'target_bucket: ' + target_bucket_name)) try: # uploading Entity from local path with open(filepath, "rb") as file: s3_resource.meta.client.upload_fileobj(file, target_bucket_name, key_path) try: s3_resource.Object(target_bucket_name, key_path).wait_until_exists() file.close() except ClientError as error: error_code = int(error.response['Error']['Code']) if error_code == 412 or error_code == 304: print("Object didn't Upload Successfully ", target_bucket_name) raise error return "Object Uploaded Successfully" except Exception as error: print("Error in upload object function of s3 helper: " + error.__str__()) raise error print(upload_object_into_s3('s3://Bucket_1/testfolder', 'c://test.zip'))
ผลลัพธ์
key_path:/testfolder/test.zip, target_bucket: Bucket_1 Object Uploaded Successfully