39
39
40
40
# Multiprocessing context that uses the 'spawn' start method; the
# MlflowMonitorProcess class below derives from this context's Process type.
spawn_context = multiprocessing .get_context ('spawn' )
41
41
42
+
42
43
class MlflowMonitorProcess (spawn_context .Process ):
43
44
44
45
def __init__ (self , main_pid , mlflow_run_id , mlflow_tracking_uri ):
@@ -62,18 +63,19 @@ def run(self):
62
63
from mlflow import MlflowClient
63
64
64
65
os .setsid ()
66
+
65
67
# Define signal handlers for communication
66
68
def handle_exit_signal (signum , frame ):
67
69
self .exit_event .set ()
68
-
70
+
69
71
def handle_crash_signal (signum , frame ):
70
72
self .crash_event .set ()
71
73
self .exit_event .set ()
72
-
74
+
73
75
# Register the signal handlers
74
76
signal .signal (signal .SIGUSR1 , handle_exit_signal ) # For normal exit
75
- signal .signal (signal .SIGUSR2 , handle_crash_signal ) # For crash exit
76
- signal .signal (signal .SIGTERM , self .handle_sigterm ) # For termination
77
+ signal .signal (signal .SIGUSR2 , handle_crash_signal ) # For crash exit
78
+ signal .signal (signal .SIGTERM , self .handle_sigterm ) # For termination
77
79
78
80
while not self .exit_event .wait (10 ):
79
81
try :
@@ -89,11 +91,11 @@ def handle_crash_signal(signum, frame):
89
91
client .set_terminated (self .mlflow_run_id , status = 'FAILED' )
90
92
91
93
def stop(self):
    """Ask the monitor process to exit normally.

    Sends ``SIGUSR1`` to this process's pid. ``run()`` registers
    ``handle_exit_signal`` for that signal, which sets ``exit_event``.
    Progress is reported both through the module logger and on stdout,
    before and after the signal is sent.
    """
    before_msg = ' Setting exit event'
    log.debug(before_msg)
    print(before_msg)

    os.kill(self.pid, signal.SIGUSR1)

    after_msg = ' Setting exit event done'
    log.debug(after_msg)
    print(after_msg)
97
99
98
100
def crash (self ):
99
101
os .kill (self .pid , signal .SIGUSR2 )
@@ -608,50 +610,50 @@ def log_images(
608
610
def post_close(self):
    """Shut down the monitor process and close out the MLflow run.

    Does nothing when the logger is not enabled.

    When a ``monitor_process`` attribute exists and an uncaught exception
    was recorded (``_global_exception_occurred == 1``), only
    ``monitor_process.crash()`` is called and the method returns at once,
    without updating the run status here. Otherwise the monitor is told to
    stop, pending async logging is flushed, and the run status is updated:
    ``FINISHED`` if no exception is currently being handled and the run is
    still ``RUNNING``, ``FAILED`` if an exception is being handled. Finally
    the active run is ended and the monitor process, if any, is joined.
    """
    # NOTE(review): block nesting was reconstructed from the statement logic
    # (the source view carried no indentation) -- confirm against the file.
    if not self._enabled:
        return

    if hasattr(self, 'monitor_process'):
        log.debug(' Stopping the monitor process')
        # A recorded uncaught exception means this close was triggered by a
        # program crash rather than a normal shutdown.
        crashed = self._global_exception_occurred == 1
        if crashed:
            log.debug(' Crashing the monitor process')
            self.monitor_process.crash()
            log.debug(' Returning 1')
            return

        log.debug(' Stopping the monitor process')
        # Cleanup phase begins here, so the monitor is no longer needed.
        self.monitor_process.stop()
        log.debug(' Stopped the monitor process')

    import mlflow

    assert isinstance(self._run_id, str)

    log.debug(' Flushing')
    mlflow.flush_async_logging()
    log.debug(' Flushed')

    no_active_exception = sys.exc_info() == (None, None, None)
    if not no_active_exception:
        log.debug(' Set terminated 2')
        # Record there was an error
        self._mlflow_client.set_terminated(self._run_id, status='FAILED')
        log.debug(' Set terminated done 2')
    else:
        log.debug(' Get run')
        run_info = self._mlflow_client.get_run(self._run_id).info
        current_status = run_info.status
        log.debug(' Gotten run')
        # Only close the run if nothing else has already changed its status.
        if current_status == 'RUNNING':
            log.debug(' Set terminated')
            self._mlflow_client.set_terminated(self._run_id, status='FINISHED')
            log.debug(' Set terminated done')

    log.debug(' End run')
    mlflow.end_run()
    log.debug(' End run done')

    if hasattr(self, 'monitor_process'):
        log.debug(' Join the monitor process')
        self.monitor_process.join()
        log.debug(' Joined the monitor process')
655
657
656
658
657
659
def _convert_to_mlflow_image (
0 commit comments