Browse code

Use the ThreadPool interface to thread multipart uploads and return a proper response

Jerome Leclanche authored on 2011/05/31 18:59:01
Showing 2 changed files
... ...
@@ -56,6 +56,7 @@ class MultiPartUpload(object):
56 56
 		self.file = file
57 57
 		self.uri = uri
58 58
 		self.upload_id = None
59
+		self.parts = {}
59 60
 	
60 61
 	def initiate_multipart_upload(self):
61 62
 		"""
... ...
@@ -69,7 +70,7 @@ class MultiPartUpload(object):
69 69
 		self.upload_id = upload_id
70 70
 		return s3, key, upload_id
71 71
 	
72
-	def upload_all_parts(self, num_processes = 1, chunk_size = MIN_CHUNK_SIZE):
72
+	def upload_all_parts(self, num_threads = 4, chunk_size = MIN_CHUNK_SIZE):
73 73
 		"""
74 74
 		Execute a full multipart upload on a file
75 75
 		Returns the id/etag dict
... ...
@@ -80,7 +81,7 @@ class MultiPartUpload(object):
80 80
 		
81 81
 		chunk_size = max(self.MIN_CHUNK_SIZE, chunk_size)
82 82
 		id = 1
83
-		parts = {}
83
+		pool = ThreadPool(num_threads)
84 84
 		
85 85
 		while True:
86 86
 			if id == self.MAX_CHUNKS:
... ...
@@ -89,10 +90,11 @@ class MultiPartUpload(object):
89 89
 				data = self.file.read(chunk_size)
90 90
 			if not data:
91 91
 				break
92
-			parts[id] = self.upload_part(data, id)
92
+			pool.add_task(self.upload_part, data, id)
93 93
 			id += 1
94 94
 		
95
-		return parts
95
+		debug("Thread pool with %i threads and %i tasks awaiting completion." % (num_threads, id))
96
+		pool.wait_completion()
96 97
 	
97 98
 	def upload_part(self, data, id):
98 99
 		"""
... ...
@@ -107,19 +109,21 @@ class MultiPartUpload(object):
107 107
 		request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
108 108
 		response = self.s3.send_request(request, body = data)
109 109
 		
110
-		return response["headers"]["etag"]
110
+		self.parts[id] = response["headers"]["etag"]
111 111
 	
112
-	def complete_multipart_upload(self, parts):
112
+	def complete_multipart_upload(self):
113 113
 		"""
114 114
 		Finish a multipart upload
115 115
 		http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadComplete.html
116 116
 		"""
117 117
 		parts_xml = []
118 118
 		part_xml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
119
-		for id, etag in parts.items():
119
+		for id, etag in self.parts.items():
120 120
 			parts_xml.append(part_xml % (id, etag))
121 121
 		body = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>" % ("".join(parts_xml))
122 122
 		
123 123
 		headers = { "Content-Length": len(body) }
124 124
 		request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = headers, extra = "?uploadId=%s" % (self.upload_id))
125 125
 		response = self.s3.send_request(request, body = body)
126
+		
127
+		return response
126 128
\ No newline at end of file
... ...
@@ -675,7 +675,7 @@ class S3(object):
675 675
 		# S3 from time to time doesn't send ETag back in a response :-(
676 676
 		# Force re-upload here.
677 677
 		if "etag" not in response["headers"]:
678
-			response['headers']['etag'] = '' 
678
+			response["headers"]["etag"] = ""
679 679
 
680 680
 		if response["status"] < 200 or response["status"] >= 300:
681 681
 			try_retry = False
... ...
@@ -718,8 +718,10 @@ class S3(object):
718 718
 		bucket, key, upload_id = upload.initiate_multipart_upload()
719 719
 		
720 720
 		file.seek(0)
721
-		parts = upload.upload_all_parts()
722
-		upload.complete_multipart_upload(parts)
721
+		upload.upload_all_parts()
722
+		response = upload.complete_multipart_upload()
723
+		response["speed"] = 0 # XXX
724
+		return response
723 725
 		exit() # TODO return response
724 726
 	
725 727
 	def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):