提交 a1da8e15 编写于 作者: P Philipp Heckel

Remove code duplication (as per code review); it's really awkward now

上级 a8a6c28a
...@@ -604,50 +604,91 @@ func MaybeEnableSemiSyncReplica(replicaInstance *Instance) (*Instance, error) { ...@@ -604,50 +604,91 @@ func MaybeEnableSemiSyncReplica(replicaInstance *Instance) (*Instance, error) {
// Backwards compatible logic: Enable semi-sync if SemiSyncPriority > 0 (formerly SemiSyncEnforced) // Backwards compatible logic: Enable semi-sync if SemiSyncPriority > 0 (formerly SemiSyncEnforced)
// Note that this logic NEVER enables semi-sync if the promotion rule is "must_not". // Note that this logic NEVER enables semi-sync if the promotion rule is "must_not".
if !config.Config.EnforceExactSemiSyncReplicas && !config.Config.RecoverLockedSemiSyncMaster { if !config.Config.EnforceExactSemiSyncReplicas && !config.Config.RecoverLockedSemiSyncMaster {
if replicaInstance.SemiSyncPriority > 0 { return maybeEnableSemiSyncReplicaLegacy(replicaInstance)
enable := replicaInstance.PromotionRule != MustNotPromoteRule // Send ACK only from promotable instances
log.Infof("semi-sync: %+v: setting rpl_semi_sync_slave_enabled = %t (legacy behavior)", &replicaInstance.Key, enable)
return SetSemiSyncReplica(&replicaInstance.Key, enable)
}
return replicaInstance, nil
} }
// New logic: If EnforceExactSemiSyncReplicas or RecoverLockedSemiSyncMaster are set, we enable semi-sync only if the // New logic: If EnforceExactSemiSyncReplicas or RecoverLockedSemiSyncMaster are set, we enable semi-sync only if the
// given replica instance is in the list of replicas to have semi-sync enabled (according to the priority). // given replica instance is in the list of replicas to have semi-sync enabled (according to the priority).
masterInstance, err := ReadTopologyInstance(&replicaInstance.MasterKey) logFn := func(s string, a ...interface{}) { log.Debugf(s, a...) }
if err != nil { if _, err := RecoverSemiSyncReplicas(&replicaInstance.MasterKey, &replicaInstance.Key, config.Config.EnforceExactSemiSyncReplicas, logFn); err != nil {
return replicaInstance, log.Errore(err) return replicaInstance, log.Errore(err)
} }
replicas, err := ReadReplicaInstances(&replicaInstance.MasterKey)
return replicaInstance, nil
}
// maybeEnableSemiSyncReplicaLegacy applies the backwards-compatible semi-sync rule
// (SemiSyncPriority was formerly SemiSyncEnforced). An instance whose SemiSyncPriority
// is not greater than zero is returned untouched. Otherwise SetSemiSyncReplica is
// called for the instance with true, unless its promotion rule is "must_not", in
// which case it is called with false; this logic NEVER enables semi-sync for such instances.
func maybeEnableSemiSyncReplicaLegacy(replicaInstance *Instance) (*Instance, error) {
	if hasPriority := replicaInstance.SemiSyncPriority > 0; !hasPriority {
		// No semi-sync priority configured: leave the replica as it is.
		return replicaInstance, nil
	}

	// Only promotable instances should send ACKs.
	promotable := replicaInstance.PromotionRule != MustNotPromoteRule
	log.Infof("semi-sync: %+v: setting rpl_semi_sync_slave_enabled = %t (legacy behavior)", &replicaInstance.Key, promotable)
	return SetSemiSyncReplica(&replicaInstance.Key, promotable)
}
// logFunc is a printf-style logging callback: s is the format string and a holds
// its arguments. It lets callers decide where semi-sync recovery messages are written.
type logFunc func(s string, a ...interface{})
func RecoverSemiSyncReplicas(masterKey *InstanceKey, recoverOnlyReplicaKey *InstanceKey, exactReplicaTopology bool, logf logFunc) (masterInstance *Instance, err error) {
// Read entire topology of master and its replicas
masterInstance, err = ReadTopologyInstance(masterKey)
if err != nil { if err != nil {
return replicaInstance, log.Errore(err) return nil, err
}
replicas, err := ReadReplicaInstances(masterKey)
if err != nil {
return nil, err
} else if len(replicas) == 0 {
return nil, fmt.Errorf("no replicas found for %+v; cannot recover from incorrect semi-sync topology", masterKey)
} }
possibleSemiSyncReplicas, asyncReplicas, excludedReplicas := ClassifyAndPrioritizeReplicas(replicas, &replicaInstance.Key) // Classify and prioritize replicas & figure out which replicas need to be acted upon
actions := DetermineSemiSyncReplicaActions(possibleSemiSyncReplicas, asyncReplicas, masterInstance.SemiSyncMasterWaitForReplicaCount, masterInstance.SemiSyncMasterClients, config.Config.EnforceExactSemiSyncReplicas) possibleSemiSyncReplicas, asyncReplicas, excludedReplicas := classifyAndPrioritizeReplicas(replicas, recoverOnlyReplicaKey)
actions := determineSemiSyncReplicaActions(possibleSemiSyncReplicas, asyncReplicas, masterInstance.SemiSyncMasterWaitForReplicaCount, masterInstance.SemiSyncMasterClients, exactReplicaTopology)
log.Debugf("semi-sync: %+v: determining whether to enable rpl_semi_sync_slave_enabled", replicaInstance.Key) // Log analysis
log.Debugf("semi-sync: master = %+v, master semi-sync wait count = %d, master semi-sync replica count = %d", replicaInstance.MasterKey, masterInstance.SemiSyncMasterWaitForReplicaCount, masterInstance.SemiSyncMasterClients) logf("semi-sync: analysis results for recovery of cluster %+v:", masterInstance.ClusterName)
LogSemiSyncReplicaAnalysis(possibleSemiSyncReplicas, asyncReplicas, excludedReplicas, actions, func(s string, a ...interface{}) { log.Debugf(s, a...) }) logf("semi-sync: master = %+v, replica repair scope = %+v, master semi-sync wait count = %d, master semi-sync replica count = %d", masterKey, recoverOnlyReplicaKey, masterInstance.SemiSyncMasterWaitForReplicaCount, masterInstance.SemiSyncMasterClients)
LogSemiSyncReplicaAnalysis(possibleSemiSyncReplicas, asyncReplicas, excludedReplicas, actions, logf)
// TODO This is a little odd. We sometimes get actions for replicas that are not us. If we do not take this action // Bail out if we cannot succeed
// TODO then we'll absolutely have a MasterWithTooManySemiSyncReplicas event following this recovery. We could prevent this if uint(len(possibleSemiSyncReplicas)) < masterInstance.SemiSyncMasterWaitForReplicaCount {
// TODO by just executing all actions, but we're in the scope of `replicaInstance` here, so..... idk return nil, fmt.Errorf("not enough valid live replicas found to recover on %+v", masterKey)
} else if len(actions) == 0 {
return nil, fmt.Errorf("cannot determine actions based on possible semi-sync replicas; cannot recover on %+v", masterKey)
}
// Take action: we first enable and then disable (two loops) in order to avoid "locked master" scenarios; If recoverOnlyReplicaKey is set
// only that replica's action is executed (if any). This may lead to a MasterWithTooManySemiSyncReplicas if other actions were suggested.
// While not ideal, that is okay and expected.
logf("semi-sync: taking actions:")
// TODO should we also set master_enabled = false here?
for replica, enable := range actions { for replica, enable := range actions {
if replica.Key.Equals(&replicaInstance.Key) { if enable && (recoverOnlyReplicaKey == nil || replica.Key.Equals(recoverOnlyReplicaKey)) {
log.Infof("semi-sync: %s: setting rpl_semi_sync_slave_enabled: %t", &replicaInstance.Key, enable) logf("semi-sync: - %s: setting rpl_semi_sync_slave_enabled=%t, restarting slave_io thread", replica.Key.String(), enable)
return SetSemiSyncReplica(&replicaInstance.Key, enable) if _, err := SetSemiSyncReplica(&replica.Key, enable); err != nil {
return nil, fmt.Errorf("cannot enable semi sync on replica %+v", replica.Key)
}
}
}
for replica, enable := range actions {
if !enable && (recoverOnlyReplicaKey == nil || replica.Key.Equals(recoverOnlyReplicaKey)) {
logf("semi-sync: - %s: setting rpl_semi_sync_slave_enabled=%t, restarting slave_io thread", replica.Key.String(), enable)
if _, err := SetSemiSyncReplica(&replica.Key, enable); err != nil {
return nil, fmt.Errorf("cannot disable semi sync on replica %+v", replica.Key)
}
} }
} }
log.Infof("semi-sync: %+v: no action taken", &replicaInstance.Key) // log.Infof("semi-sync: %+v: no action taken", &replicaInstance.Key)
return replicaInstance, nil
return masterInstance, nil
} }
// ClassifyAndPrioritizeReplicas takes a list of replica instances and classifies them based on their semi-sync priority, excluding // classifyAndPrioritizeReplicas takes a list of replica instances and classifies them based on their semi-sync priority, excluding replicas
// replicas that are down. It furthermore prioritizes the possible semi-sync replicas based on SemiSyncPriority, PromotionRule // that are down. It furthermore prioritizes the possible semi-sync replicas based on SemiSyncPriority, PromotionRule and hostname (fallback).
// and hostname (fallback). func classifyAndPrioritizeReplicas(replicas []*Instance, includeNonReplicatingInstance *InstanceKey) (possibleSemiSyncReplicas []*Instance, asyncReplicas []*Instance, excludedReplicas []*Instance) {
func ClassifyAndPrioritizeReplicas(replicas []*Instance, includeNonReplicatingInstance *InstanceKey) (possibleSemiSyncReplicas []*Instance, asyncReplicas []*Instance, excludedReplicas []*Instance) {
// Classify based on state and semi-sync priority // Classify based on state and semi-sync priority
possibleSemiSyncReplicas = make([]*Instance, 0) possibleSemiSyncReplicas = make([]*Instance, 0)
asyncReplicas = make([]*Instance, 0) asyncReplicas = make([]*Instance, 0)
...@@ -676,9 +717,9 @@ func ClassifyAndPrioritizeReplicas(replicas []*Instance, includeNonReplicatingIn ...@@ -676,9 +717,9 @@ func ClassifyAndPrioritizeReplicas(replicas []*Instance, includeNonReplicatingIn
return return
} }
// DetermineSemiSyncReplicaActions returns a map of replicas for which to change the semi-sync replica setting. // determineSemiSyncReplicaActions returns a map of replicas for which to change the semi-sync replica setting.
// A value of true indicates semi-sync needs to be enabled, false that it needs to be disabled. // A value of true indicates semi-sync needs to be enabled, false that it needs to be disabled.
func DetermineSemiSyncReplicaActions(possibleSemiSyncReplicas []*Instance, asyncReplicas []*Instance, waitCount uint, currentSemiSyncReplicas uint, exactReplicaTopology bool) map[*Instance]bool { func determineSemiSyncReplicaActions(possibleSemiSyncReplicas []*Instance, asyncReplicas []*Instance, waitCount uint, currentSemiSyncReplicas uint, exactReplicaTopology bool) map[*Instance]bool {
if exactReplicaTopology { if exactReplicaTopology {
return determineSemiSyncReplicaActionsForExactTopology(possibleSemiSyncReplicas, asyncReplicas, waitCount) return determineSemiSyncReplicaActionsForExactTopology(possibleSemiSyncReplicas, asyncReplicas, waitCount)
} }
......
...@@ -1508,69 +1508,15 @@ func checkAndRecoverMasterWithTooManySemiSyncReplicas(analysisEntry inst.Replica ...@@ -1508,69 +1508,15 @@ func checkAndRecoverMasterWithTooManySemiSyncReplicas(analysisEntry inst.Replica
} }
func recoverSemiSyncReplicas(topologyRecovery *TopologyRecovery, analysisEntry inst.ReplicationAnalysis, exactReplicaTopology bool) (recoveryAttempted bool, topologyRecoveryOut *TopologyRecovery, err error) { func recoverSemiSyncReplicas(topologyRecovery *TopologyRecovery, analysisEntry inst.ReplicationAnalysis, exactReplicaTopology bool) (recoveryAttempted bool, topologyRecoveryOut *TopologyRecovery, err error) {
masterInstance, found, err := inst.ReadInstance(&analysisEntry.AnalyzedInstanceKey) logFn := func(s string, a ...interface{}) { AuditTopologyRecovery(topologyRecovery, fmt.Sprintf(s, a...)) }
if !found || err != nil { masterInstance, err := inst.RecoverSemiSyncReplicas(&analysisEntry.AnalyzedInstanceKey, nil, exactReplicaTopology, logFn)
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: instance not found in recovery %+v.", analysisEntry.AnalyzedInstanceKey))
return false, topologyRecovery, err
}
// Read all replicas
replicas, err := inst.ReadReplicaInstances(&analysisEntry.AnalyzedInstanceKey)
if err != nil { if err != nil {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: could not read replica instances for %+v: %s", analysisEntry.AnalyzedInstanceKey, err.Error())) return true, topologyRecovery, err
return false, topologyRecovery, err
}
if len(replicas) == 0 {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: no replicas found for %+v; cannot recover", analysisEntry.AnalyzedInstanceKey))
return false, topologyRecovery, nil
}
// Classify and prioritize replicas & figure out which replicas need to be acted upon
possibleSemiSyncReplicas, asyncReplicas, excludedReplicas := inst.ClassifyAndPrioritizeReplicas(replicas, nil)
actions := inst.DetermineSemiSyncReplicaActions(possibleSemiSyncReplicas, asyncReplicas, analysisEntry.SemiSyncMasterWaitForReplicaCount, analysisEntry.SemiSyncMasterClients, exactReplicaTopology)
// Log analysis
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: analysis results for recovery of %+v:", masterInstance.Key))
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: semi-sync wait count = %d, semi-sync replica count = %d", analysisEntry.SemiSyncMasterWaitForReplicaCount, analysisEntry.SemiSyncMasterClients))
inst.LogSemiSyncReplicaAnalysis(possibleSemiSyncReplicas, asyncReplicas, excludedReplicas, actions, func(s string, a ...interface{}) { AuditTopologyRecovery(topologyRecovery, fmt.Sprintf(s, a...)) })
// Bail out if we cannot succeed
if uint(len(possibleSemiSyncReplicas)) < analysisEntry.SemiSyncMasterWaitForReplicaCount {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: not enough valid live replicas found to recover from %s on %+v.", analysisEntry.AnalysisString(), analysisEntry.AnalyzedInstanceKey))
return true, topologyRecovery, fmt.Errorf("not enough valid live replicas found to recover from %s on %+v", analysisEntry.AnalysisString(), analysisEntry.AnalyzedInstanceKey)
} else if len(actions) == 0 {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: cannot determine actions based on possible semi-sync replicas; cannot recover from %s on %+v.", analysisEntry.AnalysisString(), analysisEntry.AnalyzedInstanceKey))
return true, topologyRecovery, fmt.Errorf("cannot determine actions based on possible semi-sync replicas; cannot recover from %s on %+v", analysisEntry.AnalysisString(), analysisEntry.AnalyzedInstanceKey)
}
// Take action: we first enable and then disable (two loops) in order to avoid "locked master" scenarios
AuditTopologyRecovery(topologyRecovery, "semi-sync: taking actions:")
// TODO should we also set master_enabled = false here?
for replica, enable := range actions {
if enable {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: - %s: setting rpl_semi_sync_slave_enabled=%t, restarting slave_io thread", replica.Key.String(), enable))
if _, err := inst.SetSemiSyncReplica(&replica.Key, enable); err != nil {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: cannot change semi sync on replica %+v.", replica.Key))
return true, topologyRecovery, nil
}
}
}
for replica, enable := range actions {
if !enable {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: - %s: setting rpl_semi_sync_slave_enabled=%t, restarting slave_io thread", replica.Key.String(), enable))
if _, err := inst.SetSemiSyncReplica(&replica.Key, enable); err != nil {
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: cannot change semi sync on replica %+v.", replica.Key))
return true, topologyRecovery, nil
}
}
} }
// Re-read source instance to avoid re-triggering the same condition
// TODO even though we resolve correctly here, we are re-triggering the same analysis until the next polling interval. WHY? // TODO even though we resolve correctly here, we are re-triggering the same analysis until the next polling interval. WHY?
resolveRecovery(topologyRecovery, masterInstance) resolveRecovery(topologyRecovery, masterInstance)
AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: recovery complete; success = %t", topologyRecovery.IsSuccessful)) AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("semi-sync: recovery complete; success = %t", topologyRecovery.IsSuccessful))
return true, topologyRecovery, nil return true, topologyRecovery, nil
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册