aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Kieran Siek <[email protected]>2019-10-15 22:35:24 +0800
committerGravatar GitHub <[email protected]>2019-10-15 22:35:24 +0800
commit61bbe0c1212c15dd09654205054d7be4d303f880 (patch)
tree21599a833e30dd8ee9a8e67537ebca1b562ef7a4
parentMerge branch 'master' into off-topic-check (diff)
parentCreate the !mention command. (#493) (diff)
Merge branch 'master' into off-topic-check
-rw-r--r--bot/cogs/information.py86
-rw-r--r--bot/cogs/utils.py132
-rw-r--r--bot/constants.py7
-rw-r--r--bot/utils/checks.py48
-rw-r--r--config-default.yml4
5 files changed, 230 insertions, 47 deletions
diff --git a/bot/cogs/information.py b/bot/cogs/information.py
index 1afb37103..3a7ba0444 100644
--- a/bot/cogs/information.py
+++ b/bot/cogs/information.py
@@ -1,14 +1,18 @@
import colorsys
import logging
+import pprint
import textwrap
import typing
+from typing import Any, Mapping, Optional
+import discord
from discord import CategoryChannel, Colour, Embed, Member, Role, TextChannel, VoiceChannel, utils
-from discord.ext.commands import Bot, Cog, Context, command
+from discord.ext import commands
+from discord.ext.commands import Bot, BucketType, Cog, Context, command, group
from bot.constants import Channels, Emojis, MODERATION_ROLES, STAFF_ROLES
-from bot.decorators import InChannelCheckFailure, with_role
-from bot.utils.checks import with_role_check
+from bot.decorators import InChannelCheckFailure, in_channel, with_role
+from bot.utils.checks import cooldown_with_role_bypass, with_role_check
from bot.utils.time import time_since
log = logging.getLogger(__name__)
@@ -229,6 +233,82 @@ class Information(Cog):
await ctx.send(embed=embed)
+ def format_fields(self, mapping: Mapping[str, Any], field_width: Optional[int] = None) -> str:
+ """Format a mapping to be readable to a human."""
+ # sorting is technically superfluous but nice if you want to look for a specific field
+ fields = sorted(mapping.items(), key=lambda item: item[0])
+
+ if field_width is None:
+ field_width = len(max(mapping.keys(), key=len))
+
+ out = ''
+
+ for key, val in fields:
+ if isinstance(val, dict):
+ # if we have dicts inside dicts we want to apply the same treatment to the inner dictionaries
+ inner_width = int(field_width * 1.6)
+ val = '\n' + self.format_fields(val, field_width=inner_width)
+
+ elif isinstance(val, str):
+ # split up text since it might be long
+ text = textwrap.fill(val, width=100, replace_whitespace=False)
+
+ # indent it, I guess you could do this with `wrap` and `join` but this is nicer
+ val = textwrap.indent(text, ' ' * (field_width + len(': ')))
+
+ # the first line is already indented so we `str.lstrip` it
+ val = val.lstrip()
+
+ if key == 'color':
+ # makes the base 10 representation of a hex number readable to humans
+ val = hex(val)
+
+ out += '{0:>{width}}: {1}\n'.format(key, val, width=field_width)
+
+ # remove trailing whitespace
+ return out.rstrip()
+
+ @cooldown_with_role_bypass(2, 60 * 3, BucketType.member, bypass_roles=STAFF_ROLES)
+ @group(invoke_without_command=True)
+ @in_channel(Channels.bot, bypass_roles=STAFF_ROLES)
+ async def raw(self, ctx: Context, *, message: discord.Message, json: bool = False) -> None:
+ """Shows information about the raw API response."""
+ # I *guess* it could be deleted right as the command is invoked but I felt like it wasn't worth handling
+ # doing this extra request is also much easier than trying to convert everything back into a dictionary again
+ raw_data = await ctx.bot.http.get_message(message.channel.id, message.id)
+
+ paginator = commands.Paginator()
+
+ def add_content(title: str, content: str) -> None:
+ paginator.add_line(f'== {title} ==\n')
+ # replace backticks as it breaks out of code blocks. Spaces seemed to be the most reasonable solution.
+ # we hope it's not close to 2000
+ paginator.add_line(content.replace('```', '`` `'))
+ paginator.close_page()
+
+ if message.content:
+ add_content('Raw message', message.content)
+
+ transformer = pprint.pformat if json else self.format_fields
+ for field_name in ('embeds', 'attachments'):
+ data = raw_data[field_name]
+
+ if not data:
+ continue
+
+ total = len(data)
+ for current, item in enumerate(data, start=1):
+ title = f'Raw {field_name} ({current}/{total})'
+ add_content(title, transformer(item))
+
+ for page in paginator.pages:
+ await ctx.send(page)
+
+ @raw.command()
+ async def json(self, ctx: Context, message: discord.Message) -> None:
+ """Shows information about the raw API response in a copy-pasteable Python format."""
+ await ctx.invoke(self.raw, message=message, json=True)
+
def setup(bot: Bot) -> None:
"""Information cog load."""
diff --git a/bot/cogs/utils.py b/bot/cogs/utils.py
index b6cecdc7c..793fe4c1a 100644
--- a/bot/cogs/utils.py
+++ b/bot/cogs/utils.py
@@ -1,15 +1,18 @@
import logging
import re
import unicodedata
+from asyncio import TimeoutError, sleep
from email.parser import HeaderParser
from io import StringIO
from typing import Tuple
-from discord import Colour, Embed
+from dateutil import relativedelta
+from discord import Colour, Embed, Message, Role
from discord.ext.commands import Bot, Cog, Context, command
-from bot.constants import Channels, STAFF_ROLES
-from bot.decorators import in_channel
+from bot.constants import Channels, MODERATION_ROLES, Mention, STAFF_ROLES
+from bot.decorators import in_channel, with_role
+from bot.utils.time import humanize_delta
log = logging.getLogger(__name__)
@@ -32,56 +35,58 @@ class Utils(Cog):
await ctx.invoke(self.bot.get_command("help"), "pep")
return
- # Newer PEPs are written in RST instead of txt
- if pep_number > 542:
- pep_url = f"{self.base_github_pep_url}{pep_number:04}.rst"
- else:
- pep_url = f"{self.base_github_pep_url}{pep_number:04}.txt"
-
- # Attempt to fetch the PEP
- log.trace(f"Requesting PEP {pep_number} with {pep_url}")
- response = await self.bot.http_session.get(pep_url)
-
- if response.status == 200:
- log.trace("PEP found")
+ possible_extensions = ['.txt', '.rst']
+ found_pep = False
+ for extension in possible_extensions:
+ # Attempt to fetch the PEP
+ pep_url = f"{self.base_github_pep_url}{pep_number:04}{extension}"
+ log.trace(f"Requesting PEP {pep_number} with {pep_url}")
+ response = await self.bot.http_session.get(pep_url)
- pep_content = await response.text()
+ if response.status == 200:
+ log.trace("PEP found")
+ found_pep = True
- # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179
- pep_header = HeaderParser().parse(StringIO(pep_content))
+ pep_content = await response.text()
- # Assemble the embed
- pep_embed = Embed(
- title=f"**PEP {pep_number} - {pep_header['Title']}**",
- description=f"[Link]({self.base_pep_url}{pep_number:04})",
- )
-
- pep_embed.set_thumbnail(url="https://www.python.org/static/opengraph-icon-200x200.png")
+ # Taken from https://github.com/python/peps/blob/master/pep0/pep.py#L179
+ pep_header = HeaderParser().parse(StringIO(pep_content))
- # Add the interesting information
- if "Status" in pep_header:
- pep_embed.add_field(name="Status", value=pep_header["Status"])
- if "Python-Version" in pep_header:
- pep_embed.add_field(name="Python-Version", value=pep_header["Python-Version"])
- if "Created" in pep_header:
- pep_embed.add_field(name="Created", value=pep_header["Created"])
- if "Type" in pep_header:
- pep_embed.add_field(name="Type", value=pep_header["Type"])
+ # Assemble the embed
+ pep_embed = Embed(
+ title=f"**PEP {pep_number} - {pep_header['Title']}**",
+ description=f"[Link]({self.base_pep_url}{pep_number:04})",
+ )
- elif response.status == 404:
+ pep_embed.set_thumbnail(url="https://www.python.org/static/opengraph-icon-200x200.png")
+
+ # Add the interesting information
+ if "Status" in pep_header:
+ pep_embed.add_field(name="Status", value=pep_header["Status"])
+ if "Python-Version" in pep_header:
+ pep_embed.add_field(name="Python-Version", value=pep_header["Python-Version"])
+ if "Created" in pep_header:
+ pep_embed.add_field(name="Created", value=pep_header["Created"])
+ if "Type" in pep_header:
+ pep_embed.add_field(name="Type", value=pep_header["Type"])
+
+ elif response.status != 404:
+ # any response except 200 and 404 is expected
+ found_pep = True # actually not, but it's easier to display this way
+ log.trace(f"The user requested PEP {pep_number}, but the response had an unexpected status code: "
+ f"{response.status}.\n{response.text}")
+
+ error_message = "Unexpected HTTP error during PEP search. Please let us know."
+ pep_embed = Embed(title="Unexpected error", description=error_message)
+ pep_embed.colour = Colour.red()
+ break
+
+ if not found_pep:
log.trace("PEP was not found")
not_found = f"PEP {pep_number} does not exist."
pep_embed = Embed(title="PEP not found", description=not_found)
pep_embed.colour = Colour.red()
- else:
- log.trace(f"The user requested PEP {pep_number}, but the response had an unexpected status code: "
- f"{response.status}.\n{response.text}")
-
- error_message = "Unexpected HTTP error during PEP search. Please let us know."
- pep_embed = Embed(title="Unexpected error", description=error_message)
- pep_embed.colour = Colour.red()
-
await ctx.message.channel.send(embed=pep_embed)
@command()
@@ -128,6 +133,47 @@ class Utils(Cog):
await ctx.send(embed=embed)
+ @command()
+ @with_role(*MODERATION_ROLES)
+ async def mention(self, ctx: Context, *, role: Role) -> None:
+ """Set a role to be mentionable for a limited time."""
+ if role.mentionable:
+ await ctx.send(f"{role} is already mentionable!")
+ return
+
+ await role.edit(reason=f"Role unlocked by {ctx.author}", mentionable=True)
+
+ human_time = humanize_delta(relativedelta.relativedelta(seconds=Mention.message_timeout))
+ await ctx.send(
+ f"{role} has been made mentionable. I will reset it in {human_time}, or when someone mentions this role."
+ )
+
+ def check(m: Message) -> bool:
+ """Checks that the message contains the role mention."""
+ return role in m.role_mentions
+
+ try:
+ msg = await self.bot.wait_for("message", check=check, timeout=Mention.message_timeout)
+ except TimeoutError:
+ await role.edit(mentionable=False, reason="Automatic role lock - timeout.")
+ await ctx.send(f"{ctx.author.mention}, you took too long. I have reset {role} to be unmentionable.")
+ return
+
+ if any(r.id in MODERATION_ROLES for r in msg.author.roles):
+ await sleep(Mention.reset_delay)
+ await role.edit(mentionable=False, reason=f"Automatic role lock by {msg.author}")
+ await ctx.send(
+ f"{ctx.author.mention}, I have reset {role} to be unmentionable as "
+ f"{msg.author if msg.author != ctx.author else 'you'} sent a message mentioning it."
+ )
+ return
+
+ await role.edit(mentionable=False, reason=f"Automatic role lock - unauthorised use by {msg.author}")
+ await ctx.send(
+ f"{ctx.author.mention}, I have reset {role} to be unmentionable "
+ f"as I detected unauthorised use by {msg.author} (ID: {msg.author.id})."
+ )
+
def setup(bot: Bot) -> None:
"""Utils cog load."""
diff --git a/bot/constants.py b/bot/constants.py
index 1deeaa3b8..f4f45eb2c 100644
--- a/bot/constants.py
+++ b/bot/constants.py
@@ -475,6 +475,13 @@ class Free(metaclass=YAMLGetter):
cooldown_per: float
+class Mention(metaclass=YAMLGetter):
+ section = 'mention'
+
+ message_timeout: int
+ reset_delay: int
+
+
class RedirectOutput(metaclass=YAMLGetter):
section = 'redirect_output'
diff --git a/bot/utils/checks.py b/bot/utils/checks.py
index 19f64ff9f..ad892e512 100644
--- a/bot/utils/checks.py
+++ b/bot/utils/checks.py
@@ -1,6 +1,8 @@
+import datetime
import logging
+from typing import Callable, Iterable
-from discord.ext.commands import Context
+from discord.ext.commands import BucketType, Cog, Command, CommandOnCooldown, Context, Cooldown, CooldownMapping
log = logging.getLogger(__name__)
@@ -42,3 +44,47 @@ def in_channel_check(ctx: Context, channel_id: int) -> bool:
log.trace(f"{ctx.author} tried to call the '{ctx.command.name}' command. "
f"The result of the in_channel check was {check}.")
return check
+
+
+def cooldown_with_role_bypass(rate: int, per: float, type: BucketType = BucketType.default, *,
+ bypass_roles: Iterable[int]) -> Callable:
+ """
+ Applies a cooldown to a command, but allows members with certain roles to be ignored.
+
+ NOTE: this replaces the `Command.before_invoke` callback, which *might* introduce problems in the future.
+ """
+ # make it a set so lookup is hash based
+ bypass = set(bypass_roles)
+
+ # this handles the actual cooldown logic
+ buckets = CooldownMapping(Cooldown(rate, per, type))
+
+ # will be called after the command has been parse but before it has been invoked, ensures that
+ # the cooldown won't be updated if the user screws up their input to the command
+ async def predicate(cog: Cog, ctx: Context) -> None:
+ nonlocal bypass, buckets
+
+ if any(role.id in bypass for role in ctx.author.roles):
+ return
+
+ # cooldown logic, taken from discord.py internals
+ current = ctx.message.created_at.replace(tzinfo=datetime.timezone.utc).timestamp()
+ bucket = buckets.get_bucket(ctx.message)
+ retry_after = bucket.update_rate_limit(current)
+ if retry_after:
+ raise CommandOnCooldown(bucket, retry_after)
+
+ def wrapper(command: Command) -> Command:
+ # NOTE: this could be changed if a subclass of Command were to be used. I didn't see the need for it
+ # so I just made it raise an error when the decorator is applied before the actual command object exists.
+ #
+ # if the `before_invoke` detail is ever a problem then I can quickly just swap over.
+ if not isinstance(command, Command):
+ raise TypeError('Decorator `cooldown_with_role_bypass` must be applied after the command decorator. '
+ 'This means it has to be above the command decorator in the code.')
+
+ command._before_invoke = predicate
+
+ return command
+
+ return wrapper
diff --git a/config-default.yml b/config-default.yml
index 0dac9bf9f..ca405337e 100644
--- a/config-default.yml
+++ b/config-default.yml
@@ -347,6 +347,10 @@ free:
cooldown_rate: 1
cooldown_per: 60.0
+mention:
+ message_timeout: 300
+ reset_delay: 5
+
redirect_output:
delete_invocation: true
delete_delay: 15