ssh.py 26.3 KB
Newer Older
O
oceanbase-admin 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy.  If not, see <https://www.gnu.org/licenses/>.


from __future__ import absolute_import, division, print_function

F
v1.5.0  
frf12 已提交
23
import enum
O
oceanbase-admin 已提交
24
import getpass
F
v1.5.0  
frf12 已提交
25
import os
F
v1.6.0  
frf12 已提交
26
import tempfile
O
oceanbase-admin 已提交
27
import warnings
F
v1.5.0  
frf12 已提交
28 29
from glob import glob

O
oceanbase-admin 已提交
30
from subprocess32 import Popen, PIPE
F
v1.5.0  
frf12 已提交
31

O
oceanbase-admin 已提交
32 33 34 35 36
# Importing paramiko pulls in the cryptography module, which emits an "unsupported" warning under Python 2
warnings.filterwarnings("ignore")

from paramiko import AuthenticationException, SFTPClient
from paramiko.client import SSHClient, AutoAddPolicy
R
Rongfeng Fu 已提交
37 38
from paramiko.ssh_exception import NoValidConnectionsError, SSHException

F
v1.5.0  
frf12 已提交
39 40 41
from multiprocessing.queues import Empty
from multiprocessing import Queue, Process
from multiprocessing.pool import ThreadPool
O
oceanbase-admin 已提交
42

F
v1.6.0  
frf12 已提交
43
from tool import COMMAND_ENV, DirectoryUtil, FileUtil
F
v1.5.0  
frf12 已提交
44
from _stdio import SafeStdio
R
Rongfeng Fu 已提交
45
from _errno import EC_SSH_CONNECT
F
v1.6.0  
frf12 已提交
46
from _environ import ENV_DISABLE_RSYNC
O
oceanbase-admin 已提交
47

R
Rongfeng Fu 已提交
48

F
v1.5.0  
frf12 已提交
49
# Public names exported by this module.
__all__ = ("SshClient", "SshConfig", "LocalClient", "ConcurrentExecutor")
R
Rongfeng Fu 已提交
50

O
oceanbase-admin 已提交
51

F
v1.5.0  
frf12 已提交
52
class SshConfig(object):
    """Connection settings for a single SSH endpoint.

    Attributes:
        host: address of the target machine.
        username: login name (defaults to ``'root'``).
        password: login password as text, or None when no password is given.
        key_filename: path of the private key file, or None.
        port: SSH port as an int.
        timeout: connection timeout as an int.
    """

    def __init__(self, host, username='root', password=None, key_filename=None, port=22, timeout=30):
        self.host = host
        self.username = username
        # None is kept as-is; any other value is coerced to text with str().
        self.password = password if password is None else str(password)
        self.key_filename = key_filename
        # port and timeout are coerced with int(), so numeric strings are accepted.
        self.port = int(port)
        self.timeout = int(timeout)

    def __str__(self):
        """Return ``username@host``."""
        return '%s@%s' % (self.username, self.host)


class SshReturn(object):
    """Outcome of one executed command: exit code plus captured output.

    An instance is truthy exactly when ``code`` equals 0.
    """

    def __init__(self, code, stdout, stderr):
        self.code, self.stdout, self.stderr = code, stdout, stderr

    def __bool__(self):
        # Success is defined purely by a zero exit code.
        succeeded = (self.code == 0)
        return succeeded

    def __nonzero__(self):
        # Python 2 truth-value hook; delegates to the Python 3 one.
        return self.__bool__()


F
v1.5.0  
frf12 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
class FutureSshReturn(SshReturn):
    """Placeholder result for a command that has been queued but not run yet.

    Until ``set_return`` is called the instance carries exit code 127 and
    empty output, and ``finsh`` is False.
    """

    def __init__(self, client, command, timeout=None, stdio=None):
        # Start from the "not executed yet" result values.
        super(FutureSshReturn, self).__init__(127, '', '')
        self.client = client
        self.command = command
        self.timeout = timeout
        # Prefer the explicit stdio; otherwise fall back to the client's one.
        io = stdio or client.stdio
        # A usable stdio is replaced by a sub-IO derived from it.
        self.stdio = io.sub_io() if io else io
        self.finsh = False

    def set_return(self, ssh_return):
        """Copy code/stdout/stderr from ``ssh_return`` and mark as finished."""
        self.code, self.stdout, self.stderr = (
            ssh_return.code,
            ssh_return.stdout,
            ssh_return.stderr,
        )
        self.finsh = True


class ConcurrentExecutor(object):
    """Collect commands for several clients and run them on a thread pool.

    Tasks are queued with ``add_task`` and executed together by ``submit``.
    """

    def __init__(self, workers=None):
        # Passed to ThreadPool(processes=...); None lets ThreadPool decide.
        self.workers = workers
        self.futures = []

    def add_task(self, client, command, timeout=None, stdio=None):
        """Queue ``command`` for ``client`` and return its FutureSshReturn."""
        ret = FutureSshReturn(client, command, timeout, stdio=stdio)
        self.futures.append(ret)
        return ret

    def size(self):
        """Return the number of queued tasks."""
        return len(self.futures)

    @staticmethod
    def execute(future):
        """Run one queued task and store the result on ``future``.

        A new SshClient is built from the queued client's config for the call.
        """
        client = SshClient(future.client.config, future.stdio)
        future.set_return(client.execute_command(future.command, timeout=future.timeout))
        return future

    def submit(self):
        """Run every queued task, clear the queue and return the futures.

        The queue is only cleared when the pool run completes without raising.
        """
        pool = ThreadPool(processes=self.workers)
        try:
            rets = list(pool.map(ConcurrentExecutor.execute, tuple(self.futures)))
        finally:
            pool.close()
        self.futures = []
        return rets


class LocalClient(SafeStdio):
    """Run shell commands and copy files on the local machine.

    All operations are static methods; results of commands are returned as
    SshReturn objects, file operations return booleans.
    """

    @staticmethod
    def execute_command(command, env=None, timeout=None, stdio=None):
        """Run ``command`` through the shell and capture its output.

        Args:
            command: shell command line.
            env: environment mapping handed to Popen, or None.
            timeout: seconds to wait in communicate(), or None for no limit.
            stdio: IO object used for verbose/exception logging.

        Returns:
            SshReturn with the exit code and decoded stdout/stderr. Any
            exception (including a timeout) yields code 255 with the
            exception text as stderr.
        """
        stdio.verbose('local execute: %s ' % command, end='')
        proc = None
        try:
            proc = Popen(command, env=env, shell=True, stdout=PIPE, stderr=PIPE)
            output, error = proc.communicate(timeout=timeout)
            code = proc.returncode
            output = output.decode(errors='replace')
            error = error.decode(errors='replace')
            verbose_msg = 'exited code %s' % code
            if code:
                verbose_msg += ', error output:\n%s' % error
            stdio.verbose(verbose_msg)
        except Exception as e:
            # communicate() does not kill the child when the timeout expires,
            # so stop and reap it here instead of leaving it running.
            if proc is not None and proc.poll() is None:
                try:
                    proc.kill()
                    proc.wait()
                except OSError:
                    # The child exited between poll() and kill().
                    pass
            output = ''
            error = str(e)
            code = 255
            verbose_msg = 'exited code 255, error output:\n%s' % error
            stdio.verbose(verbose_msg)
            stdio.exception('')
        return SshReturn(code, output, error)

    @staticmethod
    def put_file(local_path, remote_path, stdio=None):
        """Copy one file with ``cp -f``, creating the target directory first."""
        # NOTE(review): paths are interpolated into the shell command unquoted.
        if LocalClient.execute_command('mkdir -p %s && cp -f %s %s' % (os.path.dirname(remote_path), local_path, remote_path), stdio=stdio):
            return True
        return False

    @staticmethod
    def put_dir(local_dir, remote_dir, stdio=None):
        """Copy the contents of ``local_dir`` into ``remote_dir`` with ``cp -frL``.

        Returns True without copying when the source matches nothing.
        """
        if os.path.isdir(local_dir):
            # Copy the directory's entries rather than the directory itself.
            local_dir = os.path.join(local_dir, '*')
        if os.path.exists(os.path.dirname(local_dir)) and not glob(local_dir):
            stdio.verbose("%s is empty" % local_dir)
            return True
        if LocalClient.execute_command('mkdir -p %s && cp -frL %s %s' % (remote_dir, local_dir, remote_dir), stdio=stdio):
            return True
        return False

    @staticmethod
    def write_file(content, file_path, mode='w', stdio=None):
        """Write ``content`` to ``file_path``; return True on success."""
        stdio.verbose('write {} to {}'.format(content, file_path))
        try:
            with FileUtil.open(file_path, mode, stdio=stdio) as f:
                f.write(content)
                f.flush()
            return True
        except Exception:
            # Only ordinary errors are reported as a failed write; interrupts
            # such as KeyboardInterrupt propagate.
            stdio.exception('')
            return False

    @staticmethod
    def get_file(local_path, remote_path, stdio=None):
        """Copy ``remote_path`` to ``local_path`` (put_file with swapped roles)."""
        return LocalClient.put_file(remote_path, local_path, stdio=stdio)

    @staticmethod
    def get_dir(local_path, remote_path, stdio=None):
        """Copy ``remote_path`` into ``local_path`` (put_dir with swapped roles)."""
        return LocalClient.put_dir(remote_path, local_path, stdio=stdio)

O
oceanbase-admin 已提交
194

F
v1.5.0  
frf12 已提交
195 196 197 198 199 200 201 202 203 204 205 206
class RemoteTransporter(enum.Enum):
    """Mechanism used to move files to and from a remote host.

    Members are ordered by their integer value.
    """

    CLIENT = 0
    RSYNC = 1

    def __lt__(self, other):
        mine, theirs = self.value, other.value
        return mine < theirs

    def __gt__(self, other):
        mine, theirs = self.value, other.value
        return mine > theirs


class SshClient(SafeStdio):
O
oceanbase-admin 已提交
207 208 209 210 211 212 213 214

    def __init__(self, config, stdio=None):
        """Prepare a client for ``config`` without connecting yet.

        Args:
            config: connection settings (host, username, port, ...).
            stdio: IO object stored on the instance as ``self.stdio``.
        """
        self.config = config
        self.stdio = stdio
        self.sftp = None
        self.is_connected = False
        self.ssh_client = SSHClient()
        self.env_str = ''
        self._remote_transporter = None
        self.task_queue = None
        self.result_queue = None
        if self._is_local():
            # Local execution gets its own copy of the shared command environment.
            self.env = COMMAND_ENV.copy()
        else:
            # Remote commands get this PATH prefix exported in front of them.
            self.env = {'PATH': '/sbin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:'}
            self._update_env()
        super(SshClient, self).__init__()

    def _init_queue(self):
        """Create fresh task and result queues on the instance."""
        self.task_queue, self.result_queue = Queue(), Queue()
O
oceanbase-admin 已提交
228 229 230 231 232 233 234 235 236 237

    def _update_env(self):
        env = []
        for key in self.env:
            if self.env[key]:
                env.append('export %s=%s$%s;' % (key, self.env[key], key))
        self.env_str = ''.join(env)

    def add_env(self, key, value, rewrite=False, stdio=None):
        """Set or extend an environment variable used for commands.

        The variable is set to ``value`` when it is missing, empty, or
        ``rewrite`` is true; otherwise ``value`` is appended to the existing
        string. ``env_str`` is rebuilt afterwards.
        """
        if key not in self.env or not self.env[key] or rewrite:
            stdio.verbose('%s@%s set env %s to \'%s\'' % (self.config.username, self.config.host, key, value))
            self.env[key] = value
        else:
            stdio.verbose('%s@%s append \'%s\' to %s' % (self.config.username, self.config.host, value, key))
            self.env[key] += value
        self._update_env()

F
v1.5.0  
frf12 已提交
245
    def get_env(self, key, stdio=None):
O
oceanbase-admin 已提交
246 247
        return self.env[key] if key in self.env else None

F
v1.5.0  
frf12 已提交
248 249 250 251 252
    def del_env(self, key, stdio=None):
        """Remove ``key`` from the command environment if it is present."""
        if key not in self.env:
            return
        stdio.verbose('%s@%s delete env %s' % (self.config.username, self.config.host, key))
        del self.env[key]
        # Keep the exported prefix in step with the mapping.
        self._update_env()
F
v1.6.0  
frf12 已提交
253

O
oceanbase-admin 已提交
254 255 256 257
    def __str__(self):
        return '%s@%s:%d' % (self.config.username, self.config.host, self.config.port)

    def _is_local(self):
        """True when the target is this machine and the configured user is the current user."""
        return self.is_localhost() and self.config.username == getpass.getuser()

    def is_localhost(self, stdio=None):
        return self.config.host in ['127.0.0.1', 'localhost', '127.1', '127.0.1']
O
oceanbase-admin 已提交
262 263 264 265

    def _login(self, stdio=None):
        """Open the SSH connection unless it is already open.

        Unknown host keys are accepted automatically (AutoAddPolicy).

        Returns:
            True when connected. On failure the formatted EC_SSH_CONNECT error
            is reported through ``stdio.critical`` and returned.
        """
        if self.is_connected:
            return True
        err = None
        try:
            self.ssh_client.set_missing_host_key_policy(AutoAddPolicy())
            self.ssh_client.connect(
                self.config.host,
                port=self.config.port,
                username=self.config.username,
                password=self.config.password,
                key_filename=self.config.key_filename,
                timeout=self.config.timeout
            )
            self.is_connected = True
        except AuthenticationException:
            stdio.exception('')
            err = EC_SSH_CONNECT.format(user=self.config.username, ip=self.config.host, message='username or password error')
        except NoValidConnectionsError:
            stdio.exception('')
            err = EC_SSH_CONNECT.format(user=self.config.username, ip=self.config.host, message='time out')
        except BaseException as e:
            # NOTE(review): BaseException also covers KeyboardInterrupt/SystemExit.
            stdio.exception('')
            err = EC_SSH_CONNECT.format(user=self.config.username, ip=self.config.host, message=e)
        if err:
            stdio.critical(err)
            # NOTE(review): callers test this return value for truthiness; a
            # truthy error here would read as "connected" - confirm intended.
            return err
        return self.is_connected
O
oceanbase-admin 已提交
291 292 293 294

    def _open_sftp(self, stdio=None):
        """Make sure an SFTP session is available on ``self.sftp``.

        Returns True when a session already exists or was opened, False when
        logging in failed.
        """
        if self.sftp:
            return True
        if self._login(stdio=stdio):
            # open_sftp() creates the SFTP channel on the current transport;
            # opening a second, unused channel beforehand is unnecessary.
            self.sftp = self.ssh_client.open_sftp()
            return True
        return False

    def connect(self, stdio=None):
        """Connect to the target; a local target needs no connection and returns True."""
        if self._is_local():
            return True
        return self._login(stdio=stdio)
O
oceanbase-admin 已提交
305 306

    def reconnect(self, stdio=None):
        """Close the current connection and connect again; returns connect()'s result."""
        self.close(stdio=stdio)
        return self.connect(stdio=stdio)
O
oceanbase-admin 已提交
309 310 311 312 313 314 315 316 317 318 319

    def close(self, stdio=None):
        if self._is_local():
            return True
        if self.is_connected:
            self.ssh_client.close()
        if self.sftp:
            self.sftp = None

    def __del__(self):
        # Release the connection when the client object is finalized.
        self.close()
R
Rongfeng Fu 已提交
320

F
v1.5.0  
frf12 已提交
321
    def _execute_command(self, command, timeout=None, retry=3, stdio=None):
        """Run an already-wrapped command over SSH and parse its exit code.

        The command is expected to print a newline followed by its exit code
        as the last thing on stdout; that trailer is stripped from the output.

        Args:
            command: full command line to execute remotely.
            timeout: channel timeout handed to exec_command.
            retry: how many more attempts to make after an SSHException.
            stdio: IO object used for logging.

        Returns:
            SshReturn. Code 255 is used for connection failure and for
            unexpected errors; code 1 when stdout is empty.

        Raises:
            SSHException: when the command still fails after all retries.
        """
        if not self._login(stdio):
            return SshReturn(255, '', 'connect failed')
        try:
            stdin, stdout, stderr = self.ssh_client.exec_command(command, timeout=timeout)
            output = stdout.read().decode(errors='replace')
            error = stderr.read().decode(errors='replace')
            if output:
                # Everything after the last newline is the exit code trailer.
                idx = output.rindex('\n')
                code = int(output[idx:])
                stdout = output[:idx]
                verbose_msg = 'exited code %s' % code
            else:
                code, stdout = 1, ''
            if code:
                verbose_msg = 'exited code %s, error output:\n%s' % (code, error)
            stdio.verbose(verbose_msg)
        except SSHException as e:
            if retry:
                self.close()
                # Force the retry to open a new connection.
                self.is_connected = False
                # Pass arguments by keyword so timeout/retry/stdio cannot be
                # shifted into the wrong parameters.
                return self._execute_command(command, timeout=timeout, retry=retry - 1, stdio=stdio)
            else:
                stdio.exception('')
                stdio.critical('%s@%s connect failed: %s' % (self.config.username, self.config.host, e))
                raise
        except Exception as e:
            stdio.exception('')
            code = 255
            stdout = ''
            error = str(e)
        return SshReturn(code, stdout, error)
F
v1.5.0  
frf12 已提交
352 353 354 355 356 357

    def execute_command(self, command, timeout=None, stdio=None):
        """Run ``command`` on the target and return an SshReturn.

        Args:
            command: shell command line.
            timeout: None uses the configured timeout; a value <= 0 disables
                the timeout; any other value is used as given.
            stdio: IO object used for logging.
        """
        if timeout is None:
            timeout = self.config.timeout
        elif timeout <= 0:
            timeout = None
        if self._is_local():
            return LocalClient.execute_command(command, self.env, timeout, stdio=stdio)

        verbose_msg = '%s execute: %s ' % (self.config, command)
        stdio.verbose(verbose_msg, end='')
        # Prefix the exported environment and append a trailer that prints a
        # newline plus the exit status. The backslash before "c" is escaped
        # explicitly; the resulting string is unchanged.
        command = '%s %s;echo -e "\n$?\\c"' % (self.env_str, command.strip(';').lstrip('\n'))
        return self._execute_command(command, retry=3, timeout=timeout, stdio=stdio)

    @property
    def disable_rsync(self):
        """True when COMMAND_ENV holds the string "1" for ENV_DISABLE_RSYNC."""
        return COMMAND_ENV.get(ENV_DISABLE_RSYNC) == "1"
F
v1.5.0  
frf12 已提交
369 370 371 372 373 374 375 376

    @property
    def remote_transporter(self):
        """Transport used for file transfer, decided once and then cached.

        RSYNC is chosen only for a non-local target with no password
        configured, rsync not disabled, and ``rsync -h`` succeeding both
        locally and on the target; otherwise CLIENT is used.
        """
        if self._remote_transporter is not None:
            return self._remote_transporter
        _transporter = RemoteTransporter.CLIENT
        if not self._is_local() and not self.config.password and not self.disable_rsync:
            ret = LocalClient.execute_command('rsync -h', stdio=self.stdio) and self.execute_command('rsync -h', stdio=self.stdio)
            if ret:
                _transporter = RemoteTransporter.RSYNC
        self._remote_transporter = _transporter
        self.stdio.verbose("current remote_transporter {}".format(self._remote_transporter))
        return self._remote_transporter
R
Rongfeng Fu 已提交
383

O
oceanbase-admin 已提交
384 385
    def put_file(self, local_path, remote_path, stdio=None):
        """Upload one file to the target.

        Returns False when ``local_path`` is not a regular file or the SFTP
        session cannot be opened; otherwise the result of the transfer.
        """
        if not os.path.isfile(local_path):
            stdio.error('path: %s is not file' % local_path)
            return False
        if self._is_local():
            return LocalClient.put_file(local_path, remote_path, stdio=stdio)
        if not self._open_sftp(stdio=stdio):
            return False
        return self._put_file(local_path, remote_path, stdio=stdio)
R
Rongfeng Fu 已提交
393

F
v1.6.0  
frf12 已提交
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
    def write_file(self, content, file_path, mode='w', stdio=None):
        """Write ``content`` to ``file_path`` on the target, locally or remotely."""
        if not self._is_local():
            return self._write_file(content, file_path, mode, stdio)
        return LocalClient.write_file(content, file_path, mode, stdio)

    def _write_file(self, content, file_path, mode='w', stdio=None):
        """Write ``content`` to a temporary local file and upload it to ``file_path``.

        Returns the upload result, or False when writing or uploading raises.
        """
        # NOTE(review): ``mode`` only affects the temporary file; the remote
        # file is replaced by the upload, so an append mode does not append
        # remotely - confirm callers do not rely on that.
        stdio.verbose('write {} to {}: {}'.format(content, self, file_path))
        try:
            with tempfile.NamedTemporaryFile(mode=mode) as f:
                f.write(content)
                f.flush()
                # Upload while the temporary file still exists.
                return self.put_file(f.name, file_path, stdio=stdio)
        except Exception:
            # Only ordinary errors become a False result; interrupts propagate.
            stdio.exception('')
            return False

F
v1.5.0  
frf12 已提交
410 411 412 413 414 415 416 417
    @property
    def _put_file(self):
        """Bound method that uploads one file, picked by the active transporter."""
        use_rsync = self.remote_transporter == RemoteTransporter.RSYNC
        return self._rsync_put_file if use_rsync else self._client_put_file

    def _client_put_file(self, local_path, remote_path, stdio=None):
        """Upload one file over SFTP and copy its permission bits.

        The remote parent directory is created and any existing entry at
        ``remote_path`` is removed first. Returns the result of the final
        ``chmod`` command, or False when an earlier step fails.
        """
        if self.execute_command('mkdir -p %s && rm -fr %s' % (os.path.dirname(remote_path), remote_path), stdio=stdio):
            stdio.verbose('send %s to %s' % (local_path, remote_path))
            if self.sftp.put(local_path, remote_path):
                # Last three octal digits of the local mode, e.g. "644".
                return self.execute_command('chmod %s %s' % (oct(os.stat(local_path).st_mode)[-3:], remote_path), stdio=stdio)
        return False

F
v1.5.0  
frf12 已提交
424 425 426
    def _rsync(self, source, target, stdio=None):
        """Run a local ``rsync -a -W`` over ssh from ``source`` to ``target``.

        The configured key file and port, when set, are passed to the ssh
        command. Returns True when rsync exits with code 0.
        """
        identity_option = ""
        if self.config.key_filename:
            identity_option += '-i {key_filename} '.format(key_filename=self.config.key_filename)
        if self.config.port:
            identity_option += '-p {}'.format(self.config.port)
        cmd = 'rsync -a -W -e "ssh {identity_option}" {source} {target}'.format(
            identity_option=identity_option,
            source=source,
            target=target
        )
        ret = LocalClient.execute_command(cmd, stdio=stdio)
        return bool(ret)

    def _rsync_put_dir(self, local_path, remote_path, stdio=None):
        """Upload the entries of ``local_path`` to ``remote_path`` with rsync.

        An existing but empty source directory counts as success.
        """
        stdio.verbose('send %s to %s by rsync' % (local_path, remote_path))
        source = os.path.join(local_path, '*')
        parent_exists = os.path.exists(os.path.dirname(source))
        if parent_exists and not glob(source):
            stdio.verbose("%s is empty" % source)
            return True
        target = "{user}@{host}:{remote_path}".format(
            user=self.config.username,
            host=self.config.host,
            remote_path=remote_path,
        )
        return True if self._rsync(source, target, stdio=stdio) else False

    def _rsync_put_file(self, local_path, remote_path, stdio=None):
        """Upload one file with rsync after creating the remote parent directory."""
        made_parent = self.execute_command('mkdir -p %s' % os.path.dirname(remote_path), stdio=stdio)
        if not made_parent:
            return False
        stdio.verbose('send %s to %s by rsync' % (local_path, remote_path))
        target = "{user}@{host}:{remote_path}".format(
            user=self.config.username,
            host=self.config.host,
            remote_path=remote_path,
        )
        return True if self._rsync(local_path, target, stdio=stdio) else False

O
oceanbase-admin 已提交
460 461
    def put_dir(self, local_dir, remote_dir, stdio=None):
        """Upload a directory to the target.

        Returns False when the SFTP session cannot be opened or the remote
        directory cannot be created; otherwise the result of the transfer.
        """
        if self._is_local():
            return LocalClient.put_dir(local_dir, remote_dir, stdio=stdio)
        if not self._open_sftp(stdio=stdio):
            return False
        if not self.execute_command('mkdir -p %s' % remote_dir, stdio=stdio):
            return False
        stdio.start_loading('Send %s to %s' % (local_dir, remote_dir))
        ret = self._put_dir(local_dir, remote_dir, stdio=stdio)
        stdio.stop_loading('succeed' if ret else 'fail')
        return ret

    @property
    def _put_dir(self):
        """Bound method that uploads a directory, picked by the active transporter."""
        use_rsync = self.remote_transporter == RemoteTransporter.RSYNC
        return self._rsync_put_dir if use_rsync else self._client_put_dir

    def _client_put_dir(self, local_dir, remote_dir, stdio=None):
        """Upload a directory tree file by file over SFTP.

        Files are uploaded individually; directories that contain no files are
        created remotely with the local permission bits. Returns True only
        when every step succeeded.
        """
        has_failed = False
        ret = LocalClient.execute_command('find %s -type f' % local_dir, stdio=stdio)
        if not ret:
            has_failed = True
        all_files = ret.stdout.strip().split('\n') if ret.stdout else []
        ret = LocalClient.execute_command('find %s -type d' % local_dir, stdio=stdio)
        if not ret:
            has_failed = True
        all_dirs = ret.stdout.strip().split('\n') if ret.stdout else []
        # Directories holding files are created by the per-file upload, so only
        # the remaining ones need an explicit mkdir below.
        self._filter_dir_in_file_path(all_files, all_dirs)
        for local_path in all_files:
            remote_path = os.path.join(remote_dir, os.path.relpath(local_path, local_dir))
            if not self._client_put_file(local_path, remote_path, stdio=stdio):
                stdio.error('Fail to send %s' % remote_path)
                has_failed = True
        for local_path in all_dirs:
            remote_path = os.path.join(remote_dir, os.path.relpath(local_path, local_dir))
            # Last three octal digits of the local mode, e.g. "755".
            stat = oct(os.stat(local_path).st_mode)[-3:]
            cmd = '[ -d "{remote_path}" ] || (mkdir -p {remote_path}; chmod {stat} {remote_path})'.format(remote_path=remote_path, stat=stat)
            if not self.execute_command(cmd, stdio=stdio):
                has_failed = True
        return not has_failed
O
oceanbase-admin 已提交
502

R
Rongfeng Fu 已提交
503 504 505 506 507 508 509
    def get_file(self, local_path, remote_path, stdio=None):
        """Download one file from the target to ``local_path``.

        A bare file name is placed in the current working directory. The local
        parent directory is created when missing. Returns False when the local
        destination is unusable or the SFTP session cannot be opened.
        """
        dirname, _ = os.path.split(local_path)
        if not dirname:
            dirname = os.getcwd()
            local_path = os.path.join(dirname, local_path)
        if os.path.exists(dirname):
            if not os.path.isdir(dirname):
                stdio.error('%s is not directory' % dirname)
                return False
        elif not DirectoryUtil.mkdir(dirname, stdio=stdio):
            return False
        if os.path.exists(local_path) and not os.path.isfile(local_path):
            stdio.error('path: %s is not file' % local_path)
            return False
        if self._is_local():
            return LocalClient.get_file(local_path, remote_path, stdio=stdio)
        if not self._open_sftp(stdio=stdio):
            return False
        return self._get_file(local_path, remote_path, stdio=stdio)

F
v1.5.0  
frf12 已提交
523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550
    @property
    def _get_file(self):
        """Bound method that downloads one file, picked by the active transporter."""
        use_rsync = self.remote_transporter == RemoteTransporter.RSYNC
        return self._rsync_get_file if use_rsync else self._client_get_file

    def _rsync_get_dir(self, local_path, remote_path, stdio=None):
        """Download the entries of ``remote_path`` into ``local_path`` with rsync.

        Unless the remote path already contains a wildcard, ``/*`` is appended
        so the directory's entries are fetched.
        """
        source = "{user}@{host}:{remote_path}".format(
            user=self.config.username,
            host=self.config.host,
            remote_path=remote_path,
        )
        if "*" not in remote_path:
            source = os.path.join(source, "*")
        stdio.verbose('get %s from %s by rsync' % (local_path, remote_path))
        # The local target directory must exist before rsync runs.
        made_local = LocalClient.execute_command('mkdir -p {}'.format(local_path), stdio=stdio)
        if not made_local:
            return False
        return True if self._rsync(source, local_path, stdio=stdio) else False

    def _rsync_get_file(self, local_path, remote_path, stdio=None):
        """Download one file from the target with rsync."""
        source = "{user}@{host}:{remote_path}".format(
            user=self.config.username,
            host=self.config.host,
            remote_path=remote_path,
        )
        stdio.verbose('get %s from %s by rsync' % (local_path, remote_path))
        return True if self._rsync(source, local_path, stdio=stdio) else False

    def _client_get_file(self, local_path, remote_path, stdio=None):
        """Download one file over SFTP and apply the remote mode to the local copy.

        Returns True on success; any exception is logged and yields False.
        """
        try:
            self.sftp.get(remote_path, local_path)
            stat = self.sftp.stat(remote_path)
            os.chmod(local_path, stat.st_mode)
            return True
        except Exception as e:
            stdio.exception('get %s from %s@%s:%s failed: %s' % (local_path, self.config.username, self.config.host, remote_path, e))
        return False

    def get_dir(self, local_dir, remote_dir, stdio=None):
        """Download a directory from the target into ``local_dir``.

        A bare directory name is placed in the current working directory. The
        local parent directory is created when missing. Returns False when the
        local destination is unusable or the SFTP session cannot be opened.
        """
        dirname, _ = os.path.split(local_dir)
        if not dirname:
            dirname = os.getcwd()
            local_dir = os.path.join(dirname, local_dir)
        if "*" in dirname:
            stdio.error('Invalid directory {}'.format(dirname))
            return False
        if os.path.exists(dirname):
            if not os.path.isdir(dirname):
                stdio.error('%s is not directory' % dirname)
                return False
        elif not DirectoryUtil.mkdir(dirname, stdio=stdio):
            return False
        if os.path.exists(local_dir) and not os.path.isdir(local_dir):
            stdio.error('%s is not directory' % local_dir)
            return False
        if self._is_local():
            return LocalClient.get_dir(local_dir, remote_dir, stdio=stdio)
        if not self._open_sftp(stdio=stdio):
            return False
        stdio.start_loading('Get %s from %s' % (local_dir, remote_dir))
        ret = self._get_dir(local_dir, remote_dir, stdio=stdio)
        stdio.stop_loading('succeed' if ret else 'fail')
        return ret

    @property
    def _get_dir(self):
        """Bound method that downloads a directory, picked by the active transporter."""
        use_rsync = self.remote_transporter == RemoteTransporter.RSYNC
        return self._rsync_get_dir if use_rsync else self._client_get_dir
R
Rongfeng Fu 已提交
592

F
v1.5.0  
frf12 已提交
593 594 595
    def _client_get_dir(self, local_dir, remote_dir, stdio=None):
        """Download a directory tree file by file over SFTP.

        Files are fetched individually; remote directories that contain no
        files are recreated locally with the remote mode. Returns True only
        when every step succeeded, False otherwise.
        """
        has_failed = False
        if not DirectoryUtil.mkdir(local_dir, stdio=stdio):
            return False
        try:
            ret = self.execute_command('find %s -type f' % remote_dir, stdio=stdio)
            if not ret:
                stdio.verbose(ret.stderr)
                has_failed = True
            all_files = ret.stdout.strip().split('\n') if ret.stdout else []
            ret = self.execute_command('find %s -type d' % remote_dir, stdio=stdio)
            if not ret:
                has_failed = True
            all_dirs = ret.stdout.strip().split('\n') if ret.stdout else []
            # Drop directories that contain files; their local counterparts are
            # created while downloading the files.
            self._filter_dir_in_file_path(all_files, all_dirs)
            # With a wildcard source, paths are relative to the directory that
            # holds the pattern, not to the pattern itself.
            if "*" in remote_dir:
                remote_base_dir = os.path.dirname(remote_dir)
            else:
                remote_base_dir = remote_dir
            for remote_path in all_files:
                local_path = os.path.join(local_dir, os.path.relpath(remote_path, remote_base_dir))
                # The SFTP download writes straight to local_path, so its
                # parent directory has to exist first.
                parent = os.path.dirname(local_path)
                try:
                    if not os.path.isdir(parent):
                        os.makedirs(parent)
                except OSError as e:
                    stdio.exception('Fail to make directory %s in local: %s' % (parent, e))
                    has_failed = True
                    continue
                if not self._client_get_file(local_path, remote_path, stdio=stdio):
                    stdio.error('Fail to get %s' % remote_path)
                    has_failed = True
            for remote_path in all_dirs:
                try:
                    local_path = os.path.join(local_dir, os.path.relpath(remote_path, remote_base_dir))
                    if not os.path.exists(local_path):
                        stat = self.sftp.stat(remote_path)
                        os.makedirs(local_path, mode=stat.st_mode)
                except Exception as e:
                    stdio.exception('Fail to make directory %s in local: %s' % (remote_path, e))
                    has_failed = True
            return not has_failed
        except Exception as e:
            stdio.exception('Fail to get %s: %s' % (remote_dir, e))
        return False

    @staticmethod
    def _filter_dir_in_file_path(files, directories):
        skip_directories = []
        for path in files:
            dir_name = os.path.dirname(path)
            while dir_name not in ["/", ".", ""]:
                if dir_name in skip_directories:
                    break
                if dir_name in directories:
                    directories.remove(dir_name)
                    skip_directories.append(dir_name)
                dir_name = os.path.dirname(dir_name)
R
Rongfeng Fu 已提交
644

F
v1.5.0  
frf12 已提交
645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684
    def file_downloader(self, local_dir, remote_dir, stdio=None):
        """Drain ``task_queue`` of remote paths and download each one.

        A separate SshClient built from the same config does the transfers.
        Successfully fetched remote paths are put on ``result_queue``. The
        loop ends when the queue reports Empty.
        """
        try:
            client = SshClient(config=self.config, stdio=None)
            client._open_sftp(stdio=stdio)
            client._remote_transporter = self.remote_transporter
            while True:
                remote_path = self.task_queue.get(block=False)
                local_path = os.path.join(local_dir, os.path.relpath(remote_path, remote_dir))
                if client.get_file(local_path, remote_path, stdio=stdio):
                    self.result_queue.put(remote_path)
                else:
                    stdio.error('Fail to get %s' % remote_path)
        except Empty:
            return
        except Exception:
            # Only ordinary errors are logged and swallowed; interrupts propagate.
            stdio.exception("")
            stdio.exception('Failed to get %s' % remote_dir)

    def file_uploader(self, local_dir, remote_dir, stdio=None):
        """Drain ``task_queue`` of ``(local_path, is_dir)`` items and upload each one.

        Directories are created remotely with the local permission bits; files
        are uploaded with put_file. Successfully handled remote paths are put
        on ``result_queue``. The loop ends when the queue reports Empty.
        """
        try:
            client = SshClient(config=self.config, stdio=None)
            client._remote_transporter = self.remote_transporter
            while True:
                local_path, is_dir = self.task_queue.get(block=False)
                remote_path = os.path.join(remote_dir, os.path.relpath(local_path, local_dir))
                if is_dir:
                    # Last three octal digits of the local mode, e.g. "755".
                    stat = oct(os.stat(local_path).st_mode)[-3:]
                    cmd = '[ -d "{remote_path}" ] || (mkdir -p {remote_path}; chmod {stat} {remote_path})'.format(remote_path=remote_path, stat=stat)
                    if client.execute_command(cmd, stdio=stdio):
                        self.result_queue.put(remote_path)
                else:
                    if client.put_file(local_path, remote_path, stdio=stdio):
                        self.result_queue.put(remote_path)
                    else:
                        stdio.error('Fail to send %s' % remote_path)
        except Empty:
            return
        except Exception:
            # Only ordinary errors are logged and swallowed; interrupts propagate.
            stdio.exception("")
            stdio.verbose('Failed to send %s' % remote_dir)