... | ... |
@@ -8,122 +8,122 @@ from logging import debug, info, warning, error |
8 | 8 |
from Utils import getTextFromXml |
9 | 9 |
|
10 | 10 |
class Worker(Thread): |
11 |
- """ |
|
12 |
- Thread executing tasks from a given tasks queue |
|
13 |
- """ |
|
14 |
- def __init__(self, tasks): |
|
15 |
- super(Worker, self).__init__() |
|
16 |
- self.tasks = tasks |
|
17 |
- self.daemon = True |
|
18 |
- self.start() |
|
19 |
- |
|
20 |
- def run(self): |
|
21 |
- while True: |
|
22 |
- func, args, kargs = self.tasks.get() |
|
23 |
- func(*args, **kargs) |
|
24 |
- self.tasks.task_done() |
|
11 |
+ """ |
|
12 |
+ Thread executing tasks from a given tasks queue |
|
13 |
+ """ |
|
14 |
+ def __init__(self, tasks): |
|
15 |
+ super(Worker, self).__init__() |
|
16 |
+ self.tasks = tasks |
|
17 |
+ self.daemon = True |
|
18 |
+ self.start() |
|
19 |
+ |
|
20 |
+ def run(self): |
|
21 |
+ while True: |
|
22 |
+ func, args, kargs = self.tasks.get() |
|
23 |
+ func(*args, **kargs) |
|
24 |
+ self.tasks.task_done() |
|
25 | 25 |
|
26 | 26 |
class ThreadPool(object): |
27 |
- """ |
|
28 |
- Pool of threads consuming tasks from a queue |
|
29 |
- """ |
|
30 |
- def __init__(self, num_threads): |
|
31 |
- self.tasks = Queue(num_threads) |
|
32 |
- for _ in range(num_threads): |
|
33 |
- Worker(self.tasks) |
|
34 |
- |
|
35 |
- def add_task(self, func, *args, **kargs): |
|
36 |
- """ |
|
37 |
- Add a task to the queue |
|
38 |
- """ |
|
39 |
- self.tasks.put((func, args, kargs)) |
|
40 |
- |
|
41 |
- def wait_completion(self): |
|
42 |
- """ |
|
43 |
- Wait for completion of all the tasks in the queue |
|
44 |
- """ |
|
45 |
- self.tasks.join() |
|
27 |
+ """ |
|
28 |
+ Pool of threads consuming tasks from a queue |
|
29 |
+ """ |
|
30 |
+ def __init__(self, num_threads): |
|
31 |
+ self.tasks = Queue(num_threads) |
|
32 |
+ for _ in range(num_threads): |
|
33 |
+ Worker(self.tasks) |
|
34 |
+ |
|
35 |
+ def add_task(self, func, *args, **kargs): |
|
36 |
+ """ |
|
37 |
+ Add a task to the queue |
|
38 |
+ """ |
|
39 |
+ self.tasks.put((func, args, kargs)) |
|
40 |
+ |
|
41 |
+ def wait_completion(self): |
|
42 |
+ """ |
|
43 |
+ Wait for completion of all the tasks in the queue |
|
44 |
+ """ |
|
45 |
+ self.tasks.join() |
|
46 | 46 |
|
47 | 47 |
class MultiPartUpload(object): |
48 |
- |
|
49 |
- MIN_CHUNK_SIZE = 5242880 # 5MB |
|
50 |
- MAX_CHUNK_SIZE = 5368709120 # 5GB |
|
51 |
- MAX_CHUNKS = 100 |
|
52 |
- MAX_FILE_SIZE = 42949672960 # 5TB |
|
53 |
- |
|
54 |
- def __init__(self, s3, file, uri): |
|
55 |
- self.s3 = s3 |
|
56 |
- self.file = file |
|
57 |
- self.uri = uri |
|
58 |
- self.upload_id = None |
|
59 |
- self.parts = {} |
|
60 |
- |
|
61 |
- def initiate_multipart_upload(self): |
|
62 |
- """ |
|
63 |
- Begin a multipart upload |
|
64 |
- http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadInitiate.html |
|
65 |
- """ |
|
66 |
- request = self.s3.create_request("OBJECT_POST", uri = self.uri, extra = "?uploads") |
|
67 |
- response = self.s3.send_request(request) |
|
68 |
- data = response["data"] |
|
69 |
- s3, key, upload_id = getTextFromXml(data, "Bucket"), getTextFromXml(data, "Key"), getTextFromXml(data, "UploadId") |
|
70 |
- self.upload_id = upload_id |
|
71 |
- return s3, key, upload_id |
|
72 |
- |
|
73 |
- def upload_all_parts(self, num_threads, chunk_size): |
|
74 |
- """ |
|
75 |
- Execute a full multipart upload on a file |
|
76 |
- Returns the id/etag dict |
|
77 |
- TODO use num_processes to thread it |
|
78 |
- """ |
|
79 |
- if not self.upload_id: |
|
80 |
- raise RuntimeError("Attempting to use a multipart upload that has not been initiated.") |
|
81 |
- |
|
82 |
- chunk_size = max(self.MIN_CHUNK_SIZE, chunk_size) |
|
83 |
- id = 1 |
|
84 |
- pool = ThreadPool(num_threads) |
|
85 |
- |
|
86 |
- while True: |
|
87 |
- if id == self.MAX_CHUNKS: |
|
88 |
- data = self.file.read(-1) |
|
89 |
- else: |
|
90 |
- data = self.file.read(chunk_size) |
|
91 |
- if not data: |
|
92 |
- break |
|
93 |
- pool.add_task(self.upload_part, data, id) |
|
94 |
- id += 1 |
|
95 |
- |
|
96 |
- debug("Thread pool with %i threads and %i tasks awaiting completion." % (num_threads, id)) |
|
97 |
- pool.wait_completion() |
|
98 |
- |
|
99 |
- def upload_part(self, data, id): |
|
100 |
- """ |
|
101 |
- Upload a file chunk |
|
102 |
- http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html |
|
103 |
- """ |
|
104 |
- # TODO implement Content-MD5 |
|
105 |
- content_length = str(len(data)) |
|
106 |
- debug("Uploading part %i of %r (%s bytes)" % (id, self.upload_id, content_length)) |
|
107 |
- headers = { "Content-Length": content_length } |
|
108 |
- query_string = "?partNumber=%i&uploadId=%s" % (id, self.upload_id) |
|
109 |
- request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string) |
|
110 |
- response = self.s3.send_request(request, body = data) |
|
111 |
- |
|
112 |
- self.parts[id] = response["headers"]["etag"] |
|
113 |
- |
|
114 |
- def complete_multipart_upload(self): |
|
115 |
- """ |
|
116 |
- Finish a multipart upload |
|
117 |
- http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadComplete.html |
|
118 |
- """ |
|
119 |
- parts_xml = [] |
|
120 |
- part_xml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>" |
|
121 |
- for id, etag in self.parts.items(): |
|
122 |
- parts_xml.append(part_xml % (id, etag)) |
|
123 |
- body = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>" % ("".join(parts_xml)) |
|
124 |
- |
|
125 |
- headers = { "Content-Length": len(body) } |
|
126 |
- request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = headers, extra = "?uploadId=%s" % (self.upload_id)) |
|
127 |
- response = self.s3.send_request(request, body = body) |
|
128 |
- |
|
129 |
- return response |
|
130 | 48 |
\ No newline at end of file |
49 |
+ |
|
50 |
+ MIN_CHUNK_SIZE = 5242880 # 5MB |
|
51 |
+ MAX_CHUNK_SIZE = 5368709120 # 5GB |
|
52 |
+ MAX_CHUNKS = 100 |
|
53 |
+ MAX_FILE_SIZE = 42949672960 # 5TB |
|
54 |
+ |
|
55 |
+ def __init__(self, s3, file, uri): |
|
56 |
+ self.s3 = s3 |
|
57 |
+ self.file = file |
|
58 |
+ self.uri = uri |
|
59 |
+ self.upload_id = None |
|
60 |
+ self.parts = {} |
|
61 |
+ |
|
62 |
+ def initiate_multipart_upload(self): |
|
63 |
+ """ |
|
64 |
+ Begin a multipart upload |
|
65 |
+ http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadInitiate.html |
|
66 |
+ """ |
|
67 |
+ request = self.s3.create_request("OBJECT_POST", uri = self.uri, extra = "?uploads") |
|
68 |
+ response = self.s3.send_request(request) |
|
69 |
+ data = response["data"] |
|
70 |
+ s3, key, upload_id = getTextFromXml(data, "Bucket"), getTextFromXml(data, "Key"), getTextFromXml(data, "UploadId") |
|
71 |
+ self.upload_id = upload_id |
|
72 |
+ return s3, key, upload_id |
|
73 |
+ |
|
74 |
+ def upload_all_parts(self, num_threads, chunk_size): |
|
75 |
+ """ |
|
76 |
+ Execute a full multipart upload on a file |
|
77 |
+ Returns the id/etag dict |
|
78 |
+ TODO use num_processes to thread it |
|
79 |
+ """ |
|
80 |
+ if not self.upload_id: |
|
81 |
+ raise RuntimeError("Attempting to use a multipart upload that has not been initiated.") |
|
82 |
+ |
|
83 |
+ chunk_size = max(self.MIN_CHUNK_SIZE, chunk_size) |
|
84 |
+ id = 1 |
|
85 |
+ pool = ThreadPool(num_threads) |
|
86 |
+ |
|
87 |
+ while True: |
|
88 |
+ if id == self.MAX_CHUNKS: |
|
89 |
+ data = self.file.read(-1) |
|
90 |
+ else: |
|
91 |
+ data = self.file.read(chunk_size) |
|
92 |
+ if not data: |
|
93 |
+ break |
|
94 |
+ pool.add_task(self.upload_part, data, id) |
|
95 |
+ id += 1 |
|
96 |
+ |
|
97 |
+ debug("Thread pool with %i threads and %i tasks awaiting completion." % (num_threads, id)) |
|
98 |
+ pool.wait_completion() |
|
99 |
+ |
|
100 |
+ def upload_part(self, data, id): |
|
101 |
+ """ |
|
102 |
+ Upload a file chunk |
|
103 |
+ http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html |
|
104 |
+ """ |
|
105 |
+ # TODO implement Content-MD5 |
|
106 |
+ content_length = str(len(data)) |
|
107 |
+ debug("Uploading part %i of %r (%s bytes)" % (id, self.upload_id, content_length)) |
|
108 |
+ headers = { "Content-Length": content_length } |
|
109 |
+ query_string = "?partNumber=%i&uploadId=%s" % (id, self.upload_id) |
|
110 |
+ request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string) |
|
111 |
+ response = self.s3.send_request(request, body = data) |
|
112 |
+ |
|
113 |
+ self.parts[id] = response["headers"]["etag"] |
|
114 |
+ |
|
115 |
+ def complete_multipart_upload(self): |
|
116 |
+ """ |
|
117 |
+ Finish a multipart upload |
|
118 |
+ http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadComplete.html |
|
119 |
+ """ |
|
120 |
+ parts_xml = [] |
|
121 |
+ part_xml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>" |
|
122 |
+ for id, etag in self.parts.items(): |
|
123 |
+ parts_xml.append(part_xml % (id, etag)) |
|
124 |
+ body = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>" % ("".join(parts_xml)) |
|
125 |
+ |
|
126 |
+ headers = { "Content-Length": len(body) } |
|
127 |
+ request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = headers, extra = "?uploadId=%s" % (self.upload_id)) |
|
128 |
+ response = self.s3.send_request(request, body = body) |
|
129 |
+ |
|
130 |
+ return response |