This commit is contained in:
Tim Hockin 2017-02-28 22:54:42 +00:00 committed by GitHub
commit 50196e6f49
3 changed files with 76 additions and 48 deletions

View file

@ -320,8 +320,13 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
// successful GC we know that there are no dangling cloud resources that // successful GC we know that there are no dangling cloud resources that
// don't have an associated Kubernetes Ingress/Service/Endpoint. // don't have an associated Kubernetes Ingress/Service/Endpoint.
allNodePorts := []int64{}
for _, lb := range lbs {
allNodePorts = append(allNodePorts, lb.NodePorts...)
}
lbNames := lbc.ingLister.Store.ListKeys()
defer func() { defer func() {
if deferErr := lbc.CloudClusterManager.GC(lbNames, nodePorts); deferErr != nil { if deferErr := lbc.CloudClusterManager.GC(lbNames, allNodePorts); deferErr != nil {
err = fmt.Errorf("Error during sync %v, error during GC %v", err, deferErr) err = fmt.Errorf("Error during sync %v, error during GC %v", err, deferErr)
} }
glog.V(3).Infof("Finished syncing %v", key) glog.V(3).Infof("Finished syncing %v", key)
@ -330,7 +335,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
// Record any errors during sync and throw a single error at the end. This // Record any errors during sync and throw a single error at the end. This
// allows us to free up associated cloud resources ASAP. // allows us to free up associated cloud resources ASAP.
var syncError error var syncError error
if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, nodePorts); err != nil { if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, allNodePorts); err != nil {
// TODO: Implement proper backoff for the queue. // TODO: Implement proper backoff for the queue.
eventMsg := "GCE" eventMsg := "GCE"
if utils.IsHTTPErrorCode(err, http.StatusForbidden) { if utils.IsHTTPErrorCode(err, http.StatusForbidden) {
@ -427,10 +432,16 @@ func (lbc *LoadBalancerController) ListRuntimeInfo() (lbs []*loadbalancers.L7Run
if err != nil { if err != nil {
glog.Warningf("Cannot get certs for Ingress %v/%v: %v", ing.Namespace, ing.Name, err) glog.Warningf("Cannot get certs for Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
} }
nodePorts, missingPorts := lbc.getNodePorts(&ing)
if len(missingPorts) > 0 {
glog.Warningf("Ingress %v/%v is missing some NodePorts", ing.Namespace, ing.Name)
}
annotations := ingAnnotations(ing.ObjectMeta.Annotations) annotations := ingAnnotations(ing.ObjectMeta.Annotations)
lbs = append(lbs, &loadbalancers.L7RuntimeInfo{ lbs = append(lbs, &loadbalancers.L7RuntimeInfo{
Name: k, Name: k,
TLS: tls, TLS: tls,
NodePorts: nodePorts,
AllowHTTP: annotations.allowHTTP(), AllowHTTP: annotations.allowHTTP(),
StaticIPName: annotations.staticIPName(), StaticIPName: annotations.staticIPName(),
}) })
@ -438,6 +449,39 @@ func (lbc *LoadBalancerController) ListRuntimeInfo() (lbs []*loadbalancers.L7Run
return lbs, nil return lbs, nil
} }
func (lbc *LoadBalancerController) getNodePorts(ing *extensions.Ingress) ([]int64, []int64) {
nodePorts := []int64{}
missing := []int64{}
defaultBackend := ing.Spec.Backend
if defaultBackend != nil {
port, err := lbc.tr.getServiceNodePort(*defaultBackend, ing.Namespace)
if err != nil {
lbc.recorder.Eventf(ing, api.EventTypeWarning, "Service", err.Error())
missing = append(missing, int64(port))
} else {
nodePorts = append(nodePorts, int64(port))
}
}
for _, rule := range ing.Spec.Rules {
if rule.HTTP == nil {
glog.Errorf("Ignoring non http Ingress rule.")
continue
}
for _, path := range rule.HTTP.Paths {
port, err := lbc.tr.getServiceNodePort(path.Backend, ing.Namespace)
if err != nil {
lbc.recorder.Eventf(ing, api.EventTypeWarning, "Service", err.Error())
missing = append(missing, int64(port))
continue
}
nodePorts = append(nodePorts, int64(port))
}
}
return nodePorts, missing
}
// syncNodes manages the syncing of kubernetes nodes to gce instance groups. // syncNodes manages the syncing of kubernetes nodes to gce instance groups.
// The instancegroups are referenced by loadbalancer backends. // The instancegroups are referenced by loadbalancer backends.
func (lbc *LoadBalancerController) syncNodes(key string) error { func (lbc *LoadBalancerController) syncNodes(key string) error {

View file

@ -342,37 +342,6 @@ func (t *GCETranslator) getServiceNodePort(be extensions.IngressBackend, namespa
"Could not find matching nodeport from service.")} "Could not find matching nodeport from service.")}
} }
// toNodePorts converts a pathlist to a flat list of nodeports.
func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []int64 {
knownPorts := []int64{}
for _, ing := range ings.Items {
defaultBackend := ing.Spec.Backend
if defaultBackend != nil {
port, err := t.getServiceNodePort(*defaultBackend, ing.Namespace)
if err != nil {
glog.Infof("%v", err)
} else {
knownPorts = append(knownPorts, int64(port))
}
}
for _, rule := range ing.Spec.Rules {
if rule.HTTP == nil {
glog.Errorf("Ignoring non http Ingress rule.")
continue
}
for _, path := range rule.HTTP.Paths {
port, err := t.getServiceNodePort(path.Backend, ing.Namespace)
if err != nil {
glog.Infof("%v", err)
continue
}
knownPorts = append(knownPorts, int64(port))
}
}
}
return knownPorts
}
func getZone(n *api.Node) string { func getZone(n *api.Node) string {
zone, ok := n.Labels[zoneKey] zone, ok := n.Labels[zoneKey]
if !ok { if !ok {

View file

@ -162,23 +162,36 @@ func (l *L7s) Delete(name string) error {
// Sync loadbalancers with the given runtime info from the controller. // Sync loadbalancers with the given runtime info from the controller.
func (l *L7s) Sync(lbs []*L7RuntimeInfo) error { func (l *L7s) Sync(lbs []*L7RuntimeInfo) error {
glog.V(3).Infof("Creating loadbalancers %+v", lbs) glog.V(4).Infof("Syncing %d loadbalancers", len(lbs))
if len(lbs) != 0 { // Make sure there is at least one active port in the set of LBs, before we
// Lazily create a default backend so we don't tax users who don't care // do any work.
// about Ingress by consuming 1 of their 3 GCE BackendServices. This validLBs := []*L7RuntimeInfo{}
// BackendService is GC'd when there are no more Ingresses.
if err := l.defaultBackendPool.Add(l.defaultBackendNodePort); err != nil {
return err
}
defaultBackend, err := l.defaultBackendPool.Get(l.defaultBackendNodePort)
if err != nil {
return err
}
l.glbcDefaultBackend = defaultBackend
}
// create new loadbalancers, validate existing
for _, ri := range lbs { for _, ri := range lbs {
if len(ri.NodePorts) > 0 {
validLBs = append(validLBs, ri)
}
}
glog.V(4).Infof("%d of %d loadbalancers are valid", len(validLBs), len(lbs))
if len(validLBs) == 0 {
return nil
}
// Lazily create a default backend so we don't tax users who don't care
// about Ingress by consuming 1 of their 3 GCE BackendServices. This
// BackendService is GC'd when there are no more Ingresses.
if err := l.defaultBackendPool.Add(l.defaultBackendNodePort); err != nil {
return err
}
defaultBackend, err := l.defaultBackendPool.Get(l.defaultBackendNodePort)
if err != nil {
return err
}
l.glbcDefaultBackend = defaultBackend
// create new loadbalancers, validate existing
for _, ri := range validLBs {
if err := l.Add(ri); err != nil { if err := l.Add(ri); err != nil {
return err return err
} }
@ -244,6 +257,8 @@ type L7RuntimeInfo struct {
Name string Name string
// IP is the desired ip of the loadbalancer, eg from a staticIP. // IP is the desired ip of the loadbalancer, eg from a staticIP.
IP string IP string
// NodePorts is the set of ports being used by this L7.
NodePorts []int64
// TLS are the tls certs to use in termination. // TLS are the tls certs to use in termination.
TLS *TLSCerts TLS *TLSCerts
// AllowHTTP will not setup :80, if TLS is nil and AllowHTTP is set, // AllowHTTP will not setup :80, if TLS is nil and AllowHTTP is set,