ssh.py 26.0 KB
Newer Older
O
oceanbase-admin 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy.  If not, see <https://www.gnu.org/licenses/>.


from __future__ import absolute_import, division, print_function

F
v1.5.0  
frf12 已提交
23
import enum
O
oceanbase-admin 已提交
24
import getpass
F
v1.5.0  
frf12 已提交
25
import os
F
v1.6.0  
frf12 已提交
26
import tempfile
O
oceanbase-admin 已提交
27
import warnings
F
v1.5.0  
frf12 已提交
28 29
from glob import glob

O
oceanbase-admin 已提交
30
from subprocess32 import Popen, PIPE
F
v1.5.0  
frf12 已提交
31

O
oceanbase-admin 已提交
32 33 34 35 36
# paramiko import cryptography 模块在python2下会报不支持警报
warnings.filterwarnings("ignore")

from paramiko import AuthenticationException, SFTPClient
from paramiko.client import SSHClient, AutoAddPolicy
R
Rongfeng Fu 已提交
37 38
from paramiko.ssh_exception import NoValidConnectionsError, SSHException

F
v1.5.0  
frf12 已提交
39 40 41
from multiprocessing.queues import Empty
from multiprocessing import Queue, Process
from multiprocessing.pool import ThreadPool
O
oceanbase-admin 已提交
42

F
v1.6.0  
frf12 已提交
43
from tool import COMMAND_ENV, DirectoryUtil, FileUtil
F
v1.5.0  
frf12 已提交
44
from _stdio import SafeStdio
F
v1.6.0  
frf12 已提交
45
from _environ import ENV_DISABLE_RSYNC
O
oceanbase-admin 已提交
46

R
Rongfeng Fu 已提交
47

F
v1.5.0  
frf12 已提交
48
__all__ = ("SshClient", "SshConfig", "LocalClient", "ConcurrentExecutor")
R
Rongfeng Fu 已提交
49

O
oceanbase-admin 已提交
50

F
v1.5.0  
frf12 已提交
51
class SshConfig(object):
O
oceanbase-admin 已提交
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78

    def __init__(self, host, username='root', password=None, key_filename=None, port=22, timeout=30):
        self.host = host
        self.username = username
        self.password = password
        self.key_filename = key_filename
        self.port = port
        self.timeout = timeout

    def __str__(self):
        return '%s@%s' % (self.username ,self.host)


class SshReturn(object):

    def __init__(self, code, stdout, stderr):
        self.code = code
        self.stdout = stdout
        self.stderr = stderr

    def __bool__(self):
        return self.code == 0
    
    def __nonzero__(self):
        return self.__bool__()


F
v1.5.0  
frf12 已提交
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
class FutureSshReturn(SshReturn):

    def __init__(self, client, command, timeout=None, stdio=None):
        self.client = client
        self.command = command
        self.timeout = timeout
        self.stdio = stdio if stdio else client.stdio
        if self.stdio:
            self.stdio = self.stdio.sub_io()
        self.finsh = False
        super(FutureSshReturn, self).__init__(127, '', '')

    def set_return(self, ssh_return):
        self.code = ssh_return.code
        self.stdout = ssh_return.stdout
        self.stderr = ssh_return.stderr
        self.finsh = True


class ConcurrentExecutor(object):

    def __init__(self, workers=None):
        self.workers = workers
        self.futures = []

    def add_task(self, client, command, timeout=None, stdio=None):
        ret = FutureSshReturn(client, command, timeout, stdio=stdio)
        self.futures.append(ret)
        return ret

F
v1.6.0  
frf12 已提交
109 110 111
    def size(self):
        return len(self.futures)

F
v1.5.0  
frf12 已提交
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
    @staticmethod
    def execute(future):
        client = SshClient(future.client.config, future.stdio)
        future.set_return(client.execute_command(future.command, timeout=future.timeout))
        return future

    def submit(self):
        rets = []
        pool = ThreadPool(processes=self.workers)
        try:
            results = pool.map(ConcurrentExecutor.execute, tuple(self.futures))
            for r in results:
                rets.append(r)
        finally:
            pool.close()
        self.futures = []
        return rets


class LocalClient(SafeStdio):
O
oceanbase-admin 已提交
132 133 134

    @staticmethod
    def execute_command(command, env=None, timeout=None, stdio=None):
F
v1.5.0  
frf12 已提交
135
        stdio.verbose('local execute: %s ' % command, end='')
O
oceanbase-admin 已提交
136 137 138 139 140 141 142 143 144
        try:
            p = Popen(command, env=env, shell=True, stdout=PIPE, stderr=PIPE)
            output, error = p.communicate(timeout=timeout)
            code = p.returncode
            output = output.decode(errors='replace')
            error = error.decode(errors='replace')
            verbose_msg = 'exited code %s' % code
            if code:
                verbose_msg += ', error output:\n%s' % error
F
v1.5.0  
frf12 已提交
145
            stdio.verbose(verbose_msg)
O
oceanbase-admin 已提交
146 147 148 149 150
        except Exception as e:
            output = ''
            error = str(e)
            code = 255
            verbose_msg = 'exited code 255, error output:\n%s' % error
F
v1.5.0  
frf12 已提交
151 152
            stdio.verbose(verbose_msg)
            stdio.exception('')
O
oceanbase-admin 已提交
153 154 155 156
        return SshReturn(code, output, error)

    @staticmethod
    def put_file(local_path, remote_path, stdio=None):
R
Rongfeng Fu 已提交
157
        if LocalClient.execute_command('mkdir -p %s && cp -f %s %s' % (os.path.dirname(remote_path), local_path, remote_path), stdio=stdio):
O
oceanbase-admin 已提交
158 159 160 161
            return True
        return False

    @staticmethod
R
Rongfeng Fu 已提交
162
    def put_dir(local_dir, remote_dir, stdio=None):
F
v1.5.0  
frf12 已提交
163 164 165 166 167
        if os.path.isdir(local_dir):
            local_dir = os.path.join(local_dir, '*')
        if os.path.exists(os.path.dirname(local_dir)) and not glob(local_dir):
            stdio.verbose("%s is empty" % local_dir)
            return True
F
v1.6.0  
frf12 已提交
168
        if LocalClient.execute_command('mkdir -p %s && cp -frL %s %s' % (remote_dir, local_dir, remote_dir), stdio=stdio):
O
oceanbase-admin 已提交
169 170 171
            return True
        return False

F
v1.6.0  
frf12 已提交
172 173 174 175 176 177 178 179 180 181 182 183 184
    @staticmethod
    def write_file(content, file_path, mode='w', stdio=None):
        stdio.verbose('write {} to {}'.format(content, file_path))
        try:
            with FileUtil.open(file_path, mode, stdio=stdio) as f:
                f.write(content)
                f.flush()
            return True
        except:
            stdio.exception('')
            return False


R
Rongfeng Fu 已提交
185 186 187 188 189 190 191 192
    @staticmethod
    def get_file(local_path, remote_path, stdio=None):
        return LocalClient.put_file(remote_path, local_path, stdio=stdio)

    @staticmethod
    def get_dir(local_path, remote_path, stdio=None):
        return LocalClient.put_dir(remote_path, local_path, stdio=stdio)

O
oceanbase-admin 已提交
193

F
v1.5.0  
frf12 已提交
194 195 196 197 198 199 200 201 202 203 204 205
class RemoteTransporter(enum.Enum):
    CLIENT = 0
    RSYNC = 1

    def __lt__(self, other):
        return self.value < other.value

    def __gt__(self, other):
        return self.value > other.value


class SshClient(SafeStdio):
O
oceanbase-admin 已提交
206 207 208 209 210 211 212 213

    def __init__(self, config, stdio=None):
        self.config = config
        self.stdio = stdio
        self.sftp = None
        self.is_connected = False
        self.ssh_client = SSHClient()
        self.env_str = ''
F
v1.5.0  
frf12 已提交
214 215 216
        self._remote_transporter = None
        self.task_queue = None
        self.result_queue = None
O
oceanbase-admin 已提交
217
        if self._is_local():
F
v1.5.0  
frf12 已提交
218
            self.env = COMMAND_ENV.copy()
O
oceanbase-admin 已提交
219 220 221
        else:
            self.env = {'PATH': '/sbin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:'}
            self._update_env()
F
v1.5.0  
frf12 已提交
222 223 224 225 226
        super(SshClient, self).__init__()

    def _init_queue(self):
        self.task_queue = Queue()
        self.result_queue = Queue()
O
oceanbase-admin 已提交
227 228 229 230 231 232 233 234 235 236

    def _update_env(self):
        env = []
        for key in self.env:
            if self.env[key]:
                env.append('export %s=%s$%s;' % (key, self.env[key], key))
        self.env_str = ''.join(env)

    def add_env(self, key, value, rewrite=False, stdio=None):
        if key not in self.env or not self.env[key] or rewrite:
F
v1.5.0  
frf12 已提交
237
            stdio.verbose('%s@%s set env %s to \'%s\'' % (self.config.username, self.config.host, key, value))
O
oceanbase-admin 已提交
238 239
            self.env[key] = value
        else:
F
v1.5.0  
frf12 已提交
240
            stdio.verbose('%s@%s append \'%s\' to %s' % (self.config.username, self.config.host, value, key))
O
oceanbase-admin 已提交
241 242 243
            self.env[key] += value
        self._update_env()

F
v1.5.0  
frf12 已提交
244
    def get_env(self, key, stdio=None):
O
oceanbase-admin 已提交
245 246
        return self.env[key] if key in self.env else None

F
v1.5.0  
frf12 已提交
247 248 249 250 251
    def del_env(self, key,  stdio=None):
        if key in self.env:
            stdio.verbose('%s@%s delete env %s' % (self.config.username, self.config.host, key))
            del self.env[key]
            self._update_env()
F
v1.6.0  
frf12 已提交
252

O
oceanbase-admin 已提交
253 254 255 256
    def __str__(self):
        return '%s@%s:%d' % (self.config.username, self.config.host, self.config.port)

    def _is_local(self):
R
Rongfeng Fu 已提交
257 258 259 260
        return self.is_localhost() and self.config.username ==  getpass.getuser()

    def is_localhost(self, stdio=None):
        return self.config.host in ['127.0.0.1', 'localhost', '127.1', '127.0.1']
O
oceanbase-admin 已提交
261 262 263 264 265 266 267

    def _login(self, stdio=None):
        if self.is_connected:
            return True
        try:
            self.ssh_client.set_missing_host_key_policy(AutoAddPolicy())
            self.ssh_client.connect(
F
v1.5.0  
frf12 已提交
268 269 270 271 272
                self.config.host,
                port=self.config.port,
                username=self.config.username,
                password=self.config.password,
                key_filename=self.config.key_filename,
O
oceanbase-admin 已提交
273 274
                timeout=self.config.timeout
            )
R
Rongfeng Fu 已提交
275
            self.is_connected = True
O
oceanbase-admin 已提交
276
        except AuthenticationException:
F
v1.5.0  
frf12 已提交
277 278
            stdio.exception('')
            stdio.critical('%s@%s username or password error' % (self.config.username, self.config.host))
O
oceanbase-admin 已提交
279
        except NoValidConnectionsError:
F
v1.5.0  
frf12 已提交
280 281
            stdio.exception('')
            stdio.critical('%s@%s connect failed: time out' % (self.config.username, self.config.host))
O
oceanbase-admin 已提交
282
        except Exception as e:
F
v1.5.0  
frf12 已提交
283 284
            stdio.exception('')
            stdio.critical('%s@%s connect failed: %s' % (self.config.username, self.config.host, e))
R
Rongfeng Fu 已提交
285
        return self.is_connected
O
oceanbase-admin 已提交
286 287 288 289

    def _open_sftp(self, stdio=None):
        if self.sftp:
            return True
F
v1.5.0  
frf12 已提交
290
        if self._login(stdio=stdio):
O
oceanbase-admin 已提交
291 292 293 294 295 296 297 298
            SFTPClient.from_transport(self.ssh_client.get_transport())
            self.sftp = self.ssh_client.open_sftp()
            return True
        return False

    def connect(self, stdio=None):
        if self._is_local():
            return True
F
v1.5.0  
frf12 已提交
299
        return self._login(stdio=stdio)
O
oceanbase-admin 已提交
300 301

    def reconnect(self, stdio=None):
F
v1.5.0  
frf12 已提交
302 303
        self.close(stdio=stdio)
        return self.connect(stdio=stdio)
O
oceanbase-admin 已提交
304 305 306 307 308 309 310 311 312 313 314

    def close(self, stdio=None):
        if self._is_local():
            return True
        if self.is_connected:
            self.ssh_client.close()
        if self.sftp:
            self.sftp = None

    def __del__(self):
        self.close()
R
Rongfeng Fu 已提交
315

F
v1.5.0  
frf12 已提交
316
    def _execute_command(self, command, timeout=None, retry=3, stdio=None):
R
Rongfeng Fu 已提交
317 318 319
        if not self._login(stdio):
            return SshReturn(255, '', 'connect failed')
        try:
F
v1.5.0  
frf12 已提交
320
            stdin, stdout, stderr = self.ssh_client.exec_command(command, timeout=timeout)
R
Rongfeng Fu 已提交
321 322 323 324 325 326 327 328 329 330 331
            output = stdout.read().decode(errors='replace')
            error = stderr.read().decode(errors='replace')
            if output:
                idx = output.rindex('\n')
                code = int(output[idx:])
                stdout = output[:idx]
                verbose_msg = 'exited code %s' % code
            else:
                code, stdout = 1, ''
            if code:
                verbose_msg = 'exited code %s, error output:\n%s' % (code, error)
F
v1.5.0  
frf12 已提交
332
            stdio.verbose(verbose_msg)
R
Rongfeng Fu 已提交
333 334 335 336 337 338
            return SshReturn(code, stdout, error)
        except SSHException as e:
            if retry:
                self.close()
                return self._execute_command(command, retry-1, stdio)
            else:
F
v1.5.0  
frf12 已提交
339 340
                stdio.exception('')
                stdio.critical('%s@%s connect failed: %s' % (self.config.username, self.config.host, e))
R
Rongfeng Fu 已提交
341 342
                raise e
        except Exception as e:
F
v1.5.0  
frf12 已提交
343 344
            stdio.exception('')
            stdio.critical('%s@%s connect failed: %s' % (self.config.username, self.config.host, e))
R
Rongfeng Fu 已提交
345
            raise e
F
v1.5.0  
frf12 已提交
346 347 348 349 350 351

    def execute_command(self, command, timeout=None, stdio=None):
        if timeout is None:
            timeout = self.config.timeout
        elif timeout <= 0:
            timeout = None
O
oceanbase-admin 已提交
352
        if self._is_local():
F
v1.5.0  
frf12 已提交
353
            return LocalClient.execute_command(command, self.env, timeout, stdio=stdio)
O
oceanbase-admin 已提交
354 355

        verbose_msg = '%s execute: %s ' % (self.config, command)
F
v1.5.0  
frf12 已提交
356
        stdio.verbose(verbose_msg, end='')
F
v1.6.0  
frf12 已提交
357
        command = '%s %s;echo -e "\n$?\c"' % (self.env_str, command.strip(';').lstrip('\n'))
F
v1.5.0  
frf12 已提交
358 359 360 361
        return self._execute_command(command, retry=3, timeout=timeout, stdio=stdio)

    @property
    def disable_rsync(self):
F
v1.6.0  
frf12 已提交
362
        return COMMAND_ENV.get(ENV_DISABLE_RSYNC) == "1"
F
v1.5.0  
frf12 已提交
363 364 365 366 367 368 369 370 371 372 373 374 375 376

    @property
    def remote_transporter(self):
        if self._remote_transporter is not None:
            return self._remote_transporter
        _transporter = RemoteTransporter.CLIENT
        if not self._is_local() and self._remote_transporter is None:
            if not self.config.password and not self.disable_rsync:
                ret = LocalClient.execute_command('rsync -h', stdio=self.stdio)
                if ret:
                    _transporter = RemoteTransporter.RSYNC
        self._remote_transporter = _transporter
        self.stdio.verbose("current remote_transporter {}".format(self._remote_transporter))
        return self._remote_transporter
R
Rongfeng Fu 已提交
377

O
oceanbase-admin 已提交
378 379
    def put_file(self, local_path, remote_path, stdio=None):
        if not os.path.isfile(local_path):
F
v1.5.0  
frf12 已提交
380
            stdio.error('path: %s is not file' % local_path)
O
oceanbase-admin 已提交
381
            return False
R
Rongfeng Fu 已提交
382 383
        if self._is_local():
            return LocalClient.put_file(local_path, remote_path, stdio=stdio)
F
v1.5.0  
frf12 已提交
384
        if not self._open_sftp(stdio=stdio):
O
oceanbase-admin 已提交
385
            return False
R
Rongfeng Fu 已提交
386
        return self._put_file(local_path, remote_path, stdio=stdio)
R
Rongfeng Fu 已提交
387

F
v1.6.0  
frf12 已提交
388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
    def write_file(self, content, file_path, mode='w', stdio=None):
        if self._is_local():
            return LocalClient.write_file(content, file_path, mode, stdio)
        return self._write_file(content, file_path, mode, stdio)

    def _write_file(self, content, file_path, mode='w', stdio=None):
        stdio.verbose('write {} to {}: {}'.format(content, self, file_path))
        try:
            with tempfile.NamedTemporaryFile(mode=mode) as f:
                f.write(content)
                f.flush()
                return self.put_file(f.name, file_path, stdio=stdio)
        except:
            stdio.exception('')
            return False

F
v1.5.0  
frf12 已提交
404 405 406 407 408 409 410 411
    @property
    def _put_file(self):
        if self.remote_transporter == RemoteTransporter.RSYNC:
            return self._rsync_put_file
        else:
            return self._client_put_file

    def _client_put_file(self, local_path, remote_path, stdio=None):
R
Rongfeng Fu 已提交
412
        if self.execute_command('mkdir -p %s && rm -fr %s' % (os.path.dirname(remote_path), remote_path), stdio=stdio):
F
v1.5.0  
frf12 已提交
413
            stdio.verbose('send %s to %s' % (local_path, remote_path))
R
Rongfeng Fu 已提交
414
            if self.sftp.put(local_path, remote_path):
F
v1.5.0  
frf12 已提交
415
                return self.execute_command('chmod %s %s' % (oct(os.stat(local_path).st_mode)[-3:], remote_path))
O
oceanbase-admin 已提交
416 417
        return False

F
v1.5.0  
frf12 已提交
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451
    def _rsync(self, source, target, stdio=None):
        identity_option = ""
        if self.config.key_filename:
            identity_option += '-e "ssh -i {key_filename} "'.format(key_filename=self.config.key_filename)
        cmd = 'rsync -a -W {identity_option} {source} {target}'.format(
            identity_option=identity_option,
            source=source,
            target=target
        )
        ret = LocalClient.execute_command(cmd, stdio=stdio)
        return bool(ret)

    def _rsync_put_dir(self, local_path, remote_path, stdio=None):
        stdio.verbose('send %s to %s by rsync' % (local_path, remote_path))
        source = os.path.join(local_path, '*')
        if os.path.exists(os.path.dirname(source)) and not glob(source):
            stdio.verbose("%s is empty" % source)
            return True
        target = "{user}@{host}:{remote_path}".format(user=self.config.username, host=self.config.host, remote_path=remote_path)
        if self._rsync(source, target, stdio=stdio):
            return True
        else:
            return False

    def _rsync_put_file(self, local_path, remote_path, stdio=None):
        if not self.execute_command('mkdir -p %s' % os.path.dirname(remote_path), stdio=stdio):
            return False
        stdio.verbose('send %s to %s by rsync' % (local_path, remote_path))
        target = "{user}@{host}:{remote_path}".format(user=self.config.username, host=self.config.host, remote_path=remote_path)
        if self._rsync(local_path, target, stdio=stdio):
            return True
        else:
            return False

O
oceanbase-admin 已提交
452 453
    def put_dir(self, local_dir, remote_dir, stdio=None):
        if self._is_local():
R
Rongfeng Fu 已提交
454
            return LocalClient.put_dir(local_dir, remote_dir, stdio=stdio)
F
v1.5.0  
frf12 已提交
455
        if not self._open_sftp(stdio=stdio):
O
oceanbase-admin 已提交
456
            return False
R
Rongfeng Fu 已提交
457
        if not self.execute_command('mkdir -p %s' % remote_dir, stdio=stdio):
O
oceanbase-admin 已提交
458
            return False
F
v1.5.0  
frf12 已提交
459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493
        stdio.start_loading('Send %s to %s' % (local_dir, remote_dir))
        ret = self._put_dir(local_dir, remote_dir, stdio=stdio)
        stdio.stop_loading('succeed' if ret else 'fail')
        return ret

    @property
    def _put_dir(self):
        if self.remote_transporter == RemoteTransporter.RSYNC:
            return self._rsync_put_dir
        else:
            return self._client_put_dir

    def _client_put_dir(self, local_dir, remote_dir, stdio=None):
        has_failed = False
        ret = LocalClient.execute_command('find %s -type f' % local_dir)
        if not ret:
            has_failed = True
        all_files = ret.stdout.strip().split('\n') if ret.stdout else []
        ret = LocalClient.execute_command('find %s -type d' % local_dir)
        if not ret:
            has_failed = True
        all_dirs = ret.stdout.strip().split('\n') if ret.stdout else []
        self._filter_dir_in_file_path(all_files, all_dirs)
        for local_path in all_files:
            remote_path = os.path.join(remote_dir, os.path.relpath(local_path, local_dir))
            if not self._client_put_file(local_path, remote_path, stdio=stdio):
                stdio.error('Fail to get %s' % remote_path)
                has_failed = True
        for local_path in all_dirs:
            remote_path = os.path.join(remote_dir, os.path.relpath(local_path, local_dir))
            stat = oct(os.stat(local_path).st_mode)[-3:]
            cmd = '[ -d "{remote_path}" ] || (mkdir -p {remote_path}; chmod {stat} {remote_path})'.format(remote_path=remote_path, stat=stat)
            if not self.execute_command(cmd):
                has_failed = True
        return not has_failed
O
oceanbase-admin 已提交
494

R
Rongfeng Fu 已提交
495 496 497 498 499 500 501
    def get_file(self, local_path, remote_path, stdio=None):
        dirname, _ = os.path.split(local_path)
        if not dirname:
            dirname = os.getcwd()
            local_path = os.path.join(dirname, local_path)
        if os.path.exists(dirname):
            if not os.path.isdir(dirname):
F
v1.5.0  
frf12 已提交
502
                stdio.error('%s is not directory' % dirname)
R
Rongfeng Fu 已提交
503 504 505 506
                return False
        elif not DirectoryUtil.mkdir(dirname, stdio=stdio):
            return False
        if os.path.exists(local_path) and not os.path.isfile(local_path):
F
v1.5.0  
frf12 已提交
507
            stdio.error('path: %s is not file' % local_path)
R
Rongfeng Fu 已提交
508 509 510
            return False
        if self._is_local():
            return LocalClient.get_file(local_path, remote_path, stdio=stdio)
F
v1.5.0  
frf12 已提交
511
        if not self._open_sftp(stdio=stdio):
R
Rongfeng Fu 已提交
512 513 514
            return False
        return self._get_file(local_path, remote_path, stdio=stdio)

F
v1.5.0  
frf12 已提交
515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542
    @property
    def _get_file(self):
        if self.remote_transporter == RemoteTransporter.RSYNC:
            return self._rsync_get_file
        else:
            return self._client_get_file

    def _rsync_get_dir(self, local_path, remote_path, stdio=None):
        source = "{user}@{host}:{remote_path}".format(user=self.config.username, host=self.config.host, remote_path=remote_path)
        if "*" not in remote_path:
            source = os.path.join(source, "*")
        target = local_path
        stdio.verbose('get %s from %s by rsync' % (local_path, remote_path))
        if LocalClient.execute_command('mkdir -p {}'.format(local_path), stdio=stdio) and self._rsync(source, target, stdio=stdio):
            return True
        else:
            return False

    def _rsync_get_file(self, local_path, remote_path, stdio=None):
        source = "{user}@{host}:{remote_path}".format(user=self.config.username, host=self.config.host, remote_path=remote_path)
        target = local_path
        stdio.verbose('get %s from %s by rsync' % (local_path, remote_path))
        if self._rsync(source, target, stdio=stdio):
            return True
        else:
            return False

    def _client_get_file(self, local_path, remote_path, stdio=None):
R
Rongfeng Fu 已提交
543 544 545 546 547 548
        try:
            self.sftp.get(remote_path, local_path)
            stat = self.sftp.stat(remote_path)
            os.chmod(local_path, stat.st_mode)
            return True
        except Exception as e:
F
v1.5.0  
frf12 已提交
549
            stdio.exception('get %s from %s@%s:%s failed: %s' % (local_path, self.config.username, self.config.host, remote_path, e))
R
Rongfeng Fu 已提交
550 551 552 553 554 555 556
        return False

    def get_dir(self, local_dir, remote_dir, stdio=None):
        dirname, _ = os.path.split(local_dir)
        if not dirname:
            dirname = os.getcwd()
            local_dir = os.path.join(dirname, local_dir)
F
v1.5.0  
frf12 已提交
557 558 559
        if "*" in dirname:
            stdio.error('Invalid directory {}'.format(dirname))
            return False
R
Rongfeng Fu 已提交
560 561
        if os.path.exists(dirname):
            if not os.path.isdir(dirname):
F
v1.5.0  
frf12 已提交
562
                stdio.error('%s is not directory' % dirname)
R
Rongfeng Fu 已提交
563 564 565 566
                return False
        elif not DirectoryUtil.mkdir(dirname, stdio=stdio):
            return False
        if os.path.exists(local_dir) and not os.path.isdir(local_dir):
F
v1.5.0  
frf12 已提交
567
            stdio.error('%s is not directory' % local_dir)
R
Rongfeng Fu 已提交
568 569 570
            return False
        if self._is_local():
            return LocalClient.get_dir(local_dir, remote_dir, stdio=stdio)
F
v1.5.0  
frf12 已提交
571
        if not self._open_sftp(stdio=stdio):
R
Rongfeng Fu 已提交
572
            return False
F
v1.5.0  
frf12 已提交
573 574 575 576 577 578 579 580 581 582 583
        stdio.start_loading('Get %s from %s' % (local_dir, remote_dir))
        ret = self._get_dir(local_dir, remote_dir, stdio=stdio)
        stdio.stop_loading('succeed' if ret else 'fail')
        return ret

    @property
    def _get_dir(self):
        if self.remote_transporter == RemoteTransporter.RSYNC:
            return self._rsync_get_dir
        else:
            return self._client_get_dir
R
Rongfeng Fu 已提交
584

F
v1.5.0  
frf12 已提交
585 586 587
    def _client_get_dir(self, local_dir, remote_dir, stdio=None):
        task_queue = []
        has_failed = False
R
Rongfeng Fu 已提交
588 589
        if DirectoryUtil.mkdir(local_dir, stdio=stdio):
            try:
F
v1.5.0  
frf12 已提交
590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620
                ret = self.execute_command('find %s -type f' % remote_dir)
                if not ret:
                    stdio.verbose(ret.stderr)
                    has_failed = True
                all_files = ret.stdout.strip().split('\n') if ret.stdout else []
                ret = self.execute_command('find %s -type d' % remote_dir)
                if not ret:
                    has_failed = True
                all_dirs = ret.stdout.strip().split('\n') if ret.stdout else []
                self._filter_dir_in_file_path(all_files, all_dirs)
                for f in all_files:
                    task_queue.append(f)
                if "*" in remote_dir:
                    remote_base_dir = os.path.dirname(remote_dir)
                else:
                    remote_base_dir = remote_dir
                for remote_path in task_queue:
                    local_path = os.path.join(local_dir, os.path.relpath(remote_path, remote_dir))
                    if not self._client_get_file(local_path, remote_path, stdio=stdio):
                        stdio.error('Fail to get %s' % remote_path)
                        has_failed = True
                for remote_path in all_dirs:
                    try:
                        local_path = os.path.join(local_dir, os.path.relpath(remote_path, remote_base_dir))
                        if not os.path.exists(local_path):
                            stat = self.sftp.stat(remote_path)
                            os.makedirs(local_path, mode=stat.st_mode)
                    except Exception as e:
                        stdio.exception('Fail to make directory %s in local: %s' % (remote_path, e))
                        has_failed = True
                return not has_failed
R
Rongfeng Fu 已提交
621
            except Exception as e:
F
v1.5.0  
frf12 已提交
622 623 624 625 626 627 628 629 630 631 632 633 634 635
                stdio.exception('Fail to get %s: %s' % (remote_dir, e))

    @staticmethod
    def _filter_dir_in_file_path(files, directories):
        skip_directories = []
        for path in files:
            dir_name = os.path.dirname(path)
            while dir_name not in ["/", ".", ""]:
                if dir_name in skip_directories:
                    break
                if dir_name in directories:
                    directories.remove(dir_name)
                    skip_directories.append(dir_name)
                dir_name = os.path.dirname(dir_name)
R
Rongfeng Fu 已提交
636

F
v1.5.0  
frf12 已提交
637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676
    def file_downloader(self, local_dir, remote_dir, stdio=None):
        try:
            client = SshClient(config=self.config, stdio=None)
            client._open_sftp(stdio=stdio)
            client._remote_transporter = self.remote_transporter
            while True:
                remote_path = self.task_queue.get(block=False)
                local_path = os.path.join(local_dir, os.path.relpath(remote_path, remote_dir))
                if client.get_file(local_path, remote_path, stdio=stdio):
                    self.result_queue.put(remote_path)
                else:
                    stdio.error('Fail to get %s' % remote_path)
        except Empty:
            return
        except:
            stdio.exception("")
            stdio.exception('Failed to get %s' % remote_dir)

    def file_uploader(self, local_dir, remote_dir, stdio=None):
        try:
            client = SshClient(config=self.config, stdio=None)
            client._remote_transporter = self.remote_transporter
            while True:
                local_path, is_dir = self.task_queue.get(block=False)
                remote_path = os.path.join(remote_dir, os.path.relpath(local_path, local_dir))
                if is_dir:
                    stat = oct(os.stat(local_path).st_mode)[-3:]
                    cmd = '[ -d "{remote_path}" ] || (mkdir -p {remote_path}; chmod {stat} {remote_path})'.format(remote_path=remote_path, stat=stat)
                    if client.execute_command(cmd):
                        self.result_queue.put(remote_path)
                else:
                    if client.put_file(local_path, remote_path, stdio=stdio):
                        self.result_queue.put(remote_path)
                    else:
                        stdio.error('Fail to get %s' % remote_path)
        except Empty:
            return
        except:
            stdio.exception("")
            stdio.verbose('Failed to get %s' % remote_dir)