未验证 提交 a84cb658 编写于 作者: J Jimmi Dyson

Upgrade to Kubernetes 1.3.3

上级 e88306d1
此差异已折叠。
......@@ -387,23 +387,16 @@ func toContainerStats2(s *cgroups.Stats, ret *info.ContainerStats) {
ret.Memory.ContainerData.Pgmajfault = v
ret.Memory.HierarchicalData.Pgmajfault = v
}
if v, ok := s.MemoryStats.Stats["total_inactive_anon"]; ok {
workingSet := ret.Memory.Usage
workingSet := ret.Memory.Usage
if v, ok := s.MemoryStats.Stats["total_inactive_file"]; ok {
if workingSet < v {
workingSet = 0
} else {
workingSet -= v
}
if v, ok := s.MemoryStats.Stats["total_inactive_file"]; ok {
if workingSet < v {
workingSet = 0
} else {
workingSet -= v
}
}
ret.Memory.WorkingSet = workingSet
}
ret.Memory.WorkingSet = workingSet
}
func toContainerStats3(libcontainerStats *libcontainer.Stats, ret *info.ContainerStats) {
......
......@@ -17,11 +17,13 @@ limitations under the License.
package app
import (
"errors"
"io/ioutil"
"strconv"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/sysctl"
)
......@@ -32,11 +34,25 @@ type Conntracker interface {
type realConntracker struct{}
var readOnlySysFSError = errors.New("ReadOnlySysFS")
func (realConntracker) SetMax(max int) error {
glog.Infof("Setting nf_conntrack_max to %d", max)
if err := sysctl.SetSysctl("net/netfilter/nf_conntrack_max", max); err != nil {
return err
}
// sysfs is expected to be mounted as 'rw'. However, it may be unexpectedly mounted as
// 'ro' by docker because of a known docker issue (https://github.com/docker/docker/issues/24000).
// Setting conntrack will fail when sysfs is readonly. When that happens, we don't set conntrack
// hashsize and return a special error readOnlySysFSError here. The caller should deal with
// readOnlySysFSError differently.
writable, err := isSysFSWritable()
if err != nil {
return err
}
if !writable {
return readOnlySysFSError
}
// TODO: generify this and sysctl to a new sysfs.WriteInt()
glog.Infof("Setting conntrack hashsize to %d", max/4)
return ioutil.WriteFile("/sys/module/nf_conntrack/parameters/hashsize", []byte(strconv.Itoa(max/4)), 0640)
......@@ -46,3 +62,27 @@ func (realConntracker) SetTCPEstablishedTimeout(seconds int) error {
glog.Infof("Setting nf_conntrack_tcp_timeout_established to %d", seconds)
return sysctl.SetSysctl("net/netfilter/nf_conntrack_tcp_timeout_established", seconds)
}
// isSysFSWritable reports whether sysfs is mounted read-write, determined by
// inspecting the mount table (/proc/mounts). It returns (false, nil) when no
// writable sysfs mount is found, and a non-nil error only when the mount
// table itself cannot be listed.
func isSysFSWritable() (bool, error) {
	const (
		permWritable = "rw"
		sysfsDevice  = "sysfs"
	)
	mounter := mount.New()
	points, err := mounter.List()
	if err != nil {
		glog.Errorf("failed to list mount points: %v", err)
		return false, err
	}
	for _, mp := range points {
		if mp.Device != sysfsDevice {
			continue
		}
		// By convention the first mount option carries the rw/ro flag.
		if len(mp.Opts) > 0 && mp.Opts[0] == permWritable {
			return true, nil
		}
		glog.Errorf("sysfs is not writable: %+v", mp)
		break
	}
	return false, nil
}
......@@ -83,6 +83,9 @@ func (s *ProxyServerConfig) AddFlags(fs *pflag.FlagSet) {
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
fs.Int32Var(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
fs.DurationVar(&s.UDPIdleTimeout.Duration, "udp-timeout", s.UDPIdleTimeout.Duration, "How long an idle UDP connection will be kept open (e.g. '250ms', '2s'). Must be greater than 0. Only applicable for proxy-mode=userspace")
fs.Int32Var(&s.ConntrackMax, "conntrack-max", s.ConntrackMax, "Maximum number of NAT connections to track (0 to leave as-is)")
fs.Int32Var(&s.ConntrackMax, "conntrack-max", s.ConntrackMax,
"Maximum number of NAT connections to track (0 to leave as-is).")
fs.Int32Var(&s.ConntrackMaxPerCore, "conntrack-max-per-core", s.ConntrackMaxPerCore,
"Maximum number of NAT connections to track per CPU core (0 to leave as-is). This is only considered if conntrack-max is 0.")
fs.DurationVar(&s.ConntrackTCPEstablishedTimeout.Duration, "conntrack-tcp-timeout-established", s.ConntrackTCPEstablishedTimeout.Duration, "Idle timeout for established TCP connections (0 to leave as-is)")
}
......@@ -24,6 +24,7 @@ import (
"net"
"net/http"
_ "net/http/pprof"
"runtime"
"strconv"
"time"
......@@ -298,9 +299,24 @@ func (s *ProxyServer) Run() error {
// Tune conntrack, if requested
if s.Conntracker != nil {
if s.Config.ConntrackMax > 0 {
if err := s.Conntracker.SetMax(int(s.Config.ConntrackMax)); err != nil {
return err
max, err := getConntrackMax(s.Config)
if err != nil {
return err
}
if max > 0 {
err := s.Conntracker.SetMax(max)
if err != nil {
if err != readOnlySysFSError {
return err
}
// readOnlySysFSError is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
// the only remediation we know is to restart the docker daemon.
// Here we'll send a node event with a specific reason and message; the
// administrator should decide whether and how to handle this issue,
// whether to drain the node and restart docker.
// TODO(random-liu): Remove this when the docker bug is fixed.
const message = "DOCKER RESTART NEEDED (docker issue #24000): /sys is read-only: can't raise conntrack limits, problems may arise later."
s.Recorder.Eventf(s.Config.NodeRef, api.EventTypeWarning, err.Error(), message)
}
}
if s.Config.ConntrackTCPEstablishedTimeout.Duration > 0 {
......@@ -318,6 +334,18 @@ func (s *ProxyServer) Run() error {
return nil
}
// getConntrackMax computes the nf_conntrack_max value to apply from the
// proxy configuration. ConntrackMax and ConntrackMaxPerCore must not both be
// set; an explicit ConntrackMax is used verbatim, otherwise the per-core
// value is scaled by the number of CPUs. A result of 0 means "leave the
// system setting as-is".
func getConntrackMax(config *options.ProxyServerConfig) (int, error) {
	switch {
	case config.ConntrackMax > 0 && config.ConntrackMaxPerCore > 0:
		return -1, fmt.Errorf("invalid config: ConntrackMax and ConntrackMaxPerCore are mutually exclusive")
	case config.ConntrackMax > 0:
		return int(config.ConntrackMax), nil
	case config.ConntrackMaxPerCore > 0:
		return int(config.ConntrackMaxPerCore) * runtime.NumCPU(), nil
	default:
		return 0, nil
	}
}
type nodeGetter interface {
Get(hostname string) (*api.Node, error)
}
......
......@@ -1180,11 +1180,11 @@ func validateContainerResourceDivisor(rName string, divisor resource.Quantity, f
switch rName {
case "limits.cpu", "requests.cpu":
if !validContainerResourceDivisorForCPU.Has(divisor.String()) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("divisor"), rName, fmt.Sprintf("only divisor's values 1m and 1 are supported with the cpu resource")))
allErrs = append(allErrs, field.Invalid(fldPath.Child("divisor"), rName, "only divisor's values 1m and 1 are supported with the cpu resource"))
}
case "limits.memory", "requests.memory":
if !validContainerResourceDivisorForMemory.Has(divisor.String()) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("divisor"), rName, fmt.Sprintf("only divisor's values 1, 1k, 1M, 1G, 1T, 1P, 1E, 1Ki, 1Mi, 1Gi, 1Ti, 1Pi, 1Ei are supported with the memory resource")))
allErrs = append(allErrs, field.Invalid(fldPath.Child("divisor"), rName, "only divisor's values 1, 1k, 1M, 1G, 1T, 1P, 1E, 1Ki, 1Mi, 1Gi, 1Ti, 1Pi, 1Ei are supported with the memory resource"))
}
}
return allErrs
......@@ -2768,11 +2768,11 @@ func ValidateResourceRequirements(requirements *api.ResourceRequirements, fldPat
// Check that request <= limit.
requestQuantity, exists := requirements.Requests[resourceName]
if exists {
// For GPUs, require that no request be set.
if resourceName == api.ResourceNvidiaGPU {
allErrs = append(allErrs, field.Invalid(reqPath, requestQuantity.String(), "cannot be set"))
// For GPUs, not only requests can't exceed limits, they also can't be lower, i.e. must be equal.
if resourceName == api.ResourceNvidiaGPU && quantity.Cmp(requestQuantity) != 0 {
allErrs = append(allErrs, field.Invalid(reqPath, requestQuantity.String(), fmt.Sprintf("must be equal to %s limit", api.ResourceNvidiaGPU)))
} else if quantity.Cmp(requestQuantity) < 0 {
allErrs = append(allErrs, field.Invalid(fldPath, quantity.String(), "must be greater than or equal to request"))
allErrs = append(allErrs, field.Invalid(limPath, quantity.String(), fmt.Sprintf("must be greater than or equal to %s request", resourceName)))
}
}
}
......@@ -2936,7 +2936,7 @@ func validateFinalizerName(stringValue string, fldPath *field.Path) field.ErrorL
if len(strings.Split(stringValue, "/")) == 1 {
if !api.IsStandardFinalizerName(stringValue) {
return append(allErrs, field.Invalid(fldPath, stringValue, fmt.Sprintf("name is neither a standard finalizer name nor is it fully qualified")))
return append(allErrs, field.Invalid(fldPath, stringValue, "name is neither a standard finalizer name nor is it fully qualified"))
}
}
......
......@@ -174,6 +174,7 @@ func DeepCopy_componentconfig_KubeProxyConfiguration(in KubeProxyConfiguration,
return err
}
out.ConntrackMax = in.ConntrackMax
out.ConntrackMaxPerCore = in.ConntrackMaxPerCore
if err := unversioned.DeepCopy_unversioned_Duration(in.ConntrackTCPEstablishedTimeout, &out.ConntrackTCPEstablishedTimeout, c); err != nil {
return err
}
......
......@@ -81,16 +81,16 @@ func (x *KubeProxyConfiguration) CodecEncodeSelf(e *codec1978.Encoder) {
} else {
yysep2 := !z.EncBinary()
yy2arr2 := z.EncBasicHandle().StructToArray
var yyq2 [19]bool
var yyq2 [20]bool
_, _, _ = yysep2, yyq2, yy2arr2
const yyr2 bool = false
yyq2[17] = x.Kind != ""
yyq2[18] = x.APIVersion != ""
yyq2[18] = x.Kind != ""
yyq2[19] = x.APIVersion != ""
var yynn2 int
if yyr2 || yy2arr2 {
r.EncodeArrayStart(19)
r.EncodeArrayStart(20)
} else {
yynn2 = 17
yynn2 = 18
for _, b := range yyq2 {
if b {
yynn2++
......@@ -431,36 +431,55 @@ func (x *KubeProxyConfiguration) CodecEncodeSelf(e *codec1978.Encoder) {
}
if yyr2 || yy2arr2 {
z.EncSendContainerState(codecSelfer_containerArrayElem1234)
yy60 := &x.ConntrackTCPEstablishedTimeout
yym60 := z.EncBinary()
_ = yym60
if false {
} else {
r.EncodeInt(int64(x.ConntrackMaxPerCore))
}
} else {
z.EncSendContainerState(codecSelfer_containerMapKey1234)
r.EncodeString(codecSelferC_UTF81234, string("conntrackMaxPerCore"))
z.EncSendContainerState(codecSelfer_containerMapValue1234)
yym61 := z.EncBinary()
_ = yym61
if false {
} else if z.HasExtensions() && z.EncExt(yy60) {
} else if !yym61 && z.IsJSONHandle() {
z.EncJSONMarshal(yy60)
} else {
z.EncFallback(yy60)
r.EncodeInt(int64(x.ConntrackMaxPerCore))
}
}
if yyr2 || yy2arr2 {
z.EncSendContainerState(codecSelfer_containerArrayElem1234)
yy63 := &x.ConntrackTCPEstablishedTimeout
yym64 := z.EncBinary()
_ = yym64
if false {
} else if z.HasExtensions() && z.EncExt(yy63) {
} else if !yym64 && z.IsJSONHandle() {
z.EncJSONMarshal(yy63)
} else {
z.EncFallback(yy63)
}
} else {
z.EncSendContainerState(codecSelfer_containerMapKey1234)
r.EncodeString(codecSelferC_UTF81234, string("conntrackTCPEstablishedTimeout"))
z.EncSendContainerState(codecSelfer_containerMapValue1234)
yy62 := &x.ConntrackTCPEstablishedTimeout
yym63 := z.EncBinary()
_ = yym63
yy65 := &x.ConntrackTCPEstablishedTimeout
yym66 := z.EncBinary()
_ = yym66
if false {
} else if z.HasExtensions() && z.EncExt(yy62) {
} else if !yym63 && z.IsJSONHandle() {
z.EncJSONMarshal(yy62)
} else if z.HasExtensions() && z.EncExt(yy65) {
} else if !yym66 && z.IsJSONHandle() {
z.EncJSONMarshal(yy65)
} else {
z.EncFallback(yy62)
z.EncFallback(yy65)
}
}
if yyr2 || yy2arr2 {
z.EncSendContainerState(codecSelfer_containerArrayElem1234)
if yyq2[17] {
yym65 := z.EncBinary()
_ = yym65
if yyq2[18] {
yym68 := z.EncBinary()
_ = yym68
if false {
} else {
r.EncodeString(codecSelferC_UTF81234, string(x.Kind))
......@@ -469,12 +488,12 @@ func (x *KubeProxyConfiguration) CodecEncodeSelf(e *codec1978.Encoder) {
r.EncodeString(codecSelferC_UTF81234, "")
}
} else {
if yyq2[17] {
if yyq2[18] {
z.EncSendContainerState(codecSelfer_containerMapKey1234)
r.EncodeString(codecSelferC_UTF81234, string("kind"))
z.EncSendContainerState(codecSelfer_containerMapValue1234)
yym66 := z.EncBinary()
_ = yym66
yym69 := z.EncBinary()
_ = yym69
if false {
} else {
r.EncodeString(codecSelferC_UTF81234, string(x.Kind))
......@@ -483,9 +502,9 @@ func (x *KubeProxyConfiguration) CodecEncodeSelf(e *codec1978.Encoder) {
}
if yyr2 || yy2arr2 {
z.EncSendContainerState(codecSelfer_containerArrayElem1234)
if yyq2[18] {
yym68 := z.EncBinary()
_ = yym68
if yyq2[19] {
yym71 := z.EncBinary()
_ = yym71
if false {
} else {
r.EncodeString(codecSelferC_UTF81234, string(x.APIVersion))
......@@ -494,12 +513,12 @@ func (x *KubeProxyConfiguration) CodecEncodeSelf(e *codec1978.Encoder) {
r.EncodeString(codecSelferC_UTF81234, "")
}
} else {
if yyq2[18] {
if yyq2[19] {
z.EncSendContainerState(codecSelfer_containerMapKey1234)
r.EncodeString(codecSelferC_UTF81234, string("apiVersion"))
z.EncSendContainerState(codecSelfer_containerMapValue1234)
yym69 := z.EncBinary()
_ = yym69
yym72 := z.EncBinary()
_ = yym72
if false {
} else {
r.EncodeString(codecSelferC_UTF81234, string(x.APIVersion))
......@@ -701,19 +720,25 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromMap(l int, d *codec1978.Deco
} else {
x.ConntrackMax = int32(r.DecodeInt(32))
}
case "conntrackMaxPerCore":
if r.TryDecodeAsNil() {
x.ConntrackMaxPerCore = 0
} else {
x.ConntrackMaxPerCore = int32(r.DecodeInt(32))
}
case "conntrackTCPEstablishedTimeout":
if r.TryDecodeAsNil() {
x.ConntrackTCPEstablishedTimeout = pkg1_unversioned.Duration{}
} else {
yyv24 := &x.ConntrackTCPEstablishedTimeout
yym25 := z.DecBinary()
_ = yym25
yyv25 := &x.ConntrackTCPEstablishedTimeout
yym26 := z.DecBinary()
_ = yym26
if false {
} else if z.HasExtensions() && z.DecExt(yyv24) {
} else if !yym25 && z.IsJSONHandle() {
z.DecJSONUnmarshal(yyv24)
} else if z.HasExtensions() && z.DecExt(yyv25) {
} else if !yym26 && z.IsJSONHandle() {
z.DecJSONUnmarshal(yyv25)
} else {
z.DecFallback(yyv24, false)
z.DecFallback(yyv25, false)
}
}
case "kind":
......@@ -739,16 +764,16 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
var h codecSelfer1234
z, r := codec1978.GenHelperDecoder(d)
_, _, _ = h, z, r
var yyj28 int
var yyb28 bool
var yyhl28 bool = l >= 0
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
var yyj29 int
var yyb29 bool
var yyhl29 bool = l >= 0
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -758,13 +783,13 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
} else {
x.BindAddress = string(r.DecodeString())
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -774,13 +799,13 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
} else {
x.ClusterCIDR = string(r.DecodeString())
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -790,13 +815,13 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
} else {
x.HealthzBindAddress = string(r.DecodeString())
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -806,13 +831,13 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
} else {
x.HealthzPort = int32(r.DecodeInt(32))
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -822,13 +847,13 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
} else {
x.HostnameOverride = string(r.DecodeString())
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -841,20 +866,20 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
if x.IPTablesMasqueradeBit == nil {
x.IPTablesMasqueradeBit = new(int32)
}
yym35 := z.DecBinary()
_ = yym35
yym36 := z.DecBinary()
_ = yym36
if false {
} else {
*((*int32)(x.IPTablesMasqueradeBit)) = int32(r.DecodeInt(32))
}
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -862,24 +887,24 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
if r.TryDecodeAsNil() {
x.IPTablesSyncPeriod = pkg1_unversioned.Duration{}
} else {
yyv36 := &x.IPTablesSyncPeriod
yym37 := z.DecBinary()
_ = yym37
yyv37 := &x.IPTablesSyncPeriod
yym38 := z.DecBinary()
_ = yym38
if false {
} else if z.HasExtensions() && z.DecExt(yyv36) {
} else if !yym37 && z.IsJSONHandle() {
z.DecJSONUnmarshal(yyv36)
} else if z.HasExtensions() && z.DecExt(yyv37) {
} else if !yym38 && z.IsJSONHandle() {
z.DecJSONUnmarshal(yyv37)
} else {
z.DecFallback(yyv36, false)
z.DecFallback(yyv37, false)
}
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -889,13 +914,13 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
} else {
x.KubeconfigPath = string(r.DecodeString())
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -905,13 +930,13 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
} else {
x.MasqueradeAll = bool(r.DecodeBool())
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -921,13 +946,13 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
} else {
x.Master = string(r.DecodeString())
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -940,20 +965,20 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
if x.OOMScoreAdj == nil {
x.OOMScoreAdj = new(int32)
}
yym42 := z.DecBinary()
_ = yym42
yym43 := z.DecBinary()
_ = yym43
if false {
} else {
*((*int32)(x.OOMScoreAdj)) = int32(r.DecodeInt(32))
}
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -963,13 +988,13 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
} else {
x.Mode = ProxyMode(r.DecodeString())
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -979,13 +1004,13 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
} else {
x.PortRange = string(r.DecodeString())
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -995,13 +1020,13 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
} else {
x.ResourceContainer = string(r.DecodeString())
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -1009,24 +1034,24 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
if r.TryDecodeAsNil() {
x.UDPIdleTimeout = pkg1_unversioned.Duration{}
} else {
yyv46 := &x.UDPIdleTimeout
yym47 := z.DecBinary()
_ = yym47
yyv47 := &x.UDPIdleTimeout
yym48 := z.DecBinary()
_ = yym48
if false {
} else if z.HasExtensions() && z.DecExt(yyv46) {
} else if !yym47 && z.IsJSONHandle() {
z.DecJSONUnmarshal(yyv46)
} else if z.HasExtensions() && z.DecExt(yyv47) {
} else if !yym48 && z.IsJSONHandle() {
z.DecJSONUnmarshal(yyv47)
} else {
z.DecFallback(yyv46, false)
z.DecFallback(yyv47, false)
}
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -1036,13 +1061,29 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
} else {
x.ConntrackMax = int32(r.DecodeInt(32))
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb29 = r.CheckBreak()
}
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
z.DecSendContainerState(codecSelfer_containerArrayElem1234)
if r.TryDecodeAsNil() {
x.ConntrackMaxPerCore = 0
} else {
x.ConntrackMaxPerCore = int32(r.DecodeInt(32))
}
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -1050,24 +1091,24 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
if r.TryDecodeAsNil() {
x.ConntrackTCPEstablishedTimeout = pkg1_unversioned.Duration{}
} else {
yyv49 := &x.ConntrackTCPEstablishedTimeout
yym50 := z.DecBinary()
_ = yym50
yyv51 := &x.ConntrackTCPEstablishedTimeout
yym52 := z.DecBinary()
_ = yym52
if false {
} else if z.HasExtensions() && z.DecExt(yyv49) {
} else if !yym50 && z.IsJSONHandle() {
z.DecJSONUnmarshal(yyv49)
} else if z.HasExtensions() && z.DecExt(yyv51) {
} else if !yym52 && z.IsJSONHandle() {
z.DecJSONUnmarshal(yyv51)
} else {
z.DecFallback(yyv49, false)
z.DecFallback(yyv51, false)
}
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -1077,13 +1118,13 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
} else {
x.Kind = string(r.DecodeString())
}
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
return
}
......@@ -1094,17 +1135,17 @@ func (x *KubeProxyConfiguration) codecDecodeSelfFromArray(l int, d *codec1978.De
x.APIVersion = string(r.DecodeString())
}
for {
yyj28++
if yyhl28 {
yyb28 = yyj28 > l
yyj29++
if yyhl29 {
yyb29 = yyj29 > l
} else {
yyb28 = r.CheckBreak()
yyb29 = r.CheckBreak()
}
if yyb28 {
if yyb29 {
break
}
z.DecSendContainerState(codecSelfer_containerArrayElem1234)
z.DecStructFieldNotFound(yyj28-1, "")
z.DecStructFieldNotFound(yyj29-1, "")
}
z.DecSendContainerState(codecSelfer_containerArrayEnd1234)
}
......
......@@ -62,10 +62,15 @@ type KubeProxyConfiguration struct {
// udpIdleTimeout is how long an idle UDP connection will be kept open (e.g. '250ms', '2s').
// Must be greater than 0. Only applicable for proxyMode=userspace.
UDPIdleTimeout unversioned.Duration `json:"udpTimeoutMilliseconds"`
// conntrackMax is the maximum number of NAT connections to track (0 to leave as-is)")
// conntrackMax is the maximum number of NAT connections to track (0 to
// leave as-is). This takes precedence over conntrackMaxPerCore.
ConntrackMax int32 `json:"conntrackMax"`
// conntrackTCPEstablishedTimeout is how long an idle UDP connection will be kept open
// (e.g. '250ms', '2s'). Must be greater than 0. Only applicable for proxyMode is Userspace
// conntrackMaxPerCore is the maximum number of NAT connections to track
// per CPU core (0 to leave as-is). This value is only considered if
// conntrackMax == 0.
ConntrackMaxPerCore int32 `json:"conntrackMaxPerCore"`
// conntrackTCPEstablishedTimeout is how long an idle TCP connection will be kept open
// (e.g. '250ms', '2s'). Must be greater than 0.
ConntrackTCPEstablishedTimeout unversioned.Duration `json:"conntrackTCPEstablishedTimeout"`
}
......
......@@ -61,6 +61,7 @@ func autoConvert_v1alpha1_KubeProxyConfiguration_To_componentconfig_KubeProxyCon
out.ResourceContainer = in.ResourceContainer
out.UDPIdleTimeout = in.UDPIdleTimeout
out.ConntrackMax = in.ConntrackMax
out.ConntrackMaxPerCore = in.ConntrackMaxPerCore
out.ConntrackTCPEstablishedTimeout = in.ConntrackTCPEstablishedTimeout
return nil
}
......@@ -89,6 +90,7 @@ func autoConvert_componentconfig_KubeProxyConfiguration_To_v1alpha1_KubeProxyCon
out.ResourceContainer = in.ResourceContainer
out.UDPIdleTimeout = in.UDPIdleTimeout
out.ConntrackMax = in.ConntrackMax
out.ConntrackMaxPerCore = in.ConntrackMaxPerCore
out.ConntrackTCPEstablishedTimeout = in.ConntrackTCPEstablishedTimeout
return nil
}
......
......@@ -73,6 +73,7 @@ func DeepCopy_v1alpha1_KubeProxyConfiguration(in KubeProxyConfiguration, out *Ku
return err
}
out.ConntrackMax = in.ConntrackMax
out.ConntrackMaxPerCore = in.ConntrackMaxPerCore
if err := unversioned.DeepCopy_unversioned_Duration(in.ConntrackTCPEstablishedTimeout, &out.ConntrackTCPEstablishedTimeout, c); err != nil {
return err
}
......
......@@ -58,8 +58,12 @@ func SetDefaults_KubeProxyConfiguration(obj *KubeProxyConfiguration) {
if obj.UDPIdleTimeout == zero {
obj.UDPIdleTimeout = unversioned.Duration{Duration: 250 * time.Millisecond}
}
// If ConntrackMax is set, respect it.
if obj.ConntrackMax == 0 {
obj.ConntrackMax = 256 * 1024 // 4x default (64k)
// If ConntrackMax is *not* set, use per-core scaling.
if obj.ConntrackMaxPerCore == 0 {
obj.ConntrackMaxPerCore = 32 * 1024
}
}
if obj.IPTablesMasqueradeBit == nil {
temp := int32(14)
......
......@@ -62,10 +62,15 @@ type KubeProxyConfiguration struct {
// udpIdleTimeout is how long an idle UDP connection will be kept open (e.g. '250ms', '2s').
// Must be greater than 0. Only applicable for proxyMode=userspace.
UDPIdleTimeout unversioned.Duration `json:"udpTimeoutMilliseconds"`
// conntrackMax is the maximum number of NAT connections to track (0 to leave as-is)")
// conntrackMax is the maximum number of NAT connections to track (0 to
// leave as-is). This takes precedence over conntrackMaxPerCore.
ConntrackMax int32 `json:"conntrackMax"`
// conntrackTCPEstablishedTimeout is how long an idle UDP connection will be kept open
// (e.g. '250ms', '2s'). Must be greater than 0. Only applicable for proxyMode is Userspace
// conntrackMaxPerCore is the maximum number of NAT connections to track
// per CPU core (0 to leave as-is). This value is only considered if
// conntrackMax == 0.
ConntrackMaxPerCore int32 `json:"conntrackMaxPerCore"`
// conntrackTCPEstablishedTimeout is how long an idle TCP connection will be kept open
// (e.g. '250ms', '2s'). Must be greater than 0.
ConntrackTCPEstablishedTimeout unversioned.Duration `json:"conntrackTCPEstablishedTimeout"`
}
......
......@@ -549,6 +549,9 @@ func (gce *GCECloud) EnsureLoadBalancer(apiService *api.Service, hostNames []str
if err != nil {
return nil, err
}
if !fwdRuleExists {
glog.Infof("Forwarding rule %v for Service %v/%v doesn't exist", loadBalancerName, apiService.Namespace, apiService.Name)
}
// Make sure we know which IP address will be used and have properly reserved
// it as static before moving forward with the rest of our operations.
......@@ -683,6 +686,9 @@ func (gce *GCECloud) EnsureLoadBalancer(apiService *api.Service, hostNames []str
if err != nil {
return nil, err
}
if !tpExists {
glog.Infof("Target pool %v for Service %v/%v doesn't exist", loadBalancerName, apiService.Namespace, apiService.Name)
}
// Now we get to some slightly more interesting logic.
// First, neither target pools nor forwarding rules can be updated in place -
......@@ -699,13 +705,13 @@ func (gce *GCECloud) EnsureLoadBalancer(apiService *api.Service, hostNames []str
if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil {
return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", loadBalancerName, err)
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName)
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName)
}
if tpExists && tpNeedsUpdate {
if err := gce.deleteTargetPool(loadBalancerName, gce.region); err != nil {
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err)
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName)
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName)
}
// Once we've deleted the resources (if necessary), build them back up (or for
......@@ -720,9 +726,9 @@ func (gce *GCECloud) EnsureLoadBalancer(apiService *api.Service, hostNames []str
return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err)
}
if len(hosts) <= maxTargetPoolCreateInstances {
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName)
glog.Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName)
} else {
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created initial target pool (now updating with %d hosts)", loadBalancerName, serviceName, len(hosts)-maxTargetPoolCreateInstances)
glog.Infof("EnsureLoadBalancer(%v(%v)): created initial target pool (now updating with %d hosts)", loadBalancerName, serviceName, len(hosts)-maxTargetPoolCreateInstances)
created := sets.NewString()
for _, host := range createInstances {
......@@ -760,20 +766,31 @@ func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, loadBalancer
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, "", nil
}
return false, false, "", fmt.Errorf("error getting load balancer's forwarding rule: %v", err)
// Err on the side of caution in case of errors. Caller should notice the error and retry.
// We never want to end up recreating resources because gce api flaked.
return true, false, "", fmt.Errorf("error getting load balancer's forwarding rule: %v", err)
}
if loadBalancerIP != fwd.IPAddress {
// If the user asks for a specific static ip through the Service spec,
// check that we're actually using it.
// TODO: we report loadbalancer IP through status, so we want to verify if
// that matches the forwarding rule as well.
if loadBalancerIP != "" && loadBalancerIP != fwd.IPAddress {
glog.Infof("LoadBalancer ip for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.IPAddress, loadBalancerIP)
return true, true, fwd.IPAddress, nil
}
portRange, err := loadBalancerPortRange(ports)
if err != nil {
return false, false, "", err
// Err on the side of caution in case of errors. Caller should notice the error and retry.
// We never want to end up recreating resources because gce api flaked.
return true, false, "", err
}
if portRange != fwd.PortRange {
glog.Infof("LoadBalancer port range for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.PortRange, portRange)
return true, true, fwd.IPAddress, nil
}
// The service controller verified all the protocols match on the ports, just check the first one
if string(ports[0].Protocol) != fwd.IPProtocol {
glog.Infof("LoadBalancer protocol for forwarding rule %v was expected to be %v, but was actually %v", fwd.Name, fwd.IPProtocol, string(ports[0].Protocol))
return true, true, fwd.IPAddress, nil
}
......@@ -811,9 +828,20 @@ func (gce *GCECloud) targetPoolNeedsUpdate(name, region string, affinityType api
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, nil
}
return false, false, fmt.Errorf("error getting load balancer's target pool: %v", err)
}
if translateAffinityType(affinityType) != tp.SessionAffinity {
// Err on the side of caution in case of errors. Caller should notice the error and retry.
// We never want to end up recreating resources because gce api flaked.
return true, false, fmt.Errorf("error getting load balancer's target pool: %v", err)
}
// TODO: If the user modifies their Service's session affinity, it *should*
// reflect in the associated target pool. However, currently not setting the
// session affinity on a target pool defaults it to the empty string while
// not setting in on a Service defaults it to None. There is a lack of
// documentation around the default setting for the target pool, so if we
// find it's the undocumented empty string, don't blindly recreate the
// target pool (which results in downtime). Fix this when we have formally
// defined the defaults on either side.
if tp.SessionAffinity != "" && translateAffinityType(affinityType) != tp.SessionAffinity {
glog.Infof("LoadBalancer target pool %v changed affinity from %v to %v", name, tp.SessionAffinity, affinityType)
return true, true, nil
}
return true, false, nil
......
......@@ -18,6 +18,7 @@ package gcp_credentials
import (
"encoding/json"
"io/ioutil"
"net/http"
"strings"
"time"
......@@ -31,13 +32,20 @@ const (
metadataAttributes = metadataUrl + "instance/attributes/"
dockerConfigKey = metadataAttributes + "google-dockercfg"
dockerConfigUrlKey = metadataAttributes + "google-dockercfg-url"
serviceAccounts = metadataUrl + "instance/service-accounts/"
metadataScopes = metadataUrl + "instance/service-accounts/default/scopes"
metadataToken = metadataUrl + "instance/service-accounts/default/token"
metadataEmail = metadataUrl + "instance/service-accounts/default/email"
storageScopePrefix = "https://www.googleapis.com/auth/devstorage"
cloudPlatformScopePrefix = "https://www.googleapis.com/auth/cloud-platform"
googleProductName = "Google"
defaultServiceAccount = "default/"
)
// Product file path that contains the cloud service name.
// This is a variable instead of a const to enable testing.
var gceProductNameFile = "/sys/class/dmi/id/product_name"
// For these urls, the parts of the host name can be glob, for example "*.gcr.io" will match
// "foo.gcr.io" and "bar.gcr.io".
var containerRegistryUrls = []string{"container.cloud.google.com", "gcr.io", "*.gcr.io"}
......@@ -98,10 +106,20 @@ func init() {
})
}
// onGCEVM reports whether this process appears to be running on a Google
// Compute Engine VM. It inspects the DMI product-name file, which is an
// undocumented interface; any read failure is treated as "not on GCE".
func onGCEVM() bool {
	raw, readErr := ioutil.ReadFile(gceProductNameFile)
	if readErr != nil {
		glog.V(2).Infof("Error while reading product_name: %v", readErr)
		return false
	}
	productName := string(raw)
	return strings.Contains(productName, googleProductName)
}
// Enabled implements DockerConfigProvider for all of the Google implementations.
// NOTE(review): this body contains two consecutive return statements — the
// metadata-URL probe result and the onGCEVM() check. The first return makes
// the second unreachable; this looks like stale diff residue from replacing
// the metadata probe with onGCEVM(). Confirm which variant is intended and
// delete the other.
func (g *metadataProvider) Enabled() bool {
	_, err := credentialprovider.ReadUrl(metadataUrl, g.Client, metadataHeader)
	return err == nil
	return onGCEVM()
}
// LazyProvide implements DockerConfigProvider. Should never be called.
......@@ -148,18 +166,74 @@ func (g *dockerConfigUrlKeyProvider) Provide() credentialprovider.DockerConfig {
return credentialprovider.DockerConfig{}
}
// runWithBackoff runs input function `f` with an exponential backoff.
// It retries f until it succeeds, sleeping between attempts starting at
// 100ms and doubling after each failure, capped at one minute.
// Note that this method can block indefinitely.
func runWithBackoff(f func() ([]byte, error)) []byte {
	var backoff = 100 * time.Millisecond
	const maxBackoff = time.Minute
	for {
		value, err := f()
		if err == nil {
			return value
		}
		time.Sleep(backoff)
		backoff = backoff * 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}
// Enabled implements a special metadata-based check, which verifies the
// storage scope is available on the GCE VM.
// If running on a GCE VM, check if 'default' service account exists.
// If it does not exist, assume that registry is not enabled.
// If default service account exists, check if relevant scopes exist in the default service account.
// The metadata service can become temporarily inaccessible. Hence all requests to the metadata
// service will be retried until the metadata server returns a `200`.
// It is expected that "http://metadata.google.internal./computeMetadata/v1/instance/service-accounts/" will return a `200`
// and "http://metadata.google.internal./computeMetadata/v1/instance/service-accounts/default/scopes" will also return `200`.
// More information on metadata service can be found here - https://cloud.google.com/compute/docs/storing-retrieving-metadata
func (g *containerRegistryProvider) Enabled() bool {
value, err := credentialprovider.ReadUrl(metadataScopes+"?alt=json", g.Client, metadataHeader)
if err != nil {
if !onGCEVM() {
return false
}
// Given that we are on GCE, we should keep retrying until the metadata server responds.
value := runWithBackoff(func() ([]byte, error) {
value, err := credentialprovider.ReadUrl(serviceAccounts, g.Client, metadataHeader)
if err != nil {
glog.V(2).Infof("Failed to Get service accounts from gce metadata server: %v", err)
}
return value, err
})
// We expect the service account to return a list of account directories separated by newlines, e.g.,
// sv-account-name1/
// sv-account-name2/
// ref: https://cloud.google.com/compute/docs/storing-retrieving-metadata
defaultServiceAccountExists := false
for _, sa := range strings.Split(string(value), "\n") {
if strings.TrimSpace(sa) == defaultServiceAccount {
defaultServiceAccountExists = true
break
}
}
if !defaultServiceAccountExists {
glog.V(2).Infof("'default' service account does not exist. Found following service accounts: %q", string(value))
return false
}
url := metadataScopes + "?alt=json"
value = runWithBackoff(func() ([]byte, error) {
value, err := credentialprovider.ReadUrl(url, g.Client, metadataHeader)
if err != nil {
glog.V(2).Infof("Failed to Get scopes in default service account from gce metadata server: %v", err)
}
return value, err
})
var scopes []string
if err := json.Unmarshal([]byte(value), &scopes); err != nil {
if err := json.Unmarshal(value, &scopes); err != nil {
glog.Errorf("Failed to unmarshal scopes: %v", err)
return false
}
for _, v := range scopes {
// cloudPlatformScope implies storage scope.
if strings.HasPrefix(v, storageScopePrefix) || strings.HasPrefix(v, cloudPlatformScopePrefix) {
......
......@@ -29,7 +29,11 @@ import (
// DockerConfigProvider is the interface that registered extensions implement
// to materialize 'dockercfg' credentials.
type DockerConfigProvider interface {
// Enabled returns true if the config provider is enabled.
// Implementations can be blocking - e.g. metadata server unavailable.
Enabled() bool
// Provide returns docker configuration.
// Implementations can be blocking - e.g. metadata server unavailable.
Provide() DockerConfig
// LazyProvide() gets called after URL matches have been performed, so the
// location used as the key in DockerConfig would be redundant.
......
......@@ -860,8 +860,17 @@ func (dm *DockerManager) IsImagePresent(image kubecontainer.ImageSpec) (bool, er
// Removes the specified image.
func (dm *DockerManager) RemoveImage(image kubecontainer.ImageSpec) error {
	// TODO(harryz) currently Runtime interface does not provide other remove options.
	// NOTE(review): the next line appears to be the pre-change removal call —
	// its result is discarded and err is redeclared below, so it reads as
	// stale diff residue; confirm it should be deleted.
	_, err := dm.client.RemoveImage(image.Image, dockertypes.ImageRemoveOptions{})
	// If the image has multiple tags, we need to remove all the tags
	if inspectImage, err := dm.client.InspectImage(image.Image); err == nil && len(inspectImage.RepoTags) > 1 {
		for _, tag := range inspectImage.RepoTags {
			if _, err := dm.client.RemoveImage(tag, dockertypes.ImageRemoveOptions{PruneChildren: true}); err != nil {
				return err
			}
		}
		return nil
	}
	// Single tag (or inspect failed): remove by the given reference directly.
	_, err := dm.client.RemoveImage(image.Image, dockertypes.ImageRemoveOptions{PruneChildren: true})
	return err
}
......
......@@ -32,6 +32,11 @@ import (
"k8s.io/kubernetes/pkg/api"
)
// calledDetail records a single invocation on the fake client: the method
// name plus, optionally, the arguments it was called with, so tests can
// assert on exact call sequences and their parameters.
type calledDetail struct {
	// name is the short identifier of the fake method, e.g. "list" or "remove_image".
	name string
	// arguments holds the call's arguments for methods that record them
	// (e.g. RemoveImage); nil for methods that only record the name.
	arguments []interface{}
}
// FakeDockerClient is a simple fake docker client, so that kubelet can be run for testing without requiring a real docker setup.
type FakeDockerClient struct {
sync.Mutex
......@@ -41,7 +46,7 @@ type FakeDockerClient struct {
Image *dockertypes.ImageInspect
Images []dockertypes.Image
Errors map[string]error
called []string
called []calledDetail
pulled []string
// Created, Stopped and Removed all container docker ID
......@@ -95,13 +100,21 @@ func (f *FakeDockerClient) ClearErrors() {
// ClearCalls resets the fake client's recorded state (the call log and the
// Stopped/pulled/Created/Removed bookkeeping slices) so a test can start
// from a clean slate.
func (f *FakeDockerClient) ClearCalls() {
	f.Lock()
	defer f.Unlock()
	// NOTE(review): both resets of f.called are present ([]string{} and
	// []calledDetail{}); the []string{} line looks like stale diff residue
	// from before `called` changed type — confirm and drop one.
	f.called = []string{}
	f.called = []calledDetail{}
	f.Stopped = []string{}
	f.pulled = []string{}
	f.Created = []string{}
	f.Removed = []string{}
}
// getCalledNames returns just the method names from the recorded call
// details, in invocation order. Callers are expected to hold f's lock (as
// AssertCalls does) or otherwise ensure f.called is not mutated concurrently.
func (f *FakeDockerClient) getCalledNames() []string {
	// Pre-size the result: exactly one name per recorded call.
	names := make([]string, 0, len(f.called))
	for _, detail := range f.called {
		names = append(names, detail.name)
	}
	return names
}
// Because the new data type returned by engine-api is too complex to manually initialize, we need a
// fake container which is easier to initialize.
type FakeContainer struct {
......@@ -178,6 +191,17 @@ func (f *FakeDockerClient) AssertCalls(calls []string) (err error) {
f.Lock()
defer f.Unlock()
if !reflect.DeepEqual(calls, f.getCalledNames()) {
err = fmt.Errorf("expected %#v, got %#v", calls, f.getCalledNames())
}
return
}
func (f *FakeDockerClient) AssertCallDetails(calls []calledDetail) (err error) {
f.Lock()
defer f.Unlock()
if !reflect.DeepEqual(calls, f.called) {
err = fmt.Errorf("expected %#v, got %#v", calls, f.called)
}
......@@ -216,24 +240,6 @@ func (f *FakeDockerClient) AssertStopped(stopped []string) error {
return nil
}
// AssertUnorderedCalls verifies that the set of recorded calls matches
// `calls`, ignoring order. Both sides are copied before sorting so neither
// the caller's slice nor the fake's call log is mutated.
func (f *FakeDockerClient) AssertUnorderedCalls(calls []string) (err error) {
	f.Lock()
	defer f.Unlock()

	expected := make([]string, len(calls))
	actual := make([]string, len(f.called))
	copy(expected, calls)
	copy(actual, f.called)

	// sort.Strings is the idiomatic shorthand for sort.StringSlice(x).Sort().
	sort.Strings(expected)
	sort.Strings(actual)

	if !reflect.DeepEqual(actual, expected) {
		err = fmt.Errorf("expected(sorted) %#v, got(sorted) %#v", expected, actual)
	}
	return
}
func (f *FakeDockerClient) popError(op string) error {
if f.Errors == nil {
return nil
......@@ -252,7 +258,7 @@ func (f *FakeDockerClient) popError(op string) error {
func (f *FakeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
f.Lock()
defer f.Unlock()
f.called = append(f.called, "list")
f.called = append(f.called, calledDetail{name: "list"})
err := f.popError("list")
containerList := append([]dockertypes.Container{}, f.RunningContainerList...)
if options.All {
......@@ -269,7 +275,7 @@ func (f *FakeDockerClient) ListContainers(options dockertypes.ContainerListOptio
func (f *FakeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJSON, error) {
f.Lock()
defer f.Unlock()
f.called = append(f.called, "inspect_container")
f.called = append(f.called, calledDetail{name: "inspect_container"})
err := f.popError("inspect_container")
if container, ok := f.ContainerMap[id]; ok {
return container, err
......@@ -282,7 +288,7 @@ func (f *FakeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJS
// InspectImage records the call and returns the canned f.Image together with
// any error queued for "inspect_image" via popError.
// NOTE(review): both the old []string-style and the new calledDetail-style
// call-log appends are present; the string variant looks like stale diff
// residue — confirm and drop it.
func (f *FakeDockerClient) InspectImage(name string) (*dockertypes.ImageInspect, error) {
	f.Lock()
	defer f.Unlock()
	f.called = append(f.called, "inspect_image")
	f.called = append(f.called, calledDetail{name: "inspect_image"})
	err := f.popError("inspect_image")
	return f.Image, err
}
......@@ -306,7 +312,7 @@ func (f *FakeDockerClient) normalSleep(mean, stdDev, cutOffMillis int) {
func (f *FakeDockerClient) CreateContainer(c dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error) {
f.Lock()
defer f.Unlock()
f.called = append(f.called, "create")
f.called = append(f.called, calledDetail{name: "create"})
if err := f.popError("create"); err != nil {
return nil, err
}
......@@ -329,7 +335,7 @@ func (f *FakeDockerClient) CreateContainer(c dockertypes.ContainerCreateConfig)
func (f *FakeDockerClient) StartContainer(id string) error {
f.Lock()
defer f.Unlock()
f.called = append(f.called, "start")
f.called = append(f.called, calledDetail{name: "start"})
if err := f.popError("start"); err != nil {
return err
}
......@@ -352,7 +358,7 @@ func (f *FakeDockerClient) StartContainer(id string) error {
func (f *FakeDockerClient) StopContainer(id string, timeout int) error {
f.Lock()
defer f.Unlock()
f.called = append(f.called, "stop")
f.called = append(f.called, calledDetail{name: "stop"})
if err := f.popError("stop"); err != nil {
return err
}
......@@ -390,7 +396,7 @@ func (f *FakeDockerClient) StopContainer(id string, timeout int) error {
func (f *FakeDockerClient) RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error {
f.Lock()
defer f.Unlock()
f.called = append(f.called, "remove")
f.called = append(f.called, calledDetail{name: "remove"})
err := f.popError("remove")
if err != nil {
return err
......@@ -413,7 +419,7 @@ func (f *FakeDockerClient) RemoveContainer(id string, opts dockertypes.Container
// Logs records the call and returns any error queued for "logs"; it never
// streams any actual log content.
// NOTE(review): duplicate call-log appends (string + calledDetail) look like
// stale diff residue — confirm and drop the string variant.
func (f *FakeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions, sopts StreamOptions) error {
	f.Lock()
	defer f.Unlock()
	f.called = append(f.called, "logs")
	f.called = append(f.called, calledDetail{name: "logs"})
	return f.popError("logs")
}
......@@ -422,7 +428,7 @@ func (f *FakeDockerClient) Logs(id string, opts dockertypes.ContainerLogsOptions
func (f *FakeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error {
f.Lock()
defer f.Unlock()
f.called = append(f.called, "pull")
f.called = append(f.called, calledDetail{name: "pull"})
err := f.popError("pull")
if err == nil {
authJson, _ := json.Marshal(auth)
......@@ -445,21 +451,21 @@ func (f *FakeDockerClient) CreateExec(id string, opts dockertypes.ExecConfig) (*
f.Lock()
defer f.Unlock()
f.execCmd = opts.Cmd
f.called = append(f.called, "create_exec")
f.called = append(f.called, calledDetail{name: "create_exec"})
return &dockertypes.ContainerExecCreateResponse{ID: "12345678"}, nil
}
// StartExec records the call and always succeeds; no exec is actually run.
// NOTE(review): duplicate call-log appends (string + calledDetail) look like
// stale diff residue — confirm and drop the string variant.
func (f *FakeDockerClient) StartExec(startExec string, opts dockertypes.ExecStartCheck, sopts StreamOptions) error {
	f.Lock()
	defer f.Unlock()
	f.called = append(f.called, "start_exec")
	f.called = append(f.called, calledDetail{name: "start_exec"})
	return nil
}
// AttachToContainer records the call and always succeeds; nothing is attached.
// NOTE(review): duplicate call-log appends (string + calledDetail) look like
// stale diff residue — confirm and drop the string variant.
func (f *FakeDockerClient) AttachToContainer(id string, opts dockertypes.ContainerAttachOptions, sopts StreamOptions) error {
	f.Lock()
	defer f.Unlock()
	f.called = append(f.called, "attach")
	f.called = append(f.called, calledDetail{name: "attach"})
	return nil
}
......@@ -468,12 +474,13 @@ func (f *FakeDockerClient) InspectExec(id string) (*dockertypes.ContainerExecIns
}
// ListImages records the call and returns the canned f.Images list plus any
// error queued for "list_images".
// NOTE(review): (1) duplicate call-log appends (string + calledDetail) look
// like stale diff residue — confirm and drop the string variant; (2) unlike
// the other fake methods this one does not take f's lock before touching
// f.called — confirm whether that is intentional.
func (f *FakeDockerClient) ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error) {
	f.called = append(f.called, "list_images")
	f.called = append(f.called, calledDetail{name: "list_images"})
	err := f.popError("list_images")
	return f.Images, err
}
func (f *FakeDockerClient) RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error) {
f.called = append(f.called, calledDetail{name: "remove_image", arguments: []interface{}{image, opts}})
err := f.popError("remove_image")
if err == nil {
for i := range f.Images {
......@@ -541,7 +548,7 @@ func (f *FakeDockerPuller) IsImagePresent(name string) (bool, error) {
// ImageHistory records the call and returns the canned history for id from
// f.ImageHistoryMap (nil if no entry was configured); it never fails.
// NOTE(review): duplicate call-log appends (string + calledDetail) look like
// stale diff residue — confirm and drop the string variant.
func (f *FakeDockerClient) ImageHistory(id string) ([]dockertypes.ImageHistory, error) {
	f.Lock()
	defer f.Unlock()
	f.called = append(f.called, "image_history")
	f.called = append(f.called, calledDetail{name: "image_history"})
	history := f.ImageHistoryMap[id]
	return history, nil
}
......
......@@ -29,5 +29,6 @@ type LoadBalancer interface {
// service-port and source address.
NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error)
NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error
DeleteService(service proxy.ServicePortName)
CleanupStaleStickySessions(service proxy.ServicePortName)
}
......@@ -447,6 +447,7 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
if err != nil {
glog.Errorf("Failed to stop service %q: %v", name, err)
}
proxier.loadBalancer.DeleteService(name)
}
}
}
......
......@@ -82,6 +82,7 @@ func NewLoadBalancerRR() *LoadBalancerRR {
}
func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) error {
glog.V(4).Infof("LoadBalancerRR NewService %q", svcPort)
lb.lock.Lock()
defer lb.lock.Unlock()
lb.newServiceInternal(svcPort, affinityType, ttlMinutes)
......@@ -103,6 +104,13 @@ func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affi
return lb.services[svcPort]
}
// DeleteService drops the round-robin balancer state kept for the given
// service port. It takes the balancer's lock, so it is safe to call
// concurrently with endpoint and service updates.
func (lb *LoadBalancerRR) DeleteService(svcPort proxy.ServicePortName) {
	glog.V(4).Infof("LoadBalancerRR DeleteService %q", svcPort)
	lb.lock.Lock()
	defer lb.lock.Unlock()
	delete(lb.services, svcPort)
}
// return true if this service is using some form of session affinity.
func isSessionAffinity(affinity *affinityPolicy) bool {
// Should never be empty string, but checking for it to be safe.
......@@ -281,7 +289,11 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
for k := range lb.services {
if _, exists := registeredEndpoints[k]; !exists {
glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", k)
delete(lb.services, k)
// Reset but don't delete.
state := lb.services[k]
state.endpoints = []string{}
state.index = 0
state.affinity.affinityMap = map[string]*affinityState{}
}
}
}
......
......@@ -51,7 +51,7 @@ var (
// semantic version is a git hash, but the version itself is no
// longer the direct output of "git describe", but a slight
// translation to be semver compliant.
gitVersion string = "v1.3.2+$Format:%h$"
gitVersion string = "v1.3.3+$Format:%h$"
gitCommit string = "$Format:%H$" // sha1 from git, output of $(git rev-parse HEAD)
gitTreeState string = "not a git tree" // state of git tree, either "clean" or "dirty"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册