contents.py 17.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
#!/usr/bin/env python
"""
Helper code for contents generation.

@contact: Debian FTPMaster <ftpmaster@debian.org>
@copyright: 2011 Torsten Werner <twerner@debian.org>
@license: GNU General Public License version 2 or later
"""

################################################################################

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

################################################################################

B
Bastian Blank 已提交
28
from __future__ import absolute_import, print_function
B
Bastian Blank 已提交
29

30 31
from daklib.dbconn import *
from daklib.config import Config
32
from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter
33

B
Bastian Blank 已提交
34
from .dakmultiprocessing import DakProcessPool
35 36
from shutil import rmtree
from tempfile import mkdtemp
37

38
import daklib.daksubprocess
T
Torsten Werner 已提交
39
import os.path
40
import sqlalchemy.sql as sql
T
Torsten Werner 已提交
41

42

43
class BinaryContentsWriter(object):
    '''
    BinaryContentsWriter writes the Contents-$arch.gz files.
    '''

    def __init__(self, suite, architecture, overridetype, component):
        self.suite = suite
        self.architecture = architecture
        self.overridetype = overridetype
        self.component = component
        # session obtained from the suite object (presumably the one it was
        # loaded in); all queries of this writer run on it
        self.session = suite.session()

    def query(self):
        '''
        Returns a query object that is doing most of the work.

        As a side effect this creates the temporary table newest_binaries in
        the current transaction: the newest version of every binary package
        of the requested override type and architecture in the suite. The
        returned query joins it with bin_contents and the overrides of the
        override suite and yields (file, pkglist) rows, where pkglist is a
        comma-separated list of "section/package" entries.
        '''
        # overrides may be taken from a different suite than the packages
        overridesuite = self.suite
        if self.suite.overridesuite is not None:
            overridesuite = get_suite(self.suite.overridesuite, self.session)
        params = {
            'suite':         self.suite.suite_id,
            'overridesuite': overridesuite.suite_id,
            'component':     self.component.component_id,
            'arch':          self.architecture.arch_id,
            'type_id':       self.overridetype.overridetype_id,
            'type':          self.overridetype.overridetype,
        }

        if self.suite.separate_contents_architecture_all:
            # arch 'all' packages are written to a Contents file of their own
            sql_arch_part = 'architecture = :arch'
        else:
            # fold arch 'all' packages into every per-architecture file
            sql_arch_part = '(architecture = :arch_all or architecture = :arch)'
            params['arch_all'] = get_architecture('all', self.session).arch_id

        sql_create_temp = '''
create temp table newest_binaries (
    id integer primary key,
    package text);

create index newest_binaries_by_package on newest_binaries (package);

insert into newest_binaries (id, package)
    select distinct on (package) id, package from binaries
        where type = :type and
            %s and
            id in (select bin from bin_associations where suite = :suite)
        order by package, version desc;''' % sql_arch_part
        self.session.execute(sql_create_temp, params=params)

        query = sql.text('''
with

unique_override as
    (select o.package, s.section
        from override o, section s
        where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
        o.component = :component)

select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
    from newest_binaries b, bin_contents bc, unique_override o
    where b.id = bc.binary_id and o.package = b.package
    group by bc.file''')

        return self.session.query("file", "pkglist").from_statement(query). \
            params(params)

    def formatline(self, filename, package_list):
        '''
        Returns a formatted string for the filename argument: the filename
        left-justified to 55 columns, a space, the package list and a newline.
        '''
        return "%-55s %s\n" % (filename, package_list)

    def fetch(self):
        '''
        Yields a new line of the Contents-$arch.gz file for every file.

        NOTE(review): the SQL has no ORDER BY, so filename order is not
        guaranteed by the query itself -- confirm whether output must be sorted.
        '''
        try:
            for filename, package_list in self.query().yield_per(100):
                yield self.formatline(filename, package_list)
        finally:
            # end transaction to return connection to pool; in a finally
            # clause so it also happens when iteration fails or is abandoned
            self.session.rollback()

    def get_list(self):
        '''
        Returns a list of lines for the Contents-$arch.gz file.
        '''
        return list(self.fetch())

    def writer(self):
        '''
        Returns a writer object.
        '''
        values = {
            'archive':      self.suite.archive.path,
            'suite':        self.suite.suite_name,
            'component':    self.component.component_name,
            'debtype':      self.overridetype.overridetype,
            'architecture': self.architecture.arch_string,
        }
        return BinaryContentsFileWriter(**values)

    def write_file(self):
        '''
        Write the output file.
        '''
        writer = self.writer()
        output = writer.open()
        for line in self.fetch():
            output.write(line)
        writer.close()
T
Torsten Werner 已提交
152

153

T
Torsten Werner 已提交
154 155 156 157
class SourceContentsWriter(object):
    '''
    SourceContentsWriter writes the Contents-source.gz files.
    '''

    def __init__(self, suite, component):
        self.suite = suite
        self.component = component
        # session obtained from the suite object (presumably the one it was
        # loaded in); all queries of this writer run on it
        self.session = suite.session()

    def query(self):
        '''
        Returns a query object that is doing most of the work.

        As a side effect this creates the temporary table newest_sources in
        the current transaction: the newest version of every source package
        in the suite whose file is mapped to the component. The returned
        query joins it with src_contents and yields (file, pkglist) rows,
        where pkglist is a comma-separated list of source package names.
        '''
        params = {
            'suite_id':     self.suite.suite_id,
            'component_id': self.component.component_id,
        }

        sql_create_temp = '''
create temp table newest_sources (
    id integer primary key,
    source text);

create index sources_binaries_by_source on newest_sources (source);

insert into newest_sources (id, source)
    select distinct on (source) s.id, s.source from source s
        join files_archive_map af on s.file = af.file_id
        where s.id in (select source from src_associations where suite = :suite_id)
            and af.component_id = :component_id
        order by source, version desc;'''
        self.session.execute(sql_create_temp, params=params)

        query = sql.text('''
select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
    from newest_sources s, src_contents sc
    where s.id = sc.source_id group by sc.file''')

        return self.session.query("file", "pkglist").from_statement(query). \
            params(params)

    def formatline(self, filename, package_list):
        '''
        Returns a formatted string for the filename argument: the filename,
        a tab, the package list and a newline.
        '''
        return "%s\t%s\n" % (filename, package_list)

    def fetch(self):
        '''
        Yields a new line of the Contents-source.gz file for every file.

        NOTE(review): the SQL has no ORDER BY, so filename order is not
        guaranteed by the query itself -- confirm whether output must be sorted.
        '''
        try:
            for filename, package_list in self.query().yield_per(100):
                yield self.formatline(filename, package_list)
        finally:
            # end transaction to return connection to pool; in a finally
            # clause so it also happens when iteration fails or is abandoned
            self.session.rollback()

    def get_list(self):
        '''
        Returns a list of lines for the Contents-source.gz file.
        '''
        return list(self.fetch())

    def writer(self):
        '''
        Returns a writer object.
        '''
        values = {
            'archive':   self.suite.archive.path,
            'suite':     self.suite.suite_name,
            'component': self.component.component_name
        }
        return SourceContentsFileWriter(**values)

    def write_file(self):
        '''
        Write the output file.
        '''
        writer = self.writer()
        output = writer.open()
        for line in self.fetch():
            output.write(line)
        writer.close()
T
Torsten Werner 已提交
237 238


239
def binary_helper(suite_id, arch_id, overridetype_id, component_id):
    '''
    This function is called in a new subprocess and multiprocessing wants a top
    level function.

    Writes the binary Contents file for one (suite, architecture,
    overridetype, component) combination in its own database session and
    returns a list of names describing that combination for logging.
    '''
    session = DBConn().session(work_mem=1000)
    try:
        suite = Suite.get(suite_id, session)
        architecture = Architecture.get(arch_id, session)
        overridetype = OverrideType.get(overridetype_id, session)
        component = Component.get(component_id, session)
        log_message = [suite.suite_name, architecture.arch_string,
            overridetype.overridetype, component.component_name]
        contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
        contents_writer.write_file()
    finally:
        # release the database session even if writing the file fails
        session.close()
    return log_message

256

T
Torsten Werner 已提交
257 258 259 260 261
def source_helper(suite_id, component_id):
    '''
    This function is called in a new subprocess and multiprocessing wants a top
    level function.

    Writes the source Contents file for one (suite, component) combination in
    its own database session and returns a list of names describing that
    combination for logging.
    '''
    session = DBConn().session(work_mem=1000)
    try:
        suite = Suite.get(suite_id, session)
        component = Component.get(component_id, session)
        log_message = [suite.suite_name, 'source', component.component_name]
        contents_writer = SourceContentsWriter(suite, component)
        contents_writer.write_file()
    finally:
        # release the database session even if writing the file fails
        session.close()
    return log_message

271

272 273 274 275 276
class ContentsWriter(object):
    '''
    Loop over all suites, architectures, overridetypes, and components to write
    all contents files.
    '''
    @classmethod
    def log_result(class_, result):
        '''
        Writes a result message to the logfile.

        Used as the pool callback; result is the list returned by
        binary_helper() or source_helper().
        '''
        class_.logger.log(list(result))

    @classmethod
    def write_all(class_, logger, archive_names=None, suite_names=None, component_names=None, force=False):
        '''
        Writes all Contents files for suites in list suite_names which defaults
        to all 'touchable' suites if not specified explicitly. Untouchable
        suites will be included if the force argument is set to True.

        archive_names and component_names restrict the selection in the same
        way; None or an empty list means "no restriction". logger must provide
        a log() method taking a list; it is called once per finished job.

        For every selected suite and component one source job is scheduled,
        plus a 'deb' and a 'udeb' job for every architecture of the suite; all
        jobs run in a DakProcessPool while bin_contents and src_contents are
        locked in SHARE MODE.
        '''
        pool = DakProcessPool()
        # log_result() is a classmethod used as pool callback, so the logger
        # has to be stored on the class
        class_.logger = logger
        session = DBConn().session()
        try:
            suite_query = session.query(Suite)
            if archive_names:
                suite_query = suite_query.join(Suite.archive).filter(Archive.archive_name.in_(archive_names))
            if suite_names:
                suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
            if not force:
                suite_query = suite_query.filter(Suite.untouchable == False)  # noqa:E712
            component_query = session.query(Component)
            if component_names:
                component_query = component_query.filter(Component.component_name.in_(component_names))
            deb_id = get_override_type('deb', session).overridetype_id
            udeb_id = get_override_type('udeb', session).overridetype_id

            # Lock tables so that nobody can change things underneath us
            session.execute("LOCK TABLE bin_contents IN SHARE MODE")
            session.execute("LOCK TABLE src_contents IN SHARE MODE")

            # the component list does not depend on the suite: load it once
            components = component_query.all()

            for suite in suite_query:
                suite_id = suite.suite_id
                # arch 'all' only gets its own Contents file when the suite
                # asks for it; otherwise it is folded into every arch's file
                skip_arch_all = not suite.separate_contents_architecture_all
                # does not depend on the component: compute once per suite
                architectures = list(suite.get_architectures(skipsrc=True, skipall=skip_arch_all))

                for component in components:
                    component_id = component.component_id
                    # handle source packages
                    pool.apply_async(source_helper, (suite_id, component_id),
                        callback=class_.log_result)
                    for architecture in architectures:
                        arch_id = architecture.arch_id
                        # handle 'deb' packages
                        pool.apply_async(binary_helper, (suite_id, arch_id, deb_id, component_id),
                            callback=class_.log_result)
                        # handle 'udeb' packages
                        pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, component_id),
                            callback=class_.log_result)
            pool.close()
            pool.join()
        finally:
            # always release the session, also when scheduling fails
            session.close()

T
Torsten Werner 已提交
335

336
class BinaryContentsScanner(object):
    '''
    BinaryContentsScanner provides a method scan() to scan the contents of a
    DBBinary object. Every call of scan() uses its own database session, so
    several scans can run in parallel workers.
    '''

    def __init__(self, binary_id):
        '''
        The argument binary_id is the id of the DBBinary object that
        should be scanned.
        '''
        self.binary_id = binary_id

    def scan(self, dummy_arg=None):
        '''
        This method does the actual scan and fills in the associated BinContents
        property. It commits any changes to the database. The argument
        dummy_arg is ignored; it is kept so existing callers passing an
        argument keep working.
        '''
        session = DBConn().session()
        try:
            binary = session.query(DBBinary).get(self.binary_id)
            fileset = set(binary.scan_contents())
            if not fileset:
                # store a placeholder: a binary without any contents rows
                # would match the "contents == None" filter of scan_all again
                fileset.add('EMPTY_PACKAGE')
            for filename in fileset:
                binary.contents.append(BinContents(file=filename))
            session.commit()
        finally:
            # release the session even if scanning or committing fails
            session.close()

    @classmethod
    def scan_all(class_, limit=None):
        '''
        The class method scan_all() scans all binaries using multiple processes.
        The number of binaries to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        '''
        pool = DakProcessPool()
        session = DBConn().session()
        try:
            unscanned = session.query(DBBinary).filter(DBBinary.contents == None)  # noqa:E711
            query = unscanned if limit is None else unscanned.limit(limit)
            processed = query.count()
            for binary in query.yield_per(100):
                pool.apply_async(binary_scan_helper, (binary.binary_id, ))
            pool.close()
            pool.join()
            # count again (without the limit) after the workers are done
            remaining = unscanned.count()
        finally:
            session.close()
        return {'processed': processed, 'remaining': remaining}
387

388

389
def binary_scan_helper(binary_id):
    '''
    This function runs in a subprocess.

    Scans the binary package with the given id. Exceptions are caught and
    reported on the console instead of being propagated to the caller.
    '''
    import traceback
    try:
        scanner = BinaryContentsScanner(binary_id)
        scanner.scan()
    except Exception as e:
        print("binary_scan_helper raised an exception: %s" % (e))
        # the one-line message above loses where the error came from
        traceback.print_exc()
398

399

400 401 402 403 404
class UnpackedSource(object):
    '''
    UnpackedSource extracts a source package into a temporary location and
    gives you some convenient functions for accessing it.

    Instances can also be used as a context manager; the temporary files are
    removed when the with block is left.
    '''

    def __init__(self, dscfilename, tmpbasedir=None):
        '''
        The dscfilename is a name of a DSC file that will be extracted.

        The package is unpacked with dpkg-source into a new temporary
        directory below tmpbasedir, or below the configured Dir::TempPath
        when tmpbasedir is not given. If extraction fails, the temporary
        directory is removed before the exception propagates.
        '''
        # set first so cleanup()/__del__ never see a missing attribute, even
        # when the config lookup or mkdtemp() below fails
        self.root_directory = None
        basedir = tmpbasedir if tmpbasedir else Config()['Dir::TempPath']
        temp_directory = mkdtemp(dir=basedir)
        root_directory = os.path.join(temp_directory, 'root')
        command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
            dscfilename, root_directory)
        try:
            daklib.daksubprocess.check_call(command)
        except BaseException:
            # don't leave a half-extracted tree behind; removal is best-effort
            # so it cannot mask the original error
            rmtree(temp_directory, ignore_errors=True)
            raise
        self.root_directory = root_directory

    def get_root_directory(self):
        '''
        Returns the name of the package's root directory which is the directory
        where the debian subdirectory is located.
        '''
        return self.root_directory

    def get_all_filenames(self):
        '''
        Returns an iterator over all filenames. The filenames will be relative
        to the root directory.
        '''
        skip = len(self.root_directory) + 1
        for root, _, files in os.walk(self.root_directory):
            for name in files:
                yield os.path.join(root[skip:], name)

    def cleanup(self):
        '''
        Removes all temporary files. Calling it more than once is harmless.
        '''
        root_directory = getattr(self, 'root_directory', None)
        if root_directory is None:
            return
        # the root directory lives inside the mkdtemp() directory; remove both
        rmtree(os.path.dirname(root_directory))
        self.root_directory = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.cleanup()
        # do not suppress exceptions raised inside the with block
        return False

    def __del__(self):
        '''
        Enforce cleanup.
        '''
        self.cleanup()
449 450 451 452 453 454 455


class SourceContentsScanner(object):
    '''
    SourceContentsScanner provides a method scan() to scan the contents of a
    DBSource object. Every call of scan() uses its own database session.
    '''

    def __init__(self, source_id):
        '''
        The argument source_id is the id of the DBSource object that
        should be scanned.
        '''
        self.source_id = source_id

    def scan(self):
        '''
        This method does the actual scan and fills in the associated SrcContents
        property. It commits any changes to the database.
        '''
        session = DBConn().session()
        try:
            source = session.query(DBSource).get(self.source_id)
            fileset = set(source.scan_contents())
            for filename in fileset:
                source.contents.append(SrcContents(file=filename))
            session.commit()
        finally:
            # release the session even if scanning or committing fails
            session.close()

    @classmethod
    def scan_all(class_, limit=None):
        '''
        The class method scan_all() scans all source using multiple processes.
        The number of sources to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict.
        '''
        pool = DakProcessPool()
        session = DBConn().session()
        try:
            unscanned = session.query(DBSource).filter(DBSource.contents == None)  # noqa:E711
            query = unscanned if limit is None else unscanned.limit(limit)
            processed = query.count()
            for source in query.yield_per(100):
                pool.apply_async(source_scan_helper, (source.source_id, ))
            pool.close()
            pool.join()
            # count again (without the limit) after the workers are done
            remaining = unscanned.count()
        finally:
            session.close()
        return {'processed': processed, 'remaining': remaining}
499

500

T
Torsten Werner 已提交
501
def source_scan_helper(source_id):
    '''
    This function runs in a subprocess.

    Scans the source package with the given id. Exceptions are caught and
    reported on the console instead of being propagated to the caller.
    '''
    import traceback
    try:
        scanner = SourceContentsScanner(source_id)
        scanner.scan()
    except Exception as e:
        print("source_scan_helper raised an exception: %s" % (e))
        # the one-line message above loses where the error came from
        traceback.print_exc()