1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
import logging
from unittest.mock import MagicMock, patch
import pytest
from bot import api
from tests.helpers import async_test
def test_loop_is_not_running_by_default():
    """Check that `api.loop_is_running()` is falsy when called from a plain synchronous test."""
    running = api.loop_is_running()
    assert not running
@async_test
async def test_loop_is_running_in_async_test():
    """Check that `api.loop_is_running()` is truthy inside a coroutine wrapped by `async_test`."""
    running = api.loop_is_running()
    assert running
@pytest.fixture()
def error_api_response():
    """Provide a `MagicMock` standing in for an API response whose `status` is 999."""
    # Keyword arguments to the mock constructor configure attributes on the mock.
    return MagicMock(status=999)
@pytest.fixture()
def api_log_handler():
    """Provide an `api.APILoggingHandler` constructed with `None` as its only argument."""
    handler = api.APILoggingHandler(None)
    return handler
@pytest.fixture()
def debug_log_record():
    """Provide a DEBUG-level `logging.LogRecord` with fixed, recognisable field values."""
    record_fields = {
        'name': 'my.logger',
        'level': logging.DEBUG,
        'pathname': 'my/logger.py',
        'lineno': 666,
        'msg': "Lemon wins",
        'args': (),
        'exc_info': None,
    }
    return logging.LogRecord(**record_fields)
def test_response_code_error_default_initialization(error_api_response):
    """Check the attributes of a `ResponseCodeError` built from a response alone.

    `error_api_response` is the fixture-provided response object passed as `response`.
    """
    error = api.ResponseCodeError(response=error_api_response)

    # Compare the status by value: object identity of the status is an
    # implementation detail that this test should not pin down.
    assert error.status == error_api_response.status
    # No JSON or text body was supplied, so both are expected to be falsy.
    assert not error.response_json
    assert not error.response_text
    # The error is expected to hold the very response object it was given.
    assert error.response is error_api_response
def test_response_code_error_default_representation(error_api_response):
    """Check `str()` of a `ResponseCodeError` that was given only a response object."""
    error = api.ResponseCodeError(response=error_api_response)

    rendered = str(error)
    expected = f"Status: {error_api_response.status} Response: "
    assert rendered == expected
def test_response_code_error_representation_with_nonempty_response_json(error_api_response):
    """Check that `str()` of a `ResponseCodeError` includes the supplied `response_json`."""
    payload = {'hello': 'world'}
    error = api.ResponseCodeError(response=error_api_response, response_json=payload)

    rendered = str(error)
    expected = f"Status: {error_api_response.status} Response: {{'hello': 'world'}}"
    assert rendered == expected
def test_response_code_error_representation_with_nonempty_response_text(error_api_response):
    """Check that `str()` of a `ResponseCodeError` includes the supplied `response_text`."""
    body_text = 'Lemon will eat your soul'
    error = api.ResponseCodeError(response=error_api_response, response_text=body_text)

    rendered = str(error)
    expected = f"Status: {error_api_response.status} Response: Lemon will eat your soul"
    assert rendered == expected
@patch('bot.api.APILoggingHandler.ship_off')
def test_emit_appends_to_queue_with_stopped_event_loop(
    ship_off_patch, api_log_handler, debug_log_record
):
    """Check that `emit` puts the result of `ship_off` on the handler's queue.

    This is a synchronous test, so presumably no event loop is running when
    `emit` is called (as the test name suggests).
    """
    # `ship_off` is a coroutine, so calling it would hand back something that
    # ought to be awaited, and asyncio complains when it is not. Patching it to
    # return an ordinary value keeps the test simple.
    shipped_value = 42
    ship_off_patch.return_value = shipped_value

    api_log_handler.emit(debug_log_record)

    assert api_log_handler.queue == [shipped_value]
def test_emit_ignores_less_than_debug(debug_log_record, api_log_handler):
    """Check that `emit` leaves the queue empty for a record whose level is below DEBUG."""
    below_debug = logging.DEBUG - 5
    debug_log_record.levelno = below_debug

    api_log_handler.emit(debug_log_record)

    queued = api_log_handler.queue
    assert not queued
def test_schedule_queued_tasks_for_empty_queue(api_log_handler, caplog):
    """Check that `schedule_queued_tasks` logs nothing when called on a fresh handler."""
    api_log_handler.schedule_queued_tasks()

    # The handler logs when tasks are scheduled; with nothing queued,
    # no log records should have been captured.
    captured = caplog.records
    assert not captured
@patch('asyncio.create_task')
def test_schedule_queued_tasks_for_nonempty_queue(create_task_patch, api_log_handler, caplog):
    """Check that `schedule_queued_tasks` drains the queue, creates a task and logs once."""
    pending_item = 555
    api_log_handler.queue = [pending_item]

    api_log_handler.schedule_queued_tasks()

    # The queue must be emptied and its single item passed to `asyncio.create_task`.
    assert not api_log_handler.queue
    create_task_patch.assert_called_once_with(pending_item)

    # Exactly one log record is expected; unpacking fails if there are more or fewer.
    (record,) = caplog.records
    assert record.message == "Scheduled 1 pending logging tasks."
    assert record.levelno == logging.DEBUG
    assert record.name == 'bot.api'
    assert vars(record)['via_handler']
|