Skip to content

Commit b186f6f

Browse files
committed
WIP DMCM
On-behalf-of: @SAP [email protected]
1 parent 1f4fabc commit b186f6f

File tree

18 files changed

+345
-463
lines changed

18 files changed

+345
-463
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ install-yq:
174174
@UNCOMPRESSED=true hack/uget.sh https://github.com/mikefarah/yq/releases/download/v{VERSION}/yq_{GOOS}_{GOARCH} yq $(YQ_VERSION) yq_*
175175

176176
.PHONY: install-kcp
177-
install-kcp: UGET_CHECKSUMS=false # do not checksum because the version regularly gets overwritten in CI jobs
177+
install-kcp: UGET_CHECKSUMS= # do not checksum because the version regularly gets overwritten in CI jobs
178178
install-kcp:
179179
@hack/uget.sh https://github.com/kcp-dev/kcp/releases/download/v{VERSION}/kcp_{VERSION}_{GOOS}_{GOARCH}.tar.gz kcp $(KCP_VERSION)
180180

cmd/api-syncagent/kcp.go

Lines changed: 32 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,12 @@ import (
2323
"regexp"
2424

2525
"github.com/kcp-dev/logicalcluster/v3"
26-
27-
"github.com/kcp-dev/api-syncagent/internal/kcp"
28-
2926
kcpdevv1alpha1 "github.com/kcp-dev/sdk/apis/apis/v1alpha1"
3027
kcpdevcore "github.com/kcp-dev/sdk/apis/core"
3128
kcpdevcorev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1"
3229

30+
"github.com/kcp-dev/api-syncagent/internal/kcp"
31+
3332
"k8s.io/apimachinery/pkg/fields"
3433
"k8s.io/apimachinery/pkg/runtime"
3534
"k8s.io/apimachinery/pkg/types"
@@ -39,25 +38,10 @@ import (
3938
"sigs.k8s.io/controller-runtime/pkg/cluster"
4039
)
4140

42-
// The agent has two potentially different kcp clusters:
43-
//
44-
// endpointCluster - this is where the source of the virtual workspace URLs
45-
// live, i.e. where the APIExport/EndpointSlice.
46-
// managedCluster - this is where the APIExport and APIResourceSchemas
47-
// exist that are meant to be reconciled.
48-
//
49-
// The managedCluster always exists, the endpointCluster only if the workspace
50-
// for the virtual workspace source is different from the managed cluster.
51-
5241
// setupEndpointKcpCluster sets up a plain, non-kcp-aware ctrl-runtime Cluster object
5342
// that is solvely used to watch whichever object holds the virtual workspace URLs,
5443
// either the APIExport or the APIExportEndpointSlice.
55-
func setupEndpointKcpCluster(endpoint *syncEndpoint) (cluster.Cluster, error) {
56-
// no need for a dedicated endpoint cluster
57-
if endpoint.EndpointSlice == nil || endpoint.EndpointSlice.Cluster == endpoint.APIExport.Cluster {
58-
return nil, nil
59-
}
60-
44+
func setupEndpointKcpCluster(endpointSlice qualifiedAPIExportEndpointSlice) (cluster.Cluster, error) {
6145
scheme := runtime.NewScheme()
6246

6347
if err := kcpdevv1alpha1.AddToScheme(scheme); err != nil {
@@ -72,11 +56,11 @@ func setupEndpointKcpCluster(endpoint *syncEndpoint) (cluster.Cluster, error) {
7256
// restrict the cache's selectors accordingly so we can still make use of caching.
7357
byObject := map[ctrlruntimeclient.Object]cache.ByObject{
7458
&kcpdevv1alpha1.APIExportEndpointSlice{}: {
75-
Field: fields.SelectorFromSet(fields.Set{"metadata.name": endpoint.EndpointSlice.Name}),
59+
Field: fields.SelectorFromSet(fields.Set{"metadata.name": endpointSlice.Name}),
7660
},
7761
}
7862

79-
return cluster.New(endpoint.EndpointSlice.Config, func(o *cluster.Options) {
63+
return cluster.New(endpointSlice.Config, func(o *cluster.Options) {
8064
o.Scheme = scheme
8165
o.Cache = cache.Options{
8266
Scheme: scheme,
@@ -87,7 +71,7 @@ func setupEndpointKcpCluster(endpoint *syncEndpoint) (cluster.Cluster, error) {
8771

8872
// setupManagedKcpCluster sets up a plain, non-kcp-aware ctrl-runtime Cluster object
8973
// that is solvely used to manage the APIExport and APIResourceSchemas.
90-
func setupManagedKcpCluster(endpoint *syncEndpoint) (cluster.Cluster, error) {
74+
func setupManagedKcpCluster(apiExport qualifiedAPIExport) (cluster.Cluster, error) {
9175
scheme := runtime.NewScheme()
9276

9377
if err := kcpdevv1alpha1.AddToScheme(scheme); err != nil {
@@ -102,11 +86,11 @@ func setupManagedKcpCluster(endpoint *syncEndpoint) (cluster.Cluster, error) {
10286
// restrict the cache's selectors accordingly so we can still make use of caching.
10387
byObject := map[ctrlruntimeclient.Object]cache.ByObject{
10488
&kcpdevv1alpha1.APIExport{}: {
105-
Field: fields.SelectorFromSet(fields.Set{"metadata.name": endpoint.APIExport.Name}),
89+
Field: fields.SelectorFromSet(fields.Set{"metadata.name": apiExport.Name}),
10690
},
10791
}
10892

109-
return cluster.New(endpoint.APIExport.Config, func(o *cluster.Options) {
93+
return cluster.New(apiExport.Config, func(o *cluster.Options) {
11094
o.Scheme = scheme
11195
o.Cache = cache.Options{
11296
Scheme: scheme,
@@ -133,7 +117,7 @@ type qualifiedAPIExportEndpointSlice struct {
133117

134118
type syncEndpoint struct {
135119
APIExport qualifiedAPIExport
136-
EndpointSlice *qualifiedAPIExportEndpointSlice
120+
EndpointSlice qualifiedAPIExportEndpointSlice
137121
}
138122

139123
// resolveSyncEndpoint takes the user provided (usually via CLI flags) APIExportEndpointSliceRef and
@@ -142,7 +126,7 @@ type syncEndpoint struct {
142126
// must point to the cluster where the APIExport lives, and vice versa for the endpoint slice;
143127
// however the endpoint slice references an APIExport in potentially another cluster, and for this
144128
// case the initialRestConfig will be rewritten accordingly).
145-
func resolveSyncEndpoint(ctx context.Context, initialRestConfig *rest.Config, endpointSliceRef string, apiExportRef string) (*syncEndpoint, error) {
129+
func resolveSyncEndpoint(ctx context.Context, initialRestConfig *rest.Config, endpointSliceRef string) (*syncEndpoint, error) {
146130
// construct temporary, uncached client
147131
scheme := runtime.NewScheme()
148132
if err := kcpdevcorev1alpha1.AddToScheme(scheme); err != nil {
@@ -160,52 +144,32 @@ func resolveSyncEndpoint(ctx context.Context, initialRestConfig *rest.Config, en
160144

161145
se := &syncEndpoint{}
162146

163-
// When an endpoint ref is given, both the APIExportEndpointSlice and the APIExport must exist.
164-
if endpointSliceRef != "" {
165-
endpointSlice, err := resolveAPIExportEndpointSlice(ctx, client, endpointSliceRef)
166-
if err != nil {
167-
return nil, fmt.Errorf("failed to resolve APIExportEndpointSlice: %w", err)
168-
}
169-
endpointSlice.Config = initialRestConfig
170-
171-
// find the APIExport referenced not by the user (can't: both ref parameters to this function
172-
// are mutually exclusive), but in the APIExportEndpointSlice.
173-
restConfig, err := retargetRestConfig(initialRestConfig, endpointSlice.Spec.APIExport.Path)
174-
if err != nil {
175-
return nil, fmt.Errorf("failed to re-target the given kubeconfig to cluster %q: %w", endpointSlice.Spec.APIExport.Path, err)
176-
}
177-
178-
client, err := ctrlruntimeclient.New(restConfig, clientOpts)
179-
if err != nil {
180-
return nil, fmt.Errorf("failed to create service reader: %w", err)
181-
}
147+
// First we find the APIExportEndpointSlice.
148+
endpointSlice, err := resolveAPIExportEndpointSlice(ctx, client, endpointSliceRef)
149+
if err != nil {
150+
return nil, fmt.Errorf("failed to resolve APIExportEndpointSlice: %w", err)
151+
}
152+
endpointSlice.Config = initialRestConfig
182153

183-
apiExport, err := resolveAPIExport(ctx, client, endpointSlice.Spec.APIExport.Name)
184-
if err != nil {
185-
return nil, fmt.Errorf("failed to resolve APIExport: %w", err)
186-
}
187-
apiExport.Config = restConfig
188-
189-
se.APIExport = apiExport
190-
se.EndpointSlice = &endpointSlice
191-
} else { // if an export ref is given, the endpoint slice is optional (for compat with kcp <0.28)
192-
apiExport, err := resolveAPIExport(ctx, client, apiExportRef)
193-
if err != nil {
194-
return nil, fmt.Errorf("failed to resolve APIExport: %w", err)
195-
}
196-
apiExport.Config = initialRestConfig
154+
// Now we find the APIExport referenced in the APIExportEndpointSlice.
155+
restConfig, err := retargetRestConfig(initialRestConfig, endpointSlice.Spec.APIExport.Path)
156+
if err != nil {
157+
return nil, fmt.Errorf("failed to re-target the given kubeconfig to cluster %q: %w", endpointSlice.Spec.APIExport.Path, err)
158+
}
197159

198-
se.APIExport = apiExport
160+
client, err = ctrlruntimeclient.New(restConfig, clientOpts)
161+
if err != nil {
162+
return nil, fmt.Errorf("failed to create service reader: %w", err)
163+
}
199164

200-
// try to find an endpoint slice in the same workspace with the same name as the APIExport
201-
endpointSlice, err := resolveAPIExportEndpointSlice(ctx, client, apiExportRef)
202-
if ctrlruntimeclient.IgnoreNotFound(err) != nil {
203-
return nil, fmt.Errorf("failed to resolve APIExportEndpointSlice: %w", err)
204-
} else if err == nil {
205-
apiExport.Config = initialRestConfig
206-
se.EndpointSlice = &endpointSlice
207-
}
165+
apiExport, err := resolveAPIExport(ctx, client, endpointSlice.Spec.APIExport.Name)
166+
if err != nil {
167+
return nil, fmt.Errorf("failed to resolve APIExport: %w", err)
208168
}
169+
apiExport.Config = restConfig
170+
171+
se.APIExport = apiExport
172+
se.EndpointSlice = endpointSlice
209173

210174
return se, nil
211175
}

cmd/api-syncagent/main.go

Lines changed: 33 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,11 @@ import (
3232
"github.com/kcp-dev/api-syncagent/internal/controller/apiexport"
3333
"github.com/kcp-dev/api-syncagent/internal/controller/apiresourceschema"
3434
"github.com/kcp-dev/api-syncagent/internal/controller/syncmanager"
35+
"github.com/kcp-dev/api-syncagent/internal/kcp"
3536
syncagentlog "github.com/kcp-dev/api-syncagent/internal/log"
3637
"github.com/kcp-dev/api-syncagent/internal/version"
3738
syncagentv1alpha1 "github.com/kcp-dev/api-syncagent/sdk/apis/syncagent/v1alpha1"
3839

39-
kcpdevv1alpha1 "github.com/kcp-dev/sdk/apis/apis/v1alpha1"
40-
4140
corev1 "k8s.io/api/core/v1"
4241
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
4342
"k8s.io/apimachinery/pkg/runtime"
@@ -85,13 +84,11 @@ func main() {
8584

8685
func run(ctx context.Context, log *zap.SugaredLogger, opts *Options) error {
8786
v := version.NewAppVersion()
88-
hello := log.With("version", v.GitVersion, "name", opts.AgentName)
89-
90-
if opts.APIExportEndpointSliceRef != "" {
91-
hello = hello.With("apiexportendpointslice", opts.APIExportEndpointSliceRef)
92-
} else {
93-
hello = hello.With("apiexport", opts.APIExportRef)
94-
}
87+
hello := log.With(
88+
"version", v.GitVersion,
89+
"name", opts.AgentName,
90+
"apiexportendpointslice", opts.APIExportEndpointSliceRef,
91+
)
9592

9693
hello.Info("Moin, I'm the kcp Sync Agent")
9794

@@ -112,22 +109,20 @@ func run(ctx context.Context, log *zap.SugaredLogger, opts *Options) error {
112109
return fmt.Errorf("kcp kubeconfig does not point to a specific workspace")
113110
}
114111

115-
// We check if the APIExport/APIExportEndpointSlice exists and extract information we need to set up our kcpCluster.
116-
endpoint, err := resolveSyncEndpoint(ctx, kcpRestConfig, opts.APIExportEndpointSliceRef, opts.APIExportRef)
112+
// We check if the APIExportEndpointSlice exists and extract information we need to set up our kcpCluster.
113+
endpoint, err := resolveSyncEndpoint(ctx, kcpRestConfig, opts.APIExportEndpointSliceRef)
117114
if err != nil {
118115
return fmt.Errorf("failed to resolve APIExport/EndpointSlice: %w", err)
119116
}
120117

121118
log.Infow("Resolved APIExport", "name", endpoint.APIExport.Name, "workspace", endpoint.APIExport.Path, "logicalcluster", endpoint.APIExport.Cluster)
122-
123-
if s := endpoint.EndpointSlice; s != nil {
124-
log.Infow("Using APIExportEndpointSlice", "name", endpoint.EndpointSlice.Name, "workspace", s.Path, "logicalcluster", s.Cluster)
125-
}
119+
log.Infow("Using APIExportEndpointSlice", "name", endpoint.EndpointSlice.Name, "workspace", endpoint.EndpointSlice.Path, "logicalcluster", endpoint.EndpointSlice.Cluster)
126120

127121
// init the "permanent" kcp cluster connections
128122

129-
// always need the managedKcpCluster
130-
managedKcpCluster, err := setupManagedKcpCluster(endpoint)
123+
// always need the managedKcpCluster, this is where we will manage the APIExport and
124+
// its resource schemas.
125+
managedKcpCluster, err := setupManagedKcpCluster(endpoint.APIExport)
131126
if err != nil {
132127
return fmt.Errorf("failed to initialize managed kcp cluster: %w", err)
133128
}
@@ -138,18 +133,32 @@ func run(ctx context.Context, log *zap.SugaredLogger, opts *Options) error {
138133
return fmt.Errorf("failed to add managed kcp cluster runnable: %w", err)
139134
}
140135

141-
// the endpoint cluster can be nil
142-
endpointKcpCluster, err := setupEndpointKcpCluster(endpoint)
143-
if err != nil {
144-
return fmt.Errorf("failed to initialize endpoint kcp cluster: %w", err)
145-
}
136+
// If needed, start an additional cluster for the endpoint workspace, where
137+
// the EndpointSlice lives.
138+
if endpoint.EndpointSlice.Cluster != endpoint.APIExport.Cluster {
139+
endpointKcpCluster, err := setupEndpointKcpCluster(endpoint.EndpointSlice)
140+
if err != nil {
141+
return fmt.Errorf("failed to initialize endpoint kcp cluster: %w", err)
142+
}
146143

147-
if endpointKcpCluster != nil {
148144
if err := mgr.Add(endpointKcpCluster); err != nil {
149145
return fmt.Errorf("failed to add endpoint kcp cluster runnable: %w", err)
150146
}
151147
}
152148

149+
// Setup the magical dynamic multicluster manager. It's a dynamic version of the
150+
// regular mcmanager, capable of starting new controllers at any later time and
151+
// allowing them to be also stopped at any time. The syncmanager needs it to
152+
// start/stop sync controllers for each PublishedResource.
153+
dmcm, err := kcp.NewDynamicMultiClusterManager(endpoint.EndpointSlice.Config, endpoint.EndpointSlice.Name)
154+
if err != nil {
155+
return fmt.Errorf("failed to start dynamic multi cluster manager: %w", err)
156+
}
157+
158+
if err := mgr.Add(dmcm); err != nil {
159+
return fmt.Errorf("failed to add endpoint kcp cluster runnable: %w", err)
160+
}
161+
153162
startController := func(name string, creator func() error) error {
154163
if slices.Contains(opts.DisabledControllers, name) {
155164
log.Infof("Not starting %s controller because it is disabled.", name)
@@ -178,20 +187,7 @@ func run(ctx context.Context, log *zap.SugaredLogger, opts *Options) error {
178187
// This controller is called "sync" because it makes the most sense to the users, even though internally the relevant
179188
// controller is the syncmanager (which in turn would start/stop the sync controllers).
180189
if err := startController("sync", func() error {
181-
cluster := endpointKcpCluster
182-
if cluster == nil {
183-
cluster = managedKcpCluster
184-
}
185-
186-
var endpointSlice *kcpdevv1alpha1.APIExportEndpointSlice
187-
if endpoint.EndpointSlice != nil {
188-
endpointSlice = endpoint.EndpointSlice.APIExportEndpointSlice
189-
}
190-
191-
// It doesn't matter which rest config we specify, as the URL will be overwritten with the
192-
// virtual workspace URL anyway.
193-
194-
return syncmanager.Add(ctx, mgr, cluster, kcpRestConfig, log, endpoint.APIExport.APIExport, endpointSlice, opts.PublishedResourceSelector, opts.Namespace, opts.AgentName)
190+
return syncmanager.Add(ctx, mgr, dmcm, log, opts.PublishedResourceSelector, opts.Namespace, opts.AgentName)
195191
}); err != nil {
196192
return err
197193
}

cmd/api-syncagent/options.go

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,6 @@ type Options struct {
5454
// If not given, defaults to "<service ref>-syncagent".
5555
AgentName string
5656

57-
// APIExportRef references the APIExport within a kcp workspace that this
58-
// Sync Agent should work with by name. The APIExport has to already exist, but it must not have
59-
// any pre-existing resource schemas configured, the agent will fill them in based on
60-
// PublishedResources.
61-
//
62-
// Deprecated: Use APIExportEndpointSliceRef instead. If an APIExport is referenced, the agent
63-
// will attempt to find and use an endpoint slice of the same name.
64-
APIExportRef string
65-
6657
// APIExportEndpointSliceRef references the APIExportEndpointSlice within a kcp workspace that this
6758
// Sync Agent should work with by name. The agent will automatically manage the resource schemas
6859
// in the APIExport referenced by this endpoint slice.
@@ -96,7 +87,6 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
9687
flags.StringVar(&o.KcpKubeconfig, "kcp-kubeconfig", o.KcpKubeconfig, "kubeconfig file of kcp")
9788
flags.StringVar(&o.Namespace, "namespace", o.Namespace, "Kubernetes namespace the Sync Agent is running in")
9889
flags.StringVar(&o.AgentName, "agent-name", o.AgentName, "name of this Sync Agent, must not be changed after the first run, can be left blank to auto-generate a name")
99-
flags.StringVar(&o.APIExportRef, "apiexport-ref", o.APIExportRef, "name of the APIExport in kcp that this Sync Agent is powering (deprecated, use --apiexportendpointslice-ref instead)")
10090
flags.StringVar(&o.APIExportEndpointSliceRef, "apiexportendpointslice-ref", o.APIExportEndpointSliceRef, "name of the APIExportEndpointSlice in kcp that this Sync Agent is powering")
10191
flags.StringVar(&o.PublishedResourceSelectorString, "published-resource-selector", o.PublishedResourceSelectorString, "restrict this Sync Agent to only process PublishedResources matching this label selector (optional)")
10292
flags.BoolVar(&o.EnableLeaderElection, "enable-leader-election", o.EnableLeaderElection, "whether to perform leader election")
@@ -124,12 +114,8 @@ func (o *Options) Validate() error {
124114
}
125115
}
126116

127-
if len(o.APIExportRef) == 0 && len(o.APIExportEndpointSliceRef) == 0 {
128-
errs = append(errs, errors.New("either --apiexportendpointslice-ref or --apiexport-ref is required"))
129-
}
130-
131-
if len(o.APIExportRef) != 0 && len(o.APIExportEndpointSliceRef) != 0 {
132-
errs = append(errs, errors.New("--apiexportendpointslice-ref and --apiexport-ref are mutually exclusive"))
117+
if len(o.APIExportEndpointSliceRef) == 0 {
118+
errs = append(errs, errors.New("--apiexportendpointslice-ref is required"))
133119
}
134120

135121
if len(o.KcpKubeconfig) == 0 {
@@ -156,7 +142,7 @@ func (o *Options) Complete() error {
156142
errs := []error{}
157143

158144
if len(o.AgentName) == 0 {
159-
o.AgentName = o.APIExportRef + "-syncagent"
145+
o.AgentName = o.APIExportEndpointSliceRef + "-syncagent"
160146
}
161147

162148
if s := o.PublishedResourceSelectorString; len(s) > 0 {

internal/controller/apiexport/controller.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222

2323
"github.com/kcp-dev/logicalcluster/v3"
24+
kcpdevv1alpha1 "github.com/kcp-dev/sdk/apis/apis/v1alpha1"
2425
"go.uber.org/zap"
2526

2627
"github.com/kcp-dev/api-syncagent/internal/controllerutil"
@@ -31,8 +32,6 @@ import (
3132
"github.com/kcp-dev/api-syncagent/internal/resources/reconciling"
3233
syncagentv1alpha1 "github.com/kcp-dev/api-syncagent/sdk/apis/syncagent/v1alpha1"
3334

34-
kcpdevv1alpha1 "github.com/kcp-dev/sdk/apis/apis/v1alpha1"
35-
3635
"k8s.io/apimachinery/pkg/labels"
3736
"k8s.io/apimachinery/pkg/runtime/schema"
3837
"k8s.io/apimachinery/pkg/types"

internal/controller/apiexport/reconciler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ import (
2121
"slices"
2222
"strings"
2323

24+
kcpdevv1alpha1 "github.com/kcp-dev/sdk/apis/apis/v1alpha1"
25+
2426
"github.com/kcp-dev/api-syncagent/internal/resources/reconciling"
2527
syncagentv1alpha1 "github.com/kcp-dev/api-syncagent/sdk/apis/syncagent/v1alpha1"
2628

27-
kcpdevv1alpha1 "github.com/kcp-dev/sdk/apis/apis/v1alpha1"
28-
2929
corev1 "k8s.io/api/core/v1"
3030
"k8s.io/apimachinery/pkg/runtime"
3131
"k8s.io/apimachinery/pkg/runtime/schema"

0 commit comments

Comments
 (0)