diff options
Diffstat (limited to 'bot/exts')
| -rw-r--r-- | bot/exts/evergreen/fun.py | 117 | ||||
| -rw-r--r-- | bot/exts/evergreen/issues.py | 113 | 
2 files changed, 182 insertions, 48 deletions
| diff --git a/bot/exts/evergreen/fun.py b/bot/exts/evergreen/fun.py index 5d4743e4..1668982b 100644 --- a/bot/exts/evergreen/fun.py +++ b/bot/exts/evergreen/fun.py @@ -1,14 +1,16 @@  import functools +import json  import logging  import random -from typing import Callable, Tuple, Union +from pathlib import Path +from typing import Callable, Iterable, Tuple, Union  from discord import Embed, Message  from discord.ext import commands  from discord.ext.commands import Bot, Cog, Context, MessageConverter, clean_content  from bot import utils -from bot.constants import Emojis +from bot.constants import Colours, Emojis  log = logging.getLogger(__name__) @@ -26,12 +28,35 @@ UWU_WORDS = {  } +def caesar_cipher(text: str, offset: int) -> Iterable[str]: +    """ +    Implements a lazy Caesar Cipher algorithm. + +    Encrypts a `text` given a specific integer `offset`. The sign +    of the `offset` dictates the direction in which it shifts to, +    with a negative value shifting to the left, and a positive +    value shifting to the right. +    """ +    for char in text: +        if not char.isascii() or not char.isalpha() or char.isspace(): +            yield char +            continue + +        case_start = 65 if char.isupper() else 97 +        true_offset = (ord(char) - case_start + offset) % 26 + +        yield chr(case_start + true_offset) + +  class Fun(Cog):      """A collection of general commands for fun."""      def __init__(self, bot: Bot) -> None:          self.bot = bot +        with Path("bot/resources/evergreen/caesar_info.json").open("r", encoding="UTF-8") as f: +            self._caesar_cipher_embed = json.load(f) +      @commands.command()      async def roll(self, ctx: Context, num_rolls: int = 1) -> None:          """Outputs a number of random dice emotes (up to 6).""" @@ -79,23 +104,99 @@ class Fun(Cog):              converted_text = f">>> {converted_text.lstrip('> ')}"          await ctx.send(content=converted_text, embed=embed) +    @commands.group(name="caesarcipher", aliases=("caesar", "cc",)) +    async def caesarcipher_group(self, ctx: Context) -> None: +        """ +        Translates a message using the Caesar Cipher. + +        See `decrypt`, `encrypt`, and `info` subcommands. +        """ +        if ctx.invoked_subcommand is None: +            await ctx.invoke(self.bot.get_command("help"), "caesarcipher") + +    @caesarcipher_group.command(name="info") +    async def caesarcipher_info(self, ctx: Context) -> None: +        """Information about the Caesar Cipher.""" +        embed = Embed.from_dict(self._caesar_cipher_embed) +        embed.colour = Colours.dark_green + +        await ctx.send(embed=embed) + +    @staticmethod +    async def _caesar_cipher(ctx: Context, offset: int, msg: str, left_shift: bool = False) -> None: +        """ +        Given a positive integer `offset`, translates and sends the given `msg`. + +        Performs a right shift by default unless `left_shift` is specified as `True`. + +        Also accepts a valid Discord Message ID or link. +        """ +        if offset < 0: +            await ctx.send(":no_entry: Cannot use a negative offset.") +            return + +        if left_shift: +            offset = -offset + +        def conversion_func(text: str) -> str: +            """Encrypts the given string using the Caesar Cipher.""" +            return "".join(caesar_cipher(text, offset)) + +        text, embed = await Fun._get_text_and_embed(ctx, msg) + +        if embed is not None: +            embed = Fun._convert_embed(conversion_func, embed) + +        converted_text = conversion_func(text) + +        if converted_text: +            converted_text = f">>> {converted_text.lstrip('> ')}" + +        await ctx.send(content=converted_text, embed=embed) + +    @caesarcipher_group.command(name="encrypt", aliases=("rightshift", "rshift", "enc",)) +    async def caesarcipher_encrypt(self, ctx: Context, offset: int, *, msg: str) -> None: +        """ +        Given a positive integer `offset`, encrypt the given `msg`. + +        Performs a right shift of the letters in the message. + +        Also accepts a valid Discord Message ID or link. +        """ +        await self._caesar_cipher(ctx, offset, msg, left_shift=False) + +    @caesarcipher_group.command(name="decrypt", aliases=("leftshift", "lshift", "dec",)) +    async def caesarcipher_decrypt(self, ctx: Context, offset: int, *, msg: str) -> None: +        """ +        Given a positive integer `offset`, decrypt the given `msg`. + +        Performs a left shift of the letters in the message. + +        Also accepts a valid Discord Message ID or link. +        """ +        await self._caesar_cipher(ctx, offset, msg, left_shift=True) +      @staticmethod      async def _get_text_and_embed(ctx: Context, text: str) -> Tuple[str, Union[Embed, None]]:          """          Attempts to extract the text and embed from a possible link to a discord Message. +        Does not retrieve the text and embed from the Message if it is in a channel the user does +        not have read permissions in. +          Returns a tuple of:              str: If `text` is a valid discord Message, the contents of the message, else `text`.              Union[Embed, None]: The embed if found in the valid Message, else None          """          embed = None -        # message = await Fun._get_discord_message(ctx, text) -        # if isinstance(message, Message): -        #     text = message.content -        #     # Take first embed because we can't send multiple embeds -        #     if message.embeds: -        #         embed = message.embeds[0] +        msg = await Fun._get_discord_message(ctx, text) +        # Ensure the user has read permissions for the channel the message is in +        if isinstance(msg, Message) and ctx.author.permissions_in(msg.channel).read_messages: +            text = msg.content +            # Take first embed because we can't send multiple embeds +            if msg.embeds: +                embed = msg.embeds[0]          return (text, embed) diff --git a/bot/exts/evergreen/issues.py b/bot/exts/evergreen/issues.py index 0f83731b..5a5c82e7 100644 --- a/bot/exts/evergreen/issues.py +++ b/bot/exts/evergreen/issues.py @@ -1,9 +1,10 @@  import logging +import random  import discord  from discord.ext import commands -from bot.constants import Channels, Colours, Emojis, WHITELISTED_CHANNELS +from bot.constants import Channels, Colours, ERROR_REPLIES, Emojis, Tokens, WHITELISTED_CHANNELS  from bot.utils.decorators import override_in_channel  log = logging.getLogger(__name__) @@ -13,6 +14,12 @@ BAD_RESPONSE = {      403: "Rate limit has been hit! Please try again later!"  } +MAX_REQUESTS = 10 + +REQUEST_HEADERS = dict() +if GITHUB_TOKEN := Tokens.github: +    REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" +  class Issues(commands.Cog):      """Cog that allows users to retrieve issues from GitHub.""" @@ -21,53 +28,79 @@ class Issues(commands.Cog):          self.bot = bot      @commands.command(aliases=("pr",)) -    @override_in_channel(WHITELISTED_CHANNELS + (Channels.dev_contrib,)) +    @override_in_channel(WHITELISTED_CHANNELS + (Channels.dev_contrib, Channels.dev_branding))      async def issue( -        self, ctx: commands.Context, number: int, repository: str = "seasonalbot", user: str = "python-discord" +        self, +        ctx: commands.Context, +        numbers: commands.Greedy[int], +        repository: str = "seasonalbot", +        user: str = "python-discord"      ) -> None: -        """Command to retrieve issues from a GitHub repository.""" -        url = f"https://api.github.com/repos/{user}/{repository}/issues/{number}" -        merge_url = f"https://api.github.com/repos/{user}/{repository}/pulls/{number}/merge" - -        log.trace(f"Querying GH issues API: {url}") -        async with self.bot.http_session.get(url) as r: -            json_data = await r.json() - -        if r.status in BAD_RESPONSE: -            log.warning(f"Received response {r.status} from: {url}") -            return await ctx.send(f"[{str(r.status)}] {BAD_RESPONSE.get(r.status)}") - -        # The initial API request is made to the issues API endpoint, which will return information -        # if the issue or PR is present. However, the scope of information returned for PRs differs -        # from issues: if the 'issues' key is present in the response then we can pull the data we -        # need from the initial API call. -        if "issues" in json_data.get("html_url"): -            if json_data.get("state") == "open": -                icon_url = Emojis.issue -            else: -                icon_url = Emojis.issue_closed - -        # If the 'issues' key is not contained in the API response and there is no error code, then -        # we know that a PR has been requested and a call to the pulls API endpoint is necessary -        # to get the desired information for the PR. -        else: -            log.trace(f"PR provided, querying GH pulls API for additional information: {merge_url}") -            async with self.bot.http_session.get(merge_url) as m: +        """Command to retrieve issue(s) from a GitHub repository.""" +        links = [] +        numbers = set(numbers) + +        if not numbers: +            await ctx.invoke(self.bot.get_command('help'), 'issue') +            return + +        if len(numbers) > MAX_REQUESTS: +            embed = discord.Embed( +                title=random.choice(ERROR_REPLIES), +                color=Colours.soft_red, +                description=f"Too many issues/PRs! (maximum of {MAX_REQUESTS})" +            ) +            await ctx.send(embed=embed) +            return + +        for number in set(numbers): +            # Convert from list to set to remove duplicates, if any. +            url = f"https://api.github.com/repos/{user}/{repository}/issues/{number}" +            merge_url = f"https://api.github.com/repos/{user}/{repository}/pulls/{number}/merge" + +            log.trace(f"Querying GH issues API: {url}") +            async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: +                json_data = await r.json() + +            if r.status in BAD_RESPONSE: +                log.warning(f"Received response {r.status} from: {url}") +                return await ctx.send(f"[{str(r.status)}] #{number} {BAD_RESPONSE.get(r.status)}") + +            # The initial API request is made to the issues API endpoint, which will return information +            # if the issue or PR is present. However, the scope of information returned for PRs differs +            # from issues: if the 'issues' key is present in the response then we can pull the data we +            # need from the initial API call. +            if "issues" in json_data.get("html_url"):                  if json_data.get("state") == "open": -                    icon_url = Emojis.pull_request -                # When the status is 204 this means that the state of the PR is merged -                elif m.status == 204: -                    icon_url = Emojis.merge +                    icon_url = Emojis.issue                  else: -                    icon_url = Emojis.pull_request_closed +                    icon_url = Emojis.issue_closed + +            # If the 'issues' key is not contained in the API response and there is no error code, then +            # we know that a PR has been requested and a call to the pulls API endpoint is necessary +            # to get the desired information for the PR. +            else: +                log.trace(f"PR provided, querying GH pulls API for additional information: {merge_url}") +                async with self.bot.http_session.get(merge_url) as m: +                    if json_data.get("state") == "open": +                        icon_url = Emojis.pull_request +                    # When the status is 204 this means that the state of the PR is merged +                    elif m.status == 204: +                        icon_url = Emojis.merge +                    else: +                        icon_url = Emojis.pull_request_closed + +            issue_url = json_data.get("html_url") +            links.append([icon_url, f"[{repository}] #{number} {json_data.get('title')}", issue_url]) -        issue_url = json_data.get("html_url") -        description_text = f"[{repository}] #{number} {json_data.get('title')}" +        # Issue/PR format: emoji to show if open/closed/merged, number and the title as a singular link. +        description_list = ["{0} [{1}]({2})".format(*link) for link in links]          resp = discord.Embed(              colour=Colours.bright_green, -            description=f"{icon_url} [{description_text}]({issue_url})" +            description='\n'.join(description_list)          ) -        resp.set_author(name="GitHub", url=issue_url) + +        resp.set_author(name="GitHub", url=f"https://github.com/{user}/{repository}")          await ctx.send(embed=resp) | 
