aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Joseph Banks <[email protected]>2019-10-01 17:31:59 +0100
committerGravatar Joseph Banks <[email protected]>2019-10-01 17:31:59 +0100
commite5b724ec617f3ca82f6491329fca2cd17e103bdb (patch)
treee24d61c0b7b5ccb4ff08e596ca67fb61acb4367f
parentMake tests work with Union converter (diff)
parentMerge branch 'master' into add-role-info-command (diff)
Merge branch 'add-role-info-command' of github.com:python-discord/bot into add-role-info-command
-rw-r--r--.gitignore1
-rw-r--r--CONTRIBUTING.md6
-rw-r--r--Dockerfile31
-rw-r--r--Pipfile41
-rw-r--r--Pipfile.lock120
-rw-r--r--README.md12
-rw-r--r--azure-pipelines.yml122
-rw-r--r--bot/__init__.py15
-rw-r--r--bot/api.py61
-rw-r--r--bot/cogs/alias.py145
-rw-r--r--bot/cogs/antispam.py10
-rw-r--r--bot/cogs/bot.py71
-rw-r--r--bot/cogs/clean.py97
-rw-r--r--bot/cogs/cogs.py50
-rw-r--r--bot/cogs/defcon.py187
-rw-r--r--bot/cogs/doc.py133
-rw-r--r--bot/cogs/error_handler.py31
-rw-r--r--bot/cogs/eval.py24
-rw-r--r--bot/cogs/filtering.py111
-rw-r--r--bot/cogs/free.py18
-rw-r--r--bot/cogs/help.py405
-rw-r--r--bot/cogs/information.py34
-rw-r--r--bot/cogs/jams.py15
-rw-r--r--bot/cogs/logging.py10
-rw-r--r--bot/cogs/moderation.py603
-rw-r--r--bot/cogs/modlog.py69
-rw-r--r--bot/cogs/off_topic_names.py53
-rw-r--r--bot/cogs/reddit.py78
-rw-r--r--bot/cogs/reminders.py109
-rw-r--r--bot/cogs/security.py13
-rw-r--r--bot/cogs/site.py28
-rw-r--r--bot/cogs/snekbox.py21
-rw-r--r--bot/cogs/superstarify/__init__.py71
-rw-r--r--bot/cogs/superstarify/stars.py3
-rw-r--r--bot/cogs/sync/__init__.py5
-rw-r--r--bot/cogs/sync/cog.py2
-rw-r--r--bot/cogs/sync/syncers.py7
-rw-r--r--bot/cogs/tags.py55
-rw-r--r--bot/cogs/token_remover.py21
-rw-r--r--bot/cogs/utils.py37
-rw-r--r--bot/cogs/verification.py55
-rw-r--r--bot/cogs/watchchannels/__init__.py5
-rw-r--r--bot/cogs/watchchannels/bigbrother.py4
-rw-r--r--bot/cogs/watchchannels/talentpool.py11
-rw-r--r--bot/cogs/watchchannels/watchchannel.py33
-rw-r--r--bot/cogs/wolfram.py70
-rw-r--r--bot/constants.py6
-rw-r--r--bot/converters.py121
-rw-r--r--bot/decorators.py57
-rw-r--r--bot/interpreter.py14
-rw-r--r--bot/pagination.py169
-rw-r--r--bot/patches/__init__.py2
-rw-r--r--bot/patches/message_edited_at.py6
-rw-r--r--bot/rules/attachments.py8
-rw-r--r--bot/rules/burst.py8
-rw-r--r--bot/rules/burst_shared.py8
-rw-r--r--bot/rules/chars.py8
-rw-r--r--bot/rules/discord_emojis.py8
-rw-r--r--bot/rules/duplicates.py8
-rw-r--r--bot/rules/links.py8
-rw-r--r--bot/rules/mentions.py8
-rw-r--r--bot/rules/newlines.py8
-rw-r--r--bot/rules/role_mentions.py8
-rw-r--r--bot/utils/__init__.py37
-rw-r--r--bot/utils/checks.py18
-rw-r--r--bot/utils/messages.py63
-rw-r--r--bot/utils/scheduling.py43
-rw-r--r--bot/utils/time.py51
-rw-r--r--config-default.yml5
-rw-r--r--docker-compose.yml44
-rw-r--r--docker/ci.Dockerfile20
-rw-r--r--scripts/deploy-azure.sh12
-rw-r--r--tests/test_converters.py123
-rw-r--r--tox.ini19
74 files changed, 1646 insertions, 2347 deletions
diff --git a/.gitignore b/.gitignore
index cda3aeb9f..261fa179f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,7 +20,6 @@ lib64/
parts/
sdist/
var/
-wheels/
*.egg-info/
.installed.cfg
*.egg
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index a0a1200ec..39f76c7b4 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -36,9 +36,9 @@ All projects evolve over time, and this contribution guide is no different. This
## Supplemental Information
### Developer Environment
-A working environment for the [PyDis site](https://github.com/python-discord/site) is required to develop the bot. Instructions for setting up environments for both the site and the bot can be found on the PyDis Wiki:
- * [Site](https://wiki.pythondiscord.com/wiki/contributing/project/site)
- * [Bot](https://wiki.pythondiscord.com/wiki/contributing/project/bot)
+Instructions for setting the bot developer environment can be found on the [PyDis wiki](https://pythondiscord.com/pages/contributing/bot/)
+
+To provide a standalone development environment for this project, docker compose is utilized to pull the current version of the [site backend](https://github.com/python-discord/site). While appropriate for bot-only contributions, any contributions that necessitate backend changes will require the site repository to be appropriately configured as well. Instructions for setting up the site environment can be found on the [PyDis site](https://pythondiscord.com/pages/contributing/site/).
When pulling down changes from GitHub, remember to sync your environment using `pipenv sync --dev` to ensure you're using the most up-to-date versions the project's dependencies.
diff --git a/Dockerfile b/Dockerfile
index aa6333380..271c25050 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,27 +1,20 @@
-FROM python:3.7-alpine3.7
+FROM python:3.7-slim
-RUN apk add --no-cache \
- build-base \
- freetype-dev \
- git \
- jpeg-dev \
- libffi-dev \
- libxml2 \
- libxml2-dev \
- libxslt-dev \
- tini \
- zlib \
- zlib-dev
-
-ENV \
- LIBRARY_PATH=/lib:/usr/lib
+# Set pip to have cleaner logs and no saved cache
+ENV PIP_NO_CACHE_DIR=false \
+ PIPENV_HIDE_EMOJIS=1 \
+ PIPENV_IGNORE_VIRTUALENVS=1 \
+ PIPENV_NOSPIN=1
+# Install pipenv
RUN pip install -U pipenv
+# Copy project files into working directory
WORKDIR /bot
COPY . .
-RUN pipenv install --deploy --system
+# Install project dependencies
+RUN pipenv install --system --deploy
-ENTRYPOINT ["/sbin/tini", "--"]
-CMD ["python3", "-m", "bot"]
+ENTRYPOINT ["python3"]
+CMD ["-m", "bot"]
diff --git a/Pipfile b/Pipfile
index 273db04d2..82847b23f 100644
--- a/Pipfile
+++ b/Pipfile
@@ -5,34 +5,35 @@ name = "pypi"
[packages]
discord-py = "~=1.2"
-aiodns = "*"
-logmatic-python = "*"
-aiohttp = "*"
-sphinx = "*"
-markdownify = "*"
-lxml = "*"
-pyyaml = "*"
-fuzzywuzzy = "*"
-aio-pika = "*"
-python-dateutil = "*"
-deepdiff = "*"
-requests = "*"
-dateparser = "*"
+aiodns = "~=2.0"
+logmatic-python = "~=0.1"
+aiohttp = "~=3.5"
+sphinx = "~=2.2"
+markdownify = "~=0.4"
+lxml = "~=4.4"
+pyyaml = "~=5.1"
+fuzzywuzzy = "~=0.17"
+aio-pika = "~=6.1"
+python-dateutil = "~=2.8"
+deepdiff = "~=4.0"
+requests = "~=2.22"
more_itertools = "~=7.2"
urllib3 = ">=1.24.2,<1.25"
[dev-packages]
flake8 = "~=3.7"
+flake8-annotations = "~=1.1"
flake8-bugbear = "~=19.8"
+flake8-docstrings = "~=1.4"
flake8-import-order = "~=0.18"
flake8-string-format = "~=0.2"
flake8-tidy-imports = "~=2.0"
flake8-todo = "~=0.7"
pre-commit = "~=1.18"
-safety = "*"
-dodgy = "*"
-pytest = "*"
-pytest-cov = "*"
+safety = "~=1.8"
+dodgy = "~=0.1"
+pytest = "~=5.1"
+pytest-cov = "~=2.7"
[requires]
python_version = "3.7"
@@ -41,9 +42,5 @@ python_version = "3.7"
start = "python -m bot"
lint = "python -m flake8"
precommit = "pre-commit install"
-build = "docker build -t pythondiscord/bot:latest -f docker/bot.Dockerfile ."
+build = "docker build -t pythondiscord/bot:latest -f Dockerfile ."
push = "docker push pythondiscord/bot:latest"
-buildbase = "docker build -t pythondiscord/bot-base:latest -f docker/base.Dockerfile ."
-pushbase = "docker push pythondiscord/bot-base:latest"
-buildci = "docker build -t pythondiscord/bot-ci:latest -f docker/ci.Dockerfile ."
-pushci = "docker push pythondiscord/bot-ci:latest"
diff --git a/Pipfile.lock b/Pipfile.lock
index e5978198f..4e6b4eaf8 100644
--- a/Pipfile.lock
+++ b/Pipfile.lock
@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
- "sha256": "f21c27a5c4493b65a36a78721c2cb597c3eed7fcbd28f3bf731453f2c3cccb56"
+ "sha256": "c2537cc3c5b0886d0b38f9b48f4f4b93e1e74d925454aa71a2189bddedadde42"
},
"pipfile-spec": 6,
"requires": {
@@ -62,10 +62,10 @@
},
"aiormq": {
"hashes": [
- "sha256:0b755b748d87d5ec63b4b7f162102333bf0901caf1f8a2bf29467bbdd54e637d",
- "sha256:f8eef1f98bc331a266404d925745fac589dab30412688564d740754d6d643863"
+ "sha256:c3e4dd01a2948a75f739fb637334dbb8c6f1a4cecf74d5ed662dc3bab7f39973",
+ "sha256:e220d3f9477bb2959b729b79bec815148ddb8a7686fc6c3d05d41c88ebd7c59e"
],
- "version": "==2.7.5"
+ "version": "==2.8.0"
},
"alabaster": {
"hashes": [
@@ -150,13 +150,12 @@
],
"version": "==3.0.4"
},
- "dateparser": {
+ "colorama": {
"hashes": [
- "sha256:983d84b5e3861cb0aa240cad07f12899bb10b62328aae188b9007e04ce37d665",
- "sha256:e1eac8ef28de69a554d5fcdb60b172d526d61924b1a40afbbb08df459a36006b"
+ "sha256:05eed71e2e327246ad6b38c540c4a3117230b19679b875190486ddd2d721422d",
+ "sha256:f8ac84de7840f5b9c4e3347b3c1eaa50f7e49c2b07596221daec5edaabbd7c48"
],
- "index": "pypi",
- "version": "==0.7.2"
+ "version": "==0.4.1"
},
"deepdiff": {
"hashes": [
@@ -342,10 +341,10 @@
},
"packaging": {
"hashes": [
- "sha256:a7ac867b97fdc07ee80a8058fe4435ccd274ecc3b0ed61d852d7d53055528cf9",
- "sha256:c491ca87294da7cc01902edbe30a5bc6c4c28172b5138ab4e4aa1b9d7bfaeafe"
+ "sha256:28b924174df7a2fa32c1953825ff29c61e2f5e082343165438812f00d3a7fc47",
+ "sha256:d9551545c6d761f3def1677baf08ab2a3ca17c56879e70fecba2fc4dde4ed108"
],
- "version": "==19.1"
+ "version": "==19.2"
},
"pamqp": {
"hashes": [
@@ -432,22 +431,6 @@
"index": "pypi",
"version": "==5.1.2"
},
- "regex": {
- "hashes": [
- "sha256:1e9f9bc44ca195baf0040b1938e6801d2f3409661c15fe57f8164c678cfc663f",
- "sha256:587b62d48ca359d2d4f02d486f1f0aa9a20fbaf23a9d4198c4bed72ab2f6c849",
- "sha256:835ccdcdc612821edf132c20aef3eaaecfb884c9454fdc480d5887562594ac61",
- "sha256:93f6c9da57e704e128d90736430c5c59dd733327882b371b0cae8833106c2a21",
- "sha256:a46f27d267665016acb3ec8c6046ec5eae8cf80befe85ba47f43c6f5ec636dcd",
- "sha256:c5c8999b3a341b21ac2c6ec704cfcccbc50f1fedd61b6a8ee915ca7fd4b0a557",
- "sha256:d4d1829cf97632673aa49f378b0a2c3925acd795148c5ace8ef854217abbee89",
- "sha256:d96479257e8e4d1d7800adb26bf9c5ca5bab1648a1eddcac84d107b73dc68327",
- "sha256:f20f4912daf443220436759858f96fefbfc6c6ba9e67835fd6e4e9b73582791a",
- "sha256:f2b37b5b2c2a9d56d9e88efef200ec09c36c7f323f9d58d0b985a90923df386d",
- "sha256:fe765b809a1f7ce642c2edeee351e7ebd84391640031ba4b60af8d91a9045890"
- ],
- "version": "==2019.8.19"
- },
"requests": {
"hashes": [
"sha256:11e007a8a2aa0323f5a921e9e6a2d7e4e67d9877e85773fba9ba6419025cbeb4",
@@ -471,10 +454,10 @@
},
"soupsieve": {
"hashes": [
- "sha256:8662843366b8d8779dec4e2f921bebec9afd856a5ff2e82cd419acc5054a1a92",
- "sha256:a5a6166b4767725fd52ae55fee8c8b6137d9a51e9f1edea461a062a759160118"
+ "sha256:605f89ad5fdbfefe30cdc293303665eff2d188865d4dbe4eb510bba1edfbfce3",
+ "sha256:b91d676b330a0ebd5b21719cb6e9b57c57d433671f65b9c28dd3461d9a1ed0b6"
],
- "version": "==1.9.3"
+ "version": "==1.9.4"
},
"sphinx": {
"hashes": [
@@ -526,13 +509,6 @@
],
"version": "==1.1.3"
},
- "tzlocal": {
- "hashes": [
- "sha256:11c9f16e0a633b4b60e1eede97d8a46340d042e67b670b290ca526576e039048",
- "sha256:949b9dd5ba4be17190a80c0268167d7e6c92c62b30026cf9764caf3e308e5590"
- ],
- "version": "==2.0.0"
- },
"urllib3": {
"hashes": [
"sha256:2393a695cd12afedd0dcb26fe5d50d0cf248e5a66f75dbd89a3d4eb333a61af4",
@@ -634,6 +610,13 @@
],
"version": "==7.0"
},
+ "colorama": {
+ "hashes": [
+ "sha256:05eed71e2e327246ad6b38c540c4a3117230b19679b875190486ddd2d721422d",
+ "sha256:f8ac84de7840f5b9c4e3347b3c1eaa50f7e49c2b07596221daec5edaabbd7c48"
+ ],
+ "version": "==0.4.1"
+ },
"coverage": {
"hashes": [
"sha256:08907593569fe59baca0bf152c43f3863201efb6113ecb38ce7e97ce339805a6",
@@ -700,6 +683,14 @@
"index": "pypi",
"version": "==3.7.8"
},
+ "flake8-annotations": {
+ "hashes": [
+ "sha256:6ac7ca1e706307686b60af8043ff1db31dc2cfc1233c8210d67a3d9b8f364736",
+ "sha256:b51131007000d67217608fa028a35ff80aa400b474e5972f1f99c2cf9d26bd2e"
+ ],
+ "index": "pypi",
+ "version": "==1.1.0"
+ },
"flake8-bugbear": {
"hashes": [
"sha256:d8c466ea79d5020cb20bf9f11cf349026e09517a42264f313d3f6fddb83e0571",
@@ -708,6 +699,14 @@
"index": "pypi",
"version": "==19.8.0"
},
+ "flake8-docstrings": {
+ "hashes": [
+ "sha256:1666dd069c9c457ee57e80af3c1a6b37b00cc1801c6fde88e455131bb2e186cd",
+ "sha256:9c0db5a79a1affd70fdf53b8765c8a26bf968e59e0252d7f2fc546b41c0cda06"
+ ],
+ "index": "pypi",
+ "version": "==1.4.0"
+ },
"flake8-import-order": {
"hashes": [
"sha256:90a80e46886259b9c396b578d75c749801a41ee969a235e163cfe1be7afd2543",
@@ -784,10 +783,10 @@
},
"packaging": {
"hashes": [
- "sha256:a7ac867b97fdc07ee80a8058fe4435ccd274ecc3b0ed61d852d7d53055528cf9",
- "sha256:c491ca87294da7cc01902edbe30a5bc6c4c28172b5138ab4e4aa1b9d7bfaeafe"
+ "sha256:28b924174df7a2fa32c1953825ff29c61e2f5e082343165438812f00d3a7fc47",
+ "sha256:d9551545c6d761f3def1677baf08ab2a3ca17c56879e70fecba2fc4dde4ed108"
],
- "version": "==19.1"
+ "version": "==19.2"
},
"pluggy": {
"hashes": [
@@ -818,6 +817,13 @@
],
"version": "==2.5.0"
},
+ "pydocstyle": {
+ "hashes": [
+ "sha256:04c84e034ebb56eb6396c820442b8c4499ac5eb94a3bda88951ac3dc519b6058",
+ "sha256:66aff87ffe34b1e49bff2dd03a88ce6843be2f3346b0c9814410d34987fbab59"
+ ],
+ "version": "==4.0.1"
+ },
"pyflakes": {
"hashes": [
"sha256:17dbeb2e3f4d772725c777fabc446d5634d1038f234e77343108ce445ea69ce0",
@@ -834,11 +840,11 @@
},
"pytest": {
"hashes": [
- "sha256:95d13143cc14174ca1a01ec68e84d76ba5d9d493ac02716fd9706c949a505210",
- "sha256:b78fe2881323bd44fd9bd76e5317173d4316577e7b1cddebae9136a4495ec865"
+ "sha256:813b99704b22c7d377bbd756ebe56c35252bb710937b46f207100e843440b3c2",
+ "sha256:cc6620b96bc667a0c8d4fa592a8c9c94178a1bd6cc799dbb057dfd9286d31a31"
],
"index": "pypi",
- "version": "==5.1.2"
+ "version": "==5.1.3"
},
"pytest-cov": {
"hashes": [
@@ -890,6 +896,12 @@
],
"version": "==1.12.0"
},
+ "snowballstemmer": {
+ "hashes": [
+ "sha256:713e53b79cbcf97bc5245a06080a33d54a77e7cce2f789c835a143bcdb5c033e"
+ ],
+ "version": "==1.9.1"
+ },
"toml": {
"hashes": [
"sha256:229f81c57791a41d65e399fc06bf0848bab550a9dfd5ed66df18ce5f05e73d5c",
@@ -897,6 +909,26 @@
],
"version": "==0.10.0"
},
+ "typed-ast": {
+ "hashes": [
+ "sha256:18511a0b3e7922276346bcb47e2ef9f38fb90fd31cb9223eed42c85d1312344e",
+ "sha256:262c247a82d005e43b5b7f69aff746370538e176131c32dda9cb0f324d27141e",
+ "sha256:2b907eb046d049bcd9892e3076c7a6456c93a25bebfe554e931620c90e6a25b0",
+ "sha256:354c16e5babd09f5cb0ee000d54cfa38401d8b8891eefa878ac772f827181a3c",
+ "sha256:4e0b70c6fc4d010f8107726af5fd37921b666f5b31d9331f0bd24ad9a088e631",
+ "sha256:630968c5cdee51a11c05a30453f8cd65e0cc1d2ad0d9192819df9978984529f4",
+ "sha256:66480f95b8167c9c5c5c87f32cf437d585937970f3fc24386f313a4c97b44e34",
+ "sha256:71211d26ffd12d63a83e079ff258ac9d56a1376a25bc80b1cdcdf601b855b90b",
+ "sha256:95bd11af7eafc16e829af2d3df510cecfd4387f6453355188342c3e79a2ec87a",
+ "sha256:bc6c7d3fa1325a0c6613512a093bc2a2a15aeec350451cbdf9e1d4bffe3e3233",
+ "sha256:cc34a6f5b426748a507dd5d1de4c1978f2eb5626d51326e43280941206c209e1",
+ "sha256:d755f03c1e4a51e9b24d899561fec4ccaf51f210d52abdf8c07ee2849b212a36",
+ "sha256:d7c45933b1bdfaf9f36c579671fec15d25b06c8398f113dab64c18ed1adda01d",
+ "sha256:d896919306dd0aa22d0132f62a1b78d11aaf4c9fc5b3410d3c666b818191630a",
+ "sha256:ffde2fbfad571af120fcbfbbc61c72469e72f550d676c3342492a9dfdefb8f12"
+ ],
+ "version": "==1.4.0"
+ },
"urllib3": {
"hashes": [
"sha256:2393a695cd12afedd0dcb26fe5d50d0cf248e5a66f75dbd89a3d4eb333a61af4",
diff --git a/README.md b/README.md
index 1c9e52b71..7a7f1b992 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,13 @@
-# Python Utility Bot
+# Python Utility Bot
-[![Build Status](https://dev.azure.com/python-discord/Python%20Discord/_apis/build/status/Bot%20(Mainline))](https://dev.azure.com/python-discord/Python%20Discord/_build/latest?definitionId=1)
-[![Discord](https://discordapp.com/api/guilds/267624335836053506/embed.png)](https://discord.gg/2B963hn)
+[![Discord](https://img.shields.io/discord/267624335836053506?color=%237289DA&label=Python%20Discord&logo=discord&logoColor=white)](https://discord.gg/2B963hn)
+[![Build Status](https://dev.azure.com/python-discord/Python%20Discord/_apis/build/status/Bot?branchName=master)](https://dev.azure.com/python-discord/Python%20Discord/_build/latest?definitionId=1&branchName=master)
+[![Tests](https://img.shields.io/azure-devops/tests/python-discord/Python%20Discord/1?compact_message)](https://dev.azure.com/python-discord/Python%20Discord/_apis/build/status/Bot?branchName=master)
+[![Coverage](https://img.shields.io/azure-devops/coverage/python-discord/Python%20Discord/1/master)](https://dev.azure.com/python-discord/Python%20Discord/_apis/build/status/Bot?branchName=master)
+[![License](https://img.shields.io/github/license/python-discord/bot)](LICENSE)
+[![Website](https://img.shields.io/badge/website-visit-brightgreen)](https://pythondiscord.com)
This project is a Discord bot specifically for use with the Python Discord server. It provides numerous utilities
and other tools to help keep the server running like a well-oiled machine.
+
+Read the [Contributing Guide](https://pythondiscord.com/pages/contributing/bot/) on our website if you're interested in helping out.
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 4dcad685c..c22bac089 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -6,69 +6,59 @@ variables:
PIPENV_NOSPIN: 1
jobs:
-- job: test
- displayName: 'Lint & Test'
-
- pool:
- vmImage: ubuntu-16.04
-
- variables:
- PIPENV_CACHE_DIR: ".cache/pipenv"
- PIP_CACHE_DIR: ".cache/pip"
- PIP_SRC: ".cache/src"
-
- steps:
- - script: |
- sudo apt-get update
- sudo apt-get install build-essential curl docker libffi-dev libfreetype6-dev libxml2 libxml2-dev libxslt1-dev zlib1g zlib1g-dev
- displayName: 'Install base dependencies'
-
- - task: UsePythonVersion@0
- displayName: 'Set Python version'
- inputs:
- versionSpec: '3.7.x'
- addToPath: true
-
- - script: sudo pip install pipenv
- displayName: 'Install pipenv'
-
- - script: pipenv install --dev --deploy --system
- displayName: 'Install project using pipenv'
-
- - script: python -m flake8
- displayName: 'Run linter'
-
- - script: BOT_API_KEY=foo BOT_TOKEN=bar WOLFRAM_API_KEY=baz python -m pytest --junitxml=junit.xml --cov=bot --cov-branch --cov-report=term --cov-report=xml tests
- displayName: Run tests
-
- - task: PublishCodeCoverageResults@1
- displayName: 'Publish Coverage Results'
- condition: succeededOrFailed()
- inputs:
- codeCoverageTool: Cobertura
- summaryFileLocation: coverage.xml
-
- - task: PublishTestResults@2
- displayName: 'Publish Test Results'
- condition: succeededOrFailed()
- inputs:
- testResultsFiles: junit.xml
- testRunTitle: 'Bot Test results'
-
-- job: build
- displayName: 'Build Containers'
- dependsOn: 'test'
-
- steps:
- - task: Docker@1
- displayName: 'Login: Docker Hub'
-
- inputs:
- containerregistrytype: 'Container Registry'
- dockerRegistryEndpoint: 'DockerHub'
- command: 'login'
-
- - task: ShellScript@2
- displayName: 'Build and deploy containers'
- inputs:
- scriptPath: scripts/deploy-azure.sh
+ - job: test
+ displayName: 'Lint & Test'
+ pool:
+ vmImage: ubuntu-16.04
+
+ variables:
+ PIP_CACHE_DIR: ".cache/pip"
+
+ steps:
+ - task: UsePythonVersion@0
+ displayName: 'Set Python version'
+ inputs:
+ versionSpec: '3.7.x'
+ addToPath: true
+
+ - script: pip install pipenv
+ displayName: 'Install pipenv'
+
+ - script: pipenv install --dev --deploy --system
+ displayName: 'Install project using pipenv'
+
+ - script: python -m flake8
+ displayName: 'Run linter'
+
+ - script: BOT_API_KEY=foo BOT_TOKEN=bar WOLFRAM_API_KEY=baz python -m pytest --junitxml=junit.xml --cov=bot --cov-branch --cov-report=term --cov-report=xml tests
+ displayName: Run tests
+
+ - task: PublishCodeCoverageResults@1
+ displayName: 'Publish Coverage Results'
+ condition: succeededOrFailed()
+ inputs:
+ codeCoverageTool: Cobertura
+ summaryFileLocation: coverage.xml
+
+ - task: PublishTestResults@2
+ displayName: 'Publish Test Results'
+ condition: succeededOrFailed()
+ inputs:
+ testResultsFiles: junit.xml
+ testRunTitle: 'Bot Test results'
+
+ - job: build
+ displayName: 'Build & Push Container'
+ dependsOn: 'test'
+ condition: and(succeeded(), ne(variables['Build.Reason'], 'PullRequest'), eq(variables['Build.SourceBranch'], 'refs/heads/master'))
+
+ steps:
+ - task: Docker@2
+ displayName: 'Build & Push Container'
+ inputs:
+ containerRegistry: 'DockerHub'
+ repository: 'pythondiscord/bot'
+ command: 'buildAndPush'
+ Dockerfile: 'Dockerfile'
+ buildContext: '.'
+ tags: 'latest'
diff --git a/bot/__init__.py b/bot/__init__.py
index 8efa5e53c..4a2df730d 100644
--- a/bot/__init__.py
+++ b/bot/__init__.py
@@ -2,6 +2,7 @@ import logging
import os
import sys
from logging import Logger, StreamHandler, handlers
+from pathlib import Path
from logmatic import JsonFormatter
@@ -9,7 +10,7 @@ logging.TRACE = 5
logging.addLevelName(logging.TRACE, "TRACE")
-def monkeypatch_trace(self, msg, *args, **kwargs):
+def monkeypatch_trace(self: logging.Logger, msg: str, *args, **kwargs) -> None:
"""
Log 'msg % args' with severity 'TRACE'.
@@ -30,22 +31,20 @@ logging_handlers = []
# We can't import this yet, so we have to define it ourselves
DEBUG_MODE = True if 'local' in os.environ.get("SITE_URL", "local") else False
+LOG_DIR = Path("logs")
+LOG_DIR.mkdir(exist_ok=True)
if DEBUG_MODE:
logging_handlers.append(StreamHandler(stream=sys.stdout))
- json_handler = logging.FileHandler(filename="log.json", mode="w")
+ json_handler = logging.FileHandler(filename=Path(LOG_DIR, "log.json"), mode="w")
json_handler.formatter = JsonFormatter()
logging_handlers.append(json_handler)
else:
- logdir = "log"
- logfile = logdir+os.sep+"bot.log"
+ logfile = Path(LOG_DIR, "bot.log")
megabyte = 1048576
- if not os.path.exists(logdir):
- os.makedirs(logdir)
-
filehandler = handlers.RotatingFileHandler(logfile, maxBytes=(megabyte*5), backupCount=7)
logging_handlers.append(filehandler)
@@ -55,7 +54,7 @@ else:
logging.basicConfig(
- format="%(asctime)s pd.beardfist.com Bot: | %(name)33s | %(levelname)8s | %(message)s",
+ format="%(asctime)s Bot: | %(name)33s | %(levelname)8s | %(message)s",
datefmt="%b %d %H:%M:%S",
level=logging.TRACE if DEBUG_MODE else logging.INFO,
handlers=logging_handlers
diff --git a/bot/api.py b/bot/api.py
index 3acde242e..7f26e5305 100644
--- a/bot/api.py
+++ b/bot/api.py
@@ -11,6 +11,8 @@ log = logging.getLogger(__name__)
class ResponseCodeError(ValueError):
+ """Raised when a non-OK HTTP response is received."""
+
def __init__(
self,
response: aiohttp.ClientResponse,
@@ -28,6 +30,8 @@ class ResponseCodeError(ValueError):
class APIClient:
+ """Django Site API wrapper."""
+
def __init__(self, **kwargs):
auth_headers = {
'Authorization': f"Token {Keys.site_api}"
@@ -41,10 +45,11 @@ class APIClient:
self.session = aiohttp.ClientSession(**kwargs)
@staticmethod
- def _url_for(endpoint: str):
+ def _url_for(endpoint: str) -> str:
return f"{URLs.site_schema}{URLs.site_api}/{quote_url(endpoint)}"
- async def maybe_raise_for_status(self, response: aiohttp.ClientResponse, should_raise: bool):
+ async def maybe_raise_for_status(self, response: aiohttp.ClientResponse, should_raise: bool) -> None:
+ """Raise ResponseCodeError for non-OK response if an exception should be raised."""
if should_raise and response.status >= 400:
try:
response_json = await response.json()
@@ -53,27 +58,32 @@ class APIClient:
response_text = await response.text()
raise ResponseCodeError(response=response, response_text=response_text)
- async def get(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs):
+ async def get(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict:
+ """Site API GET."""
async with self.session.get(self._url_for(endpoint), *args, **kwargs) as resp:
await self.maybe_raise_for_status(resp, raise_for_status)
return await resp.json()
- async def patch(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs):
+ async def patch(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict:
+ """Site API PATCH."""
async with self.session.patch(self._url_for(endpoint), *args, **kwargs) as resp:
await self.maybe_raise_for_status(resp, raise_for_status)
return await resp.json()
- async def post(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs):
+ async def post(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict:
+ """Site API POST."""
async with self.session.post(self._url_for(endpoint), *args, **kwargs) as resp:
await self.maybe_raise_for_status(resp, raise_for_status)
return await resp.json()
- async def put(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs):
+ async def put(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> dict:
+ """Site API PUT."""
async with self.session.put(self._url_for(endpoint), *args, **kwargs) as resp:
await self.maybe_raise_for_status(resp, raise_for_status)
return await resp.json()
- async def delete(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs):
+ async def delete(self, endpoint: str, *args, raise_for_status: bool = True, **kwargs) -> Optional[dict]:
+ """Site API DELETE."""
async with self.session.delete(self._url_for(endpoint), *args, **kwargs) as resp:
if resp.status == 204:
return None
@@ -83,9 +93,12 @@ class APIClient:
def loop_is_running() -> bool:
- # asyncio does not have a way to say "call this when the event
- # loop is running", see e.g. `callWhenRunning` from twisted.
+ """
+ Determine if there is a running asyncio event loop.
+ This helps enable "call this when event loop is running" logic (see: Twisted's `callWhenRunning`),
+ which is currently not provided by asyncio.
+ """
try:
asyncio.get_running_loop()
except RuntimeError:
@@ -94,6 +107,8 @@ def loop_is_running() -> bool:
class APILoggingHandler(logging.StreamHandler):
+ """Site API logging handler."""
+
def __init__(self, client: APIClient):
logging.StreamHandler.__init__(self)
self.client = client
@@ -102,7 +117,8 @@ class APILoggingHandler(logging.StreamHandler):
# on the event loop yet - scheduled when the event loop is ready.
self.queue = []
- async def ship_off(self, payload: dict):
+ async def ship_off(self, payload: dict) -> None:
+ """Ship log payload to the logging API."""
try:
await self.client.post('logs', json=payload)
except ResponseCodeError as err:
@@ -118,19 +134,19 @@ class APILoggingHandler(logging.StreamHandler):
extra={'via_handler': True}
)
- def emit(self, record: logging.LogRecord):
- # Two checks are performed here:
+ def emit(self, record: logging.LogRecord) -> None:
+ """
+ Determine if a log record should be shipped to the logging API.
+
+ If the asyncio event loop is not yet running, log records will instead be put in a queue
+ which will be consumed once the event loop is running.
+
+ The following two conditions are set:
+ 1. Do not log anything below DEBUG (only applies to the monkeypatched `TRACE` level)
+ 2. Ignore log records originating from this logging handler itself to prevent infinite recursion
+ """
if (
- # 1. Do not log anything below `DEBUG`. This is only applicable
- # for the monkeypatched `TRACE` logging level, which has a
- # lower numeric value than `DEBUG`.
record.levelno >= logging.DEBUG
- # 2. Ignore logging messages which are sent by this logging
- # handler itself. This is required because if we were to
- # not ignore messages emitted by this handler, we would
- # infinitely recurse back down into this logging handler,
- # making the reactor run like crazy, and eventually OOM
- # something. Let's not do that...
and not record.__dict__.get('via_handler')
):
payload = {
@@ -149,7 +165,8 @@ class APILoggingHandler(logging.StreamHandler):
asyncio.create_task(task)
self.schedule_queued_tasks()
- def schedule_queued_tasks(self):
+ def schedule_queued_tasks(self) -> None:
+ """Consume the queue and schedule the logging of each queued record."""
for task in self.queue:
asyncio.create_task(task)
diff --git a/bot/cogs/alias.py b/bot/cogs/alias.py
index 3d0c9d826..80ff37983 100644
--- a/bot/cogs/alias.py
+++ b/bot/cogs/alias.py
@@ -3,9 +3,7 @@ import logging
from typing import Union
from discord import Colour, Embed, Member, User
-from discord.ext.commands import (
- Cog, Command, Context, clean_content, command, group
-)
+from discord.ext.commands import Bot, Cog, Command, Context, clean_content, command, group
from bot.cogs.watchchannels.watchchannel import proxy_user
from bot.converters import TagNameConverter
@@ -14,26 +12,14 @@ from bot.pagination import LinePaginator
log = logging.getLogger(__name__)
-class Alias(Cog):
- """
- Aliases for more used commands
- """
+class Alias (Cog):
+ """Aliases for commonly used commands."""
- def __init__(self, bot):
+ def __init__(self, bot: Bot):
self.bot = bot
- async def invoke(self, ctx, cmd_name, *args, **kwargs):
- """
- Invokes a command with args and kwargs.
- Fail early through `command.can_run`, and logs warnings.
-
- :param ctx: Context instance for command call
- :param cmd_name: Name of command/subcommand to be invoked
- :param args: args to be passed to the command
- :param kwargs: kwargs to be passed to the command
- :return: None
- """
-
+ async def invoke(self, ctx: Context, cmd_name: str, *args, **kwargs) -> None:
+ """Invokes a command with args and kwargs."""
log.debug(f"{cmd_name} was invoked through an alias")
cmd = self.bot.get_command(cmd_name)
if not cmd:
@@ -46,9 +32,8 @@ class Alias(Cog):
await ctx.invoke(cmd, *args, **kwargs)
@command(name='aliases')
- async def aliases_command(self, ctx):
+ async def aliases_command(self, ctx: Context) -> None:
"""Show configured aliases on the bot."""
-
embed = Embed(
title='Configured aliases',
colour=Colour.blue()
@@ -64,148 +49,98 @@ class Alias(Cog):
)
@command(name="resources", aliases=("resource",), hidden=True)
- async def site_resources_alias(self, ctx):
- """
- Alias for invoking <prefix>site resources.
- """
-
+ async def site_resources_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>site resources."""
await self.invoke(ctx, "site resources")
@command(name="watch", hidden=True)
- async def bigbrother_watch_alias(self, ctx, user: Union[Member, User, proxy_user], *, reason: str):
- """
- Alias for invoking <prefix>bigbrother watch [user] [reason].
- """
-
+ async def bigbrother_watch_alias(self, ctx: Context, user: Union[Member, User, proxy_user], *, reason: str) -> None:
+ """Alias for invoking <prefix>bigbrother watch [user] [reason]."""
await self.invoke(ctx, "bigbrother watch", user, reason=reason)
@command(name="unwatch", hidden=True)
- async def bigbrother_unwatch_alias(self, ctx, user: Union[User, proxy_user], *, reason: str):
- """
- Alias for invoking <prefix>bigbrother unwatch [user] [reason].
- """
-
+ async def bigbrother_unwatch_alias(self, ctx: Context, user: Union[User, proxy_user], *, reason: str) -> None:
+ """Alias for invoking <prefix>bigbrother unwatch [user] [reason]."""
await self.invoke(ctx, "bigbrother unwatch", user, reason=reason)
@command(name="home", hidden=True)
- async def site_home_alias(self, ctx):
- """
- Alias for invoking <prefix>site home.
- """
-
+ async def site_home_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>site home."""
await self.invoke(ctx, "site home")
@command(name="faq", hidden=True)
- async def site_faq_alias(self, ctx):
- """
- Alias for invoking <prefix>site faq.
- """
-
+ async def site_faq_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>site faq."""
await self.invoke(ctx, "site faq")
@command(name="rules", hidden=True)
- async def site_rules_alias(self, ctx):
- """
- Alias for invoking <prefix>site rules.
- """
-
+ async def site_rules_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>site rules."""
await self.invoke(ctx, "site rules")
@command(name="reload", hidden=True)
- async def cogs_reload_alias(self, ctx, *, cog_name: str):
- """
- Alias for invoking <prefix>cogs reload [cog_name].
-
- cog_name: str - name of the cog to be reloaded.
- """
-
+ async def cogs_reload_alias(self, ctx: Context, *, cog_name: str) -> None:
+ """Alias for invoking <prefix>cogs reload [cog_name]."""
await self.invoke(ctx, "cogs reload", cog_name)
@command(name="defon", hidden=True)
- async def defcon_enable_alias(self, ctx):
- """
- Alias for invoking <prefix>defcon enable.
- """
-
+ async def defcon_enable_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>defcon enable."""
await self.invoke(ctx, "defcon enable")
@command(name="defoff", hidden=True)
- async def defcon_disable_alias(self, ctx):
- """
- Alias for invoking <prefix>defcon disable.
- """
-
+ async def defcon_disable_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>defcon disable."""
await self.invoke(ctx, "defcon disable")
@command(name="exception", hidden=True)
- async def tags_get_traceback_alias(self, ctx):
- """
- Alias for invoking <prefix>tags get traceback.
- """
-
+ async def tags_get_traceback_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>tags get traceback."""
await self.invoke(ctx, "tags get", tag_name="traceback")
@group(name="get",
aliases=("show", "g"),
hidden=True,
invoke_without_command=True)
- async def get_group_alias(self, ctx):
- """
- Group for reverse aliases for commands like `tags get`,
- allowing for `get tags` or `get docs`.
- """
-
+ async def get_group_alias(self, ctx: Context) -> None:
+ """Group for reverse aliases for commands like `tags get`, allowing for `get tags` or `get docs`."""
pass
@get_group_alias.command(name="tags", aliases=("tag", "t"), hidden=True)
async def tags_get_alias(
self, ctx: Context, *, tag_name: TagNameConverter = None
- ):
+ ) -> None:
"""
Alias for invoking <prefix>tags get [tag_name].
tag_name: str - tag to be viewed.
"""
-
await self.invoke(ctx, "tags get", tag_name=tag_name)
@get_group_alias.command(name="docs", aliases=("doc", "d"), hidden=True)
async def docs_get_alias(
self, ctx: Context, symbol: clean_content = None
- ):
- """
- Alias for invoking <prefix>docs get [symbol].
-
- symbol: str - name of doc to be viewed.
- """
-
+ ) -> None:
+ """Alias for invoking <prefix>docs get [symbol]."""
await self.invoke(ctx, "docs get", symbol)
@command(name="nominate", hidden=True)
- async def nomination_add_alias(self, ctx, user: Union[Member, User, proxy_user], *, reason: str):
- """
- Alias for invoking <prefix>talentpool add [user] [reason].
- """
-
+ async def nomination_add_alias(self, ctx: Context, user: Union[Member, User, proxy_user], *, reason: str) -> None:
+ """Alias for invoking <prefix>talentpool add [user] [reason]."""
await self.invoke(ctx, "talentpool add", user, reason=reason)
@command(name="unnominate", hidden=True)
- async def nomination_end_alias(self, ctx, user: Union[User, proxy_user], *, reason: str):
- """
- Alias for invoking <prefix>nomination end [user] [reason].
- """
-
+ async def nomination_end_alias(self, ctx: Context, user: Union[User, proxy_user], *, reason: str) -> None:
+ """Alias for invoking <prefix>nomination end [user] [reason]."""
await self.invoke(ctx, "nomination end", user, reason=reason)
@command(name="nominees", hidden=True)
- async def nominees_alias(self, ctx):
- """
- Alias for invoking <prefix>tp watched.
- """
-
+ async def nominees_alias(self, ctx: Context) -> None:
+ """Alias for invoking <prefix>tp watched."""
await self.invoke(ctx, "talentpool watched")
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Alias cog load."""
bot.add_cog(Alias(bot))
log.info("Cog loaded: Alias")
diff --git a/bot/cogs/antispam.py b/bot/cogs/antispam.py
index 7b97881fd..8dfa0ad05 100644
--- a/bot/cogs/antispam.py
+++ b/bot/cogs/antispam.py
@@ -17,7 +17,7 @@ from bot.constants import (
Guild as GuildConfig, Icons,
STAFF_ROLES,
)
-from bot.converters import ExpirationDate
+from bot.converters import Duration
log = logging.getLogger(__name__)
@@ -102,7 +102,7 @@ class AntiSpam(Cog):
self.validation_errors = validation_errors
role_id = AntiSpamConfig.punishment['role_id']
self.muted_role = Object(role_id)
- self.expiration_date_converter = ExpirationDate()
+ self.expiration_date_converter = Duration()
self.message_deletion_queue = dict()
self.queue_consumption_tasks = dict()
@@ -113,7 +113,7 @@ class AntiSpam(Cog):
return self.bot.get_cog("ModLog")
@Cog.listener()
- async def on_ready(self):
+ async def on_ready(self) -> None:
"""Unloads the cog and alerts admins if configuration validation failed."""
if self.validation_errors:
body = "**The following errors were encountered:**\n"
@@ -221,9 +221,7 @@ class AntiSpam(Cog):
async def maybe_delete_messages(self, channel: TextChannel, messages: List[Message]) -> None:
"""Cleans the messages if cleaning is configured."""
-
if AntiSpamConfig.clean_offending:
-
# If we have more than one message, we can use bulk delete.
if len(messages) > 1:
message_ids = [message.id for message in messages]
@@ -274,7 +272,7 @@ def validate_config(rules: Mapping = AntiSpamConfig.rules) -> Dict[str, str]:
def setup(bot: Bot) -> None:
- """Setup for the cog."""
+ """Antispam cog load."""
validation_errors = validate_config()
bot.add_cog(AntiSpam(bot, validation_errors))
log.info("Cog loaded: AntiSpam")
diff --git a/bot/cogs/bot.py b/bot/cogs/bot.py
index 577865a65..7583b2f2d 100644
--- a/bot/cogs/bot.py
+++ b/bot/cogs/bot.py
@@ -2,6 +2,7 @@ import ast
import logging
import re
import time
+from typing import Optional, Tuple
from discord import Embed, Message, RawMessageUpdateEvent
from discord.ext.commands import Bot, Cog, Context, command, group
@@ -16,9 +17,7 @@ RE_MARKDOWN = re.compile(r'([*_~`|>])')
class Bot(Cog):
- """
- Bot information commands
- """
+ """Bot information commands."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -47,20 +46,14 @@ class Bot(Cog):
@group(invoke_without_command=True, name="bot", hidden=True)
@with_role(Roles.verified)
- async def botinfo_group(self, ctx: Context):
- """
- Bot informational commands
- """
-
+ async def botinfo_group(self, ctx: Context) -> None:
+ """Bot informational commands."""
await ctx.invoke(self.bot.get_command("help"), "bot")
@botinfo_group.command(name='about', aliases=('info',), hidden=True)
@with_role(Roles.verified)
- async def about_command(self, ctx: Context):
- """
- Get information about the bot
- """
-
+ async def about_command(self, ctx: Context) -> None:
+ """Get information about the bot."""
embed = Embed(
description="A utility bot designed just for the Python server! Try `!help` for more info.",
url="https://github.com/python-discord/bot"
@@ -78,24 +71,18 @@ class Bot(Cog):
@command(name='echo', aliases=('print',))
@with_role(*MODERATION_ROLES)
- async def echo_command(self, ctx: Context, *, text: str):
- """
- Send the input verbatim to the current channel
- """
-
+ async def echo_command(self, ctx: Context, *, text: str) -> None:
+ """Send the input verbatim to the current channel."""
await ctx.send(text)
@command(name='embed')
@with_role(*MODERATION_ROLES)
- async def embed_command(self, ctx: Context, *, text: str):
- """
- Send the input within an embed to the current channel
- """
-
+ async def embed_command(self, ctx: Context, *, text: str) -> None:
+ """Send the input within an embed to the current channel."""
embed = Embed(description=text)
await ctx.send(embed=embed)
- def codeblock_stripping(self, msg: str, bad_ticks: bool):
+ def codeblock_stripping(self, msg: str, bad_ticks: bool) -> Optional[Tuple[Tuple[str, ...], str]]:
"""
Strip msg in order to find Python code.
@@ -164,15 +151,10 @@ class Bot(Cog):
log.trace(f"Returning message.\n\n{content}\n\n")
return (content,), repl_code
- def fix_indentation(self, msg: str):
- """
- Attempts to fix badly indented code.
- """
-
- def unindent(code, skip_spaces=0):
- """
- Unindents all code down to the number of spaces given ins skip_spaces
- """
+ def fix_indentation(self, msg: str) -> str:
+ """Attempts to fix badly indented code."""
+ def unindent(code: str, skip_spaces: int = 0) -> str:
+ """Unindents all code down to the number of spaces given in skip_spaces."""
final = ""
current = code[0]
leading_spaces = 0
@@ -208,11 +190,13 @@ class Bot(Cog):
msg = f"{first_line}\n{unindent(code, 4)}"
return msg
- def repl_stripping(self, msg: str):
+ def repl_stripping(self, msg: str) -> Tuple[str, bool]:
"""
Strip msg in order to extract Python code out of REPL output.
Tries to strip out REPL Python code out of msg and returns the stripped msg.
+
+ Returns True for the boolean if REPL code was found in the input msg.
"""
final = ""
for line in msg.splitlines(keepends=True):
@@ -226,7 +210,8 @@ class Bot(Cog):
log.trace(f"Found REPL code in \n\n{msg}\n\n")
return final.rstrip(), True
- def has_bad_ticks(self, msg: Message):
+ def has_bad_ticks(self, msg: Message) -> bool:
+ """Check to see if msg contains ticks that aren't '`'."""
not_backticks = [
"'''", '"""', "\u00b4\u00b4\u00b4", "\u2018\u2018\u2018", "\u2019\u2019\u2019",
"\u2032\u2032\u2032", "\u201c\u201c\u201c", "\u201d\u201d\u201d", "\u2033\u2033\u2033",
@@ -236,13 +221,13 @@ class Bot(Cog):
return msg.content[:3] in not_backticks
@Cog.listener()
- async def on_message(self, msg: Message):
- """
- Detect poorly formatted Python code and send the user
- a helpful message explaining how to do properly
- formatted Python syntax highlighting codeblocks.
+ async def on_message(self, msg: Message) -> None:
"""
+ Detect poorly formatted Python code in new messages.
+ If poorly formatted code is detected, send the user a helpful message explaining how to do
+ properly formatted Python syntax highlighting codeblocks.
+ """
parse_codeblock = (
(
msg.channel.id in self.channel_cooldowns
@@ -361,7 +346,8 @@ class Bot(Cog):
)
@Cog.listener()
- async def on_raw_message_edit(self, payload: RawMessageUpdateEvent):
+ async def on_raw_message_edit(self, payload: RawMessageUpdateEvent) -> None:
+ """Check to see if an edited message (previously called out) still contains poorly formatted code."""
if (
# Checks to see if the message was called out by the bot
payload.message_id not in self.codeblock_message_ids
@@ -387,6 +373,7 @@ class Bot(Cog):
log.trace("User's incorrect code block has been fixed. Removing bot formatting message.")
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Bot cog load."""
bot.add_cog(Bot(bot))
log.info("Cog loaded: Bot")
diff --git a/bot/cogs/clean.py b/bot/cogs/clean.py
index 20c24dafc..1c0c9a7a8 100644
--- a/bot/cogs/clean.py
+++ b/bot/cogs/clean.py
@@ -18,17 +18,13 @@ log = logging.getLogger(__name__)
class Clean(Cog):
"""
- A cog that allows messages to be deleted in
- bulk, while applying various filters.
+ A cog that allows messages to be deleted in bulk, while applying various filters.
- You can delete messages sent by a specific user,
- messages sent by bots, all messages, or messages
- that match a specific regular expression.
+ You can delete messages sent by a specific user, messages sent by bots, all messages, or messages that match a
+ specific regular expression.
- The deleted messages are saved and uploaded
- to the database via an API endpoint, and a URL is
- returned which can be used to view the messages
- in the Discord dark theme style.
+ The deleted messages are saved and uploaded to the database via an API endpoint, and a URL is returned which can be
+ used to view the messages in the Discord dark theme style.
"""
def __init__(self, bot: Bot):
@@ -37,44 +33,25 @@ class Clean(Cog):
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
async def _clean_messages(
self, amount: int, ctx: Context,
bots_only: bool = False, user: User = None,
regex: Optional[str] = None
- ):
- """
- A helper function that does the actual message cleaning.
-
- :param bots_only: Set this to True if you only want to delete bot messages.
- :param user: Specify a user and it will only delete messages by this user.
- :param regular_expression: Specify a regular expression and it will only
- delete messages that match this.
- """
-
+ ) -> None:
+ """A helper function that does the actual message cleaning."""
def predicate_bots_only(message: Message) -> bool:
- """
- Returns true if the message was sent by a bot
- """
-
+ """Return True if the message was sent by a bot."""
return message.author.bot
def predicate_specific_user(message: Message) -> bool:
- """
- Return True if the message was sent by the
- user provided in the _clean_messages call.
- """
-
+ """Return True if the message was sent by the user provided in the _clean_messages call."""
return message.author == user
- def predicate_regex(message: Message):
- """
- Returns True if the regex provided in the
- _clean_messages matches the message content
- or any embed attributes the message may have.
- """
-
+ def predicate_regex(message: Message) -> bool:
+ """Check if the regex provided in _clean_messages matches the message content or any embed attributes."""
content = [message.content]
# Add the content for all embed attributes
@@ -192,61 +169,38 @@ class Clean(Cog):
@group(invoke_without_command=True, name="clean", hidden=True)
@with_role(*MODERATION_ROLES)
- async def clean_group(self, ctx: Context):
- """
- Commands for cleaning messages in channels
- """
-
+ async def clean_group(self, ctx: Context) -> None:
+ """Commands for cleaning messages in channels."""
await ctx.invoke(self.bot.get_command("help"), "clean")
@clean_group.command(name="user", aliases=["users"])
@with_role(*MODERATION_ROLES)
- async def clean_user(self, ctx: Context, user: User, amount: int = 10):
- """
- Delete messages posted by the provided user,
- and stop cleaning after traversing `amount` messages.
- """
-
+ async def clean_user(self, ctx: Context, user: User, amount: int = 10) -> None:
+ """Delete messages posted by the provided user, stop cleaning after traversing `amount` messages."""
await self._clean_messages(amount, ctx, user=user)
@clean_group.command(name="all", aliases=["everything"])
@with_role(*MODERATION_ROLES)
- async def clean_all(self, ctx: Context, amount: int = 10):
- """
- Delete all messages, regardless of poster,
- and stop cleaning after traversing `amount` messages.
- """
-
+ async def clean_all(self, ctx: Context, amount: int = 10) -> None:
+ """Delete all messages, regardless of poster, stop cleaning after traversing `amount` messages."""
await self._clean_messages(amount, ctx)
@clean_group.command(name="bots", aliases=["bot"])
@with_role(*MODERATION_ROLES)
- async def clean_bots(self, ctx: Context, amount: int = 10):
- """
- Delete all messages posted by a bot,
- and stop cleaning after traversing `amount` messages.
- """
-
+ async def clean_bots(self, ctx: Context, amount: int = 10) -> None:
+ """Delete all messages posted by a bot, stop cleaning after traversing `amount` messages."""
await self._clean_messages(amount, ctx, bots_only=True)
@clean_group.command(name="regex", aliases=["word", "expression"])
@with_role(*MODERATION_ROLES)
- async def clean_regex(self, ctx: Context, regex, amount: int = 10):
- """
- Delete all messages that match a certain regex,
- and stop cleaning after traversing `amount` messages.
- """
-
+ async def clean_regex(self, ctx: Context, regex: str, amount: int = 10) -> None:
+ """Delete all messages that match a certain regex, stop cleaning after traversing `amount` messages."""
await self._clean_messages(amount, ctx, regex=regex)
@clean_group.command(name="stop", aliases=["cancel", "abort"])
@with_role(*MODERATION_ROLES)
- async def clean_cancel(self, ctx: Context):
- """
- If there is an ongoing cleaning process,
- attempt to immediately cancel it.
- """
-
+ async def clean_cancel(self, ctx: Context) -> None:
+ """If there is an ongoing cleaning process, attempt to immediately cancel it."""
self.cleaning = False
embed = Embed(
@@ -256,6 +210,7 @@ class Clean(Cog):
await ctx.send(embed=embed, delete_after=10)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Clean cog load."""
bot.add_cog(Clean(bot))
log.info("Cog loaded: Clean")
diff --git a/bot/cogs/cogs.py b/bot/cogs/cogs.py
index ec497b966..1f6ccd09c 100644
--- a/bot/cogs/cogs.py
+++ b/bot/cogs/cogs.py
@@ -16,9 +16,7 @@ KEEP_LOADED = ["bot.cogs.cogs", "bot.cogs.modlog"]
class Cogs(Cog):
- """
- Cog management commands
- """
+ """Cog management commands."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -38,21 +36,19 @@ class Cogs(Cog):
@group(name='cogs', aliases=('c',), invoke_without_command=True)
@with_role(*MODERATION_ROLES, Roles.core_developer)
- async def cogs_group(self, ctx: Context):
+ async def cogs_group(self, ctx: Context) -> None:
"""Load, unload, reload, and list active cogs."""
-
await ctx.invoke(self.bot.get_command("help"), "cogs")
@cogs_group.command(name='load', aliases=('l',))
@with_role(*MODERATION_ROLES, Roles.core_developer)
- async def load_command(self, ctx: Context, cog: str):
+ async def load_command(self, ctx: Context, cog: str) -> None:
"""
- Load up an unloaded cog, given the module containing it
+ Load up an unloaded cog, given the module containing it.
You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the
entire module directly.
"""
-
cog = cog.lower()
embed = Embed()
@@ -78,13 +74,12 @@ class Cogs(Cog):
try:
self.bot.load_extension(full_cog)
except ImportError:
- log.error(f"{ctx.author} requested we load the '{cog}' cog, "
- f"but the cog module {full_cog} could not be found!")
+ log.exception(f"{ctx.author} requested we load the '{cog}' cog, "
+ f"but the cog module {full_cog} could not be found!")
embed.description = f"Invalid cog: {cog}\n\nCould not find cog module {full_cog}"
except Exception as e:
- log.error(f"{ctx.author} requested we load the '{cog}' cog, "
- "but the loading failed with the following error: \n"
- f"**{e.__class__.__name__}: {e}**")
+ log.exception(f"{ctx.author} requested we load the '{cog}' cog, "
+ "but the loading failed")
embed.description = f"Failed to load cog: {cog}\n\n{e.__class__.__name__}: {e}"
else:
log.debug(f"{ctx.author} requested we load the '{cog}' cog. Cog loaded!")
@@ -98,14 +93,13 @@ class Cogs(Cog):
@cogs_group.command(name='unload', aliases=('ul',))
@with_role(*MODERATION_ROLES, Roles.core_developer)
- async def unload_command(self, ctx: Context, cog: str):
+ async def unload_command(self, ctx: Context, cog: str) -> None:
"""
- Unload an already-loaded cog, given the module containing it
+ Unload an already-loaded cog, given the module containing it.
You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the
entire module directly.
"""
-
cog = cog.lower()
embed = Embed()
@@ -134,9 +128,8 @@ class Cogs(Cog):
try:
self.bot.unload_extension(full_cog)
except Exception as e:
- log.error(f"{ctx.author} requested we unload the '{cog}' cog, "
- "but the unloading failed with the following error: \n"
- f"{e}")
+ log.exception(f"{ctx.author} requested we unload the '{cog}' cog, "
+ "but the unloading failed")
embed.description = f"Failed to unload cog: {cog}\n\n```{e}```"
else:
log.debug(f"{ctx.author} requested we unload the '{cog}' cog. Cog unloaded!")
@@ -150,9 +143,9 @@ class Cogs(Cog):
@cogs_group.command(name='reload', aliases=('r',))
@with_role(*MODERATION_ROLES, Roles.core_developer)
- async def reload_command(self, ctx: Context, cog: str):
+ async def reload_command(self, ctx: Context, cog: str) -> None:
"""
- Reload an unloaded cog, given the module containing it
+ Reload an unloaded cog, given the module containing it.
You can specify the cog name for any cogs that are placed directly within `!cogs`, or specify the
entire module directly.
@@ -160,7 +153,6 @@ class Cogs(Cog):
If you specify "*" as the cog, every cog currently loaded will be unloaded, and then every cog present in the
bot/cogs directory will be loaded.
"""
-
cog = cog.lower()
embed = Embed()
@@ -232,16 +224,16 @@ class Cogs(Cog):
log.debug(f"{ctx.author} requested we reload all cogs. Here are the results: \n"
f"{lines}")
- return await LinePaginator.paginate(lines, ctx, embed, empty=False)
+ await LinePaginator.paginate(lines, ctx, embed, empty=False)
+ return
elif full_cog in self.bot.extensions:
try:
self.bot.unload_extension(full_cog)
self.bot.load_extension(full_cog)
except Exception as e:
- log.error(f"{ctx.author} requested we reload the '{cog}' cog, "
- "but the unloading failed with the following error: \n"
- f"{e}")
+ log.exception(f"{ctx.author} requested we reload the '{cog}' cog, "
+ "but the unloading failed")
embed.description = f"Failed to reload cog: {cog}\n\n```{e}```"
else:
log.debug(f"{ctx.author} requested we reload the '{cog}' cog. Cog reloaded!")
@@ -255,13 +247,12 @@ class Cogs(Cog):
@cogs_group.command(name='list', aliases=('all',))
@with_role(*MODERATION_ROLES, Roles.core_developer)
- async def list_command(self, ctx: Context):
+ async def list_command(self, ctx: Context) -> None:
"""
Get a list of all cogs, including their loaded status.
Gray indicates that the cog is unloaded. Green indicates that the cog is currently loaded.
"""
-
embed = Embed()
lines = []
cogs = {}
@@ -301,6 +292,7 @@ class Cogs(Cog):
await LinePaginator.paginate(lines, ctx, embed, max_size=300, empty=False)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Cogs cog load."""
bot.add_cog(Cogs(bot))
log.info("Cog loaded: Cogs")
diff --git a/bot/cogs/defcon.py b/bot/cogs/defcon.py
index 8fab00712..048d8a683 100644
--- a/bot/cogs/defcon.py
+++ b/bot/cogs/defcon.py
@@ -25,7 +25,8 @@ BASE_CHANNEL_TOPIC = "Python Discord Defense Mechanism"
class Defcon(Cog):
- """Time-sensitive server defense mechanisms"""
+ """Time-sensitive server defense mechanisms."""
+
days = None # type: timedelta
enabled = False # type: bool
@@ -36,10 +37,12 @@ class Defcon(Cog):
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
@Cog.listener()
- async def on_ready(self):
+ async def on_ready(self) -> None:
+ """On cog load, try to synchronize DEFCON settings to the API."""
self.channel = await self.bot.fetch_channel(Channels.defcon)
try:
response = await self.bot.api_client.get('bot/bot-settings/defcon')
@@ -65,7 +68,8 @@ class Defcon(Cog):
await self.update_channel_topic()
@Cog.listener()
- async def on_member_join(self, member: Member):
+ async def on_member_join(self, member: Member) -> None:
+ """If DEFCON is enabled, check newly joining users to see if they meet the account age threshold."""
if self.enabled and self.days.days > 0:
now = datetime.utcnow()
@@ -98,21 +102,19 @@ class Defcon(Cog):
@group(name='defcon', aliases=('dc',), invoke_without_command=True)
@with_role(Roles.admin, Roles.owner)
- async def defcon_group(self, ctx: Context):
+ async def defcon_group(self, ctx: Context) -> None:
"""Check the DEFCON status or run a subcommand."""
-
await ctx.invoke(self.bot.get_command("help"), "defcon")
@defcon_group.command(name='enable', aliases=('on', 'e'))
@with_role(Roles.admin, Roles.owner)
- async def enable_command(self, ctx: Context):
+ async def enable_command(self, ctx: Context) -> None:
"""
Enable DEFCON mode. Useful in a pinch, but be sure you know what you're doing!
- Currently, this just adds an account age requirement. Use !defcon days <int> to set how old an account must
- be, in days.
+ Currently, this just adds an account age requirement. Use !defcon days <int> to set how old an account must be,
+ in days.
"""
-
self.enabled = True
try:
@@ -130,40 +132,19 @@ class Defcon(Cog):
except Exception as e:
log.exception("Unable to update DEFCON settings.")
- await ctx.send(
- f"{Emojis.defcon_enabled} DEFCON enabled.\n\n"
- "**There was a problem updating the site** - This setting may be reverted when the bot is "
- "restarted.\n\n"
- f"```py\n{e}\n```"
- )
-
- await self.mod_log.send_log_message(
- Icons.defcon_enabled, Colours.soft_green, "DEFCON enabled",
- f"**Staffer:** {ctx.author.name}#{ctx.author.discriminator} (`{ctx.author.id}`)\n"
- f"**Days:** {self.days.days}\n\n"
- "**There was a problem updating the site** - This setting may be reverted when the bot is "
- "restarted.\n\n"
- f"```py\n{e}\n```"
- )
+ await ctx.send(self.build_defcon_msg("enabled", e))
+ await self.send_defcon_log("enabled", ctx.author, e)
else:
- await ctx.send(f"{Emojis.defcon_enabled} DEFCON enabled.")
-
- await self.mod_log.send_log_message(
- Icons.defcon_enabled, Colours.soft_green, "DEFCON enabled",
- f"**Staffer:** {ctx.author.name}#{ctx.author.discriminator} (`{ctx.author.id}`)\n"
- f"**Days:** {self.days.days}\n\n"
- )
+ await ctx.send(self.build_defcon_msg("enabled"))
+ await self.send_defcon_log("enabled", ctx.author)
await self.update_channel_topic()
@defcon_group.command(name='disable', aliases=('off', 'd'))
@with_role(Roles.admin, Roles.owner)
- async def disable_command(self, ctx: Context):
- """
- Disable DEFCON mode. Useful in a pinch, but be sure you know what you're doing!
- """
-
+ async def disable_command(self, ctx: Context) -> None:
+ """Disable DEFCON mode. Useful in a pinch, but be sure you know what you're doing!"""
self.enabled = False
try:
@@ -179,37 +160,18 @@ class Defcon(Cog):
)
except Exception as e:
log.exception("Unable to update DEFCON settings.")
- await ctx.send(
- f"{Emojis.defcon_disabled} DEFCON disabled.\n\n"
- "**There was a problem updating the site** - This setting may be reverted when the bot is "
- "restarted.\n\n"
- f"```py\n{e}\n```"
- )
-
- await self.mod_log.send_log_message(
- Icons.defcon_disabled, Colours.soft_red, "DEFCON disabled",
- f"**Staffer:** {ctx.author.name}#{ctx.author.discriminator} (`{ctx.author.id}`)\n"
- "**There was a problem updating the site** - This setting may be reverted when the bot is "
- "restarted.\n\n"
- f"```py\n{e}\n```"
- )
+ await ctx.send(self.build_defcon_msg("disabled", e))
+ await self.send_defcon_log("disabled", ctx.author, e)
else:
- await ctx.send(f"{Emojis.defcon_disabled} DEFCON disabled.")
-
- await self.mod_log.send_log_message(
- Icons.defcon_disabled, Colours.soft_red, "DEFCON disabled",
- f"**Staffer:** {ctx.author.name}#{ctx.author.discriminator} (`{ctx.author.id}`)"
- )
+ await ctx.send(self.build_defcon_msg("disabled"))
+ await self.send_defcon_log("disabled", ctx.author)
await self.update_channel_topic()
@defcon_group.command(name='status', aliases=('s',))
@with_role(Roles.admin, Roles.owner)
- async def status_command(self, ctx: Context):
- """
- Check the current status of DEFCON mode.
- """
-
+ async def status_command(self, ctx: Context) -> None:
+ """Check the current status of DEFCON mode."""
embed = Embed(
colour=Colour.blurple(), title="DEFCON Status",
description=f"**Enabled:** {self.enabled}\n"
@@ -220,11 +182,8 @@ class Defcon(Cog):
@defcon_group.command(name='days')
@with_role(Roles.admin, Roles.owner)
- async def days_command(self, ctx: Context, days: int):
- """
- Set how old an account must be to join the server, in days, with DEFCON mode enabled.
- """
-
+ async def days_command(self, ctx: Context, days: int) -> None:
+ """Set how old an account must be to join the server, in days, with DEFCON mode enabled."""
self.days = timedelta(days=days)
try:
@@ -240,40 +199,20 @@ class Defcon(Cog):
)
except Exception as e:
log.exception("Unable to update DEFCON settings.")
- await ctx.send(
- f"{Emojis.defcon_updated} DEFCON days updated; accounts must be {days} "
- f"days old to join to the server.\n\n"
- "**There was a problem updating the site** - This setting may be reverted when the bot is "
- "restarted.\n\n"
- f"```py\n{e}\n```"
- )
-
- await self.mod_log.send_log_message(
- Icons.defcon_updated, Colour.blurple(), "DEFCON updated",
- f"**Staffer:** {ctx.author.name}#{ctx.author.discriminator} (`{ctx.author.id}`)\n"
- f"**Days:** {self.days.days}\n\n"
- "**There was a problem updating the site** - This setting may be reverted when the bot is "
- "restarted.\n\n"
- f"```py\n{e}\n```"
- )
+ await ctx.send(self.build_defcon_msg("updated", e))
+ await self.send_defcon_log("updated", ctx.author, e)
else:
- await ctx.send(
- f"{Emojis.defcon_updated} DEFCON days updated; accounts must be {days} days old to join to the server"
- )
+ await ctx.send(self.build_defcon_msg("updated"))
+ await self.send_defcon_log("updated", ctx.author)
- await self.mod_log.send_log_message(
- Icons.defcon_updated, Colour.blurple(), "DEFCON updated",
- f"**Staffer:** {ctx.author.name}#{ctx.author.discriminator} (`{ctx.author.id}`)\n"
- f"**Days:** {self.days.days}"
- )
+ # Enable DEFCON if it's not already
+ if not self.enabled:
+ self.enabled = True
await self.update_channel_topic()
- async def update_channel_topic(self):
- """
- Update the #defcon channel topic with the current DEFCON status
- """
-
+ async def update_channel_topic(self) -> None:
+ """Update the #defcon channel topic with the current DEFCON status."""
if self.enabled:
day_str = "days" if self.days.days > 1 else "day"
new_topic = f"{BASE_CHANNEL_TOPIC}\n(Status: Enabled, Threshold: {self.days.days} {day_str})"
@@ -283,7 +222,63 @@ class Defcon(Cog):
self.mod_log.ignore(Event.guild_channel_update, Channels.defcon)
await self.channel.edit(topic=new_topic)
+ def build_defcon_msg(self, change: str, e: Exception = None) -> str:
+ """
+ Build in-channel response string for DEFCON action.
+
+ `change` string may be one of the following: ('enabled', 'disabled', 'updated')
+ """
+ if change.lower() == "enabled":
+ msg = f"{Emojis.defcon_enabled} DEFCON enabled.\n\n"
+ elif change.lower() == "disabled":
+ msg = f"{Emojis.defcon_disabled} DEFCON disabled.\n\n"
+ elif change.lower() == "updated":
+ msg = (
+ f"{Emojis.defcon_updated} DEFCON days updated; accounts must be {self.days} "
+ "days old to join the server.\n\n"
+ )
+
+ if e:
+ msg += (
+ "**There was a problem updating the site** - This setting may be reverted when the bot restarts.\n\n"
+ f"```py\n{e}\n```"
+ )
+
+ return msg
+
+ async def send_defcon_log(self, change: str, actor: Member, e: Exception = None) -> None:
+ """
+ Send log message for DEFCON action.
+
+ `change` string may be one of the following: ('enabled', 'disabled', 'updated')
+ """
+ log_msg = f"**Staffer:** {actor.name}#{actor.discriminator} (`{actor.id}`)\n"
+
+ if change.lower() == "enabled":
+ icon = Icons.defcon_enabled
+ color = Colours.soft_green
+ status_msg = "DEFCON enabled"
+ log_msg += f"**Days:** {self.days.days}\n\n"
+ elif change.lower() == "disabled":
+ icon = Icons.defcon_disabled
+ color = Colours.soft_red
+ status_msg = "DEFCON enabled"
+ elif change.lower() == "updated":
+ icon = Icons.defcon_updated
+ color = Colour.blurple()
+ status_msg = "DEFCON updated"
+ log_msg += f"**Days:** {self.days.days}\n\n"
+
+ if e:
+ log_msg += (
+ "**There was a problem updating the site** - This setting may be reverted when the bot restarts.\n\n"
+ f"```py\n{e}\n```"
+ )
+
+ await self.mod_log.send_log_message(icon, color, status_msg, log_msg)
+
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """DEFCON cog load."""
bot.add_cog(Defcon(bot))
log.info("Cog loaded: Defcon")
diff --git a/bot/cogs/doc.py b/bot/cogs/doc.py
index ebf2c1d65..e5c51748f 100644
--- a/bot/cogs/doc.py
+++ b/bot/cogs/doc.py
@@ -4,10 +4,11 @@ import logging
import re
import textwrap
from collections import OrderedDict
-from typing import Optional, Tuple
+from typing import Any, Callable, Optional, Tuple
import discord
from bs4 import BeautifulSoup
+from bs4.element import PageElement
from discord.ext import commands
from markdownify import MarkdownConverter
from requests import ConnectionError
@@ -27,24 +28,22 @@ UNWANTED_SIGNATURE_SYMBOLS = ('[source]', '¶')
WHITESPACE_AFTER_NEWLINES_RE = re.compile(r"(?<=\n\n)(\s+)")
-def async_cache(max_size=128, arg_offset=0):
+def async_cache(max_size: int = 128, arg_offset: int = 0) -> Callable:
"""
LRU cache implementation for coroutines.
- :param max_size:
- Specifies the maximum size the cache should have.
- Once it exceeds the maximum size, keys are deleted in FIFO order.
- :param arg_offset:
- The offset that should be applied to the coroutine's arguments
- when creating the cache key. Defaults to `0`.
- """
+ Once the cache exceeds the maximum size, keys are deleted in FIFO order.
+ An offset may be optionally provided to be applied to the coroutine's arguments when creating the cache key.
+ """
# Assign the cache to the function itself so we can clear it from outside.
async_cache.cache = OrderedDict()
- def decorator(function):
+ def decorator(function: Callable) -> Callable:
+ """Define the async_cache decorator."""
@functools.wraps(function)
- async def wrapper(*args):
+ async def wrapper(*args) -> Any:
+ """Decorator wrapper for the caching logic."""
key = ':'.join(args[arg_offset:])
value = async_cache.cache.get(key)
@@ -59,27 +58,25 @@ def async_cache(max_size=128, arg_offset=0):
class DocMarkdownConverter(MarkdownConverter):
- def convert_code(self, el, text):
- """Undo `markdownify`s underscore escaping."""
+ """Subclass markdownify's MarkdownCoverter to provide custom conversion methods."""
+ def convert_code(self, el: PageElement, text: str) -> str:
+ """Undo `markdownify`s underscore escaping."""
return f"`{text}`".replace('\\', '')
- def convert_pre(self, el, text):
+ def convert_pre(self, el: PageElement, text: str) -> str:
"""Wrap any codeblocks in `py` for syntax highlighting."""
-
code = ''.join(el.strings)
return f"```py\n{code}```"
-def markdownify(html):
+def markdownify(html: str) -> DocMarkdownConverter:
+ """Create a DocMarkdownConverter object from the input html."""
return DocMarkdownConverter(bullets='•').convert(html)
class DummyObject(object):
- """
- A dummy object which supports assigning anything,
- which the builtin `object()` does not support normally.
- """
+ """A dummy object which supports assigning anything, which the builtin `object()` does not support normally."""
class SphinxConfiguration:
@@ -94,14 +91,15 @@ class InventoryURL(commands.Converter):
"""
Represents an Intersphinx inventory URL.
- This converter checks whether intersphinx
- accepts the given inventory URL, and raises
+ This converter checks whether intersphinx accepts the given inventory URL, and raises
`BadArgument` if that is not the case.
+
Otherwise, it simply passes through the given URL.
"""
@staticmethod
- async def convert(ctx, url: str):
+ async def convert(ctx: commands.Context, url: str) -> str:
+ """Convert url to Intersphinx inventory URL."""
try:
intersphinx.fetch_inventory(SphinxConfiguration(), '', url)
except AttributeError:
@@ -121,31 +119,33 @@ class InventoryURL(commands.Converter):
class Doc(commands.Cog):
- def __init__(self, bot):
+ """A set of commands for querying & displaying documentation."""
+
+ def __init__(self, bot: commands.Bot):
self.base_urls = {}
self.bot = bot
self.inventories = {}
@commands.Cog.listener()
- async def on_ready(self):
+ async def on_ready(self) -> None:
+ """Refresh documentation inventory."""
await self.refresh_inventory()
async def update_single(
self, package_name: str, base_url: str, inventory_url: str, config: SphinxConfiguration
- ):
+ ) -> None:
"""
Rebuild the inventory for a single package.
- :param package_name: The package name to use, appears in the log.
- :param base_url: The root documentation URL for the specified package.
- Used to build absolute paths that link to specific symbols.
- :param inventory_url: The absolute URL to the intersphinx inventory.
- Fetched by running `intersphinx.fetch_inventory` in an
- executor on the bot's event loop.
- :param config: A `SphinxConfiguration` instance to mock the regular sphinx
- project layout. Required for use with intersphinx.
+ Where:
+ * `package_name` is the package name to use, appears in the log
+ * `base_url` is the root documentation URL for the specified package, used to build
+ absolute paths that link to specific symbols
+ * `inventory_url` is the absolute URL to the intersphinx inventory, fetched by running
+ `intersphinx.fetch_inventory` in an executor on the bot's event loop
+ * `config` is a `SphinxConfiguration` instance to mock the regular sphinx
+ project layout, required for use with intersphinx
"""
-
self.base_urls[package_name] = base_url
fetch_func = functools.partial(intersphinx.fetch_inventory, config, '', inventory_url)
@@ -159,7 +159,8 @@ class Doc(commands.Cog):
log.trace(f"Fetched inventory for {package_name}.")
- async def refresh_inventory(self):
+ async def refresh_inventory(self) -> None:
+ """Refresh internal documentation inventory."""
log.debug("Refreshing documentation inventory...")
# Clear the old base URLS and inventories to ensure
@@ -186,16 +187,13 @@ class Doc(commands.Cog):
"""
Given a Python symbol, return its signature and description.
- :param symbol: The symbol for which HTML data should be returned.
- :return:
- A tuple in the form (str, str), or `None`.
- The first tuple element is the signature of the given
- symbol as a markup-free string, and the second tuple
- element is the description of the given symbol with HTML
- markup included. If the given symbol could not be found,
- returns `None`.
- """
+ Returns a tuple in the form (str, str), or `None`.
+ The first tuple element is the signature of the given symbol as a markup-free string, and
+ the second tuple element is the description of the given symbol with HTML markup included.
+
+ If the given symbol could not be found, returns `None`.
+ """
url = self.inventories.get(symbol)
if url is None:
return None
@@ -223,16 +221,10 @@ class Doc(commands.Cog):
@async_cache(arg_offset=1)
async def get_symbol_embed(self, symbol: str) -> Optional[discord.Embed]:
"""
- Using `get_symbol_html`, attempt to scrape and
- fetch the data for the given `symbol`, and build
- a formatted embed out of its contents.
-
- :param symbol: The symbol for which the embed should be returned
- :return:
- If the symbol is known, an Embed with documentation about it.
- Otherwise, `None`.
- """
+ Attempt to scrape and fetch the data for the given `symbol`, and build an embed from its contents.
+ If the symbol is known, an Embed with documentation about it is returned.
+ """
scraped_html = await self.get_symbol_html(symbol)
if scraped_html is None:
return None
@@ -267,20 +259,16 @@ class Doc(commands.Cog):
)
@commands.group(name='docs', aliases=('doc', 'd'), invoke_without_command=True)
- async def docs_group(self, ctx, symbol: commands.clean_content = None):
+ async def docs_group(self, ctx: commands.Context, symbol: commands.clean_content = None) -> None:
"""Lookup documentation for Python symbols."""
-
await ctx.invoke(self.get_command)
@docs_group.command(name='get', aliases=('g',))
- async def get_command(self, ctx, symbol: commands.clean_content = None):
+ async def get_command(self, ctx: commands.Context, symbol: commands.clean_content = None) -> None:
"""
Return a documentation embed for a given symbol.
- If no symbol is given, return a list of all available inventories.
- :param ctx: Discord message context
- :param symbol: The symbol for which documentation should be returned,
- or nothing to get a list of all inventories
+ If no symbol is given, return a list of all available inventories.
Examples:
!docs
@@ -288,7 +276,6 @@ class Doc(commands.Cog):
!docs aiohttp.ClientSession
!docs get aiohttp.ClientSession
"""
-
if symbol is None:
inventory_embed = discord.Embed(
title=f"All inventories (`{len(self.base_urls)}` total)",
@@ -322,18 +309,13 @@ class Doc(commands.Cog):
@docs_group.command(name='set', aliases=('s',))
@with_role(*MODERATION_ROLES)
async def set_command(
- self, ctx, package_name: ValidPythonIdentifier,
+ self, ctx: commands.Context, package_name: ValidPythonIdentifier,
base_url: ValidURL, inventory_url: InventoryURL
- ):
+ ) -> None:
"""
Adds a new documentation metadata object to the site's database.
- The database will update the object, should an existing item
- with the specified `package_name` already exist.
- :param ctx: Discord message context
- :param package_name: The package name, for example `aiohttp`.
- :param base_url: The package documentation's root URL, used to build absolute links.
- :param inventory_url: The intersphinx inventory URL.
+ The database will update the object, should an existing item with the specified `package_name` already exist.
Example:
!docs set \
@@ -341,7 +323,6 @@ class Doc(commands.Cog):
https://discordpy.readthedocs.io/en/rewrite/ \
https://discordpy.readthedocs.io/en/rewrite/objects.inv
"""
-
body = {
'package': package_name,
'base_url': base_url,
@@ -365,17 +346,13 @@ class Doc(commands.Cog):
@docs_group.command(name='delete', aliases=('remove', 'rm', 'd'))
@with_role(*MODERATION_ROLES)
- async def delete_command(self, ctx, package_name: ValidPythonIdentifier):
+ async def delete_command(self, ctx: commands.Context, package_name: ValidPythonIdentifier) -> None:
"""
Removes the specified package from the database.
- :param ctx: Discord message context
- :param package_name: The package name, for example `aiohttp`.
-
Examples:
!docs delete aiohttp
"""
-
await self.bot.api_client.delete(f'bot/documentation-links/{package_name}')
async with ctx.typing():
@@ -385,5 +362,7 @@ class Doc(commands.Cog):
await ctx.send(f"Successfully deleted `{package_name}` and refreshed inventory.")
-def setup(bot):
+def setup(bot: commands.Bot) -> None:
+ """Doc cog load."""
bot.add_cog(Doc(bot))
+ log.info("Cog loaded: Doc")
diff --git a/bot/cogs/error_handler.py b/bot/cogs/error_handler.py
index e2d8c3a8f..49411814c 100644
--- a/bot/cogs/error_handler.py
+++ b/bot/cogs/error_handler.py
@@ -30,7 +30,27 @@ class ErrorHandler(Cog):
self.bot = bot
@Cog.listener()
- async def on_command_error(self, ctx: Context, e: CommandError):
+ async def on_command_error(self, ctx: Context, e: CommandError) -> None:
+ """
+ Provide generic command error handling.
+
+ Error handling is deferred to any local error handler, if present.
+
+ Error handling emits a single error response, prioritized as follows:
+ 1. If the name fails to match a command but matches a tag, the tag is invoked
+ 2. Send a BadArgument error message to the invoking context & invoke the command's help
+ 3. Send a UserInputError error message to the invoking context & invoke the command's help
+ 4. Send a NoPrivateMessage error message to the invoking context
+ 5. Send a BotMissingPermissions error message to the invoking context
+ 6. Log a MissingPermissions error, no message is sent
+ 7. Send a InChannelCheckFailure error message to the invoking context
+ 8. Log CheckFailure, CommandOnCooldown, and DisabledCommand errors, no message is sent
+ 9. For CommandInvokeErrors, response is based on the type of error:
+ * 404: Error message is sent to the invoking context
+ * 400: Log the resopnse JSON, no message is sent
+ * 500 <= status <= 600: Error message is sent to the invoking context
+ 10. Otherwise, handling is deferred to `handle_unexpected_error`
+ """
command = ctx.command
parent = None
@@ -57,7 +77,8 @@ class ErrorHandler(Cog):
# Return to not raise the exception
with contextlib.suppress(ResponseCodeError):
- return await ctx.invoke(tags_get_command, tag_name=ctx.invoked_with)
+ await ctx.invoke(tags_get_command, tag_name=ctx.invoked_with)
+ return
elif isinstance(e, BadArgument):
await ctx.send(f"Bad argument: {e}\n")
await ctx.invoke(*help_command)
@@ -109,7 +130,8 @@ class ErrorHandler(Cog):
await self.handle_unexpected_error(ctx, e)
@staticmethod
- async def handle_unexpected_error(ctx: Context, e: CommandError):
+ async def handle_unexpected_error(ctx: Context, e: CommandError) -> None:
+ """Generic handler for errors without an explicit handler."""
await ctx.send(
f"Sorry, an unexpected error occurred. Please let us know!\n\n"
f"```{e.__class__.__name__}: {e}```"
@@ -120,6 +142,7 @@ class ErrorHandler(Cog):
raise e
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """Error handler cog load."""
bot.add_cog(ErrorHandler(bot))
log.info("Cog loaded: Events")
diff --git a/bot/cogs/eval.py b/bot/cogs/eval.py
index c52c04df1..9ce854f2c 100644
--- a/bot/cogs/eval.py
+++ b/bot/cogs/eval.py
@@ -6,9 +6,10 @@ import re
import textwrap
import traceback
from io import StringIO
+from typing import Any, Optional, Tuple
import discord
-from discord.ext.commands import Bot, Cog, group
+from discord.ext.commands import Bot, Cog, Context, group
from bot.constants import Roles
from bot.decorators import with_role
@@ -18,10 +19,7 @@ log = logging.getLogger(__name__)
class CodeEval(Cog):
- """
- Owner and admin feature that evaluates code
- and returns the result to the channel.
- """
+ """Owner and admin feature that evaluates code and returns the result to the channel."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -31,7 +29,8 @@ class CodeEval(Cog):
self.interpreter = Interpreter(bot)
- def _format(self, inp, out): # (str, Any) -> (str, discord.Embed)
+ def _format(self, inp: str, out: Any) -> Tuple[str, Optional[discord.Embed]]:
+ """Format the eval output into a string & attempt to format it into an Embed."""
self._ = out
res = ""
@@ -124,7 +123,8 @@ class CodeEval(Cog):
return res # Return (text, embed)
- async def _eval(self, ctx, code): # (discord.Context, str) -> None
+ async def _eval(self, ctx: Context, code: str) -> Optional[discord.Message]:
+ """Eval the input code string & send an embed to the invoking context."""
self.ln += 1
if code.startswith("exit"):
@@ -174,16 +174,15 @@ async def func(): # (None,) -> Any
@group(name='internal', aliases=('int',))
@with_role(Roles.owner, Roles.admin)
- async def internal_group(self, ctx):
+ async def internal_group(self, ctx: Context) -> None:
"""Internal commands. Top secret!"""
-
if not ctx.invoked_subcommand:
await ctx.invoke(self.bot.get_command("help"), "internal")
@internal_group.command(name='eval', aliases=('e',))
@with_role(Roles.admin, Roles.owner)
- async def eval(self, ctx, *, code: str):
- """ Run eval in a REPL-like format. """
+ async def eval(self, ctx: Context, *, code: str) -> None:
+ """Run eval in a REPL-like format."""
code = code.strip("`")
if re.match('py(thon)?\n', code):
code = "\n".join(code.split("\n")[1:])
@@ -197,6 +196,7 @@ async def func(): # (None,) -> Any
await self._eval(ctx, code)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Code eval cog load."""
bot.add_cog(CodeEval(bot))
log.info("Cog loaded: Eval")
diff --git a/bot/cogs/filtering.py b/bot/cogs/filtering.py
index dc4de7ff1..bd8c6ed67 100644
--- a/bot/cogs/filtering.py
+++ b/bot/cogs/filtering.py
@@ -15,25 +15,30 @@ from bot.constants import (
log = logging.getLogger(__name__)
-INVITE_RE = (
+INVITE_RE = re.compile(
r"(?:discord(?:[\.,]|dot)gg|" # Could be discord.gg/
r"discord(?:[\.,]|dot)com(?:\/|slash)invite|" # or discord.com/invite/
r"discordapp(?:[\.,]|dot)com(?:\/|slash)invite|" # or discordapp.com/invite/
r"discord(?:[\.,]|dot)me|" # or discord.me
r"discord(?:[\.,]|dot)io" # or discord.io.
r")(?:[\/]|slash)" # / or 'slash'
- r"([a-zA-Z0-9]+)" # the invite code itself
+ r"([a-zA-Z0-9]+)", # the invite code itself
+ flags=re.IGNORECASE
)
-URL_RE = r"(https?://[^\s]+)"
-ZALGO_RE = r"[\u0300-\u036F\u0489]"
+URL_RE = re.compile(r"(https?://[^\s]+)", flags=re.IGNORECASE)
+ZALGO_RE = re.compile(r"[\u0300-\u036F\u0489]")
+
+WORD_WATCHLIST_PATTERNS = [
+ re.compile(fr'\b{expression}\b', flags=re.IGNORECASE) for expression in Filter.word_watchlist
+]
+TOKEN_WATCHLIST_PATTERNS = [
+ re.compile(fr'{expression}', flags=re.IGNORECASE) for expression in Filter.token_watchlist
+]
class Filtering(Cog):
- """
- Filtering out invites, blacklisting domains,
- and warning us of certain regular expressions
- """
+ """Filtering out invites, blacklisting domains, and warning us of certain regular expressions."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -94,28 +99,29 @@ class Filtering(Cog):
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
@Cog.listener()
- async def on_message(self, msg: Message):
+ async def on_message(self, msg: Message) -> None:
+ """Invoke message filter for new messages."""
await self._filter_message(msg)
@Cog.listener()
- async def on_message_edit(self, before: Message, after: Message):
+ async def on_message_edit(self, before: Message, after: Message) -> None:
+ """
+ Invoke message filter for message edits.
+
+ If there have been multiple edits, calculate the time delta from the previous edit.
+ """
if not before.edited_at:
delta = relativedelta(after.edited_at, before.created_at).microseconds
else:
delta = relativedelta(after.edited_at, before.edited_at).microseconds
await self._filter_message(after, delta)
- async def _filter_message(self, msg: Message, delta: Optional[int] = None):
- """
- Whenever a message is sent or edited,
- run it through our filters to see if it
- violates any of our rules, and then respond
- accordingly.
- """
-
+ async def _filter_message(self, msg: Message, delta: Optional[int] = None) -> None:
+ """Filter the input message to see if it violates any of our rules, and then respond accordingly."""
# Should we filter this message?
role_whitelisted = False
@@ -226,16 +232,12 @@ class Filtering(Cog):
@staticmethod
async def _has_watchlist_words(text: str) -> bool:
"""
- Returns True if the text contains
- one of the regular expressions from the
- word_watchlist in our filter config.
+ Returns True if the text contains one of the regular expressions from the word_watchlist in our filter config.
- Only matches words with boundaries before
- and after the expression.
+ Only matches words with boundaries before and after the expression.
"""
-
- for expression in Filter.word_watchlist:
- if re.search(fr"\b{expression}\b", text, re.IGNORECASE):
+ for regex_pattern in WORD_WATCHLIST_PATTERNS:
+ if regex_pattern.search(text):
return True
return False
@@ -243,31 +245,23 @@ class Filtering(Cog):
@staticmethod
async def _has_watchlist_tokens(text: str) -> bool:
"""
- Returns True if the text contains
- one of the regular expressions from the
- token_watchlist in our filter config.
+ Returns True if the text contains one of the regular expressions from the token_watchlist in our filter config.
- This will match the expression even if it
- does not have boundaries before and after
+ This will match the expression even if it does not have boundaries before and after.
"""
-
- for expression in Filter.token_watchlist:
- if re.search(fr"{expression}", text, re.IGNORECASE):
+ for regex_pattern in TOKEN_WATCHLIST_PATTERNS:
+ if regex_pattern.search(text):
# Make sure it's not a URL
- if not re.search(URL_RE, text, re.IGNORECASE):
+ if not URL_RE.search(text):
return True
return False
@staticmethod
async def _has_urls(text: str) -> bool:
- """
- Returns True if the text contains one of
- the blacklisted URLs from the config file.
- """
-
- if not re.search(URL_RE, text, re.IGNORECASE):
+ """Returns True if the text contains one of the blacklisted URLs from the config file."""
+ if not URL_RE.search(text):
return False
text = text.lower()
@@ -285,8 +279,7 @@ class Filtering(Cog):
Zalgo range is \u0300 – \u036F and \u0489.
"""
-
- return bool(re.search(ZALGO_RE, text))
+ return bool(ZALGO_RE.search(text))
async def _has_invites(self, text: str) -> Union[dict, bool]:
"""
@@ -297,12 +290,11 @@ class Filtering(Cog):
Attempts to catch some of common ways to try to cheat the system.
"""
-
# Remove backslashes to prevent escape character aroundfuckery like
# discord\.gg/gdudes-pony-farm
text = text.replace("\\", "")
- invites = re.findall(INVITE_RE, text, re.IGNORECASE)
+ invites = INVITE_RE.findall(text)
invite_data = dict()
for invite in invites:
if invite in invite_data:
@@ -338,30 +330,37 @@ class Filtering(Cog):
return invite_data if invite_data else False
@staticmethod
- async def _has_rich_embed(msg: Message):
- """
- Returns True if any of the embeds in the message are of type 'rich', but are not twitter
- embeds. Returns False otherwise.
- """
+ async def _has_rich_embed(msg: Message) -> bool:
+ """Determines if `msg` contains any rich embeds not auto-generated from a URL."""
if msg.embeds:
for embed in msg.embeds:
- if embed.type == "rich" and (not embed.url or "twitter.com" not in embed.url):
- return True
+ if embed.type == "rich":
+ urls = URL_RE.findall(msg.content)
+ if not embed.url or embed.url not in urls:
+ # If `embed.url` does not exist or if `embed.url` is not part of the content
+ # of the message, it's unlikely to be an auto-generated embed by Discord.
+ return True
+ else:
+ log.trace(
+ "Found a rich embed sent by a regular user account, "
+ "but it was likely just an automatic URL embed."
+ )
+ return False
return False
- async def notify_member(self, filtered_member: Member, reason: str, channel: TextChannel):
+ async def notify_member(self, filtered_member: Member, reason: str, channel: TextChannel) -> None:
"""
- Notify filtered_member about a moderation action with the reason str
+ Notify filtered_member about a moderation action with the reason str.
First attempts to DM the user, fall back to in-channel notification if user has DMs disabled
"""
-
try:
await filtered_member.send(reason)
except discord.errors.Forbidden:
await channel.send(f"{filtered_member.mention} {reason}")
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """Filtering cog load."""
bot.add_cog(Filtering(bot))
log.info("Cog loaded: Filtering")
diff --git a/bot/cogs/free.py b/bot/cogs/free.py
index 92a9ca041..269c5c1b9 100644
--- a/bot/cogs/free.py
+++ b/bot/cogs/free.py
@@ -1,13 +1,13 @@
import logging
from datetime import datetime
+from operator import itemgetter
from discord import Colour, Embed, Member, utils
-from discord.ext.commands import Cog, Context, command
+from discord.ext.commands import Bot, Cog, Context, command
from bot.constants import Categories, Channels, Free, STAFF_ROLES
from bot.decorators import redirect_output
-
log = logging.getLogger(__name__)
TIMEOUT = Free.activity_timeout
@@ -22,11 +22,9 @@ class Free(Cog):
@command(name="free", aliases=('f',))
@redirect_output(destination_channel=Channels.bot, bypass_roles=STAFF_ROLES)
- async def free(self, ctx: Context, user: Member = None, seek: int = 2):
+ async def free(self, ctx: Context, user: Member = None, seek: int = 2) -> None:
"""
Lists free help channels by likeliness of availability.
- :param user: accepts user mention, ID, etc.
- :param seek: How far back to check the last active message.
seek is used only when this command is invoked in a help channel.
You cannot override seek without mentioning a user first.
@@ -53,10 +51,10 @@ class Free(Cog):
# the command was invoked in
if channel.id == ctx.channel.id:
messages = await channel.history(limit=seek).flatten()
- msg = messages[seek-1]
+ msg = messages[seek - 1]
# Otherwise get last message
else:
- msg = await channel.history(limit=1).next() # noqa (False positive)
+ msg = await channel.history(limit=1).next() # noqa (False positive)
inactive = (datetime.utcnow() - msg.created_at).seconds
if inactive > TIMEOUT:
@@ -82,7 +80,8 @@ class Free(Cog):
# Sort channels in descending order by seconds
# Get position in list, inactivity, and channel object
# For each channel, add to embed.description
- for i, (inactive, channel) in enumerate(sorted(free_channels, reverse=True), 1):
+ sorted_channels = sorted(free_channels, key=itemgetter(0), reverse=True)
+ for i, (inactive, channel) in enumerate(sorted_channels, 1):
minutes, seconds = divmod(inactive, 60)
if minutes > 59:
hours, minutes = divmod(minutes, 60)
@@ -101,6 +100,7 @@ class Free(Cog):
await ctx.send(embed=embed)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Free cog load."""
bot.add_cog(Free())
log.info("Cog loaded: Free")
diff --git a/bot/cogs/help.py b/bot/cogs/help.py
index 31e729003..37d12b2d5 100644
--- a/bot/cogs/help.py
+++ b/bot/cogs/help.py
@@ -3,10 +3,11 @@ import inspect
import itertools
from collections import namedtuple
from contextlib import suppress
+from typing import Union
-from discord import Colour, Embed, HTTPException
+from discord import Colour, Embed, HTTPException, Message, Reaction, User
from discord.ext import commands
-from discord.ext.commands import CheckFailure, Cog as DiscordCog
+from discord.ext.commands import Bot, CheckFailure, Cog as DiscordCog, Command, Context
from fuzzywuzzy import fuzz, process
from bot import constants
@@ -35,15 +36,11 @@ class HelpQueryNotFound(ValueError):
Contains the custom attribute of ``possible_matches``.
- Attributes
- ----------
- possible_matches: dict
- Any commands that were close to matching the Query.
- The possible matched command names are the keys.
- The likeness match scores are the values.
+ Instances of this object contain a dictionary of any command(s) that were close to matching the
+ query, where keys are the possible matched command names and values are the likeness match scores.
"""
- def __init__(self, arg, possible_matches=None):
+ def __init__(self, arg: str, possible_matches: dict = None):
super().__init__(arg)
self.possible_matches = possible_matches
@@ -52,48 +49,30 @@ class HelpSession:
"""
An interactive session for bot and command help output.
- Attributes
- ----------
- title: str
- The title of the help message.
- query: Union[:class:`discord.ext.commands.Bot`,
- :class:`discord.ext.commands.Command]
- description: str
- The description of the query.
- pages: list[str]
- A list of the help content split into manageable pages.
- message: :class:`discord.Message`
- The message object that's showing the help contents.
- destination: :class:`discord.abc.Messageable`
- Where the help message is to be sent to.
+ Expected attributes include:
+ * title: str
+ The title of the help message.
+ * query: Union[discord.ext.commands.Bot, discord.ext.commands.Command]
+ * description: str
+ The description of the query.
+ * pages: list[str]
+ A list of the help content split into manageable pages.
+ * message: `discord.Message`
+ The message object that's showing the help contents.
+ * destination: `discord.abc.Messageable`
+ Where the help message is to be sent to.
"""
- def __init__(self, ctx, *command, cleanup=False, only_can_run=True, show_hidden=False, max_lines=15):
- """
- Creates an instance of the HelpSession class.
-
- Parameters
- ----------
- ctx: :class:`discord.Context`
- The context of the invoked help command.
- *command: str
- A variable argument of the command being queried.
- cleanup: Optional[bool]
- Set to ``True`` to have the message deleted on timeout.
- If ``False``, it will clear all reactions on timeout.
- Defaults to ``False``.
- only_can_run: Optional[bool]
- Set to ``True`` to hide commands the user can't run.
- Defaults to ``False``.
- show_hidden: Optional[bool]
- Set to ``True`` to include hidden commands.
- Defaults to ``False``.
- max_lines: Optional[int]
- Sets the max number of lines the paginator will add to a
- single page.
- Defaults to 20.
- """
-
+ def __init__(
+ self,
+ ctx: Context,
+ *command,
+ cleanup: bool = False,
+ only_can_run: bool = True,
+ show_hidden: bool = False,
+ max_lines: int = 15
+ ):
+ """Creates an instance of the HelpSession class."""
self._ctx = ctx
self._bot = ctx.bot
self.title = "Command Help"
@@ -122,20 +101,8 @@ class HelpSession:
self._timeout_task = None
self.reset_timeout()
- def _get_query(self, query):
- """
- Attempts to match the provided query with a valid command or cog.
-
- Parameters
- ----------
- query: str
- The joined string representing the session query.
-
- Returns
- -------
- Union[:class:`discord.ext.commands.Command`, :class:`Cog`]
- """
-
+ def _get_query(self, query: str) -> Union[Command, Cog]:
+ """Attempts to match the provided query with a valid command or cog."""
command = self._bot.get_command(query)
if command:
return command
@@ -143,55 +110,33 @@ class HelpSession:
cog = self._bot.cogs.get(query)
if cog:
return Cog(
- name=cog.__class__.__name__,
+ name=cog.qualified_name,
description=inspect.getdoc(cog),
- commands=[c for c in self._bot.commands if c.instance is cog]
+ commands=[c for c in self._bot.commands if c.cog is cog]
)
self._handle_not_found(query)
- def _handle_not_found(self, query):
+ def _handle_not_found(self, query: str) -> None:
"""
Handles when a query does not match a valid command or cog.
- Will pass on possible close matches along with the
- ``HelpQueryNotFound`` exception.
-
- Parameters
- ----------
- query: str
- The full query that was requested.
-
- Raises
- ------
- HelpQueryNotFound
+ Will pass on possible close matches along with the `HelpQueryNotFound` exception.
"""
-
- # combine command and cog names
+ # Combine command and cog names
choices = list(self._bot.all_commands) + list(self._bot.cogs)
result = process.extractBests(query, choices, scorer=fuzz.ratio, score_cutoff=90)
raise HelpQueryNotFound(f'Query "{query}" not found.', dict(result))
- async def timeout(self, seconds=30):
- """
- Waits for a set number of seconds, then stops the help session.
-
- Parameters
- ----------
- seconds: int
- Number of seconds to wait.
- """
-
+ async def timeout(self, seconds: int = 30) -> None:
+ """Waits for a set number of seconds, then stops the help session."""
await asyncio.sleep(seconds)
await self.stop()
- def reset_timeout(self):
- """
- Cancels the original timeout task and sets it again from the start.
- """
-
+ def reset_timeout(self) -> None:
+ """Cancels the original timeout task and sets it again from the start."""
# cancel original if it exists
if self._timeout_task:
if not self._timeout_task.cancelled():
@@ -200,18 +145,8 @@ class HelpSession:
# recreate the timeout task
self._timeout_task = self._bot.loop.create_task(self.timeout())
- async def on_reaction_add(self, reaction, user):
- """
- Event handler for when reactions are added on the help message.
-
- Parameters
- ----------
- reaction: :class:`discord.Reaction`
- The reaction that was added.
- user: :class:`discord.User`
- The user who added the reaction.
- """
-
+ async def on_reaction_add(self, reaction: Reaction, user: User) -> None:
+ """Event handler for when reactions are added on the help message."""
# ensure it was the relevant session message
if reaction.message.id != self.message.id:
return
@@ -237,24 +172,13 @@ class HelpSession:
with suppress(HTTPException):
await self.message.remove_reaction(reaction, user)
- async def on_message_delete(self, message):
- """
- Closes the help session when the help message is deleted.
-
- Parameters
- ----------
- message: :class:`discord.Message`
- The message that was deleted.
- """
-
+ async def on_message_delete(self, message: Message) -> None:
+ """Closes the help session when the help message is deleted."""
if message.id == self.message.id:
await self.stop()
- async def prepare(self):
- """
- Sets up the help session pages, events, message and reactions.
- """
-
+ async def prepare(self) -> None:
+ """Sets up the help session pages, events, message and reactions."""
# create paginated content
await self.build_pages()
@@ -266,12 +190,8 @@ class HelpSession:
await self.update_page()
self.add_reactions()
- def add_reactions(self):
- """
- Adds the relevant reactions to the help message based on if
- pagination is required.
- """
-
+ def add_reactions(self) -> None:
+ """Adds the relevant reactions to the help message based on if pagination is required."""
# if paginating
if len(self._pages) > 1:
for reaction in REACTIONS:
@@ -281,44 +201,22 @@ class HelpSession:
else:
self._bot.loop.create_task(self.message.add_reaction(DELETE_EMOJI))
- def _category_key(self, cmd):
+ def _category_key(self, cmd: Command) -> str:
"""
- Returns a cog name of a given command. Used as a key for
- ``sorted`` and ``groupby``.
-
- A zero width space is used as a prefix for results with no cogs
- to force them last in ordering.
+ Returns a cog name of a given command for use as a key for `sorted` and `groupby`.
- Parameters
- ----------
- cmd: :class:`discord.ext.commands.Command`
- The command object being checked.
-
- Returns
- -------
- str
+ A zero width space is used as a prefix for results with no cogs to force them last in ordering.
"""
-
cog = cmd.cog_name
return f'**{cog}**' if cog else f'**\u200bNo Category:**'
- def _get_command_params(self, cmd):
+ def _get_command_params(self, cmd: Command) -> str:
"""
Returns the command usage signature.
- This is a custom implementation of ``command.signature`` in
- order to format the command signature without aliases.
-
- Parameters
- ----------
- cmd: :class:`discord.ext.commands.Command`
- The command object to get the parameters of.
-
- Returns
- -------
- str
+ This is a custom implementation of `command.signature` in order to format the command
+ signature without aliases.
"""
-
results = []
for name, param in cmd.clean_params.items():
@@ -346,16 +244,8 @@ class HelpSession:
return f"{cmd.name} {' '.join(results)}"
- async def build_pages(self):
- """
- Builds the list of content pages to be paginated through in the
- help message.
-
- Returns
- -------
- list[str]
- """
-
+ async def build_pages(self) -> None:
+ """Builds the list of content pages to be paginated through in the help message, as a list of str."""
# Use LinePaginator to restrict embed line height
paginator = LinePaginator(prefix='', suffix='', max_lines=self._max_lines)
@@ -482,20 +372,8 @@ class HelpSession:
# save organised pages to session
self._pages = paginator.pages
- def embed_page(self, page_number=0):
- """
- Returns an Embed with the requested page formatted within.
-
- Parameters
- ----------
- page_number: int
- The page to be retrieved. Zero indexed.
-
- Returns
- -------
- :class:`discord.Embed`
- """
-
+ def embed_page(self, page_number: int = 0) -> Embed:
+ """Returns an Embed with the requested page formatted within."""
embed = Embed()
# if command or cog, add query to title for pages other than first
@@ -514,17 +392,8 @@ class HelpSession:
return embed
- async def update_page(self, page_number=0):
- """
- Sends the intial message, or changes the existing one to the
- given page number.
-
- Parameters
- ----------
- page_number: int
- The page number to show in the help message.
- """
-
+ async def update_page(self, page_number: int = 0) -> None:
+ """Sends the intial message, or changes the existing one to the given page number."""
self._current_page = page_number
embed_page = self.embed_page(page_number)
@@ -534,47 +403,27 @@ class HelpSession:
await self.message.edit(embed=embed_page)
@classmethod
- async def start(cls, ctx, *command, **options):
- """
- Create and begin a help session based on the given command
- context.
-
- Parameters
- ----------
- ctx: :class:`discord.ext.commands.Context`
- The context of the invoked help command.
- *command: str
- A variable argument of the command being queried.
- cleanup: Optional[bool]
- Set to ``True`` to have the message deleted on session end.
- Defaults to ``False``.
- only_can_run: Optional[bool]
- Set to ``True`` to hide commands the user can't run.
- Defaults to ``False``.
- show_hidden: Optional[bool]
- Set to ``True`` to include hidden commands.
- Defaults to ``False``.
- max_lines: Optional[int]
- Sets the max number of lines the paginator will add to a
- single page.
- Defaults to 20.
-
- Returns
- -------
- :class:`HelpSession`
+ async def start(cls, ctx: Context, *command, **options) -> "HelpSession":
"""
+ Create and begin a help session based on the given command context.
+ Available options kwargs:
+ * cleanup: Optional[bool]
+ Set to `True` to have the message deleted on session end. Defaults to `False`.
+ * only_can_run: Optional[bool]
+ Set to `True` to hide commands the user can't run. Defaults to `False`.
+ * show_hidden: Optional[bool]
+ Set to `True` to include hidden commands. Defaults to `False`.
+ * max_lines: Optional[int]
+ Sets the max number of lines the paginator will add to a single page. Defaults to 20.
+ """
session = cls(ctx, *command, **options)
await session.prepare()
return session
- async def stop(self):
- """
- Stops the help session, removes event listeners and attempts to
- delete the help message.
- """
-
+ async def stop(self) -> None:
+ """Stops the help session, removes event listeners and attempts to delete the help message."""
self._bot.remove_listener(self.on_reaction_add)
self._bot.remove_listener(self.on_message_delete)
@@ -586,80 +435,47 @@ class HelpSession:
await self.message.clear_reactions()
@property
- def is_first_page(self):
- """
- A bool reflecting if session is currently showing the first page.
-
- Returns
- -------
- bool
- """
-
+ def is_first_page(self) -> bool:
+ """Check if session is currently showing the first page."""
return self._current_page == 0
@property
- def is_last_page(self):
- """
- A bool reflecting if the session is currently showing the last page.
-
- Returns
- -------
- bool
- """
-
+ def is_last_page(self) -> bool:
+ """Check if the session is currently showing the last page."""
return self._current_page == (len(self._pages)-1)
- async def do_first(self):
- """
- Event that is called when the user requests the first page.
- """
-
+ async def do_first(self) -> None:
+ """Event that is called when the user requests the first page."""
if not self.is_first_page:
await self.update_page(0)
- async def do_back(self):
- """
- Event that is called when the user requests the previous page.
- """
-
+ async def do_back(self) -> None:
+ """Event that is called when the user requests the previous page."""
if not self.is_first_page:
await self.update_page(self._current_page-1)
- async def do_next(self):
- """
- Event that is called when the user requests the next page.
- """
-
+ async def do_next(self) -> None:
+ """Event that is called when the user requests the next page."""
if not self.is_last_page:
await self.update_page(self._current_page+1)
- async def do_end(self):
- """
- Event that is called when the user requests the last page.
- """
-
+ async def do_end(self) -> None:
+ """Event that is called when the user requests the last page."""
if not self.is_last_page:
await self.update_page(len(self._pages)-1)
- async def do_stop(self):
- """
- Event that is called when the user requests to stop the help session.
- """
-
+ async def do_stop(self) -> None:
+ """Event that is called when the user requests to stop the help session."""
await self.message.delete()
class Help(DiscordCog):
- """
- Custom Embed Pagination Help feature
- """
+ """Custom Embed Pagination Help feature."""
+
@commands.command('help')
@redirect_output(destination_channel=Channels.bot, bypass_roles=STAFF_ROLES)
- async def new_help(self, ctx, *commands):
- """
- Shows Command Help.
- """
-
+ async def new_help(self, ctx: Context, *commands) -> None:
+ """Shows Command Help."""
try:
await HelpSession.start(ctx, *commands)
except HelpQueryNotFound as error:
@@ -674,42 +490,29 @@ class Help(DiscordCog):
await ctx.send(embed=embed)
-def unload(bot):
+def unload(bot: Bot) -> None:
"""
Reinstates the original help command.
- This is run if the cog raises an exception on load, or if the
- extension is unloaded.
-
- Parameters
- ----------
- bot: :class:`discord.ext.commands.Bot`
- The discord bot client.
+ This is run if the cog raises an exception on load, or if the extension is unloaded.
"""
-
bot.remove_command('help')
bot.add_command(bot._old_help)
-def setup(bot):
+def setup(bot: Bot) -> None:
"""
The setup for the help extension.
This is called automatically on `bot.load_extension` being run.
- Stores the original help command instance on the ``bot._old_help``
- attribute for later reinstatement, before removing it from the
- command registry so the new help command can be loaded successfully.
-
- If an exception is raised during the loading of the cog, ``unload``
- will be called in order to reinstate the original help command.
+ Stores the original help command instance on the `bot._old_help` attribute for later
+ reinstatement, before removing it from the command registry so the new help command can be
+ loaded successfully.
- Parameters
- ----------
- bot: `discord.ext.commands.Bot`
- The discord bot client.
+ If an exception is raised during the loading of the cog, `unload` will be called in order to
+ reinstate the original help command.
"""
-
bot._old_help = bot.get_command('help')
bot.remove_command('help')
@@ -720,18 +523,12 @@ def setup(bot):
raise
-def teardown(bot):
+def teardown(bot: Bot) -> None:
"""
The teardown for the help extension.
This is called automatically on `bot.unload_extension` being run.
- Calls ``unload`` in order to reinstate the original help command.
-
- Parameters
- ----------
- bot: `discord.ext.commands.Bot`
- The discord bot client.
+ Calls `unload` in order to reinstate the original help command.
"""
-
unload(bot)
diff --git a/bot/cogs/information.py b/bot/cogs/information.py
index c789dbcc0..1afb37103 100644
--- a/bot/cogs/information.py
+++ b/bot/cogs/information.py
@@ -15,23 +15,15 @@ log = logging.getLogger(__name__)
class Information(Cog):
- """
- A cog with commands for generating embeds with
- server information, such as server statistics
- and user information.
- """
+ """A cog with commands for generating embeds with server info, such as server stats and user info."""
def __init__(self, bot: Bot):
self.bot = bot
@with_role(*MODERATION_ROLES)
@command(name="roles")
- async def roles_info(self, ctx: Context):
- """
- Returns a list of all roles and their
- corresponding IDs.
- """
-
+ async def roles_info(self, ctx: Context) -> None:
+ """Returns a list of all roles and their corresponding IDs."""
# Sort the roles alphabetically and remove the @everyone role
roles = sorted(ctx.guild.roles, key=lambda role: role.name)
roles = [role for role in roles if role.name != "@everyone"]
@@ -54,7 +46,7 @@ class Information(Cog):
@with_role(*MODERATION_ROLES)
@command(name="role")
- async def role_info(self, ctx: Context, *roles: typing.Union[Role, str]):
+ async def role_info(self, ctx: Context, *roles: typing.Union[Role, str]) -> None:
"""
Return information on a role or list of roles.
@@ -99,12 +91,8 @@ class Information(Cog):
await ctx.send(embed=embed)
@command(name="server", aliases=["server_info", "guild", "guild_info"])
- async def server_info(self, ctx: Context):
- """
- Returns an embed full of
- server information.
- """
-
+ async def server_info(self, ctx: Context) -> None:
+ """Returns an embed full of server information."""
created = time_since(ctx.guild.created_at, precision="days")
features = ", ".join(ctx.guild.features)
region = ctx.guild.region
@@ -168,11 +156,8 @@ class Information(Cog):
await ctx.send(embed=embed)
@command(name="user", aliases=["user_info", "member", "member_info"])
- async def user_info(self, ctx: Context, user: Member = None, hidden: bool = False):
- """
- Returns info about a user.
- """
-
+ async def user_info(self, ctx: Context, user: Member = None, hidden: bool = False) -> None:
+ """Returns info about a user."""
if user is None:
user = ctx.author
@@ -245,6 +230,7 @@ class Information(Cog):
await ctx.send(embed=embed)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Information cog load."""
bot.add_cog(Information(bot))
log.info("Cog loaded: Information")
diff --git a/bot/cogs/jams.py b/bot/cogs/jams.py
index dd14111ce..be9d33e3e 100644
--- a/bot/cogs/jams.py
+++ b/bot/cogs/jams.py
@@ -11,22 +11,16 @@ log = logging.getLogger(__name__)
class CodeJams(commands.Cog):
- """
- Manages the code-jam related parts of our server
- """
+ """Manages the code-jam related parts of our server."""
def __init__(self, bot: commands.Bot):
self.bot = bot
@commands.command()
@with_role(Roles.admin)
- async def createteam(
- self, ctx: commands.Context,
- team_name: str, members: commands.Greedy[Member]
- ):
+ async def createteam(self, ctx: commands.Context, team_name: str, members: commands.Greedy[Member]) -> None:
"""
- Create a team channel (both voice and text) in the Code Jams category, assign roles
- and then add overwrites for the team.
+ Create team channels (voice and text) in the Code Jams category, assign roles, and add overwrites for the team.
The first user passed will always be the team leader.
"""
@@ -114,6 +108,7 @@ class CodeJams(commands.Cog):
)
-def setup(bot):
+def setup(bot: commands.Bot) -> None:
+ """Code Jams cog load."""
bot.add_cog(CodeJams(bot))
log.info("Cog loaded: CodeJams")
diff --git a/bot/cogs/logging.py b/bot/cogs/logging.py
index 64bbed46e..8e47bcc36 100644
--- a/bot/cogs/logging.py
+++ b/bot/cogs/logging.py
@@ -10,15 +10,14 @@ log = logging.getLogger(__name__)
class Logging(Cog):
- """
- Debug logging module
- """
+ """Debug logging module."""
def __init__(self, bot: Bot):
self.bot = bot
@Cog.listener()
- async def on_ready(self):
+ async def on_ready(self) -> None:
+ """Announce our presence to the configured devlog channel."""
log.info("Bot connected!")
embed = Embed(description="Connected!")
@@ -35,6 +34,7 @@ class Logging(Cog):
await self.bot.get_channel(Channels.devlog).send(embed=embed)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Logging cog load."""
bot.add_cog(Logging(bot))
log.info("Cog loaded: Logging")
diff --git a/bot/cogs/moderation.py b/bot/cogs/moderation.py
index fea86c33e..b596f36e6 100644
--- a/bot/cogs/moderation.py
+++ b/bot/cogs/moderation.py
@@ -14,12 +14,12 @@ from discord.ext.commands import (
from bot import constants
from bot.cogs.modlog import ModLog
from bot.constants import Colours, Event, Icons, MODERATION_ROLES
-from bot.converters import ExpirationDate, InfractionSearchQuery
+from bot.converters import Duration, InfractionSearchQuery
from bot.decorators import with_role
from bot.pagination import LinePaginator
from bot.utils.moderation import already_has_active_infraction, post_infraction
from bot.utils.scheduling import Scheduler, create_task
-from bot.utils.time import wait_until
+from bot.utils.time import INFRACTION_FORMAT, format_infraction, wait_until
log = logging.getLogger(__name__)
@@ -33,6 +33,7 @@ APPEALABLE_INFRACTIONS = ("Ban", "Mute")
def proxy_user(user_id: str) -> Object:
+ """Create a proxy user for the provided user_id for situations where a Member or User object cannot be resolved."""
try:
user_id = int(user_id)
except ValueError:
@@ -43,13 +44,20 @@ def proxy_user(user_id: str) -> Object:
return user
+def permanent_duration(expires_at: str) -> str:
+ """Only allow an expiration to be 'permanent' if it is a string."""
+ expires_at = expires_at.lower()
+ if expires_at != "permanent":
+ raise BadArgument
+ else:
+ return expires_at
+
+
UserTypes = Union[Member, User, proxy_user]
class Moderation(Scheduler, Cog):
- """
- Server moderation tools.
- """
+ """Server moderation tools."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -58,10 +66,12 @@ class Moderation(Scheduler, Cog):
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
@Cog.listener()
- async def on_ready(self):
+ async def on_ready(self) -> None:
+ """Schedule expiration for previous infractions."""
# Schedule expiration for previous infractions
infractions = await self.bot.api_client.get(
'bot/infractions', params={'active': 'true'}
@@ -74,14 +84,8 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
- async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None):
- """
- Create a warning infraction in the database for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the warning.
- """
-
+ async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
+ """Create a warning infraction in the database for a user."""
infraction = await post_infraction(ctx, user, type="warning", reason=reason)
if infraction is None:
return
@@ -90,11 +94,7 @@ class Moderation(Scheduler, Cog):
dm_result = ":incoming_envelope: " if notified else ""
action = f"{dm_result}:ok_hand: warned {user.mention}"
-
- if reason is None:
- await ctx.send(f"{action}.")
- else:
- await ctx.send(f"{action} ({reason}).")
+ await ctx.send(f"{action}.")
if notified:
dm_status = "Sent"
@@ -120,14 +120,8 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
- async def kick(self, ctx: Context, user: Member, *, reason: str = None):
- """
- Kicks a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the kick.
- """
-
+ async def kick(self, ctx: Context, user: Member, *, reason: str = None) -> None:
+ """Kicks a user with the provided reason."""
if not await self.respect_role_hierarchy(ctx, user, 'kick'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
@@ -149,11 +143,7 @@ class Moderation(Scheduler, Cog):
dm_result = ":incoming_envelope: " if notified else ""
action = f"{dm_result}:ok_hand: kicked {user.mention}"
-
- if reason is None:
- await ctx.send(f"{action}.")
- else:
- await ctx.send(f"{action} ({reason}).")
+ await ctx.send(f"{action}.")
dm_status = "Sent" if notified else "**Failed**"
title = "Member kicked" if action_result else "Member kicked (Failed)"
@@ -176,14 +166,8 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
- async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None):
- """
- Create a permanent ban infraction in the database for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the ban.
- """
-
+ async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
+ """Create a permanent ban infraction for a user with the provided reason."""
if not await self.respect_role_hierarchy(ctx, user, 'ban'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
@@ -213,11 +197,7 @@ class Moderation(Scheduler, Cog):
dm_result = ":incoming_envelope: " if notified else ""
action = f"{dm_result}:ok_hand: permanently banned {user.mention}"
-
- if reason is None:
- await ctx.send(f"{action}.")
- else:
- await ctx.send(f"{action} ({reason}).")
+ await ctx.send(f"{action}.")
dm_status = "Sent" if notified else "**Failed**"
log_content = None if all((notified, action_result)) else ctx.author.mention
@@ -240,75 +220,16 @@ class Moderation(Scheduler, Cog):
footer=f"ID {infraction['id']}"
)
- @with_role(*MODERATION_ROLES)
- @command()
- async def mute(self, ctx: Context, user: Member, *, reason: str = None):
- """
- Create a permanent mute infraction in the database for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the mute.
- """
-
- if await already_has_active_infraction(ctx=ctx, user=user, type="mute"):
- return
-
- infraction = await post_infraction(ctx, user, type="mute", reason=reason)
- if infraction is None:
- return
-
- self.mod_log.ignore(Event.member_update, user.id)
- await user.add_roles(self._muted_role, reason=reason)
-
- notified = await self.notify_infraction(
- user=user,
- infr_type="Mute",
- expires_at="Permanent",
- reason=reason
- )
-
- dm_result = ":incoming_envelope: " if notified else ""
- action = f"{dm_result}:ok_hand: permanently muted {user.mention}"
-
- if reason is None:
- await ctx.send(f"{action}.")
- else:
- await ctx.send(f"{action} ({reason}).")
-
- if notified:
- dm_status = "Sent"
- log_content = None
- else:
- dm_status = "**Failed**"
- log_content = ctx.author.mention
-
- await self.mod_log.send_log_message(
- icon_url=Icons.user_mute,
- colour=Colour(Colours.soft_red),
- title="Member permanently muted",
- thumbnail=user.avatar_url_as(static_format="png"),
- text=textwrap.dedent(f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.message.author}
- DM: {dm_status}
- Reason: {reason}
- """),
- content=log_content,
- footer=f"ID {infraction['id']}"
- )
-
# endregion
# region: Temporary infractions
@with_role(*MODERATION_ROLES)
- @command()
- async def tempmute(self, ctx: Context, user: Member, duration: ExpirationDate, *, reason: str = None) -> None:
+ @command(aliases=('mute',))
+ async def tempmute(self, ctx: Context, user: Member, duration: Duration, *, reason: str = None) -> None:
"""
- Create a temporary mute infraction in the database for a user.
+ Create a temporary mute infraction for a user with the provided expiration and reason.
- **`user`:** Accepts user mention, ID, etc.
- **`duration`:** The duration for the temporary mute infraction
- **`reason`:** The reason for the temporary mute.
+ Duration strings are parsed per: http://strftime.org/
"""
expiration = duration
@@ -329,21 +250,13 @@ class Moderation(Scheduler, Cog):
reason=reason
)
- infraction_expiration = (
- datetime
- .fromisoformat(infraction["expires_at"][:-1])
- .strftime('%c')
- )
+ infraction_expiration = format_infraction(infraction["expires_at"])
self.schedule_task(ctx.bot.loop, infraction["id"], infraction)
dm_result = ":incoming_envelope: " if notified else ""
action = f"{dm_result}:ok_hand: muted {user.mention} until {infraction_expiration}"
-
- if reason is None:
- await ctx.send(f"{action}.")
- else:
- await ctx.send(f"{action} ({reason}).")
+ await ctx.send(f"{action}.")
if notified:
dm_status = "Sent"
@@ -370,13 +283,11 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
- async def tempban(self, ctx: Context, user: UserTypes, duration: ExpirationDate, *, reason: str = None) -> None:
+ async def tempban(self, ctx: Context, user: UserTypes, duration: Duration, *, reason: str = None) -> None:
"""
- Create a temporary ban infraction in the database for a user.
+ Create a temporary ban infraction for a user with the provided expiration and reason.
- **`user`:** Accepts user mention, ID, etc.
- **`duration`:** The duration for the temporary ban infraction
- **`reason`:** The reason for the temporary ban.
+ Duration strings are parsed per: http://strftime.org/
"""
expiration = duration
@@ -408,21 +319,13 @@ class Moderation(Scheduler, Cog):
except Forbidden:
action_result = False
- infraction_expiration = (
- datetime
- .fromisoformat(infraction["expires_at"][:-1])
- .strftime('%c')
- )
+ infraction_expiration = format_infraction(infraction["expires_at"])
self.schedule_task(ctx.bot.loop, infraction["id"], infraction)
dm_result = ":incoming_envelope: " if notified else ""
action = f"{dm_result}:ok_hand: banned {user.mention} until {infraction_expiration}"
-
- if reason is None:
- await ctx.send(f"{action}.")
- else:
- await ctx.send(f"{action} ({reason}).")
+ await ctx.send(f"{action}.")
dm_status = "Sent" if notified else "**Failed**"
log_content = None if all((notified, action_result)) else ctx.author.mention
@@ -450,23 +353,18 @@ class Moderation(Scheduler, Cog):
# region: Permanent shadow infractions
@with_role(*MODERATION_ROLES)
- @command(hidden=True, aliases=['shadowwarn', 'swarn', 'shadow_warn'])
+ @command(hidden=True)
async def note(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
"""
- Create a private infraction note in the database for a user.
+ Create a private infraction note in the database for a user with the provided reason.
- **`user`:** accepts user mention, ID, etc.
- **`reason`:** The reason for the warning.
+ This does not send the user a notification
"""
-
- infraction = await post_infraction(ctx, user, type="warning", reason=reason, hidden=True)
+ infraction = await post_infraction(ctx, user, type="note", reason=reason, hidden=True)
if infraction is None:
return
- if reason is None:
- await ctx.send(f":ok_hand: note added for {user.mention}.")
- else:
- await ctx.send(f":ok_hand: note added for {user.mention} ({reason}).")
+ await ctx.send(f":ok_hand: note added for {user.mention}.")
await self.mod_log.send_log_message(
icon_url=Icons.user_warn,
@@ -485,12 +383,10 @@ class Moderation(Scheduler, Cog):
@command(hidden=True, aliases=['shadowkick', 'skick'])
async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None) -> None:
"""
- Kicks a user.
+ Kick a user for the provided reason.
- **`user`:** accepts user mention, ID, etc.
- **`reason`:** The reason for the kick.
+ This does not send the user a notification.
"""
-
if not await self.respect_role_hierarchy(ctx, user, 'shadowkick'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
@@ -508,10 +404,7 @@ class Moderation(Scheduler, Cog):
except Forbidden:
action_result = False
- if reason is None:
- await ctx.send(f":ok_hand: kicked {user.mention}.")
- else:
- await ctx.send(f":ok_hand: kicked {user.mention} ({reason}).")
+ await ctx.send(f":ok_hand: kicked {user.mention}.")
title = "Member shadow kicked"
if action_result:
@@ -538,12 +431,10 @@ class Moderation(Scheduler, Cog):
@command(hidden=True, aliases=['shadowban', 'sban'])
async def shadow_ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
"""
- Create a permanent ban infraction in the database for a user.
+ Create a permanent ban infraction for a user with the provided reason.
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the ban.
+ This does not send the user a notification.
"""
-
if not await self.respect_role_hierarchy(ctx, user, 'shadowban'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
@@ -565,10 +456,7 @@ class Moderation(Scheduler, Cog):
except Forbidden:
action_result = False
- if reason is None:
- await ctx.send(f":ok_hand: permanently banned {user.mention}.")
- else:
- await ctx.send(f":ok_hand: permanently banned {user.mention} ({reason}).")
+ await ctx.send(f":ok_hand: permanently banned {user.mention}.")
title = "Member permanently banned"
if action_result:
@@ -591,63 +479,20 @@ class Moderation(Scheduler, Cog):
footer=f"ID {infraction['id']}"
)
- @with_role(*MODERATION_ROLES)
- @command(hidden=True, aliases=['shadowmute', 'smute'])
- async def shadow_mute(self, ctx: Context, user: Member, *, reason: str = None) -> None:
- """
- Create a permanent mute infraction in the database for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the mute.
- """
-
- if await already_has_active_infraction(ctx=ctx, user=user, type="mute"):
- return
-
- infraction = await post_infraction(ctx, user, type="mute", reason=reason, hidden=True)
- if infraction is None:
- return
-
- self.mod_log.ignore(Event.member_update, user.id)
- await user.add_roles(self._muted_role, reason=reason)
-
- if reason is None:
- await ctx.send(f":ok_hand: permanently muted {user.mention}.")
- else:
- await ctx.send(f":ok_hand: permanently muted {user.mention} ({reason}).")
-
- await self.mod_log.send_log_message(
- icon_url=Icons.user_mute,
- colour=Colour(Colours.soft_red),
- title="Member permanently muted",
- thumbnail=user.avatar_url_as(static_format="png"),
- text=textwrap.dedent(f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.message.author}
- Reason: {reason}
- """),
- footer=f"ID {infraction['id']}"
- )
-
# endregion
# region: Temporary shadow infractions
@with_role(*MODERATION_ROLES)
- @command(hidden=True, aliases=["shadowtempmute, stempmute"])
+ @command(hidden=True, aliases=["shadowtempmute, stempmute", "shadowmute", "smute"])
async def shadow_tempmute(
- self,
- ctx: Context,
- user: Member,
- duration: ExpirationDate,
- *,
- reason: str = None
+ self, ctx: Context, user: Member, duration: Duration, *, reason: str = None
) -> None:
"""
- Create a temporary mute infraction in the database for a user.
+ Create a temporary mute infraction for a user with the provided reason.
+
+ Duration strings are parsed per: http://strftime.org/
- **`user`:** Accepts user mention, ID, etc.
- **`duration`:** The duration for the temporary mute infraction
- **`reason`:** The reason for the temporary mute.
+ This does not send the user a notification.
"""
expiration = duration
@@ -661,20 +506,9 @@ class Moderation(Scheduler, Cog):
self.mod_log.ignore(Event.member_update, user.id)
await user.add_roles(self._muted_role, reason=reason)
- infraction_expiration = (
- datetime
- .fromisoformat(infraction["expires_at"][:-1])
- .strftime('%c')
- )
-
+ infraction_expiration = format_infraction(infraction["expires_at"])
self.schedule_task(ctx.bot.loop, infraction["id"], infraction)
-
- if reason is None:
- await ctx.send(f":ok_hand: muted {user.mention} until {infraction_expiration}.")
- else:
- await ctx.send(
- f":ok_hand: muted {user.mention} until {infraction_expiration} ({reason})."
- )
+ await ctx.send(f":ok_hand: muted {user.mention} until {infraction_expiration}.")
await self.mod_log.send_log_message(
icon_url=Icons.user_mute,
@@ -693,19 +527,14 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command(hidden=True, aliases=["shadowtempban, stempban"])
async def shadow_tempban(
- self,
- ctx: Context,
- user: UserTypes,
- duration: ExpirationDate,
- *,
- reason: str = None
+ self, ctx: Context, user: UserTypes, duration: Duration, *, reason: str = None
) -> None:
"""
- Create a temporary ban infraction in the database for a user.
+ Create a temporary ban infraction for a user with the provided reason.
+
+ Duration strings are parsed per: http://strftime.org/
- **`user`:** Accepts user mention, ID, etc.
- **`duration`:** The duration for the temporary ban infraction
- **`reason`:** The reason for the temporary ban.
+ This does not send the user a notification.
"""
expiration = duration
@@ -730,20 +559,9 @@ class Moderation(Scheduler, Cog):
except Forbidden:
action_result = False
- infraction_expiration = (
- datetime
- .fromisoformat(infraction["expires_at"][:-1])
- .strftime('%c')
- )
-
+ infraction_expiration = format_infraction(infraction["expires_at"])
self.schedule_task(ctx.bot.loop, infraction["id"], infraction)
-
- if reason is None:
- await ctx.send(f":ok_hand: banned {user.mention} until {infraction_expiration}.")
- else:
- await ctx.send(
- f":ok_hand: banned {user.mention} until {infraction_expiration} ({reason})."
- )
+ await ctx.send(f":ok_hand: banned {user.mention} until {infraction_expiration}.")
title = "Member temporarily banned"
if action_result:
@@ -774,12 +592,7 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
async def unmute(self, ctx: Context, user: UserTypes) -> None:
- """
- Deactivates the active mute infraction for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- """
-
+ """Deactivates the active mute infraction for a user."""
try:
# check the current active infraction
response = await self.bot.api_client.get(
@@ -857,12 +670,7 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
async def unban(self, ctx: Context, user: UserTypes) -> None:
- """
- Deactivates the active ban infraction for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- """
-
+ """Deactivates the active ban infraction for a user."""
try:
# check the current active infraction
response = await self.bot.api_client.get(
@@ -925,136 +733,77 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True)
- async def infraction_group(self, ctx: Context):
+ async def infraction_group(self, ctx: Context) -> None:
"""Infraction manipulation commands."""
-
await ctx.invoke(self.bot.get_command("help"), "infraction")
@with_role(*MODERATION_ROLES)
- @infraction_group.group(name='edit', invoke_without_command=True)
- async def infraction_edit_group(self, ctx: Context):
- """Infraction editing commands."""
-
- await ctx.invoke(self.bot.get_command("help"), "infraction", "edit")
-
- @with_role(*MODERATION_ROLES)
- @infraction_edit_group.command(name="duration")
- async def edit_duration(
- self, ctx: Context,
- infraction_id: int, expires_at: Union[ExpirationDate, str]
- ):
+ @infraction_group.command(name='edit')
+ async def infraction_edit(
+ self,
+ ctx: Context,
+ infraction_id: int,
+ expires_at: Union[Duration, permanent_duration, None],
+ *,
+ reason: str = None
+ ) -> None:
"""
- Sets the duration of the given infraction, relative to the time of updating.
+ Edit the duration and/or the reason of an infraction.
- **`infraction_id`:** the id of the infraction
- **`expires_at`:** the new expiration date of the infraction.
+ Durations are relative to the time of updating.
Use "permanent" to mark the infraction as permanent.
"""
-
- if isinstance(expires_at, str) and expires_at != 'permanent':
- raise BadArgument(
- "If `expires_at` is given as a non-datetime, "
- "it must be `permanent`."
- )
- if expires_at == 'permanent':
- expires_at = None
-
- try:
- previous_infraction = await self.bot.api_client.get(
- 'bot/infractions/' + str(infraction_id)
- )
-
- # check the current active infraction
- infraction = await self.bot.api_client.patch(
- 'bot/infractions/' + str(infraction_id),
- json={
- 'expires_at': (
- expires_at.isoformat()
- if expires_at is not None
- else None
- )
- }
- )
-
- # Re-schedule
- self.cancel_task(infraction['id'])
- loop = asyncio.get_event_loop()
- self.schedule_task(loop, infraction['id'], infraction)
-
- if expires_at is None:
- await ctx.send(f":ok_hand: Updated infraction: marked as permanent.")
- else:
- human_expiry = (
- datetime
- .fromisoformat(infraction['expires_at'][:-1])
- .strftime('%c')
- )
- await ctx.send(
- ":ok_hand: Updated infraction: set to expire on "
- f"{human_expiry}."
- )
-
- except Exception:
- log.exception("There was an error updating an infraction.")
- await ctx.send(":x: There was an error updating the infraction.")
- return
-
- # Get information about the infraction's user
- user_id = infraction["user"]
- user = ctx.guild.get_member(user_id)
-
- if user:
- member_text = f"{user.mention} (`{user.id}`)"
- thumbnail = user.avatar_url_as(static_format="png")
+ if expires_at is None and reason is None:
+ # Unlike UserInputError, the error handler will show a specified message for BadArgument
+ raise BadArgument("Neither a new expiry nor a new reason was specified.")
+
+ # Retrieve the previous infraction for its information.
+ old_infraction = await self.bot.api_client.get(f'bot/infractions/{infraction_id}')
+
+ request_data = {}
+ confirm_messages = []
+ log_text = ""
+
+ if expires_at == "permanent":
+ request_data['expires_at'] = None
+ confirm_messages.append("marked as permanent")
+ elif expires_at is not None:
+ request_data['expires_at'] = expires_at.isoformat()
+ confirm_messages.append(f"set to expire on {expires_at.strftime(INFRACTION_FORMAT)}")
else:
- member_text = f"`{user_id}`"
- thumbnail = None
+ confirm_messages.append("expiry unchanged")
- # The infraction's actor
- actor_id = infraction["actor"]
- actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`"
+ if reason:
+ request_data['reason'] = reason
+ confirm_messages.append("set a new reason")
+ log_text += f"""
+ Previous reason: {old_infraction['reason']}
+ New reason: {reason}
+ """.rstrip()
+ else:
+ confirm_messages.append("reason unchanged")
- await self.mod_log.send_log_message(
- icon_url=Icons.pencil,
- colour=Colour.blurple(),
- title="Infraction edited",
- thumbnail=thumbnail,
- text=textwrap.dedent(f"""
- Member: {member_text}
- Actor: {actor}
- Edited by: {ctx.message.author}
- Previous expiry: {previous_infraction['expires_at']}
- New expiry: {infraction['expires_at']}
- """)
+ # Update the infraction
+ new_infraction = await self.bot.api_client.patch(
+ f'bot/infractions/{infraction_id}',
+ json=request_data,
)
- @with_role(*MODERATION_ROLES)
- @infraction_edit_group.command(name="reason")
- async def edit_reason(self, ctx: Context, infraction_id: int, *, reason: str) -> None:
- """
- Sets the reason of the given infraction.
- **`infraction_id`:** the id of the infraction
- **`reason`:** The new reason of the infraction
- """
-
- try:
- old_infraction = await self.bot.api_client.get(
- 'bot/infractions/' + str(infraction_id)
- )
+ # Re-schedule infraction if the expiration has been updated
+ if 'expires_at' in request_data:
+ self.cancel_task(new_infraction['id'])
+ loop = asyncio.get_event_loop()
+ self.schedule_task(loop, new_infraction['id'], new_infraction)
- updated_infraction = await self.bot.api_client.patch(
- 'bot/infractions/' + str(infraction_id),
- json={'reason': reason}
- )
- await ctx.send(f":ok_hand: Updated infraction: set reason to \"{reason}\".")
+ log_text += f"""
+ Previous expiry: {old_infraction['expires_at'] or "Permanent"}
+ New expiry: {new_infraction['expires_at'] or "Permanent"}
+ """.rstrip()
- except Exception:
- log.exception("There was an error updating an infraction.")
- await ctx.send(":x: There was an error updating the infraction.")
- return
+ await ctx.send(f":ok_hand: Updated infraction: {' & '.join(confirm_messages)}")
# Get information about the infraction's user
- user_id = updated_infraction['user']
+ user_id = new_infraction['user']
user = ctx.guild.get_member(user_id)
if user:
@@ -1065,7 +814,7 @@ class Moderation(Scheduler, Cog):
thumbnail = None
# The infraction's actor
- actor_id = updated_infraction['actor']
+ actor_id = new_infraction['actor']
actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`"
await self.mod_log.send_log_message(
@@ -1076,9 +825,7 @@ class Moderation(Scheduler, Cog):
text=textwrap.dedent(f"""
Member: {user_text}
Actor: {actor}
- Edited by: {ctx.message.author}
- Previous reason: {old_infraction['reason']}
- New reason: {updated_infraction['reason']}
+ Edited by: {ctx.message.author}{log_text}
""")
)
@@ -1087,11 +834,8 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@infraction_group.group(name="search", invoke_without_command=True)
- async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery):
- """
- Searches for infractions in the database.
- """
-
+ async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None:
+ """Searches for infractions in the database."""
if isinstance(query, User):
await ctx.invoke(self.search_user, query)
@@ -1100,11 +844,8 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@infraction_search_group.command(name="user", aliases=("member", "id"))
- async def search_user(self, ctx: Context, user: Union[User, proxy_user]):
- """
- Search for infractions by member.
- """
-
+ async def search_user(self, ctx: Context, user: Union[User, proxy_user]) -> None:
+ """Search for infractions by member."""
infraction_list = await self.bot.api_client.get(
'bot/infractions',
params={'user__id': str(user.id)}
@@ -1117,11 +858,8 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@infraction_search_group.command(name="reason", aliases=("match", "regex", "re"))
- async def search_reason(self, ctx: Context, reason: str):
- """
- Search for infractions by their reason. Use Re2 for matching.
- """
-
+ async def search_reason(self, ctx: Context, reason: str) -> None:
+ """Search for infractions by their reason. Use Re2 for matching."""
infraction_list = await self.bot.api_client.get(
'bot/infractions', params={'search': reason}
)
@@ -1134,8 +872,8 @@ class Moderation(Scheduler, Cog):
# endregion
# region: Utility functions
- async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list):
-
+ async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list) -> None:
+ """Send a paginated embed of infractions for the specified user."""
if not infractions:
await ctx.send(f":warning: No infractions could be found for that query.")
return
@@ -1158,17 +896,9 @@ class Moderation(Scheduler, Cog):
# region: Utility functions
def schedule_expiration(
- self,
- loop: asyncio.AbstractEventLoop,
- infraction_object: Dict[str, Union[str, int, bool]]
+ self, loop: asyncio.AbstractEventLoop, infraction_object: Dict[str, Union[str, int, bool]]
) -> None:
- """
- Schedules a task to expire a temporary infraction.
-
- :param loop: the asyncio event loop
- :param infraction_object: the infraction object to expire at the end of the task
- """
-
+ """Schedules a task to expire a temporary infraction."""
infraction_id = infraction_object["id"]
if infraction_id in self.scheduled_tasks:
return
@@ -1177,12 +907,8 @@ class Moderation(Scheduler, Cog):
self.scheduled_tasks[infraction_id] = task
- def cancel_expiration(self, infraction_id: str):
- """
- Un-schedules a task set to expire a temporary infraction.
- :param infraction_id: the ID of the infraction in question
- """
-
+ def cancel_expiration(self, infraction_id: str) -> None:
+ """Un-schedules a task set to expire a temporary infraction."""
task = self.scheduled_tasks.get(infraction_id)
if task is None:
log.warning(f"Failed to unschedule {infraction_id}: no task found.")
@@ -1193,13 +919,11 @@ class Moderation(Scheduler, Cog):
async def _scheduled_task(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None:
"""
- A co-routine which marks an infraction as expired after the delay from the time of
- scheduling to the time of expiration. At the time of expiration, the infraction is
- marked as inactive on the website, and the expiration task is cancelled.
+ Marks an infraction expired after the delay from time of scheduling to time of expiration.
- :param infraction_object: the infraction in question
+ At the time of expiration, the infraction is marked as inactive on the website, and the
+ expiration task is cancelled. The user is then notified via DM.
"""
-
infraction_id = infraction_object["id"]
# transform expiration to delay in seconds
@@ -1224,11 +948,9 @@ class Moderation(Scheduler, Cog):
async def _deactivate_infraction(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None:
"""
A co-routine which marks an infraction as inactive on the website.
- This co-routine does not cancel or un-schedule an expiration task.
- :param infraction_object: the infraction in question
+ This co-routine does not cancel or un-schedule an expiration task.
"""
-
guild: Guild = self.bot.get_guild(constants.Guild.id)
user_id = infraction_object["user"]
infraction_type = infraction_object["type"]
@@ -1254,17 +976,18 @@ class Moderation(Scheduler, Cog):
)
def _infraction_to_string(self, infraction_object: Dict[str, Union[str, int, bool]]) -> str:
+ """Convert the infraction object to a string representation."""
actor_id = infraction_object["actor"]
guild: Guild = self.bot.get_guild(constants.Guild.id)
actor = guild.get_member(actor_id)
active = infraction_object["active"]
user_id = infraction_object["user"]
hidden = infraction_object["hidden"]
- created = datetime.fromisoformat(infraction_object["inserted_at"][:-1]).strftime("%Y-%m-%d %H:%M")
+ created = format_infraction(infraction_object["inserted_at"])
if infraction_object["expires_at"] is None:
expires = "*Permanent*"
else:
- expires = datetime.fromisoformat(infraction_object["expires_at"][:-1]).strftime("%Y-%m-%d %H:%M")
+ expires = format_infraction(infraction_object["expires_at"])
lines = textwrap.dedent(f"""
{"**===============**" if active else "==============="}
@@ -1283,23 +1006,19 @@ class Moderation(Scheduler, Cog):
return lines.strip()
async def notify_infraction(
- self,
- user: Union[User, Member],
- infr_type: str,
- expires_at: Union[datetime, str] = 'N/A',
- reason: str = "No reason provided."
+ self,
+ user: Union[User, Member],
+ infr_type: str,
+ expires_at: Union[datetime, str] = 'N/A',
+ reason: str = "No reason provided."
) -> bool:
"""
- Notify a user of their fresh infraction :)
+ Attempt to notify a user, via DM, of their fresh infraction.
- :param user: The user to send the message to.
- :param infr_type: The type of infraction, as a string.
- :param duration: The duration of the infraction.
- :param reason: The reason for the infraction.
+ Returns a boolean indicator of whether the DM was successful.
"""
-
if isinstance(expires_at, datetime):
- expires_at = expires_at.strftime('%c')
+ expires_at = expires_at.strftime(INFRACTION_FORMAT)
embed = Embed(
description=textwrap.dedent(f"""
@@ -1328,14 +1047,10 @@ class Moderation(Scheduler, Cog):
icon_url: str = Icons.user_verified
) -> bool:
"""
- Notify a user that an infraction has been lifted.
+ Attempt to notify a user, via DM, of their expired infraction.
- :param user: The user to send the message to.
- :param title: The title of the embed.
- :param content: The content of the embed.
- :param icon_url: URL for the title icon.
+ Optionally returns a boolean indicator of whether the DM was successful.
"""
-
embed = Embed(
description=content,
colour=Colour(Colours.soft_green)
@@ -1349,10 +1064,8 @@ class Moderation(Scheduler, Cog):
"""
A helper method for sending an embed to a user's DMs.
- :param user: The user to send the embed to.
- :param embed: The embed to send.
+ Returns a boolean indicator of DM success.
"""
-
# sometimes `user` is a `discord.Object`, so let's make it a proper user.
user = await self.bot.fetch_user(user.id)
@@ -1366,7 +1079,8 @@ class Moderation(Scheduler, Cog):
)
return False
- async def log_notify_failure(self, target: str, actor: Member, infraction_type: str):
+ async def log_notify_failure(self, target: str, actor: Member, infraction_type: str) -> None:
+ """Send a mod log entry if an attempt to DM the target user has failed."""
await self.mod_log.send_log_message(
icon_url=Icons.token_removed,
content=actor.mention,
@@ -1380,8 +1094,9 @@ class Moderation(Scheduler, Cog):
# endregion
- @staticmethod
- async def cog_command_error(ctx: Context, error) -> None:
+ # This cannot be static (must have a __func__ attribute).
+ async def cog_command_error(self, ctx: Context, error: Exception) -> None:
+ """Send a notification to the invoking context on a Union failure."""
if isinstance(error, BadUnionArgument):
if User in error.converters:
await ctx.send(str(error.errors[0]))
@@ -1391,15 +1106,11 @@ class Moderation(Scheduler, Cog):
async def respect_role_hierarchy(ctx: Context, target: UserTypes, infr_type: str) -> bool:
"""
Check if the highest role of the invoking member is greater than that of the target member.
+
If this check fails, a warning is sent to the invoking ctx.
Returns True always if target is not a discord.Member instance.
-
- :param ctx: The command context when invoked.
- :param target: The target of the infraction.
- :param infr_type: The type of infraction.
"""
-
if not isinstance(target, Member):
return True
@@ -1419,6 +1130,6 @@ class Moderation(Scheduler, Cog):
def setup(bot: Bot) -> None:
- """Sets up the Moderation cog."""
+ """Moderation cog load."""
bot.add_cog(Moderation(bot))
log.info("Cog loaded: Moderation")
diff --git a/bot/cogs/modlog.py b/bot/cogs/modlog.py
index 978646f46..68424d268 100644
--- a/bot/cogs/modlog.py
+++ b/bot/cogs/modlog.py
@@ -11,7 +11,7 @@ from discord import (
RawMessageUpdateEvent, Role, TextChannel, User, VoiceChannel
)
from discord.abc import GuildChannel
-from discord.ext.commands import Bot, Cog
+from discord.ext.commands import Bot, Cog, Context
from bot.constants import (
Channels, Colours, Emojis, Event, Guild as GuildConstant, Icons, URLs
@@ -29,9 +29,7 @@ ROLE_CHANGES_UNSUPPORTED = ("colour", "permissions")
class ModLog(Cog, name="ModLog"):
- """
- Logging for server events and staff actions
- """
+ """Logging for server events and staff actions."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -40,16 +38,14 @@ class ModLog(Cog, name="ModLog"):
self._cached_deletes = []
self._cached_edits = []
- async def upload_log(self, messages: List[Message], actor_id: int) -> Optional[str]:
+ async def upload_log(self, messages: List[Message], actor_id: int) -> str:
"""
- Uploads the log data to the database via
- an API endpoint for uploading logs.
+ Uploads the log data to the database via an API endpoint for uploading logs.
Used in several mod log embeds.
Returns a URL that can be used to view the log.
"""
-
response = await self.bot.api_client.post(
'bot/deleted-messages',
json={
@@ -70,7 +66,8 @@ class ModLog(Cog, name="ModLog"):
return f"{URLs.site_logs_view}/{response['id']}"
- def ignore(self, event: Event, *items: int):
+ def ignore(self, event: Event, *items: int) -> None:
+ """Add event to ignored events to suppress log emission."""
for item in items:
if item not in self._ignored[event]:
self._ignored[event].append(item)
@@ -90,7 +87,8 @@ class ModLog(Cog, name="ModLog"):
additional_embeds_msg: Optional[str] = None,
timestamp_override: Optional[datetime] = None,
footer: Optional[str] = None,
- ):
+ ) -> Context:
+ """Generate log embed and send to logging channel."""
embed = Embed(description=text)
if title and icon_url:
@@ -123,7 +121,8 @@ class ModLog(Cog, name="ModLog"):
return await self.bot.get_context(log_message) # Optionally return for use with antispam
@Cog.listener()
- async def on_guild_channel_create(self, channel: GUILD_CHANNEL):
+ async def on_guild_channel_create(self, channel: GUILD_CHANNEL) -> None:
+ """Log channel create event to mod log."""
if channel.guild.id != GuildConstant.id:
return
@@ -148,7 +147,8 @@ class ModLog(Cog, name="ModLog"):
await self.send_log_message(Icons.hash_green, Colour(Colours.soft_green), title, message)
@Cog.listener()
- async def on_guild_channel_delete(self, channel: GUILD_CHANNEL):
+ async def on_guild_channel_delete(self, channel: GUILD_CHANNEL) -> None:
+ """Log channel delete event to mod log."""
if channel.guild.id != GuildConstant.id:
return
@@ -170,7 +170,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_guild_channel_update(self, before: GUILD_CHANNEL, after: GuildChannel):
+ async def on_guild_channel_update(self, before: GUILD_CHANNEL, after: GuildChannel) -> None:
+ """Log channel update event to mod log."""
if before.guild.id != GuildConstant.id:
return
@@ -229,7 +230,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_guild_role_create(self, role: Role):
+ async def on_guild_role_create(self, role: Role) -> None:
+ """Log role create event to mod log."""
if role.guild.id != GuildConstant.id:
return
@@ -239,7 +241,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_guild_role_delete(self, role: Role):
+ async def on_guild_role_delete(self, role: Role) -> None:
+ """Log role delete event to mod log."""
if role.guild.id != GuildConstant.id:
return
@@ -249,7 +252,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_guild_role_update(self, before: Role, after: Role):
+ async def on_guild_role_update(self, before: Role, after: Role) -> None:
+ """Log role update event to mod log."""
if before.guild.id != GuildConstant.id:
return
@@ -301,7 +305,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_guild_update(self, before: Guild, after: Guild):
+ async def on_guild_update(self, before: Guild, after: Guild) -> None:
+ """Log guild update event to mod log."""
if before.id != GuildConstant.id:
return
@@ -351,7 +356,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_member_ban(self, guild: Guild, member: Union[Member, User]):
+ async def on_member_ban(self, guild: Guild, member: Union[Member, User]) -> None:
+ """Log ban event to mod log."""
if guild.id != GuildConstant.id:
return
@@ -367,7 +373,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_member_join(self, member: Member):
+ async def on_member_join(self, member: Member) -> None:
+ """Log member join event to user log."""
if member.guild.id != GuildConstant.id:
return
@@ -388,7 +395,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_member_remove(self, member: Member):
+ async def on_member_remove(self, member: Member) -> None:
+ """Log member leave event to user log."""
if member.guild.id != GuildConstant.id:
return
@@ -404,7 +412,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_member_unban(self, guild: Guild, member: User):
+ async def on_member_unban(self, guild: Guild, member: User) -> None:
+ """Log member unban event to mod log."""
if guild.id != GuildConstant.id:
return
@@ -420,7 +429,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_member_update(self, before: Member, after: Member):
+ async def on_member_update(self, before: Member, after: Member) -> None:
+ """Log member update event to user log."""
if before.guild.id != GuildConstant.id:
return
@@ -510,7 +520,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_message_delete(self, message: Message):
+ async def on_message_delete(self, message: Message) -> None:
+ """Log message delete event to message change log."""
channel = message.channel
author = message.author
@@ -565,7 +576,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_raw_message_delete(self, event: RawMessageDeleteEvent):
+ async def on_raw_message_delete(self, event: RawMessageDeleteEvent) -> None:
+ """Log raw message delete event to message change log."""
if event.guild_id != GuildConstant.id or event.channel_id in GuildConstant.ignored:
return
@@ -605,7 +617,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_message_edit(self, before: Message, after: Message):
+ async def on_message_edit(self, before: Message, after: Message) -> None:
+ """Log message edit event to message change log."""
if (
not before.guild
or before.guild.id != GuildConstant.id
@@ -679,7 +692,8 @@ class ModLog(Cog, name="ModLog"):
)
@Cog.listener()
- async def on_raw_message_edit(self, event: RawMessageUpdateEvent):
+ async def on_raw_message_edit(self, event: RawMessageUpdateEvent) -> None:
+ """Log raw message edit event to message change log."""
try:
channel = self.bot.get_channel(int(event.data["channel_id"]))
message = await channel.fetch_message(event.message_id)
@@ -748,6 +762,7 @@ class ModLog(Cog, name="ModLog"):
)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Mod log cog load."""
bot.add_cog(ModLog(bot))
log.info("Cog loaded: ModLog")
diff --git a/bot/cogs/off_topic_names.py b/bot/cogs/off_topic_names.py
index 1f6ed80b5..16717d523 100644
--- a/bot/cogs/off_topic_names.py
+++ b/bot/cogs/off_topic_names.py
@@ -6,6 +6,7 @@ from datetime import datetime, timedelta
from discord import Colour, Embed
from discord.ext.commands import BadArgument, Bot, Cog, Context, Converter, group
+from bot.api import ResponseCodeError
from bot.constants import Channels, MODERATION_ROLES
from bot.decorators import with_role
from bot.pagination import LinePaginator
@@ -19,7 +20,8 @@ class OffTopicName(Converter):
"""A converter that ensures an added off-topic name is valid."""
@staticmethod
- async def convert(ctx: Context, argument: str):
+ async def convert(ctx: Context, argument: str) -> str:
+ """Attempt to replace any invalid characters with their approximate Unicode equivalent."""
allowed_characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ!?'`-"
if not (2 <= len(argument) <= 96):
@@ -38,16 +40,8 @@ class OffTopicName(Converter):
return argument.translate(table)
-async def update_names(bot: Bot):
- """
- The background updater task that performs a channel name update daily.
-
- Args:
- bot (Bot):
- The running bot instance, used for fetching data from the
- website via the bot's `api_client`.
- """
-
+async def update_names(bot: Bot) -> None:
+ """Background updater task that performs the daily channel name update."""
while True:
# Since we truncate the compute timedelta to seconds, we add one second to ensure
# we go past midnight in the `seconds_to_sleep` set below.
@@ -56,9 +50,13 @@ async def update_names(bot: Bot):
seconds_to_sleep = (next_midnight - datetime.utcnow()).seconds + 1
await asyncio.sleep(seconds_to_sleep)
- channel_0_name, channel_1_name, channel_2_name = await bot.api_client.get(
- 'bot/off-topic-channel-names', params={'random_items': 3}
- )
+ try:
+ channel_0_name, channel_1_name, channel_2_name = await bot.api_client.get(
+ 'bot/off-topic-channel-names', params={'random_items': 3}
+ )
+ except ResponseCodeError as e:
+ log.error(f"Failed to get new off topic channel names: code {e.response.status}")
+ continue
channel_0, channel_1, channel_2 = (bot.get_channel(channel_id) for channel_id in CHANNELS)
await channel_0.edit(name=f'ot0-{channel_0_name}')
@@ -77,26 +75,27 @@ class OffTopicNames(Cog):
self.bot = bot
self.updater_task = None
- def cog_unload(self):
+ def cog_unload(self) -> None:
+ """Cancel any running updater tasks on cog unload."""
if self.updater_task is not None:
self.updater_task.cancel()
@Cog.listener()
- async def on_ready(self):
+ async def on_ready(self) -> None:
+ """Start off-topic channel updating event loop if it hasn't already started."""
if self.updater_task is None:
coro = update_names(self.bot)
self.updater_task = self.bot.loop.create_task(coro)
@group(name='otname', aliases=('otnames', 'otn'), invoke_without_command=True)
@with_role(*MODERATION_ROLES)
- async def otname_group(self, ctx):
+ async def otname_group(self, ctx: Context) -> None:
"""Add or list items from the off-topic channel name rotation."""
-
await ctx.invoke(self.bot.get_command("help"), "otname")
@otname_group.command(name='add', aliases=('a',))
@with_role(*MODERATION_ROLES)
- async def add_command(self, ctx, *names: OffTopicName):
+ async def add_command(self, ctx: Context, *names: OffTopicName) -> None:
"""Adds a new off-topic name to the rotation."""
# Chain multiple words to a single one
name = "-".join(names)
@@ -110,7 +109,7 @@ class OffTopicNames(Cog):
@otname_group.command(name='delete', aliases=('remove', 'rm', 'del', 'd'))
@with_role(*MODERATION_ROLES)
- async def delete_command(self, ctx, *names: OffTopicName):
+ async def delete_command(self, ctx: Context, *names: OffTopicName) -> None:
"""Removes a off-topic name from the rotation."""
# Chain multiple words to a single one
name = "-".join(names)
@@ -124,12 +123,12 @@ class OffTopicNames(Cog):
@otname_group.command(name='list', aliases=('l',))
@with_role(*MODERATION_ROLES)
- async def list_command(self, ctx):
+ async def list_command(self, ctx: Context) -> None:
"""
Lists all currently known off-topic channel names in a paginator.
+
Restricted to Moderator and above to not spoil the surprise.
"""
-
result = await self.bot.api_client.get('bot/off-topic-channel-names')
lines = sorted(f"• {name}" for name in result)
embed = Embed(
@@ -144,11 +143,8 @@ class OffTopicNames(Cog):
@otname_group.command(name='search', aliases=('s',))
@with_role(*MODERATION_ROLES)
- async def search_command(self, ctx, *, query: OffTopicName):
- """
- Search for an off-topic name.
- """
-
+ async def search_command(self, ctx: Context, *, query: OffTopicName) -> None:
+ """Search for an off-topic name."""
result = await self.bot.api_client.get('bot/off-topic-channel-names')
in_matches = {name for name in result if query in name}
close_matches = difflib.get_close_matches(query, result, n=10, cutoff=0.70)
@@ -165,6 +161,7 @@ class OffTopicNames(Cog):
await ctx.send(embed=embed)
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """Off topic names cog load."""
bot.add_cog(OffTopicNames(bot))
log.info("Cog loaded: OffTopicNames")
diff --git a/bot/cogs/reddit.py b/bot/cogs/reddit.py
index 4c561b7e8..63a57c5c6 100644
--- a/bot/cogs/reddit.py
+++ b/bot/cogs/reddit.py
@@ -3,8 +3,9 @@ import logging
import random
import textwrap
from datetime import datetime, timedelta
+from typing import List
-from discord import Colour, Embed, TextChannel
+from discord import Colour, Embed, Message, TextChannel
from discord.ext.commands import Bot, Cog, Context, group
from bot.constants import Channels, ERROR_REPLIES, Reddit as RedditConfig, STAFF_ROLES
@@ -16,9 +17,7 @@ log = logging.getLogger(__name__)
class Reddit(Cog):
- """
- Track subreddit posts and show detailed statistics about them.
- """
+ """Track subreddit posts and show detailed statistics about them."""
HEADERS = {"User-Agent": "Discord Bot: PythonDiscord (https://pythondiscord.com/)"}
URL = "https://www.reddit.com"
@@ -34,11 +33,8 @@ class Reddit(Cog):
self.new_posts_task = None
self.top_weekly_posts_task = None
- async def fetch_posts(self, route: str, *, amount: int = 25, params=None):
- """
- A helper method to fetch a certain amount of Reddit posts at a given route.
- """
-
+ async def fetch_posts(self, route: str, *, amount: int = 25, params: dict = None) -> List[dict]:
+ """A helper method to fetch a certain amount of Reddit posts at a given route."""
# Reddit's JSON responses only provide 25 posts at most.
if not 25 >= amount > 0:
raise ValueError("Invalid amount of subreddit posts requested.")
@@ -57,11 +53,10 @@ class Reddit(Cog):
return posts[:amount]
- async def send_top_posts(self, channel: TextChannel, subreddit: Subreddit, content=None, time="all"):
- """
- Create an embed for the top posts, then send it in a given TextChannel.
- """
-
+ async def send_top_posts(
+ self, channel: TextChannel, subreddit: Subreddit, content: str = None, time: str = "all"
+ ) -> Message:
+ """Create an embed for the top posts, then send it in a given TextChannel."""
# Create the new spicy embed.
embed = Embed()
embed.description = ""
@@ -115,11 +110,8 @@ class Reddit(Cog):
embed=embed
)
- async def poll_new_posts(self):
- """
- Periodically search for new subreddit posts.
- """
-
+ async def poll_new_posts(self) -> None:
+ """Periodically search for new subreddit posts."""
while True:
await asyncio.sleep(RedditConfig.request_delay)
@@ -179,11 +171,8 @@ class Reddit(Cog):
log.trace(f"Sent {len(new_posts)} new {subreddit} posts to channel {self.reddit_channel.id}.")
- async def poll_top_weekly_posts(self):
- """
- Post a summary of the top posts every week.
- """
-
+ async def poll_top_weekly_posts(self) -> None:
+ """Post a summary of the top posts every week."""
while True:
now = datetime.utcnow()
@@ -214,19 +203,13 @@ class Reddit(Cog):
await message.pin()
@group(name="reddit", invoke_without_command=True)
- async def reddit_group(self, ctx: Context):
- """
- View the top posts from various subreddits.
- """
-
+ async def reddit_group(self, ctx: Context) -> None:
+ """View the top posts from various subreddits."""
await ctx.invoke(self.bot.get_command("help"), "reddit")
@reddit_group.command(name="top")
- async def top_command(self, ctx: Context, subreddit: Subreddit = "r/Python"):
- """
- Send the top posts of all time from a given subreddit.
- """
-
+ async def top_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None:
+ """Send the top posts of all time from a given subreddit."""
await self.send_top_posts(
channel=ctx.channel,
subreddit=subreddit,
@@ -235,11 +218,8 @@ class Reddit(Cog):
)
@reddit_group.command(name="daily")
- async def daily_command(self, ctx: Context, subreddit: Subreddit = "r/Python"):
- """
- Send the top posts of today from a given subreddit.
- """
-
+ async def daily_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None:
+ """Send the top posts of today from a given subreddit."""
await self.send_top_posts(
channel=ctx.channel,
subreddit=subreddit,
@@ -248,11 +228,8 @@ class Reddit(Cog):
)
@reddit_group.command(name="weekly")
- async def weekly_command(self, ctx: Context, subreddit: Subreddit = "r/Python"):
- """
- Send the top posts of this week from a given subreddit.
- """
-
+ async def weekly_command(self, ctx: Context, subreddit: Subreddit = "r/Python") -> None:
+ """Send the top posts of this week from a given subreddit."""
await self.send_top_posts(
channel=ctx.channel,
subreddit=subreddit,
@@ -262,11 +239,8 @@ class Reddit(Cog):
@with_role(*STAFF_ROLES)
@reddit_group.command(name="subreddits", aliases=("subs",))
- async def subreddits_command(self, ctx: Context):
- """
- Send a paginated embed of all the subreddits we're relaying.
- """
-
+ async def subreddits_command(self, ctx: Context) -> None:
+ """Send a paginated embed of all the subreddits we're relaying."""
embed = Embed()
embed.title = "Relayed subreddits."
embed.colour = Colour.blurple()
@@ -280,7 +254,8 @@ class Reddit(Cog):
)
@Cog.listener()
- async def on_ready(self):
+ async def on_ready(self) -> None:
+ """Initiate reddit post event loop."""
self.reddit_channel = await self.bot.fetch_channel(Channels.reddit)
if self.reddit_channel is not None:
@@ -292,6 +267,7 @@ class Reddit(Cog):
log.warning("Couldn't locate a channel for subreddit relaying.")
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Reddit cog load."""
bot.add_cog(Reddit(bot))
log.info("Cog loaded: Reddit")
diff --git a/bot/cogs/reminders.py b/bot/cogs/reminders.py
index c6ae984ea..6e91d2c06 100644
--- a/bot/cogs/reminders.py
+++ b/bot/cogs/reminders.py
@@ -4,13 +4,14 @@ import random
import textwrap
from datetime import datetime
from operator import itemgetter
+from typing import Optional
from dateutil.relativedelta import relativedelta
-from discord import Colour, Embed
+from discord import Colour, Embed, Message
from discord.ext.commands import Bot, Cog, Context, group
from bot.constants import Channels, Icons, NEGATIVE_REPLIES, POSITIVE_REPLIES, STAFF_ROLES
-from bot.converters import ExpirationDate
+from bot.converters import Duration
from bot.pagination import LinePaginator
from bot.utils.checks import without_role_check
from bot.utils.scheduling import Scheduler
@@ -23,14 +24,15 @@ MAXIMUM_REMINDERS = 5
class Reminders(Scheduler, Cog):
+ """Provide in-channel reminder functionality."""
def __init__(self, bot: Bot):
self.bot = bot
super().__init__()
@Cog.listener()
- async def on_ready(self):
- # Get all the current reminders for re-scheduling
+ async def on_ready(self) -> None:
+ """Get all current reminders from the API and reschedule them."""
response = await self.bot.api_client.get(
'bot/reminders',
params={'active': 'true'}
@@ -51,25 +53,16 @@ class Reminders(Scheduler, Cog):
self.schedule_task(loop, reminder["id"], reminder)
@staticmethod
- async def _send_confirmation(ctx: Context, on_success: str):
- """
- Send an embed confirming the change was made successfully.
- """
-
+ async def _send_confirmation(ctx: Context, on_success: str) -> None:
+ """Send an embed confirming the reminder change was made successfully."""
embed = Embed()
embed.colour = Colour.green()
embed.title = random.choice(POSITIVE_REPLIES)
embed.description = on_success
await ctx.send(embed=embed)
- async def _scheduled_task(self, reminder: dict):
- """
- A coroutine which sends the reminder once the time is reached.
-
- :param reminder: the data of the reminder.
- :return:
- """
-
+ async def _scheduled_task(self, reminder: dict) -> None:
+ """A coroutine which sends the reminder once the time is reached, and cancels the running task."""
reminder_id = reminder["id"]
reminder_datetime = datetime.fromisoformat(reminder['expiration'][:-1])
@@ -83,38 +76,22 @@ class Reminders(Scheduler, Cog):
# Now we can begone with it from our schedule list.
self.cancel_task(reminder_id)
- async def _delete_reminder(self, reminder_id: str):
- """
- Delete a reminder from the database, given its ID.
-
- :param reminder_id: The ID of the reminder.
- """
-
+ async def _delete_reminder(self, reminder_id: str) -> None:
+ """Delete a reminder from the database, given its ID, and cancel the running task."""
await self.bot.api_client.delete('bot/reminders/' + str(reminder_id))
# Now we can remove it from the schedule list
self.cancel_task(reminder_id)
- async def _reschedule_reminder(self, reminder):
- """
- Reschedule a reminder object.
-
- :param reminder: The reminder to be rescheduled.
- """
-
+ async def _reschedule_reminder(self, reminder: dict) -> None:
+ """Reschedule a reminder object."""
loop = asyncio.get_event_loop()
self.cancel_task(reminder["id"])
self.schedule_task(loop, reminder["id"], reminder)
- async def send_reminder(self, reminder, late: relativedelta = None):
- """
- Send the reminder.
-
- :param reminder: The data about the reminder.
- :param late: How late the reminder is (if at all)
- """
-
+ async def send_reminder(self, reminder: dict, late: relativedelta = None) -> None:
+ """Send the reminder."""
channel = self.bot.get_channel(reminder["channel_id"])
user = self.bot.get_user(reminder["author"])
@@ -141,19 +118,17 @@ class Reminders(Scheduler, Cog):
await self._delete_reminder(reminder["id"])
@group(name="remind", aliases=("reminder", "reminders"), invoke_without_command=True)
- async def remind_group(self, ctx: Context, expiration: ExpirationDate, *, content: str):
- """
- Commands for managing your reminders.
- """
-
+ async def remind_group(self, ctx: Context, expiration: Duration, *, content: str) -> None:
+ """Commands for managing your reminders."""
await ctx.invoke(self.new_reminder, expiration=expiration, content=content)
@remind_group.command(name="new", aliases=("add", "create"))
- async def new_reminder(self, ctx: Context, expiration: ExpirationDate, *, content: str):
+ async def new_reminder(self, ctx: Context, expiration: Duration, *, content: str) -> Optional[Message]:
"""
Set yourself a simple reminder.
- """
+ Expiration is parsed per: http://strftime.org/
+ """
embed = Embed()
# If the user is not staff, we need to verify whether or not to make a reminder at all.
@@ -171,7 +146,7 @@ class Reminders(Scheduler, Cog):
active_reminders = await self.bot.api_client.get(
'bot/reminders',
params={
- 'user__id': str(ctx.author.id)
+ 'author__id': str(ctx.author.id)
}
)
@@ -204,15 +179,12 @@ class Reminders(Scheduler, Cog):
self.schedule_task(loop, reminder["id"], reminder)
@remind_group.command(name="list")
- async def list_reminders(self, ctx: Context):
- """
- View a paginated embed of all reminders for your user.
- """
-
+ async def list_reminders(self, ctx: Context) -> Optional[Message]:
+ """View a paginated embed of all reminders for your user."""
# Get all the user's reminders from the database.
data = await self.bot.api_client.get(
'bot/reminders',
- params={'user__id': str(ctx.author.id)}
+ params={'author__id': str(ctx.author.id)}
)
now = datetime.utcnow()
@@ -260,19 +232,17 @@ class Reminders(Scheduler, Cog):
)
@remind_group.group(name="edit", aliases=("change", "modify"), invoke_without_command=True)
- async def edit_reminder_group(self, ctx: Context):
- """
- Commands for modifying your current reminders.
- """
-
+ async def edit_reminder_group(self, ctx: Context) -> None:
+ """Commands for modifying your current reminders."""
await ctx.invoke(self.bot.get_command("help"), "reminders", "edit")
@edit_reminder_group.command(name="duration", aliases=("time",))
- async def edit_reminder_duration(self, ctx: Context, id_: int, expiration: ExpirationDate):
- """
- Edit one of your reminders' expiration.
+ async def edit_reminder_duration(self, ctx: Context, id_: int, expiration: Duration) -> None:
"""
+ Edit one of your reminder's expiration.
+ Expiration is parsed per: http://strftime.org/
+ """
# Send the request to update the reminder in the database
reminder = await self.bot.api_client.patch(
'bot/reminders/' + str(id_),
@@ -287,11 +257,8 @@ class Reminders(Scheduler, Cog):
await self._reschedule_reminder(reminder)
@edit_reminder_group.command(name="content", aliases=("reason",))
- async def edit_reminder_content(self, ctx: Context, id_: int, *, content: str):
- """
- Edit one of your reminders' content.
- """
-
+ async def edit_reminder_content(self, ctx: Context, id_: int, *, content: str) -> None:
+ """Edit one of your reminder's content."""
# Send the request to update the reminder in the database
reminder = await self.bot.api_client.patch(
'bot/reminders/' + str(id_),
@@ -305,17 +272,15 @@ class Reminders(Scheduler, Cog):
await self._reschedule_reminder(reminder)
@remind_group.command("delete", aliases=("remove",))
- async def delete_reminder(self, ctx: Context, id_: int):
- """
- Delete one of your active reminders.
- """
-
+ async def delete_reminder(self, ctx: Context, id_: int) -> None:
+ """Delete one of your active reminders."""
await self._delete_reminder(id_)
await self._send_confirmation(
ctx, on_success="That reminder has been deleted successfully!"
)
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """Reminders cog load."""
bot.add_cog(Reminders(bot))
log.info("Cog loaded: Reminders")
diff --git a/bot/cogs/security.py b/bot/cogs/security.py
index e02e91530..316b33d6b 100644
--- a/bot/cogs/security.py
+++ b/bot/cogs/security.py
@@ -6,24 +6,25 @@ log = logging.getLogger(__name__)
class Security(Cog):
- """
- Security-related helpers
- """
+ """Security-related helpers."""
def __init__(self, bot: Bot):
self.bot = bot
self.bot.check(self.check_not_bot) # Global commands check - no bots can run any commands at all
self.bot.check(self.check_on_guild) # Global commands check - commands can't be run in a DM
- def check_not_bot(self, ctx: Context):
+ def check_not_bot(self, ctx: Context) -> bool:
+ """Check if the context is a bot user."""
return not ctx.author.bot
- def check_on_guild(self, ctx: Context):
+ def check_on_guild(self, ctx: Context) -> bool:
+ """Check if the context is in a guild."""
if ctx.guild is None:
raise NoPrivateMessage("This command cannot be used in private messages.")
return True
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Security cog load."""
bot.add_cog(Security(bot))
log.info("Cog loaded: Security")
diff --git a/bot/cogs/site.py b/bot/cogs/site.py
index 4d5b2e811..4a423faa9 100644
--- a/bot/cogs/site.py
+++ b/bot/cogs/site.py
@@ -19,15 +19,13 @@ class Site(Cog):
self.bot = bot
@group(name="site", aliases=("s",), invoke_without_command=True)
- async def site_group(self, ctx):
+ async def site_group(self, ctx: Context) -> None:
"""Commands for getting info about our website."""
-
await ctx.invoke(self.bot.get_command("help"), "site")
@site_group.command(name="home", aliases=("about",))
- async def site_main(self, ctx: Context):
+ async def site_main(self, ctx: Context) -> None:
"""Info about the website itself."""
-
url = f"{URLs.site_schema}{URLs.site}/"
embed = Embed(title="Python Discord website")
@@ -43,9 +41,8 @@ class Site(Cog):
await ctx.send(embed=embed)
@site_group.command(name="resources")
- async def site_resources(self, ctx: Context):
+ async def site_resources(self, ctx: Context) -> None:
"""Info about the site's Resources page."""
-
learning_url = f"{PAGES_URL}/resources"
tools_url = f"{PAGES_URL}/tools"
@@ -63,9 +60,8 @@ class Site(Cog):
await ctx.send(embed=embed)
@site_group.command(name="help")
- async def site_help(self, ctx: Context):
+ async def site_help(self, ctx: Context) -> None:
"""Info about the site's Getting Help page."""
-
url = f"{PAGES_URL}/asking-good-questions"
embed = Embed(title="Asking Good Questions")
@@ -80,9 +76,8 @@ class Site(Cog):
await ctx.send(embed=embed)
@site_group.command(name="faq")
- async def site_faq(self, ctx: Context):
+ async def site_faq(self, ctx: Context) -> None:
"""Info about the site's FAQ page."""
-
url = f"{PAGES_URL}/frequently-asked-questions"
embed = Embed(title="FAQ")
@@ -99,14 +94,8 @@ class Site(Cog):
@site_group.command(aliases=['r', 'rule'], name='rules')
@redirect_output(destination_channel=Channels.bot, bypass_roles=STAFF_ROLES)
- async def site_rules(self, ctx: Context, *rules: int):
- """
- Provides a link to the `rules` endpoint of the website, or displays
- specific rules, if they are requested.
-
- **`ctx`:** The Discord message context
- **`rules`:** The rules a user wants to get.
- """
+ async def site_rules(self, ctx: Context, *rules: int) -> None:
+ """Provides a link to all rules or, if specified, displays specific rule(s)."""
rules_embed = Embed(title='Rules', color=Colour.blurple())
rules_embed.url = f"{PAGES_URL}/rules"
@@ -138,6 +127,7 @@ class Site(Cog):
await LinePaginator.paginate(final_rules, ctx, rules_embed, max_lines=3)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Site cog load."""
bot.add_cog(Site(bot))
log.info("Cog loaded: Site")
diff --git a/bot/cogs/snekbox.py b/bot/cogs/snekbox.py
index d36c0795d..81185cf3e 100644
--- a/bot/cogs/snekbox.py
+++ b/bot/cogs/snekbox.py
@@ -7,11 +7,10 @@ from typing import Optional, Tuple
from discord.ext.commands import Bot, Cog, Context, command, guild_only
-from bot.constants import Channels, STAFF_ROLES, URLs
+from bot.constants import Channels, Roles, URLs
from bot.decorators import in_channel
from bot.utils.messages import wait_for_deletion
-
log = logging.getLogger(__name__)
ESCAPE_REGEX = re.compile("[`\u202E\u200B]{3,}")
@@ -34,12 +33,11 @@ RAW_CODE_REGEX = re.compile(
)
MAX_PASTE_LEN = 1000
+EVAL_ROLES = (Roles.helpers, Roles.moderator, Roles.admin, Roles.owner, Roles.rockstars, Roles.partners)
class Snekbox(Cog):
- """
- Safe evaluation of Python code using Snekbox
- """
+ """Safe evaluation of Python code using Snekbox."""
def __init__(self, bot: Bot):
self.bot = bot
@@ -168,8 +166,8 @@ class Snekbox(Cog):
@command(name="eval", aliases=("e",))
@guild_only()
- @in_channel(Channels.bot, bypass_roles=STAFF_ROLES)
- async def eval_command(self, ctx: Context, *, code: str = None):
+ @in_channel(Channels.bot, bypass_roles=EVAL_ROLES)
+ async def eval_command(self, ctx: Context, *, code: str = None) -> None:
"""
Run Python code and get the results.
@@ -178,13 +176,15 @@ class Snekbox(Cog):
issue with it!
"""
if ctx.author.id in self.jobs:
- return await ctx.send(
+ await ctx.send(
f"{ctx.author.mention} You've already got a job running - "
f"please wait for it to finish!"
)
+ return
if not code: # None or empty string
- return await ctx.invoke(self.bot.get_command("help"), "eval")
+ await ctx.invoke(self.bot.get_command("help"), "eval")
+ return
log.info(
f"Received code from {ctx.author.name}#{ctx.author.discriminator} "
@@ -221,6 +221,7 @@ class Snekbox(Cog):
del self.jobs[ctx.author.id]
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Snekbox cog load."""
bot.add_cog(Snekbox(bot))
log.info("Cog loaded: Snekbox")
diff --git a/bot/cogs/superstarify/__init__.py b/bot/cogs/superstarify/__init__.py
index e9743a2f5..87021eded 100644
--- a/bot/cogs/superstarify/__init__.py
+++ b/bot/cogs/superstarify/__init__.py
@@ -1,6 +1,5 @@
import logging
import random
-from datetime import datetime
from discord import Colour, Embed, Member
from discord.errors import Forbidden
@@ -10,39 +9,40 @@ from bot.cogs.moderation import Moderation
from bot.cogs.modlog import ModLog
from bot.cogs.superstarify.stars import get_nick
from bot.constants import Icons, MODERATION_ROLES, POSITIVE_REPLIES
-from bot.converters import ExpirationDate
+from bot.converters import Duration
from bot.decorators import with_role
from bot.utils.moderation import post_infraction
+from bot.utils.time import format_infraction
log = logging.getLogger(__name__)
NICKNAME_POLICY_URL = "https://pythondiscord.com/pages/rules/#wiki-toc-nickname-policy"
class Superstarify(Cog):
- """
- A set of commands to moderate terrible nicknames.
- """
+ """A set of commands to moderate terrible nicknames."""
def __init__(self, bot: Bot):
self.bot = bot
@property
def moderation(self) -> Moderation:
+ """Get currently loaded Moderation cog instance."""
return self.bot.get_cog("Moderation")
@property
def modlog(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
@Cog.listener()
- async def on_member_update(self, before: Member, after: Member):
+ async def on_member_update(self, before: Member, after: Member) -> None:
"""
This event will trigger when someone changes their name.
- At this point we will look up the user in our database and check
- whether they are allowed to change their names, or if they are in
- superstar-prison. If they are not allowed, we will change it back.
- """
+ At this point we will look up the user in our database and check whether they are allowed to
+ change their names, or if they are in superstar-prison. If they are not allowed, we will
+ change it back.
+ """
if before.display_name == after.display_name:
return # User didn't change their nickname. Abort!
@@ -71,10 +71,7 @@ class Superstarify(Cog):
f"Changing the nick back to {before.display_name}."
)
await after.edit(nick=forced_nick)
- end_timestamp_human = (
- datetime.fromisoformat(infraction['expires_at'][:-1])
- .strftime('%c')
- )
+ end_timestamp_human = format_infraction(infraction['expires_at'])
try:
await after.send(
@@ -93,14 +90,13 @@ class Superstarify(Cog):
)
@Cog.listener()
- async def on_member_join(self, member: Member):
+ async def on_member_join(self, member: Member) -> None:
"""
This event will trigger when someone (re)joins the server.
- At this point we will look up the user in our database and check
- whether they are in superstar-prison. If so, we will change their name
- back to the forced nickname.
- """
+ At this point we will look up the user in our database and check whether they are in
+ superstar-prison. If so, we will change their name back to the forced nickname.
+ """
active_superstarifies = await self.bot.api_client.get(
'bot/infractions',
params={
@@ -114,9 +110,7 @@ class Superstarify(Cog):
[infraction] = active_superstarifies
forced_nick = get_nick(infraction['id'], member.id)
await member.edit(nick=forced_nick)
- end_timestamp_human = (
- datetime.fromisoformat(infraction['expires_at'][:-1]).strftime('%c')
- )
+ end_timestamp_human = format_infraction(infraction['expires_at'])
try:
await member.send(
@@ -154,14 +148,15 @@ class Superstarify(Cog):
@command(name='superstarify', aliases=('force_nick', 'star'))
@with_role(*MODERATION_ROLES)
async def superstarify(
- self, ctx: Context, member: Member, expiration: ExpirationDate, reason: str = None
- ):
+ self, ctx: Context, member: Member, expiration: Duration, reason: str = None
+ ) -> None:
"""
- This command will force a random superstar name (like Taylor Swift) to be the user's
- nickname for a specified duration. An optional reason can be provided.
+ Force a random superstar name (like Taylor Swift) to be the user's nickname for a specified duration.
+
+ An optional reason can be provided.
+
If no reason is given, the original name will be shown in a generated reason.
"""
-
active_superstarifies = await self.bot.api_client.get(
'bot/infractions',
params={
@@ -171,10 +166,11 @@ class Superstarify(Cog):
}
)
if active_superstarifies:
- return await ctx.send(
+ await ctx.send(
":x: According to my records, this user is already superstarified. "
f"See infraction **#{active_superstarifies[0]['id']}**."
)
+ return
infraction = await post_infraction(
ctx, member,
@@ -224,15 +220,8 @@ class Superstarify(Cog):
@command(name='unsuperstarify', aliases=('release_nick', 'unstar'))
@with_role(*MODERATION_ROLES)
- async def unsuperstarify(self, ctx: Context, member: Member):
- """
- This command will remove the entry from our database, allowing the user
- to once again change their nickname.
-
- :param ctx: Discord message context
- :param member: The member to unsuperstarify
- """
-
+ async def unsuperstarify(self, ctx: Context, member: Member) -> None:
+ """Remove the superstarify entry from our database, allowing the user to change their nickname."""
log.debug(f"Attempting to unsuperstarify the following user: {member.display_name}")
embed = Embed()
@@ -247,9 +236,8 @@ class Superstarify(Cog):
}
)
if not active_superstarifies:
- return await ctx.send(
- ":x: There is no active superstarify infraction for this user."
- )
+ await ctx.send(":x: There is no active superstarify infraction for this user.")
+ return
[infraction] = active_superstarifies
await self.bot.api_client.patch(
@@ -270,6 +258,7 @@ class Superstarify(Cog):
await ctx.send(embed=embed)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Superstarify cog load."""
bot.add_cog(Superstarify(bot))
log.info("Cog loaded: Superstarify")
diff --git a/bot/cogs/superstarify/stars.py b/bot/cogs/superstarify/stars.py
index 9b49d7175..dbac86770 100644
--- a/bot/cogs/superstarify/stars.py
+++ b/bot/cogs/superstarify/stars.py
@@ -81,6 +81,7 @@ STAR_NAMES = (
)
-def get_nick(infraction_id, member_id):
+def get_nick(infraction_id: int, member_id: int) -> str:
+ """Randomly select a nickname from the Superstarify nickname list."""
rng = random.Random(str(infraction_id) + str(member_id))
return rng.choice(STAR_NAMES)
diff --git a/bot/cogs/sync/__init__.py b/bot/cogs/sync/__init__.py
index e4f960620..d4565f848 100644
--- a/bot/cogs/sync/__init__.py
+++ b/bot/cogs/sync/__init__.py
@@ -1,10 +1,13 @@
import logging
+from discord.ext.commands import Bot
+
from .cog import Sync
log = logging.getLogger(__name__)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Sync cog load."""
bot.add_cog(Sync(bot))
log.info("Cog loaded: Sync")
diff --git a/bot/cogs/sync/cog.py b/bot/cogs/sync/cog.py
index 9a3a48bba..b75fb26cd 100644
--- a/bot/cogs/sync/cog.py
+++ b/bot/cogs/sync/cog.py
@@ -177,7 +177,6 @@ class Sync(Cog):
@commands.has_permissions(administrator=True)
async def sync_roles_command(self, ctx: Context) -> None:
"""Manually synchronize the guild's roles with the roles on the site."""
-
initial_response = await ctx.send("📊 Synchronizing roles.")
total_created, total_updated, total_deleted = await syncers.sync_roles(self.bot, ctx.guild)
await initial_response.edit(
@@ -191,7 +190,6 @@ class Sync(Cog):
@commands.has_permissions(administrator=True)
async def sync_users_command(self, ctx: Context) -> None:
"""Manually synchronize the guild's users with the users on the site."""
-
initial_response = await ctx.send("📊 Synchronizing users.")
total_created, total_updated, total_deleted = await syncers.sync_users(self.bot, ctx.guild)
await initial_response.edit(
diff --git a/bot/cogs/sync/syncers.py b/bot/cogs/sync/syncers.py
index 414c24adb..2cc5a66e1 100644
--- a/bot/cogs/sync/syncers.py
+++ b/bot/cogs/sync/syncers.py
@@ -34,7 +34,6 @@ def get_roles_for_sync(
to be deleted on the site, meaning the roles are present on
the API but not in the cached guild.
"""
-
guild_role_ids = {role.id for role in guild_roles}
api_role_ids = {role.id for role in api_roles}
new_role_ids = guild_role_ids - api_role_ids
@@ -66,7 +65,6 @@ async def sync_roles(bot: Bot, guild: Guild) -> Tuple[int, int, int]:
(element `0`) , how many roles were updated (element `1`), and how many
roles were deleted (element `2`) on the API.
"""
-
roles = await bot.api_client.get('bot/roles')
# Pack API roles and guild roles into one common format,
@@ -138,7 +136,6 @@ def get_users_for_sync(
guild, but where the attribute of a user on the API is not
equal to the attribute of the user on the guild.
"""
-
users_to_create = set()
users_to_update = set()
@@ -169,8 +166,7 @@ def get_users_for_sync(
async def sync_users(bot: Bot, guild: Guild) -> Tuple[int, int, None]:
"""
- Synchronize users found on the given
- `guild` with the ones on the API.
+ Synchronize users found in the given `guild` with the ones in the API.
Arguments:
bot (discord.ext.commands.Bot):
@@ -186,7 +182,6 @@ async def sync_users(bot: Bot, guild: Guild) -> Tuple[int, int, None]:
(element `0`) and how many users were updated (element `1`), and `None`
to indicate that a user sync never deletes entries from the API.
"""
-
current_users = await bot.api_client.get('bot/users')
# Pack API users and guild users into one common format,
diff --git a/bot/cogs/tags.py b/bot/cogs/tags.py
index 8e9ba5da3..b9dd3595e 100644
--- a/bot/cogs/tags.py
+++ b/bot/cogs/tags.py
@@ -20,41 +20,26 @@ TEST_CHANNELS = (
class Tags(Cog):
- """
- Save new tags and fetch existing tags.
- """
+ """Save new tags and fetch existing tags."""
def __init__(self, bot: Bot):
self.bot = bot
self.tag_cooldowns = {}
- @group(name='tags', aliases=('tag', 't'), hidden=True, invoke_without_command=True)
- async def tags_group(self, ctx: Context, *, tag_name: TagNameConverter = None):
+ @group(name='tags', aliases=('tag', 't'), invoke_without_command=True)
+ async def tags_group(self, ctx: Context, *, tag_name: TagNameConverter = None) -> None:
"""Show all known tags, a single tag, or run a subcommand."""
-
await ctx.invoke(self.get_command, tag_name=tag_name)
@tags_group.command(name='get', aliases=('show', 'g'))
- async def get_command(self, ctx: Context, *, tag_name: TagNameConverter = None):
- """
- Get a list of all tags or a specified tag.
-
- :param ctx: Discord message context
- :param tag_name:
- If provided, this function shows data for that specific tag.
- If not provided, this function shows the caller a list of all tags.
- """
-
- def _command_on_cooldown(tag_name) -> bool:
+ async def get_command(self, ctx: Context, *, tag_name: TagNameConverter = None) -> None:
+ """Get a specified tag, or a list of all tags if no tag is specified."""
+ def _command_on_cooldown(tag_name: str) -> bool:
"""
- Check if the command is currently on cooldown.
- The cooldown duration is set in constants.py.
+ Check if the command is currently on cooldown, on a per-tag, per-channel basis.
- This works on a per-tag, per-channel basis.
- :param tag_name: The name of the command to check.
- :return: True if the command is cooling down. Otherwise False.
+ The cooldown duration is set in constants.py.
"""
-
now = time.time()
cooldown_conditions = (
@@ -109,15 +94,8 @@ class Tags(Cog):
tag_name: TagNameConverter,
*,
tag_content: TagContentConverter,
- ):
- """
- Create a new tag or update an existing one.
-
- :param ctx: discord message context
- :param tag_name: The name of the tag to create or edit.
- :param tag_content: The content of the tag.
- """
-
+ ) -> None:
+ """Create a new tag or update an existing one."""
body = {
'title': tag_name.lower().strip(),
'embed': {
@@ -140,14 +118,8 @@ class Tags(Cog):
@tags_group.command(name='delete', aliases=('remove', 'rm', 'd'))
@with_role(Roles.admin, Roles.owner)
- async def delete_command(self, ctx: Context, *, tag_name: TagNameConverter):
- """
- Remove a tag from the database.
-
- :param ctx: discord message context
- :param tag_name: The name of the tag to delete.
- """
-
+ async def delete_command(self, ctx: Context, *, tag_name: TagNameConverter) -> None:
+ """Remove a tag from the database."""
await self.bot.api_client.delete(f'bot/tags/{tag_name}')
log.debug(f"{ctx.author} successfully deleted the tag called '{tag_name}'")
@@ -158,6 +130,7 @@ class Tags(Cog):
))
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Tags cog load."""
bot.add_cog(Tags(bot))
log.info("Cog loaded: Tags")
diff --git a/bot/cogs/token_remover.py b/bot/cogs/token_remover.py
index 64bf126d6..7dd0afbbd 100644
--- a/bot/cogs/token_remover.py
+++ b/bot/cogs/token_remover.py
@@ -42,10 +42,16 @@ class TokenRemover(Cog):
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
@Cog.listener()
- async def on_message(self, msg: Message):
+ async def on_message(self, msg: Message) -> None:
+ """
+ Check each message for a string that matches Discord's token pattern.
+
+ See: https://discordapp.com/developers/docs/reference#snowflakes
+ """
if msg.author.bot:
return
@@ -82,6 +88,11 @@ class TokenRemover(Cog):
@staticmethod
def is_valid_user_id(b64_content: str) -> bool:
+ """
+ Check potential token to see if it contains a valid Discord user ID.
+
+ See: https://discordapp.com/developers/docs/reference#snowflakes
+ """
b64_content += '=' * (-len(b64_content) % 4)
try:
@@ -92,6 +103,11 @@ class TokenRemover(Cog):
@staticmethod
def is_valid_timestamp(b64_content: str) -> bool:
+ """
+ Check potential token to see if it contains a valid timestamp.
+
+ See: https://discordapp.com/developers/docs/reference#snowflakes
+ """
b64_content += '=' * (-len(b64_content) % 4)
try:
@@ -102,6 +118,7 @@ class TokenRemover(Cog):
return snowflake_time(snowflake + TOKEN_EPOCH) < DISCORD_EPOCH_TIMESTAMP
-def setup(bot: Bot):
+def setup(bot: Bot) -> None:
+ """Token Remover cog load."""
bot.add_cog(TokenRemover(bot))
log.info("Cog loaded: TokenRemover")
diff --git a/bot/cogs/utils.py b/bot/cogs/utils.py
index 08e77a24e..b6cecdc7c 100644
--- a/bot/cogs/utils.py
+++ b/bot/cogs/utils.py
@@ -3,9 +3,10 @@ import re
import unicodedata
from email.parser import HeaderParser
from io import StringIO
+from typing import Tuple
from discord import Colour, Embed
-from discord.ext.commands import AutoShardedBot, Cog, Context, command
+from discord.ext.commands import Bot, Cog, Context, command
from bot.constants import Channels, STAFF_ROLES
from bot.decorators import in_channel
@@ -14,26 +15,22 @@ log = logging.getLogger(__name__)
class Utils(Cog):
- """
- A selection of utilities which don't have a clear category.
- """
+ """A selection of utilities which don't have a clear category."""
- def __init__(self, bot: AutoShardedBot):
+ def __init__(self, bot: Bot):
self.bot = bot
self.base_pep_url = "http://www.python.org/dev/peps/pep-"
self.base_github_pep_url = "https://raw.githubusercontent.com/python/peps/master/pep-"
@command(name='pep', aliases=('get_pep', 'p'))
- async def pep_command(self, ctx: Context, pep_number: str):
- """
- Fetches information about a PEP and sends it to the channel.
- """
-
+ async def pep_command(self, ctx: Context, pep_number: str) -> None:
+ """Fetches information about a PEP and sends it to the channel."""
if pep_number.isdigit():
pep_number = int(pep_number)
else:
- return await ctx.invoke(self.bot.get_command("help"), "pep")
+ await ctx.invoke(self.bot.get_command("help"), "pep")
+ return
# Newer PEPs are written in RST instead of txt
if pep_number > 542:
@@ -89,11 +86,8 @@ class Utils(Cog):
@command()
@in_channel(Channels.bot, bypass_roles=STAFF_ROLES)
- async def charinfo(self, ctx, *, characters: str):
- """
- Shows you information on up to 25 unicode characters.
- """
-
+ async def charinfo(self, ctx: Context, *, characters: str) -> None:
+ """Shows you information on up to 25 unicode characters."""
match = re.match(r"<(a?):(\w+):(\d+)>", characters)
if match:
embed = Embed(
@@ -104,14 +98,16 @@ class Utils(Cog):
)
)
embed.colour = Colour.red()
- return await ctx.send(embed=embed)
+ await ctx.send(embed=embed)
+ return
if len(characters) > 25:
embed = Embed(title=f"Too many characters ({len(characters)}/25)")
embed.colour = Colour.red()
- return await ctx.send(embed=embed)
+ await ctx.send(embed=embed)
+ return
- def get_info(char):
+ def get_info(char: str) -> Tuple[str, str]:
digit = f"{ord(char):x}"
if len(digit) <= 4:
u_code = f"\\u{digit:>04}"
@@ -133,6 +129,7 @@ class Utils(Cog):
await ctx.send(embed=embed)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Utils cog load."""
bot.add_cog(Utils(bot))
log.info("Cog loaded: Utils")
diff --git a/bot/cogs/verification.py b/bot/cogs/verification.py
index c42d4d67e..f0a099f27 100644
--- a/bot/cogs/verification.py
+++ b/bot/cogs/verification.py
@@ -29,19 +29,19 @@ If you'd like to unsubscribe from the announcement notifications, simply send `!
class Verification(Cog):
- """
- User verification and role self-management
- """
+ """User verification and role self-management."""
def __init__(self, bot: Bot):
self.bot = bot
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
@Cog.listener()
- async def on_message(self, message: Message):
+ async def on_message(self, message: Message) -> None:
+ """Check new message event for messages to the checkpoint channel & process."""
if message.author.bot:
return # They're a bot, ignore
@@ -75,11 +75,8 @@ class Verification(Cog):
@command(name='accept', aliases=('verify', 'verified', 'accepted'), hidden=True)
@without_role(Roles.verified)
@in_channel(Channels.verification)
- async def accept_command(self, ctx: Context, *_): # We don't actually care about the args
- """
- Accept our rules and gain access to the rest of the server
- """
-
+ async def accept_command(self, ctx: Context, *_) -> None: # We don't actually care about the args
+ """Accept our rules and gain access to the rest of the server."""
log.debug(f"{ctx.author} called !accept. Assigning the 'Developer' role.")
await ctx.author.add_roles(Object(Roles.verified), reason="Accepted the rules")
try:
@@ -98,11 +95,8 @@ class Verification(Cog):
@command(name='subscribe')
@in_channel(Channels.bot)
- async def subscribe_command(self, ctx: Context, *_): # We don't actually care about the args
- """
- Subscribe to announcement notifications by assigning yourself the role
- """
-
+ async def subscribe_command(self, ctx: Context, *_) -> None: # We don't actually care about the args
+ """Subscribe to announcement notifications by assigning yourself the role."""
has_role = False
for role in ctx.author.roles:
@@ -111,9 +105,8 @@ class Verification(Cog):
break
if has_role:
- return await ctx.send(
- f"{ctx.author.mention} You're already subscribed!",
- )
+ await ctx.send(f"{ctx.author.mention} You're already subscribed!")
+ return
log.debug(f"{ctx.author} called !subscribe. Assigning the 'Announcements' role.")
await ctx.author.add_roles(Object(Roles.announcements), reason="Subscribed to announcements")
@@ -126,11 +119,8 @@ class Verification(Cog):
@command(name='unsubscribe')
@in_channel(Channels.bot)
- async def unsubscribe_command(self, ctx: Context, *_): # We don't actually care about the args
- """
- Unsubscribe from announcement notifications by removing the role from yourself
- """
-
+ async def unsubscribe_command(self, ctx: Context, *_) -> None: # We don't actually care about the args
+ """Unsubscribe from announcement notifications by removing the role from yourself."""
has_role = False
for role in ctx.author.roles:
@@ -139,9 +129,8 @@ class Verification(Cog):
break
if not has_role:
- return await ctx.send(
- f"{ctx.author.mention} You're already unsubscribed!"
- )
+ await ctx.send(f"{ctx.author.mention} You're already unsubscribed!")
+ return
log.debug(f"{ctx.author} called !unsubscribe. Removing the 'Announcements' role.")
await ctx.author.remove_roles(Object(Roles.announcements), reason="Unsubscribed from announcements")
@@ -152,24 +141,22 @@ class Verification(Cog):
f"{ctx.author.mention} Unsubscribed from <#{Channels.announcements}> notifications."
)
- @staticmethod
- async def cog_command_error(ctx: Context, error):
+ # This cannot be static (must have a __func__ attribute).
+ async def cog_command_error(self, ctx: Context, error: Exception) -> None:
+ """Check for & ignore any InChannelCheckFailure."""
if isinstance(error, InChannelCheckFailure):
- # Do nothing; just ignore this error
error.handled = True
@staticmethod
- def bot_check(ctx: Context):
- """
- Block any command within the verification channel that is not !accept.
- """
-
+ def bot_check(ctx: Context) -> bool:
+ """Block any command within the verification channel that is not !accept."""
if ctx.channel.id == Channels.verification:
return ctx.command.name == "accept"
else:
return True
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Verification cog load."""
bot.add_cog(Verification(bot))
log.info("Cog loaded: Verification")
diff --git a/bot/cogs/watchchannels/__init__.py b/bot/cogs/watchchannels/__init__.py
index ac7713803..86e1050fa 100644
--- a/bot/cogs/watchchannels/__init__.py
+++ b/bot/cogs/watchchannels/__init__.py
@@ -1,5 +1,7 @@
import logging
+from discord.ext.commands import Bot
+
from .bigbrother import BigBrother
from .talentpool import TalentPool
@@ -7,7 +9,8 @@ from .talentpool import TalentPool
log = logging.getLogger(__name__)
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Monitoring cogs load."""
bot.add_cog(BigBrother(bot))
log.info("Cog loaded: BigBrother")
diff --git a/bot/cogs/watchchannels/bigbrother.py b/bot/cogs/watchchannels/bigbrother.py
index 338b6c4ad..e191c2dbc 100644
--- a/bot/cogs/watchchannels/bigbrother.py
+++ b/bot/cogs/watchchannels/bigbrother.py
@@ -3,7 +3,7 @@ from collections import ChainMap
from typing import Union
from discord import User
-from discord.ext.commands import Cog, Context, group
+from discord.ext.commands import Bot, Cog, Context, group
from bot.constants import Channels, Roles, Webhooks
from bot.decorators import with_role
@@ -16,7 +16,7 @@ log = logging.getLogger(__name__)
class BigBrother(WatchChannel, Cog, name="Big Brother"):
"""Monitors users by relaying their messages to a watch channel to assist with moderation."""
- def __init__(self, bot) -> None:
+ def __init__(self, bot: Bot) -> None:
super().__init__(
bot,
destination=Channels.big_brother_logs,
diff --git a/bot/cogs/watchchannels/talentpool.py b/bot/cogs/watchchannels/talentpool.py
index 4452d7a59..4a23902d5 100644
--- a/bot/cogs/watchchannels/talentpool.py
+++ b/bot/cogs/watchchannels/talentpool.py
@@ -4,12 +4,13 @@ from collections import ChainMap
from typing import Union
from discord import Color, Embed, Member, User
-from discord.ext.commands import Cog, Context, group
+from discord.ext.commands import Bot, Cog, Context, group
from bot.api import ResponseCodeError
from bot.constants import Channels, Guild, Roles, Webhooks
from bot.decorators import with_role
from bot.pagination import LinePaginator
+from bot.utils import time
from .watchchannel import WatchChannel, proxy_user
log = logging.getLogger(__name__)
@@ -19,7 +20,7 @@ STAFF_ROLES = Roles.owner, Roles.admin, Roles.moderator, Roles.helpers # <- I
class TalentPool(WatchChannel, Cog, name="Talentpool"):
"""Relays messages of helper candidates to a watch channel to observe them."""
- def __init__(self, bot) -> None:
+ def __init__(self, bot: Bot) -> None:
super().__init__(
bot,
destination=Channels.talent_pool,
@@ -33,7 +34,6 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):
@with_role(Roles.owner, Roles.admin, Roles.moderator)
async def nomination_group(self, ctx: Context) -> None:
"""Highlights the activity of helper nominees by relaying their messages to the talent pool channel."""
-
await ctx.invoke(self.bot.get_command("help"), "talentpool")
@nomination_group.command(name='watched', aliases=('all', 'list'))
@@ -156,7 +156,6 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):
@with_role(Roles.owner, Roles.admin, Roles.moderator)
async def nomination_edit_group(self, ctx: Context) -> None:
"""Commands to edit nominations."""
-
await ctx.invoke(self.bot.get_command("help"), "talentpool", "edit")
@nomination_edit_group.command(name='reason')
@@ -200,7 +199,7 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):
log.debug(active)
log.debug(type(nomination_object["inserted_at"]))
- start_date = self._get_human_readable(nomination_object["inserted_at"])
+ start_date = time.format_infraction(nomination_object["inserted_at"])
if active:
lines = textwrap.dedent(
f"""
@@ -214,7 +213,7 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):
"""
)
else:
- end_date = self._get_human_readable(nomination_object["ended_at"])
+ end_date = time.format_infraction(nomination_object["ended_at"])
lines = textwrap.dedent(
f"""
===============
diff --git a/bot/cogs/watchchannels/watchchannel.py b/bot/cogs/watchchannels/watchchannel.py
index c34b0d5bb..ce8014d69 100644
--- a/bot/cogs/watchchannels/watchchannel.py
+++ b/bot/cogs/watchchannels/watchchannel.py
@@ -1,5 +1,4 @@
import asyncio
-import datetime
import logging
import re
import textwrap
@@ -8,6 +7,7 @@ from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Optional
+import dateutil.parser
import discord
from discord import Color, Embed, HTTPException, Message, Object, errors
from discord.ext.commands import BadArgument, Bot, Cog, Context
@@ -42,6 +42,8 @@ def proxy_user(user_id: str) -> Object:
@dataclass
class MessageHistory:
+ """Represents a watch channel's message history."""
+
last_author: Optional[int] = None
last_channel: Optional[int] = None
message_count: int = 0
@@ -51,7 +53,15 @@ class WatchChannel(metaclass=CogABCMeta):
"""ABC with functionality for relaying users' messages to a certain channel."""
@abstractmethod
- def __init__(self, bot: Bot, destination, webhook_id, api_endpoint, api_default_params, logger) -> None:
+ def __init__(
+ self,
+ bot: Bot,
+ destination: int,
+ webhook_id: int,
+ api_endpoint: str,
+ api_default_params: dict,
+ logger: logging.Logger
+ ) -> None:
self.bot = bot
self.destination = destination # E.g., Channels.big_brother_logs
@@ -138,8 +148,8 @@ class WatchChannel(metaclass=CogABCMeta):
title=f"Warning: Failed to retrieve user cache for the {self.__class__.__name__} watch channel",
text="Could not retrieve the list of watched users from the API and messages will not be relayed.",
ping_everyone=True,
- icon=Icons.token_removed,
- color=Color.red()
+ icon_url=Icons.token_removed,
+ colour=Color.red()
)
async def fetch_user_cache(self) -> bool:
@@ -265,7 +275,7 @@ class WatchChannel(metaclass=CogABCMeta):
self.message_history.message_count += 1
- async def send_header(self, msg) -> None:
+ async def send_header(self, msg: Message) -> None:
"""Sends a header embed with information about the relayed messages to the watch channel."""
user_id = msg.author.id
@@ -311,22 +321,11 @@ class WatchChannel(metaclass=CogABCMeta):
@staticmethod
def _get_time_delta(time_string: str) -> str:
"""Returns the time in human-readable time delta format."""
- date_time = datetime.datetime.strptime(
- time_string,
- "%Y-%m-%dT%H:%M:%S.%fZ"
- ).replace(tzinfo=None)
+ date_time = dateutil.parser.isoparse(time_string).replace(tzinfo=None)
time_delta = time_since(date_time, precision="minutes", max_units=1)
return time_delta
- @staticmethod
- def _get_human_readable(time_string: str, output_format: str = "%Y-%m-%d %H:%M:%S") -> str:
- date_time = datetime.datetime.strptime(
- time_string,
- "%Y-%m-%dT%H:%M:%S.%fZ"
- ).replace(tzinfo=None)
- return date_time.strftime(output_format)
-
def _remove_user(self, user_id: int) -> None:
"""Removes a user from a watch channel."""
self.watched_users.pop(user_id, None)
diff --git a/bot/cogs/wolfram.py b/bot/cogs/wolfram.py
index e88efa033..ab0ed2472 100644
--- a/bot/cogs/wolfram.py
+++ b/bot/cogs/wolfram.py
@@ -1,13 +1,13 @@
import logging
from io import BytesIO
-from typing import List, Optional, Tuple
+from typing import Callable, List, Optional, Tuple
from urllib import parse
import discord
from dateutil.relativedelta import relativedelta
from discord import Embed
from discord.ext import commands
-from discord.ext.commands import BucketType, Cog, Context, check, group
+from discord.ext.commands import Bot, BucketType, Cog, Context, check, group
from bot.constants import Colours, STAFF_ROLES, Wolfram
from bot.pagination import ImagePaginator
@@ -37,18 +37,7 @@ async def send_embed(
img_url: str = None,
f: discord.File = None
) -> None:
- """
- Generates an embed with wolfram as the author, with message_txt as description,
- adds custom colour if specified, a footer and image (could be a file with f param) and sends
- the embed through ctx
- :param ctx: Context
- :param message_txt: str - Message to be sent
- :param colour: int - Default: Colours.soft_red - Colour of embed
- :param footer: str - Default: None - Adds a footer to the embed
- :param img_url:str - Default: None - Adds an image to the embed
- :param f: discord.File - Default: None - Add a file to the msg, often attached as image to embed
- """
-
+ """Generate & send a response embed with Wolfram as the author."""
embed = Embed(colour=colour)
embed.description = message_txt
embed.set_author(name="Wolfram Alpha",
@@ -63,16 +52,12 @@ async def send_embed(
await ctx.send(embed=embed, file=f)
-def custom_cooldown(*ignore: List[int]) -> check:
+def custom_cooldown(*ignore: List[int]) -> Callable:
"""
- Custom cooldown mapping that applies a specific requests per day to users.
- Staff is ignored by the user cooldown, however the cooldown implements a
- total amount of uses per day for the entire guild. (Configurable in configs)
+ Implement per-user and per-guild cooldowns for requests to the Wolfram API.
- :param ignore: List[int] -- list of ids of roles to be ignored by user cooldown
- :return: check
+ A list of roles may be provided to ignore the per-user cooldown
"""
-
async def predicate(ctx: Context) -> bool:
user_bucket = usercd.get_bucket(ctx.message)
@@ -109,8 +94,8 @@ def custom_cooldown(*ignore: List[int]) -> check:
return check(predicate)
-async def get_pod_pages(ctx, bot, query: str) -> Optional[List[Tuple]]:
- # Give feedback that the bot is working.
+async def get_pod_pages(ctx: Context, bot: Bot, query: str) -> Optional[List[Tuple]]:
+ """Get the Wolfram API pod pages for the provided query."""
async with ctx.channel.typing():
url_str = parse.urlencode({
"input": query,
@@ -164,9 +149,7 @@ async def get_pod_pages(ctx, bot, query: str) -> Optional[List[Tuple]]:
class Wolfram(Cog):
- """
- Commands for interacting with the Wolfram|Alpha API.
- """
+ """Commands for interacting with the Wolfram|Alpha API."""
def __init__(self, bot: commands.Bot):
self.bot = bot
@@ -174,14 +157,7 @@ class Wolfram(Cog):
@group(name="wolfram", aliases=("wolf", "wa"), invoke_without_command=True)
@custom_cooldown(*STAFF_ROLES)
async def wolfram_command(self, ctx: Context, *, query: str) -> None:
- """
- Requests all answers on a single image,
- sends an image of all related pods
-
- :param ctx: Context
- :param query: str - string request to api
- """
-
+ """Requests all answers on a single image, sends an image of all related pods."""
url_str = parse.urlencode({
"i": query,
"appid": APPID,
@@ -221,13 +197,10 @@ class Wolfram(Cog):
@custom_cooldown(*STAFF_ROLES)
async def wolfram_page_command(self, ctx: Context, *, query: str) -> None:
"""
- Requests a drawn image of given query
- Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc
+ Requests a drawn image of given query.
- :param ctx: Context
- :param query: str - string request to api
+ Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc.
"""
-
pages = await get_pod_pages(ctx, self.bot, query)
if not pages:
@@ -243,15 +216,12 @@ class Wolfram(Cog):
@wolfram_command.command(name="cut", aliases=("c",))
@custom_cooldown(*STAFF_ROLES)
- async def wolfram_cut_command(self, ctx, *, query: str) -> None:
+ async def wolfram_cut_command(self, ctx: Context, *, query: str) -> None:
"""
- Requests a drawn image of given query
- Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc
+ Requests a drawn image of given query.
- :param ctx: Context
- :param query: str - string request to api
+ Keywords worth noting are, "like curve", "curve", "graph", "pokemon", etc.
"""
-
pages = await get_pod_pages(ctx, self.bot, query)
if not pages:
@@ -267,14 +237,7 @@ class Wolfram(Cog):
@wolfram_command.command(name="short", aliases=("sh", "s"))
@custom_cooldown(*STAFF_ROLES)
async def wolfram_short_command(self, ctx: Context, *, query: str) -> None:
- """
- Requests an answer to a simple question
- Responds in plaintext
-
- :param ctx: Context
- :param query: str - string request to api
- """
-
+ """Requests an answer to a simple question."""
url_str = parse.urlencode({
"i": query,
"appid": APPID,
@@ -304,5 +267,6 @@ class Wolfram(Cog):
def setup(bot: commands.Bot) -> None:
+ """Wolfram cog load."""
bot.add_cog(Wolfram(bot))
log.info("Cog loaded: Wolfram")
diff --git a/bot/constants.py b/bot/constants.py
index e1c47889c..1deeaa3b8 100644
--- a/bot/constants.py
+++ b/bot/constants.py
@@ -375,13 +375,15 @@ class Roles(metaclass=YAMLGetter):
champion: int
contributor: int
core_developer: int
+ helpers: int
jammer: int
moderator: int
muted: int
owner: int
- verified: int # This is the Developers role on PyDis, here named verified for readability reasons.
- helpers: int
+ partners: int
+ rockstars: int
team_leader: int
+ verified: int # This is the Developers role on PyDis, here named verified for readability reasons.
class Guild(metaclass=YAMLGetter):
diff --git a/bot/converters.py b/bot/converters.py
index 4bd9aba13..339da7b60 100644
--- a/bot/converters.py
+++ b/bot/converters.py
@@ -1,10 +1,12 @@
import logging
+import re
from datetime import datetime
from ssl import CertificateError
+from typing import Union
-import dateparser
import discord
from aiohttp import ClientConnectorError
+from dateutil.relativedelta import relativedelta
from discord.ext.commands import BadArgument, Context, Converter
@@ -15,17 +17,16 @@ class ValidPythonIdentifier(Converter):
"""
A converter that checks whether the given string is a valid Python identifier.
- This is used to have package names
- that correspond to how you would use
- the package in your code, e.g.
- `import package`. Raises `BadArgument`
- if the argument is not a valid Python
- identifier, and simply passes through
+ This is used to have package names that correspond to how you would use the package in your
+ code, e.g. `import package`.
+
+ Raises `BadArgument` if the argument is not a valid Python identifier, and simply passes through
the given argument otherwise.
"""
@staticmethod
- async def convert(ctx, argument: str):
+ async def convert(ctx: Context, argument: str) -> str:
+ """Checks whether the given string is a valid Python identifier."""
if not argument.isidentifier():
raise BadArgument(f"`{argument}` is not a valid Python identifier")
return argument
@@ -35,14 +36,15 @@ class ValidURL(Converter):
"""
Represents a valid webpage URL.
- This converter checks whether the given
- URL can be reached and requesting it returns
- a status code of 200. If not, `BadArgument`
- is raised. Otherwise, it simply passes through the given URL.
+ This converter checks whether the given URL can be reached and requesting it returns a status
+ code of 200. If not, `BadArgument` is raised.
+
+ Otherwise, it simply passes through the given URL.
"""
@staticmethod
- async def convert(ctx, url: str):
+ async def convert(ctx: Context, url: str) -> str:
+ """This converter checks whether the given URL can be reached with a status code of 200."""
try:
async with ctx.bot.http_session.get(url) as resp:
if resp.status != 200:
@@ -63,12 +65,11 @@ class ValidURL(Converter):
class InfractionSearchQuery(Converter):
- """
- A converter that checks if the argument is a Discord user, and if not, falls back to a string.
- """
+ """A converter that checks if the argument is a Discord user, and if not, falls back to a string."""
@staticmethod
- async def convert(ctx, arg):
+ async def convert(ctx: Context, arg: str) -> Union[discord.Member, str]:
+ """Check if the argument is a Discord user, and if not, falls back to a string."""
try:
maybe_snowflake = arg.strip("<@!>")
return await ctx.bot.fetch_user(maybe_snowflake)
@@ -77,12 +78,15 @@ class InfractionSearchQuery(Converter):
class Subreddit(Converter):
- """
- Forces a string to begin with "r/" and checks if it's a valid subreddit.
- """
+ """Forces a string to begin with "r/" and checks if it's a valid subreddit."""
@staticmethod
- async def convert(ctx, sub: str):
+ async def convert(ctx: Context, sub: str) -> str:
+ """
+ Force sub to begin with "r/" and check if it's a valid subreddit.
+
+ If sub is a valid subreddit, return it prepended with "r/"
+ """
sub = sub.lower()
if not sub.startswith("r/"):
@@ -103,9 +107,21 @@ class Subreddit(Converter):
class TagNameConverter(Converter):
+ """
+ Ensure that a proposed tag name is valid.
+
+ Valid tag names meet the following conditions:
+ * All ASCII characters
+ * Has at least one non-whitespace character
+ * Not solely numeric
+ * Shorter than 127 characters
+ """
+
@staticmethod
- async def convert(ctx: Context, tag_name: str):
- def is_number(value):
+ async def convert(ctx: Context, tag_name: str) -> str:
+ """Lowercase & strip whitespace from proposed tag_name & ensure it's valid."""
+ def is_number(value: str) -> bool:
+ """Check to see if the input string is numeric."""
try:
float(value)
except ValueError:
@@ -142,8 +158,15 @@ class TagNameConverter(Converter):
class TagContentConverter(Converter):
+ """Ensure proposed tag content is not empty and contains at least one non-whitespace character."""
+
@staticmethod
- async def convert(ctx: Context, tag_content: str):
+ async def convert(ctx: Context, tag_content: str) -> str:
+ """
+ Ensure tag_content is non-empty and contains at least one non-whitespace character.
+
+ If tag_content is valid, return the stripped version.
+ """
tag_content = tag_content.strip()
# The tag contents should not be empty, or filled with whitespace.
@@ -155,20 +178,40 @@ class TagContentConverter(Converter):
return tag_content
-class ExpirationDate(Converter):
- DATEPARSER_SETTINGS = {
- 'PREFER_DATES_FROM': 'future',
- 'TIMEZONE': 'UTC',
- 'TO_TIMEZONE': 'UTC'
- }
-
- async def convert(self, ctx, expiration_string: str):
- expiry = dateparser.parse(expiration_string, settings=self.DATEPARSER_SETTINGS)
- if expiry is None:
- raise BadArgument(f"Failed to parse expiration date from `{expiration_string}`")
-
+class Duration(Converter):
+ """Convert duration strings into UTC datetime.datetime objects."""
+
+ duration_parser = re.compile(
+ r"((?P<years>\d+?) ?(years|year|Y|y) ?)?"
+ r"((?P<months>\d+?) ?(months|month|m) ?)?"
+ r"((?P<weeks>\d+?) ?(weeks|week|W|w) ?)?"
+ r"((?P<days>\d+?) ?(days|day|D|d) ?)?"
+ r"((?P<hours>\d+?) ?(hours|hour|H|h) ?)?"
+ r"((?P<minutes>\d+?) ?(minutes|minute|M) ?)?"
+ r"((?P<seconds>\d+?) ?(seconds|second|S|s))?"
+ )
+
+ async def convert(self, ctx: Context, duration: str) -> datetime:
+ """
+ Converts a `duration` string to a datetime object that's `duration` in the future.
+
+ The converter supports the following symbols for each unit of time:
+ - years: `Y`, `y`, `year`, `years`
+ - months: `m`, `month`, `months`
+ - weeks: `w`, `W`, `week`, `weeks`
+ - days: `d`, `D`, `day`, `days`
+ - hours: `H`, `h`, `hour`, `hours`
+ - minutes: `M`, `minute`, `minutes`
+ - seconds: `S`, `s`, `second`, `seconds`
+
+ The units need to be provided in descending order of magnitude.
+ """
+ match = self.duration_parser.fullmatch(duration)
+ if not match:
+ raise BadArgument(f"`{duration}` is not a valid duration string.")
+
+ duration_dict = {unit: int(amount) for unit, amount in match.groupdict(default=0).items()}
+ delta = relativedelta(**duration_dict)
now = datetime.utcnow()
- if expiry < now:
- expiry = now + (now - expiry)
- return expiry
+ return now + delta
diff --git a/bot/decorators.py b/bot/decorators.py
index 923d21938..33a6bcadd 100644
--- a/bot/decorators.py
+++ b/bot/decorators.py
@@ -1,9 +1,9 @@
import logging
import random
-import typing
from asyncio import Lock, sleep
from contextlib import suppress
from functools import wraps
+from typing import Any, Callable, Container, Optional
from weakref import WeakValueDictionary
from discord import Colour, Embed
@@ -18,6 +18,8 @@ log = logging.getLogger(__name__)
class InChannelCheckFailure(CheckFailure):
+ """Raised when a check fails for a message being sent in a whitelisted channel."""
+
def __init__(self, *channels: int):
self.channels = channels
channels_str = ', '.join(f"<#{c_id}>" for c_id in channels)
@@ -25,11 +27,10 @@ class InChannelCheckFailure(CheckFailure):
super().__init__(f"Sorry, but you may only use this command within {channels_str}.")
-def in_channel(*channels: int, bypass_roles: typing.Container[int] = None):
- """
- Checks that the message is in a whitelisted channel or optionally has a bypass role.
- """
- def predicate(ctx: Context):
+def in_channel(*channels: int, bypass_roles: Container[int] = None) -> Callable:
+ """Checks that the message is in a whitelisted channel or optionally has a bypass role."""
+ def predicate(ctx: Context) -> bool:
+ """In-channel checker predicate."""
if ctx.channel.id in channels:
log.debug(f"{ctx.author} tried to call the '{ctx.command.name}' command. "
f"The command was used in a whitelisted channel.")
@@ -50,42 +51,34 @@ def in_channel(*channels: int, bypass_roles: typing.Container[int] = None):
return commands.check(predicate)
-def with_role(*role_ids: int):
- """
- Returns True if the user has any one
- of the roles in role_ids.
- """
-
- async def predicate(ctx: Context):
+def with_role(*role_ids: int) -> Callable:
+ """Returns True if the user has any one of the roles in role_ids."""
+ async def predicate(ctx: Context) -> bool:
+ """With role checker predicate."""
return with_role_check(ctx, *role_ids)
return commands.check(predicate)
-def without_role(*role_ids: int):
- """
- Returns True if the user does not have any
- of the roles in role_ids.
- """
-
- async def predicate(ctx: Context):
+def without_role(*role_ids: int) -> Callable:
+ """Returns True if the user does not have any of the roles in role_ids."""
+ async def predicate(ctx: Context) -> bool:
return without_role_check(ctx, *role_ids)
return commands.check(predicate)
-def locked():
+def locked() -> Callable:
"""
Allows the user to only run one instance of the decorated command at a time.
- Subsequent calls to the command from the same author are
- ignored until the command has completed invocation.
+
+ Subsequent calls to the command from the same author are ignored until the command has completed invocation.
This decorator has to go before (below) the `command` decorator.
"""
-
- def wrap(func):
+ def wrap(func: Callable) -> Callable:
func.__locks = WeakValueDictionary()
@wraps(func)
- async def inner(self, ctx, *args, **kwargs):
+ async def inner(self: Callable, ctx: Context, *args, **kwargs) -> Optional[Any]:
lock = func.__locks.setdefault(ctx.author.id, Lock())
if lock.locked():
embed = Embed()
@@ -105,15 +98,15 @@ def locked():
return wrap
-def redirect_output(destination_channel: int, bypass_roles: typing.Container[int] = None):
- """
- Changes the channel in the context of the command to redirect the output
- to a certain channel, unless the author has a role to bypass redirection
+def redirect_output(destination_channel: int, bypass_roles: Container[int] = None) -> Callable:
"""
+ Changes the channel in the context of the command to redirect the output to a certain channel.
- def wrap(func):
+ Redirect is bypassed if the author has a role to bypass redirection.
+ """
+ def wrap(func: Callable) -> Callable:
@wraps(func)
- async def inner(self, ctx, *args, **kwargs):
+ async def inner(self: Callable, ctx: Context, *args, **kwargs) -> Any:
if ctx.channel.id == destination_channel:
log.trace(f"Command {ctx.command.name} was invoked in destination_channel, not redirecting")
return await func(self, ctx, *args, **kwargs)
diff --git a/bot/interpreter.py b/bot/interpreter.py
index 06343db39..a42b45a2d 100644
--- a/bot/interpreter.py
+++ b/bot/interpreter.py
@@ -1,5 +1,8 @@
from code import InteractiveInterpreter
from io import StringIO
+from typing import Any
+
+from discord.ext.commands import Bot, Context
CODE_TEMPLATE = """
async def _func():
@@ -8,13 +11,20 @@ async def _func():
class Interpreter(InteractiveInterpreter):
+ """
+ Subclass InteractiveInterpreter to specify custom run functionality.
+
+ Helper class for internal eval.
+ """
+
write_callable = None
- def __init__(self, bot):
+ def __init__(self, bot: Bot):
_locals = {"bot": bot}
super().__init__(_locals)
- async def run(self, code, ctx, io, *args, **kwargs):
+ async def run(self, code: str, ctx: Context, io: StringIO, *args, **kwargs) -> Any:
+ """Execute the provided source code as the bot & return the output."""
self.locals["_rvalue"] = []
self.locals["ctx"] = ctx
self.locals["print"] = lambda x: io.write(f"{x}\n")
diff --git a/bot/pagination.py b/bot/pagination.py
index 0ad5b81f1..76082f459 100644
--- a/bot/pagination.py
+++ b/bot/pagination.py
@@ -2,7 +2,7 @@ import asyncio
import logging
from typing import Iterable, List, Optional, Tuple
-from discord import Embed, Member, Reaction
+from discord import Embed, Member, Message, Reaction
from discord.abc import User
from discord.ext.commands import Context, Paginator
@@ -18,6 +18,8 @@ log = logging.getLogger(__name__)
class EmptyPaginatorEmbed(Exception):
+ """Raised when attempting to paginate with empty contents."""
+
pass
@@ -25,25 +27,24 @@ class LinePaginator(Paginator):
"""
A class that aids in paginating code blocks for Discord messages.
- Attributes
- -----------
- prefix: :class:`str`
+ Available attributes include:
+ * prefix: `str`
The prefix inserted to every page. e.g. three backticks.
- suffix: :class:`str`
+ * suffix: `str`
The suffix appended at the end of every page. e.g. three backticks.
- max_size: :class:`int`
+ * max_size: `int`
The maximum amount of codepoints allowed in a page.
- max_lines: :class:`int`
+ * max_lines: `int`
The maximum amount of lines allowed in a page.
"""
- def __init__(self, prefix='```', suffix='```',
- max_size=2000, max_lines=None):
+ def __init__(
+ self, prefix: str = '```', suffix: str = '```', max_size: int = 2000, max_lines: int = None
+ ) -> None:
"""
- This function overrides the Paginator.__init__
- from inside discord.ext.commands.
- It overrides in order to allow us to configure
- the maximum number of lines per page.
+ This function overrides the Paginator.__init__ from inside discord.ext.commands.
+
+ It overrides in order to allow us to configure the maximum number of lines per page.
"""
self.prefix = prefix
self.suffix = suffix
@@ -54,28 +55,15 @@ class LinePaginator(Paginator):
self._count = len(prefix) + 1 # prefix + newline
self._pages = []
- def add_line(self, line='', *, empty=False):
- """Adds a line to the current page.
-
- If the line exceeds the :attr:`max_size` then an exception
- is raised.
+ def add_line(self, line: str = '', *, empty: bool = False) -> None:
+ """
+ Adds a line to the current page.
- This function overrides the Paginator.add_line
- from inside discord.ext.commands.
- It overrides in order to allow us to configure
- the maximum number of lines per page.
+ If the line exceeds the `self.max_size` then an exception is raised.
- Parameters
- -----------
- line: str
- The line to add.
- empty: bool
- Indicates if another empty line should be added.
+ This function overrides the `Paginator.add_line` from inside `discord.ext.commands`.
- Raises
- ------
- RuntimeError
- The line was too big for the current :attr:`max_size`.
+ It overrides in order to allow us to configure the maximum number of lines per page.
"""
if len(line) > self.max_size - len(self.prefix) - 2:
raise RuntimeError('Line exceeds maximum page size %s' % (self.max_size - len(self.prefix) - 2))
@@ -97,42 +85,39 @@ class LinePaginator(Paginator):
self._count += 1
@classmethod
- async def paginate(cls, lines: Iterable[str], ctx: Context, embed: Embed,
- prefix: str = "", suffix: str = "", max_lines: Optional[int] = None, max_size: int = 500,
- empty: bool = True, restrict_to_user: User = None, timeout: int = 300,
- footer_text: str = None, url: str = None, exception_on_empty_embed: bool = False):
+ async def paginate(
+ cls,
+ lines: Iterable[str],
+ ctx: Context,
+ embed: Embed,
+ prefix: str = "",
+ suffix: str = "",
+ max_lines: Optional[int] = None,
+ max_size: int = 500,
+ empty: bool = True,
+ restrict_to_user: User = None,
+ timeout: int = 300,
+ footer_text: str = None,
+ url: str = None,
+ exception_on_empty_embed: bool = False
+ ) -> Optional[Message]:
"""
- Use a paginator and set of reactions to provide pagination over a set of lines. The reactions are used to
- switch page, or to finish with pagination.
+ Use a paginator and set of reactions to provide pagination over a set of lines.
+
+ The reactions are used to switch page, or to finish with pagination.
+
When used, this will send a message using `ctx.send()` and apply a set of reactions to it. These reactions may
- be used to change page, or to remove pagination from the message. Pagination will also be removed automatically
- if no reaction is added for five minutes (300 seconds).
+ be used to change page, or to remove pagination from the message.
+
+ Pagination will also be removed automatically if no reaction is added for five minutes (300 seconds).
+
+ Example:
>>> embed = Embed()
>>> embed.set_author(name="Some Operation", url=url, icon_url=icon)
- >>> await LinePaginator.paginate(
- ... (line for line in lines),
- ... ctx, embed
- ... )
- :param lines: The lines to be paginated
- :param ctx: Current context object
- :param embed: A pre-configured embed to be used as a template for each page
- :param prefix: Text to place before each page
- :param suffix: Text to place after each page
- :param max_lines: The maximum number of lines on each page
- :param max_size: The maximum number of characters on each page
- :param empty: Whether to place an empty line between each given line
- :param restrict_to_user: A user to lock pagination operations to for this message, if supplied
- :param exception_on_empty_embed: Should there be an exception if the embed is empty?
- :param url: the url to use for the embed headline
- :param timeout: The amount of time in seconds to disable pagination of no reaction is added
- :param footer_text: Text to prefix the page number in the footer with
+ >>> await LinePaginator.paginate((line for line in lines), ctx, embed)
"""
-
- def event_check(reaction_: Reaction, user_: Member):
- """
- Make sure that this reaction is what we want to operate on
- """
-
+ def event_check(reaction_: Reaction, user_: Member) -> bool:
+ """Make sure that this reaction is what we want to operate on."""
no_restrictions = (
# Pagination is not restricted
not restrict_to_user
@@ -301,24 +286,20 @@ class LinePaginator(Paginator):
class ImagePaginator(Paginator):
"""
Helper class that paginates images for embeds in messages.
+
Close resemblance to LinePaginator, except focuses on images over text.
Refer to ImagePaginator.paginate for documentation on how to use.
"""
- def __init__(self, prefix="", suffix=""):
+ def __init__(self, prefix: str = "", suffix: str = ""):
super().__init__(prefix, suffix)
self._current_page = [prefix]
self.images = []
self._pages = []
def add_line(self, line: str = '', *, empty: bool = False) -> None:
- """
- Adds a line to each page, usually just 1 line in this context
- :param line: str to be page content / title
- :param empty: if there should be new lines between entries
- """
-
+ """Adds a line to each page."""
if line:
self._count = len(line)
else:
@@ -327,50 +308,36 @@ class ImagePaginator(Paginator):
self.close_page()
def add_image(self, image: str = None) -> None:
- """
- Adds an image to a page
- :param image: image url to be appended
- """
-
+ """Adds an image to a page."""
self.images.append(image)
@classmethod
- async def paginate(cls, pages: List[Tuple[str, str]], ctx: Context, embed: Embed,
- prefix: str = "", suffix: str = "", timeout: int = 300,
- exception_on_empty_embed: bool = False):
+ async def paginate(
+ cls,
+ pages: List[Tuple[str, str]],
+ ctx: Context, embed: Embed,
+ prefix: str = "",
+ suffix: str = "",
+ timeout: int = 300,
+ exception_on_empty_embed: bool = False
+ ) -> Optional[Message]:
"""
- Use a paginator and set of reactions to provide
- pagination over a set of title/image pairs.The reactions are
- used to switch page, or to finish with pagination.
+ Use a paginator and set of reactions to provide pagination over a set of title/image pairs.
+
+ The reactions are used to switch page, or to finish with pagination.
- When used, this will send a message using `ctx.send()` and
- apply a set of reactions to it. These reactions may
+ When used, this will send a message using `ctx.send()` and apply a set of reactions to it. These reactions may
be used to change page, or to remove pagination from the message.
- Note: Pagination will be removed automatically
- if no reaction is added for five minutes (300 seconds).
+ Note: Pagination will be removed automatically if no reaction is added for five minutes (300 seconds).
+ Example:
>>> embed = Embed()
>>> embed.set_author(name="Some Operation", url=url, icon_url=icon)
>>> await ImagePaginator.paginate(pages, ctx, embed)
-
- Parameters
- -----------
- :param pages: An iterable of tuples with title for page, and img url
- :param ctx: ctx for message
- :param embed: base embed to modify
- :param prefix: prefix of message
- :param suffix: suffix of message
- :param timeout: timeout for when reactions get auto-removed
"""
-
def check_event(reaction_: Reaction, member: Member) -> bool:
- """
- Checks each reaction added, if it matches our conditions pass the wait_for
- :param reaction_: reaction added
- :param member: reaction added by member
- """
-
+ """Checks each reaction added, if it matches our conditions pass the wait_for."""
return all((
# Reaction is on the same message sent
reaction_.message.id == message.id,
diff --git a/bot/patches/__init__.py b/bot/patches/__init__.py
index fd38ea8cf..60f6becaa 100644
--- a/bot/patches/__init__.py
+++ b/bot/patches/__init__.py
@@ -1,4 +1,4 @@
-"""Subpackage that contains patches for discord.py"""
+"""Subpackage that contains patches for discord.py."""
from . import message_edited_at
__all__ = [
diff --git a/bot/patches/message_edited_at.py b/bot/patches/message_edited_at.py
index 528373a9b..a0154f12d 100644
--- a/bot/patches/message_edited_at.py
+++ b/bot/patches/message_edited_at.py
@@ -1,5 +1,5 @@
"""
-# message_edited_at patch
+# message_edited_at patch.
Date: 2019-09-16
Author: Scragly
@@ -16,12 +16,12 @@ from discord import message, utils
log = logging.getLogger(__name__)
-def _handle_edited_timestamp(self, value):
+def _handle_edited_timestamp(self: message.Message, value: str) -> None:
"""Helper function that takes care of parsing the edited timestamp."""
self._edited_timestamp = utils.parse_time(value)
-def apply_patch():
+def apply_patch() -> None:
"""Applies the `edited_at` patch to the `discord.message.Message` class."""
message.Message._handle_edited_timestamp = _handle_edited_timestamp
message.Message._HANDLERS['edited_timestamp'] = message.Message._handle_edited_timestamp
diff --git a/bot/rules/attachments.py b/bot/rules/attachments.py
index 80a15d440..c550aed76 100644
--- a/bot/rules/attachments.py
+++ b/bot/rules/attachments.py
@@ -1,16 +1,12 @@
-"""Detects total attachments exceeding the limit sent by a single user."""
-
from typing import Dict, Iterable, List, Optional, Tuple
from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects total attachments exceeding the limit sent by a single user."""
relevant_messages = [last_message] + [
msg
for msg in recent_messages
diff --git a/bot/rules/burst.py b/bot/rules/burst.py
index 80c79be60..25c5a2f33 100644
--- a/bot/rules/burst.py
+++ b/bot/rules/burst.py
@@ -1,16 +1,12 @@
-"""Detects repeated messages sent by a single user."""
-
from typing import Dict, Iterable, List, Optional, Tuple
from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects repeated messages sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/rules/burst_shared.py b/bot/rules/burst_shared.py
index 2cb7b5200..bbe9271b3 100644
--- a/bot/rules/burst_shared.py
+++ b/bot/rules/burst_shared.py
@@ -1,16 +1,12 @@
-"""Detects repeated messages sent by multiple users."""
-
from typing import Dict, Iterable, List, Optional, Tuple
from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects repeated messages sent by multiple users."""
total_recent = len(recent_messages)
if total_recent > config['max']:
diff --git a/bot/rules/chars.py b/bot/rules/chars.py
index d05e3cd83..1f587422c 100644
--- a/bot/rules/chars.py
+++ b/bot/rules/chars.py
@@ -1,16 +1,12 @@
-"""Detects total message char count exceeding the limit sent by a single user."""
-
from typing import Dict, Iterable, List, Optional, Tuple
from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects total message char count exceeding the limit sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/rules/discord_emojis.py b/bot/rules/discord_emojis.py
index e4f957ddb..5bab514f2 100644
--- a/bot/rules/discord_emojis.py
+++ b/bot/rules/discord_emojis.py
@@ -1,5 +1,3 @@
-"""Detects total Discord emojis (excluding Unicode emojis) exceeding the limit sent by a single user."""
-
import re
from typing import Dict, Iterable, List, Optional, Tuple
@@ -10,11 +8,9 @@ DISCORD_EMOJI_RE = re.compile(r"<:\w+:\d+>")
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects total Discord emojis (excluding Unicode emojis) exceeding the limit sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/rules/duplicates.py b/bot/rules/duplicates.py
index 763fc9983..455764b53 100644
--- a/bot/rules/duplicates.py
+++ b/bot/rules/duplicates.py
@@ -1,16 +1,12 @@
-"""Detects duplicated messages sent by a single user."""
-
from typing import Dict, Iterable, List, Optional, Tuple
from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects duplicated messages sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/rules/links.py b/bot/rules/links.py
index fa4043fcb..ec75a19c5 100644
--- a/bot/rules/links.py
+++ b/bot/rules/links.py
@@ -1,5 +1,3 @@
-"""Detects total links exceeding the limit sent by a single user."""
-
import re
from typing import Dict, Iterable, List, Optional, Tuple
@@ -10,11 +8,9 @@ LINK_RE = re.compile(r"(https?://[^\s]+)")
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects total links exceeding the limit sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/rules/mentions.py b/bot/rules/mentions.py
index 45c47b6ba..79725a4b1 100644
--- a/bot/rules/mentions.py
+++ b/bot/rules/mentions.py
@@ -1,16 +1,12 @@
-"""Detects total mentions exceeding the limit sent by a single user."""
-
from typing import Dict, Iterable, List, Optional, Tuple
from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects total mentions exceeding the limit sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/rules/newlines.py b/bot/rules/newlines.py
index fdad6ffd3..4e66e1359 100644
--- a/bot/rules/newlines.py
+++ b/bot/rules/newlines.py
@@ -1,5 +1,3 @@
-"""Detects total newlines exceeding the set limit sent by a single user."""
-
import re
from typing import Dict, Iterable, List, Optional, Tuple
@@ -7,11 +5,9 @@ from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects total newlines exceeding the set limit sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/rules/role_mentions.py b/bot/rules/role_mentions.py
index 2177a73b5..0649540b6 100644
--- a/bot/rules/role_mentions.py
+++ b/bot/rules/role_mentions.py
@@ -1,16 +1,12 @@
-"""Detects total role mentions exceeding the limit sent by a single user."""
-
from typing import Dict, Iterable, List, Optional, Tuple
from discord import Member, Message
async def apply(
- last_message: Message,
- recent_messages: List[Message],
- config: Dict[str, int]
+ last_message: Message, recent_messages: List[Message], config: Dict[str, int]
) -> Optional[Tuple[str, Iterable[Member], Iterable[Message]]]:
-
+ """Detects total role mentions exceeding the limit sent by a single user."""
relevant_messages = tuple(
msg
for msg in recent_messages
diff --git a/bot/utils/__init__.py b/bot/utils/__init__.py
index d5ae0a7c5..8184be824 100644
--- a/bot/utils/__init__.py
+++ b/bot/utils/__init__.py
@@ -1,10 +1,12 @@
from abc import ABCMeta
+from typing import Any, Generator, Hashable, Iterable
from discord.ext.commands import CogMeta
class CogABCMeta(CogMeta, ABCMeta):
"""Metaclass for ABCs meant to be implemented as Cogs."""
+
pass
@@ -16,50 +18,59 @@ class CaseInsensitiveDict(dict):
"""
@classmethod
- def _k(cls, key):
+ def _k(cls, key: Hashable) -> Hashable:
+ """Return lowered key if a string-like is passed, otherwise pass key straight through."""
return key.lower() if isinstance(key, str) else key
def __init__(self, *args, **kwargs):
super(CaseInsensitiveDict, self).__init__(*args, **kwargs)
self._convert_keys()
- def __getitem__(self, key):
+ def __getitem__(self, key: Hashable) -> Any:
+ """Case insensitive __setitem__."""
return super(CaseInsensitiveDict, self).__getitem__(self.__class__._k(key))
- def __setitem__(self, key, value):
+ def __setitem__(self, key: Hashable, value: Any):
+ """Case insensitive __setitem__."""
super(CaseInsensitiveDict, self).__setitem__(self.__class__._k(key), value)
- def __delitem__(self, key):
+ def __delitem__(self, key: Hashable) -> Any:
+ """Case insensitive __delitem__."""
return super(CaseInsensitiveDict, self).__delitem__(self.__class__._k(key))
- def __contains__(self, key):
+ def __contains__(self, key: Hashable) -> bool:
+ """Case insensitive __contains__."""
return super(CaseInsensitiveDict, self).__contains__(self.__class__._k(key))
- def pop(self, key, *args, **kwargs):
+ def pop(self, key: Hashable, *args, **kwargs) -> Any:
+ """Case insensitive pop."""
return super(CaseInsensitiveDict, self).pop(self.__class__._k(key), *args, **kwargs)
- def get(self, key, *args, **kwargs):
+ def get(self, key: Hashable, *args, **kwargs) -> Any:
+ """Case insensitive get."""
return super(CaseInsensitiveDict, self).get(self.__class__._k(key), *args, **kwargs)
- def setdefault(self, key, *args, **kwargs):
+ def setdefault(self, key: Hashable, *args, **kwargs) -> Any:
+ """Case insensitive setdefault."""
return super(CaseInsensitiveDict, self).setdefault(self.__class__._k(key), *args, **kwargs)
- def update(self, E=None, **F):
+ def update(self, E: Any = None, **F) -> None:
+ """Case insensitive update."""
super(CaseInsensitiveDict, self).update(self.__class__(E))
super(CaseInsensitiveDict, self).update(self.__class__(**F))
- def _convert_keys(self):
+ def _convert_keys(self) -> None:
+ """Helper method to lowercase all existing string-like keys."""
for k in list(self.keys()):
v = super(CaseInsensitiveDict, self).pop(k)
self.__setitem__(k, v)
-def chunks(iterable, size):
+def chunks(iterable: Iterable, size: int) -> Generator[Any, None, None]:
"""
- Generator that allows you to iterate over any indexable collection in `size`-length chunks
+ Generator that allows you to iterate over any indexable collection in `size`-length chunks.
Found: https://stackoverflow.com/a/312464/4022104
"""
-
for i in range(0, len(iterable), size):
yield iterable[i:i + size]
diff --git a/bot/utils/checks.py b/bot/utils/checks.py
index 195edab0f..19f64ff9f 100644
--- a/bot/utils/checks.py
+++ b/bot/utils/checks.py
@@ -6,11 +6,7 @@ log = logging.getLogger(__name__)
def with_role_check(ctx: Context, *role_ids: int) -> bool:
- """
- Returns True if the user has any one
- of the roles in role_ids.
- """
-
+ """Returns True if the user has any one of the roles in role_ids."""
if not ctx.guild: # Return False in a DM
log.trace(f"{ctx.author} tried to use the '{ctx.command.name}'command from a DM. "
"This command is restricted by the with_role decorator. Rejecting request.")
@@ -27,11 +23,7 @@ def with_role_check(ctx: Context, *role_ids: int) -> bool:
def without_role_check(ctx: Context, *role_ids: int) -> bool:
- """
- Returns True if the user does not have any
- of the roles in role_ids.
- """
-
+ """Returns True if the user does not have any of the roles in role_ids."""
if not ctx.guild: # Return False in a DM
log.trace(f"{ctx.author} tried to use the '{ctx.command.name}' command from a DM. "
"This command is restricted by the without_role decorator. Rejecting request.")
@@ -45,11 +37,7 @@ def without_role_check(ctx: Context, *role_ids: int) -> bool:
def in_channel_check(ctx: Context, channel_id: int) -> bool:
- """
- Checks if the command was executed
- inside of the specified channel.
- """
-
+ """Checks if the command was executed inside of the specified channel."""
check = ctx.channel.id == channel_id
log.trace(f"{ctx.author} tried to call the '{ctx.command.name}' command. "
f"The result of the in_channel check was {check}.")
diff --git a/bot/utils/messages.py b/bot/utils/messages.py
index 94a8b36ed..549b33ca6 100644
--- a/bot/utils/messages.py
+++ b/bot/utils/messages.py
@@ -1,9 +1,9 @@
import asyncio
import contextlib
from io import BytesIO
-from typing import Sequence, Union
+from typing import Optional, Sequence, Union
-from discord import Embed, File, Message, TextChannel, Webhook
+from discord import Client, Embed, File, Member, Message, Reaction, TextChannel, Webhook
from discord.abc import Snowflake
from discord.errors import HTTPException
@@ -17,42 +17,18 @@ async def wait_for_deletion(
user_ids: Sequence[Snowflake],
deletion_emojis: Sequence[str] = (Emojis.cross_mark,),
timeout: float = 60 * 5,
- attach_emojis=True,
- client=None
-):
- """
- Waits for up to `timeout` seconds for a reaction by
- any of the specified `user_ids` to delete the message.
-
- Args:
- message (Message):
- The message that should be monitored for reactions
- and possibly deleted. Must be a message sent on a
- guild since access to the bot instance is required.
-
- user_ids (Sequence[Snowflake]):
- A sequence of users that are allowed to delete
- this message.
-
- Kwargs:
- deletion_emojis (Sequence[str]):
- A sequence of emojis that are considered deletion
- emojis.
-
- timeout (float):
- A positive float denoting the maximum amount of
- time to wait for a deletion reaction.
-
- attach_emojis (bool):
- Whether to attach the given `deletion_emojis`
- to the message in the given `context`
-
- client (Optional[discord.Client]):
- The client instance handling the original command.
- If not given, will take the client from the guild
- of the message.
+ attach_emojis: bool = True,
+ client: Optional[Client] = None
+) -> None:
"""
+ Wait for up to `timeout` seconds for a reaction by any of the specified `user_ids` to delete the message.
+
+ An `attach_emojis` bool may be specified to determine whether to attach the given
+ `deletion_emojis` to the message in the given `context`
+ A `client` instance may be optionally specified, otherwise client will be taken from the
+ guild of the message.
+ """
if message.guild is None and client is None:
raise ValueError("Message must be sent on a guild")
@@ -62,7 +38,8 @@ async def wait_for_deletion(
for emoji in deletion_emojis:
await message.add_reaction(emoji)
- def check(reaction, user):
+ def check(reaction: Reaction, user: Member) -> bool:
+ """Check that the deletion emoji is reacted by the approprite user."""
return (
reaction.message.id == message.id
and reaction.emoji in deletion_emojis
@@ -70,25 +47,17 @@ async def wait_for_deletion(
)
with contextlib.suppress(asyncio.TimeoutError):
- await bot.wait_for(
- 'reaction_add',
- check=check,
- timeout=timeout
- )
+ await bot.wait_for('reaction_add', check=check, timeout=timeout)
await message.delete()
-async def send_attachments(message: Message, destination: Union[TextChannel, Webhook]):
+async def send_attachments(message: Message, destination: Union[TextChannel, Webhook]) -> None:
"""
Re-uploads each attachment in a message to the given channel or webhook.
Each attachment is sent as a separate message to more easily comply with the 8 MiB request size limit.
If attachments are too large, they are instead grouped into a single embed which links to them.
-
- :param message: the message whose attachments to re-upload
- :param destination: the channel in which to re-upload the attachments
"""
-
large = []
for attachment in message.attachments:
try:
diff --git a/bot/utils/scheduling.py b/bot/utils/scheduling.py
index f03865013..08abd91d7 100644
--- a/bot/utils/scheduling.py
+++ b/bot/utils/scheduling.py
@@ -2,7 +2,7 @@ import asyncio
import contextlib
import logging
from abc import abstractmethod
-from typing import Dict
+from typing import Coroutine, Dict, Union
from bot.utils import CogABCMeta
@@ -10,6 +10,7 @@ log = logging.getLogger(__name__)
class Scheduler(metaclass=CogABCMeta):
+ """Task scheduler."""
def __init__(self):
@@ -17,24 +18,23 @@ class Scheduler(metaclass=CogABCMeta):
self.scheduled_tasks: Dict[str, asyncio.Task] = {}
@abstractmethod
- async def _scheduled_task(self, task_object: dict):
+ async def _scheduled_task(self, task_object: dict) -> None:
"""
- A coroutine which handles the scheduling. This is added to the scheduled tasks,
- and should wait the task duration, execute the desired code, and clean up the task.
+ A coroutine which handles the scheduling.
+
+ This is added to the scheduled tasks, and should wait the task duration, execute the desired
+ code, then clean up the task.
+
For example, in Reminders this will wait for the reminder duration, send the reminder,
then make a site API request to delete the reminder from the database.
-
- :param task_object:
"""
- def schedule_task(self, loop: asyncio.AbstractEventLoop, task_id: str, task_data: dict):
+ def schedule_task(self, loop: asyncio.AbstractEventLoop, task_id: str, task_data: dict) -> None:
"""
Schedules a task.
- :param loop: the asyncio event loop
- :param task_id: the ID of the task.
- :param task_data: the data of the task, passed to `Scheduler._scheduled_expiration`.
- """
+ `task_data` is passed to `Scheduler._scheduled_expiration`
+ """
if task_id in self.scheduled_tasks:
return
@@ -42,12 +42,8 @@ class Scheduler(metaclass=CogABCMeta):
self.scheduled_tasks[task_id] = task
- def cancel_task(self, task_id: str):
- """
- Un-schedules a task.
- :param task_id: the ID of the infraction in question
- """
-
+ def cancel_task(self, task_id: str) -> None:
+ """Un-schedules a task."""
task = self.scheduled_tasks.get(task_id)
if task is None:
@@ -59,14 +55,8 @@ class Scheduler(metaclass=CogABCMeta):
del self.scheduled_tasks[task_id]
-def create_task(loop: asyncio.AbstractEventLoop, coro_or_future):
- """
- Creates an asyncio.Task object from a coroutine or future object.
-
- :param loop: the asyncio event loop.
- :param coro_or_future: the coroutine or future object to be scheduled.
- """
-
+def create_task(loop: asyncio.AbstractEventLoop, coro_or_future: Union[Coroutine, asyncio.Future]) -> asyncio.Task:
+ """Creates an asyncio.Task object from a coroutine or future object."""
task: asyncio.Task = asyncio.ensure_future(coro_or_future, loop=loop)
# Silently ignore exceptions in a callback (handles the CancelledError nonsense)
@@ -74,6 +64,7 @@ def create_task(loop: asyncio.AbstractEventLoop, coro_or_future):
return task
-def _silent_exception(future):
+def _silent_exception(future: asyncio.Future) -> None:
+ """Suppress future's exception."""
with contextlib.suppress(Exception):
future.exception()
diff --git a/bot/utils/time.py b/bot/utils/time.py
index a330c9cd8..da28f2c76 100644
--- a/bot/utils/time.py
+++ b/bot/utils/time.py
@@ -1,15 +1,16 @@
import asyncio
import datetime
+import dateutil.parser
from dateutil.relativedelta import relativedelta
RFC1123_FORMAT = "%a, %d %b %Y %H:%M:%S GMT"
+INFRACTION_FORMAT = "%Y-%m-%d %H:%M"
-def _stringify_time_unit(value: int, unit: str):
+def _stringify_time_unit(value: int, unit: str) -> str:
"""
- Returns a string to represent a value and time unit,
- ensuring that it uses the right plural form of the unit.
+ Returns a string to represent a value and time unit, ensuring that it uses the right plural form of the unit.
>>> _stringify_time_unit(1, "seconds")
"1 second"
@@ -18,7 +19,6 @@ def _stringify_time_unit(value: int, unit: str):
>>> _stringify_time_unit(0, "minutes")
"less than a minute"
"""
-
if value == 1:
return f"{value} {unit[:-1]}"
elif value == 0:
@@ -27,18 +27,13 @@ def _stringify_time_unit(value: int, unit: str):
return f"{value} {unit}"
-def humanize_delta(delta: relativedelta, precision: str = "seconds", max_units: int = 6):
+def humanize_delta(delta: relativedelta, precision: str = "seconds", max_units: int = 6) -> str:
"""
Returns a human-readable version of the relativedelta.
- :param delta: A dateutil.relativedelta.relativedelta object
- :param precision: The smallest unit that should be included.
- :param max_units: The maximum number of time-units to return.
-
- :return: A string like `4 days, 12 hours and 1 second`,
- `1 minute`, or `less than a minute`.
+ precision specifies the smallest unit of time to include (e.g. "seconds", "minutes").
+ max_units specifies the maximum number of units of time to include (e.g. 1 may include days but not hours).
"""
-
units = (
("years", delta.years),
("months", delta.months),
@@ -73,19 +68,13 @@ def humanize_delta(delta: relativedelta, precision: str = "seconds", max_units:
return humanized
-def time_since(past_datetime: datetime.datetime, precision: str = "seconds", max_units: int = 6):
+def time_since(past_datetime: datetime.datetime, precision: str = "seconds", max_units: int = 6) -> str:
"""
- Takes a datetime and returns a human-readable string that
- describes how long ago that datetime was.
+ Takes a datetime and returns a human-readable string that describes how long ago that datetime was.
- :param past_datetime: A datetime.datetime object
- :param precision: The smallest unit that should be included.
- :param max_units: The maximum number of time-units to return.
-
- :return: A string like `4 days, 12 hours and 1 second ago`,
- `1 minute ago`, or `less than a minute ago`.
+ precision specifies the smallest unit of time to include (e.g. "seconds", "minutes").
+ max_units specifies the maximum number of units of time to include (e.g. 1 may include days but not hours).
"""
-
now = datetime.datetime.utcnow()
delta = abs(relativedelta(now, past_datetime))
@@ -94,20 +83,22 @@ def time_since(past_datetime: datetime.datetime, precision: str = "seconds", max
return f"{humanized} ago"
-def parse_rfc1123(time_str):
+def parse_rfc1123(time_str: str) -> datetime.datetime:
+ """Parse RFC1123 time string into datetime."""
return datetime.datetime.strptime(time_str, RFC1123_FORMAT).replace(tzinfo=datetime.timezone.utc)
# Hey, this could actually be used in the off_topic_names and reddit cogs :)
-async def wait_until(time: datetime.datetime):
- """
- Wait until a given time.
-
- :param time: A datetime.datetime object to wait until.
- """
-
+async def wait_until(time: datetime.datetime) -> None:
+ """Wait until a given time."""
delay = time - datetime.datetime.utcnow()
delay_seconds = delay.total_seconds()
+ # Incorporate a small delay so we don't rapid-fire the event due to time precision errors
if delay_seconds > 1.0:
await asyncio.sleep(delay_seconds)
+
+
+def format_infraction(timestamp: str) -> str:
+ """Format an infraction timestamp to a more readable ISO 8601 format."""
+ return dateutil.parser.isoparse(timestamp).strftime(INFRACTION_FORMAT)
diff --git a/config-default.yml b/config-default.yml
index 403de21ad..38b26f64f 100644
--- a/config-default.yml
+++ b/config-default.yml
@@ -128,14 +128,15 @@ guild:
champion: 430492892331769857
contributor: 295488872404484098
core_developer: 587606783669829632
+ helpers: 267630620367257601
jammer: 423054537079783434
moderator: &MOD_ROLE 267629731250176001
muted: &MUTED_ROLE 277914926603829249
owner: &OWNER_ROLE 267627879762755584
- verified: 352427296948486144
- helpers: 267630620367257601
+ partners: 323426753857191936
rockstars: &ROCKSTARS_ROLE 458226413825294336
team_leader: 501324292341104650
+ verified: 352427296948486144
webhooks:
talent_pool: 569145364800602132
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 000000000..9684a3c62
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,44 @@
+# This docker compose is used for quick setups of the site and database which
+# the bot project relies on for testing. Use it if you haven't got a
+# ready-to-use site environment already setup.
+
+version: "3.7"
+
+services:
+ postgres:
+ image: postgres:11-alpine
+ environment:
+ POSTGRES_DB: pysite
+ POSTGRES_PASSWORD: pysite
+ POSTGRES_USER: pysite
+
+ web:
+ image: pythondiscord/site:latest
+ command: ["run", "--debug"]
+ networks:
+ default:
+ aliases:
+ - api.web
+ - admin.web
+ - staff.web
+ ports:
+ - "127.0.0.1:8000:8000"
+ depends_on:
+ - postgres
+ environment:
+ DATABASE_URL: postgres://pysite:pysite@postgres:5432/pysite
+ SECRET_KEY: suitable-for-development-only
+ STATIC_ROOT: /var/www/static
+
+ bot:
+ build:
+ context: .
+ dockerfile: Dockerfile
+ volumes:
+ - ./logs:/bot/logs
+ - .:/bot:ro
+ depends_on:
+ - web
+ environment:
+ BOT_TOKEN: ${BOT_TOKEN}
+ BOT_API_KEY: badbot13m0n8f570f942013fc818f234916ca531
diff --git a/docker/ci.Dockerfile b/docker/ci.Dockerfile
deleted file mode 100644
index fd7e25239..000000000
--- a/docker/ci.Dockerfile
+++ /dev/null
@@ -1,20 +0,0 @@
-FROM python:3.6-alpine3.7
-
-RUN apk add --update docker \
- curl \
- tini \
- build-base \
- libffi-dev \
- zlib \
- jpeg-dev \
- libxml2 libxml2-dev libxslt-dev \
- zlib-dev \
- freetype-dev
-
-RUN pip install pipenv
-
-ENV LIBRARY_PATH=/lib:/usr/lib
-ENV PIPENV_VENV_IN_PROJECT=1
-ENV PIPENV_IGNORE_VIRTUALENVS=1
-ENV PIPENV_NOSPIN=1
-ENV PIPENV_HIDE_EMOJIS=1
diff --git a/scripts/deploy-azure.sh b/scripts/deploy-azure.sh
deleted file mode 100644
index ed4b719e2..000000000
--- a/scripts/deploy-azure.sh
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/bin/bash
-
-cd ..
-
-# Build and deploy on master branch, only if not a pull request
-if [[ ($BUILD_SOURCEBRANCHNAME == 'master') && ($SYSTEM_PULLREQUEST_PULLREQUESTID == '') ]]; then
- echo "Building image"
- docker build -t pythondiscord/bot:latest .
-
- echo "Pushing image"
- docker push pythondiscord/bot:latest
-fi
diff --git a/tests/test_converters.py b/tests/test_converters.py
index 3cf774c80..35fc5d88e 100644
--- a/tests/test_converters.py
+++ b/tests/test_converters.py
@@ -1,12 +1,13 @@
import asyncio
-from datetime import datetime
-from unittest.mock import MagicMock
+import datetime
+from unittest.mock import MagicMock, patch
import pytest
+from dateutil.relativedelta import relativedelta
from discord.ext.commands import BadArgument
from bot.converters import (
- ExpirationDate,
+ Duration,
TagContentConverter,
TagNameConverter,
ValidPythonIdentifier,
@@ -16,18 +17,6 @@ from bot.converters import (
@pytest.mark.parametrize(
('value', 'expected'),
(
- # sorry aliens
- ('2199-01-01T00:00:00', datetime(2199, 1, 1)),
- )
-)
-def test_expiration_date_converter_for_valid(value: str, expected: datetime):
- converter = ExpirationDate()
- assert asyncio.run(converter.convert(None, value)) == expected
-
-
- ('value', 'expected'),
- (
('hello', 'hello'),
(' h ello ', 'h ello')
)
@@ -91,3 +80,107 @@ def test_valid_python_identifier_for_valid(value: str):
def test_valid_python_identifier_for_invalid(value: str):
with pytest.raises(BadArgument, match=f'`{value}` is not a valid Python identifier'):
asyncio.run(ValidPythonIdentifier.convert(None, value))
+
+
+FIXED_UTC_NOW = datetime.datetime.fromisoformat('2019-01-01T00:00:00')
+
+
+ params=(
+ # Simple duration strings
+ ('1Y', {"years": 1}),
+ ('1y', {"years": 1}),
+ ('1year', {"years": 1}),
+ ('1years', {"years": 1}),
+ ('1m', {"months": 1}),
+ ('1month', {"months": 1}),
+ ('1months', {"months": 1}),
+ ('1w', {"weeks": 1}),
+ ('1W', {"weeks": 1}),
+ ('1week', {"weeks": 1}),
+ ('1weeks', {"weeks": 1}),
+ ('1d', {"days": 1}),
+ ('1D', {"days": 1}),
+ ('1day', {"days": 1}),
+ ('1days', {"days": 1}),
+ ('1h', {"hours": 1}),
+ ('1H', {"hours": 1}),
+ ('1hour', {"hours": 1}),
+ ('1hours', {"hours": 1}),
+ ('1M', {"minutes": 1}),
+ ('1minute', {"minutes": 1}),
+ ('1minutes', {"minutes": 1}),
+ ('1s', {"seconds": 1}),
+ ('1S', {"seconds": 1}),
+ ('1second', {"seconds": 1}),
+ ('1seconds', {"seconds": 1}),
+
+ # Complex duration strings
+ (
+ '1y1m1w1d1H1M1S',
+ {
+ "years": 1,
+ "months": 1,
+ "weeks": 1,
+ "days": 1,
+ "hours": 1,
+ "minutes": 1,
+ "seconds": 1
+ }
+ ),
+ ('5y100S', {"years": 5, "seconds": 100}),
+ ('2w28H', {"weeks": 2, "hours": 28}),
+
+ # Duration strings with spaces
+ ('1 year 2 months', {"years": 1, "months": 2}),
+ ('1d 2H', {"days": 1, "hours": 2}),
+ ('1 week2 days', {"weeks": 1, "days": 2}),
+ )
+)
+def create_future_datetime(request):
+ """Yields duration string and target datetime.datetime object."""
+ duration, duration_dict = request.param
+ future_datetime = FIXED_UTC_NOW + relativedelta(**duration_dict)
+ yield duration, future_datetime
+
+
+def test_duration_converter_for_valid(create_future_datetime: tuple):
+ converter = Duration()
+ duration, expected = create_future_datetime
+ with patch('bot.converters.datetime') as mock_datetime:
+ mock_datetime.utcnow.return_value = FIXED_UTC_NOW
+ assert asyncio.run(converter.convert(None, duration)) == expected
+
+
+ ('duration'),
+ (
+ # Units in wrong order
+ ('1d1w'),
+ ('1s1y'),
+
+ # Duplicated units
+ ('1 year 2 years'),
+ ('1 M 10 minutes'),
+
+ # Unknown substrings
+ ('1MVes'),
+ ('1y3breads'),
+
+ # Missing amount
+ ('ym'),
+
+ # Incorrect whitespace
+ (" 1y"),
+ ("1S "),
+ ("1y 1m"),
+
+ # Garbage
+ ('Guido van Rossum'),
+ ('lemon lemon lemon lemon lemon lemon lemon'),
+ )
+)
+def test_duration_converter_for_invalid(duration: str):
+ converter = Duration()
+ with pytest.raises(BadArgument, match=f'`{duration}` is not a valid duration string.'):
+ asyncio.run(converter.convert(None, duration))
diff --git a/tox.ini b/tox.ini
index 21097cd97..d14819d57 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,6 +1,19 @@
[flake8]
max-line-length=120
-application_import_names=bot,tests
-exclude=.cache,.venv
-ignore=B311,W503,E226,S311,T000
+docstring-convention=all
import-order-style=pycharm
+application_import_names=bot,tests
+exclude=.cache,.venv,constants.py
+ignore=
+ B311,W503,E226,S311,T000
+ # Missing Docstrings
+ D100,D104,D105,D107,
+ # Docstring Whitespace
+ D203,D212,D214,D215,
+ # Docstring Quotes
+ D301,D302,
+ # Docstring Content
+ D400,D401,D402,D404,D405,D406,D407,D408,D409,D410,D411,D412,D413,D414,D416,D417
+ # Type Annotations
+ TYP002,TYP003,TYP101,TYP102,TYP204,TYP206
+per-file-ignores=tests/*:D,TYP