@@ -27,6 +27,7 @@ import (
27
27
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28
28
"k8s.io/apimachinery/pkg/runtime"
29
29
"k8s.io/apimachinery/pkg/types"
30
+ "k8s.io/apimachinery/pkg/util/intstr"
30
31
metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
31
32
"k8s.io/client-go/tools/record"
32
33
"k8s.io/klog/v2"
@@ -66,6 +67,7 @@ func NewServiceReconciler(client client.Client, scheme *runtime.Scheme, record r
66
67
//+kubebuilder:rbac:groups=inference.llmaz.io,resources=services,verbs=get;list;watch;create;update;patch;delete
67
68
//+kubebuilder:rbac:groups=inference.llmaz.io,resources=services/status,verbs=get;update;patch
68
69
//+kubebuilder:rbac:groups=inference.llmaz.io,resources=services/finalizers,verbs=update
70
+ //+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
69
71
70
72
// Reconcile is part of the main kubernetes reconciliation loop which aims to
71
73
// move the current state of the cluster closer to the desired state.
@@ -97,6 +99,11 @@ func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
97
99
return ctrl.Result {}, err
98
100
}
99
101
102
+ // Create a service for the leader pods of the lws for loadbalancing.
103
+ if err := CreateServiceIfNotExists (ctx , r .Client , r .Scheme , service ); err != nil {
104
+ return ctrl.Result {}, err
105
+ }
106
+
100
107
// Handle status.
101
108
102
109
workload := & lws.LeaderWorkerSet {}
@@ -299,3 +306,48 @@ func setControllerReferenceForWorkload(owner metav1.Object, lws *applyconfigurat
299
306
WithController (true ))
300
307
return nil
301
308
}
309
+
310
+ func CreateServiceIfNotExists (ctx context.Context , k8sClient client.Client , Scheme * runtime.Scheme , service * inferenceapi.Service ) error {
311
+ log := ctrl .LoggerFrom (ctx )
312
+ // The load balancing service name.
313
+ svcName := service .Name + "-lb"
314
+
315
+ var svc corev1.Service
316
+ if err := k8sClient .Get (ctx , types.NamespacedName {Name : svcName , Namespace : service .Namespace }, & svc ); err != nil {
317
+ if client .IgnoreNotFound (err ) != nil {
318
+ return err
319
+ }
320
+ svc = corev1.Service {
321
+ ObjectMeta : metav1.ObjectMeta {
322
+ Name : svcName ,
323
+ Namespace : service .Namespace ,
324
+ },
325
+ Spec : corev1.ServiceSpec {
326
+ Ports : []corev1.ServicePort {
327
+ {
328
+ Name : "http" ,
329
+ Protocol : corev1 .ProtocolTCP ,
330
+ Port : modelSource .DEFAULT_BACKEND_PORT ,
331
+ TargetPort : intstr .FromInt (modelSource .DEFAULT_BACKEND_PORT ),
332
+ },
333
+ },
334
+ Selector : map [string ]string {
335
+ lws .SetNameLabelKey : service .Name ,
336
+ // the leader pod.
337
+ lws .WorkerIndexLabelKey : "0" ,
338
+ },
339
+ },
340
+ }
341
+
342
+ // Set the controller owner reference for garbage collection and reconciliation.
343
+ if err := ctrl .SetControllerReference (service , & svc , Scheme ); err != nil {
344
+ return err
345
+ }
346
+ // create the service in the cluster
347
+ log .V (2 ).Info ("Creating service." )
348
+ if err := k8sClient .Create (ctx , & svc ); err != nil {
349
+ return err
350
+ }
351
+ }
352
+ return nil
353
+ }
0 commit comments