# coding: utf-8 # OceanBase Deploy. # Copyright (C) 2021 OceanBase # # This file is part of OceanBase Deploy. # # OceanBase Deploy is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # OceanBase Deploy is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>. from __future__ import absolute_import, division, print_function import enum import getpass import os import tempfile import warnings from glob import glob from subprocess32 import Popen, PIPE # paramiko import cryptography 模块在python2下会报不支持警报 warnings.filterwarnings("ignore") from paramiko import AuthenticationException, SFTPClient from paramiko.client import SSHClient, AutoAddPolicy from paramiko.ssh_exception import NoValidConnectionsError, SSHException from multiprocessing.queues import Empty from multiprocessing import Queue, Process from multiprocessing.pool import ThreadPool from tool import COMMAND_ENV, DirectoryUtil, FileUtil from _stdio import SafeStdio from _errno import EC_SSH_CONNECT from _environ import ENV_DISABLE_RSYNC __all__ = ("SshClient", "SshConfig", "LocalClient", "ConcurrentExecutor") class SshConfig(object): def __init__(self, host, username='root', password=None, key_filename=None, port=22, timeout=30): self.host = host self.username = username self.password = password if password is None else str(password) self.key_filename = key_filename self.port = int(port) self.timeout = int(timeout) def __str__(self): return '%s@%s' % (self.username ,self.host) class SshReturn(object): def __init__(self, code, stdout, stderr): self.code = code self.stdout = stdout self.stderr = stderr def __bool__(self): return self.code == 0 def __nonzero__(self): return self.__bool__() class FeatureSshReturn(SshReturn, SafeStdio): def __init__(self, popen, timeout, stdio): self.popen = popen self.timeout = timeout self.stdio = stdio self._code = None self._stdout = None self._stderr = None def _get_return(self): if self._code is None: try: p = self.popen output, error = p.communicate(timeout=self.timeout) self._stdout = output.decode(errors='replace') self._stderr = error.decode(errors='replace') self._code = p.returncode verbose_msg = 'exited code %s' % self._code if self._code: verbose_msg += ', error output:\n%s' % self._stderr self.stdio.verbose(verbose_msg) except Exception as e: self._stdout = '' self._stderr = str(e) self._code = 255 verbose_msg = 'exited code 255, error output:\n%s' % self._stderr self.stdio.verbose(verbose_msg) self.stdio.exception('') @property def code(self): self._get_return() return self._code @property def stdout(self): self._get_return() return self._stdout @property def stderr(self): self._get_return() return self._stderr class FutureSshReturn(SshReturn): def __init__(self, client, command, timeout=None, stdio=None): self.client = client self.command = command self.timeout = timeout self.stdio = stdio if stdio else client.stdio if self.stdio: self.stdio = self.stdio.sub_io() self.finsh = False super(FutureSshReturn, self).__init__(127, '', '') def set_return(self, ssh_return): self.code = ssh_return.code self.stdout = ssh_return.stdout self.stderr = ssh_return.stderr self.finsh = True class ConcurrentExecutor(object): def __init__(self, workers=None): self.workers = workers self.futures = [] def add_task(self, client, command, timeout=None, stdio=None): ret = FutureSshReturn(client, command, timeout, stdio=stdio) self.futures.append(ret) return ret def size(self): return len(self.futures) @staticmethod def execute(future): client = SshClient(future.client.config, future.stdio) future.set_return(client.execute_command(future.command, timeout=future.timeout)) return future def submit(self): rets = [] pool = ThreadPool(processes=self.workers) try: results = pool.map(ConcurrentExecutor.execute, tuple(self.futures)) for r in results: rets.append(r) finally: pool.close() self.futures = [] return rets class LocalClient(SafeStdio): @staticmethod def init_env(env=None): if env is None: return None env_t = COMMAND_ENV.copy() env_t.update(env) return env_t @staticmethod def execute_command_background(command, env=None, timeout=None, stdio=None): stdio.verbose('local background execute: %s ' % command, end='') try: p = Popen(command, env=LocalClient.init_env(env), shell=True, stdout=PIPE, stderr=PIPE) return FeatureSshReturn(p, timeout, stdio) except Exception as e: output = '' error = str(e) code = 255 verbose_msg = 'exited code 255, error output:\n%s' % error stdio.verbose(verbose_msg) stdio.exception('') return SshReturn(code, output, error) @staticmethod def execute_command(command, env=None, timeout=None, stdio=None): stdio.verbose('local execute: %s ' % command, end='') try: p = Popen(command, env=LocalClient.init_env(env), shell=True, stdout=PIPE, stderr=PIPE) output, error = p.communicate(timeout=timeout) code = p.returncode output = output.decode(errors='replace') error = error.decode(errors='replace') verbose_msg = 'exited code %s' % code if code: verbose_msg += ', error output:\n%s' % error stdio.verbose(verbose_msg) except Exception as e: output = '' error = str(e) code = 255 verbose_msg = 'exited code 255, error output:\n%s' % error stdio.verbose(verbose_msg) stdio.exception('') return SshReturn(code, output, error) @staticmethod def put_file(local_path, remote_path, stdio=None): if LocalClient.execute_command('mkdir -p %s && cp -f %s %s' % (os.path.dirname(remote_path), local_path, remote_path), stdio=stdio): return True return False @staticmethod def put_dir(local_dir, remote_dir, stdio=None): if os.path.isdir(local_dir): local_dir = os.path.join(local_dir, '*') if os.path.exists(os.path.dirname(local_dir)) and not glob(local_dir): stdio.verbose("%s is empty" % local_dir) return True if LocalClient.execute_command('mkdir -p %s && cp -frL %s %s' % (remote_dir, local_dir, remote_dir), stdio=stdio): return True return False @staticmethod def write_file(content, file_path, mode='w', stdio=None): stdio.verbose('write {} to {}'.format(content, file_path)) try: with FileUtil.open(file_path, mode, stdio=stdio) as f: f.write(content) f.flush() return True except: stdio.exception('') return False @staticmethod def get_file(local_path, remote_path, stdio=None): return LocalClient.put_file(remote_path, local_path, stdio=stdio) @staticmethod def get_dir(local_path, remote_path, stdio=None): return LocalClient.put_dir(remote_path, local_path, stdio=stdio) class RemoteTransporter(enum.Enum): CLIENT = 0 RSYNC = 1 def __lt__(self, other): return self.value < other.value def __gt__(self, other): return self.value > other.value class SshClient(SafeStdio): DEFAULT_PATH = '/sbin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:' LOCAL_HOST = ['127.0.0.1', 'localhost', '127.1', '127.0.1'] def __init__(self, config, stdio=None): self.config = config self.stdio = stdio self.sftp = None self.is_connected = False self.ssh_client = SSHClient() self.env_str = '' self._remote_transporter = None self.task_queue = None self.result_queue = None self._is_local = self.is_localhost() and self.config.username == getpass.getuser() if self._is_local: self.env = {} else: self.env = {'PATH': self.DEFAULT_PATH} self._update_env() super(SshClient, self).__init__() def _init_queue(self): self.task_queue = Queue() self.result_queue = Queue() def _update_env(self): env = [] for key in self.env: if self.env[key]: env.append('export %s=%s$%s;' % (key, self.env[key], key)) self.env_str = ''.join(env) def add_env(self, key, value, rewrite=False, stdio=None): if key not in self.env or not self.env[key] or rewrite: stdio.verbose('%s@%s set env %s to \'%s\'' % (self.config.username, self.config.host, key, value)) if self._is_local: self._add_env_for_local(key, value, rewrite) else: self.env[key] = value else: stdio.verbose('%s@%s append \'%s\' to %s' % (self.config.username, self.config.host, value, key)) if self._is_local: self._add_env_for_local(key, value, rewrite) else: self.env[key] += value self._update_env() def _add_env_for_local(self, key, value, rewrite=False): if rewrite: self.env[key] = value else: if key not in self.env: self.env[key] = COMMAND_ENV.get(key, '') self.env[key] += value def get_env(self, key, stdio=None): return self.env[key] if key in self.env else None def del_env(self, key, stdio=None): if key in self.env: stdio.verbose('%s@%s delete env %s' % (self.config.username, self.config.host, key)) del self.env[key] self._update_env() def __str__(self): return '%s@%s:%d' % (self.config.username, self.config.host, self.config.port) def is_localhost(self, stdio=None): return self.config.host in self.LOCAL_HOST def _login(self, stdio=None): if self.is_connected: return True err = None try: self.ssh_client.set_missing_host_key_policy(AutoAddPolicy()) self.ssh_client.connect( self.config.host, port=self.config.port, username=self.config.username, password=self.config.password, key_filename=self.config.key_filename, timeout=self.config.timeout ) self.is_connected = True except AuthenticationException: stdio.exception('') err = EC_SSH_CONNECT.format(user=self.config.username, ip=self.config.host, message='username or password error') except NoValidConnectionsError: stdio.exception('') err = EC_SSH_CONNECT.format(user=self.config.username, ip=self.config.host, message='time out') except BaseException as e: stdio.exception('') err = EC_SSH_CONNECT.format(user=self.config.username, ip=self.config.host, message=e) if err: stdio.critical(err) return err return self.is_connected def _open_sftp(self, stdio=None): if self.sftp: return True if self._login(stdio=stdio): SFTPClient.from_transport(self.ssh_client.get_transport()) self.sftp = self.ssh_client.open_sftp() return True return False def connect(self, stdio=None): if self._is_local: return True return self._login(stdio=stdio) def reconnect(self, stdio=None): self.close(stdio=stdio) return self.connect(stdio=stdio) def close(self, stdio=None): if self._is_local: return True if self.is_connected: self.ssh_client.close() if self.sftp: self.sftp = None def __del__(self): self.close() def _execute_command(self, command, timeout=None, retry=3, stdio=None): if not self._login(stdio): return SshReturn(255, '', 'connect failed') try: stdin, stdout, stderr = self.ssh_client.exec_command(command, timeout=timeout) output = stdout.read().decode(errors='replace') error = stderr.read().decode(errors='replace') if output: idx = output.rindex('\n') code = int(output[idx:]) stdout = output[:idx] verbose_msg = 'exited code %s' % code else: code, stdout = 1, '' if code: verbose_msg = 'exited code %s, error output:\n%s' % (code, error) stdio.verbose(verbose_msg) except SSHException as e: if retry: self.close() return self._execute_command(command, retry-1, stdio) else: stdio.exception('') stdio.critical('%s@%s connect failed: %s' % (self.config.username, self.config.host, e)) raise e except Exception as e: stdio.exception('') code = 255 stdout = '' error = str(e) return SshReturn(code, stdout, error) def execute_command(self, command, timeout=None, stdio=None): if timeout is None: timeout = self.config.timeout elif timeout <= 0: timeout = None if self._is_local: return LocalClient.execute_command(command, self.env if self.env else None, timeout, stdio=stdio) verbose_msg = '%s execute: %s ' % (self.config, command) stdio.verbose(verbose_msg, end='') command = '%s %s;echo -e "\n$?\c"' % (self.env_str, command.strip(';').lstrip('\n')) return self._execute_command(command, retry=3, timeout=timeout, stdio=stdio) @property def disable_rsync(self): return COMMAND_ENV.get(ENV_DISABLE_RSYNC) == "1" @property def remote_transporter(self): if self._remote_transporter is not None: return self._remote_transporter _transporter = RemoteTransporter.CLIENT if not self._is_local and self._remote_transporter is None: if not self.config.password and not self.disable_rsync: ret = LocalClient.execute_command('rsync -h', stdio=self.stdio) and self.execute_command('rsync -h', stdio=self.stdio) if ret: _transporter = RemoteTransporter.RSYNC self._remote_transporter = _transporter self.stdio.verbose("current remote_transporter {}".format(self._remote_transporter)) return self._remote_transporter def put_file(self, local_path, remote_path, stdio=None): if not os.path.isfile(local_path): stdio.error('path: %s is not file' % local_path) return False if self._is_local: return LocalClient.put_file(local_path, remote_path, stdio=stdio) if not self._open_sftp(stdio=stdio): return False return self._put_file(local_path, remote_path, stdio=stdio) def write_file(self, content, file_path, mode='w', stdio=None): if self._is_local: return LocalClient.write_file(content, file_path, mode, stdio) return self._write_file(content, file_path, mode, stdio) def _write_file(self, content, file_path, mode='w', stdio=None): stdio.verbose('write {} to {}: {}'.format(content, self, file_path)) try: with tempfile.NamedTemporaryFile(mode=mode) as f: f.write(content) f.flush() return self.put_file(f.name, file_path, stdio=stdio) except: stdio.exception('') return False @property def _put_file(self): if self.remote_transporter == RemoteTransporter.RSYNC: return self._rsync_put_file else: return self._client_put_file def _client_put_file(self, local_path, remote_path, stdio=None): if self.execute_command('mkdir -p %s && rm -fr %s' % (os.path.dirname(remote_path), remote_path), stdio=stdio): stdio.verbose('send %s to %s' % (local_path, remote_path)) if self.sftp.put(local_path, remote_path): return self.execute_command('chmod %s %s' % (oct(os.stat(local_path).st_mode)[-3:], remote_path)) return False def _rsync(self, source, target, stdio=None): identity_option = "" if self.config.key_filename: identity_option += '-i {key_filename} '.format(key_filename=self.config.key_filename) if self.config.port: identity_option += '-p {}'.format(self.config.port) cmd = 'rsync -a -W -e "ssh {identity_option}" {source} {target}'.format( identity_option=identity_option, source=source, target=target ) ret = LocalClient.execute_command(cmd, stdio=stdio) return bool(ret) def _rsync_put_dir(self, local_path, remote_path, stdio=None): stdio.verbose('send %s to %s by rsync' % (local_path, remote_path)) source = os.path.join(local_path, '*') if os.path.exists(os.path.dirname(source)) and not glob(source): stdio.verbose("%s is empty" % source) return True target = "{user}@{host}:{remote_path}".format(user=self.config.username, host=self.config.host, remote_path=remote_path) if self._rsync(source, target, stdio=stdio): return True else: return False def _rsync_put_file(self, local_path, remote_path, stdio=None): if not self.execute_command('mkdir -p %s' % os.path.dirname(remote_path), stdio=stdio): return False stdio.verbose('send %s to %s by rsync' % (local_path, remote_path)) target = "{user}@{host}:{remote_path}".format(user=self.config.username, host=self.config.host, remote_path=remote_path) if self._rsync(local_path, target, stdio=stdio): return True else: return False def put_dir(self, local_dir, remote_dir, stdio=None): if self._is_local: return LocalClient.put_dir(local_dir, remote_dir, stdio=stdio) if not self._open_sftp(stdio=stdio): return False if not self.execute_command('mkdir -p %s' % remote_dir, stdio=stdio): return False stdio.start_loading('Send %s to %s' % (local_dir, remote_dir)) ret = self._put_dir(local_dir, remote_dir, stdio=stdio) stdio.stop_loading('succeed' if ret else 'fail') return ret @property def _put_dir(self): if self.remote_transporter == RemoteTransporter.RSYNC: return self._rsync_put_dir else: return self._client_put_dir def _client_put_dir(self, local_dir, remote_dir, stdio=None): has_failed = False ret = LocalClient.execute_command('find %s -type f' % local_dir) if not ret: has_failed = True all_files = ret.stdout.strip().split('\n') if ret.stdout else [] ret = LocalClient.execute_command('find %s -type d' % local_dir) if not ret: has_failed = True all_dirs = ret.stdout.strip().split('\n') if ret.stdout else [] self._filter_dir_in_file_path(all_files, all_dirs) for local_path in all_files: remote_path = os.path.join(remote_dir, os.path.relpath(local_path, local_dir)) if not self._client_put_file(local_path, remote_path, stdio=stdio): stdio.error('Fail to get %s' % remote_path) has_failed = True for local_path in all_dirs: remote_path = os.path.join(remote_dir, os.path.relpath(local_path, local_dir)) stat = oct(os.stat(local_path).st_mode)[-3:] cmd = '[ -d "{remote_path}" ] || (mkdir -p {remote_path}; chmod {stat} {remote_path})'.format(remote_path=remote_path, stat=stat) if not self.execute_command(cmd): has_failed = True return not has_failed def get_file(self, local_path, remote_path, stdio=None): dirname, _ = os.path.split(local_path) if not dirname: dirname = os.getcwd() local_path = os.path.join(dirname, local_path) if os.path.exists(dirname): if not os.path.isdir(dirname): stdio.error('%s is not directory' % dirname) return False elif not DirectoryUtil.mkdir(dirname, stdio=stdio): return False if os.path.exists(local_path) and not os.path.isfile(local_path): stdio.error('path: %s is not file' % local_path) return False if self._is_local: return LocalClient.get_file(local_path, remote_path, stdio=stdio) if not self._open_sftp(stdio=stdio): return False return self._get_file(local_path, remote_path, stdio=stdio) @property def _get_file(self): if self.remote_transporter == RemoteTransporter.RSYNC: return self._rsync_get_file else: return self._client_get_file def _rsync_get_dir(self, local_path, remote_path, stdio=None): source = "{user}@{host}:{remote_path}".format(user=self.config.username, host=self.config.host, remote_path=remote_path) if "*" not in remote_path: source = os.path.join(source, "*") target = local_path stdio.verbose('get %s from %s by rsync' % (local_path, remote_path)) if LocalClient.execute_command('mkdir -p {}'.format(local_path), stdio=stdio) and self._rsync(source, target, stdio=stdio): return True else: return False def _rsync_get_file(self, local_path, remote_path, stdio=None): source = "{user}@{host}:{remote_path}".format(user=self.config.username, host=self.config.host, remote_path=remote_path) target = local_path stdio.verbose('get %s from %s by rsync' % (local_path, remote_path)) if self._rsync(source, target, stdio=stdio): return True else: return False def _client_get_file(self, local_path, remote_path, stdio=None): try: self.sftp.get(remote_path, local_path) stat = self.sftp.stat(remote_path) os.chmod(local_path, stat.st_mode) return True except Exception as e: stdio.exception('get %s from %s@%s:%s failed: %s' % (local_path, self.config.username, self.config.host, remote_path, e)) return False def get_dir(self, local_dir, remote_dir, stdio=None): dirname, _ = os.path.split(local_dir) if not dirname: dirname = os.getcwd() local_dir = os.path.join(dirname, local_dir) if "*" in dirname: stdio.error('Invalid directory {}'.format(dirname)) return False if os.path.exists(dirname): if not os.path.isdir(dirname): stdio.error('%s is not directory' % dirname) return False elif not DirectoryUtil.mkdir(dirname, stdio=stdio): return False if os.path.exists(local_dir) and not os.path.isdir(local_dir): stdio.error('%s is not directory' % local_dir) return False if self._is_local: return LocalClient.get_dir(local_dir, remote_dir, stdio=stdio) if not self._open_sftp(stdio=stdio): return False stdio.start_loading('Get %s from %s' % (local_dir, remote_dir)) ret = self._get_dir(local_dir, remote_dir, stdio=stdio) stdio.stop_loading('succeed' if ret else 'fail') return ret @property def _get_dir(self): if self.remote_transporter == RemoteTransporter.RSYNC: return self._rsync_get_dir else: return self._client_get_dir def _client_get_dir(self, local_dir, remote_dir, stdio=None): task_queue = [] has_failed = False if DirectoryUtil.mkdir(local_dir, stdio=stdio): try: ret = self.execute_command('find %s -type f' % remote_dir) if not ret: stdio.verbose(ret.stderr) has_failed = True all_files = ret.stdout.strip().split('\n') if ret.stdout else [] ret = self.execute_command('find %s -type d' % remote_dir) if not ret: has_failed = True all_dirs = ret.stdout.strip().split('\n') if ret.stdout else [] self._filter_dir_in_file_path(all_files, all_dirs) for f in all_files: task_queue.append(f) if "*" in remote_dir: remote_base_dir = os.path.dirname(remote_dir) else: remote_base_dir = remote_dir for remote_path in task_queue: local_path = os.path.join(local_dir, os.path.relpath(remote_path, remote_dir)) if not self._client_get_file(local_path, remote_path, stdio=stdio): stdio.error('Fail to get %s' % remote_path) has_failed = True for remote_path in all_dirs: try: local_path = os.path.join(local_dir, os.path.relpath(remote_path, remote_base_dir)) if not os.path.exists(local_path): stat = self.sftp.stat(remote_path) os.makedirs(local_path, mode=stat.st_mode) except Exception as e: stdio.exception('Fail to make directory %s in local: %s' % (remote_path, e)) has_failed = True return not has_failed except Exception as e: stdio.exception('Fail to get %s: %s' % (remote_dir, e)) @staticmethod def _filter_dir_in_file_path(files, directories): skip_directories = [] for path in files: dir_name = os.path.dirname(path) while dir_name not in ["/", ".", ""]: if dir_name in skip_directories: break if dir_name in directories: directories.remove(dir_name) skip_directories.append(dir_name) dir_name = os.path.dirname(dir_name) def file_downloader(self, local_dir, remote_dir, stdio=None): try: client = SshClient(config=self.config, stdio=None) client._open_sftp(stdio=stdio) client._remote_transporter = self.remote_transporter while True: remote_path = self.task_queue.get(block=False) local_path = os.path.join(local_dir, os.path.relpath(remote_path, remote_dir)) if client.get_file(local_path, remote_path, stdio=stdio): self.result_queue.put(remote_path) else: stdio.error('Fail to get %s' % remote_path) except Empty: return except: stdio.exception("") stdio.exception('Failed to get %s' % remote_dir) def file_uploader(self, local_dir, remote_dir, stdio=None): try: client = SshClient(config=self.config, stdio=None) client._remote_transporter = self.remote_transporter while True: local_path, is_dir = self.task_queue.get(block=False) remote_path = os.path.join(remote_dir, os.path.relpath(local_path, local_dir)) if is_dir: stat = oct(os.stat(local_path).st_mode)[-3:] cmd = '[ -d "{remote_path}" ] || (mkdir -p {remote_path}; chmod {stat} {remote_path})'.format(remote_path=remote_path, stat=stat) if client.execute_command(cmd): self.result_queue.put(remote_path) else: if client.put_file(local_path, remote_path, stdio=stdio): self.result_queue.put(remote_path) else: stdio.error('Fail to get %s' % remote_path) except Empty: return except: stdio.exception("") stdio.verbose('Failed to get %s' % remote_dir)