From 764ecb0e48e01d7a0a76c226f4f5d010f46c6b72 Mon Sep 17 00:00:00 2001 From: Ilja Orlovs Date: Wed, 13 Oct 2021 10:15:44 +0100 Subject: [PATCH] Fixing integration with pytest-flake8 --- src/flake8/formatting/base.py | 14 +++++++++++++- tests/unit/test_base_formatter.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/src/flake8/formatting/base.py b/src/flake8/formatting/base.py index 7919f92..a13746d 100644 --- a/src/flake8/formatting/base.py +++ b/src/flake8/formatting/base.py @@ -179,12 +179,24 @@ class BaseFormatter: # one return f"{error.physical_line}{indent}^" + def _stdoutWriteHook(self, value: str) -> None: + """An alternative to print() function that handles output terminal encoding.""" + try: + sys.stdout.write(value) + except UnicodeEncodeError: + byteValue = value.encode(sys.stdout.encoding, 'backslashreplace') + if hasattr(sys.stdout, 'buffer'): + sys.stdout.buffer.write(byteValue) + else: + sys.stdout.write(byteValue.decode(sys.stdout.encoding, 'strict')) + def _write(self, output: str) -> None: """Handle logic of whether to use an output file or print().""" if self.output_fd is not None: self.output_fd.write(output + self.newline) if self.output_fd is None or self.options.tee: - sys.stdout.buffer.write(output.encode() + self.newline.encode()) + self._stdoutWriteHook(output) + self._stdoutWriteHook(self.newline) def write(self, line: Optional[str], source: Optional[str]) -> None: """Write the line either to the output file or stdout. diff --git a/tests/unit/test_base_formatter.py b/tests/unit/test_base_formatter.py index 8958903..fbe84d8 100644 --- a/tests/unit/test_base_formatter.py +++ b/tests/unit/test_base_formatter.py @@ -136,6 +136,35 @@ def test_write_produces_stdout(capsys): assert capsys.readouterr().out == f"{line}\n{source}\n" +@pytest.mark.parametrize('buffered_stdout', [True, False]) +def test_write_hook_fallbacks(buffered_stdout): + mock_line = mock.Mock(name='Mock Line') + + stdout_spec = ['write', 'encoding'] + if buffered_stdout: + stdout_spec.append('buffer') + + with mock.patch('sys.stdout', spec=stdout_spec) as mock_stdout: + def _stdoutWriteEffect(value): + if value is mock_line: + raise UnicodeEncodeError('unittest-codec', u'', 42, 43, 'NOPE') + return None + mock_stdout.write.side_effect = _stdoutWriteEffect + + mock_stdout.encoding = 'ascii' + + formatter = base.BaseFormatter(options()) + formatter.write(mock_line, None) + + mock_line.encode.assert_called_once_with('ascii', 'backslashreplace') + byte_mock_line = mock_line.encode.return_value + if buffered_stdout: + mock_stdout.buffer.write.assert_any_call(byte_mock_line) + else: + byte_mock_line.decode.assert_called_once_with('ascii', 'strict') + mock_stdout.write.assert_any_call(byte_mock_line.decode.return_value) + + class AfterInitFormatter(base.BaseFormatter): """Subclass for testing after_init."""