Commit 2b431a56 authored by Chris Hajas

Remove pysync

This was only used by gprepairmirrorseg, which was removed in a previous
commit.
Authored-by: Chris Hajas <chajas@pivotal.io>
Parent 77a93f43
@@ -54,23 +54,6 @@ def getPostmasterPID(hostname, datadir):
except:
return -1
#-----------------------------------------------
class PySync(Command):
def __init__(self,name,srcDir,dstHost,dstDir,ctxt=LOCAL,remoteHost=None, options=None):
psync_executable=GPHOME + "/bin/lib/pysync.py"
# MPP-13617
if ':' in dstHost and not ']' in dstHost:
dstHost = '[' + dstHost + ']'
self.cmdStr="%s %s %s %s:%s" % (psync_executable,
options if options else "",
srcDir,
dstHost,
dstDir)
Command.__init__(self,name,self.cmdStr,ctxt,remoteHost)
#-----------------------------------------------
class CmdArgs(list):
import os, tempfile, hashlib
from gp_unittest import *
from lib.pysync import LocalPysync
CONTENT = 'To be or not to be.'
class LocalPySyncTestCase(GpTestCase):
def setUp(self):
self.subject = LocalPysync(['pysync', '/tmp', 'localhost:/tmp'])
def tearDown(self):
super(LocalPySyncTestCase, self).tearDown()
def test_doCommand_getDigest(self):
filename = ''
try:
filename, filesize = createTempFileWithContent(CONTENT)
doCommand_digest = self.subject.doCommand(['getDigest', filename, 0, filesize])
m = hashlib.sha256()
m.update(CONTENT)
self.assertTrue(m.digest() == doCommand_digest)
finally:
os.unlink(filename)
def createTempFileWithContent(content):
fd = tempfile.NamedTemporaryFile(delete=False)
fd.write(content)
filename = fd.name
fd.close()
filesize = os.stat(filename).st_size
return filename, filesize
if __name__ == '__main__':
run_tests()
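# A minimal standalone sketch of the chunk-digest check the test above exercises,
# assuming only the standard library: LocalPysync's 'getDigest' command hashes the
# byte range [offset, offset + size) of a source file with SHA-256, and the remote
# side compares that digest against its own copy. The helper name chunk_digest is
# illustrative and not part of the original code.
import hashlib

def chunk_digest(path, offset, size):
    # Read exactly `size` bytes starting at `offset` and hash them.
    f = open(path, 'rb')
    try:
        f.seek(offset)
        data = f.read(size)
    finally:
        f.close()
    assert len(data) == size
    return hashlib.sha256(data).digest()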
#!/usr/bin/env python
import os, sys
if sys.hexversion < 0x2040400:
sys.stderr.write("pysync.py needs python version at least 2.4.4.\n")
sys.stderr.write("You are using %s\n" % sys.version)
sys.stderr.write("Here is a guess at where the python executable is--\n")
os.system("/bin/sh -c 'type python>&2'");
sys.exit(1)
import cPickle
import inspect
import hashlib
import signal
import socket
import subprocess
import threading
import zlib
import pysync_remote
from pysync_remote import Options
from pysync_remote import ProgressUpdate, ProgressCounters
from pysync_remote import statToTuple
from gppylib.commands.gp import PySync
# MPP-13617
import re
RE1 = re.compile('\\[([^]]+)\\]:(.+)')
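# Illustrative note on the MPP-13617 handling: RE1 splits a bracketed IPv6
# destination such as '[fe80::1]:/data/mirror0' into the host 'fe80::1' and the
# path '/data/mirror0'; plain 'host:/path' destinations fall through to the
# simple find(':') split in LocalPysync.__init__.
#   >>> re.match(RE1, '[fe80::1]:/data/mirror0').groups()
#   ('fe80::1', '/data/mirror0')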
bootstrapSource = """
import os,sys
exec(sys.stdin.read(int(sys.stdin.readline())))
"""
class PysyncProxy:
'''
The PysyncProxy class is used to initiate a third-party synchronization operation.
An instance of PysyncProxy is used to start a LocalPysync instance on a remote host
to be used as the source of the synchronization operation. The "remote" LocalPysync
instance then runs RemotePysync on the destination as usual. Progress information
is fed from the destination host, through the remote LocalPysync instance and to this
instance for reporting.
Lines written by LocalPysync to stdout are recorded in the list self.stdout; lines
written by LocalPysync to stderr are recorded in self.stderr. Progress information
is handled only by the functions set for the recordProgressCallback and
recordRawProgressCallback properties.
'''
class _Quit(SystemExit):
def __init__(self, *info):
SystemExit.__init__(self, *info)
def __init__(self, sourceHost, sourceDir, destHost, destDir, syncOptions, verbose=False,
progressBytes=None, progressTime=None,
recordProgressCallback=None, recordRawProgressCallback=None, progressTimestamp=False):
'''
Initialize a new PysyncProxy instance.
sourceHost - the host from which data is to be copied.
sourceDir - the directory on sourceHost from which data is to be copied.
destHost - the host to which data is to be copied.
destDir - the directory on destHost to which data is to be copied.
syncOptions - a list of command-line options as described by LocalPysync.usage();
other options may be added based on the following arguments.
verbose - indicates whether or not debugging output is generated.
progressBytes - the number of bytes moved for a volume-based progress message;
maps to the LocalPysync --progress-bytes option.
progressTime - the amount of time for a time-based progress message; maps to
the LocalPysync --progress-time option.
recordProgressCallback - function to call to present a printable progress
message generated by RemotePysync; the function must accept a single
argument of type str. If not set, progress messages are ignored.
recordRawProgressCallback - function to call to handle raw progress information
generated by RemotePysync; the function must accept a single argument
of type pysync_remote.ProgressUpdate. If not set, raw progress
information is ignored.
progressTimestamp - indicates whether or not RemotePysync should include the
observation timestamp on messages it creates.
'''
self.ppid = 0
self.sourceHost = sourceHost
self.sourceDir = sourceDir
self.destHost = destHost
self.destDir = destDir
self.recordProgressCallback = recordProgressCallback
self.recordRawProgressCallback = recordRawProgressCallback
self.syncOptions = syncOptions
if verbose:
self.syncOptions += ["-v"]
if progressBytes:
self.syncOptions += ["--progress-bytes", progressBytes]
if progressTime:
self.syncOptions += ["--progress-time", progressTime]
self.syncOptions += ["--proxy"]
if not progressTimestamp:
self.syncOptions += ["--omit-progress-timestamp"]
self.stderr = []
self.stdout = []
self.cmd = None
self.returncode = None
def run(self):
'''
Initiate and wait for completion of a directory synchronization operation.
Stderr output is appended to the self.stderr list. Stdout output is appended
to the self.stdout list. Progress messages are written to stdout unless a
callback is set.
'''
pysyncCmd = PySync('pysync', self.sourceDir, self.destHost, self.destDir,
options=' '.join(self.syncOptions))
self.cmd = '. %s/greenplum_path.sh && %s' % (os.environ.get('GPHOME'), pysyncCmd.cmdStr)
# save the ppid to allow the process to be stopped.
self.ppid = os.getppid()
pidFilename = '/tmp/pysync.py.%s.%s.ppid' % (self.destHost, self.destDir.replace('/', '_'))
pidFile = open(pidFilename, 'w')
pidFile.write('%d' % (self.ppid))
pidFile.close()
code = 0
self.p = None
stderrThread = None
try:
try:
args = []
args.append("ssh")
args.extend(["-o", "BatchMode=yes"])
args.extend(["-o", "StrictHostKeyChecking=no"])
args.append(self.sourceHost)
args.append(self.cmd)
self.p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stderrThread = ReaderThread("pysync_stderr", self.p.stderr, self.stderr)
stderrThread.start()
code = self._work()
except OSError, e:
self.stderr.append(str(e))
raise
finally:
os.remove(pidFilename)
if self.p:
timer = threading.Timer(2.0, (lambda: os.kill(self.p.pid, signal.SIGHUP)))
timer.start()
self.returncode = self.p.wait()
timer.cancel()
if stderrThread:
stderrThread.join(2.0)
return code
def _work(self):
'''
Wait for and process commands from the LocalPysync instance connected
to the Popened SSH process.
Command processing continues until EOF is reached on Popen.stdout (the
command input stream from LocalPysync) or a "quit" command is processed.
Because standard command output may be interleaved with serialized command
objects, command objects are prefixed with "pKl:<length>\n". Non-command
object lines are appended to the self.stdout buffer.
'''
while True:
try:
# check if parent still alive
os.kill(self.ppid, 0)
except:
# parent gone, exit
return 2
# Get the length of the next serialized command
a = self.p.stdout.readline()
if len(a) == 0:
# End the command loop if EOF
self.stderr.append("[FATAL]:-Unexpected EOF on LocalPysync output stream")
return 3
# If not a pickled command object, just record it
if not a.startswith("pKl:"):
self.stdout.append(a.rstrip())
continue
size = int(a[4:])
# Read the serialized command and process it.
data = self.p.stdout.read(size)
assert len(data) == size
try:
self._doCommand(cPickle.loads(data))
except PysyncProxy._Quit, e:
return e.code
def _doCommand(self, what):
'''
Perform the command requested by the remote side and prepare any
result.
'''
if what[0] == 'recordProgress':
if self.recordProgressCallback:
self.recordProgressCallback(what[1].rstrip())
return None
elif what[0] == 'recordRawProgress':
if self.recordRawProgressCallback:
self.recordRawProgressCallback(what[1])
return None
elif what[0] == 'quit':
raise PysyncProxy._Quit(what[1])
else:
assert 0
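# A minimal sketch of the 'pKl:' framing used between LocalPysync and this proxy,
# assuming nothing beyond cPickle: LocalPysync._sendCommand() writes each command
# tuple as 'pKl:<length>\n<pickle>', and PysyncProxy._work() strips the prefix,
# reads exactly <length> bytes, and unpickles them; lines without the prefix are
# treated as ordinary stdout text. (Illustrative only, not part of the protocol code.)
#
#   import cPickle
#   payload = cPickle.dumps(('recordProgress', 'copied 1 of 2 files'))
#   frame = 'pKl:%d\n%s' % (len(payload), payload)     # what _sendCommand emits
#   header, body = frame.split('\n', 1)                # _work uses readline()/read()
#   assert cPickle.loads(body[:int(header[4:])]) == ('recordProgress', 'copied 1 of 2 files')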
class ReaderThread(threading.Thread):
'''
Appends all output read from a file handle to the lines buffer.
'''
def __init__(self, name, file, lines):
self.file = file
self.lines = lines
threading.Thread.__init__(self, name=name)
self.setDaemon(True)
def run(self):
for line in self.file:
self.lines.append(line.rstrip())
class LocalPysync:
'''
The LocalPysync class initiates a directory synchronization task by starting
the pysync_remote module on a target system, then processes commands from that
system to accomplish directory synchronization. Once the pysync_remote module
is started on the remote system, this LocalPysync instance acts as the remote
system's agent.
When invoked through PysyncProxy, stdout is used to return pickled objects
representing status information from this LocalPysync instance.
'''
NUMBER_SCALES = {'M': 1024 * 1024, 'G': 1024 * 1024 * 1024, 'T': 1024 * 1024 * 1024 * 1024}
class _Quit(SystemExit):
def __init__(self, *info):
SystemExit.__init__(self, *info)
def __init__(self, argv, recordProgressCallback=None, recordRawProgressCallback=None, progressTimestamp=False):
'''
Initialize a new LocalPysync instance.
argv - a command-line style list of arguments as described by self.usage()
recordProgressCallback - function to call to present a printable progress
message generated by RemotePysync; the function must accept a single
argument of type str.
recordRawProgressCallback - function to call to handle raw progress information
generated by RemotePysync; the function must accept a single argument
of type pysync_remote.ProgressUpdate.
progressTimestamp - indicates whether or not RemotePysync should include the
observation timestamp on messages it creates.
'''
self.options = Options()
self.usingProxy = False
self.sshargs = []
self.cache = [None]
self.exclude = set()
self.include = set()
self.recordProgressCallback = recordProgressCallback
if self.recordProgressCallback:
self.options.sendProgress = True
self.recordRawProgressCallback = recordRawProgressCallback
if self.recordRawProgressCallback:
self.options.sendRawProgress = True
self.options.progressTimestamp = progressTimestamp
a = argv[1:]
while a:
if a[0] == '-v':
self.options.verbose = True
elif a[0] == '-?':
self.usage(argv)
elif a[0] == '-compress':
self.options.compress = True
elif a[0] == '-n':
self.options.minusn = True
elif a[0] == '--insecure':
self.options.insecure = True
elif a[0] == '--ssharg':
a.pop(0)
self.sshargs.append(a[0])
elif a[0] == '--delete':
self.options.delete = True
elif a[0] == '-x':
a.pop(0)
name = a[0]
if name[0] == '/':
raise Exception('Please do not use absolute path with -x.')
if name[0:2] != './':
name = os.path.join('.', name)
self.exclude.add(name)
elif a[0] == '-i':
a.pop(0)
name = a[0]
if name[0] == '/':
raise Exception('Please do not use absolute path with -i.')
if name[0:2] != './':
name = os.path.join('.', name)
self.include.add(name)
elif a[0] == '--progress-bytes':
a.pop(0)
try:
scale = a[0][-1]
if scale == '%':
# Ensure the number part is convertible; otherwise pass the whole value
factor = float(a[0][:-1])
self.options.progressBytes = a[0]
elif scale.upper() in LocalPysync.NUMBER_SCALES:
# Real numeric value followed by a supported scale identifier
progressBytes = int(float(a[0][:-1]) * LocalPysync.NUMBER_SCALES[scale.upper()])
self.options.progressBytes = progressBytes
else:
# If the value isn't a percent or scaled, it must be an integer number of bytes
progressBytes = int(a[0])
self.options.progressBytes = progressBytes
except ValueError:
raise ValueError("--progress-bytes value is not supported", a[0])
if type(self.options.progressBytes) != str and progressBytes < pysync_remote.SyncProgress.MINIMUM_VOLUME_INTERVAL:
raise ValueError("--progress-bytes value must be at least %d" % pysync_remote.SyncProgress.MINIMUM_VOLUME_INTERVAL, a[0])
elif a[0] == '--progress-time':
a.pop(0)
try:
progressSeconds = int(60 * float(a[0]))
self.options.progressTime = progressSeconds
except ValueError:
raise ValueError("--progress-time value is not supported", a[0])
if progressSeconds < pysync_remote.SyncProgress.MINIMUM_TIME_INTERVAL:
raise ValueError("--progress-time value must be at least %f" % (
pysync_remote.SyncProgress.MINIMUM_TIME_INTERVAL / 60))
elif a[0] == '--proxy':
self.usingProxy = True
self.options.sendProgress = True
self.recordProgressCallback = self._recordProgress
self.options.sendRawProgress = True
self.recordRawProgressCallback = self._recordRawProgress
elif a[0] == '--omit-progress-timestamp':
self.options.progressTimestamp = False
else:
break
a.pop(0)
if len(a) != 2:
self.usage(argv)
self.sourceDir = os.path.abspath(a[0])
if not os.path.exists(self.sourceDir):
raise ValueError("Source path \"%s\" not found" % self.sourceDir)
if not os.path.isdir(self.sourceDir):
raise ValueError("Source path \"%s\" is not a directory" % self.sourceDir)
if not os.access(self.sourceDir, os.F_OK | os.R_OK | os.X_OK):
raise ValueError("Source path) \"%s\" is not accessible" % self.sourceDir)
dest = a[1]
# MPP-13617
m = re.match(RE1, dest)
if m:
self.userAndHost, self.destDir = m.groups()
else:
i = dest.find(':')
if i == -1:
self.usage(argv)
self.userAndHost, self.destDir = dest[:i], dest[i + 1:]
self.connectAddress = None
self.sendData = None
hostname = self.userAndHost[self.userAndHost.find('@') + 1:]
try:
addrinfo = socket.getaddrinfo(hostname, None)
except:
print 'dest>>%s<<' % dest, ' hostname>>%s<<' % hostname
raise
if addrinfo:
self.options.addrinfo = addrinfo[0]
else:
raise Exception("Unable to determine address for %s" % self.userAndHost)
def usage(self, argv):
sys.stderr.write("""usage:
python """ + argv[0] + """ [-v] [-?] [-n]
[--ssharg arg] [-x exclude_file] [-i include_file] [--insecure] [--delete]
[--progress-time minutes] [--progress-bytes { n[.n]{% | M | G | T} }]
[--proxy] [--omit-progress-timestamp]
sourcedir [user@]host:destdir
-v: verbose output
-?: Print this message.
--ssharg arg: pass arg to ssh. Use many times to pass many args.
-n: Do not do any work. Just print how many bytes will need to be
transferred over the network per file and a total.
-x name: Do not transfer named file or directory. Don't be too
creative with the name. For example, "directory/./file" will not
work--use "directory/file". Name is relative to sourcedir.
-i name: Only transfer named file or directory. Don't be too
creative with the name. For example, "directory/./file" will not
work--use "directory/file". Name is relative to sourcedir.
--insecure: Do not check the SHA256 digest after transferring data.
This makes pysync.py run faster, but a bad guy can forge TCP
packets and put junk of his choice into your files.
--delete: Delete things in dst that do not exist in src.
--progress-time minutes: the number of minutes to elapse before a
time-based progress message is issued. Progress messages may
appear more frequently than specified due to the --progress-bytes
value.
--progress-bytes count: the number of bytes processed before a
volume-based progress message is issued. The count may be a
number followed by 'G' or 'T' or number followed by '%'. If
specified as a percent, the count is calculated as the specified
percent of the total bytes expected to be processed.
--proxy: Internal option indicating a call from PysyncProxy.
--omit-progress-timestamp: Omit the timestamp from progress messages.
""")
sys.exit(1)
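# Example invocations (illustrative only; the hosts and paths are hypothetical):
#   python pysync.py -v /data/primary0 sdw2:/data/mirror0
#   python pysync.py --delete --progress-time 10 /data/primary0 [fe80::1]:/data/mirror0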
def readFile(self, filename, offset, size):
'''
Read a chunk of the specified size at the specified offset from the
file identified. The last chunk read is cached for possible re-reading.
The file is opened only for the duration of the seek and read operations.
'''
key = (filename, offset, size)
if self.cache[0] == key:
return self.cache[1]
absfilename = os.path.join(self.sourceDir, filename)
f = open(absfilename, 'rb')
f.seek(offset)
a = f.read(size)
f.close()
assert len(a) == size
self.cache = (key, a)
return a
def getList(self):
'''
Gets a map of {name:stat} pairs to be processed. The stat value
is generally the tuple returned from pysync_remote.statToTuple.
Hard links (an entry with an inode equal to another in the list)
are represented by a ('L', linked_name) tuple.
'''
list = dict()
inomap = dict()
for root, dirs, files in os.walk(self.sourceDir):
for i in dirs + files:
absname = os.path.join(root, i)
relname = '.' + absname[len(self.sourceDir):]
if relname in self.exclude:
if i in dirs:
dirs.remove(i)
continue
if len(self.include) > 0:
""" Check if the file or dir is in the include list """
if relname in self.include:
pass
else:
""" Make sure we include any files or dirs under a dir in the include list."""
foundPrefix = False
for j in self.include:
if relname.startswith(j + '/'):
foundPrefix = True
continue
if not foundPrefix:
if i in dirs:
dirs.remove(i)
continue
s = os.lstat(absname)
if s.st_ino in inomap:
list[relname] = ('L', inomap[s.st_ino])
continue
inomap[s.st_ino] = relname
list[relname] = statToTuple(s, absname)
return list
def doCommand(self, what):
'''
Perform the command requested by the remote side and prepare any
result.
'''
if what[0] == 'connect':
self.connectAddress = what[1]
elif what[0] == 'getOptions':
return self.options
elif what[0] == 'getDestDir':
return self.destDir
elif what[0] == 'getList':
return self.getList()
elif what[0] == 'getDigest':
m = hashlib.sha256()
m.update(self.readFile(what[1], what[2], what[3]))
return m.digest()
elif what[0] == 'getData':
self.sendData = self.readFile(what[1], what[2], what[3])
if self.options.compress:
self.sendData = zlib.compress(self.sendData, 1)
return len(self.sendData)
elif what[0] == 'recordProgress':
if self.recordProgressCallback:
self.recordProgressCallback(what[1].rstrip())
else:
sys.stdout.write(what[1].rstrip())
sys.stdout.write('\n')
return None
elif what[0] == 'recordRawProgress':
if self.recordRawProgressCallback:
self.recordRawProgressCallback(what[1])
else:
sys.stdout.write("raw: " + str(what[1]))
sys.stdout.write('\n')
return None
elif what[0] == 'quit':
raise LocalPysync._Quit(what[1])
else:
assert 0
def _recordProgress(self, message):
'''
Send progress information to associated PysyncProxy instance.
'''
if message:
self._sendCommand('recordProgress', message)
def _recordRawProgress(self, progressUpdate):
'''
Send raw progress data to associated PysyncProxy instance.
'''
if progressUpdate:
self._sendCommand('recordRawProgress', progressUpdate)
def _sendCommand(self, *args):
'''
Serialize the command and its arguments using cPickle and write the result to stdout.
This method is used for communication with the initiating PysyncProxy
instance.
'''
a = cPickle.dumps(args)
sys.stdout.write('pKl:%d\n%s' % (len(a), a))
sys.stdout.flush()
def work(self):
'''
Wait for and process commands from the RemotePysync instance connected
to the Popened SSH process.
Command processing continues until EOF is reached on Popen.stdout (the
command input stream from RemotePysync) or a "quit" command is processed.
Command response objects are serialized and written to Popen.stdin (the
command output stream to RemotePysync).
'''
while True:
try:
# check if parent still alive
os.kill(os.getppid(), 0)
except:
# parent gone, exit
return 2
# Get the length of the next serialized command
a = self.p.stdout.readline()
if len(a) == 0:
# End the command loop if EOF
print >> sys.stderr, "[FATAL]:-Unexpected EOF on RemotePysync output stream"
return 3
size = int(a)
# Read the serialized command and process it.
data = self.p.stdout.read(size)
assert len(data) == size
try:
answer = cPickle.dumps(self.doCommand(cPickle.loads(data)))
except LocalPysync._Quit, e:
return e.code
# Send the serialized command response
self.p.stdin.write("%d\n%s" % (len(answer), answer))
self.p.stdin.flush()
# If the command was a connect order, open a socket to
# the remote side for data transfer
if self.connectAddress != None:
self.socket = socket.socket(self.options.addrinfo[0])
self.socket.connect(self.connectAddress)
self.connectAddress = None
# If the command was a getData order, send the prepared
# data over the socket.
if self.sendData != None:
self.socket.sendall(self.sendData)
self.sendData = None
def run(self):
'''
Start the pysync_remote module on the remote host and call self.work() to process
commands presented by the remote host.
'''
# save the ppid to allow the process to be stopped.
os.system('echo %d > /tmp/pysync.py.%s.ppid' % (os.getppid(), self.destDir.replace('/', '_')))
PATH = os.environ.get('PATH') or '.'
LIBPATH = os.environ.get('LD_LIBRARY_PATH') or '.'
cmd = ('''. %s/greenplum_path.sh && bash -c "python -u -c '%s'"'''
% (os.environ.get('GPHOME'),
bootstrapSource))
args = []
args.append('ssh')
args.extend(["-o", "BatchMode=yes"])
args.extend(["-o", "StrictHostKeyChecking=no"])
args.extend(self.sshargs)
args.append(self.userAndHost)
args.append(cmd)
code = 0
self.p = None
try:
try:
pysyncSource = inspect.getsource(pysync_remote)
self.p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
self.p.stdin.write("%d\n%s" % (len(pysyncSource), pysyncSource))
code = self.work()
except OSError, e:
sys.stderr.write(str(e))
raise
finally:
os.remove('/tmp/pysync.py.%s.ppid' % (self.destDir.replace('/', '_')))
if self.p:
timer = threading.Timer(2.0, (lambda: os.kill(self.p.pid, signal.SIGHUP)))
timer.start()
rc = self.p.wait()
timer.cancel()
if self.usingProxy:
self._sendCommand('quit', code)
return code
if os.environ.get('GPHOME') is None:
print >> sys.stderr, '[FATAL]:- Please specify environment variable GPHOME'
sys.exit(1)
if __name__ == '__main__':
sys.exit(LocalPysync(sys.argv, progressTimestamp=True).run())
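# A note on the wire format shared by LocalPysync.work() and RemotePysync (defined
# in pysync_remote below): unlike the proxy channel, this channel has no 'pKl:'
# prefix; each side writes '<length>\n<cPickle data>' and the peer reads the length
# line, then exactly that many bytes, and unpickles them. A rough round-trip sketch
# (illustrative only):
#
#   import cPickle
#   from StringIO import StringIO
#   data = cPickle.dumps(('getDestDir',))
#   pipe = StringIO('%d\n%s' % (len(data), data))              # what sendCommand() writes
#   answer = cPickle.loads(pipe.read(int(pipe.readline())))    # what work() reads back
#   assert answer == ('getDestDir',)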
'''
The pysync_remote module is a companion to the pysync module containing the
components required for the remote end of a synchronization pair.
'''
from __future__ import with_statement
import sys, os
if sys.hexversion < 0x2040400:
sys.stderr.write("pysync_remote.py needs python version at least 2.4.4 on the remote computer.\n")
sys.stderr.write("You are using %s\n" % sys.version)
sys.stderr.write("Here is a guess at where the python executable is--\n")
os.system("/bin/sh -c 'type python>&2'");
sys.exit(1)
import copy
import cPickle
from datetime import datetime
import errno
import hashlib
import socket
import stat
import threading
import time
import zlib
# "Simulate" importing this module as 'pysync_remote' to mirror the namespace
# used by the local pysync component so pickled object deserialization works.
if __name__ == '__main__':
sys.modules['pysync_remote'] = sys.modules['__main__']
# The number of file bytes processed at a time
CHUNK_SIZE = 4000000
class Progress:
def __init__(self, byteMax, fileMax):
self.status = ''
self.fname = ''
self.byteMax = byteMax
self.byteNow = 0
self.fileMax = fileMax
self.fileNow = 0
self.tstamp = 0
def _printStatus(self):
pct = 0
if self.byteMax > 0: pct = int(self.byteNow * 100 / self.byteMax)
sys.stderr.write('[%s %d%%] %s %s byte:%d/%d file:%d/%d\n' % (
datetime.now().isoformat(' '),
pct, self.status, self.fname, self.byteNow,
self.byteMax, self.fileNow, self.fileMax))
def update(self, status, fname, byteNow, fileNow):
changed = (status != self.status
or fname != self.fname
or fileNow != self.fileNow
or (time.time() - self.tstamp > 5))
self.status = status
self.fname = fname
self.byteNow = byteNow
self.fileNow = fileNow
if changed:
self.tstamp = time.time()
self._printStatus()
class Options:
'''
A container for the options collected by LocalPysync and used by RemotePysync.
'''
def __init__(self):
self.compress = False
self.verbose = False
self.insecure = False
self.delete = False
self.minusn = False
self.progressTime = 0
self.progressBytes = 0
self.progressTimestamp = False
self.sendProgress = True
self.sendRawProgress = False
self.addrinfo = None
def statToTuple(s, name):
'''
Converts the os.stat() structure to a tuple used internally. The tuple
content varies by type but is generally of the form:
(entry_type, protection_bits [, variable_content])
entry_type:
- regular files
l symbolic link
d directory
c character special file
b block special file
p FIFO
s socket link
These entry types are coordinated with code in RemotePysync.createFiles().
'''
if stat.S_ISREG(s.st_mode):
return ['-', s.st_mode, s.st_size, None]
if stat.S_ISLNK(s.st_mode):
return ('l', 0, os.readlink(name))
if stat.S_ISDIR(s.st_mode):
return ('d', s.st_mode)
if stat.S_ISCHR(s.st_mode):
return ('c', s.st_mode, s.st_rdev)
if stat.S_ISBLK(s.st_mode):
return ('b', s.st_mode, s.st_rdev)
if stat.S_ISFIFO(s.st_mode):
return ('p', s.st_mode)
if stat.S_ISSOCK(s.st_mode):
return ('s', s.st_mode)
raise Exception('Pysync can not handle %s.' % name)
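# For example (illustrative values): a 4-byte regular file with mode 0644 maps to
# ['-', 0100644, 4, None], a symlink to ('l', 0, '<link target>'), and a directory
# with mode 0755 to ('d', 040755). RemotePysync.createFiles() and removeJunk()
# dispatch on the first element of these tuples.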
def formatSiBinary(number):
'''
Format a number using SI/IEC 60027-2 prefixes as described in
http://physics.nist.gov/cuu/Units/binary.html.
The result is of the form "D.dd P" where D.dd is the input
number in P units with no more than 2 decimal places. D will
be longer than 4 digits only when number exceeds the scale of
the largest SI/IEC binary prefix supported.
'''
number = float(number)
factor = 1024
prefixes = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi')
for prefix in prefixes[:-1]:
if number < factor:
return '%.2f %s' % (number, prefix)
number /= factor
else:
return '%.2f %s' % (number, prefixes[-1])
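# Examples: formatSiBinary(512) -> '512.00 ', formatSiBinary(1536) -> '1.50 Ki',
# and formatSiBinary(5 * 1024 ** 3) -> '5.00 Gi'.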
class ProgressUpdate():
'''
A container used to report raw synchronization progress information collected
by SyncProgress and RemotePysync to LocalPysync.
'''
def __init__(self, isFinal=False, totalExpectedBytes=0, totalExpectedFiles=0, progressCounters=None):
# An indication of whether or not this ProgressUpdate is the final one
self.isFinal = isFinal
# The total number of file bytes expected to be processed
self.totalExpectedBytes = totalExpectedBytes
# The total number of files expected to be processed
self.totalExpectedFiles = totalExpectedFiles
# ProgressCounters for this update
self.progressCounters = copy.copy(progressCounters)
def __str__(self):
return ("ProgressUpdate(isFinal=%s, totalExpectedBytes=%d, totalExpectedFiles=%d, progressCounters=%s)"
% (self.isFinal,
self.totalExpectedBytes,
self.totalExpectedFiles,
self.progressCounters))
class ProgressCounters():
'''
A structure used to track the progress reported to SyncProgress.
'''
def __init__(self, updateTime=None, bytesProcessed=0, filesProcessed=0, processingFile=None):
# The time (from time.time()) this instance was last updated
self.updateTime = updateTime
# The cumulative number of bytes processed as of the last update
self.bytesProcessed = bytesProcessed
# The cumulative number of files *completed* as of the last update
self.filesProcessed = filesProcessed
# The file being processed during the last update
self.processingFile = processingFile
def __str__(self):
return ("ProgressCounters(updateTime=%f, bytesProcessed=%d, filesProcessed=%d, processingFile=\"%s\"))"
% (self.updateTime,
self.bytesProcessed,
self.filesProcessed,
self.processingFile))
class SyncProgress(threading.Thread):
'''
The SyncProgress class is used to record synchronization progress by RemotePysync and
issue a periodic progress report. SyncProgress runs as a daemon Thread and must be
started following allocation. The update() method must be called to feed progress
information to this instance; the stop() method should be called when the sync
operation is complete so a final report can be issued.
'''
# The minimum allowed reporting interval based on time (in seconds)
MINIMUM_TIME_INTERVAL = 2 * 60.0
DEFAULT_TIME_INTERVAL = 10 * 60.0
# The minimum allowed reporting interval based on byte volume
MINIMUM_VOLUME_INTERVAL = 512 * 1024 * 1024
DEFAULT_VOLUME_INTERVAL = "5.0%"
def __init__(self, list, reportInterval=0, reportBytes=0,
recordProgressCallback=None, recordRawProgressCallback=None,
progressTimestamp=False):
'''
Initialize a new SyncProgress instance.
list - the dict containing the name:attribute pairings for the
file system objects to synchronize; this list is expected
to be obtained from a LocalPysync "getList" command.
reportInterval - the minimum number of seconds between progress
reports; volume reports are emitted regardless of the
reportInterval value. If zero, the default report interval
is SyncProgress.DEFAULT_TIME_INTERVAL
reportBytes - the number of bytes processed for a volume report
to be issued; if zero, the default volume report interval
is SyncProgress.DEFAULT_VOLUME_INTERVAL. If a string of the
form "n.n%", the reportBytes is calculated to be n.n percent
of the total expected bytes to process.
recordProgressCallback - method to call to emit the progress *message*
instead of writing the message to stderr. This method is
presented a string containing the progress message as the only
argument. If None, the progress message is written to stderr
prepended with the timestamp of the observation.
recordRawProgressCallback - method to call to emit the raw progress data
collected by an update() call. This method is presented a
ProgressUpdate instance as the only argument.
progressTimestamp - indicates whether or not the observation timestamp is
included in progress messages emitted by this SyncProgress instance.
'''
self._final = False
self._totalBytes = 0
self._totalFiles = 0
self.processingChunk = 0
self.progressTimestamp = progressTimestamp
# Calculate the total number of files and bytes that could be
# processed. Only regular files contribute to the total byte
# count.
for desc in list.itervalues():
if desc[0] == '-':
self._totalBytes += desc[2]
self._totalFiles += 1
# Indicates the time, in seconds, of the progress reporting interval.
if reportInterval == 0:
reportInterval = SyncProgress.DEFAULT_TIME_INTERVAL
elif reportInterval < SyncProgress.MINIMUM_TIME_INTERVAL:
raise ValueError("reportInterval must be at least %d" % SyncProgress.MINIMUM_TIME_INTERVAL, reportInterval)
self.progressInterval = reportInterval
# Indicates the amount, in bytes, of the data processed before a progress report is forced.
if reportBytes == 0:
reportBytes = SyncProgress.DEFAULT_VOLUME_INTERVAL
elif type(reportBytes) == str:
pass
elif reportBytes < SyncProgress.MINIMUM_VOLUME_INTERVAL:
raise ValueError("reportBytes must be at least %d" % SyncProgress.MINIMUM_VOLUME_INTERVAL, reportBytes)
if type(reportBytes) == str:
if reportBytes[-1] == '%':
# Caller has requested a percent of the total
reportBytes = int(round(self._totalBytes * float(reportBytes[:-1]) / 100))
else:
raise ValueError("reportBytes must be numeric or a string in the form \"n.n%\"", reportBytes)
self.progressBytes = reportBytes
# Method to call for recording progress details
self.recordProgressCallback = recordProgressCallback
# Method to call for recording raw progress data
self.recordRawProgressCallback = recordRawProgressCallback
# Active ProgressCounters instance
self.progressCounters = ProgressCounters()
# Access synchronization lock for self.progressCounters
self.progressCountersLock = threading.Lock()
# Last time progress was reported
self.lastReportTime = None
# Copy of ProgressCounters at self.lastReportTime
self.lastReportProgressCounters = None
# Access synchronization lock for self.lastReportTime and self.lastReportProgressCounters
self.progressReportLock = threading.Lock()
# threading.Event instance used to run the interval timer;
# setting this Event terminates the reporting loop.
self.timerEvent = None
threading.Thread.__init__(self, name="ProgressTimer")
self.daemon = True
def _emitProgress(self, progressCounters=None, timedReport=True):
'''
Prepare and emit a progress message. If progressCounters
is not provided, a copy of self.progressCounters is made
while holding the self.progressCountersLock; the lock is
not held while actually formatting and emitting the progress
message.
Returns the amount of time, in seconds, for the next timed
report.
'''
if not progressCounters:
with self.progressCountersLock:
progressCounters = copy.copy(self.progressCounters)
currentReportTime = time.time()
with self.progressReportLock:
lastReportTime = self.lastReportTime
if timedReport:
# If the last report was more recent than the desired reporting interval,
# suppress this report and return the amount of time remaining until the
# next reporting time
lastReportInterval = currentReportTime - lastReportTime
if lastReportInterval < self.progressInterval:
return self.progressInterval - lastReportInterval
lastReportProgressCounters = self.lastReportProgressCounters
self.lastReportTime = currentReportTime
self.lastReportProgressCounters = progressCounters
if self._final:
bytesMoved = progressCounters.bytesProcessed
processingInterval = currentReportTime - self.collectionStartTime
byteRate = bytesMoved / processingInterval if processingInterval else 0.0
else:
bytesMoved = progressCounters.bytesProcessed - lastReportProgressCounters.bytesProcessed if lastReportProgressCounters else 0
processingInterval = progressCounters.updateTime - lastReportProgressCounters.updateTime if lastReportProgressCounters else 0.0
byteRate = bytesMoved / processingInterval if processingInterval else 0.0
message = ("%sProcessed %sB of %sB (%d%%); %sB processed at %sBps; %d of %d files processed"
% ('*' if timedReport else ' ',
formatSiBinary(progressCounters.bytesProcessed),
formatSiBinary(self._totalBytes),
100 * progressCounters.bytesProcessed / self._totalBytes if self._totalBytes != 0 else 1,
formatSiBinary(bytesMoved),
formatSiBinary(byteRate),
progressCounters.filesProcessed,
self._totalFiles))
if self.progressTimestamp:
message = "[%s]%s" % (datetime.fromtimestamp(currentReportTime).isoformat(' '), message)
if self.recordProgressCallback:
self.recordProgressCallback(message)
else:
sys.stderr.write(message)
sys.stderr.write("\n")
return self.progressInterval
def update(self, fileBytesProcessed=0, processingFile=None):
'''
Update the progress information and emit a progress message if
deemed necessary.
fileBytesProcessed - the number of bytes processed (moved/examined)
since progress was last reported.
processingFile - the file currently being processed. If different
from the previous value, the file completion counter is
incremented.
The final (closing) call to this method is made when the SyncProgress
thread is terminated by the stop() method call.
'''
progressUpdate = None
progressCounters = None
with self.progressCountersLock:
self.progressCounters.updateTime = time.time()
self.progressCounters.bytesProcessed += fileBytesProcessed
if self.progressCounters.processingFile != processingFile:
if self.progressCounters.processingFile:
self.progressCounters.filesProcessed += 1
self.progressCounters.processingFile = processingFile
processingChunk = self.progressCounters.bytesProcessed // self.progressBytes if self.progressBytes != 0 else 1
if self._final or processingChunk > self.processingChunk:
self.processingChunk = processingChunk
progressCounters = copy.copy(self.progressCounters)
if self.recordRawProgressCallback:
progressUpdate = ProgressUpdate(self._final, self._totalBytes, self._totalFiles, self.progressCounters)
if progressUpdate:
self.recordRawProgressCallback(progressUpdate)
if progressCounters:
self._emitProgress(progressCounters, False)
def run(self):
'''
Emit a progress message periodically based on self.progressInterval.
Exit once self.timerEvent is set by self.stop().
Updates to the progress counters should be made by calling
self.update().
'''
self.collectionStartTime = self.lastReportTime = self.progressCounters.updateTime = time.time()
self.lastReportProgressCounters = copy.copy(self.progressCounters)
self.timerEvent = threading.Event()
waitInterval = self.progressInterval
while not self.timerEvent.isSet():
self.timerEvent.wait(waitInterval)
if self.timerEvent.isSet():
break
waitInterval = self._emitProgress()
# Close out statistics and emit the final report
self._final = True
self.update()
self.timerEvent = None
def stop(self):
'''
Stop emitting progress messages and wait for this thread to terminate.
'''
if self.isAlive():
if self.timerEvent:
self.timerEvent.set()
self.join(0.5)
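# A rough sketch of how RemotePysync.run() drives SyncProgress (the file name and
# interval values here are hypothetical):
#
#   progress = SyncProgress(list, reportInterval=600, reportBytes='5.0%')
#   progress.start()                                   # daemon reporting thread
#   progress.update(fileBytesProcessed=CHUNK_SIZE, processingFile='./base/1/1234')
#   ...
#   progress.stop()                                    # emits the final report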
class RemotePysync:
'''
The RemotePysync class is the receiving end of the directory synchronization
initiated by LocalPysync. Once started by LocalPysync, RemotePysync manages
copying the content of a source directory.
A RemotePysync instance and a LocalPysync instance communicate with each other
over a pipe; command orders and arguments are transferred to LocalPysync over
stdout; responses are received from LocalPysync via stdin.
This class is *not* thread-safe; a single RemotePysync instance may be used
in a process and must be free to change the current working directory for the
duration of the synchronization operation.
'''
def __init__(self):
# The SyncProgress instance used to track progress of the directory
# synchronization managed by this RemotePysync instance.
self.syncProgress = None
# Synchronization lock to prevent interleaved communications using
# self.sendCommand() and self.getAnswer().
self.commLock = threading.RLock()
def sendCommand(self, *args):
'''
Serialize the command and its arguments using cPickle and write the result to stdout.
The self.commLock must be held before calling this method. The lock should
not be released until the getAnswer() method is called to obtain the results.
'''
a = cPickle.dumps(args)
sys.stdout.write('%d\n%s' % (len(a), a))
sys.stdout.flush()
def getAnswer(self):
'''
Read a command response from stdin for a command sent using the sendCommand()
method. The response is a data stream serialized using cPickle and preceded by
the integer size of the serialized data. The data stream is deserialized using
cPickle and returned.
The self.commLock must be held before calling this method. The lock should
be obtained before calling the self.sendCommand() method to send the command
for which the call to this method is being made. Once the answer is obtained,
the self.commLock should be released.
'''
size = int(sys.stdin.readline())
data = sys.stdin.read(size)
assert size == len(data)
return cPickle.loads(data)
def doCommand(self, *args):
'''
Sends the command represented by args to the local peer of this
RemotePysync and returns with the answer.
This method calls the sendCommand() method followed immediately
by the getAnswer() method while holding self.commLock.
'''
with self.commLock:
self.sendCommand(*args)
return self.getAnswer()
def _recordProgress(self, message):
'''
Send progress information to associated LocalPysync instance.
'''
if message:
self.doCommand('recordProgress', message)
def _recordRawProgress(self, progressUpdate):
'''
Send raw progress data to associated LocalPysync instance.
'''
if progressUpdate:
self.doCommand('recordRawProgress', progressUpdate)
def removeJunk(self, list):
'''
Trim the current directory of files and directories not in list
(if options.delete is True) and files and directories in list if
the current type is not the expected type.
Existing files of the expected type may be altered or removed in
self.createFiles().
'''
for root, dirs, files in os.walk('.', False):
os.chmod(root, 0700)
for i in dirs + files:
name = os.path.join(root, i)
t = statToTuple(os.lstat(name), name)
if name not in list and self.options.delete or name in list and list[name][0] != t[0]:
if t[0] == 'd':
os.rmdir(name)
else:
os.remove(name)
def createDirs(self, list):
'''
Create all directories identified in list.
'''
for name in sorted(list.keys()):
value = list[name]
if value[0] == 'd':
try:
os.mkdir(name, 0700)
except OSError, err:
if err.errno != errno.EEXIST:
raise
# Ignoring creation error for existing directory
def createFiles(self, list):
'''
Create all files identified in the list.
Directories must be created by calling self.createDirs() before calling
this method. Hard links (entry type = 'L') are not processed by this
method.
'''
for name, value in list.iteritems():
# If a directory or hard link, skip it.
if value[0] == 'd' or value[0] == 'L':
continue
# If the file already exists, process according to its existing
# type:
# 1) If the existing type and the expected/desired type
# are the same:
# a) set the permissions to 0700 for all except symbolic
# links
# b) if a regular file and the existing size is greater
# than the expected size, truncate the file to the
# expected size and skip further processing for this
# file
# c) if the variable content of the internal stat tuple
# for the existing file and the expected file are the
# same, skip further processing for this file.
# 2) If the existing type and the expected type are not the
# same or file processing was not skipped above, remove the
# existing file and continue processing.
if os.path.lexists(name):
t = statToTuple(os.lstat(name), name)
if value[0] == t[0]:
if value[0] != 'l':
os.chmod(name, 0700)
if value[0] == '-':
value[3] = t[2]
if t[2] > value[2]:
f = open(name, 'r+b')
f.truncate(value[2])
f.close()
continue
if value[2:] == t[2:]:
continue
os.remove(name)
# Create the file entry based on the desired type.
# For regular files, an empty (zero-length) file is created.
if value[0] == '-':
open(name, 'w+b').close()
elif value[0] == 'l':
os.symlink(value[2], name)
elif value[0] == 'c':
os.mknod(name, 0700 | stat.S_IFCHR, value[2])
elif value[0] == 'b':
os.mknod(name, 0700 | stat.S_IFBLK, value[2])
elif value[0] == 'p':
os.mkfifo(name, 0700)
elif value[0] == 's':
f = socket.socket(socket.AF_UNIX)
f.bind(name)
f.close()
else:
assert 0
def linkLinks(self, list):
'''
Create hard links identified in the list. An existing file
having the same name as the link is removed before the link
is created.
Directories must be created by calling self.createDirs() and
files must be created by calling self.createFiles() before calling
this method.
'''
for name, value in list.iteritems():
if value[0] == 'L':
if os.path.lexists(name):
os.remove(name)
os.link(value[1], name)
def getData(self, name, offset, size):
if not self.socket:
ss = socket.socket(self.options.addrinfo[0])
ss.bind(self.options.addrinfo[4])
ss.listen(1)
self.doCommand('connect', ss.getsockname())
self.socket = ss.accept()[0]
ss.close()
s = self.doCommand('getData', name, offset, size)
o = 0
sb = []
while o < s:
a = self.socket.recv(min(s - o, 65536))
if len(a) == 0:
raise IOError('EOF')
sb.append(a)
o += len(a)
data = "".join(sb)
if self.options.compress:
data = zlib.decompress(data)
assert len(data) == size
return data
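# A note on the data channel set up above: on the first getData() call RemotePysync
# binds a listening socket on the address family LocalPysync resolved, sends a
# 'connect' command carrying the socket name, and waits for LocalPysync to connect
# back; chunk bytes (zlib-compressed when --compress is set) then flow over that
# socket while the command/answer framing stays on the SSH stdin/stdout pipe.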
def copyData(self, list):
fileMax = 0
byteMax = 0
for name, value in list.iteritems():
if value[0] == '-':
fileMax += 1
byteMax += value[2]
progress = Progress(byteMax, fileMax)
fileNow = 0
byteNow = 0
bytesTotal = 0
unchanged = 0
for name, value in list.iteritems():
if value[0] == '-':
f = None
if self.options.minusn:
if os.path.lexists(name) and stat.S_ISREG(os.lstat(name).st_mode):
f = open(name, 'rb')
else:
f = open(name, 'r+b')
localSize = 0
if f:
f.seek(0, 2)
localSize = f.tell()
offset = 0
bytesFile = 0
changed = value[2] != value[3]
if not value[2]:
# Zero-length files require little work
if not self.options.minusn:
self.syncProgress.update(processingFile=name)
else:
while offset < value[2]:
if self.options.verbose:
progress.update('', name, byteNow, fileNow)
size = min(value[2] - offset, CHUNK_SIZE)
digest = None
if offset + size <= localSize:
with self.commLock:
self.sendCommand('getDigest', name, offset, size)
f.seek(offset)
b = f.read(size)
assert len(b) == size
m = hashlib.sha256()
m.update(b)
a = m.digest()
digest = self.getAnswer()
if a == digest:
if not self.options.minusn:
self.syncProgress.update(fileBytesProcessed=size, processingFile=name)
offset += size
byteNow += size
continue
changed = True
if not self.options.minusn:
data = self.getData(name, offset, size)
if not self.options.insecure:
if digest == None:
digest = self.doCommand('getDigest', name, offset, size)
m = hashlib.sha256()
m.update(data)
if m.digest() != digest:
raise Exception('Digest did not match.')
f.seek(offset)
f.write(data)
self.syncProgress.update(fileBytesProcessed=size, processingFile=name)
bytesFile += size
bytesTotal += size
offset += size
byteNow += size
if f:
f.close()
if self.options.minusn and self.options.verbose:
sys.stderr.write('%s %d\n' % (name, bytesFile))
if not changed:
if self.options.verbose:
sys.stderr.write('%s is the same\n' % name)
unchanged += 1
fileNow += 1
if self.options.verbose:
progress.update('', '', byteNow, fileNow)
updated = fileMax - unchanged
if self.options.minusn or self.options.verbose:
sys.stderr.write('%d out of %d file(s) updated.\n' % (updated, fileMax))
if self.options.minusn:
sys.stderr.write('Total--%d byte%s\n' % (bytesTotal, bytesTotal != 1 and 's' or ''))
def fixPermissions(self, list):
'''
Set the permission bits of the non-link files and directories to
the expected value.
'''
for name, value in list.iteritems():
if value[0] != 'L' and value[0] != 'l':
os.chmod(name, value[1] & 01777)
def run(self):
self.socket = None
self.options = self.doCommand('getOptions')
# TODO: Add a safety check for destDir
os.chdir(self.doCommand('getDestDir'))
list = self.doCommand('getList')
if self.options.minusn:
self.copyData(list)
else:
self.syncProgress = SyncProgress(list,
reportInterval=self.options.progressTime,
reportBytes=self.options.progressBytes,
recordProgressCallback=(
self._recordProgress if self.options.sendProgress else None),
recordRawProgressCallback=(
self._recordRawProgress if self.options.sendRawProgress else None),
progressTimestamp=self.options.progressTimestamp)
self.syncProgress.start()
self.removeJunk(list)
self.createDirs(list)
self.createFiles(list)
self.linkLinks(list)
self.copyData(list)
self.fixPermissions(list)
self.syncProgress.stop()
with self.commLock:
self.sendCommand('quit', 0)
if __name__ == '__main__':
RemotePysync().run()