diff --git a/core/pkg/ingress/controller/controller.go b/core/pkg/ingress/controller/controller.go index c7d854dc5..5489219e8 100644 --- a/core/pkg/ingress/controller/controller.go +++ b/core/pkg/ingress/controller/controller.go @@ -39,6 +39,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/flowcontrol" + "k8s.io/kubernetes/pkg/client/leaderelection/resourcelock" "k8s.io/ingress/core/pkg/ingress" "k8s.io/ingress/core/pkg/ingress/annotations/class" @@ -49,7 +50,6 @@ import ( "k8s.io/ingress/core/pkg/ingress/defaults" "k8s.io/ingress/core/pkg/ingress/resolver" "k8s.io/ingress/core/pkg/ingress/status" - "k8s.io/ingress/core/pkg/ingress/status/leaderelection/resourcelock" "k8s.io/ingress/core/pkg/ingress/store" "k8s.io/ingress/core/pkg/k8s" "k8s.io/ingress/core/pkg/net/ssl" diff --git a/core/pkg/ingress/status/election.go b/core/pkg/ingress/status/election.go deleted file mode 100644 index 17fc0dd0f..000000000 --- a/core/pkg/ingress/status/election.go +++ /dev/null @@ -1,122 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package status - -import ( - "encoding/json" - "os" - "time" - - "github.com/golang/glog" - - "k8s.io/apimachinery/pkg/api/errors" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - client "k8s.io/client-go/kubernetes" - def_api "k8s.io/client-go/pkg/api" - api "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/record" - - "k8s.io/ingress/core/pkg/ingress/status/leaderelection" - "k8s.io/ingress/core/pkg/ingress/status/leaderelection/resourcelock" -) - -func getCurrentLeader(electionID, namespace string, c client.Interface) (string, *api.Endpoints, error) { - endpoints, err := c.Core().Endpoints(namespace).Get(electionID, meta_v1.GetOptions{}) - if err != nil { - return "", nil, err - } - val, found := endpoints.Annotations[resourcelock.LeaderElectionRecordAnnotationKey] - if !found { - return "", endpoints, nil - } - electionRecord := resourcelock.LeaderElectionRecord{} - if err = json.Unmarshal([]byte(val), &electionRecord); err != nil { - return "", nil, err - } - return electionRecord.HolderIdentity, endpoints, err -} - -// NewElection creates an election. 'namespace'/'election' should be an existing Kubernetes Service -// 'id' is the id if this leader, should be unique. -func NewElection(electionID, - id, - namespace string, - ttl time.Duration, - callback func(leader string), - c client.Interface) (*leaderelection.LeaderElector, error) { - - _, err := c.Core().Endpoints(namespace).Get(electionID, meta_v1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - _, err = c.Core().Endpoints(namespace).Create(&api.Endpoints{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: electionID, - }, - }) - if err != nil && !errors.IsConflict(err) { - return nil, err - } - } else { - return nil, err - } - } - - callbacks := leaderelection.LeaderCallbacks{ - OnStartedLeading: func(stop <-chan struct{}) { - callback(id) - }, - OnStoppedLeading: func() { - leader, _, err := getCurrentLeader(electionID, namespace, c) - if err != nil { - glog.Errorf("failed to get leader: %v", err) - // empty string means leader is unknown - callback("") - return - } - callback(leader) - }, - } - - broadcaster := record.NewBroadcaster() - hostname, err := os.Hostname() - if err != nil { - return nil, err - } - recorder := broadcaster.NewRecorder(def_api.Scheme, api.EventSource{ - Component: "ingress-leader-elector", - Host: hostname, - }) - - lock := resourcelock.EndpointsLock{ - EndpointsMeta: meta_v1.ObjectMeta{Namespace: namespace, Name: electionID}, - Client: c, - LockConfig: resourcelock.ResourceLockConfig{ - Identity: id, - EventRecorder: recorder, - }, - } - - config := leaderelection.LeaderElectionConfig{ - Lock: &lock, - LeaseDuration: ttl, - RenewDeadline: ttl / 2, - RetryPeriod: ttl / 4, - Callbacks: callbacks, - } - - return leaderelection.NewLeaderElector(config) -} diff --git a/core/pkg/ingress/status/election_test.go b/core/pkg/ingress/status/election_test.go deleted file mode 100644 index 3e6504d00..000000000 --- a/core/pkg/ingress/status/election_test.go +++ /dev/null @@ -1,132 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package status - -import ( - "encoding/json" - "testing" - "time" - - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/pkg/api" - api_v1 "k8s.io/client-go/pkg/api/v1" - - "k8s.io/ingress/core/pkg/ingress/status/leaderelection/resourcelock" -) - -func TestGetCurrentLeaderLeaderExist(t *testing.T) { - fkER := resourcelock.LeaderElectionRecord{ - HolderIdentity: "currentLeader", - LeaseDurationSeconds: 30, - AcquireTime: meta_v1.NewTime(time.Now()), - RenewTime: meta_v1.NewTime(time.Now()), - LeaderTransitions: 3, - } - leaderInfo, _ := json.Marshal(fkER) - fkEndpoints := api_v1.Endpoints{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: "ingress-controller-test", - Namespace: api.NamespaceSystem, - Annotations: map[string]string{ - resourcelock.LeaderElectionRecordAnnotationKey: string(leaderInfo), - }, - }, - } - fk := fake.NewSimpleClientset(&api_v1.EndpointsList{Items: []api_v1.Endpoints{fkEndpoints}}) - identity, endpoints, err := getCurrentLeader("ingress-controller-test", api.NamespaceSystem, fk) - if err != nil { - t.Fatalf("expected identitiy and endpoints but returned error %s", err) - } - - if endpoints == nil { - t.Fatalf("returned nil but expected an endpoints") - } - - if identity != "currentLeader" { - t.Fatalf("returned %v but expected %v", identity, "currentLeader") - } -} - -func TestGetCurrentLeaderLeaderNotExist(t *testing.T) { - fkEndpoints := api_v1.Endpoints{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: "ingress-controller-test", - Namespace: api.NamespaceSystem, - Annotations: map[string]string{}, - }, - } - fk := fake.NewSimpleClientset(&api_v1.EndpointsList{Items: []api_v1.Endpoints{fkEndpoints}}) - identity, endpoints, err := getCurrentLeader("ingress-controller-test", api.NamespaceSystem, fk) - if err != nil { - t.Fatalf("unexpeted error: %v", err) - } - - if endpoints == nil { - t.Fatalf("returned nil but expected an endpoints") - } - - if identity != "" { - t.Fatalf("returned %s but expected %s", identity, "") - } -} - -func TestGetCurrentLeaderAnnotationError(t *testing.T) { - fkEndpoints := api_v1.Endpoints{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: "ingress-controller-test", - Namespace: api.NamespaceSystem, - Annotations: map[string]string{ - resourcelock.LeaderElectionRecordAnnotationKey: "just-test-error-leader-annotation", - }, - }, - } - fk := fake.NewSimpleClientset(&api_v1.EndpointsList{Items: []api_v1.Endpoints{fkEndpoints}}) - _, _, err := getCurrentLeader("ingress-controller-test", api.NamespaceSystem, fk) - if err == nil { - t.Errorf("expected error") - } -} - -func TestNewElection(t *testing.T) { - fk := fake.NewSimpleClientset(&api_v1.EndpointsList{Items: []api_v1.Endpoints{ - { - ObjectMeta: meta_v1.ObjectMeta{ - Name: "ingress-controller-test", - Namespace: api.NamespaceSystem, - }, - }, - { - ObjectMeta: meta_v1.ObjectMeta{ - Name: "ingress-controller-test-020", - Namespace: api.NamespaceSystem, - }, - }, - }}) - - ne, err := NewElection("ingress-controller-test", "startLeader", api.NamespaceSystem, 4*time.Second, func(leader string) { - // do nothing - go t.Logf("execute callback fun, leader is: %s", leader) - }, fk) - if err != nil { - t.Fatalf("unexpected error %v", err) - } - - if ne == nil { - t.Fatalf("unexpected nil") - } -} diff --git a/core/pkg/ingress/status/leaderelection/leaderelection.go b/core/pkg/ingress/status/leaderelection/leaderelection.go deleted file mode 100644 index 2fd7b01ec..000000000 --- a/core/pkg/ingress/status/leaderelection/leaderelection.go +++ /dev/null @@ -1,333 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package leaderelection implements leader election of a set of endpoints. -// It uses an annotation in the endpoints object to store the record of the -// election state. -// -// This implementation does not guarantee that only one client is acting as a -// leader (a.k.a. fencing). A client observes timestamps captured locally to -// infer the state of the leader election. Thus the implementation is tolerant -// to arbitrary clock skew, but is not tolerant to arbitrary clock skew rate. -// -// However the level of tolerance to skew rate can be configured by setting -// RenewDeadline and LeaseDuration appropriately. The tolerance expressed as a -// maximum tolerated ratio of time passed on the fastest node to time passed on -// the slowest node can be approximately achieved with a configuration that sets -// the same ratio of LeaseDuration to RenewDeadline. For example if a user wanted -// to tolerate some nodes progressing forward in time twice as fast as other nodes, -// the user could set LeaseDuration to 60 seconds and RenewDeadline to 30 seconds. -// -// While not required, some method of clock synchronization between nodes in the -// cluster is highly recommended. It's important to keep in mind when configuring -// this client that the tolerance to skew rate varies inversely to master -// availability. -// -// Larger clusters often have a more lenient SLA for API latency. This should be -// taken into account when configuring the client. The rate of leader transitions -// should be monitored and RetryPeriod and LeaseDuration should be increased -// until the rate is stable and acceptably low. It's important to keep in mind -// when configuring this client that the tolerance to API latency varies inversely -// to master availability. -// -// DISCLAIMER: this is an alpha API. This library will likely change significantly -// or even be removed entirely in subsequent releases. Depend on this API at -// your own risk. -package leaderelection - -import ( - "fmt" - "reflect" - "time" - - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" - rl "k8s.io/ingress/core/pkg/ingress/status/leaderelection/resourcelock" - - "github.com/golang/glog" - "github.com/spf13/pflag" -) - -const ( - JitterFactor = 1.2 - DefaultLeaseDuration = 15 * time.Second - DefaultRenewDeadline = 10 * time.Second - DefaultRetryPeriod = 2 * time.Second -) - -// LeaderElectionConfiguration defines the configuration of leader election -// clients for components that can run with leader election enabled. -type LeaderElectionConfiguration struct { - // leaderElect enables a leader election client to gain leadership - // before executing the main loop. Enable this when running replicated - // components for high availability. - LeaderElect bool - // leaseDuration is the duration that non-leader candidates will wait - // after observing a leadership renewal until attempting to acquire - // leadership of a led but unrenewed leader slot. This is effectively the - // maximum duration that a leader can be stopped before it is replaced - // by another candidate. This is only applicable if leader election is - // enabled. - LeaseDuration metav1.Duration - // renewDeadline is the interval between attempts by the acting master to - // renew a leadership slot before it stops leading. This must be less - // than or equal to the lease duration. This is only applicable if leader - // election is enabled. - RenewDeadline metav1.Duration - // retryPeriod is the duration the clients should wait between attempting - // acquisition and renewal of a leadership. This is only applicable if - // leader election is enabled. - RetryPeriod metav1.Duration -} - -// NewLeadereElector creates a LeaderElector from a LeaderElecitionConfig -func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) { - if lec.LeaseDuration <= lec.RenewDeadline { - return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline") - } - if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) { - return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor") - } - if lec.Lock == nil { - return nil, fmt.Errorf("Lock must not be nil.") - } - return &LeaderElector{ - config: lec, - }, nil -} - -type LeaderElectionConfig struct { - // Lock is the resource that will be used for locking - Lock rl.Interface - - // LeaseDuration is the duration that non-leader candidates will - // wait to force acquire leadership. This is measured against time of - // last observed ack. - LeaseDuration time.Duration - // RenewDeadline is the duration that the acting master will retry - // refreshing leadership before giving up. - RenewDeadline time.Duration - // RetryPeriod is the duration the LeaderElector clients should wait - // between tries of actions. - RetryPeriod time.Duration - - // Callbacks are callbacks that are triggered during certain lifecycle - // events of the LeaderElector - Callbacks LeaderCallbacks -} - -// LeaderCallbacks are callbacks that are triggered during certain -// lifecycle events of the LeaderElector. These are invoked asynchronously. -// -// possible future callbacks: -// * OnChallenge() -type LeaderCallbacks struct { - // OnStartedLeading is called when a LeaderElector client starts leading - OnStartedLeading func(stop <-chan struct{}) - // OnStoppedLeading is called when a LeaderElector client stops leading - OnStoppedLeading func() - // OnNewLeader is called when the client observes a leader that is - // not the previously observed leader. This includes the first observed - // leader when the client starts. - OnNewLeader func(identity string) -} - -// LeaderElector is a leader election client. -// -// possible future methods: -// * (le *LeaderElector) IsLeader() -// * (le *LeaderElector) GetLeader() -type LeaderElector struct { - config LeaderElectionConfig - // internal bookkeeping - observedRecord rl.LeaderElectionRecord - observedTime time.Time - // used to implement OnNewLeader(), may lag slightly from the - // value observedRecord.HolderIdentity if the transition has - // not yet been reported. - reportedLeader string -} - -// Run starts the leader election loop -func (le *LeaderElector) Run() { - defer func() { - runtime.HandleCrash() - le.config.Callbacks.OnStoppedLeading() - }() - le.acquire() - stop := make(chan struct{}) - go le.config.Callbacks.OnStartedLeading(stop) - le.renew() - close(stop) -} - -// RunOrDie starts a client with the provided config or panics if the config -// fails to validate. -func RunOrDie(lec LeaderElectionConfig) { - le, err := NewLeaderElector(lec) - if err != nil { - panic(err) - } - le.Run() -} - -// GetLeader returns the identity of the last observed leader or returns the empty string if -// no leader has yet been observed. -func (le *LeaderElector) GetLeader() string { - return le.observedRecord.HolderIdentity -} - -// IsLeader returns true if the last observed leader was this client else returns false. -func (le *LeaderElector) IsLeader() bool { - return le.observedRecord.HolderIdentity == le.config.Lock.Identity() -} - -// acquire loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew succeeds. -func (le *LeaderElector) acquire() { - stop := make(chan struct{}) - glog.Infof("attempting to acquire leader lease...") - wait.JitterUntil(func() { - succeeded := le.tryAcquireOrRenew() - le.maybeReportTransition() - desc := le.config.Lock.Describe() - if !succeeded { - glog.V(4).Infof("failed to acquire lease %v", desc) - return - } - le.config.Lock.RecordEvent("became leader") - glog.Infof("successfully acquired lease %v", desc) - close(stop) - }, le.config.RetryPeriod, JitterFactor, true, stop) -} - -// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails. -func (le *LeaderElector) renew() { - stop := make(chan struct{}) - wait.Until(func() { - err := wait.Poll(le.config.RetryPeriod, le.config.RenewDeadline, func() (bool, error) { - return le.tryAcquireOrRenew(), nil - }) - le.maybeReportTransition() - desc := le.config.Lock.Describe() - if err == nil { - glog.V(4).Infof("successfully renewed lease %v", desc) - return - } - le.config.Lock.RecordEvent("stopped leading") - glog.Infof("failed to renew lease %v", desc) - close(stop) - }, 0, stop) -} - -// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, -// else it tries to renew the lease if it has already been acquired. Returns true -// on success else returns false. -func (le *LeaderElector) tryAcquireOrRenew() bool { - now := metav1.Now() - leaderElectionRecord := rl.LeaderElectionRecord{ - HolderIdentity: le.config.Lock.Identity(), - LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), - RenewTime: now, - AcquireTime: now, - } - - // 1. obtain or create the ElectionRecord - oldLeaderElectionRecord, err := le.config.Lock.Get() - if err != nil { - if !errors.IsNotFound(err) { - glog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err) - return false - } - if err = le.config.Lock.Create(leaderElectionRecord); err != nil { - glog.Errorf("error initially creating leader election record: %v", err) - return false - } - le.observedRecord = leaderElectionRecord - le.observedTime = time.Now() - return true - } - - // 2. Record obtained, check the Identity & Time - if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) { - le.observedRecord = *oldLeaderElectionRecord - le.observedTime = time.Now() - } - if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && - oldLeaderElectionRecord.HolderIdentity != le.config.Lock.Identity() { - glog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity) - return false - } - - // 3. We're going to try to update. The leaderElectionRecord is set to it's default - // here. Let's correct it before updating. - if oldLeaderElectionRecord.HolderIdentity == le.config.Lock.Identity() { - leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime - leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions - } else { - leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1 - } - - // update the lock itself - if err = le.config.Lock.Update(leaderElectionRecord); err != nil { - glog.Errorf("Failed to update lock: %v", err) - return false - } - le.observedRecord = leaderElectionRecord - le.observedTime = time.Now() - return true -} - -func (l *LeaderElector) maybeReportTransition() { - if l.observedRecord.HolderIdentity == l.reportedLeader { - return - } - l.reportedLeader = l.observedRecord.HolderIdentity - if l.config.Callbacks.OnNewLeader != nil { - go l.config.Callbacks.OnNewLeader(l.reportedLeader) - } -} - -func DefaultLeaderElectionConfiguration() LeaderElectionConfiguration { - return LeaderElectionConfiguration{ - LeaderElect: false, - LeaseDuration: metav1.Duration{Duration: DefaultLeaseDuration}, - RenewDeadline: metav1.Duration{Duration: DefaultRenewDeadline}, - RetryPeriod: metav1.Duration{Duration: DefaultRetryPeriod}, - } -} - -// BindFlags binds the common LeaderElectionCLIConfig flags to a flagset -func BindFlags(l *LeaderElectionConfiguration, fs *pflag.FlagSet) { - fs.BoolVar(&l.LeaderElect, "leader-elect", l.LeaderElect, ""+ - "Start a leader election client and gain leadership before "+ - "executing the main loop. Enable this when running replicated "+ - "components for high availability.") - fs.DurationVar(&l.LeaseDuration.Duration, "leader-elect-lease-duration", l.LeaseDuration.Duration, ""+ - "The duration that non-leader candidates will wait after observing a leadership "+ - "renewal until attempting to acquire leadership of a led but unrenewed leader "+ - "slot. This is effectively the maximum duration that a leader can be stopped "+ - "before it is replaced by another candidate. This is only applicable if leader "+ - "election is enabled.") - fs.DurationVar(&l.RenewDeadline.Duration, "leader-elect-renew-deadline", l.RenewDeadline.Duration, ""+ - "The interval between attempts by the acting master to renew a leadership slot "+ - "before it stops leading. This must be less than or equal to the lease duration. "+ - "This is only applicable if leader election is enabled.") - fs.DurationVar(&l.RetryPeriod.Duration, "leader-elect-retry-period", l.RetryPeriod.Duration, ""+ - "The duration the clients should wait between attempting acquisition and renewal "+ - "of a leadership. This is only applicable if leader election is enabled.") -} diff --git a/core/pkg/ingress/status/leaderelection/resourcelock/endpointslock.go b/core/pkg/ingress/status/leaderelection/resourcelock/endpointslock.go deleted file mode 100644 index 26d5782b6..000000000 --- a/core/pkg/ingress/status/leaderelection/resourcelock/endpointslock.go +++ /dev/null @@ -1,103 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resourcelock - -import ( - "encoding/json" - "errors" - "fmt" - - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/pkg/api/v1" -) - -type EndpointsLock struct { - // EndpointsMeta should contain a Name and a Namespace of an - // Endpoints object that the LeaderElector will attempt to lead. - EndpointsMeta meta_v1.ObjectMeta - Client clientset.Interface - LockConfig ResourceLockConfig - e *v1.Endpoints -} - -func (el *EndpointsLock) Get() (*LeaderElectionRecord, error) { - var record LeaderElectionRecord - var err error - el.e, err = el.Client.Core().Endpoints(el.EndpointsMeta.Namespace).Get(el.EndpointsMeta.Name, meta_v1.GetOptions{}) - if err != nil { - return nil, err - } - if el.e.Annotations == nil { - el.e.Annotations = make(map[string]string) - } - if recordBytes, found := el.e.Annotations[LeaderElectionRecordAnnotationKey]; found { - if err := json.Unmarshal([]byte(recordBytes), &record); err != nil { - return nil, err - } - } - return &record, nil -} - -// Create attempts to create a LeaderElectionRecord annotation -func (el *EndpointsLock) Create(ler LeaderElectionRecord) error { - recordBytes, err := json.Marshal(ler) - if err != nil { - return err - } - el.e, err = el.Client.Core().Endpoints(el.EndpointsMeta.Namespace).Create(&v1.Endpoints{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: el.EndpointsMeta.Name, - Namespace: el.EndpointsMeta.Namespace, - Annotations: map[string]string{ - LeaderElectionRecordAnnotationKey: string(recordBytes), - }, - }, - }) - return err -} - -// Update will update and existing annotation on a given resource. -func (el *EndpointsLock) Update(ler LeaderElectionRecord) error { - if el.e == nil { - return errors.New("endpoint not initialized, call get or create first") - } - recordBytes, err := json.Marshal(ler) - if err != nil { - return err - } - el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes) - el.e, err = el.Client.Core().Endpoints(el.EndpointsMeta.Namespace).Update(el.e) - return err -} - -// RecordEvent in leader election while adding meta-data -func (el *EndpointsLock) RecordEvent(s string) { - events := fmt.Sprintf("%v %v", el.LockConfig.Identity, s) - el.LockConfig.EventRecorder.Eventf(&v1.Endpoints{ObjectMeta: el.e.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events) -} - -// Describe is used to convert details on current resource lock -// into a string -func (el *EndpointsLock) Describe() string { - return fmt.Sprintf("%v/%v", el.EndpointsMeta.Namespace, el.EndpointsMeta.Name) -} - -// returns the Identity of the lock -func (el *EndpointsLock) Identity() string { - return el.LockConfig.Identity -} diff --git a/core/pkg/ingress/status/leaderelection/resourcelock/interface.go b/core/pkg/ingress/status/leaderelection/resourcelock/interface.go deleted file mode 100644 index 7b3775b9b..000000000 --- a/core/pkg/ingress/status/leaderelection/resourcelock/interface.go +++ /dev/null @@ -1,71 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resourcelock - -import ( - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/record" -) - -const ( - LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader" -) - -// LeaderElectionRecord is the record that is stored in the leader election annotation. -// This information should be used for observational purposes only and could be replaced -// with a random string (e.g. UUID) with only slight modification of this code. -// TODO(mikedanese): this should potentially be versioned -type LeaderElectionRecord struct { - HolderIdentity string `json:"holderIdentity"` - LeaseDurationSeconds int `json:"leaseDurationSeconds"` - AcquireTime metav1.Time `json:"acquireTime"` - RenewTime metav1.Time `json:"renewTime"` - LeaderTransitions int `json:"leaderTransitions"` -} - -// ResourceLockConfig common data that exists across different -// resource locks -type ResourceLockConfig struct { - Identity string - EventRecorder record.EventRecorder -} - -// Interface offers a common interface for locking on arbitrary -// resources used in leader election. The Interface is used -// to hide the details on specific implementations in order to allow -// them to change over time. This interface is strictly for use -// by the leaderelection code. -type Interface interface { - // Get returns the LeaderElectionRecord - Get() (*LeaderElectionRecord, error) - - // Create attempts to create a LeaderElectionRecord - Create(ler LeaderElectionRecord) error - - // Update will update and existing LeaderElectionRecord - Update(ler LeaderElectionRecord) error - - // RecordEvent is used to record events - RecordEvent(string) - - // Identity will return the locks Identity - Identity() string - - // Describe is used to convert details on current resource lock - // into a string - Describe() string -} diff --git a/core/pkg/ingress/status/status.go b/core/pkg/ingress/status/status.go index aa20b4728..3bd7d480a 100644 --- a/core/pkg/ingress/status/status.go +++ b/core/pkg/ingress/status/status.go @@ -31,9 +31,11 @@ import ( clientset "k8s.io/client-go/kubernetes" api_v1 "k8s.io/client-go/pkg/api/v1" extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1" + "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/client/leaderelection" + "k8s.io/kubernetes/pkg/client/leaderelection/resourcelock" "k8s.io/ingress/core/pkg/ingress/annotations/class" - "k8s.io/ingress/core/pkg/ingress/status/leaderelection" "k8s.io/ingress/core/pkg/ingress/store" "k8s.io/ingress/core/pkg/k8s" "k8s.io/ingress/core/pkg/strings" @@ -52,13 +54,15 @@ type Sync interface { // Config ... type Config struct { - Client clientset.Interface + Client *clientset.Clientset PublishService string IngressLister store.IngressLister ElectionID string DefaultIngressClass string IngressClass string + + Recorder record.EventRecorder } // statusSync keeps the status IP in each Ingress rule updated executing a periodic check @@ -195,12 +199,26 @@ func NewStatusSyncer(config Config) Sync { id = fmt.Sprintf("%v-%v", config.ElectionID, config.IngressClass) } - le, err := NewElection(id, - pod.Name, pod.Namespace, 30*time.Second, - st.callback, config.Client) + rl, err := resourcelock.New("ConfigMapsResourceLock", + pod.Namespace, + id, + config.Client, + resourcelock.ResourceLockConfig{ + Identity: id, + EventRecorder: config.Recorder, + }) if err != nil { - glog.Fatalf("unexpected error starting leader election: %v", err) + glog.Fatalf("error creating lock: %v", err) } + + le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: updateInterval, + RenewDeadline: updateInterval, + RetryPeriod: updateInterval, + Callbacks: st.callback, + }) + st.elector = le return st }