Commit 2ee8f51b authored by Matt Rickard, committed by GitHub

Merge pull request #880 from r2d4/1.4.6

Upgrade k8s to v1.4.6
This diff is collapsed.
......@@ -27,7 +27,7 @@ minikube start
--insecure-registry stringSlice Insecure Docker registries to pass to the Docker daemon
--iso-url string Location of the minikube iso (default "https://storage.googleapis.com/minikube/minikube-0.7.iso")
--kubernetes-version string The kubernetes version that the minikube VM will use (ex: v1.2.3)
OR a URI which contains a localkube binary (ex: https://storage.googleapis.com/minikube/k8sReleases/v1.3.0/localkube-linux-amd64) (default "v1.4.5")
OR a URI which contains a localkube binary (ex: https://storage.googleapis.com/minikube/k8sReleases/v1.3.0/localkube-linux-amd64) (default "v1.4.6")
--kvm-network string The KVM network name. (only supported with KVM driver) (default "default")
--memory int Amount of RAM allocated to the minikube VM (default 2048)
--network-plugin string The name of the network plugin
......
......@@ -603,6 +603,11 @@ func (as *authStore) isOpPermitted(userName string, key, rangeEnd []byte, permTy
return false
}
// root role should have permission on all ranges
if hasRootRole(user) {
return true
}
if as.isRangeOpPermitted(tx, userName, key, rangeEnd, permTyp) {
return true
}
......
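The short-circuit added above grants the root role blanket permission before any range lookup runs. For reference, hasRootRole typically just scans the user's granted roles; a minimal sketch, assuming a User type with a Roles string slice (both names are assumptions for illustration):

const rootRole = "root"

type User struct {
	Name  string
	Roles []string
}

// hasRootRole reports whether the user has been granted the root role.
func hasRootRole(user *User) bool {
	for _, role := range user.Roles {
		if role == rootRole {
			return true
		}
	}
	return false
}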
......@@ -154,13 +154,13 @@ type Server interface {
// EtcdServer is the production implementation of the Server interface
type EtcdServer struct {
// r and inflightSnapshots must be the first elements to keep 64-bit alignment for atomic
// access to fields
// count the number of inflight snapshots.
// MUST use atomic operation to access this field.
inflightSnapshots int64
Cfg *ServerConfig
// inflightSnapshots holds the number of snapshots currently inflight.
inflightSnapshots int64 // must use atomic operations to access; keep 64-bit aligned.
appliedIndex uint64 // must use atomic operations to access; keep 64-bit aligned.
// consistIndex used to hold the offset of current executing entry
// It is initialized to 0 before executing any entry.
consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
Cfg *ServerConfig
readych chan struct{}
r raftNode
......@@ -195,10 +195,6 @@ type EtcdServer struct {
// compactor is used to auto-compact the KV.
compactor *compactor.Periodic
// consistent index used to hold the offset of current executing entry
// It is initialized to 0 before executing any entry.
consistIndex consistentIndex
// peerRt used to send requests (version, lease) to peers.
peerRt http.RoundTripper
reqIDGen *idutil.Generator
......@@ -212,8 +208,6 @@ type EtcdServer struct {
// wg is used to wait for the go routines that depends on the server state
// to exit when stopping the server.
wg sync.WaitGroup
appliedIndex uint64
}
// NewServer creates a new EtcdServer from the supplied configuration. The
......
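The "keep 64-bit aligned" comments refer to a Go runtime rule: on 32-bit platforms only the first word of an allocated struct is guaranteed to be 8-byte aligned, so int64/uint64 fields accessed with sync/atomic must lead the struct, which is why appliedIndex and consistIndex move to the top here. A self-contained sketch of the pattern (the struct itself is a toy; field names echo the diff):

package main

import (
	"fmt"
	"sync/atomic"
)

// counters keeps its atomically accessed 64-bit fields first so they stay
// 8-byte aligned even on 32-bit platforms, where misaligned 64-bit atomic
// operations panic.
type counters struct {
	inflightSnapshots int64  // atomic access only; must stay first
	appliedIndex      uint64 // atomic access only
	name              string // ordinary fields follow the 64-bit ones
}

func main() {
	c := &counters{name: "demo"}
	atomic.AddInt64(&c.inflightSnapshots, 1)
	atomic.StoreUint64(&c.appliedIndex, 42)
	fmt.Println(atomic.LoadInt64(&c.inflightSnapshots), atomic.LoadUint64(&c.appliedIndex))
}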
......@@ -408,6 +408,13 @@ func (s *store) restore() error {
s.currentRev = rev
}
// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
// the correct revision should be set to the compaction revision in that case, not the largest revision
// we have seen.
if s.currentRev.main < s.compactMainRev {
s.currentRev.main = s.compactMainRev
}
for key, lid := range keyToLease {
if s.le == nil {
panic("no lessor to attach lease")
......
......@@ -38,7 +38,7 @@ var (
// SoftState provides state that is useful for logging and debugging.
// The state is volatile and does not need to be persisted to the WAL.
type SoftState struct {
Lead uint64
Lead uint64 // must use atomic operations to access; keep 64-bit aligned.
RaftState StateType
}
......
......@@ -183,9 +183,9 @@ func (x *ConfChangeType) UnmarshalJSON(data []byte) error {
func (ConfChangeType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{2} }
type Entry struct {
Type EntryType `protobuf:"varint,1,opt,name=Type,json=type,enum=raftpb.EntryType" json:"Type"`
Term uint64 `protobuf:"varint,2,opt,name=Term,json=term" json:"Term"`
Index uint64 `protobuf:"varint,3,opt,name=Index,json=index" json:"Index"`
Type EntryType `protobuf:"varint,1,opt,name=Type,json=type,enum=raftpb.EntryType" json:"Type"`
Data []byte `protobuf:"bytes,4,opt,name=Data,json=data" json:"Data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
......
......@@ -15,9 +15,9 @@ enum EntryType {
}
message Entry {
optional uint64 Term = 2 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations
optional uint64 Index = 3 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations
optional EntryType Type = 1 [(gogoproto.nullable) = false];
optional uint64 Term = 2 [(gogoproto.nullable) = false];
optional uint64 Index = 3 [(gogoproto.nullable) = false];
optional bytes Data = 4;
}
......
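Note that reordering Term and Index above Type in the message is wire-compatible: protobuf encodes each field by its numeric tag (here 2, 3, 1), not by declaration order. Only the layout of the generated Go struct changes, which is exactly what places the two uint64 fields first for atomic alignment.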
......@@ -29,7 +29,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.3.0"
Version = "3.0.12"
Version = "3.0.13"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"
......
......@@ -27,24 +27,33 @@ import (
"k8s.io/kubernetes/pkg/util/sysctl"
)
// Conntracker is an interface to the global sysctl. Descriptions of the various
// sysctl fields can be found here:
//
// https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
type Conntracker interface {
// SetMax adjusts nf_conntrack_max.
SetMax(max int) error
// SetTCPEstablishedTimeout adjusts nf_conntrack_tcp_timeout_established.
SetTCPEstablishedTimeout(seconds int) error
// SetTCPCloseWaitTimeout adjusts nf_conntrack_tcp_timeout_close_wait.
SetTCPCloseWaitTimeout(seconds int) error
}
type realConntracker struct{}
var readOnlySysFSError = errors.New("ReadOnlySysFS")
func (realConntracker) SetMax(max int) error {
glog.Infof("Setting nf_conntrack_max to %d", max)
if err := sysctl.New().SetSysctl("net/netfilter/nf_conntrack_max", max); err != nil {
func (rct realConntracker) SetMax(max int) error {
if err := rct.setIntSysCtl("nf_conntrack_max", max); err != nil {
return err
}
// sysfs is expected to be mounted as 'rw'. However, it may be unexpectedly mounted as
// 'ro' by docker because of a known docker issue (https://github.com/docker/docker/issues/24000).
// Setting conntrack will fail when sysfs is readonly. When that happens, we don't set conntrack
// hashsize and return a special error readOnlySysFSError here. The caller should deal with
// sysfs is expected to be mounted as 'rw'. However, it may be
// unexpectedly mounted as 'ro' by docker because of a known docker
// issue (https://github.com/docker/docker/issues/24000). Setting
// conntrack will fail when sysfs is readonly. When that happens, we
// don't set conntrack hashsize and return a special error
// readOnlySysFSError here. The caller should deal with
// readOnlySysFSError differently.
writable, err := isSysFSWritable()
if err != nil {
......@@ -58,9 +67,22 @@ func (realConntracker) SetMax(max int) error {
return ioutil.WriteFile("/sys/module/nf_conntrack/parameters/hashsize", []byte(strconv.Itoa(max/4)), 0640)
}
func (realConntracker) SetTCPEstablishedTimeout(seconds int) error {
glog.Infof("Setting nf_conntrack_tcp_timeout_established to %d", seconds)
return sysctl.New().SetSysctl("net/netfilter/nf_conntrack_tcp_timeout_established", seconds)
func (rct realConntracker) SetTCPEstablishedTimeout(seconds int) error {
return rct.setIntSysCtl("nf_conntrack_tcp_timeout_established", seconds)
}
func (rct realConntracker) SetTCPCloseWaitTimeout(seconds int) error {
return rct.setIntSysCtl("nf_conntrack_tcp_timeout_close_wait", seconds)
}
func (realConntracker) setIntSysCtl(name string, value int) error {
entry := "net/netfilter/" + name
glog.Infof("Set sysctl '%v' to %v", entry, value)
if err := sysctl.New().SetSysctl(entry, value); err != nil {
return err
}
return nil
}
// isSysFSWritable checks /proc/mounts to see whether sysfs is 'rw' or not.
......@@ -73,16 +95,21 @@ func isSysFSWritable() (bool, error) {
glog.Errorf("failed to list mount points: %v", err)
return false, err
}
for _, mountPoint := range mountPoints {
const sysfsDevice = "sysfs"
if mountPoint.Device != sysfsDevice {
continue
}
// Check whether sysfs is 'rw'
const permWritable = "rw"
if len(mountPoint.Opts) > 0 && mountPoint.Opts[0] == permWritable {
return true, nil
}
glog.Errorf("sysfs is not writable: %+v", mountPoint)
break
glog.Errorf("sysfs is not writable: %+v (mount options are %v)",
mountPoint, mountPoint.Opts)
return false, readOnlySysFSError
}
return false, nil
return false, errors.New("No sysfs mounted")
}
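isSysFSWritable boils down to scanning /proc/mounts for the sysfs entry and inspecting its mount options. A stripped-down, self-contained sketch of the same check (the real code goes through a mount-listing utility package and only looks at the first option; this version checks the whole option list):

package main

import (
	"errors"
	"fmt"
	"io/ioutil"
	"strings"
)

// sysFSWritable reports whether sysfs is mounted read-write, by parsing the
// standard /proc/mounts format: device mountpoint fstype options dump pass.
func sysFSWritable() (bool, error) {
	data, err := ioutil.ReadFile("/proc/mounts")
	if err != nil {
		return false, err
	}
	for _, line := range strings.Split(string(data), "\n") {
		fields := strings.Fields(line)
		if len(fields) < 4 || fields[0] != "sysfs" {
			continue
		}
		// The fourth field is a comma-separated option list, e.g. "rw,nosuid,...".
		for _, opt := range strings.Split(fields[3], ",") {
			if opt == "rw" {
				return true, nil
			}
		}
		return false, nil
	}
	return false, errors.New("no sysfs mounted")
}

func main() {
	writable, err := sysFSWritable()
	fmt.Println(writable, err)
}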
......@@ -92,5 +92,10 @@ func (s *ProxyServerConfig) AddFlags(fs *pflag.FlagSet) {
fs.Int32Var(&s.ConntrackMin, "conntrack-min", s.ConntrackMin,
"Minimum number of conntrack entries to allocate, regardless of conntrack-max-per-core (set conntrack-max-per-core=0 to leave the limit as-is).")
fs.DurationVar(&s.ConntrackTCPEstablishedTimeout.Duration, "conntrack-tcp-timeout-established", s.ConntrackTCPEstablishedTimeout.Duration, "Idle timeout for established TCP connections (0 to leave as-is)")
fs.DurationVar(
&s.ConntrackTCPCloseWaitTimeout.Duration, "conntrack-tcp-timeout-close-wait",
s.ConntrackTCPCloseWaitTimeout.Duration,
"NAT timeout for TCP connections in the CLOSE_WAIT state")
config.DefaultFeatureGate.AddFlag(fs)
}
......@@ -315,12 +315,22 @@ func (s *ProxyServer) Run() error {
// administrator should decide whether and how to handle this issue,
// whether to drain the node and restart docker.
// TODO(random-liu): Remove this when the docker bug is fixed.
const message = "DOCKER RESTART NEEDED (docker issue #24000): /sys is read-only: can't raise conntrack limits, problems may arise later."
const message = "DOCKER RESTART NEEDED (docker issue #24000): /sys is read-only: " +
"cannot modify conntrack limits, problems may arise later."
s.Recorder.Eventf(s.Config.NodeRef, api.EventTypeWarning, err.Error(), message)
}
}
if s.Config.ConntrackTCPEstablishedTimeout.Duration > 0 {
if err := s.Conntracker.SetTCPEstablishedTimeout(int(s.Config.ConntrackTCPEstablishedTimeout.Duration / time.Second)); err != nil {
timeout := int(s.Config.ConntrackTCPEstablishedTimeout.Duration / time.Second)
if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
return err
}
}
if s.Config.ConntrackTCPCloseWaitTimeout.Duration > 0 {
timeout := int(s.Config.ConntrackTCPCloseWaitTimeout.Duration / time.Second)
if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
return err
}
}
......
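Both new blocks follow the same shape: act only when the configured duration is positive, and convert it to whole seconds before handing it to the Conntracker, since the underlying sysctls take integer seconds. A hedged sketch of that wiring with a stand-in conntracker (the fake type and the values are assumptions for the demo):

package main

import (
	"fmt"
	"time"
)

type fakeConntracker struct{}

func (fakeConntracker) SetTCPEstablishedTimeout(seconds int) error {
	fmt.Println("nf_conntrack_tcp_timeout_established =", seconds)
	return nil
}

func (fakeConntracker) SetTCPCloseWaitTimeout(seconds int) error {
	fmt.Println("nf_conntrack_tcp_timeout_close_wait =", seconds)
	return nil
}

func main() {
	established := 24 * time.Hour // matches the defaults discussed below
	closeWait := 1 * time.Hour
	ct := fakeConntracker{}
	if established > 0 {
		ct.SetTCPEstablishedTimeout(int(established / time.Second)) // 86400
	}
	if closeWait > 0 {
		ct.SetTCPCloseWaitTimeout(int(closeWait / time.Second)) // 3600
	}
}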
......@@ -75,8 +75,12 @@ type KubeProxyConfiguration struct {
// regardless of conntrackMaxPerCore (set conntrackMaxPerCore=0 to leave the limit as-is).
ConntrackMin int32 `json:"conntrackMin"`
// conntrackTCPEstablishedTimeout is how long an idle TCP connection will be kept open
// (e.g. '250ms', '2s'). Must be greater than 0.
// (e.g. '2s'). Must be greater than 0.
ConntrackTCPEstablishedTimeout unversioned.Duration `json:"conntrackTCPEstablishedTimeout"`
// conntrackTCPCloseWaitTimeout is how long an idle conntrack entry
// in CLOSE_WAIT state will remain in the conntrack
// table. (e.g. '60s'). Must be greater than 0 to set.
ConntrackTCPCloseWaitTimeout unversioned.Duration `json:"conntrackTCPCloseWaitTimeout"`
}
// Currently two modes of proxying are available: 'userspace' (older, stable) or 'iptables'
......
......@@ -100,6 +100,29 @@ func SetDefaults_KubeProxyConfiguration(obj *KubeProxyConfiguration) {
if obj.ConntrackTCPEstablishedTimeout == zero {
obj.ConntrackTCPEstablishedTimeout = unversioned.Duration{Duration: 24 * time.Hour} // 1 day (1/5 default)
}
if obj.ConntrackTCPCloseWaitTimeout == zero {
// See https://github.com/kubernetes/kubernetes/issues/32551.
//
// CLOSE_WAIT conntrack state occurs when the Linux kernel
// sees a FIN from the remote server. Note: this is a half-close
// condition that persists as long as the local side keeps the
// socket open. The condition is rare as it is typical in most
// protocols for both sides to issue a close; this typically
// occurs when the local socket is lazily garbage collected.
//
// If the CLOSE_WAIT conntrack entry expires, then FINs from the
// local socket will not be properly SNAT'd and will not reach the
// remote server (if the connection was subject to SNAT). If the
// remote timeouts for FIN_WAIT* states exceed the CLOSE_WAIT
// timeout, then there will be an inconsistency in the state of
// the connection and a new connection reusing the SNAT (src,
// port) pair may be rejected by the remote side with RST. This
// can cause new calls to connect(2) to return with ECONNREFUSED.
//
// We set CLOSE_WAIT to one hour by default to better match
// typical server timeouts.
obj.ConntrackTCPCloseWaitTimeout = unversioned.Duration{Duration: 1 * time.Hour}
}
}
func SetDefaults_KubeSchedulerConfiguration(obj *KubeSchedulerConfiguration) {
......
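The surrounding function applies a simple zero-value defaulting pattern: any Duration field left unset in the serialized config is replaced by a hard-coded default. A minimal sketch of the pattern, using a stand-in for unversioned.Duration (which just wraps time.Duration):

package main

import (
	"fmt"
	"time"
)

// Duration stands in for unversioned.Duration, a JSON-friendly wrapper.
type Duration struct{ time.Duration }

type proxyConfig struct {
	ConntrackTCPEstablishedTimeout Duration
	ConntrackTCPCloseWaitTimeout   Duration
}

// setDefaults fills in any field still at its zero value.
func setDefaults(cfg *proxyConfig) {
	zero := Duration{}
	if cfg.ConntrackTCPEstablishedTimeout == zero {
		cfg.ConntrackTCPEstablishedTimeout = Duration{24 * time.Hour}
	}
	if cfg.ConntrackTCPCloseWaitTimeout == zero {
		cfg.ConntrackTCPCloseWaitTimeout = Duration{1 * time.Hour}
	}
}

func main() {
	cfg := &proxyConfig{}
	setDefaults(cfg)
	fmt.Println(cfg.ConntrackTCPEstablishedTimeout, cfg.ConntrackTCPCloseWaitTimeout)
}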
......@@ -71,9 +71,13 @@ type KubeProxyConfiguration struct {
// conntrackMin is the minimum value of connect-tracking records to allocate,
// regardless of conntrackMaxPerCore (set conntrackMaxPerCore=0 to leave the limit as-is).
ConntrackMin int32 `json:"conntrackMin"`
// conntrackTCPEstablishedTimeout is how long an idle TCP connection will be kept open
// (e.g. '250ms', '2s'). Must be greater than 0.
// conntrackTCPEstablishedTimeout is how long an idle TCP connection
// will be kept open (e.g. '2s'). Must be greater than 0.
ConntrackTCPEstablishedTimeout unversioned.Duration `json:"conntrackTCPEstablishedTimeout"`
// conntrackTCPCloseWaitTimeout is how long an idle conntrack entry
// in CLOSE_WAIT state will remain in the conntrack
// table. (e.g. '60s'). Must be greater than 0 to set.
ConntrackTCPCloseWaitTimeout unversioned.Duration `json:"conntrackTCPCloseWaitTimeout"`
}
// Currently two modes of proxying are available: 'userspace' (older, stable) or 'iptables'
......
......@@ -71,6 +71,7 @@ func autoConvert_v1alpha1_KubeProxyConfiguration_To_componentconfig_KubeProxyCon
out.ConntrackMaxPerCore = in.ConntrackMaxPerCore
out.ConntrackMin = in.ConntrackMin
out.ConntrackTCPEstablishedTimeout = in.ConntrackTCPEstablishedTimeout
out.ConntrackTCPCloseWaitTimeout = in.ConntrackTCPCloseWaitTimeout
return nil
}
......@@ -101,6 +102,7 @@ func autoConvert_componentconfig_KubeProxyConfiguration_To_v1alpha1_KubeProxyCon
out.ConntrackMaxPerCore = in.ConntrackMaxPerCore
out.ConntrackMin = in.ConntrackMin
out.ConntrackTCPEstablishedTimeout = in.ConntrackTCPEstablishedTimeout
out.ConntrackTCPCloseWaitTimeout = in.ConntrackTCPCloseWaitTimeout
return nil
}
......
......@@ -77,6 +77,7 @@ func DeepCopy_v1alpha1_KubeProxyConfiguration(in interface{}, out interface{}, c
out.ConntrackMaxPerCore = in.ConntrackMaxPerCore
out.ConntrackMin = in.ConntrackMin
out.ConntrackTCPEstablishedTimeout = in.ConntrackTCPEstablishedTimeout
out.ConntrackTCPCloseWaitTimeout = in.ConntrackTCPCloseWaitTimeout
return nil
}
}
......
......@@ -164,6 +164,7 @@ func DeepCopy_componentconfig_KubeProxyConfiguration(in interface{}, out interfa
out.ConntrackMaxPerCore = in.ConntrackMaxPerCore
out.ConntrackMin = in.ConntrackMin
out.ConntrackTCPEstablishedTimeout = in.ConntrackTCPEstablishedTimeout
out.ConntrackTCPCloseWaitTimeout = in.ConntrackTCPCloseWaitTimeout
return nil
}
}
......
......@@ -249,14 +249,15 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
return nil, err
}
var versionedDeleteOptions runtime.Object
var versionedDeleterObject interface{}
switch {
case isGracefulDeleter:
objectPtr, err := a.group.Creater.New(optionsExternalVersion.WithKind("DeleteOptions"))
versionedDeleteOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind("DeleteOptions"))
if err != nil {
return nil, err
}
versionedDeleterObject = indirectArbitraryPointer(objectPtr)
versionedDeleterObject = indirectArbitraryPointer(versionedDeleteOptions)
isDeleter = true
case isDeleter:
gracefulDeleter = rest.GracefulDeleteAdapter{Deleter: deleter}
......@@ -625,6 +626,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
Returns(http.StatusOK, "OK", versionedStatus)
if isGracefulDeleter {
route.Reads(versionedDeleterObject)
if err := addObjectParams(ws, route, versionedDeleteOptions); err != nil {
return nil, err
}
}
addParams(route, action.Params)
ws.Route(route)
......@@ -927,6 +931,12 @@ func addObjectParams(ws *restful.WebService, route *restful.RouteBuilder, obj in
}
switch sf.Type.Kind() {
case reflect.Interface, reflect.Struct:
case reflect.Ptr:
// TODO: This is a hack to let unversioned.Time through. This needs to be fixed in a more generic way eventually. bug #36191
if (sf.Type.Elem().Kind() == reflect.Interface || sf.Type.Elem().Kind() == reflect.Struct) && strings.TrimPrefix(sf.Type.String(), "*") != "unversioned.Time" {
continue
}
fallthrough
default:
jsonTag := sf.Tag.Get("json")
if len(jsonTag) == 0 {
......
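The reflection walk above decides which fields of an options object become query parameters: plain scalar fields (and, after this change, pointers to scalars) with a json tag are accepted, while nested structs, interfaces, and pointers to them are skipped, modulo the unversioned.Time hack. A self-contained approximation (the deleteOptions field set is illustrative, not the real api.DeleteOptions):

package main

import (
	"fmt"
	"reflect"
	"strings"
)

// queryParamNames approximates addObjectParams' field walk.
func queryParamNames(obj interface{}) []string {
	t := reflect.TypeOf(obj)
	for t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	var names []string
	for i := 0; i < t.NumField(); i++ {
		sf := t.Field(i)
		switch sf.Type.Kind() {
		case reflect.Interface, reflect.Struct:
			continue // nested objects never become query parameters
		case reflect.Ptr:
			k := sf.Type.Elem().Kind()
			if k == reflect.Interface || k == reflect.Struct {
				continue // pointer-to-object: also skipped
			}
		}
		tag := strings.Split(sf.Tag.Get("json"), ",")[0]
		if tag != "" && tag != "-" {
			names = append(names, tag)
		}
	}
	return names
}

type deleteOptions struct {
	GracePeriodSeconds *int64 `json:"gracePeriodSeconds,omitempty"`
}

func main() {
	fmt.Println(queryParamNames(&deleteOptions{})) // [gracePeriodSeconds]
}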
......@@ -739,7 +739,7 @@ func UpdateResource(r rest.Updater, scope RequestScope, typer runtime.ObjectType
}
// DeleteResource returns a function that will handle a resource deletion
func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope, admit admission.Interface) restful.RouteFunction {
func DeleteResource(r rest.GracefulDeleter, allowsOptions bool, scope RequestScope, admit admission.Interface) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
// For performance tracking purposes.
trace := util.NewTrace("Delete " + req.Request.URL.Path)
......@@ -759,7 +759,7 @@ func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope,
ctx = api.WithNamespace(ctx, namespace)
options := &api.DeleteOptions{}
if checkBody {
if allowsOptions {
body, err := readBody(req.Request)
if err != nil {
scope.err(err, res.ResponseWriter, req.Request)
......@@ -781,6 +781,13 @@ func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope,
scope.err(fmt.Errorf("decoded object cannot be converted to DeleteOptions"), res.ResponseWriter, req.Request)
return
}
} else {
if values := req.Request.URL.Query(); len(values) > 0 {
if err := scope.ParameterCodec.DecodeParameters(values, scope.Kind.GroupVersion(), options); err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
}
}
}
}
......
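Combined with the installer change above, a graceful delete no longer needs a request body: when none is sent, DeleteOptions fields are decoded from the URL query instead. A hypothetical client request (host, path, and value are examples only):

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// gracePeriodSeconds rides on the query string instead of a JSON body.
	req, err := http.NewRequest(http.MethodDelete,
		"https://apiserver.example/api/v1/namespaces/default/pods/mypod?gracePeriodSeconds=30",
		nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.RawQuery) // DELETE gracePeriodSeconds=30
}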
......@@ -21,7 +21,6 @@ import (
"fmt"
"io"
"net"
"net/url"
"regexp"
"strconv"
"strings"
......@@ -299,28 +298,31 @@ type Volumes interface {
// Attach the disk to the specified instance
// instanceName can be empty to mean "the instance on which we are running"
// Returns the device (e.g. /dev/xvdf) where we attached the volume
AttachDisk(diskName string, instanceName string, readOnly bool) (string, error)
// Detach the disk from the specified instance
// instanceName can be empty to mean "the instance on which we are running"
AttachDisk(diskName KubernetesVolumeID, nodeName string, readOnly bool) (string, error)
// Detach the disk from the node with the specified NodeName
// nodeName can be empty to mean "the instance on which we are running"
// Returns the device where the volume was attached
DetachDisk(diskName string, instanceName string) (string, error)
DetachDisk(diskName KubernetesVolumeID, nodeName string) (string, error)
// Create a volume with the specified options
CreateDisk(volumeOptions *VolumeOptions) (volumeName string, err error)
CreateDisk(volumeOptions *VolumeOptions) (volumeName KubernetesVolumeID, err error)
// Delete the specified volume
// Returns true iff the volume was deleted
// If the volume was not found, returns (false, nil)
DeleteDisk(volumeName string) (bool, error)
DeleteDisk(volumeName KubernetesVolumeID) (bool, error)
// Get labels to apply to volume on creation
GetVolumeLabels(volumeName string) (map[string]string, error)
GetVolumeLabels(volumeName KubernetesVolumeID) (map[string]string, error)
// Get volume's disk path from volume name
// return the device path where the volume is attached
GetDiskPath(volumeName string) (string, error)
GetDiskPath(volumeName KubernetesVolumeID) (string, error)
// Check if the volume is already attached to the node with the specified NodeName
DiskIsAttached(diskName KubernetesVolumeID, nodeName string) (bool, error)
// Check if the volume is already attached to the instance
DiskIsAttached(diskName, instanceID string) (bool, error)
// Check if a list of volumes are attached to the node with the specified NodeName
DisksAreAttached(diskNames []KubernetesVolumeID, nodeName string) (map[KubernetesVolumeID]bool, error)
}
// InstanceGroups is an interface for managing cloud-managed instance groups / autoscaling instance groups
......@@ -362,7 +364,7 @@ type Cloud struct {
// attached, to avoid a race condition where we assign a device mapping
// and then get a second request before we attach the volume
attachingMutex sync.Mutex
attaching map[ /*nodeName*/ string]map[mountDevice]string
attaching map[string]map[mountDevice]awsVolumeID
}
var _ Volumes = &Cloud{}
......@@ -795,7 +797,7 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
cfg: cfg,
region: regionName,
attaching: make(map[string]map[mountDevice]string),
attaching: make(map[string]map[mountDevice]awsVolumeID),
}
selfAWSInstance, err := awsCloud.buildSelfAWSInstance()
......@@ -1166,7 +1168,7 @@ func (i *awsInstance) describeInstance() (*ec2.Instance, error) {
// Gets the mountDevice already assigned to the volume, or assigns an unused mountDevice.
// If the volume is already assigned, this will return the existing mountDevice with alreadyAttached=true.
// Otherwise the mountDevice is assigned by finding the first available mountDevice, and it is returned with alreadyAttached=false.
func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
func (c *Cloud) getMountDevice(i *awsInstance, volumeID awsVolumeID, assign bool) (assigned mountDevice, alreadyAttached bool, err error) {
instanceType := i.getInstanceType()
if instanceType == nil {
return "", false, fmt.Errorf("could not get instance type for instance: %s", i.awsID)
......@@ -1176,7 +1178,7 @@ func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (as
if err != nil {
return "", false, err
}
deviceMappings := map[mountDevice]string{}
deviceMappings := map[mountDevice]awsVolumeID{}
for _, blockDevice := range info.BlockDeviceMappings {
name := aws.StringValue(blockDevice.DeviceName)
if strings.HasPrefix(name, "/dev/sd") {
......@@ -1188,7 +1190,7 @@ func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (as
if len(name) < 1 || len(name) > 2 {
glog.Warningf("Unexpected EBS DeviceName: %q", aws.StringValue(blockDevice.DeviceName))
}
deviceMappings[mountDevice(name)] = aws.StringValue(blockDevice.Ebs.VolumeId)
deviceMappings[mountDevice(name)] = awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
}
// We lock to prevent concurrent mounts from conflicting
......@@ -1234,7 +1236,7 @@ func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (as
attaching := c.attaching[i.nodeName]
if attaching == nil {
attaching = make(map[mountDevice]string)
attaching = make(map[mountDevice]awsVolumeID)
c.attaching[i.nodeName] = attaching
}
attaching[chosen] = volumeID
......@@ -1245,7 +1247,7 @@ func (c *Cloud) getMountDevice(i *awsInstance, volumeID string, assign bool) (as
// endAttaching removes the entry from the "attachments in progress" map
// It returns true if it was found (and removed), false otherwise
func (c *Cloud) endAttaching(i *awsInstance, volumeID string, mountDevice mountDevice) bool {
func (c *Cloud) endAttaching(i *awsInstance, volumeID awsVolumeID, mountDevice mountDevice) bool {
c.attachingMutex.Lock()
defer c.attachingMutex.Unlock()
......@@ -1266,44 +1268,16 @@ type awsDisk struct {
ec2 EC2
// Name in k8s
name string
name KubernetesVolumeID
// id in AWS
awsID string
awsID awsVolumeID
}
func newAWSDisk(aws *Cloud, name string) (*awsDisk, error) {
// name looks like aws://availability-zone/id
// The original idea of the URL-style name was to put the AZ into the
// host, so we could find the AZ immediately from the name without
// querying the API. But it turns out we don't actually need it for
// multi-AZ clusters, as we put the AZ into the labels on the PV instead.
// However, if in future we want to support multi-AZ cluster
// volume-awareness without using PersistentVolumes, we likely will
// want the AZ in the host.
if !strings.HasPrefix(name, "aws://") {
name = "aws://" + "" + "/" + name
}
url, err := url.Parse(name)
func newAWSDisk(aws *Cloud, name KubernetesVolumeID) (*awsDisk, error) {
awsID, err := name.mapToAWSVolumeID()
if err != nil {
// TODO: Maybe we should pass a URL into the Volume functions
return nil, fmt.Errorf("Invalid disk name (%s): %v", name, err)
}
if url.Scheme != "aws" {
return nil, fmt.Errorf("Invalid scheme for AWS volume (%s)", name)
}
awsID := url.Path
if len(awsID) > 1 && awsID[0] == '/' {
awsID = awsID[1:]
}
// TODO: Regex match?
if strings.Contains(awsID, "/") || !strings.HasPrefix(awsID, "vol-") {
return nil, fmt.Errorf("Invalid format for AWS volume (%s)", name)
return nil, err
}
disk := &awsDisk{ec2: aws.ec2, name: name, awsID: awsID}
return disk, nil
}
......@@ -1313,7 +1287,7 @@ func (d *awsDisk) describeVolume() (*ec2.Volume, error) {
volumeID := d.awsID
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{&volumeID},
VolumeIds: []*string{volumeID.awsString()},
}
volumes, err := d.ec2.DescribeVolumes(request)
......@@ -1394,7 +1368,7 @@ func (d *awsDisk) waitForAttachmentStatus(status string) (*ec2.VolumeAttachment,
// Deletes the EBS disk
func (d *awsDisk) deleteVolume() (bool, error) {
request := &ec2.DeleteVolumeInput{VolumeId: aws.String(d.awsID)}
request := &ec2.DeleteVolumeInput{VolumeId: d.awsID.awsString()}
_, err := d.ec2.DeleteVolume(request)
if err != nil {
if awsError, ok := err.(awserr.Error); ok {
......@@ -1451,7 +1425,7 @@ func (c *Cloud) getAwsInstance(nodeName string) (*awsInstance, error) {
}
// AttachDisk implements Volumes.AttachDisk
func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool) (string, error) {
func (c *Cloud) AttachDisk(diskName KubernetesVolumeID, instanceName string, readOnly bool) (string, error) {
disk, err := newAWSDisk(c, diskName)
if err != nil {
return "", err
......@@ -1499,7 +1473,7 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
request := &ec2.AttachVolumeInput{
Device: aws.String(ec2Device),
InstanceId: aws.String(awsInstance.awsID),
VolumeId: aws.String(disk.awsID),
VolumeId: disk.awsID.awsString(),
}
attachResponse, err := c.ec2.AttachVolume(request)
......@@ -1538,7 +1512,7 @@ func (c *Cloud) AttachDisk(diskName string, instanceName string, readOnly bool)
}
// DetachDisk implements Volumes.DetachDisk
func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error) {
func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, instanceName string) (string, error) {
disk, err := newAWSDisk(c, diskName)
if err != nil {
return "", err
......@@ -1570,7 +1544,7 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error)
request := ec2.DetachVolumeInput{
InstanceId: &awsInstance.awsID,
VolumeId: &disk.awsID,
VolumeId: disk.awsID.awsString(),
}
response, err := c.ec2.DetachVolume(&request)
......@@ -1601,7 +1575,7 @@ func (c *Cloud) DetachDisk(diskName string, instanceName string) (string, error)
}
// CreateDisk implements Volumes.CreateDisk
func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (KubernetesVolumeID, error) {
allZones, err := c.getAllZones()
if err != nil {
return "", fmt.Errorf("error querying for all zones: %v", err)
......@@ -1659,10 +1633,11 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
return "", err
}
az := orEmpty(response.AvailabilityZone)
awsID := orEmpty(response.VolumeId)
volumeName := "aws://" + az + "/" + awsID
awsID := awsVolumeID(aws.StringValue(response.VolumeId))
if awsID == "" {
return "", fmt.Errorf("VolumeID was not returned by CreateVolume")
}
volumeName := KubernetesVolumeID("aws://" + aws.StringValue(response.AvailabilityZone) + "/" + string(awsID))
// apply tags
tags := make(map[string]string)
......@@ -1675,7 +1650,7 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
}
if len(tags) != 0 {
if err := c.createTags(awsID, tags); err != nil {
if err := c.createTags(string(awsID), tags); err != nil {
// delete the volume and hope it succeeds
_, delerr := c.DeleteDisk(volumeName)
if delerr != nil {
......@@ -1689,7 +1664,7 @@ func (c *Cloud) CreateDisk(volumeOptions *VolumeOptions) (string, error) {
}
// DeleteDisk implements Volumes.DeleteDisk
func (c *Cloud) DeleteDisk(volumeName string) (bool, error) {
func (c *Cloud) DeleteDisk(volumeName KubernetesVolumeID) (bool, error) {
awsDisk, err := newAWSDisk(c, volumeName)
if err != nil {
return false, err
......@@ -1698,7 +1673,7 @@ func (c *Cloud) DeleteDisk(volumeName string) (bool, error) {
}
// GetVolumeLabels implements Volumes.GetVolumeLabels
func (c *Cloud) GetVolumeLabels(volumeName string) (map[string]string, error) {
func (c *Cloud) GetVolumeLabels(volumeName KubernetesVolumeID) (map[string]string, error) {
awsDisk, err := newAWSDisk(c, volumeName)
if err != nil {
return nil, err
......@@ -1724,7 +1699,7 @@ func (c *Cloud) GetVolumeLabels(volumeName string) (map[string]string, error) {
}
// GetDiskPath implements Volumes.GetDiskPath
func (c *Cloud) GetDiskPath(volumeName string) (string, error) {
func (c *Cloud) GetDiskPath(volumeName KubernetesVolumeID) (string, error) {
awsDisk, err := newAWSDisk(c, volumeName)
if err != nil {
return "", err
......@@ -1740,14 +1715,14 @@ func (c *Cloud) GetDiskPath(volumeName string) (string, error) {
}
// DiskIsAttached implements Volumes.DiskIsAttached
func (c *Cloud) DiskIsAttached(diskName, instanceID string) (bool, error) {
awsInstance, err := c.getAwsInstance(instanceID)
func (c *Cloud) DiskIsAttached(diskName KubernetesVolumeID, nodeName string) (bool, error) {
awsInstance, err := c.getAwsInstance(nodeName)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// If instance no longer exists, safe to assume volume is not attached.
glog.Warningf(
"Instance %q does not exist. DiskIsAttached will assume disk %q is not attached to it.",
instanceID,
nodeName,
diskName)
return false, nil
}
......@@ -1755,19 +1730,64 @@ func (c *Cloud) DiskIsAttached(diskName, instanceID string) (bool, error) {
return false, err
}
diskID, err := diskName.mapToAWSVolumeID()
if err != nil {
return false, fmt.Errorf("error mapping volume spec %q to aws id: %v", diskName, err)
}
info, err := awsInstance.describeInstance()
if err != nil {
return false, err
}
for _, blockDevice := range info.BlockDeviceMappings {
name := aws.StringValue(blockDevice.Ebs.VolumeId)
if name == diskName {
id := awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
if id == diskID {
return true, nil
}
}
return false, nil
}
func (c *Cloud) DisksAreAttached(diskNames []KubernetesVolumeID, nodeName string) (map[KubernetesVolumeID]bool, error) {
idToDiskName := make(map[awsVolumeID]KubernetesVolumeID)
attached := make(map[KubernetesVolumeID]bool)
for _, diskName := range diskNames {
volumeID, err := diskName.mapToAWSVolumeID()
if err != nil {
return nil, fmt.Errorf("error mapping volume spec %q to aws id: %v", diskName, err)
}
idToDiskName[volumeID] = diskName
attached[diskName] = false
}
awsInstance, err := c.getAwsInstance(nodeName)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// If instance no longer exists, safe to assume volume is not attached.
glog.Warningf(
"Node %q does not exist. DisksAreAttached will assume disks %v are not attached to it.",
nodeName,
diskNames)
return attached, nil
}
return attached, err
}
info, err := awsInstance.describeInstance()
if err != nil {
return attached, err
}
for _, blockDevice := range info.BlockDeviceMappings {
volumeID := awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
diskName, found := idToDiskName[volumeID]
if found {
// Disk is still attached to node
attached[diskName] = true
}
}
return attached, nil
}
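A hedged usage sketch of the new batch API, assuming the Volumes interface from the hunk above is in scope; the volume IDs and node name are made up for illustration:

// verifyAttached is a hypothetical caller of DisksAreAttached.
func verifyAttached(cloud Volumes) error {
	names := []KubernetesVolumeID{
		"aws://us-east-1a/vol-11111111",
		"vol-22222222",
	}
	attached, err := cloud.DisksAreAttached(names, "ip-10-0-0-1.ec2.internal")
	if err != nil {
		return err
	}
	for name, ok := range attached {
		fmt.Printf("%s attached=%v\n", name, ok)
	}
	return nil
}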
// Gets the current load balancer state
func (c *Cloud) describeLoadBalancer(name string) (*elb.LoadBalancerDescription, error) {
request := &elb.DescribeLoadBalancersInput{}
......
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aws
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"net/url"
"strings"
)
// awsVolumeID represents the ID of the volume in the AWS API, e.g. vol-12345678a
// The "traditional" format is "vol-12345678"
// A new longer format is also being introduced: "vol-12345678abcdef01"
// We should not assume anything about the length or format, though it seems
// reasonable to assume that volumes will continue to start with "vol-".
type awsVolumeID string
func (i awsVolumeID) awsString() *string {
return aws.String(string(i))
}
// KubernetesVolumeID represents the id for a volume in the kubernetes API;
// a few forms are recognized:
// * aws://<zone>/<awsVolumeId>
// * aws:///<awsVolumeId>
// * <awsVolumeId>
type KubernetesVolumeID string
// mapToAWSVolumeID extracts the awsVolumeID from the KubernetesVolumeID
func (name KubernetesVolumeID) mapToAWSVolumeID() (awsVolumeID, error) {
// name looks like aws://availability-zone/awsVolumeId
// The original idea of the URL-style name was to put the AZ into the
// host, so we could find the AZ immediately from the name without
// querying the API. But it turns out we don't actually need it for
// multi-AZ clusters, as we put the AZ into the labels on the PV instead.
// However, if in future we want to support multi-AZ cluster
// volume-awareness without using PersistentVolumes, we likely will
// want the AZ in the host.
s := string(name)
if !strings.HasPrefix(s, "aws://") {
// Assume a bare aws volume id (vol-1234...)
// Build a URL with an empty host (AZ)
s = "aws://" + "" + "/" + s
}
url, err := url.Parse(s)
if err != nil {
// TODO: Maybe we should pass a URL into the Volume functions
return "", fmt.Errorf("Invalid disk name (%s): %v", name, err)
}
if url.Scheme != "aws" {
return "", fmt.Errorf("Invalid scheme for AWS volume (%s)", name)
}
awsID := url.Path
awsID = strings.Trim(awsID, "/")
// We sanity check the resulting volume; the two known formats are
// vol-12345678 and vol-12345678abcdef01
// TODO: Regex match?
if strings.Contains(awsID, "/") || !strings.HasPrefix(awsID, "vol-") {
return "", fmt.Errorf("Invalid format for AWS volume (%s)", name)
}
return awsVolumeID(awsID), nil
}
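As a quick check of the parser, all three recognized spellings normalize to the same awsVolumeID. A sketch of a table-driven test, assuming it lives in the same package as the file above (the IDs are the documentation examples, not real volumes):

func TestMapToAWSVolumeID(t *testing.T) {
	for _, name := range []KubernetesVolumeID{
		"aws://us-east-1a/vol-12345678",
		"aws:///vol-12345678",
		"vol-12345678",
	} {
		id, err := name.mapToAWSVolumeID()
		if err != nil || id != "vol-12345678" {
			t.Errorf("mapToAWSVolumeID(%q) = %q, %v", name, id, err)
		}
	}
}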
......@@ -73,6 +73,34 @@ func (az *Cloud) AttachDisk(diskName, diskURI, vmName string, lun int32, caching
return err
}
// DisksAreAttached checks if a list of volumes are attached to the node with the specified NodeName
func (az *Cloud) DisksAreAttached(diskNames []string, nodeName string) (map[string]bool, error) {
attached := make(map[string]bool)
for _, diskName := range diskNames {
attached[diskName] = false
}
vm, exists, err := az.getVirtualMachine(nodeName)
if !exists {
// if the node doesn't exist, assume the disks are not attached
glog.Warningf("Cannot find node %q, DisksAreAttached will assume disks %v are not attached to it.",
nodeName, diskNames)
return attached, nil
} else if err != nil {
return attached, err
}
disks := *vm.Properties.StorageProfile.DataDisks
for _, disk := range disks {
for _, diskName := range diskNames {
if disk.Name != nil && diskName != "" && *disk.Name == diskName {
attached[diskName] = true
}
}
}
return attached, nil
}
// DetachDiskByName detaches a vhd from host
// the vhd can be identified by diskName or diskURI
func (az *Cloud) DetachDiskByName(diskName, diskURI, vmName string) error {
......
......@@ -132,6 +132,10 @@ type Disks interface {
// DiskIsAttached checks if a disk is attached to the given node.
DiskIsAttached(diskName, instanceID string) (bool, error)
// DisksAreAttached is a batch function to check if a list of disks are attached
// to the node with the specified NodeName.
DisksAreAttached(diskNames []string, nodeName string) (map[string]bool, error)
// CreateDisk creates a new PD with given properties. Tags are serialized
// as JSON into Description field.
CreateDisk(name string, diskType string, zone string, sizeGb int64, tags map[string]string) error
......@@ -2578,6 +2582,37 @@ func (gce *GCECloud) DiskIsAttached(diskName, instanceID string) (bool, error) {
return false, nil
}
func (gce *GCECloud) DisksAreAttached(diskNames []string, nodeName string) (map[string]bool, error) {
attached := make(map[string]bool)
for _, diskName := range diskNames {
attached[diskName] = false
}
instance, err := gce.getInstanceByName(nodeName)
if err != nil {
if err == cloudprovider.InstanceNotFound {
// If instance no longer exists, safe to assume volume is not attached.
glog.Warningf(
"Instance %q does not exist. DisksAreAttached will assume PD %v are not attached to it.",
nodeName,
diskNames)
return attached, nil
}
return attached, err
}
for _, instanceDisk := range instance.Disks {
for _, diskName := range diskNames {
if instanceDisk.DeviceName == diskName {
// Disk is still attached to node
attached[diskName] = true
}
}
}
return attached, nil
}
// Returns a gceDisk for the disk, if it is found in the specified zone.
// If not found, returns (nil, nil)
func (gce *GCECloud) findDiskByName(diskName string, zone string) (*gceDisk, error) {
......
......@@ -228,3 +228,33 @@ func (os *OpenStack) DiskIsAttached(diskName, instanceID string) (bool, error) {
}
return false, nil
}
// query whether a list of volumes is attached to a compute instance
func (os *OpenStack) DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error) {
attached := make(map[string]bool)
for _, diskName := range diskNames {
attached[diskName] = false
}
for _, diskName := range diskNames {
disk, err := os.getVolume(diskName)
if err != nil {
continue
}
if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil && instanceID == disk.Attachments[0]["server_id"] {
attached[diskName] = true
}
}
return attached, nil
}
// diskIsUsed returns true if a disk is attached to any node.
func (os *OpenStack) diskIsUsed(diskName string) (bool, error) {
disk, err := os.getVolume(diskName)
if err != nil {
return false, err
}
if len(disk.Attachments) > 0 {
return true, nil
}
return false, nil
}
......@@ -645,3 +645,24 @@ func (rs *Rackspace) DiskIsAttached(diskName, instanceID string) (bool, error) {
}
return false, nil
}
// query whether a list of volumes is attached to a compute instance
func (rs *Rackspace) DisksAreAttached(diskNames []string, instanceID string) (map[string]bool, error) {
attached := make(map[string]bool)
for _, diskName := range diskNames {
attached[diskName] = false
}
var returnedErr error
for _, diskName := range diskNames {
result, err := rs.DiskIsAttached(diskName, instanceID)
if err != nil {
returnedErr = fmt.Errorf("Error in checking disk %q attached: %v \n %v", diskName, err, returnedErr)
continue
}
if result {
attached[diskName] = true
}
}
return attached, returnedErr
}
......@@ -125,6 +125,10 @@ type Volumes interface {
// Assumption: If node doesn't exist, disk is not attached to the node.
DiskIsAttached(volPath, nodeName string) (bool, error)
// DisksAreAttached checks if a list of disks is attached to the given node.
// Assumption: If node doesn't exist, disks are not attached to the node.
DisksAreAttached(volPath []string, nodeName string) (map[string]bool, error)
// CreateVolume creates a new vmdk with specified parameters.
CreateVolume(name string, size int, tags *map[string]string) (volumePath string, err error)
......@@ -874,6 +878,64 @@ func (vs *VSphere) DiskIsAttached(volPath string, nodeName string) (bool, error)
return attached, err
}
// DisksAreAttached returns if disks are attached to the VM using controllers supported by the plugin.
func (vs *VSphere) DisksAreAttached(volPaths []string, nodeName string) (map[string]bool, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
attached := make(map[string]bool)
for _, volPath := range volPaths {
attached[volPath] = false
}
// Create vSphere client
c, err := vsphereLogin(vs.cfg, ctx)
if err != nil {
glog.Errorf("Failed to create vSphere client. err: %s", err)
return attached, err
}
defer c.Logout(ctx)
// Find VM to detach disk from
var vSphereInstance string
if nodeName == "" {
vSphereInstance = vs.localInstanceID
} else {
vSphereInstance = nodeName
}
nodeExist, err := vs.NodeExists(c, vSphereInstance)
if err != nil {
glog.Errorf("Failed to check whether node exist. err: %s.", err)
return attached, err
}
if !nodeExist {
glog.Warningf(
"Node %q does not exist. DisksAreAttached will assume vmdk %v are not attached to it.",
vSphereInstance,
volPaths)
return attached, nil
}
// Get VM device list
_, vmDevices, _, dc, err := getVirtualMachineDevices(vs.cfg, ctx, c, vSphereInstance)
if err != nil {
glog.Errorf("Failed to get VM devices for VM %#q. err: %s", vSphereInstance, err)
return attached, err
}
for _, volPath := range volPaths {
result, _ := checkDiskAttached(volPath, vmDevices, dc, c)
if result {
attached[volPath] = true
}
}
return attached, err
}
func checkDiskAttached(volPath string, vmdevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (bool, error) {
virtualDiskControllerKey, err := getVirtualDiskControllerKey(volPath, vmdevices, dc, client)
if err != nil {
......@@ -884,7 +946,7 @@ func checkDiskAttached(volPath string, vmdevices object.VirtualDeviceList, dc *o
return false, err
}
for _, controllerType := range supportedSCSIControllerType {
controllerkey, _ := getControllerKey(controllerType, vmdevices, dc, client)
controllerkey, _ := getControllerKey(controllerType, vmdevices)
if controllerkey == virtualDiskControllerKey {
return true, nil
}
......@@ -916,7 +978,7 @@ func getVirtualDiskControllerKey(volPath string, vmDevices object.VirtualDeviceL
// Returns key of the controller.
// Key is unique id that distinguishes one device from other devices in the same virtual machine.
func getControllerKey(scsiType string, vmDevices object.VirtualDeviceList, dc *object.Datacenter, client *govmomi.Client) (int32, error) {
func getControllerKey(scsiType string, vmDevices object.VirtualDeviceList) (int32, error) {
for _, device := range vmDevices {
devType := vmDevices.Type(device)
if devType == scsiType {
......
......@@ -370,7 +370,6 @@ func (asw *actualStateOfWorld) addVolume(
} else {
// If volume object already exists, update the fields such as device path
volumeObj.devicePath = devicePath
volumeObj.spec = volumeSpec
glog.V(2).Infof("Volume %q is already added to attachedVolume list, update device path %q",
volumeName,
devicePath)
......