* Add a function to check for killed processes so that if any
threads are sigkilled or sigtermed, the entire playbook execution is aborted.
(cherry picked from commit 238c6461f643d7610896c89c08f0e57eff16d0e5)
... | ... |
@@ -309,6 +309,18 @@ class TaskQueueManager: |
309 | 309 |
def terminate(self): |
310 | 310 |
self._terminated = True |
311 | 311 |
|
312 |
+ def has_dead_workers(self): |
|
313 |
+ |
|
314 |
+ # [<WorkerProcess(WorkerProcess-2, stopped[SIGKILL])>, |
|
315 |
+ # <WorkerProcess(WorkerProcess-2, stopped[SIGTERM])> |
|
316 |
+ |
|
317 |
+ defunct = False |
|
318 |
+ for idx,x in enumerate(self._workers): |
|
319 |
+ if hasattr(x[0], 'exitcode'): |
|
320 |
+ if x[0].exitcode in [-9, -15]: |
|
321 |
+ defunct = True |
|
322 |
+ return defunct |
|
323 |
+ |
|
312 | 324 |
def send_callback(self, method_name, *args, **kwargs): |
313 | 325 |
for callback_plugin in [self._stdout_callback] + self._callback_plugins: |
314 | 326 |
# a plugin that set self.disabled to True will not be called |
... | ... |
@@ -443,9 +443,14 @@ class StrategyBase: |
443 | 443 |
|
444 | 444 |
display.debug("waiting for pending results...") |
445 | 445 |
while self._pending_results > 0 and not self._tqm._terminated: |
446 |
+ |
|
447 |
+ if self._tqm.has_dead_workers(): |
|
448 |
+ raise AnsibleError("A worker was found in a dead state") |
|
449 |
+ |
|
446 | 450 |
results = self._process_pending_results(iterator) |
447 | 451 |
ret_results.extend(results) |
448 |
- time.sleep(0.0001) |
|
452 |
+ time.sleep(0.005) |
|
453 |
+ |
|
449 | 454 |
display.debug("no more pending results, returning what we have") |
450 | 455 |
|
451 | 456 |
return ret_results |
... | ... |
@@ -162,7 +162,7 @@ class TestStrategyBase(unittest.TestCase): |
162 | 162 |
raise Queue.Empty |
163 | 163 |
else: |
164 | 164 |
return queue_items.pop() |
165 |
- |
|
165 |
+ |
|
166 | 166 |
mock_queue = MagicMock() |
167 | 167 |
mock_queue.empty.side_effect = _queue_empty |
168 | 168 |
mock_queue.get.side_effect = _queue_get |
... | ... |
@@ -228,6 +228,10 @@ class TestStrategyBase(unittest.TestCase): |
228 | 228 |
strategy_base._variable_manager = mock_var_mgr |
229 | 229 |
strategy_base._blocked_hosts = dict() |
230 | 230 |
|
231 |
+ def _has_dead_workers(): |
|
232 |
+ return False |
|
233 |
+ |
|
234 |
+ strategy_base._tqm.has_dead_workers = _has_dead_workers |
|
231 | 235 |
results = strategy_base._wait_on_pending_results(iterator=mock_iterator) |
232 | 236 |
self.assertEqual(len(results), 0) |
233 | 237 |
|