"""Tests for the Manager object for FileCheckers.""" import errno import mock import pytest from flake8 import checker def style_guide_mock(**kwargs): kwargs.setdefault('diff', False) kwargs.setdefault('jobs', '4') style_guide = mock.Mock() style_guide.options = mock.Mock(**kwargs) return style_guide def test_oserrors_cause_serial_fall_back(): """Verify that OSErrors will cause the Manager to fallback to serial.""" err = OSError(errno.ENOSPC, 'Ominous message about spaceeeeee') style_guide = style_guide_mock() with mock.patch('multiprocessing.Queue', side_effect=err): manager = checker.Manager(style_guide, [], []) assert manager.using_multiprocessing is False def test_oserrors_are_reraised(): """Verify that OSErrors will cause the Manager to fallback to serial.""" err = OSError(errno.EAGAIN, 'Ominous message') style_guide = style_guide_mock() with mock.patch('multiprocessing.Queue', side_effect=err): with pytest.raises(OSError): checker.Manager(style_guide, [], []) def test_multiprocessing_is_disabled(): """Verify not being able to import multiprocessing forces jobs to 0.""" style_guide = style_guide_mock() with mock.patch('flake8.checker.multiprocessing', None): manager = checker.Manager(style_guide, [], []) assert manager.jobs == 0