/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package status

import (
	"fmt"
	"net"
	"os"
	"sort"
	"sync"
	"time"

	"github.com/golang/glog"

	v1 "k8s.io/api/core/v1"
	extensions "k8s.io/api/extensions/v1beta1"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/tools/record"

	"k8s.io/ingress/core/pkg/ingress/annotations/class"
	"k8s.io/ingress/core/pkg/ingress/store"
	"k8s.io/ingress/core/pkg/k8s"
	"k8s.io/ingress/core/pkg/strings"
	"k8s.io/ingress/core/pkg/task"
)

const (
	updateInterval = 30 * time.Second
)

// Sync is the interface an Ingress status synchronizer implements.
type Sync interface {
	// Run starts the sync loop and blocks until stopCh is closed.
	Run(stopCh <-chan struct{})
	// Shutdown stops the sync loop, removing the published address
	// if this instance is the leader and no other replicas are running.
	Shutdown()
}

// Config holds the settings required to build a status Sync.
type Config struct {
	Client clientset.Interface
	// PublishService is the namespace/name of the service whose load balancer
	// status is published to the Ingress rules (flag --publish-service).
	PublishService string
	IngressLister  store.IngressLister

	// ElectionID is the base name of the ConfigMap used for leader election.
	ElectionID string

	// UpdateStatusOnShutdown indicates whether the published address should be
	// removed from the Ingress rules when the controller shuts down.
	UpdateStatusOnShutdown bool

	DefaultIngressClass string
	IngressClass        string
}

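// Usage sketch: assuming a clientset and an Ingress lister are built elsewhere
// (kubeClient, ingLister and stopCh are placeholder names), a controller wires
// the syncer roughly like this:
//
//	syncer := NewStatusSyncer(Config{
//		Client:                 kubeClient,
//		PublishService:         "default/my-nginx", // example namespace/name
//		ElectionID:             "ingress-controller-leader",
//		UpdateStatusOnShutdown: true,
//		IngressLister:          ingLister,
//	})
//	go syncer.Run(stopCh)
//	defer syncer.Shutdown()
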
// statusSync keeps the status IP of each Ingress rule up to date by running a
// periodic check over all the defined rules. To keep the process simple, leader
// election ensures the update runs in only one instance (Ingress controllers can
// be scaled to more than one replica). If the controller runs with the flag
// --publish-service (pointing to a valid service), the address behind that
// service is published; otherwise the IPs of the nodes running the controller
// pods are used.
type statusSync struct {
	Config
	// pod contains runtime information about this pod
	pod *k8s.PodInfo

	elector *leaderelection.LeaderElector
	// workqueue used to keep the status IPs in the Ingress rules in sync
	syncQueue *task.Queue

	runLock *sync.Mutex
}

// Run starts the loop to keep the status in sync
func (s statusSync) Run(stopCh <-chan struct{}) {
	go wait.Forever(s.elector.Run, 0)
	go s.run()

	go s.syncQueue.Run(time.Second, stopCh)

	<-stopCh
}

// Shutdown stops the sync. If this instance is the leader and no other
// instances are running, it also removes the current address from the
// Ingress status.
func (s statusSync) Shutdown() {
	go s.syncQueue.Shutdown()
	// remove the address from the Ingress rules
	if !s.elector.IsLeader() {
		return
	}

	if !s.UpdateStatusOnShutdown {
		glog.Warningf("skipping update of status of Ingress rules")
		return
	}

	glog.Infof("updating status of Ingress rules (remove)")

	addrs, err := s.runningAddresses()
	if err != nil {
		glog.Errorf("error obtaining running IPs: %v", err)
		return
	}

	if len(addrs) > 1 {
		// leave the job to the next leader
		glog.Infof("leaving status update for next leader (%v)", len(addrs))
		return
	}

	if s.isRunningMultiplePods() {
		glog.V(2).Infof("skipping Ingress status update (multiple pods running - another one will be elected as master)")
		return
	}

	glog.Infof("removing address from ingress status (%v)", addrs)
	s.updateStatus([]v1.LoadBalancerIngress{})
}

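// run enqueues a dummy task every updateInterval to force a status sync,
// stopping once the work queue starts shutting down.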
func (s *statusSync) run() {
	err := wait.PollInfinite(updateInterval, func() (bool, error) {
		if s.syncQueue.IsShuttingDown() {
			return true, nil
		}
		// send a dummy object to the queue to force a sync
		s.syncQueue.Enqueue("dummy")
		return false, nil
	})
	if err != nil {
		glog.Errorf("error waiting shutdown: %v", err)
	}
}

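// sync is the work queue callback; it refreshes the address published in the
// status of all Ingress rules, but only while this instance is the elected
// leader and the queue is not shutting down.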
func (s *statusSync) sync(key interface{}) error {
	s.runLock.Lock()
	defer s.runLock.Unlock()

	if s.syncQueue.IsShuttingDown() {
		glog.V(2).Infof("skipping Ingress status update (shutdown in progress)")
		return nil
	}

	if !s.elector.IsLeader() {
		glog.V(2).Infof("skipping Ingress status update (I am not the current leader)")
		return nil
	}

	addrs, err := s.runningAddresses()
	if err != nil {
		return err
	}
	s.updateStatus(sliceToStatus(addrs))

	return nil
}

// callback is invoked by the leader election machinery when a new leader is elected
func (s *statusSync) callback(leader string) {
	if s.syncQueue.IsShuttingDown() {
		return
	}

	glog.V(2).Infof("new leader elected (%v)", leader)
	if leader == s.pod.Name {
		glog.V(2).Infof("I am the new status update leader")
	}
}

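// keyfunc is the identity key function used by the custom task queue.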
func (s statusSync) keyfunc(input interface{}) (interface{}, error) {
	return input, nil
}

// NewStatusSyncer returns a new Sync instance
func NewStatusSyncer(config Config) Sync {
	pod, err := k8s.GetPodDetails(config.Client)
	if err != nil {
		glog.Fatalf("unexpected error obtaining pod information: %v", err)
	}

	st := statusSync{
		pod:     pod,
		runLock: &sync.Mutex{},
		Config:  config,
	}
	st.syncQueue = task.NewCustomTaskQueue(st.sync, st.keyfunc)

	// include the ingress class in the election ID so that controllers watching
	// different classes can each elect their own leader to update the ingress status
	electionID := fmt.Sprintf("%v-%v", config.ElectionID, config.DefaultIngressClass)
	if config.IngressClass != "" {
		electionID = fmt.Sprintf("%v-%v", config.ElectionID, config.IngressClass)
	}

	callbacks := leaderelection.LeaderCallbacks{
		OnStartedLeading: func(stop <-chan struct{}) {
			st.callback(pod.Name)
		},
		OnStoppedLeading: func() {
			st.callback("")
		},
	}

	broadcaster := record.NewBroadcaster()
	hostname, _ := os.Hostname()

	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{
		Component: "ingress-leader-elector",
		Host:      hostname,
	})

	lock := resourcelock.ConfigMapLock{
		ConfigMapMeta: meta_v1.ObjectMeta{Namespace: pod.Namespace, Name: electionID},
		Client:        config.Client.Core(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity:      electionID,
			EventRecorder: recorder,
		},
	}

	le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:          &lock,
		LeaseDuration: 30 * time.Second,
		RenewDeadline: 15 * time.Second,
		RetryPeriod:   5 * time.Second,
		Callbacks:     callbacks,
	})
	if err != nil {
		glog.Fatalf("unexpected error starting leader election: %v", err)
	}

	st.elector = le
	return st
}

// runningAddresses returns a list of IP addresses and/or FQDNs where the
// ingress controller is currently running
func (s *statusSync) runningAddresses() ([]string, error) {
	if s.PublishService != "" {
		ns, name, _ := k8s.ParseNameNS(s.PublishService)
		svc, err := s.Client.Core().Services(ns).Get(name, meta_v1.GetOptions{})
		if err != nil {
			return nil, err
		}

		addrs := []string{}
		for _, ip := range svc.Status.LoadBalancer.Ingress {
			if ip.IP == "" {
				addrs = append(addrs, ip.Hostname)
			} else {
				addrs = append(addrs, ip.IP)
			}
		}

		return addrs, nil
	}

	// get information about all the pods running the ingress controller
	pods, err := s.Client.Core().Pods(s.pod.Namespace).List(meta_v1.ListOptions{
		LabelSelector: labels.SelectorFromSet(s.pod.Labels).String(),
	})
	if err != nil {
		return nil, err
	}

	addrs := []string{}
	for _, pod := range pods.Items {
		name := k8s.GetNodeIP(s.Client, pod.Spec.NodeName)
		if !strings.StringInSlice(name, addrs) {
			addrs = append(addrs, name)
		}
	}
	return addrs, nil
}

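// isRunningMultiplePods reports whether more than one pod with the same labels
// as this one is running in the namespace; if the pod list cannot be retrieved
// it returns false.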
func (s *statusSync) isRunningMultiplePods() bool {
	pods, err := s.Client.Core().Pods(s.pod.Namespace).List(meta_v1.ListOptions{
		LabelSelector: labels.SelectorFromSet(s.pod.Labels).String(),
	})
	if err != nil {
		return false
	}

	return len(pods.Items) > 1
}

// sliceToStatus converts a slice of IPs and/or hostnames to LoadBalancerIngress
func sliceToStatus(endpoints []string) []v1.LoadBalancerIngress {
	lbi := []v1.LoadBalancerIngress{}
	for _, ep := range endpoints {
		if net.ParseIP(ep) == nil {
			lbi = append(lbi, v1.LoadBalancerIngress{Hostname: ep})
		} else {
			lbi = append(lbi, v1.LoadBalancerIngress{IP: ep})
		}
	}

	sort.Sort(loadBalancerIngressByIP(lbi))
	return lbi
}

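// updateStatus writes the given addresses into the status of every Ingress rule
// that matches the controller's class, updating the rules concurrently.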
func (s *statusSync) updateStatus(newIPs []v1.LoadBalancerIngress) {
	ings := s.IngressLister.List()
	var wg sync.WaitGroup
	wg.Add(len(ings))
	for _, cur := range ings {
		ing := cur.(*extensions.Ingress)

		if !class.IsValid(ing, s.Config.IngressClass, s.Config.DefaultIngressClass) {
			wg.Done()
			continue
		}

		go func(wg *sync.WaitGroup, ing *extensions.Ingress) {
			defer wg.Done()
			ingClient := s.Client.Extensions().Ingresses(ing.Namespace)
			currIng, err := ingClient.Get(ing.Name, meta_v1.GetOptions{})
			if err != nil {
				glog.Errorf("unexpected error searching Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
				return
			}

			curIPs := currIng.Status.LoadBalancer.Ingress
			sort.Sort(loadBalancerIngressByIP(curIPs))
			if ingressSliceEqual(newIPs, curIPs) {
				glog.V(3).Infof("skipping update of Ingress %v/%v (no change)", currIng.Namespace, currIng.Name)
				return
			}

			glog.Infof("updating Ingress %v/%v status to %v", currIng.Namespace, currIng.Name, newIPs)
			currIng.Status.LoadBalancer.Ingress = newIPs
			_, err = ingClient.UpdateStatus(currIng)
			if err != nil {
				glog.Warningf("error updating ingress rule: %v", err)
			}
		}(&wg, ing)
	}

	wg.Wait()
}

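// ingressSliceEqual reports whether two LoadBalancerIngress slices contain the
// same IPs and hostnames in the same order.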
func ingressSliceEqual(lhs, rhs []v1.LoadBalancerIngress) bool {
	if len(lhs) != len(rhs) {
		return false
	}

	for i := range lhs {
		if lhs[i].IP != rhs[i].IP {
			return false
		}
		if lhs[i].Hostname != rhs[i].Hostname {
			return false
		}
	}
	return true
}

// loadBalancerIngressByIP sorts LoadBalancerIngress using the field IP
type loadBalancerIngressByIP []v1.LoadBalancerIngress

func (c loadBalancerIngressByIP) Len() int      { return len(c) }
func (c loadBalancerIngressByIP) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c loadBalancerIngressByIP) Less(i, j int) bool {
	return c[i].IP < c[j].IP
}