aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Izan <[email protected]>2022-09-24 20:33:18 +0100
committerGravatar Izan <[email protected]>2022-09-24 20:33:18 +0100
commit0b0e5d21badb36d583101a5c83759054e77dc8b1 (patch)
tree154bce23caf180ee42efdeea377d01f405343169
parentMerge pull request #138 from python-discord/bump-d.py (diff)
Migrate all message utils from bot and sir-lancebot.
-rw-r--r--botcore/utils/messages.py330
1 files changed, 330 insertions, 0 deletions
diff --git a/botcore/utils/messages.py b/botcore/utils/messages.py
new file mode 100644
index 00000000..e9a7cc48
--- /dev/null
+++ b/botcore/utils/messages.py
@@ -0,0 +1,330 @@
+import asyncio
+import random
+import re
+from functools import partial
+from io import BytesIO
+from typing import Callable, Sequence
+
+import discord
+from discord.ext import commands
+
+from botcore.utils import scheduling
+from botcore.utils.logging import get_logger
+
+
+MODERATION_ROLES = (
+ 476190234653229056, # admin
+ 831924068852826163, # moderators
+ 476190357927886848, # moderation team
+ 476190391595433985 # owners
+)
+
+NEGATIVE_REPLIES = [
+ "Noooooo!!",
+ "Nope.",
+ "I'm sorry Dave, I'm afraid I can't do that.",
+ "I don't think so.",
+ "Not gonna happen.",
+ "Out of the question.",
+ "Huh? No.",
+ "Nah.",
+ "Naw.",
+ "Not likely.",
+ "No way, José.",
+ "Not in a million years.",
+ "Fat chance.",
+ "Certainly not.",
+ "NEGATORY.",
+ "Nuh-uh.",
+ "Not in my house!",
+]
+
+log = get_logger(__name__)
+
+
+def reaction_check(
+ reaction: discord.Reaction,
+ user: discord.abc.User,
+ *,
+ message_id: int,
+ allowed_emoji: Sequence[str],
+ allowed_users: Sequence[int],
+ allow_mods: bool = True,
+) -> bool:
+ """
+ Check if a reaction's emoji and author are allowed and the message is `message_id`.
+
+ If the user is not allowed, remove the reaction. Ignore reactions made by the bot.
+ If `allow_mods` is True, allow users with moderator roles even if they're not in `allowed_users`.
+ """
+ right_reaction = (
+ not user.bot
+ and reaction.message.id == message_id
+ and str(reaction.emoji) in allowed_emoji
+ )
+ if not right_reaction:
+ return False
+
+ is_moderator = (
+ allow_mods
+ and any(role.id in MODERATION_ROLES for role in getattr(user, "roles", []))
+ )
+
+ if user.id in allowed_users or is_moderator:
+ log.trace(f"Allowed reaction {reaction} by {user} on {reaction.message.id}.")
+ return True
+ else:
+ log.trace(f"Removing reaction {reaction} by {user} on {reaction.message.id}: disallowed user.")
+ scheduling.create_task(
+ reaction.message.remove_reaction(reaction.emoji, user),
+ suppressed_exceptions=(discord.HTTPException,),
+ name=f"remove_reaction-{reaction}-{reaction.message.id}-{user}"
+ )
+ return False
+
+
+async def wait_for_deletion(
+ bot: commands.Bot,
+ message: discord.Message,
+ user_ids: Sequence[int],
+ deletion_emojis: Sequence[str] = ("<:trashcan:675729438528503910>",),
+ timeout: float = 60 * 5,
+ attach_emojis: bool = True,
+ allow_mods: bool = True
+) -> None:
+ """
+ Wait for any of `user_ids` to react with one of the `deletion_emojis` within `timeout` seconds to delete `message`.
+
+ If `timeout` expires then reactions are cleared to indicate the option to delete has expired.
+
+ An `attach_emojis` bool may be specified to determine whether to attach the given
+ `deletion_emojis` to the message in the given `context`.
+ An `allow_mods` bool may also be specified to allow anyone with a role in `MODERATION_ROLES` to delete
+ the message.
+ """
+ if message.guild is None:
+ raise ValueError("Message must be sent on a guild")
+
+ if attach_emojis:
+ for emoji in deletion_emojis:
+ try:
+ await message.add_reaction(emoji)
+ except discord.NotFound:
+ log.trace(f"Aborting wait_for_deletion: message {message.id} deleted prematurely.")
+ return
+
+ check = partial(
+ reaction_check,
+ message_id=message.id,
+ allowed_emoji=deletion_emojis,
+ allowed_users=user_ids,
+ allow_mods=allow_mods,
+ )
+
+ try:
+ try:
+ await bot.wait_for('reaction_add', check=check, timeout=timeout)
+ except asyncio.TimeoutError:
+ await message.clear_reactions()
+ else:
+ await message.delete()
+ except discord.NotFound:
+ log.trace(f"wait_for_deletion: message {message.id} deleted prematurely.")
+
+
+async def send_attachments(
+ message: discord.Message,
+ destination: discord.TextChannel | discord.Webhook,
+ link_large: bool = True,
+ use_cached: bool = False,
+ **kwargs
+) -> list[str]:
+ """
+ Re-upload the message's attachments to the destination and return a list of their new URLs.
+
+ Each attachment is sent as a separate message to more easily comply with the request/file size
+ limit. If link_large is True, attachments which are too large are instead grouped into a single
+ embed which links to them. Extra kwargs will be passed to send() when sending the attachment.
+ """
+ webhook_send_kwargs = {
+ 'username': message.author.display_name,
+ 'avatar_url': message.author.display_avatar.url,
+ }
+ webhook_send_kwargs.update(kwargs)
+ webhook_send_kwargs['username'] = sub_clyde(webhook_send_kwargs['username'])
+
+ large = []
+ urls = []
+ for attachment in message.attachments:
+ failure_msg = (
+ f"Failed to re-upload attachment {attachment.filename} from message {message.id}"
+ )
+
+ try:
+ # Allow 512 bytes of leeway for the rest of the request.
+ # This should avoid most files that are too large,
+ # but some may get through hence the try-catch.
+ if attachment.size <= destination.guild.filesize_limit - 512:
+ with BytesIO() as file:
+ await attachment.save(file, use_cached=use_cached)
+ attachment_file = discord.File(file, filename=attachment.filename)
+
+ if isinstance(destination, discord.TextChannel):
+ msg = await destination.send(file=attachment_file, **kwargs)
+ urls.append(msg.attachments[0].url)
+ else:
+ await destination.send(file=attachment_file, **webhook_send_kwargs)
+ elif link_large:
+ large.append(attachment)
+ else:
+ log.info(f"{failure_msg} because it's too large.")
+ except discord.HTTPException as e:
+ if link_large and e.status == 413:
+ large.append(attachment)
+ else:
+ log.warning(f"{failure_msg} with status {e.status}.", exc_info=e)
+
+ if link_large and large:
+ desc = "\n".join(f"[{attachment.filename}]({attachment.url})" for attachment in large)
+ embed = discord.Embed(description=desc)
+ embed.set_footer(text="Attachments exceed upload size limit.")
+
+ if isinstance(destination, discord.TextChannel):
+ await destination.send(embed=embed, **kwargs)
+ else:
+ await destination.send(embed=embed, **webhook_send_kwargs)
+
+ return urls
+
+
+async def count_unique_users_reaction(
+ message: discord.Message,
+ reaction_predicate: Callable[[discord.Reaction], bool] = lambda _: True,
+ user_predicate: Callable[[discord.User], bool] = lambda _: True,
+ count_bots: bool = True
+) -> int:
+ """
+ Count the amount of unique users who reacted to the message.
+
+ A reaction_predicate function can be passed to check if this reaction should be counted,
+ another user_predicate to check if the user should also be counted along with a count_bot flag.
+ """
+ unique_users = set()
+
+ for reaction in message.reactions:
+ if reaction_predicate(reaction):
+ async for user in reaction.users():
+ if (count_bots or not user.bot) and user_predicate(user):
+ unique_users.add(user.id)
+
+ return len(unique_users)
+
+
+async def pin_no_system_message(message: discord.Message) -> bool:
+ """Pin the given message, wait a couple of seconds and try to delete the system message."""
+ await message.pin()
+
+ # Make sure that we give it enough time to deliver the message
+ await asyncio.sleep(2)
+ # Search for the system message in the last 10 messages
+ async for historical_message in message.channel.history(limit=10):
+ if historical_message.type == discord.MessageType.pins_add:
+ await historical_message.delete()
+ return True
+
+ return False
+
+
+async def send_denial(ctx: commands.Context, reason: str) -> discord.Message:
+ """Send an embed denying the user with the given reason."""
+ embed = discord.Embed()
+ embed.colour = discord.Colour.red()
+ embed.title = random.choice(NEGATIVE_REPLIES)
+ embed.description = reason
+
+ return await ctx.send(embed=embed)
+
+
+def format_user(user: discord.abc.User) -> str:
+ """Return a string for `user` which has their mention and ID."""
+ return f"{user.mention} (`{user.id}`)"
+
+
+def sub_clyde(username: str | None) -> str | None:
+ """
+ Replace "e"/"E" in any "clyde" in `username` with a Cyrillic "е"/"E" and return the new string.
+
+ Discord disallows "clyde" anywhere in the username for webhooks. It will return a 400.
+ Return None only if `username` is None.
+ """
+ def replace_e(match: re.Match) -> str:
+ char = "е" if match[2] == "e" else "Е"
+ return match[1] + char
+
+ if username:
+ return re.sub(r"(clyd)(e)", replace_e, username, flags=re.I)
+ else:
+ return username # Empty string or None
+
+
+async def get_discord_message(ctx: commands.Context, text: str) -> discord.Message | str:
+ """
+ Attempts to convert a given `text` to a discord Message object and return it.
+
+ Conversion will succeed if given a discord Message ID or link.
+ Returns `text` if the conversion fails.
+ """
+ try:
+ text = await commands.MessageConverter().convert(ctx, text)
+ except commands.BadArgument:
+ pass
+
+ return text
+
+
+async def get_text_and_embed(ctx: commands.Context, text: str) -> tuple[str, discord.Embed | None]:
+ """
+ Attempts to extract the text and embed from a possible link to a discord Message.
+
+ Does not retrieve the text and embed from the Message if it is in a channel the user does
+ not have read permissions in.
+
+ Returns a tuple of:
+ str: If `text` is a valid discord Message, the contents of the message, else `text`.
+ Embed | None: The embed if found in the valid Message, else `None`
+ """
+ embed: discord.Embed | None = None
+
+ msg = await get_discord_message(ctx, text)
+ # Ensure the user has read permissions for the channel the message is in
+ if isinstance(msg, discord.Message):
+ permissions = msg.channel.permissions_for(ctx.author)
+ if permissions.read_messages:
+ text = msg.clean_content
+ # Take first embed because we can't send multiple embeds
+ if msg.embeds:
+ embed = msg.embeds[0]
+
+ return text, embed
+
+
+def convert_embed(func: Callable[[str, ], str], embed: discord.Embed) -> discord.Embed:
+ """
+ Converts the text in an embed using a given conversion function, then return the embed.
+
+ Only modifies the following fields: title, description, footer, fields
+ """
+ embed_dict = embed.to_dict()
+
+ embed_dict["title"] = func(embed_dict.get("title", ""))
+ embed_dict["description"] = func(embed_dict.get("description", ""))
+
+ if "footer" in embed_dict:
+ embed_dict["footer"]["text"] = func(embed_dict["footer"].get("text", ""))
+
+ if "fields" in embed_dict:
+ for field in embed_dict["fields"]:
+ field["name"] = func(field.get("name", ""))
+ field["value"] = func(field.get("value", ""))
+
+ return discord.Embed.from_dict(embed_dict)