From 94b59d327ed0c29d09a7f3c7cf2652999f8644d0 Mon Sep 17 00:00:00 2001 From: Florent Xicluna Date: Sun, 27 Apr 2014 00:22:50 +0200 Subject: [PATCH] New option --jobs to spawn multiple processes in parallel; closes #146 --- CHANGES.rst | 1 + flake8/engine.py | 14 +++++++++ flake8/reporter.py | 60 +++++++++++++++++++++++++++++++++++++ flake8/tests/test_engine.py | 1 + 4 files changed, 76 insertions(+) create mode 100644 flake8/reporter.py diff --git a/CHANGES.rst b/CHANGES.rst index b057c60..a509b87 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -5,6 +5,7 @@ CHANGES ------------------ - New option ``doctests`` to run Pyflakes checks on doctests too +- New option ``jobs`` to launch multiple jobs in parallel - Fix Git and Mercurial hooks, issues #88 and #133 - Fix crashes with Python 3.4 by upgrading dependencies - Fix traceback when running tests with Python 2.6 diff --git a/flake8/engine.py b/flake8/engine.py index 3bc6810..13bfa12 100644 --- a/flake8/engine.py +++ b/flake8/engine.py @@ -5,6 +5,7 @@ import platform import pep8 from flake8 import __version__ +from flake8.reporter import multiprocessing, BaseQReport, QueueReport from flake8.util import OrderedSet _flake8_noqa = re.compile(r'flake8[:=]\s*noqa', re.I).search @@ -47,6 +48,12 @@ def get_parser(): parser.remove_option(opt) except ValueError: pass + + if multiprocessing: + parser.config_options.append('jobs') + parser.add_option('-j', '--jobs', type='int', default=1, + help="number of jobs to run simultaneously") + parser.add_option('--exit-zero', action='store_true', help="exit with code 0 even if there are errors") for parser_hook in parser_hooks: @@ -79,6 +86,13 @@ def get_style_guide(**kwargs): options = styleguide.options for options_hook in options_hooks: options_hook(options) + + if multiprocessing and options.jobs > 1: + reporter = BaseQReport if options.quiet else QueueReport + report = styleguide.init_report(reporter) + report.input_file = styleguide.input_file + styleguide.runner = report.task_queue.put + return styleguide diff --git a/flake8/reporter.py b/flake8/reporter.py new file mode 100644 index 0000000..ef94395 --- /dev/null +++ b/flake8/reporter.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# Adapted from a contribution of Johan Dahlin + +import collections +try: + import multiprocessing +except ImportError: # Python 2.5 + multiprocessing = None + +import pep8 + +__all__ = ['multiprocessing', 'BaseQReport', 'QueueReport'] + + +class BaseQReport(pep8.BaseReport): + """Base Queue Report.""" + + def __init__(self, options): + assert options.jobs > 0 + super(BaseQReport, self).__init__(options) + self.counters = collections.defaultdict(int) + self.n_jobs = options.jobs + + # init queues + self.task_queue = multiprocessing.Queue() + self.result_queue = multiprocessing.Queue() + + def start(self): + super(BaseQReport, self).start() + # spawn processes + for i in range(self.n_jobs): + p = multiprocessing.Process(target=self.process_main) + p.start() + + def stop(self): + # collect queues + for i in range(self.n_jobs): + self.task_queue.put('DONE') + self.update_state(self.result_queue.get()) + super(BaseQReport, self).stop() + + def process_main(self): + for filename in iter(self.task_queue.get, 'DONE'): + self.input_file(filename) + self.result_queue.put(self.get_state()) + + def get_state(self): + return {'total_errors': self.total_errors, + 'counters': self.counters, + 'messages': self.messages} + + def update_state(self, state): + self.total_errors += state['total_errors'] + for key, value in state['counters'].items(): + self.counters[key] += value + self.messages.update(state['messages']) + + +class QueueReport(pep8.StandardReport, BaseQReport): + """Standard Queue Report.""" diff --git a/flake8/tests/test_engine.py b/flake8/tests/test_engine.py index 67cba5a..1032bd1 100644 --- a/flake8/tests/test_engine.py +++ b/flake8/tests/test_engine.py @@ -34,6 +34,7 @@ class TestEngine(unittest.TestCase): m = mock.Mock() with mock.patch('flake8.engine.StyleGuide') as StyleGuide: with mock.patch('flake8.engine.get_parser') as get_parser: + StyleGuide.return_value.options.jobs = 42 get_parser.return_value = (m, []) engine.get_style_guide(foo='bar') get_parser.assert_called_once_with()