[Core][Bugfix] allow graceful worker termination (#32965)
Signed-off-by: Joe Runde <Joseph.Runde@ibm.com>
This commit is contained in:
@@ -391,14 +391,17 @@ class MultiprocExecutor(Executor):
|
||||
time.sleep(0.1)
|
||||
return False
|
||||
|
||||
active_procs = lambda: [proc for proc in worker_procs if proc.is_alive()]
|
||||
# Give processes time to clean themselves up properly first
|
||||
if wait_for_termination(active_procs(), 4):
|
||||
return
|
||||
|
||||
# Send SIGTERM if still running
|
||||
active_procs = [proc for proc in worker_procs if proc.is_alive()]
|
||||
for p in active_procs:
|
||||
for p in active_procs():
|
||||
p.terminate()
|
||||
if not wait_for_termination(active_procs, 4):
|
||||
if not wait_for_termination(active_procs(), 4):
|
||||
# Send SIGKILL if still running
|
||||
active_procs = [p for p in active_procs if p.is_alive()]
|
||||
for p in active_procs:
|
||||
for p in active_procs():
|
||||
p.kill()
|
||||
|
||||
def shutdown(self):
|
||||
@@ -701,6 +704,9 @@ class WorkerProc:
|
||||
nonlocal shutdown_requested
|
||||
if not shutdown_requested:
|
||||
shutdown_requested = True
|
||||
logger.debug(
|
||||
"WorkerProc handling signal %d, raising SystemExit", signum
|
||||
)
|
||||
raise SystemExit()
|
||||
|
||||
# Either SIGTERM or SIGINT will terminate the worker
|
||||
@@ -774,6 +780,13 @@ class WorkerProc:
|
||||
# SystemExit() to avoid zmq exceptions in __del__.
|
||||
shutdown_requested = True
|
||||
|
||||
except SystemExit as e:
|
||||
# SystemExit is raised on SIGTERM or SIGKILL, which usually indicates that
|
||||
# the graceful shutdown process did not succeed
|
||||
logger.warning("WorkerProc was terminated")
|
||||
# SystemExit must never be ignored
|
||||
raise e
|
||||
|
||||
finally:
|
||||
if ready_writer is not None:
|
||||
ready_writer.close()
|
||||
|
||||
Reference in New Issue
Block a user