contents.py 17.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
#!/usr/bin/env python
"""
Helper code for contents generation.

@contact: Debian FTPMaster <ftpmaster@debian.org>
@copyright: 2011 Torsten Werner <twerner@debian.org>
@license: GNU General Public License version 2 or later
"""

################################################################################

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

################################################################################

B
Bastian Blank 已提交
28
from __future__ import absolute_import, print_function
B
Bastian Blank 已提交
29

30 31
from daklib.dbconn import *
from daklib.config import Config
32
from daklib.filewriter import BinaryContentsFileWriter, SourceContentsFileWriter
33

B
Bastian Blank 已提交
34
from .dakmultiprocessing import DakProcessPool
35 36
from shutil import rmtree
from tempfile import mkdtemp
37

38
import daklib.daksubprocess
T
Torsten Werner 已提交
39
import os.path
40
import sqlalchemy.sql as sql
T
Torsten Werner 已提交
41

42

43
class BinaryContentsWriter(object):
    '''
    BinaryContentsWriter writes the Contents-$arch.gz files.
    '''

    def __init__(self, suite, architecture, overridetype, component):
        '''
        Remembers the suite, architecture, override type and component the
        contents are generated for. The database session is taken from
        suite.session().
        '''
        self.suite = suite
        self.architecture = architecture
        self.overridetype = overridetype
        self.component = component
        self.session = suite.session()

    def query(self):
        '''
        Returns a query object that is doing most of the work.

        As a side effect the temporary table newest_binaries (and its index)
        is created and filled on self.session; the returned query reads from
        that table and yields (file, pkglist) rows.
        '''
        # Overrides are looked up in the suite named by suite.overridesuite
        # when one is set, otherwise in the suite itself.
        overridesuite = self.suite
        if self.suite.overridesuite is not None:
            overridesuite = get_suite(self.suite.overridesuite, self.session)
        params = {
            'suite':         self.suite.suite_id,
            'overridesuite': overridesuite.suite_id,
            'component':     self.component.component_id,
            'arch_all':      get_architecture('all', self.session).arch_id,
            'arch':          self.architecture.arch_id,
            'type_id':       self.overridetype.overridetype_id,
            'type':          self.overridetype.overridetype,
        }

        # "distinct on (package) ... order by package, version desc" keeps one
        # row per package name: the one with the highest version.
        sql_create_temp = '''
create temp table newest_binaries (
    id integer primary key,
    package text);

create index newest_binaries_by_package on newest_binaries (package);

insert into newest_binaries (id, package)
    select distinct on (package) id, package from binaries
        where type = :type and
            (architecture = :arch_all or architecture = :arch) and
            id in (select bin from bin_associations where suite = :suite)
        order by package, version desc;'''
        self.session.execute(sql_create_temp, params=params)

        query = sql.text('''
with

unique_override as
    (select o.package, s.section
        from override o, section s
        where o.suite = :overridesuite and o.type = :type_id and o.section = s.id and
        o.component = :component)

select bc.file, string_agg(o.section || '/' || b.package, ',' order by b.package) as pkglist
    from newest_binaries b, bin_contents bc, unique_override o
    where b.id = bc.binary_id and o.package = b.package
    group by bc.file''')

        columns = self.session.query("file", "pkglist")
        return columns.from_statement(query).params(params)

    def formatline(self, filename, package_list):
        '''
        Returns a formatted string for the filename argument: the filename
        left-justified in a 55 character column, a space, the package list
        and a trailing newline.
        '''
        return "%-55s %s\n" % (filename, package_list)

    def fetch(self):
        '''
        Yields the lines of the Contents-$arch.gz file, one per row returned
        by query().

        NOTE(review): the SQL in query() has no "order by" on the file column,
        so any filename ordering of the final file has to come from elsewhere.
        '''
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self):
        '''
        Returns a list of lines for the Contents-$arch.gz file.
        '''
        return list(self.fetch())

    def writer(self):
        '''
        Returns a BinaryContentsFileWriter object configured for this suite,
        component, override type and architecture.
        '''
        values = {
            'archive':      self.suite.archive.path,
            'suite':        self.suite.suite_name,
            'component':    self.component.component_name,
            'debtype':      self.overridetype.overridetype,
            'architecture': self.architecture.arch_string,
        }
        return BinaryContentsFileWriter(**values)

    def write_file(self):
        '''
        Write the output file.
        '''
        writer = self.writer()
        output = writer.open()
        for item in self.fetch():
            output.write(item)
        # NOTE(review): close() is only reached when every line was written.
        # Presumably close() finalizes the output file, so it is deliberately
        # not moved into a "finally" block - confirm against the file writer.
        writer.close()
T
Torsten Werner 已提交
147

148

T
Torsten Werner 已提交
149 150 151 152
class SourceContentsWriter(object):
    '''
    SourceContentsWriter writes the Contents-source.gz files.
    '''

    def __init__(self, suite, component):
        '''
        Remembers the suite and component the contents are generated for. The
        database session is taken from suite.session().
        '''
        self.suite = suite
        self.component = component
        self.session = suite.session()

    def query(self):
        '''
        Returns a query object that is doing most of the work.

        As a side effect the temporary table newest_sources (and its index)
        is created and filled on self.session; the returned query reads from
        that table and yields (file, pkglist) rows.
        '''
        params = {
            'suite_id':     self.suite.suite_id,
            'component_id': self.component.component_id,
        }

        # "distinct on (source) ... order by source, version desc" keeps one
        # row per source name: the one with the highest version.
        sql_create_temp = '''
create temp table newest_sources (
    id integer primary key,
    source text);

create index sources_binaries_by_source on newest_sources (source);

insert into newest_sources (id, source)
    select distinct on (source) s.id, s.source from source s
        join files_archive_map af on s.file = af.file_id
        where s.id in (select source from src_associations where suite = :suite_id)
            and af.component_id = :component_id
        order by source, version desc;'''
        self.session.execute(sql_create_temp, params=params)

        query = sql.text('''
select sc.file, string_agg(s.source, ',' order by s.source) as pkglist
    from newest_sources s, src_contents sc
    where s.id = sc.source_id group by sc.file''')

        columns = self.session.query("file", "pkglist")
        return columns.from_statement(query).params(params)

    def formatline(self, filename, package_list):
        '''
        Returns a formatted string for the filename argument: filename and
        package list separated by a tab, followed by a newline.
        '''
        return "%s\t%s\n" % (filename, package_list)

    def fetch(self):
        '''
        Yields the lines of the Contents-source.gz file, one per row returned
        by query().

        NOTE(review): the SQL in query() has no "order by" on the file column,
        so any filename ordering of the final file has to come from elsewhere.
        '''
        for filename, package_list in self.query().yield_per(100):
            yield self.formatline(filename, package_list)
        # end transaction to return connection to pool
        self.session.rollback()

    def get_list(self):
        '''
        Returns a list of lines for the Contents-source.gz file.
        '''
        return list(self.fetch())

    def writer(self):
        '''
        Returns a SourceContentsFileWriter object configured for this suite
        and component.
        '''
        values = {
            'archive':   self.suite.archive.path,
            'suite':     self.suite.suite_name,
            'component': self.component.component_name
        }
        return SourceContentsFileWriter(**values)

    def write_file(self):
        '''
        Write the output file.
        '''
        writer = self.writer()
        output = writer.open()
        for item in self.fetch():
            output.write(item)
        # NOTE(review): close() is only reached when every line was written.
        # Presumably close() finalizes the output file, so it is deliberately
        # not moved into a "finally" block - confirm against the file writer.
        writer.close()
T
Torsten Werner 已提交
232 233


234
def binary_helper(suite_id, arch_id, overridetype_id, component_id):
    '''
    This function is called in a new subprocess and multiprocessing wants a top
    level function.

    It loads the objects for the given ids, writes the binary contents file
    for them and returns a list [suite name, architecture string, override
    type, component name] that is used as the log message.
    '''
    session = DBConn().session(work_mem=1000)
    try:
        suite = Suite.get(suite_id, session)
        architecture = Architecture.get(arch_id, session)
        overridetype = OverrideType.get(overridetype_id, session)
        component = Component.get(component_id, session)
        log_message = [
            suite.suite_name,
            architecture.arch_string,
            overridetype.overridetype,
            component.component_name,
        ]
        contents_writer = BinaryContentsWriter(suite, architecture, overridetype, component)
        contents_writer.write_file()
    finally:
        # Close the session even when a lookup or the write fails, so the
        # worker process does not keep it open.
        session.close()
    return log_message

251

T
Torsten Werner 已提交
252 253 254 255 256
def source_helper(suite_id, component_id):
    '''
    This function is called in a new subprocess and multiprocessing wants a top
    level function.

    It loads the suite and component for the given ids, writes the source
    contents file for them and returns a list [suite name, 'source',
    component name] that is used as the log message.
    '''
    session = DBConn().session(work_mem=1000)
    try:
        suite = Suite.get(suite_id, session)
        component = Component.get(component_id, session)
        log_message = [suite.suite_name, 'source', component.component_name]
        contents_writer = SourceContentsWriter(suite, component)
        contents_writer.write_file()
    finally:
        # Close the session even when a lookup or the write fails, so the
        # worker process does not keep it open.
        session.close()
    return log_message

266

267 268 269 270 271
class ContentsWriter(object):
    '''
    Loop over all suites, architectures, overridetypes, and components to write
    all contents files.
    '''
    @classmethod
    def log_result(class_, result):
        '''
        Writes a result message to the logfile. The result is converted to a
        list and passed to the logger stored by write_all().
        '''
        class_.logger.log(list(result))

    @classmethod
    def write_all(class_, logger, archive_names=[], suite_names=[], component_names=[], force=False):
        '''
        Writes all Contents files for suites in list suite_names which defaults
        to all 'touchable' suites if not specified explicitly. Untouchable
        suites will be included if the force argument is set to True.

        The lists archive_names and component_names restrict the archives and
        components in the same way; an empty list means no restriction. For
        every selected suite and component one source job is scheduled, plus
        one 'deb' and one 'udeb' job per architecture returned by
        suite.get_architectures(skipsrc=True, skipall=True). The logger is
        stored on the class and used by log_result() for each finished job.
        '''
        # The list defaults are kept for interface compatibility; they are
        # only read here, never modified.
        pool = DakProcessPool()
        class_.logger = logger
        session = DBConn().session()
        try:
            suite_query = session.query(Suite)
            if archive_names:
                suite_query = suite_query.join(Suite.archive).filter(Archive.archive_name.in_(archive_names))
            if suite_names:
                suite_query = suite_query.filter(Suite.suite_name.in_(suite_names))
            component_query = session.query(Component)
            if component_names:
                component_query = component_query.filter(Component.component_name.in_(component_names))
            if not force:
                suite_query = suite_query.filter(Suite.untouchable == False)  # noqa:E712
            deb_id = get_override_type('deb', session).overridetype_id
            udeb_id = get_override_type('udeb', session).overridetype_id

            # Lock tables so that nobody can change things underneath us
            session.execute("LOCK TABLE bin_contents IN SHARE MODE")
            session.execute("LOCK TABLE src_contents IN SHARE MODE")

            for suite in suite_query:
                suite_id = suite.suite_id
                for component in component_query:
                    component_id = component.component_id
                    # handle source packages
                    pool.apply_async(source_helper, (suite_id, component_id),
                                     callback=class_.log_result)
                    for architecture in suite.get_architectures(skipsrc=True, skipall=True):
                        arch_id = architecture.arch_id
                        # handle 'deb' packages
                        pool.apply_async(binary_helper, (suite_id, arch_id, deb_id, component_id),
                                         callback=class_.log_result)
                        # handle 'udeb' packages
                        pool.apply_async(binary_helper, (suite_id, arch_id, udeb_id, component_id),
                                         callback=class_.log_result)
            pool.close()
            pool.join()
        finally:
            # Always close the session that issued the LOCK TABLE statements,
            # also when scheduling fails half way through.
            session.close()

T
Torsten Werner 已提交
325

326
class BinaryContentsScanner(object):
    '''
    BinaryContentsScanner provides a method scan() to scan the contents of a
    DBBinary object.
    '''

    def __init__(self, binary_id):
        '''
        The argument binary_id is the id of the DBBinary object that
        should be scanned.
        '''
        self.binary_id = binary_id

    def scan(self, dummy_arg=None):
        '''
        This method does the actual scan and fills in the associated BinContents
        property. It commits any changes to the database. A binary without
        any files gets the single placeholder entry 'EMPTY_PACKAGE'. The
        argument dummy_arg is ignored; it is kept for callers that pass one.
        '''
        session = DBConn().session()
        try:
            binary = session.query(DBBinary).get(self.binary_id)
            fileset = set(binary.scan_contents())
            if not fileset:
                fileset.add('EMPTY_PACKAGE')
            for filename in fileset:
                binary.contents.append(BinContents(file=filename))
            session.commit()
        finally:
            # Close the session even when the scan fails, so the worker
            # process does not keep it open.
            session.close()

    @classmethod
    def scan_all(class_, limit=None):
        '''
        The class method scan_all() scans all binaries without contents by
        dispatching them to a DakProcessPool. The number of binaries to be
        scanned can be limited with the limit argument. Returns the number of
        processed and remaining packages as a dict with the keys 'processed'
        and 'remaining'.
        '''
        pool = DakProcessPool()
        session = DBConn().session()
        try:
            unscanned = session.query(DBBinary).filter(DBBinary.contents == None)  # noqa:E711
            query = unscanned
            if limit is not None:
                query = query.limit(limit)
            processed = query.count()
            for binary in query.yield_per(100):
                pool.apply_async(binary_scan_helper, (binary.binary_id, ))
            pool.close()
            pool.join()
            # Counted on the unlimited query and only after all jobs have
            # finished, so it reflects what is still left to scan.
            remaining = unscanned.count()
        finally:
            session.close()
        return {'processed': processed, 'remaining': remaining}
377

378

379
def binary_scan_helper(binary_id):
    '''
    This function runs in a subprocess.

    It scans the binary with the given id. Any exception raised by the scan
    is printed and not propagated, so the function always returns None.
    '''
    try:
        scanner = BinaryContentsScanner(binary_id)
        scanner.scan()
    except Exception as e:
        print("binary_scan_helper raised an exception: %s" % (e))
388

389

390 391 392 393 394
class UnpackedSource(object):
    '''
    UnpackedSource extracts a source package into a temporary location and
    gives you some convenient function for accessing it.
    '''

    def __init__(self, dscfilename, tmpbasedir=None):
        '''
        The dscfilename is a name of a DSC file that will be extracted.

        The package is extracted with dpkg-source into the subdirectory
        'root' of a fresh temporary directory. That directory is created
        below tmpbasedir or, if tmpbasedir is not given, below the configured
        Dir::TempPath.
        '''
        # Set first so that cleanup() and __del__() work even when one of the
        # following steps raises.
        self.root_directory = None
        basedir = tmpbasedir if tmpbasedir else Config()['Dir::TempPath']
        temp_directory = mkdtemp(dir=basedir)
        self.root_directory = os.path.join(temp_directory, 'root')
        command = ('dpkg-source', '--no-copy', '--no-check', '-q', '-x',
                   dscfilename, self.root_directory)
        daklib.daksubprocess.check_call(command)

    def get_root_directory(self):
        '''
        Returns the name of the package's root directory which is the directory
        where the debian subdirectory is located.
        '''
        return self.root_directory

    def get_changelog_file(self):
        '''
        Returns a file object for debian/changelog or None if no such file exists.
        The caller is responsible for closing the returned file.
        '''
        changelog_name = os.path.join(self.root_directory, 'debian', 'changelog')
        try:
            return open(changelog_name)
        except IOError:
            return None

    def get_all_filenames(self):
        '''
        Returns an iterator over all filenames. The filenames will be relative
        to the root directory.
        '''
        # Number of leading characters to strip: the root directory itself
        # plus the path separator that follows it.
        skip = len(self.root_directory) + 1
        for root, _, files in os.walk(self.root_directory):
            for name in files:
                yield os.path.join(root[skip:], name)

    def cleanup(self):
        '''
        Removes all temporary files. Calling it again afterwards is a no-op.
        '''
        # getattr() also covers instances whose __init__() never got as far
        # as setting the attribute.
        root_directory = getattr(self, 'root_directory', None)
        if root_directory is None:
            return
        # The temporary directory is the parent of the 'root' directory.
        parent_directory = os.path.dirname(root_directory)
        rmtree(parent_directory)
        self.root_directory = None

    def __del__(self):
        '''
        Enforce cleanup.
        '''
        self.cleanup()
449 450 451 452 453 454 455


class SourceContentsScanner(object):
    '''
    SourceContentsScanner provides a method scan() to scan the contents of a
    DBSource object.
    '''

    def __init__(self, source_id):
        '''
        The argument source_id is the id of the DBSource object that
        should be scanned.
        '''
        self.source_id = source_id

    def scan(self):
        '''
        This method does the actual scan and fills in the associated SrcContents
        property. It commits any changes to the database.
        '''
        session = DBConn().session()
        try:
            source = session.query(DBSource).get(self.source_id)
            fileset = set(source.scan_contents())
            for filename in fileset:
                source.contents.append(SrcContents(file=filename))
            session.commit()
        finally:
            # Close the session even when the scan fails, so the worker
            # process does not keep it open.
            session.close()

    @classmethod
    def scan_all(class_, limit=None):
        '''
        The class method scan_all() scans all source using multiple processes.
        The number of sources to be scanned can be limited with the limit
        argument. Returns the number of processed and remaining packages as a
        dict with the keys 'processed' and 'remaining'.
        '''
        pool = DakProcessPool()
        session = DBConn().session()
        try:
            unscanned = session.query(DBSource).filter(DBSource.contents == None)  # noqa:E711
            query = unscanned
            if limit is not None:
                query = query.limit(limit)
            processed = query.count()
            for source in query.yield_per(100):
                pool.apply_async(source_scan_helper, (source.source_id, ))
            pool.close()
            pool.join()
            # Counted on the unlimited query and only after all jobs have
            # finished, so it reflects what is still left to scan.
            remaining = unscanned.count()
        finally:
            session.close()
        return {'processed': processed, 'remaining': remaining}
499

500

T
Torsten Werner 已提交
501
def source_scan_helper(source_id):
    '''
    This function runs in a subprocess.

    It scans the source with the given id. Any exception raised by the scan
    is printed and not propagated, so the function always returns None.
    '''
    try:
        scanner = SourceContentsScanner(source_id)
        scanner.scan()
    except Exception as e:
        print("source_scan_helper raised an exception: %s" % (e))