diff options
| author | 2020-11-30 15:18:31 +0100 | |
|---|---|---|
| committer | 2020-11-30 15:18:31 +0100 | |
| commit | 04d9cf9583c9e54bb4a45f867e02df9da1bbc357 (patch) | |
| tree | 3910b05cdcfa86253b941a1d1c3ff810f35d827e /bot/exts | |
| parent | Set precision to hours (diff) | |
| parent | Merge pull request #532 from python-discord/sebastiaan/ci/add-core-dev-approv... (diff) | |
Merge branch 'master' into master
Diffstat (limited to 'bot/exts')
41 files changed, 1506 insertions, 1086 deletions
| diff --git a/bot/exts/__init__.py b/bot/exts/__init__.py index 25deb9af..13f484ac 100644 --- a/bot/exts/__init__.py +++ b/bot/exts/__init__.py @@ -1,9 +1,8 @@  import logging  import pkgutil -from pathlib import Path  from typing import Iterator -__all__ = ("get_package_names", "walk_extensions") +__all__ = ("get_package_names",)  log = logging.getLogger(__name__) @@ -13,23 +12,3 @@ def get_package_names() -> Iterator[str]:      for package in pkgutil.iter_modules(__path__):          if package.ispkg:              yield package.name - - -def walk_extensions() -> Iterator[str]: -    """ -    Iterate dot-separated paths to all extensions. - -    The strings are formatted in a way such that the bot's `load_extension` -    method can take them. Use this to load all available extensions. - -    This intentionally doesn't make use of pkgutil's `walk_packages`, as we only -    want to build paths to extensions - not recursively all modules. For some -    extensions, the `setup` function is in the package's __init__ file, while -    modules nested under the package are only helpers. Constructing the paths -    ourselves serves our purpose better. -    """ -    base_path = Path(__path__[0]) - -    for package in get_package_names(): -        for extension in pkgutil.iter_modules([base_path.joinpath(package)]): -            yield f"bot.exts.{package}.{extension.name}" diff --git a/bot/exts/easter/easter_riddle.py b/bot/exts/easter/easter_riddle.py index 8977534f..3c612eb1 100644 --- a/bot/exts/easter/easter_riddle.py +++ b/bot/exts/easter/easter_riddle.py @@ -22,7 +22,7 @@ class EasterRiddle(commands.Cog):      def __init__(self, bot: commands.Bot):          self.bot = bot -        self.winners = [] +        self.winners = set()          self.correct = ""          self.current_channel = None @@ -79,7 +79,7 @@ class EasterRiddle(commands.Cog):          await ctx.send(content, embed=answer_embed) -        self.winners = [] +        self.winners.clear()          self.current_channel = None      @commands.Cog.listener() @@ -92,7 +92,7 @@ class EasterRiddle(commands.Cog):              return          if message.content.lower() == self.correct.lower(): -            self.winners.append(message.author.mention) +            self.winners.add(message.author.mention)  def setup(bot: commands.Bot) -> None: diff --git a/bot/exts/easter/egg_facts.py b/bot/exts/easter/egg_facts.py index 0051aa50..761e9059 100644 --- a/bot/exts/easter/egg_facts.py +++ b/bot/exts/easter/egg_facts.py @@ -6,7 +6,7 @@ from pathlib import Path  import discord  from discord.ext import commands -from bot.bot import SeasonalBot +from bot.bot import Bot  from bot.constants import Channels, Colours, Month  from bot.utils.decorators import seasonal_task @@ -20,7 +20,7 @@ class EasterFacts(commands.Cog):      It also contains a background task which sends an easter egg fact in the event channel everyday.      """ -    def __init__(self, bot: SeasonalBot): +    def __init__(self, bot: Bot):          self.bot = bot          self.facts = self.load_json() @@ -38,7 +38,7 @@ class EasterFacts(commands.Cog):          """A background task that sends an easter egg fact in the event channel everyday."""          await self.bot.wait_until_guild_available() -        channel = self.bot.get_channel(Channels.seasonalbot_commands) +        channel = self.bot.get_channel(Channels.community_bot_commands)          await channel.send(embed=self.make_embed())      @commands.command(name='eggfact', aliases=['fact']) @@ -56,6 +56,6 @@ class EasterFacts(commands.Cog):          ) -def setup(bot: SeasonalBot) -> None: +def setup(bot: Bot) -> None:      """Easter Egg facts cog load."""      bot.add_cog(EasterFacts(bot)) diff --git a/bot/exts/easter/save_the_planet.py b/bot/exts/easter/save_the_planet.py new file mode 100644 index 00000000..8f644259 --- /dev/null +++ b/bot/exts/easter/save_the_planet.py @@ -0,0 +1,29 @@ +import json +from pathlib import Path + +from discord import Embed +from discord.ext import commands + +from bot.utils.randomization import RandomCycle + + +with Path("bot/resources/easter/save_the_planet.json").open('r', encoding='utf8') as f: +    EMBED_DATA = RandomCycle(json.load(f)) + + +class SaveThePlanet(commands.Cog): +    """A cog that teaches users how they can help our planet.""" + +    def __init__(self, bot: commands.Bot) -> None: +        self.bot = bot + +    @commands.command(aliases=('savetheearth', 'saveplanet', 'saveearth')) +    async def savetheplanet(self, ctx: commands.Context) -> None: +        """Responds with a random tip on how to be eco-friendly and help our planet.""" +        return_embed = Embed.from_dict(next(EMBED_DATA)) +        await ctx.send(embed=return_embed) + + +def setup(bot: commands.Bot) -> None: +    """Save the Planet Cog load.""" +    bot.add_cog(SaveThePlanet(bot)) diff --git a/bot/exts/evergreen/8bitify.py b/bot/exts/evergreen/8bitify.py index 60062fc1..c048d9bf 100644 --- a/bot/exts/evergreen/8bitify.py +++ b/bot/exts/evergreen/8bitify.py @@ -14,7 +14,7 @@ class EightBitify(commands.Cog):      @staticmethod      def pixelate(image: Image) -> Image:          """Takes an image and pixelates it.""" -        return image.resize((32, 32)).resize((1024, 1024)) +        return image.resize((32, 32), resample=Image.NEAREST).resize((1024, 1024), resample=Image.NEAREST)      @staticmethod      def quantize(image: Image) -> Image: diff --git a/bot/exts/evergreen/bookmark.py b/bot/exts/evergreen/bookmark.py index 73908702..5fa05d2e 100644 --- a/bot/exts/evergreen/bookmark.py +++ b/bot/exts/evergreen/bookmark.py @@ -5,6 +5,7 @@ import discord  from discord.ext import commands  from bot.constants import Colours, ERROR_REPLIES, Emojis, Icons +from bot.utils.converters import WrappedMessageConverter  log = logging.getLogger(__name__) @@ -19,7 +20,7 @@ class Bookmark(commands.Cog):      async def bookmark(          self,          ctx: commands.Context, -        target_message: discord.Message, +        target_message: WrappedMessageConverter,          *,          title: str = "Bookmark"      ) -> None: diff --git a/bot/exts/evergreen/branding.py b/bot/exts/evergreen/branding.py deleted file mode 100644 index 7e531011..00000000 --- a/bot/exts/evergreen/branding.py +++ /dev/null @@ -1,543 +0,0 @@ -import asyncio -import itertools -import json -import logging -import random -import typing as t -from datetime import datetime, time, timedelta -from pathlib import Path - -import arrow -import discord -from discord.embeds import EmptyEmbed -from discord.ext import commands - -from bot.bot import SeasonalBot -from bot.constants import Branding, Colours, Emojis, MODERATION_ROLES, Tokens -from bot.seasons import SeasonBase, get_all_seasons, get_current_season, get_season -from bot.utils import human_months -from bot.utils.decorators import with_role -from bot.utils.exceptions import BrandingError -from bot.utils.persist import make_persistent - -log = logging.getLogger(__name__) - -STATUS_OK = 200  # HTTP status code - -FILE_BANNER = "banner.png" -FILE_AVATAR = "avatar.png" -SERVER_ICONS = "server_icons" - -BRANDING_URL = "https://api.github.com/repos/python-discord/branding/contents" - -PARAMS = {"ref": "master"}  # Target branch -HEADERS = {"Accept": "application/vnd.github.v3+json"}  # Ensure we use API v3 - -# A GitHub token is not necessary for the cog to operate, -# unauthorized requests are however limited to 60 per hour -if Tokens.github: -    HEADERS["Authorization"] = f"token {Tokens.github}" - - -class GitHubFile(t.NamedTuple): -    """ -    Represents a remote file on GitHub. - -    The `sha` hash is kept so that we can determine that a file has changed, -    despite its filename remaining unchanged. -    """ - -    download_url: str -    path: str -    sha: str - - -def pretty_files(files: t.Iterable[GitHubFile]) -> str: -    """Provide a human-friendly representation of `files`.""" -    return "\n".join(file.path for file in files) - - -def time_until_midnight() -> timedelta: -    """ -    Determine amount of time until the next-up UTC midnight. - -    The exact `midnight` moment is actually delayed to 5 seconds after, in order -    to avoid potential problems due to imprecise sleep. -    """ -    now = datetime.utcnow() -    tomorrow = now + timedelta(days=1) -    midnight = datetime.combine(tomorrow, time(second=5)) - -    return midnight - now - - -class BrandingManager(commands.Cog): -    """ -    Manages the guild's branding. - -    The purpose of this cog is to help automate the synchronization of the branding -    repository with the guild. It is capable of discovering assets in the repository -    via GitHub's API, resolving download urls for them, and delegating -    to the `bot` instance to upload them to the guild. - -    BrandingManager is designed to be entirely autonomous. Its `daemon` background task awakens -    once a day (see `time_until_midnight`) to detect new seasons, or to cycle icons within a single -    season. The daemon can be turned on and off via the `daemon` cmd group. The value set via -    its `start` and `stop` commands is persisted across sessions. If turned on, the daemon will -    automatically start on the next bot start-up. Otherwise, it will wait to be started manually. - -    All supported operations, e.g. setting seasons, applying the branding, or cycling icons, can -    also be invoked manually, via the following API: - -        branding list -            - Show all available seasons - -        branding set <season_name> -            - Set the cog's internal state to represent `season_name`, if it exists -            - If no `season_name` is given, set chronologically current season -            - This will not automatically apply the season's branding to the guild, -              the cog's state can be detached from the guild -            - Seasons can therefore be 'previewed' using this command - -        branding info -            - View detailed information about resolved assets for current season - -        branding refresh -            - Refresh internal state, i.e. synchronize with branding repository - -        branding apply -            - Apply the current internal state to the guild, i.e. upload the assets - -        branding cycle -            - If there are multiple available icons for current season, randomly pick -              and apply the next one - -    The daemon calls these methods autonomously as appropriate. The use of this cog -    is locked to moderation roles. As it performs media asset uploads, it is prone to -    rate-limits - the `apply` command should be used with caution. The `set` command can, -    however, be used freely to 'preview' seasonal branding and check whether paths have been -    resolved as appropriate. - -    While the bot is in debug mode, it will 'mock' asset uploads by logging the passed -    download urls and pretending that the upload was successful. Make use of this -    to test this cog's behaviour. -    """ - -    current_season: t.Type[SeasonBase] - -    banner: t.Optional[GitHubFile] -    avatar: t.Optional[GitHubFile] - -    available_icons: t.List[GitHubFile] -    remaining_icons: t.List[GitHubFile] - -    days_since_cycle: t.Iterator - -    config_file: Path - -    daemon: t.Optional[asyncio.Task] - -    def __init__(self, bot: SeasonalBot) -> None: -        """ -        Assign safe default values on init. - -        At this point, we don't have information about currently available branding. -        Most of these attributes will be overwritten once the daemon connects, or once -        the `refresh` command is used. -        """ -        self.bot = bot -        self.current_season = get_current_season() - -        self.banner = None -        self.avatar = None - -        self.available_icons = [] -        self.remaining_icons = [] - -        self.days_since_cycle = itertools.cycle([None]) - -        self.config_file = make_persistent(Path("bot", "resources", "evergreen", "branding.json")) -        should_run = self._read_config()["daemon_active"] - -        if should_run: -            self.daemon = self.bot.loop.create_task(self._daemon_func()) -        else: -            self.daemon = None - -    @property -    def _daemon_running(self) -> bool: -        """True if the daemon is currently active, False otherwise.""" -        return self.daemon is not None and not self.daemon.done() - -    def _read_config(self) -> t.Dict[str, bool]: -        """Read and return persistent config file.""" -        with self.config_file.open("r", encoding="utf8") as persistent_file: -            return json.load(persistent_file) - -    def _write_config(self, key: str, value: bool) -> None: -        """Write a `key`, `value` pair to persistent config file.""" -        current_config = self._read_config() -        current_config[key] = value - -        with self.config_file.open("w", encoding="utf8") as persistent_file: -            json.dump(current_config, persistent_file) - -    async def _daemon_func(self) -> None: -        """ -        Manage all automated behaviour of the BrandingManager cog. - -        Once a day, the daemon will perform the following tasks: -            - Update `current_season` -            - Poll GitHub API to see if the available branding for `current_season` has changed -            - Update assets if changes are detected (banner, guild icon, bot avatar, bot nickname) -            - Check whether it's time to cycle guild icons - -        The internal loop runs once when activated, then periodically at the time -        given by `time_until_midnight`. - -        All method calls in the internal loop are considered safe, i.e. no errors propagate -        to the daemon's loop. The daemon itself does not perform any error handling on its own. -        """ -        await self.bot.wait_until_guild_available() - -        while True: -            self.current_season = get_current_season() -            branding_changed = await self.refresh() - -            if branding_changed: -                await self.apply() - -            elif next(self.days_since_cycle) == Branding.cycle_frequency: -                await self.cycle() - -            until_midnight = time_until_midnight() -            await asyncio.sleep(until_midnight.total_seconds()) - -    async def _info_embed(self) -> discord.Embed: -        """Make an informative embed representing current season.""" -        info_embed = discord.Embed(description=self.current_season.description, colour=self.current_season.colour) - -        # If we're in a non-evergreen season, also show active months -        if self.current_season is not SeasonBase: -            title = f"{self.current_season.season_name} ({human_months(self.current_season.months)})" -        else: -            title = self.current_season.season_name - -        # Use the author field to show the season's name and avatar if available -        info_embed.set_author(name=title, icon_url=self.avatar.download_url if self.avatar else EmptyEmbed) - -        banner = self.banner.path if self.banner is not None else "Unavailable" -        info_embed.add_field(name="Banner", value=banner, inline=False) - -        avatar = self.avatar.path if self.avatar is not None else "Unavailable" -        info_embed.add_field(name="Avatar", value=avatar, inline=False) - -        icons = pretty_files(self.available_icons) or "Unavailable" -        info_embed.add_field(name="Available icons", value=icons, inline=False) - -        # Only display cycle frequency if we're actually cycling -        if len(self.available_icons) > 1 and Branding.cycle_frequency: -            info_embed.set_footer(text=f"Icon cycle frequency: {Branding.cycle_frequency}") - -        return info_embed - -    async def _reset_remaining_icons(self) -> None: -        """Set `remaining_icons` to a shuffled copy of `available_icons`.""" -        self.remaining_icons = random.sample(self.available_icons, k=len(self.available_icons)) - -    async def _reset_days_since_cycle(self) -> None: -        """ -        Reset the `days_since_cycle` iterator based on configured frequency. - -        If the current season only has 1 icon, or if `Branding.cycle_frequency` is falsey, -        the iterator will always yield None. This signals that the icon shouldn't be cycled. - -        Otherwise, it will yield ints in range [1, `Branding.cycle_frequency`] indefinitely. -        When the iterator yields a value equal to `Branding.cycle_frequency`, it is time to cycle. -        """ -        if len(self.available_icons) > 1 and Branding.cycle_frequency: -            sequence = range(1, Branding.cycle_frequency + 1) -        else: -            sequence = [None] - -        self.days_since_cycle = itertools.cycle(sequence) - -    async def _get_files(self, path: str, include_dirs: bool = False) -> t.Dict[str, GitHubFile]: -        """ -        Get files at `path` in the branding repository. - -        If `include_dirs` is False (default), only returns files at `path`. -        Otherwise, will return both files and directories. Never returns symlinks. - -        Return dict mapping from filename to corresponding `GitHubFile` instance. -        This may return an empty dict if the response status is non-200, -        or if the target directory is empty. -        """ -        url = f"{BRANDING_URL}/{path}" -        async with self.bot.http_session.get(url, headers=HEADERS, params=PARAMS) as resp: -            # Short-circuit if we get non-200 response -            if resp.status != STATUS_OK: -                log.error(f"GitHub API returned non-200 response: {resp}") -                return {} -            directory = await resp.json()  # Directory at `path` - -        allowed_types = {"file", "dir"} if include_dirs else {"file"} -        return { -            file["name"]: GitHubFile(file["download_url"], file["path"], file["sha"]) -            for file in directory -            if file["type"] in allowed_types -        } - -    async def refresh(self) -> bool: -        """ -        Synchronize available assets with branding repository. - -        If the current season is not the evergreen, and lacks at least one asset, -        we use the evergreen seasonal dir as fallback for missing assets. - -        Finally, if neither the seasonal nor fallback branding directories contain -        an asset, it will simply be ignored. - -        Return True if the branding has changed. This will be the case when we enter -        a new season, or when something changes in the current seasons's directory -        in the branding repository. -        """ -        old_branding = (self.banner, self.avatar, self.available_icons) -        seasonal_dir = await self._get_files(self.current_season.branding_path, include_dirs=True) - -        # Only make a call to the fallback directory if there is something to be gained -        branding_incomplete = any( -            asset not in seasonal_dir -            for asset in (FILE_BANNER, FILE_AVATAR, SERVER_ICONS) -        ) -        if branding_incomplete and self.current_season is not SeasonBase: -            fallback_dir = await self._get_files(SeasonBase.branding_path, include_dirs=True) -        else: -            fallback_dir = {} - -        # Resolve assets in this directory, None is a safe value -        self.banner = seasonal_dir.get(FILE_BANNER) or fallback_dir.get(FILE_BANNER) -        self.avatar = seasonal_dir.get(FILE_AVATAR) or fallback_dir.get(FILE_AVATAR) - -        # Now resolve server icons by making a call to the proper sub-directory -        if SERVER_ICONS in seasonal_dir: -            icons_dir = await self._get_files(f"{self.current_season.branding_path}/{SERVER_ICONS}") -            self.available_icons = list(icons_dir.values()) - -        elif SERVER_ICONS in fallback_dir: -            icons_dir = await self._get_files(f"{SeasonBase.branding_path}/{SERVER_ICONS}") -            self.available_icons = list(icons_dir.values()) - -        else: -            self.available_icons = []  # This should never be the case, but an empty list is a safe value - -        # GitHubFile instances carry a `sha` attr so this will pick up if a file changes -        branding_changed = old_branding != (self.banner, self.avatar, self.available_icons) - -        if branding_changed: -            log.info(f"New branding detected (season: {self.current_season.season_name})") -            await self._reset_remaining_icons() -            await self._reset_days_since_cycle() - -        return branding_changed - -    async def cycle(self) -> bool: -        """ -        Apply the next-up server icon. - -        Returns True if an icon is available and successfully gets applied, False otherwise. -        """ -        if not self.available_icons: -            log.info("Cannot cycle: no icons for this season") -            return False - -        if not self.remaining_icons: -            log.info("Reset & shuffle remaining icons") -            await self._reset_remaining_icons() - -        next_up = self.remaining_icons.pop(0) -        success = await self.bot.set_icon(next_up.download_url) - -        return success - -    async def apply(self) -> t.List[str]: -        """ -        Apply current branding to the guild and bot. - -        This delegates to the bot instance to do all the work. We only provide download urls -        for available assets. Assets unavailable in the branding repo will be ignored. - -        Returns a list of names of all failed assets. An asset is considered failed -        if it isn't found in the branding repo, or if something goes wrong while the -        bot is trying to apply it. - -        An empty list denotes that all assets have been applied successfully. -        """ -        report = {asset: False for asset in ("banner", "avatar", "nickname", "icon")} - -        if self.banner is not None: -            report["banner"] = await self.bot.set_banner(self.banner.download_url) - -        if self.avatar is not None: -            report["avatar"] = await self.bot.set_avatar(self.avatar.download_url) - -        if self.current_season.bot_name: -            report["nickname"] = await self.bot.set_nickname(self.current_season.bot_name) - -        report["icon"] = await self.cycle() - -        failed_assets = [asset for asset, succeeded in report.items() if not succeeded] -        return failed_assets - -    @with_role(*MODERATION_ROLES) -    @commands.group(name="branding") -    async def branding_cmds(self, ctx: commands.Context) -> None: -        """Manual branding control.""" -        if not ctx.invoked_subcommand: -            await ctx.send_help(ctx.command) - -    @branding_cmds.command(name="list", aliases=["ls"]) -    async def branding_list(self, ctx: commands.Context) -> None: -        """List all available seasons and branding sources.""" -        embed = discord.Embed(title="Available seasons", colour=Colours.soft_green) - -        for season in get_all_seasons(): -            if season is SeasonBase: -                active_when = "always" -            else: -                active_when = f"in {human_months(season.months)}" - -            description = ( -                f"Active {active_when}\n" -                f"Branding: {season.branding_path}" -            ) -            embed.add_field(name=season.season_name, value=description, inline=False) - -        await ctx.send(embed=embed) - -    @branding_cmds.command(name="set") -    async def branding_set(self, ctx: commands.Context, *, season_name: t.Optional[str] = None) -> None: -        """ -        Manually set season, or reset to current if none given. - -        Season search is a case-less comparison against both seasonal class name, -        and its `season_name` attr. - -        This only pre-loads the cog's internal state to the chosen season, but does not -        automatically apply the branding. As that is an expensive operation, the `apply` -        command must be called explicitly after this command finishes. - -        This means that this command can be used to 'preview' a season gathering info -        about its available assets, without applying them to the guild. - -        If the daemon is running, it will automatically reset the season to current when -        it wakes up. The season set via this command can therefore remain 'detached' from -        what it should be - the daemon will make sure that it's set back properly. -        """ -        if season_name is None: -            new_season = get_current_season() -        else: -            new_season = get_season(season_name) -            if new_season is None: -                raise BrandingError("No such season exists") - -        if self.current_season is new_season: -            raise BrandingError(f"Season {self.current_season.season_name} already active") - -        self.current_season = new_season -        await self.branding_refresh(ctx) - -    @branding_cmds.command(name="info", aliases=["status"]) -    async def branding_info(self, ctx: commands.Context) -> None: -        """ -        Show available assets for current season. - -        This can be used to confirm that assets have been resolved properly. -        When `apply` is used, it attempts to upload exactly the assets listed here. -        """ -        await ctx.send(embed=await self._info_embed()) - -    @branding_cmds.command(name="refresh") -    async def branding_refresh(self, ctx: commands.Context) -> None: -        """Sync currently available assets with branding repository.""" -        async with ctx.typing(): -            await self.refresh() -            await self.branding_info(ctx) - -    @branding_cmds.command(name="apply") -    async def branding_apply(self, ctx: commands.Context) -> None: -        """ -        Apply current season's branding to the guild. - -        Use `info` to check which assets will be applied. Shows which assets have -        failed to be applied, if any. -        """ -        async with ctx.typing(): -            failed_assets = await self.apply() -            if failed_assets: -                raise BrandingError(f"Failed to apply following assets: {', '.join(failed_assets)}") - -            response = discord.Embed(description=f"All assets applied {Emojis.ok_hand}", colour=Colours.soft_green) -            await ctx.send(embed=response) - -    @branding_cmds.command(name="cycle") -    async def branding_cycle(self, ctx: commands.Context) -> None: -        """ -        Apply the next-up guild icon, if multiple are available. - -        The order is random. -        """ -        async with ctx.typing(): -            success = await self.cycle() -            if not success: -                raise BrandingError("Failed to cycle icon") - -            response = discord.Embed(description=f"Success {Emojis.ok_hand}", colour=Colours.soft_green) -            await ctx.send(embed=response) - -    @branding_cmds.group(name="daemon", aliases=["d", "task"]) -    async def daemon_group(self, ctx: commands.Context) -> None: -        """Control the background daemon.""" -        if not ctx.invoked_subcommand: -            await ctx.send_help(ctx.command) - -    @daemon_group.command(name="status") -    async def daemon_status(self, ctx: commands.Context) -> None: -        """Check whether daemon is currently active.""" -        if self._daemon_running: -            remaining_time = (arrow.utcnow() + time_until_midnight()).humanize() -            response = discord.Embed(description=f"Daemon running {Emojis.ok_hand}", colour=Colours.soft_green) -            response.set_footer(text=f"Next refresh {remaining_time}") -        else: -            response = discord.Embed(description="Daemon not running", colour=Colours.soft_red) - -        await ctx.send(embed=response) - -    @daemon_group.command(name="start") -    async def daemon_start(self, ctx: commands.Context) -> None: -        """If the daemon isn't running, start it.""" -        if self._daemon_running: -            raise BrandingError("Daemon already running!") - -        self.daemon = self.bot.loop.create_task(self._daemon_func()) -        self._write_config("daemon_active", True) - -        response = discord.Embed(description=f"Daemon started {Emojis.ok_hand}", colour=Colours.soft_green) -        await ctx.send(embed=response) - -    @daemon_group.command(name="stop") -    async def daemon_stop(self, ctx: commands.Context) -> None: -        """If the daemon is running, stop it.""" -        if not self._daemon_running: -            raise BrandingError("Daemon not running!") - -        self.daemon.cancel() -        self._write_config("daemon_active", False) - -        response = discord.Embed(description=f"Daemon stopped {Emojis.ok_hand}", colour=Colours.soft_green) -        await ctx.send(embed=response) - - -def setup(bot: SeasonalBot) -> None: -    """Load BrandingManager cog.""" -    bot.add_cog(BrandingManager(bot)) diff --git a/bot/exts/evergreen/emoji_count.py b/bot/exts/evergreen/emoji_count.py new file mode 100644 index 00000000..cc43e9ab --- /dev/null +++ b/bot/exts/evergreen/emoji_count.py @@ -0,0 +1,97 @@ +import datetime +import logging +import random +from collections import defaultdict +from typing import List, Tuple + +import discord +from discord.ext import commands + +from bot.constants import Colours, ERROR_REPLIES +from bot.utils.pagination import LinePaginator + +log = logging.getLogger(__name__) + + +class EmojiCount(commands.Cog): +    """Command that give random emoji based on category.""" + +    def __init__(self, bot: commands.Bot): +        self.bot = bot + +    @staticmethod +    def embed_builder(emoji: dict) -> Tuple[discord.Embed, List[str]]: +        """Generates an embed with the emoji names and count.""" +        embed = discord.Embed( +            color=Colours.orange, +            title="Emoji Count", +            timestamp=datetime.datetime.utcnow() +        ) +        msg = [] + +        if len(emoji) == 1: +            for category_name, category_emojis in emoji.items(): +                if len(category_emojis) == 1: +                    msg.append(f"There is **{len(category_emojis)}** emoji in **{category_name}** category") +                else: +                    msg.append(f"There are **{len(category_emojis)}** emojis in **{category_name}** category") +                embed.set_thumbnail(url=random.choice(category_emojis).url) + +        else: +            for category_name, category_emojis in emoji.items(): +                emoji_choice = random.choice(category_emojis) +                if len(category_emojis) > 1: +                    emoji_info = f"There are **{len(category_emojis)}** emojis in **{category_name}** category" +                else: +                    emoji_info = f"There is **{len(category_emojis)}** emoji in **{category_name}** category" +                if emoji_choice.animated: +                    msg.append(f'<a:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}') +                else: +                    msg.append(f'<:{emoji_choice.name}:{emoji_choice.id}> {emoji_info}') +        return embed, msg + +    @staticmethod +    def generate_invalid_embed(emojis: list) -> Tuple[discord.Embed, List[str]]: +        """Generates error embed.""" +        embed = discord.Embed( +            color=Colours.soft_red, +            title=random.choice(ERROR_REPLIES) +        ) +        msg = [] + +        emoji_dict = defaultdict(list) +        for emoji in emojis: +            emoji_dict[emoji.name.split("_")[0]].append(emoji) + +        error_comp = ', '.join(emoji_dict) +        msg.append(f"These are the valid categories\n```{error_comp}```") +        return embed, msg + +    @commands.command(name="emojicount", aliases=["ec", "emojis"]) +    async def emoji_count(self, ctx: commands.Context, *, category_query: str = None) -> None: +        """Returns embed with emoji category and info given by the user.""" +        emoji_dict = defaultdict(list) + +        if not ctx.guild.emojis: +            await ctx.send("No emojis found.") +            return +        log.trace(f"Emoji Category {'' if category_query else 'not '}provided by the user") +        for emoji in ctx.guild.emojis: +            emoji_category = emoji.name.split("_")[0] + +            if category_query is not None and emoji_category not in category_query: +                continue + +            emoji_dict[emoji_category].append(emoji) + +        if not emoji_dict: +            log.trace("Invalid name provided by the user") +            embed, msg = self.generate_invalid_embed(ctx.guild.emojis) +        else: +            embed, msg = self.embed_builder(emoji_dict) +        await LinePaginator.paginate(lines=msg, ctx=ctx, embed=embed) + + +def setup(bot: commands.Bot) -> None: +    """Emoji Count Cog load.""" +    bot.add_cog(EmojiCount(bot)) diff --git a/bot/exts/evergreen/error_handler.py b/bot/exts/evergreen/error_handler.py index 459a2b2d..6e518435 100644 --- a/bot/exts/evergreen/error_handler.py +++ b/bot/exts/evergreen/error_handler.py @@ -9,7 +9,7 @@ from sentry_sdk import push_scope  from bot.constants import Colours, ERROR_REPLIES, NEGATIVE_REPLIES  from bot.utils.decorators import InChannelCheckFailure, InMonthCheckFailure -from bot.utils.exceptions import BrandingError, UserNotPlayingError +from bot.utils.exceptions import UserNotPlayingError  log = logging.getLogger(__name__) @@ -57,10 +57,6 @@ class CommandErrorHandler(commands.Cog):          if isinstance(error, commands.CommandNotFound):              return -        if isinstance(error, BrandingError): -            await ctx.send(embed=self.error_embed(str(error))) -            return -          if isinstance(error, (InChannelCheckFailure, InMonthCheckFailure)):              await ctx.send(embed=self.error_embed(str(error), NEGATIVE_REPLIES), delete_after=7.5)              return diff --git a/bot/exts/evergreen/fun.py b/bot/exts/evergreen/fun.py index b0240c45..101725da 100644 --- a/bot/exts/evergreen/fun.py +++ b/bot/exts/evergreen/fun.py @@ -1,14 +1,16 @@  import functools +import json  import logging  import random -from typing import Callable, Tuple, Union +from pathlib import Path +from typing import Callable, Iterable, Tuple, Union  from discord import Embed, Message  from discord.ext import commands -from discord.ext.commands import Bot, Cog, Context, MessageConverter +from discord.ext.commands import BadArgument, Bot, Cog, Context, MessageConverter, clean_content  from bot import utils -from bot.constants import Emojis +from bot.constants import Client, Colours, Emojis  log = logging.getLogger(__name__) @@ -26,27 +28,52 @@ UWU_WORDS = {  } +def caesar_cipher(text: str, offset: int) -> Iterable[str]: +    """ +    Implements a lazy Caesar Cipher algorithm. + +    Encrypts a `text` given a specific integer `offset`. The sign +    of the `offset` dictates the direction in which it shifts to, +    with a negative value shifting to the left, and a positive +    value shifting to the right. +    """ +    for char in text: +        if not char.isascii() or not char.isalpha() or char.isspace(): +            yield char +            continue + +        case_start = 65 if char.isupper() else 97 +        true_offset = (ord(char) - case_start + offset) % 26 + +        yield chr(case_start + true_offset) + +  class Fun(Cog):      """A collection of general commands for fun."""      def __init__(self, bot: Bot) -> None:          self.bot = bot +        with Path("bot/resources/evergreen/caesar_info.json").open("r", encoding="UTF-8") as f: +            self._caesar_cipher_embed = json.load(f) + +    @staticmethod +    def _get_random_die() -> str: +        """Generate a random die emoji, ready to be sent on Discord.""" +        die_name = f"dice_{random.randint(1, 6)}" +        return getattr(Emojis, die_name) +      @commands.command()      async def roll(self, ctx: Context, num_rolls: int = 1) -> None:          """Outputs a number of random dice emotes (up to 6).""" -        output = "" -        if num_rolls > 6: -            num_rolls = 6 -        elif num_rolls < 1: -            output = ":no_entry: You must roll at least once." -        for _ in range(num_rolls): -            dice = f"dice_{random.randint(1, 6)}" -            output += getattr(Emojis, dice, '') -        await ctx.send(output) +        if 1 <= num_rolls <= 6: +            dice = " ".join(self._get_random_die() for _ in range(num_rolls)) +            await ctx.send(dice) +        else: +            raise BadArgument(f"`{Client.prefix}roll` only supports between 1 and 6 rolls.")      @commands.command(name="uwu", aliases=("uwuwize", "uwuify",)) -    async def uwu_command(self, ctx: Context, *, text: str) -> None: +    async def uwu_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None:          """Converts a given `text` into it's uwu equivalent."""          conversion_func = functools.partial(              utils.replace_many, replacements=UWU_WORDS, ignore_case=True, match_case=True @@ -62,7 +89,7 @@ class Fun(Cog):          await ctx.send(content=converted_text, embed=embed)      @commands.command(name="randomcase", aliases=("rcase", "randomcaps", "rcaps",)) -    async def randomcase_command(self, ctx: Context, *, text: str) -> None: +    async def randomcase_command(self, ctx: Context, *, text: clean_content(fix_channel_mentions=True)) -> None:          """Randomly converts the casing of a given `text`."""          def conversion_func(text: str) -> str:              """Randomly converts the casing of a given string.""" @@ -79,23 +106,99 @@ class Fun(Cog):              converted_text = f">>> {converted_text.lstrip('> ')}"          await ctx.send(content=converted_text, embed=embed) +    @commands.group(name="caesarcipher", aliases=("caesar", "cc",)) +    async def caesarcipher_group(self, ctx: Context) -> None: +        """ +        Translates a message using the Caesar Cipher. + +        See `decrypt`, `encrypt`, and `info` subcommands. +        """ +        if ctx.invoked_subcommand is None: +            await ctx.invoke(self.bot.get_command("help"), "caesarcipher") + +    @caesarcipher_group.command(name="info") +    async def caesarcipher_info(self, ctx: Context) -> None: +        """Information about the Caesar Cipher.""" +        embed = Embed.from_dict(self._caesar_cipher_embed) +        embed.colour = Colours.dark_green + +        await ctx.send(embed=embed) + +    @staticmethod +    async def _caesar_cipher(ctx: Context, offset: int, msg: str, left_shift: bool = False) -> None: +        """ +        Given a positive integer `offset`, translates and sends the given `msg`. + +        Performs a right shift by default unless `left_shift` is specified as `True`. + +        Also accepts a valid Discord Message ID or link. +        """ +        if offset < 0: +            await ctx.send(":no_entry: Cannot use a negative offset.") +            return + +        if left_shift: +            offset = -offset + +        def conversion_func(text: str) -> str: +            """Encrypts the given string using the Caesar Cipher.""" +            return "".join(caesar_cipher(text, offset)) + +        text, embed = await Fun._get_text_and_embed(ctx, msg) + +        if embed is not None: +            embed = Fun._convert_embed(conversion_func, embed) + +        converted_text = conversion_func(text) + +        if converted_text: +            converted_text = f">>> {converted_text.lstrip('> ')}" + +        await ctx.send(content=converted_text, embed=embed) + +    @caesarcipher_group.command(name="encrypt", aliases=("rightshift", "rshift", "enc",)) +    async def caesarcipher_encrypt(self, ctx: Context, offset: int, *, msg: str) -> None: +        """ +        Given a positive integer `offset`, encrypt the given `msg`. + +        Performs a right shift of the letters in the message. + +        Also accepts a valid Discord Message ID or link. +        """ +        await self._caesar_cipher(ctx, offset, msg, left_shift=False) + +    @caesarcipher_group.command(name="decrypt", aliases=("leftshift", "lshift", "dec",)) +    async def caesarcipher_decrypt(self, ctx: Context, offset: int, *, msg: str) -> None: +        """ +        Given a positive integer `offset`, decrypt the given `msg`. + +        Performs a left shift of the letters in the message. + +        Also accepts a valid Discord Message ID or link. +        """ +        await self._caesar_cipher(ctx, offset, msg, left_shift=True) +      @staticmethod      async def _get_text_and_embed(ctx: Context, text: str) -> Tuple[str, Union[Embed, None]]:          """          Attempts to extract the text and embed from a possible link to a discord Message. +        Does not retrieve the text and embed from the Message if it is in a channel the user does +        not have read permissions in. +          Returns a tuple of:              str: If `text` is a valid discord Message, the contents of the message, else `text`.              Union[Embed, None]: The embed if found in the valid Message, else None          """          embed = None -        # message = await Fun._get_discord_message(ctx, text) -        # if isinstance(message, Message): -        #     text = message.content -        #     # Take first embed because we can't send multiple embeds -        #     if message.embeds: -        #         embed = message.embeds[0] +        msg = await Fun._get_discord_message(ctx, text) +        # Ensure the user has read permissions for the channel the message is in +        if isinstance(msg, Message) and ctx.author.permissions_in(msg.channel).read_messages: +            text = msg.clean_content +            # Take first embed because we can't send multiple embeds +            if msg.embeds: +                embed = msg.embeds[0]          return (text, embed) diff --git a/bot/exts/evergreen/game.py b/bot/exts/evergreen/game.py index 3c8b2725..d0fd7a40 100644 --- a/bot/exts/evergreen/game.py +++ b/bot/exts/evergreen/game.py @@ -11,7 +11,7 @@ from discord import Embed  from discord.ext import tasks  from discord.ext.commands import Cog, Context, group -from bot.bot import SeasonalBot +from bot.bot import Bot  from bot.constants import STAFF_ROLES, Tokens  from bot.utils.decorators import with_role  from bot.utils.pagination import ImagePaginator, LinePaginator @@ -130,7 +130,7 @@ class AgeRatings(IntEnum):  class Games(Cog):      """Games Cog contains commands that collect data from IGDB.""" -    def __init__(self, bot: SeasonalBot): +    def __init__(self, bot: Bot):          self.bot = bot          self.http_session: ClientSession = bot.http_session @@ -415,7 +415,7 @@ class Games(Cog):          return sorted((item for item in results if item[0] >= 0.60), reverse=True)[:4] -def setup(bot: SeasonalBot) -> None: +def setup(bot: Bot) -> None:      """Add/Load Games cog."""      # Check does IGDB API key exist, if not, log warning and don't load cog      if not Tokens.igdb: diff --git a/bot/exts/evergreen/githubinfo.py b/bot/exts/evergreen/githubinfo.py new file mode 100644 index 00000000..2e38e3ab --- /dev/null +++ b/bot/exts/evergreen/githubinfo.py @@ -0,0 +1,98 @@ +import logging +import random +from datetime import datetime +from typing import Optional + +import discord +from discord.ext import commands +from discord.ext.commands.cooldowns import BucketType + +from bot.constants import NEGATIVE_REPLIES + +log = logging.getLogger(__name__) + + +class GithubInfo(commands.Cog): +    """Fetches info from GitHub.""" + +    def __init__(self, bot: commands.Bot): +        self.bot = bot + +    async def fetch_data(self, url: str) -> dict: +        """Retrieve data as a dictionary.""" +        async with self.bot.http_session.get(url) as r: +            return await r.json() + +    @commands.command(name='github', aliases=['gh']) +    @commands.cooldown(1, 60, BucketType.user) +    async def get_github_info(self, ctx: commands.Context, username: Optional[str]) -> None: +        """ +        Fetches a user's GitHub information. + +        Username is optional and sends the help command if not specified. +        """ +        if username is None: +            await ctx.invoke(self.bot.get_command('help'), 'github') +            ctx.command.reset_cooldown(ctx) +            return + +        async with ctx.typing(): +            user_data = await self.fetch_data(f"https://api.github.com/users/{username}") + +            # User_data will not have a message key if the user exists +            if user_data.get('message') is not None: +                await ctx.send(embed=discord.Embed(title=random.choice(NEGATIVE_REPLIES), +                                                   description=f"The profile for `{username}` was not found.", +                                                   colour=discord.Colour.red())) +                return + +            org_data = await self.fetch_data(user_data['organizations_url']) +            orgs = [f"[{org['login']}](https://github.com/{org['login']})" for org in org_data] +            orgs_to_add = ' | '.join(orgs) + +            gists = user_data['public_gists'] + +            # Forming blog link +            if user_data['blog'].startswith("http"):  # Blog link is complete +                blog = user_data['blog'] +            elif user_data['blog']:  # Blog exists but the link is not complete +                blog = f"https://{user_data['blog']}" +            else: +                blog = "No website link available" + +            embed = discord.Embed( +                title=f"`{user_data['login']}`'s GitHub profile info", +                description=f"```{user_data['bio']}```\n" if user_data['bio'] is not None else "", +                colour=0x7289da, +                url=user_data['html_url'], +                timestamp=datetime.strptime(user_data['created_at'], "%Y-%m-%dT%H:%M:%SZ") +            ) +            embed.set_thumbnail(url=user_data['avatar_url']) +            embed.set_footer(text="Account created at") + +            if user_data['type'] == "User": + +                embed.add_field(name="Followers", +                                value=f"[{user_data['followers']}]({user_data['html_url']}?tab=followers)") +                embed.add_field(name="\u200b", value="\u200b") +                embed.add_field(name="Following", +                                value=f"[{user_data['following']}]({user_data['html_url']}?tab=following)") + +            embed.add_field(name="Public repos", +                            value=f"[{user_data['public_repos']}]({user_data['html_url']}?tab=repositories)") +            embed.add_field(name="\u200b", value="\u200b") + +            if user_data['type'] == "User": +                embed.add_field(name="Gists", value=f"[{gists}](https://gist.github.com/{username})") + +                embed.add_field(name=f"Organization{'s' if len(orgs)!=1 else ''}", +                                value=orgs_to_add if orgs else "No organizations") +                embed.add_field(name="\u200b", value="\u200b") +            embed.add_field(name="Website", value=blog) + +        await ctx.send(embed=embed) + + +def setup(bot: commands.Bot) -> None: +    """Adding the cog to the bot.""" +    bot.add_cog(GithubInfo(bot)) diff --git a/bot/exts/evergreen/help.py b/bot/exts/evergreen/help.py index ccd76d76..91147243 100644 --- a/bot/exts/evergreen/help.py +++ b/bot/exts/evergreen/help.py @@ -12,7 +12,7 @@ from discord.ext.commands import CheckFailure, Cog as DiscordCog, Command, Conte  from fuzzywuzzy import fuzz, process  from bot import constants -from bot.bot import SeasonalBot +from bot.bot import Bot  from bot.constants import Emojis  from bot.utils.pagination import (      FIRST_EMOJI, LAST_EMOJI, @@ -511,7 +511,7 @@ class Help(DiscordCog):              await ctx.send(embed=embed) -def unload(bot: SeasonalBot) -> None: +def unload(bot: Bot) -> None:      """      Reinstates the original help command. @@ -521,7 +521,7 @@ def unload(bot: SeasonalBot) -> None:      bot.add_command(bot._old_help) -def setup(bot: SeasonalBot) -> None: +def setup(bot: Bot) -> None:      """      The setup for the help extension. @@ -542,7 +542,7 @@ def setup(bot: SeasonalBot) -> None:          raise -def teardown(bot: SeasonalBot) -> None: +def teardown(bot: Bot) -> None:      """      The teardown for the help extension. diff --git a/bot/exts/evergreen/issues.py b/bot/exts/evergreen/issues.py index 0f83731b..e419a6f5 100644 --- a/bot/exts/evergreen/issues.py +++ b/bot/exts/evergreen/issues.py @@ -1,9 +1,10 @@  import logging +import random  import discord  from discord.ext import commands -from bot.constants import Channels, Colours, Emojis, WHITELISTED_CHANNELS +from bot.constants import Channels, Colours, ERROR_REPLIES, Emojis, Tokens, WHITELISTED_CHANNELS  from bot.utils.decorators import override_in_channel  log = logging.getLogger(__name__) @@ -13,6 +14,12 @@ BAD_RESPONSE = {      403: "Rate limit has been hit! Please try again later!"  } +MAX_REQUESTS = 10 + +REQUEST_HEADERS = dict() +if GITHUB_TOKEN := Tokens.github: +    REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" +  class Issues(commands.Cog):      """Cog that allows users to retrieve issues from GitHub.""" @@ -21,53 +28,78 @@ class Issues(commands.Cog):          self.bot = bot      @commands.command(aliases=("pr",)) -    @override_in_channel(WHITELISTED_CHANNELS + (Channels.dev_contrib,)) +    @override_in_channel(WHITELISTED_CHANNELS + (Channels.dev_contrib, Channels.dev_branding))      async def issue( -        self, ctx: commands.Context, number: int, repository: str = "seasonalbot", user: str = "python-discord" +        self, +        ctx: commands.Context, +        numbers: commands.Greedy[int], +        repository: str = "sir-lancebot", +        user: str = "python-discord"      ) -> None: -        """Command to retrieve issues from a GitHub repository.""" -        url = f"https://api.github.com/repos/{user}/{repository}/issues/{number}" -        merge_url = f"https://api.github.com/repos/{user}/{repository}/pulls/{number}/merge" - -        log.trace(f"Querying GH issues API: {url}") -        async with self.bot.http_session.get(url) as r: -            json_data = await r.json() - -        if r.status in BAD_RESPONSE: -            log.warning(f"Received response {r.status} from: {url}") -            return await ctx.send(f"[{str(r.status)}] {BAD_RESPONSE.get(r.status)}") - -        # The initial API request is made to the issues API endpoint, which will return information -        # if the issue or PR is present. However, the scope of information returned for PRs differs -        # from issues: if the 'issues' key is present in the response then we can pull the data we -        # need from the initial API call. -        if "issues" in json_data.get("html_url"): -            if json_data.get("state") == "open": -                icon_url = Emojis.issue -            else: -                icon_url = Emojis.issue_closed - -        # If the 'issues' key is not contained in the API response and there is no error code, then -        # we know that a PR has been requested and a call to the pulls API endpoint is necessary -        # to get the desired information for the PR. -        else: -            log.trace(f"PR provided, querying GH pulls API for additional information: {merge_url}") -            async with self.bot.http_session.get(merge_url) as m: +        """Command to retrieve issue(s) from a GitHub repository.""" +        links = [] +        numbers = set(numbers)  # Convert from list to set to remove duplicates, if any + +        if not numbers: +            await ctx.invoke(self.bot.get_command('help'), 'issue') +            return + +        if len(numbers) > MAX_REQUESTS: +            embed = discord.Embed( +                title=random.choice(ERROR_REPLIES), +                color=Colours.soft_red, +                description=f"Too many issues/PRs! (maximum of {MAX_REQUESTS})" +            ) +            await ctx.send(embed=embed) +            return + +        for number in numbers: +            url = f"https://api.github.com/repos/{user}/{repository}/issues/{number}" +            merge_url = f"https://api.github.com/repos/{user}/{repository}/pulls/{number}/merge" + +            log.trace(f"Querying GH issues API: {url}") +            async with self.bot.http_session.get(url, headers=REQUEST_HEADERS) as r: +                json_data = await r.json() + +            if r.status in BAD_RESPONSE: +                log.warning(f"Received response {r.status} from: {url}") +                return await ctx.send(f"[{str(r.status)}] #{number} {BAD_RESPONSE.get(r.status)}") + +            # The initial API request is made to the issues API endpoint, which will return information +            # if the issue or PR is present. However, the scope of information returned for PRs differs +            # from issues: if the 'issues' key is present in the response then we can pull the data we +            # need from the initial API call. +            if "issues" in json_data.get("html_url"):                  if json_data.get("state") == "open": -                    icon_url = Emojis.pull_request -                # When the status is 204 this means that the state of the PR is merged -                elif m.status == 204: -                    icon_url = Emojis.merge +                    icon_url = Emojis.issue                  else: -                    icon_url = Emojis.pull_request_closed +                    icon_url = Emojis.issue_closed + +            # If the 'issues' key is not contained in the API response and there is no error code, then +            # we know that a PR has been requested and a call to the pulls API endpoint is necessary +            # to get the desired information for the PR. +            else: +                log.trace(f"PR provided, querying GH pulls API for additional information: {merge_url}") +                async with self.bot.http_session.get(merge_url) as m: +                    if json_data.get("state") == "open": +                        icon_url = Emojis.pull_request +                    # When the status is 204 this means that the state of the PR is merged +                    elif m.status == 204: +                        icon_url = Emojis.merge +                    else: +                        icon_url = Emojis.pull_request_closed + +            issue_url = json_data.get("html_url") +            links.append([icon_url, f"[{repository}] #{number} {json_data.get('title')}", issue_url]) -        issue_url = json_data.get("html_url") -        description_text = f"[{repository}] #{number} {json_data.get('title')}" +        # Issue/PR format: emoji to show if open/closed/merged, number and the title as a singular link. +        description_list = ["{0} [{1}]({2})".format(*link) for link in links]          resp = discord.Embed(              colour=Colours.bright_green, -            description=f"{icon_url} [{description_text}]({issue_url})" +            description='\n'.join(description_list)          ) -        resp.set_author(name="GitHub", url=issue_url) + +        resp.set_author(name="GitHub", url=f"https://github.com/{user}/{repository}")          await ctx.send(embed=resp) diff --git a/bot/exts/evergreen/minesweeper.py b/bot/exts/evergreen/minesweeper.py index 3e40f493..286ac7a5 100644 --- a/bot/exts/evergreen/minesweeper.py +++ b/bot/exts/evergreen/minesweeper.py @@ -120,14 +120,14 @@ class Minesweeper(commands.Cog):      def format_for_discord(board: GameBoard) -> str:          """Format the board as a string for Discord."""          discord_msg = ( -            ":stop_button:    :regional_indicator_a::regional_indicator_b::regional_indicator_c:" -            ":regional_indicator_d::regional_indicator_e::regional_indicator_f::regional_indicator_g:" -            ":regional_indicator_h::regional_indicator_i::regional_indicator_j:\n\n" +            ":stop_button:    :regional_indicator_a: :regional_indicator_b: :regional_indicator_c: " +            ":regional_indicator_d: :regional_indicator_e: :regional_indicator_f: :regional_indicator_g: " +            ":regional_indicator_h: :regional_indicator_i: :regional_indicator_j:\n\n"          )          rows = []          for row_number, row in enumerate(board):              new_row = f"{MESSAGE_MAPPING[row_number + 1]}    " -            new_row += "".join(MESSAGE_MAPPING[cell] for cell in row) +            new_row += " ".join(MESSAGE_MAPPING[cell] for cell in row)              rows.append(new_row)          discord_msg += "\n".join(rows) @@ -158,7 +158,7 @@ class Minesweeper(commands.Cog):          if ctx.guild:              await ctx.send(f"{ctx.author.mention} is playing Minesweeper") -            chat_msg = await ctx.send(f"Here's there board!\n{self.format_for_discord(revealed_board)}") +            chat_msg = await ctx.send(f"Here's their board!\n{self.format_for_discord(revealed_board)}")          else:              chat_msg = None @@ -176,7 +176,7 @@ class Minesweeper(commands.Cog):          await game.dm_msg.delete()          game.dm_msg = await ctx.author.send(f"Here's your board!\n{self.format_for_discord(game.revealed)}")          if game.activated_on_server: -            await game.chat_msg.edit(content=f"Here's there board!\n{self.format_for_discord(game.revealed)}") +            await game.chat_msg.edit(content=f"Here's their board!\n{self.format_for_discord(game.revealed)}")      @commands.dm_only()      @minesweeper_group.command(name="flag") diff --git a/bot/exts/evergreen/movie.py b/bot/exts/evergreen/movie.py index 93aeef30..340a5724 100644 --- a/bot/exts/evergreen/movie.py +++ b/bot/exts/evergreen/movie.py @@ -190,7 +190,10 @@ class Movie(Cog):      async def get_embed(self, name: str) -> Embed:          """Return embed of random movies. Uses name in title.""" -        return Embed(title=f'Random {name} Movies').set_footer(text='Powered by TMDB (themoviedb.org)') +        embed = Embed(title=f"Random {name} Movies") +        embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.") +        embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png") +        return embed  def setup(bot: Bot) -> None: diff --git a/bot/exts/evergreen/reddit.py b/bot/exts/evergreen/reddit.py index fe204419..49127bea 100644 --- a/bot/exts/evergreen/reddit.py +++ b/bot/exts/evergreen/reddit.py @@ -68,9 +68,9 @@ class Reddit(commands.Cog):          # -----------------------------------------------------------          # This code below is bound of change when the emojis are added. -        upvote_emoji = self.bot.get_emoji(638729835245731840) -        comment_emoji = self.bot.get_emoji(638729835073765387) -        user_emoji = self.bot.get_emoji(638729835442602003) +        upvote_emoji = self.bot.get_emoji(755845219890757644) +        comment_emoji = self.bot.get_emoji(755845255001014384) +        user_emoji = self.bot.get_emoji(755845303822974997)          text_emoji = self.bot.get_emoji(676030265910493204)          video_emoji = self.bot.get_emoji(676030265839190047)          image_emoji = self.bot.get_emoji(676030265734201344) diff --git a/bot/exts/evergreen/showprojects.py b/bot/exts/evergreen/showprojects.py deleted file mode 100644 index 328a7aa5..00000000 --- a/bot/exts/evergreen/showprojects.py +++ /dev/null @@ -1,33 +0,0 @@ -import logging - -from discord import Message -from discord.ext import commands - -from bot.constants import Channels - -log = logging.getLogger(__name__) - - -class ShowProjects(commands.Cog): -    """Cog that reacts to posts in the #show-your-projects.""" - -    def __init__(self, bot: commands.Bot): -        self.bot = bot -        self.lastPoster = 0  # Given 0 as the default last poster ID as no user can actually have 0 assigned to them - -    @commands.Cog.listener() -    async def on_message(self, message: Message) -> None: -        """Adds reactions to posts in #show-your-projects.""" -        reactions = ["\U0001f44d", "\U00002764", "\U0001f440", "\U0001f389", "\U0001f680", "\U00002b50", "\U0001f6a9"] -        if (message.channel.id == Channels.show_your_projects -                and message.author.bot is False -                and message.author.id != self.lastPoster): -            for reaction in reactions: -                await message.add_reaction(reaction) - -            self.lastPoster = message.author.id - - -def setup(bot: commands.Bot) -> None: -    """Show Projects Reaction Cog.""" -    bot.add_cog(ShowProjects(bot)) diff --git a/bot/exts/evergreen/snakes/__init__.py b/bot/exts/evergreen/snakes/__init__.py index 2eae2751..bc42f0c2 100644 --- a/bot/exts/evergreen/snakes/__init__.py +++ b/bot/exts/evergreen/snakes/__init__.py @@ -2,7 +2,7 @@ import logging  from discord.ext import commands -from bot.exts.evergreen.snakes.snakes_cog import Snakes +from bot.exts.evergreen.snakes._snakes_cog import Snakes  log = logging.getLogger(__name__) diff --git a/bot/exts/evergreen/snakes/converter.py b/bot/exts/evergreen/snakes/_converter.py index 55609b8e..eee248cf 100644 --- a/bot/exts/evergreen/snakes/converter.py +++ b/bot/exts/evergreen/snakes/_converter.py @@ -7,7 +7,7 @@ import discord  from discord.ext.commands import Context, Converter  from fuzzywuzzy import fuzz -from bot.exts.evergreen.snakes.utils import SNAKE_RESOURCES +from bot.exts.evergreen.snakes._utils import SNAKE_RESOURCES  from bot.utils import disambiguate  log = logging.getLogger(__name__) diff --git a/bot/exts/evergreen/snakes/snakes_cog.py b/bot/exts/evergreen/snakes/_snakes_cog.py index 9bbad9fe..70bb0e73 100644 --- a/bot/exts/evergreen/snakes/snakes_cog.py +++ b/bot/exts/evergreen/snakes/_snakes_cog.py @@ -18,8 +18,8 @@ from discord import Colour, Embed, File, Member, Message, Reaction  from discord.ext.commands import BadArgument, Bot, Cog, CommandError, Context, bot_has_permissions, group  from bot.constants import ERROR_REPLIES, Tokens -from bot.exts.evergreen.snakes import utils -from bot.exts.evergreen.snakes.converter import Snake +from bot.exts.evergreen.snakes import _utils as utils +from bot.exts.evergreen.snakes._converter import Snake  from bot.utils.decorators import locked  log = logging.getLogger(__name__) @@ -1083,13 +1083,13 @@ class Snakes(Cog):              url,              params={                  "part": "snippet", -                "q": urllib.parse.quote(query), +                "q": urllib.parse.quote_plus(query),                  "type": "video",                  "key": Tokens.youtube              }          )          response = await response.json() -        data = response['items'] +        data = response.get("items", [])          # Send the user a video          if len(data) > 0: diff --git a/bot/exts/evergreen/snakes/utils.py b/bot/exts/evergreen/snakes/_utils.py index 7d6caf04..7d6caf04 100644 --- a/bot/exts/evergreen/snakes/utils.py +++ b/bot/exts/evergreen/snakes/_utils.py diff --git a/bot/exts/evergreen/source.py b/bot/exts/evergreen/source.py new file mode 100644 index 00000000..cdfe54ec --- /dev/null +++ b/bot/exts/evergreen/source.py @@ -0,0 +1,109 @@ +import inspect +from pathlib import Path +from typing import Optional, Tuple, Union + +from discord import Embed +from discord.ext import commands + +from bot.constants import Source + +SourceType = Union[commands.Command, commands.Cog, str, commands.ExtensionNotLoaded] + + +class SourceConverter(commands.Converter): +    """Convert an argument into a help command, tag, command, or cog.""" + +    async def convert(self, ctx: commands.Context, argument: str) -> SourceType: +        """Convert argument into source object.""" +        cog = ctx.bot.get_cog(argument) +        if cog: +            return cog + +        cmd = ctx.bot.get_command(argument) +        if cmd: +            return cmd + +        raise commands.BadArgument( +            f"Unable to convert `{argument}` to valid command or Cog." +        ) + + +class BotSource(commands.Cog): +    """Displays information about the bot's source code.""" + +    def __init__(self, bot: commands.Bot): +        self.bot = bot + +    @commands.command(name="source", aliases=("src",)) +    async def source_command(self, ctx: commands.Context, *, source_item: SourceConverter = None) -> None: +        """Display information and a GitHub link to the source code of a command, tag, or cog.""" +        if not source_item: +            embed = Embed(title="Sir Lancebot's GitHub Repository") +            embed.add_field(name="Repository", value=f"[Go to GitHub]({Source.github})") +            embed.set_thumbnail(url=Source.github_avatar_url) +            await ctx.send(embed=embed) +            return + +        embed = await self.build_embed(source_item) +        await ctx.send(embed=embed) + +    def get_source_link(self, source_item: SourceType) -> Tuple[str, str, Optional[int]]: +        """ +        Build GitHub link of source item, return this link, file location and first line number. + +        Raise BadArgument if `source_item` is a dynamically-created object (e.g. via internal eval). +        """ +        if isinstance(source_item, commands.Command): +            src = source_item.callback.__code__ +            filename = src.co_filename +        else: +            src = type(source_item) +            try: +                filename = inspect.getsourcefile(src) +            except TypeError: +                raise commands.BadArgument("Cannot get source for a dynamically-created object.") + +        if not isinstance(source_item, str): +            try: +                lines, first_line_no = inspect.getsourcelines(src) +            except OSError: +                raise commands.BadArgument("Cannot get source for a dynamically-created object.") + +            lines_extension = f"#L{first_line_no}-L{first_line_no+len(lines)-1}" +        else: +            first_line_no = None +            lines_extension = "" + +        file_location = Path(filename).relative_to(Path.cwd()).as_posix() + +        url = f"{Source.github}/blob/master/{file_location}{lines_extension}" + +        return url, file_location, first_line_no or None + +    async def build_embed(self, source_object: SourceType) -> Optional[Embed]: +        """Build embed based on source object.""" +        url, location, first_line = self.get_source_link(source_object) + +        if isinstance(source_object, commands.Command): +            if source_object.cog_name == 'Help': +                title = "Help Command" +                description = source_object.__doc__.splitlines()[1] +            else: +                description = source_object.short_doc +                title = f"Command: {source_object.qualified_name}" +        else: +            title = f"Cog: {source_object.qualified_name}" +            description = source_object.description.splitlines()[0] + +        embed = Embed(title=title, description=description) +        embed.set_thumbnail(url=Source.github_avatar_url) +        embed.add_field(name="Source Code", value=f"[Go to GitHub]({url})") +        line_text = f":{first_line}" if first_line else "" +        embed.set_footer(text=f"{location}{line_text}") + +        return embed + + +def setup(bot: commands.Bot) -> None: +    """Load the BotSource cog.""" +    bot.add_cog(BotSource(bot)) diff --git a/bot/exts/evergreen/space.py b/bot/exts/evergreen/space.py index 3587fc00..bc8e3118 100644 --- a/bot/exts/evergreen/space.py +++ b/bot/exts/evergreen/space.py @@ -8,7 +8,7 @@ from discord import Embed  from discord.ext import tasks  from discord.ext.commands import BadArgument, Cog, Context, Converter, group -from bot.bot import SeasonalBot +from bot.bot import Bot  from bot.constants import Tokens  logger = logging.getLogger(__name__) @@ -37,7 +37,7 @@ class DateConverter(Converter):  class Space(Cog):      """Space Cog contains commands, that show images, facts or other information about space.""" -    def __init__(self, bot: SeasonalBot): +    def __init__(self, bot: Bot):          self.bot = bot          self.http_session = bot.http_session @@ -240,7 +240,7 @@ class Space(Cog):          ).set_image(url=image).set_footer(text="Powered by NASA API" + footer) -def setup(bot: SeasonalBot) -> None: +def setup(bot: Bot) -> None:      """Load Space Cog."""      if not Tokens.nasa:          logger.warning("Can't find NASA API key. Not loading Space Cog.") diff --git a/bot/exts/evergreen/trivia_quiz.py b/bot/exts/evergreen/trivia_quiz.py index 8dceceac..fe692c2a 100644 --- a/bot/exts/evergreen/trivia_quiz.py +++ b/bot/exts/evergreen/trivia_quiz.py @@ -121,8 +121,10 @@ class TriviaQuiz(commands.Cog):              # A function to check whether user input is the correct answer(close to the right answer)              def check(m: discord.Message) -> bool: -                ratio = fuzz.ratio(answer.lower(), m.content.lower()) -                return ratio > 85 and m.channel == ctx.channel +                return ( +                    m.channel == ctx.channel +                    and fuzz.ratio(answer.lower(), m.content.lower()) > 85 +                )              try:                  msg = await self.bot.wait_for('message', check=check, timeout=10) diff --git a/bot/exts/evergreen/wikipedia.py b/bot/exts/evergreen/wikipedia.py new file mode 100644 index 00000000..be36e2c4 --- /dev/null +++ b/bot/exts/evergreen/wikipedia.py @@ -0,0 +1,114 @@ +import asyncio +import datetime +import logging +from typing import List, Optional + +from aiohttp import client_exceptions +from discord import Color, Embed, Message +from discord.ext import commands + +from bot.constants import Wikipedia + +log = logging.getLogger(__name__) + +SEARCH_API = "https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={search_term}&format=json" +WIKIPEDIA_URL = "https://en.wikipedia.org/wiki/{title}" + + +class WikipediaSearch(commands.Cog): +    """Get info from wikipedia.""" + +    def __init__(self, bot: commands.Bot): +        self.bot = bot +        self.http_session = bot.http_session + +    @staticmethod +    def formatted_wiki_url(index: int, title: str) -> str: +        """Formating wikipedia link with index and title.""" +        return f'`{index}` [{title}]({WIKIPEDIA_URL.format(title=title.replace(" ", "_"))})' + +    async def search_wikipedia(self, search_term: str) -> Optional[List[str]]: +        """Search wikipedia and return the first 10 pages found.""" +        pages = [] +        async with self.http_session.get(SEARCH_API.format(search_term=search_term)) as response: +            try: +                data = await response.json() + +                search_results = data["query"]["search"] + +                # Ignore pages with "may refer to" +                for search_result in search_results: +                    log.info("trying to append titles") +                    if "may refer to" not in search_result["snippet"]: +                        pages.append(search_result["title"]) +            except client_exceptions.ContentTypeError: +                pages = None + +        log.info("Finished appending titles") +        return pages + +    @commands.cooldown(1, 10, commands.BucketType.user) +    @commands.command(name="wikipedia", aliases=["wiki"]) +    async def wikipedia_search_command(self, ctx: commands.Context, *, search: str) -> None: +        """Return list of results containing your search query from wikipedia.""" +        titles = await self.search_wikipedia(search) + +        def check(message: Message) -> bool: +            return message.author.id == ctx.author.id and message.channel == ctx.channel + +        if not titles: +            await ctx.send("Sorry, we could not find a wikipedia article using that search term") +            return + +        async with ctx.typing(): +            log.info("Finished appending titles to titles_no_underscore list") + +            s_desc = "\n".join(self.formatted_wiki_url(index, title) for index, title in enumerate(titles, start=1)) +            embed = Embed(colour=Color.blue(), title=f"Wikipedia results for `{search}`", description=s_desc) +            embed.timestamp = datetime.datetime.utcnow() +            await ctx.send(embed=embed) +        embed = Embed(colour=Color.green(), description="Enter number to choose") +        msg = await ctx.send(embed=embed) +        titles_len = len(titles)  # getting length of list + +        for retry_count in range(1, Wikipedia.total_chance + 1): +            retries_left = Wikipedia.total_chance - retry_count +            if retry_count < Wikipedia.total_chance: +                error_msg = f"You have `{retries_left}/{Wikipedia.total_chance}` chances left" +            else: +                error_msg = 'Please try again by using `.wiki` command' +            try: +                message = await ctx.bot.wait_for('message', timeout=60.0, check=check) +                response_from_user = await self.bot.get_context(message) + +                if response_from_user.command: +                    return + +                response = int(message.content) +                if response < 0: +                    await ctx.send(f"Sorry, but you can't give negative index, {error_msg}") +                elif response == 0: +                    await ctx.send(f"Sorry, please give an integer between `1` to `{titles_len}`, {error_msg}") +                else: +                    await ctx.send(WIKIPEDIA_URL.format(title=titles[response - 1].replace(" ", "_"))) +                    break + +            except asyncio.TimeoutError: +                embed = Embed(colour=Color.red(), description=f"Time's up {ctx.author.mention}") +                await msg.edit(embed=embed) +                break + +            except ValueError: +                await ctx.send(f"Sorry, but you cannot do that, I will only accept an positive integer, {error_msg}") + +            except IndexError: +                await ctx.send(f"Sorry, please give an integer between `1` to `{titles_len}`, {error_msg}") + +            except Exception as e: +                log.info(f"Caught exception {e}, breaking out of retry loop") +                break + + +def setup(bot: commands.Bot) -> None: +    """Wikipedia Cog load.""" +    bot.add_cog(WikipediaSearch(bot)) diff --git a/bot/exts/evergreen/wonder_twins.py b/bot/exts/evergreen/wonder_twins.py new file mode 100644 index 00000000..afc5346e --- /dev/null +++ b/bot/exts/evergreen/wonder_twins.py @@ -0,0 +1,49 @@ +import random +from pathlib import Path + +import yaml +from discord.ext.commands import Bot, Cog, Context, command + + +class WonderTwins(Cog): +    """Cog for a Wonder Twins inspired command.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +        with open(Path.cwd() / "bot" / "resources" / "evergreen" / "wonder_twins.yaml", "r", encoding="utf-8") as f: +            info = yaml.load(f, Loader=yaml.FullLoader) +            self.water_types = info["water_types"] +            self.objects = info["objects"] +            self.adjectives = info["adjectives"] + +    @staticmethod +    def append_onto(phrase: str, insert_word: str) -> str: +        """Appends one word onto the end of another phrase in order to format with the proper determiner.""" +        if insert_word.endswith("s"): +            phrase = phrase.split() +            del phrase[0] +            phrase = " ".join(phrase) + +        insert_word = insert_word.split()[-1] +        return " ".join([phrase, insert_word]) + +    def format_phrase(self) -> str: +        """Creates a transformation phrase from available words.""" +        adjective = random.choice((None, random.choice(self.adjectives))) +        object_name = random.choice(self.objects) +        water_type = random.choice(self.water_types) + +        if adjective: +            object_name = self.append_onto(adjective, object_name) +        return f"{object_name} of {water_type}" + +    @command(name="formof", aliases=["wondertwins", "wondertwin", "fo"]) +    async def form_of(self, ctx: Context) -> None: +        """Command to send a Wonder Twins inspired phrase to the user invoking the command.""" +        await ctx.send(f"Form of {self.format_phrase()}!") + + +def setup(bot: Bot) -> None: +    """Load the WonderTwins cog.""" +    bot.add_cog(WonderTwins(bot)) diff --git a/bot/exts/halloween/candy_collection.py b/bot/exts/halloween/candy_collection.py index caf0df11..0cb37ecd 100644 --- a/bot/exts/halloween/candy_collection.py +++ b/bot/exts/halloween/candy_collection.py @@ -1,11 +1,9 @@ -import functools -import json  import logging -import os  import random -from typing import List, Union +from typing import Union  import discord +from async_rediscache import RedisCache  from discord.ext import commands  from bot.constants import Channels, Month @@ -13,27 +11,37 @@ from bot.utils.decorators import in_month  log = logging.getLogger(__name__) -json_location = os.path.join("bot", "resources", "halloween", "candy_collection.json") -  # chance is 1 in x range, so 1 in 20 range would give 5% chance (for add candy)  ADD_CANDY_REACTION_CHANCE = 20  # 5%  ADD_CANDY_EXISTING_REACTION_CHANCE = 10  # 10%  ADD_SKULL_REACTION_CHANCE = 50  # 2%  ADD_SKULL_EXISTING_REACTION_CHANCE = 20  # 5% +EMOJIS = dict( +    CANDY="\N{CANDY}", +    SKULL="\N{SKULL}", +    MEDALS=( +        '\N{FIRST PLACE MEDAL}', +        '\N{SECOND PLACE MEDAL}', +        '\N{THIRD PLACE MEDAL}', +        '\N{SPORTS MEDAL}', +        '\N{SPORTS MEDAL}', +    ), +) +  class CandyCollection(commands.Cog):      """Candy collection game Cog.""" +    # User candy amount records +    candy_records = RedisCache() + +    # Candy and skull messages mapping +    candy_messages = RedisCache() +    skull_messages = RedisCache() +      def __init__(self, bot: commands.Bot):          self.bot = bot -        with open(json_location, encoding="utf8") as candy: -            self.candy_json = json.load(candy) -            self.msg_reacted = self.candy_json['msg_reacted'] -        self.get_candyinfo = dict() -        for userinfo in self.candy_json['records']: -            userid = userinfo['userid'] -            self.get_candyinfo[userid] = userinfo      @in_month(Month.OCTOBER)      @commands.Cog.listener() @@ -43,19 +51,17 @@ class CandyCollection(commands.Cog):          if message.author.bot:              return          # ensure it's hacktober channel -        if message.channel.id != Channels.seasonalbot_commands: +        if message.channel.id != Channels.community_bot_commands:              return          # do random check for skull first as it has the lower chance          if random.randint(1, ADD_SKULL_REACTION_CHANCE) == 1: -            d = {"reaction": '\N{SKULL}', "msg_id": message.id, "won": False} -            self.msg_reacted.append(d) -            return await message.add_reaction('\N{SKULL}') +            await self.skull_messages.set(message.id, "skull") +            return await message.add_reaction(EMOJIS['SKULL'])          # check for the candy chance next          if random.randint(1, ADD_CANDY_REACTION_CHANCE) == 1: -            d = {"reaction": '\N{CANDY}', "msg_id": message.id, "won": False} -            self.msg_reacted.append(d) -            return await message.add_reaction('\N{CANDY}') +            await self.candy_messages.set(message.id, "candy") +            return await message.add_reaction(EMOJIS['CANDY'])      @in_month(Month.OCTOBER)      @commands.Cog.listener() @@ -67,43 +73,44 @@ class CandyCollection(commands.Cog):              return          # check to ensure it is in correct channel -        if message.channel.id != Channels.seasonalbot_commands: +        if message.channel.id != Channels.community_bot_commands:              return          # if its not a candy or skull, and it is one of 10 most recent messages,          # proceed to add a skull/candy with higher chance -        if str(reaction.emoji) not in ('\N{SKULL}', '\N{CANDY}'): -            if message.id in await self.ten_recent_msg(): +        if str(reaction.emoji) not in (EMOJIS['SKULL'], EMOJIS['CANDY']): +            recent_message_ids = map( +                lambda m: m.id, +                await self.hacktober_channel.history(limit=10).flatten() +            ) +            if message.id in recent_message_ids:                  await self.reacted_msg_chance(message)              return -        for react in self.msg_reacted: -            # check to see if the message id of a message we added a -            # reaction to is in json file, and if nobody has won/claimed it yet -            if react['msg_id'] == message.id and react['won'] is False: -                react['user_reacted'] = user.id -                react['won'] = True -                try: -                    # if they have record/candies in json already it will do this -                    user_records = self.get_candyinfo[user.id] -                    if str(reaction.emoji) == '\N{CANDY}': -                        user_records['record'] += 1 -                    if str(reaction.emoji) == '\N{SKULL}': -                        if user_records['record'] <= 3: -                            user_records['record'] = 0 -                            lost = 'all of your' -                        else: -                            lost = random.randint(1, 3) -                            user_records['record'] -= lost -                        await self.send_spook_msg(message.author, message.channel, lost) - -                except KeyError: -                    # otherwise it will raise KeyError so we need to add them to file -                    if str(reaction.emoji) == '\N{CANDY}': -                        print('ok') -                        d = {"userid": user.id, "record": 1} -                        self.candy_json['records'].append(d) -                await self.remove_reactions(reaction) +        if await self.candy_messages.get(message.id) == "candy" and str(reaction.emoji) == EMOJIS['CANDY']: +            await self.candy_messages.delete(message.id) +            if await self.candy_records.contains(user.id): +                await self.candy_records.increment(user.id) +            else: +                await self.candy_records.set(user.id, 1) + +        elif await self.skull_messages.get(message.id) == "skull" and str(reaction.emoji) == EMOJIS['SKULL']: +            await self.skull_messages.delete(message.id) + +            if prev_record := await self.candy_records.get(user.id): +                lost = min(random.randint(1, 3), prev_record) +                await self.candy_records.decrement(user.id, lost) + +                if lost == prev_record: +                    await CandyCollection.send_spook_msg(user, message.channel, 'all of your') +                else: +                    await CandyCollection.send_spook_msg(user, message.channel, lost) +            else: +                await CandyCollection.send_no_candy_spook_message(user, message.channel) +        else: +            return  # Skip saving + +        await reaction.clear()      async def reacted_msg_chance(self, message: discord.Message) -> None:          """ @@ -113,109 +120,71 @@ class CandyCollection(commands.Cog):          existing reaction.          """          if random.randint(1, ADD_SKULL_EXISTING_REACTION_CHANCE) == 1: -            d = {"reaction": '\N{SKULL}', "msg_id": message.id, "won": False} -            self.msg_reacted.append(d) -            return await message.add_reaction('\N{SKULL}') +            await self.skull_messages.set(message.id, "skull") +            return await message.add_reaction(EMOJIS['SKULL'])          if random.randint(1, ADD_CANDY_EXISTING_REACTION_CHANCE) == 1: -            d = {"reaction": '\N{CANDY}', "msg_id": message.id, "won": False} -            self.msg_reacted.append(d) -            return await message.add_reaction('\N{CANDY}') - -    async def ten_recent_msg(self) -> List[int]: -        """Get the last 10 messages sent in the channel.""" -        ten_recent = [] -        recent_msg_id = max( -            message.id for message in self.bot._connection._messages -            if message.channel.id == Channels.seasonalbot_commands -        ) - -        channel = await self.hacktober_channel() -        ten_recent.append(recent_msg_id) - -        for i in range(9): -            o = discord.Object(id=recent_msg_id + i) -            msg = await next(channel.history(limit=1, before=o)) -            ten_recent.append(msg.id) +            await self.candy_messages.set(message.id, "candy") +            return await message.add_reaction(EMOJIS['CANDY']) -        return ten_recent - -    async def get_message(self, msg_id: int) -> Union[discord.Message, None]: -        """Get the message from its ID.""" -        try: -            o = discord.Object(id=msg_id + 1) -            # Use history rather than get_message due to -            #         poor ratelimit (50/1s vs 1/1s) -            msg = await next(self.hacktober_channel.history(limit=1, before=o)) - -            if msg.id != msg_id: -                return None - -            return msg - -        except Exception: -            return None - -    async def hacktober_channel(self) -> discord.TextChannel: +    @property +    def hacktober_channel(self) -> discord.TextChannel:          """Get #hacktoberbot channel from its ID.""" -        return self.bot.get_channel(id=Channels.seasonalbot_commands) - -    async def remove_reactions(self, reaction: discord.Reaction) -> None: -        """Remove all candy/skull reactions.""" -        try: -            async for user in reaction.users(): -                await reaction.message.remove_reaction(reaction.emoji, user) +        return self.bot.get_channel(id=Channels.community_bot_commands) -        except discord.HTTPException: -            pass - -    async def send_spook_msg(self, author: discord.Member, channel: discord.TextChannel, candies: int) -> None: +    @staticmethod +    async def send_spook_msg( +        author: discord.Member, channel: discord.TextChannel, candies: Union[str, int] +    ) -> None:          """Send a spooky message."""          e = discord.Embed(colour=author.colour)          e.set_author(name="Ghosts and Ghouls and Jack o' lanterns at night; "                            f"I took {candies} candies and quickly took flight.")          await channel.send(embed=e) -    def save_to_json(self) -> None: -        """Save JSON to a local file.""" -        with open(json_location, 'w', encoding="utf8") as outfile: -            json.dump(self.candy_json, outfile) +    @staticmethod +    async def send_no_candy_spook_message( +        author: discord.Member, +        channel: discord.TextChannel +    ) -> None: +        """An alternative spooky message sent when user has no candies in the collection.""" +        embed = discord.Embed(color=author.color) +        embed.set_author(name="Ghosts and Ghouls and Jack o' lanterns at night; " +                              "I tried to take your candies but you had none to begin with!") +        await channel.send(embed=embed)      @in_month(Month.OCTOBER)      @commands.command()      async def candy(self, ctx: commands.Context) -> None:          """Get the candy leaderboard and save to JSON.""" -        # Use run_in_executor to prevent blocking -        thing = functools.partial(self.save_to_json) -        await self.bot.loop.run_in_executor(None, thing) - -        emoji = ( -            '\N{FIRST PLACE MEDAL}', -            '\N{SECOND PLACE MEDAL}', -            '\N{THIRD PLACE MEDAL}', -            '\N{SPORTS MEDAL}', -            '\N{SPORTS MEDAL}' -        ) - -        top_sorted = sorted(self.candy_json['records'], key=lambda k: k.get('record', 0), reverse=True) -        top_five = top_sorted[:5] +        records = await self.candy_records.items() -        usersid = [] -        records = [] -        for record in top_five: -            usersid.append(record['userid']) -            records.append(record['record']) +        def generate_leaderboard() -> str: +            top_sorted = sorted( +                ((user_id, score) for user_id, score in records if score > 0), +                key=lambda x: x[1], +                reverse=True +            ) +            top_five = top_sorted[:5] -        value = '\n'.join(f'{emoji[index]} <@{usersid[index]}>: {records[index]}' -                          for index in range(0, len(usersid))) or 'No Candies' +            return '\n'.join( +                f"{EMOJIS['MEDALS'][index]} <@{record[0]}>: {record[1]}" +                for index, record in enumerate(top_five) +            ) if top_five else 'No Candies'          e = discord.Embed(colour=discord.Colour.blurple()) -        e.add_field(name="Top Candy Records", value=value, inline=False) -        e.add_field(name='\u200b', -                    value="Candies will randomly appear on messages sent. " -                          "\nHit the candy when it appears as fast as possible to get the candy! " -                          "\nBut beware the ghosts...", -                    inline=False) +        e.add_field( +            name="Top Candy Records", +            value=generate_leaderboard(), +            inline=False +        ) +        e.add_field( +            name='\u200b', +            value="Candies will randomly appear on messages sent. " +                  "\nHit the candy when it appears as fast as possible to get the candy! " +                  "\nBut beware the ghosts...", +            inline=False +        )          await ctx.send(embed=e) diff --git a/bot/exts/halloween/hacktober-issue-finder.py b/bot/exts/halloween/hacktober-issue-finder.py index b5ad1c4f..9deadde9 100644 --- a/bot/exts/halloween/hacktober-issue-finder.py +++ b/bot/exts/halloween/hacktober-issue-finder.py @@ -7,13 +7,19 @@ import aiohttp  import discord  from discord.ext import commands -from bot.constants import Month +from bot.constants import Month, Tokens  from bot.utils.decorators import in_month  log = logging.getLogger(__name__)  URL = "https://api.github.com/search/issues?per_page=100&q=is:issue+label:hacktoberfest+language:python+state:open" -HEADERS = {"Accept": "application / vnd.github.v3 + json"} + +REQUEST_HEADERS = { +    "User-Agent": "Python Discord Hacktoberbot", +    "Accept": "application / vnd.github.v3 + json" +} +if GITHUB_TOKEN := Tokens.github: +    REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}"  class HacktoberIssues(commands.Cog): @@ -66,7 +72,7 @@ class HacktoberIssues(commands.Cog):                      url += f"&page={page}"              log.debug(f"making api request to url: {url}") -            async with session.get(url, headers=HEADERS) as response: +            async with session.get(url, headers=REQUEST_HEADERS) as response:                  if response.status != 200:                      log.error(f"expected 200 status (got {response.status}) from the GitHub api.")                      await ctx.send(f"ERROR: expected 200 status (got {response.status}) from the GitHub api.") @@ -97,7 +103,7 @@ class HacktoberIssues(commands.Cog):          labels = [label["name"] for label in issue["labels"]]          embed = discord.Embed(title=title) -        embed.description = body +        embed.description = body[:500] + '...' if len(body) > 500 else body          embed.add_field(name="labels", value="\n".join(labels))          embed.url = issue_url          embed.set_footer(text=issue_url) diff --git a/bot/exts/halloween/hacktoberstats.py b/bot/exts/halloween/hacktoberstats.py index db5e37f2..84b75022 100644 --- a/bot/exts/halloween/hacktoberstats.py +++ b/bot/exts/halloween/hacktoberstats.py @@ -1,35 +1,47 @@ -import json  import logging  import re  from collections import Counter -from datetime import datetime -from pathlib import Path -from typing import List, Tuple +from datetime import datetime, timedelta +from typing import List, Tuple, Union  import aiohttp  import discord +from async_rediscache import RedisCache  from discord.ext import commands -from bot.constants import Channels, Month, WHITELISTED_CHANNELS +from bot.constants import Channels, Month, Tokens, WHITELISTED_CHANNELS  from bot.utils.decorators import in_month, override_in_channel -from bot.utils.persist import make_persistent  log = logging.getLogger(__name__)  CURRENT_YEAR = datetime.now().year  # Used to construct GH API query  PRS_FOR_SHIRT = 4  # Minimum number of PRs before a shirt is awarded -HACKTOBER_WHITELIST = WHITELISTED_CHANNELS + (Channels.hacktoberfest_2019,) +REVIEW_DAYS = 14  # number of days needed after PR can be mature +HACKTOBER_WHITELIST = WHITELISTED_CHANNELS + (Channels.hacktoberfest_2020,) + +REQUEST_HEADERS = {"User-Agent": "Python Discord Hacktoberbot"} +# using repo topics API during preview period requires an accept header +GITHUB_TOPICS_ACCEPT_HEADER = {"Accept": "application/vnd.github.mercy-preview+json"} +if GITHUB_TOKEN := Tokens.github: +    REQUEST_HEADERS["Authorization"] = f"token {GITHUB_TOKEN}" +    GITHUB_TOPICS_ACCEPT_HEADER["Authorization"] = f"token {GITHUB_TOKEN}" + +GITHUB_NONEXISTENT_USER_MESSAGE = ( +    "The listed users cannot be searched either because the users do not exist " +    "or you do not have permission to view the users." +)  class HacktoberStats(commands.Cog):      """Hacktoberfest statistics Cog.""" +    # Stores mapping of user IDs and GitHub usernames +    linked_accounts = RedisCache() +      def __init__(self, bot: commands.Bot):          self.bot = bot -        self.link_json = make_persistent(Path("bot", "resources", "halloween", "github_links.json")) -        self.linked_accounts = self.load_linked_users() -    @in_month(Month.OCTOBER) +    @in_month(Month.SEPTEMBER, Month.OCTOBER, Month.NOVEMBER)      @commands.group(name="hacktoberstats", aliases=("hackstats",), invoke_without_command=True)      @override_in_channel(HACKTOBER_WHITELIST)      async def hacktoberstats_group(self, ctx: commands.Context, github_username: str = None) -> None: @@ -41,10 +53,10 @@ class HacktoberStats(commands.Cog):          get that user's contributions          """          if not github_username: -            author_id, author_mention = HacktoberStats._author_mention_from_context(ctx) +            author_id, author_mention = self._author_mention_from_context(ctx) -            if str(author_id) in self.linked_accounts.keys(): -                github_username = self.linked_accounts[author_id]["github_username"] +            if await self.linked_accounts.contains(author_id): +                github_username = await self.linked_accounts.get(author_id)                  logging.info(f"Getting stats for {author_id} linked GitHub account '{github_username}'")              else:                  msg = ( @@ -57,49 +69,38 @@ class HacktoberStats(commands.Cog):          await self.get_stats(ctx, github_username) -    @in_month(Month.OCTOBER) +    @in_month(Month.SEPTEMBER, Month.OCTOBER, Month.NOVEMBER)      @hacktoberstats_group.command(name="link")      @override_in_channel(HACKTOBER_WHITELIST)      async def link_user(self, ctx: commands.Context, github_username: str = None) -> None:          """          Link the invoking user's Github github_username to their Discord ID. -        Linked users are stored as a nested dict: -            { -                Discord_ID: { -                    "github_username": str -                    "date_added": datetime -                } -            } +        Linked users are stored in Redis: User ID => GitHub Username.          """ -        author_id, author_mention = HacktoberStats._author_mention_from_context(ctx) +        author_id, author_mention = self._author_mention_from_context(ctx)          if github_username: -            if str(author_id) in self.linked_accounts.keys(): -                old_username = self.linked_accounts[author_id]["github_username"] +            if await self.linked_accounts.contains(author_id): +                old_username = await self.linked_accounts.get(author_id)                  logging.info(f"{author_id} has changed their github link from '{old_username}' to '{github_username}'")                  await ctx.send(f"{author_mention}, your GitHub username has been updated to: '{github_username}'")              else:                  logging.info(f"{author_id} has added a github link to '{github_username}'")                  await ctx.send(f"{author_mention}, your GitHub username has been added") -            self.linked_accounts[author_id] = { -                "github_username": github_username, -                "date_added": datetime.now() -            } - -            self.save_linked_users() +            await self.linked_accounts.set(author_id, github_username)          else:              logging.info(f"{author_id} tried to link a GitHub account but didn't provide a username")              await ctx.send(f"{author_mention}, a GitHub username is required to link your account") -    @in_month(Month.OCTOBER) +    @in_month(Month.SEPTEMBER, Month.OCTOBER, Month.NOVEMBER)      @hacktoberstats_group.command(name="unlink")      @override_in_channel(HACKTOBER_WHITELIST)      async def unlink_user(self, ctx: commands.Context) -> None:          """Remove the invoking user's account link from the log.""" -        author_id, author_mention = HacktoberStats._author_mention_from_context(ctx) +        author_id, author_mention = self._author_mention_from_context(ctx) -        stored_user = self.linked_accounts.pop(author_id, None) +        stored_user = await self.linked_accounts.pop(author_id, None)          if stored_user:              await ctx.send(f"{author_mention}, your GitHub profile has been unlinked")              logging.info(f"{author_id} has unlinked their GitHub account") @@ -107,53 +108,15 @@ class HacktoberStats(commands.Cog):              await ctx.send(f"{author_mention}, you do not currently have a linked GitHub account")              logging.info(f"{author_id} tried to unlink their GitHub account but no account was linked") -        self.save_linked_users() - -    def load_linked_users(self) -> dict: -        """ -        Load list of linked users from local JSON file. - -        Linked users are stored as a nested dict: -            { -                Discord_ID: { -                    "github_username": str -                    "date_added": datetime -                } -            } -        """ -        if self.link_json.exists(): -            logging.info(f"Loading linked GitHub accounts from '{self.link_json}'") -            with open(self.link_json, 'r', encoding="utf8") as file: -                linked_accounts = json.load(file) - -            logging.info(f"Loaded {len(linked_accounts)} linked GitHub accounts from '{self.link_json}'") -            return linked_accounts -        else: -            logging.info(f"Linked account log: '{self.link_json}' does not exist") -            return {} - -    def save_linked_users(self) -> None: -        """ -        Save list of linked users to local JSON file. - -        Linked users are stored as a nested dict: -            { -                Discord_ID: { -                    "github_username": str -                    "date_added": datetime -                } -            } -        """ -        logging.info(f"Saving linked_accounts to '{self.link_json}'") -        with open(self.link_json, 'w', encoding="utf8") as file: -            json.dump(self.linked_accounts, file, default=str) -        logging.info(f"linked_accounts saved to '{self.link_json}'") -      async def get_stats(self, ctx: commands.Context, github_username: str) -> None:          """          Query GitHub's API for PRs created by a GitHub user during the month of October. -        PRs with the 'invalid' tag are ignored +        PRs with an 'invalid' or 'spam' label are ignored + +        For PRs created after October 3rd, they have to be in a repository that has a +        'hacktoberfest' topic, unless the PR is labelled 'hacktoberfest-accepted' for it +        to count.          If a valid github_username is provided, an embed is generated and posted to the channel @@ -163,30 +126,30 @@ class HacktoberStats(commands.Cog):              prs = await self.get_october_prs(github_username)              if prs: -                stats_embed = self.build_embed(github_username, prs) +                stats_embed = await self.build_embed(github_username, prs)                  await ctx.send('Here are some stats!', embed=stats_embed)              else: -                await ctx.send(f"No October GitHub contributions found for '{github_username}'") +                await ctx.send(f"No valid October GitHub contributions found for '{github_username}'") -    def build_embed(self, github_username: str, prs: List[dict]) -> discord.Embed: +    async def build_embed(self, github_username: str, prs: List[dict]) -> discord.Embed:          """Return a stats embed built from github_username's PRs."""          logging.info(f"Building Hacktoberfest embed for GitHub user: '{github_username}'") -        pr_stats = self._summarize_prs(prs) +        in_review, accepted = await self._categorize_prs(prs) -        n = pr_stats['n_prs'] +        n = len(accepted) + len(in_review)  # total number of PRs          if n >= PRS_FOR_SHIRT: -            shirtstr = f"**{github_username} has earned a tshirt!**" +            shirtstr = f"**{github_username} is eligible for a T-shirt or a tree!**"          elif n == PRS_FOR_SHIRT - 1: -            shirtstr = f"**{github_username} is 1 PR away from a tshirt!**" +            shirtstr = f"**{github_username} is 1 PR away from a T-shirt or a tree!**"          else: -            shirtstr = f"**{github_username} is {PRS_FOR_SHIRT - n} PRs away from a tshirt!**" +            shirtstr = f"**{github_username} is {PRS_FOR_SHIRT - n} PRs away from a T-shirt or a tree!**"          stats_embed = discord.Embed(              title=f"{github_username}'s Hacktoberfest",              color=discord.Color(0x9c4af7),              description=( -                f"{github_username} has made {n} " -                f"{HacktoberStats._contributionator(n)} in " +                f"{github_username} has made {n} valid " +                f"{self._contributionator(n)} in "                  f"October\n\n"                  f"{shirtstr}\n\n"              ) @@ -196,128 +159,262 @@ class HacktoberStats(commands.Cog):          stats_embed.set_author(              name="Hacktoberfest",              url="https://hacktoberfest.digitalocean.com", -            icon_url="https://hacktoberfest.digitalocean.com/pretty_logo.png" +            icon_url="https://avatars1.githubusercontent.com/u/35706162?s=200&v=4"          ) + +        # this will handle when no PRs in_review or accepted +        review_str = self._build_prs_string(in_review, github_username) or "None" +        accepted_str = self._build_prs_string(accepted, github_username) or "None"          stats_embed.add_field( -            name="Top 5 Repositories:", -            value=self._build_top5str(pr_stats) +            name=":clock1: In Review", +            value=review_str +        ) +        stats_embed.add_field( +            name=":tada: Accepted", +            value=accepted_str          )          logging.info(f"Hacktoberfest PR built for GitHub user '{github_username}'")          return stats_embed      @staticmethod -    async def get_october_prs(github_username: str) -> List[dict]: +    async def get_october_prs(github_username: str) -> Union[List[dict], None]:          """          Query GitHub's API for PRs created during the month of October by github_username. -        PRs with an 'invalid' tag are ignored +        PRs with an 'invalid' or 'spam' label are ignored unless it is merged or approved + +        For PRs created after October 3rd, they have to be in a repository that has a +        'hacktoberfest' topic, unless the PR is labelled 'hacktoberfest-accepted' for it +        to count.          If PRs are found, return a list of dicts with basic PR information          For each PR: -            { +        {              "repo_url": str -            "repo_shortname": str (e.g. "python-discord/seasonalbot") +            "repo_shortname": str (e.g. "python-discord/sir-lancebot")              "created_at": datetime.datetime -            } +            "number": int +        }          Otherwise, return None          """ -        logging.info(f"Generating Hacktoberfest PR query for GitHub user: '{github_username}'") +        logging.info(f"Fetching Hacktoberfest Stats for GitHub user: '{github_username}'")          base_url = "https://api.github.com/search/issues?q=" -        not_label = "invalid"          action_type = "pr" -        is_query = f"public+author:{github_username}" +        is_query = "public"          not_query = "draft" -        date_range = f"{CURRENT_YEAR}-10-01T00:00:00%2B14:00..{CURRENT_YEAR}-10-31T23:59:59-11:00" +        date_range = f"{CURRENT_YEAR}-09-30T10:00Z..{CURRENT_YEAR}-11-01T12:00Z"          per_page = "300"          query_url = (              f"{base_url}" -            f"-label:{not_label}"              f"+type:{action_type}"              f"+is:{is_query}" +            f"+author:{github_username}"              f"+-is:{not_query}"              f"+created:{date_range}"              f"&per_page={per_page}"          ) +        logging.debug(f"GitHub query URL generated: {query_url}") -        headers = {"user-agent": "Discord Python Hacktoberbot"} -        async with aiohttp.ClientSession() as session: -            async with session.get(query_url, headers=headers) as resp: -                jsonresp = await resp.json() - +        jsonresp = await HacktoberStats._fetch_url(query_url, REQUEST_HEADERS)          if "message" in jsonresp.keys():              # One of the parameters is invalid, short circuit for now              api_message = jsonresp["errors"][0]["message"] -            logging.error(f"GitHub API request for '{github_username}' failed with message: {api_message}") -            return -        else: -            if jsonresp["total_count"] == 0: -                # Short circuit if there aren't any PRs -                logging.info(f"No Hacktoberfest PRs found for GitHub user: '{github_username}'") -                return + +            # Ignore logging non-existent users or users we do not have permission to see +            if api_message == GITHUB_NONEXISTENT_USER_MESSAGE: +                logging.debug(f"No GitHub user found named '{github_username}'")              else: -                logging.info(f"Found {len(jsonresp['items'])} Hacktoberfest PRs for GitHub user: '{github_username}'") -                outlist = [] -                for item in jsonresp["items"]: -                    shortname = HacktoberStats._get_shortname(item["repository_url"]) -                    itemdict = { -                        "repo_url": f"https://www.github.com/{shortname}", -                        "repo_shortname": shortname, -                        "created_at": datetime.strptime( -                            item["created_at"], r"%Y-%m-%dT%H:%M:%SZ" -                        ), -                    } +                logging.error(f"GitHub API request for '{github_username}' failed with message: {api_message}") +            return + +        if jsonresp["total_count"] == 0: +            # Short circuit if there aren't any PRs +            logging.info(f"No Hacktoberfest PRs found for GitHub user: '{github_username}'") +            return + +        logging.info(f"Found {len(jsonresp['items'])} Hacktoberfest PRs for GitHub user: '{github_username}'") +        outlist = []  # list of pr information dicts that will get returned +        oct3 = datetime(int(CURRENT_YEAR), 10, 3, 23, 59, 59, tzinfo=None) +        hackto_topics = {}  # cache whether each repo has the appropriate topic (bool values) +        for item in jsonresp["items"]: +            shortname = HacktoberStats._get_shortname(item["repository_url"]) +            itemdict = { +                "repo_url": f"https://www.github.com/{shortname}", +                "repo_shortname": shortname, +                "created_at": datetime.strptime( +                    item["created_at"], r"%Y-%m-%dT%H:%M:%SZ" +                ), +                "number": item["number"] +            } + +            # if the PR has 'invalid' or 'spam' labels, the PR must be +            # either merged or approved for it to be included +            if HacktoberStats._has_label(item, ["invalid", "spam"]): +                if not await HacktoberStats._is_accepted(itemdict): +                    continue + +            # PRs before oct 3 no need to check for topics +            # continue the loop if 'hacktoberfest-accepted' is labelled then +            # there is no need to check for its topics +            if itemdict["created_at"] < oct3: +                outlist.append(itemdict) +                continue + +            # checking PR's labels for "hacktoberfest-accepted" +            if HacktoberStats._has_label(item, "hacktoberfest-accepted"): +                outlist.append(itemdict) +                continue + +            # no need to query github if repo topics are fetched before already +            if shortname in hackto_topics.keys(): +                if hackto_topics[shortname]:                      outlist.append(itemdict) -                return outlist +                    continue +            # fetch topics for the pr repo +            topics_query_url = f"https://api.github.com/repos/{shortname}/topics" +            logging.debug(f"Fetching repo topics for {shortname} with url: {topics_query_url}") +            jsonresp2 = await HacktoberStats._fetch_url(topics_query_url, GITHUB_TOPICS_ACCEPT_HEADER) +            if jsonresp2.get("names") is None: +                logging.error(f"Error fetching topics for {shortname}: {jsonresp2['message']}") +                return + +            # PRs after oct 3 that doesn't have 'hacktoberfest-accepted' label +            # must be in repo with 'hacktoberfest' topic +            if "hacktoberfest" in jsonresp2["names"]: +                hackto_topics[shortname] = True  # cache result in the dict for later use if needed +                outlist.append(itemdict) +        return outlist + +    @staticmethod +    async def _fetch_url(url: str, headers: dict) -> dict: +        """Retrieve API response from URL.""" +        async with aiohttp.ClientSession() as session: +            async with session.get(url, headers=headers) as resp: +                jsonresp = await resp.json() +        return jsonresp + +    @staticmethod +    def _has_label(pr: dict, labels: Union[List[str], str]) -> bool: +        """ +        Check if a PR has label 'labels'. + +        'labels' can be a string or a list of strings, if it's a list of strings +        it will return true if any of the labels match. +        """ +        if not pr.get("labels"):  # if PR has no labels +            return False +        if (isinstance(labels, str)) and (any(label["name"].casefold() == labels for label in pr["labels"])): +            return True +        for item in labels: +            if any(label["name"].casefold() == item for label in pr["labels"]): +                return True +        return False + +    @staticmethod +    async def _is_accepted(pr: dict) -> bool: +        """Check if a PR is merged, approved, or labelled hacktoberfest-accepted.""" +        # checking for merge status +        query_url = f"https://api.github.com/repos/{pr['repo_shortname']}/pulls/" +        query_url += str(pr["number"]) +        jsonresp = await HacktoberStats._fetch_url(query_url, REQUEST_HEADERS) + +        if "message" in jsonresp.keys(): +            logging.error( +                f"Error fetching PR stats for #{pr['number']} in repo {pr['repo_shortname']}:\n" +                f"{jsonresp['message']}" +            ) +            return False +        if ("merged" in jsonresp.keys()) and jsonresp["merged"]: +            return True + +        # checking for the label, using `jsonresp` which has the label information +        if HacktoberStats._has_label(jsonresp, "hacktoberfest-accepted"): +            return True + +        # checking approval +        query_url += "/reviews" +        jsonresp2 = await HacktoberStats._fetch_url(query_url, REQUEST_HEADERS) +        if isinstance(jsonresp2, dict): +            # if API request is unsuccessful it will be a dict with the error in 'message' +            logging.error( +                f"Error fetching PR reviews for #{pr['number']} in repo {pr['repo_shortname']}:\n" +                f"{jsonresp2['message']}" +            ) +            return False +        # if it is successful it will be a list instead of a dict +        if len(jsonresp2) == 0:  # if PR has no reviews +            return False + +        # loop through reviews and check for approval +        for item in jsonresp2: +            if "status" in item.keys(): +                if item['status'] == "APPROVED": +                    return True +        return False      @staticmethod      def _get_shortname(in_url: str) -> str:          """          Extract shortname from https://api.github.com/repos/* URL. -        e.g. "https://api.github.com/repos/python-discord/seasonalbot" +        e.g. "https://api.github.com/repos/python-discord/sir-lancebot"               |               V -             "python-discord/seasonalbot" +             "python-discord/sir-lancebot"          """          exp = r"https?:\/\/api.github.com\/repos\/([/\-\_\.\w]+)"          return re.findall(exp, in_url)[0]      @staticmethod -    def _summarize_prs(prs: List[dict]) -> dict: +    async def _categorize_prs(prs: List[dict]) -> tuple:          """ -        Generate statistics from an input list of PR dictionaries, as output by get_october_prs. +        Categorize PRs into 'in_review' and 'accepted' and returns as a tuple. -        Return a dictionary containing: -            { -            "n_prs": int -            "top5": [(repo_shortname, ncontributions), ...] -            } +        PRs created less than 14 days ago are 'in_review', PRs that are not +        are 'accepted' (after 14 days review period). + +        PRs that are accepted must either be merged, approved, or labelled +        'hacktoberfest-accepted.          """ -        contributed_repos = [pr["repo_shortname"] for pr in prs] -        return {"n_prs": len(prs), "top5": Counter(contributed_repos).most_common(5)} +        now = datetime.now() +        oct3 = datetime(CURRENT_YEAR, 10, 3, 23, 59, 59, tzinfo=None) +        in_review = [] +        accepted = [] +        for pr in prs: +            if (pr['created_at'] + timedelta(REVIEW_DAYS)) > now: +                in_review.append(pr) +            elif (pr['created_at'] <= oct3) or await HacktoberStats._is_accepted(pr): +                accepted.append(pr) + +        return in_review, accepted      @staticmethod -    def _build_top5str(stats: List[tuple]) -> str: +    def _build_prs_string(prs: List[tuple], user: str) -> str:          """ -        Build a string from the Top 5 contributions that is compatible with a discord.Embed field. - -        Top 5 contributions should be a list of tuples, as output in the stats dictionary by -        _summarize_prs +        Builds a discord embed compatible string for a list of PRs. -        String is of the form: -           n contribution(s) to [shortname](url) -           ... +        Repository name with the link to pull requests authored by 'user' for +        each PR.          """          base_url = "https://www.github.com/" -        contributionstrs = [] -        for repo in stats['top5']: -            n = repo[1] -            contributionstrs.append(f"{n} {HacktoberStats._contributionator(n)} to [{repo[0]}]({base_url}{repo[0]})") - -        return "\n".join(contributionstrs) +        str_list = [] +        repo_list = [pr["repo_shortname"] for pr in prs] +        prs_list = Counter(repo_list).most_common(5)  # get first 5 counted PRs +        more = len(prs) - sum(i[1] for i in prs_list) + +        for pr in prs_list: +            # for example: https://www.github.com/python-discord/bot/pulls/octocat +            # will display pull requests authored by octocat. +            # pr[1] is the number of PRs to the repo +            string = f"{pr[1]} to [{pr[0]}]({base_url}{pr[0]}/pulls/{user})" +            str_list.append(string) +        if more: +            str_list.append(f"...and {more} more") + +        return "\n".join(str_list)      @staticmethod      def _contributionator(n: int) -> str: diff --git a/bot/exts/halloween/monstersurvey.py b/bot/exts/halloween/monstersurvey.py index 7b1a1e84..80196825 100644 --- a/bot/exts/halloween/monstersurvey.py +++ b/bot/exts/halloween/monstersurvey.py @@ -202,4 +202,3 @@ class MonsterSurvey(Cog):  def setup(bot: Bot) -> None:      """Monster survey Cog load.""" -    bot.add_cog(MonsterSurvey(bot)) diff --git a/bot/exts/halloween/scarymovie.py b/bot/exts/halloween/scarymovie.py index c80e0298..0807eca6 100644 --- a/bot/exts/halloween/scarymovie.py +++ b/bot/exts/halloween/scarymovie.py @@ -121,7 +121,8 @@ class ScaryMovie(commands.Cog):              if value:                  embed.add_field(name=name, value=value) -        embed.set_footer(text='powered by themoviedb.org') +        embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.") +        embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png")          return embed diff --git a/bot/exts/halloween/spookyreact.py b/bot/exts/halloween/spookyreact.py index e5945aea..b335df75 100644 --- a/bot/exts/halloween/spookyreact.py +++ b/bot/exts/halloween/spookyreact.py @@ -29,13 +29,7 @@ class SpookyReact(Cog):      @in_month(Month.OCTOBER)      @Cog.listener()      async def on_message(self, ctx: discord.Message) -> None: -        """ -        A command to send the seasonalbot github project. - -        Lines that begin with the bot's command prefix are ignored - -        Seasonalbot's own messages are ignored -        """ +        """Triggered when the bot sees a message in October."""          for trigger in SPOOKY_TRIGGERS.keys():              trigger_test = re.search(SPOOKY_TRIGGERS[trigger][0], ctx.content.lower())              if trigger_test: diff --git a/bot/exts/halloween/spookysound.py b/bot/exts/halloween/spookysound.py deleted file mode 100644 index 569a9153..00000000 --- a/bot/exts/halloween/spookysound.py +++ /dev/null @@ -1,48 +0,0 @@ -import logging -import random -from pathlib import Path - -import discord -from discord.ext import commands - -from bot.bot import SeasonalBot -from bot.constants import Hacktoberfest - -log = logging.getLogger(__name__) - - -class SpookySound(commands.Cog): -    """A cog that plays a spooky sound in a voice channel on command.""" - -    def __init__(self, bot: SeasonalBot): -        self.bot = bot -        self.sound_files = list(Path("bot/resources/halloween/spookysounds").glob("*.mp3")) -        self.channel = None - -    @commands.cooldown(rate=1, per=1) -    @commands.command(brief="Play a spooky sound, restricted to once per 2 mins") -    async def spookysound(self, ctx: commands.Context) -> None: -        """ -        Connect to the Hacktoberbot voice channel, play a random spooky sound, then disconnect. - -        Cannot be used more than once in 2 minutes. -        """ -        if not self.channel: -            await self.bot.wait_until_guild_available() -            self.channel = self.bot.get_channel(Hacktoberfest.voice_id) - -        await ctx.send("Initiating spooky sound...") -        file_path = random.choice(self.sound_files) -        src = discord.FFmpegPCMAudio(str(file_path.resolve())) -        voice = await self.channel.connect() -        voice.play(src, after=lambda e: self.bot.loop.create_task(self.disconnect(voice))) - -    @staticmethod -    async def disconnect(voice: discord.VoiceClient) -> None: -        """Helper method to disconnect a given voice client.""" -        await voice.disconnect() - - -def setup(bot: SeasonalBot) -> None: -    """Spooky sound Cog load.""" -    bot.add_cog(SpookySound(bot)) diff --git a/bot/exts/halloween/timeleft.py b/bot/exts/halloween/timeleft.py index 295acc89..47adb09b 100644 --- a/bot/exts/halloween/timeleft.py +++ b/bot/exts/halloween/timeleft.py @@ -13,20 +13,23 @@ class TimeLeft(commands.Cog):      def __init__(self, bot: commands.Bot):          self.bot = bot -    @staticmethod -    def in_october() -> bool: -        """Return True if the current month is October.""" -        return datetime.utcnow().month == 10 +    def in_hacktober(self) -> bool: +        """Return True if the current time is within Hacktoberfest.""" +        _, end, start = self.load_date() + +        now = datetime.utcnow() + +        return start <= now <= end      @staticmethod -    def load_date() -> Tuple[int, datetime, datetime]: +    def load_date() -> Tuple[datetime, datetime, datetime]:          """Return of a tuple of the current time and the end and start times of the next October."""          now = datetime.utcnow()          year = now.year          if now.month > 10:              year += 1 -        end = datetime(year, 11, 1, 11, 59, 59) -        start = datetime(year, 10, 1) +        end = datetime(year, 11, 1, 12)  # November 1st 12:00 (UTC-12:00) +        start = datetime(year, 9, 30, 10)  # September 30th 10:00 (UTC+14:00)          return now, end, start      @commands.command() @@ -35,16 +38,23 @@ class TimeLeft(commands.Cog):          Calculates the time left until the end of Hacktober.          Whilst in October, displays the days, hours and minutes left. -        Only displays the days left until the beginning and end whilst in a different month +        Only displays the days left until the beginning and end whilst in a different month. + +        This factors in that Hacktoberfest starts when it is October anywhere in the world +        and ends with the same rules. It treats the start as UTC+14:00 and the end as +        UTC-12.          """          now, end, start = self.load_date()          diff = end - now          days, seconds = diff.days, diff.seconds -        if self.in_october(): +        if self.in_hacktober():              minutes = seconds // 60              hours, minutes = divmod(minutes, 60) -            await ctx.send(f"There is currently only {days} days, {hours} hours and {minutes}" -                           "minutes left until the end of Hacktober.") + +            await ctx.send( +                f"There are {days} days, {hours} hours and {minutes}" +                f" minutes left until the end of Hacktober." +            )          else:              start_diff = start - now              start_days = start_diff.days diff --git a/bot/exts/pride/pride_facts.py b/bot/exts/pride/pride_facts.py index 9ff4c9e0..5bd5d0ce 100644 --- a/bot/exts/pride/pride_facts.py +++ b/bot/exts/pride/pride_facts.py @@ -9,7 +9,7 @@ import dateutil.parser  import discord  from discord.ext import commands -from bot.bot import SeasonalBot +from bot.bot import Bot  from bot.constants import Channels, Colours, Month  from bot.utils.decorators import seasonal_task @@ -21,7 +21,7 @@ Sendable = Union[commands.Context, discord.TextChannel]  class PrideFacts(commands.Cog):      """Provides a new fact every day during the Pride season!""" -    def __init__(self, bot: SeasonalBot): +    def __init__(self, bot: Bot):          self.bot = bot          self.facts = self.load_facts() @@ -38,7 +38,7 @@ class PrideFacts(commands.Cog):          """Background task to post the daily pride fact every day."""          await self.bot.wait_until_guild_available() -        channel = self.bot.get_channel(Channels.seasonalbot_commands) +        channel = self.bot.get_channel(Channels.community_bot_commands)          await self.send_select_fact(channel, datetime.utcnow())      async def send_random_fact(self, ctx: commands.Context) -> None: @@ -102,6 +102,6 @@ class PrideFacts(commands.Cog):          ) -def setup(bot: SeasonalBot) -> None: +def setup(bot: Bot) -> None:      """Cog loader for pride facts."""      bot.add_cog(PrideFacts(bot)) diff --git a/bot/exts/utils/__init__.py b/bot/exts/utils/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/bot/exts/utils/__init__.py diff --git a/bot/exts/utils/extensions.py b/bot/exts/utils/extensions.py new file mode 100644 index 00000000..bb22c353 --- /dev/null +++ b/bot/exts/utils/extensions.py @@ -0,0 +1,265 @@ +import functools +import logging +import typing as t +from enum import Enum + +from discord import Colour, Embed +from discord.ext import commands +from discord.ext.commands import Context, group + +from bot import exts +from bot.bot import Bot +from bot.constants import Client, Emojis, MODERATION_ROLES, Roles +from bot.utils.checks import with_role_check +from bot.utils.extensions import EXTENSIONS, unqualify +from bot.utils.pagination import LinePaginator + +log = logging.getLogger(__name__) + + +UNLOAD_BLACKLIST = {f"{exts.__name__}.utils.extensions"} +BASE_PATH_LEN = len(exts.__name__.split(".")) + + +class Action(Enum): +    """Represents an action to perform on an extension.""" + +    # Need to be partial otherwise they are considered to be function definitions. +    LOAD = functools.partial(Bot.load_extension) +    UNLOAD = functools.partial(Bot.unload_extension) +    RELOAD = functools.partial(Bot.reload_extension) + + +class Extension(commands.Converter): +    """ +    Fully qualify the name of an extension and ensure it exists. + +    The * and ** values bypass this when used with the reload command. +    """ + +    async def convert(self, ctx: Context, argument: str) -> str: +        """Fully qualify the name of an extension and ensure it exists.""" +        # Special values to reload all extensions +        if argument == "*" or argument == "**": +            return argument + +        argument = argument.lower() + +        if argument in EXTENSIONS: +            return argument +        elif (qualified_arg := f"{exts.__name__}.{argument}") in EXTENSIONS: +            return qualified_arg + +        matches = [] +        for ext in EXTENSIONS: +            if argument == unqualify(ext): +                matches.append(ext) + +        if len(matches) > 1: +            matches.sort() +            names = "\n".join(matches) +            raise commands.BadArgument( +                f":x: `{argument}` is an ambiguous extension name. " +                f"Please use one of the following fully-qualified names.```\n{names}```" +            ) +        elif matches: +            return matches[0] +        else: +            raise commands.BadArgument(f":x: Could not find the extension `{argument}`.") + + +class Extensions(commands.Cog): +    """Extension management commands.""" + +    def __init__(self, bot: Bot): +        self.bot = bot + +    @group(name="extensions", aliases=("ext", "exts", "c", "cogs"), invoke_without_command=True) +    async def extensions_group(self, ctx: Context) -> None: +        """Load, unload, reload, and list loaded extensions.""" +        await ctx.send_help(ctx.command) + +    @extensions_group.command(name="load", aliases=("l",)) +    async def load_command(self, ctx: Context, *extensions: Extension) -> None: +        r""" +        Load extensions given their fully qualified or unqualified names. + +        If '\*' or '\*\*' is given as the name, all unloaded extensions will be loaded. +        """  # noqa: W605 +        if not extensions: +            await ctx.send_help(ctx.command) +            return + +        if "*" in extensions or "**" in extensions: +            extensions = set(EXTENSIONS) - set(self.bot.extensions.keys()) + +        msg = self.batch_manage(Action.LOAD, *extensions) +        await ctx.send(msg) + +    @extensions_group.command(name="unload", aliases=("ul",)) +    async def unload_command(self, ctx: Context, *extensions: Extension) -> None: +        r""" +        Unload currently loaded extensions given their fully qualified or unqualified names. + +        If '\*' or '\*\*' is given as the name, all loaded extensions will be unloaded. +        """  # noqa: W605 +        if not extensions: +            await ctx.send_help(ctx.command) +            return + +        blacklisted = "\n".join(UNLOAD_BLACKLIST & set(extensions)) + +        if blacklisted: +            msg = f":x: The following extension(s) may not be unloaded:```{blacklisted}```" +        else: +            if "*" in extensions or "**" in extensions: +                extensions = set(self.bot.extensions.keys()) - UNLOAD_BLACKLIST + +            msg = self.batch_manage(Action.UNLOAD, *extensions) + +        await ctx.send(msg) + +    @extensions_group.command(name="reload", aliases=("r",), root_aliases=("reload",)) +    async def reload_command(self, ctx: Context, *extensions: Extension) -> None: +        r""" +        Reload extensions given their fully qualified or unqualified names. + +        If an extension fails to be reloaded, it will be rolled-back to the prior working state. + +        If '\*' is given as the name, all currently loaded extensions will be reloaded. +        If '\*\*' is given as the name, all extensions, including unloaded ones, will be reloaded. +        """  # noqa: W605 +        if not extensions: +            await ctx.send_help(ctx.command) +            return + +        if "**" in extensions: +            extensions = EXTENSIONS +        elif "*" in extensions: +            extensions = set(self.bot.extensions.keys()) | set(extensions) +            extensions.remove("*") + +        msg = self.batch_manage(Action.RELOAD, *extensions) + +        await ctx.send(msg) + +    @extensions_group.command(name="list", aliases=("all",)) +    async def list_command(self, ctx: Context) -> None: +        """ +        Get a list of all extensions, including their loaded status. + +        Grey indicates that the extension is unloaded. +        Green indicates that the extension is currently loaded. +        """ +        embed = Embed(colour=Colour.blurple()) +        embed.set_author( +            name="Extensions List", +            url=Client.github_bot_repo, +            icon_url=str(self.bot.user.avatar_url) +        ) + +        lines = [] +        categories = self.group_extension_statuses() +        for category, extensions in sorted(categories.items()): +            # Treat each category as a single line by concatenating everything. +            # This ensures the paginator will not cut off a page in the middle of a category. +            category = category.replace("_", " ").title() +            extensions = "\n".join(sorted(extensions)) +            lines.append(f"**{category}**\n{extensions}\n") + +        log.debug(f"{ctx.author} requested a list of all cogs. Returning a paginated list.") +        await LinePaginator.paginate(lines, ctx, embed, max_size=1200, empty=False) + +    def group_extension_statuses(self) -> t.Mapping[str, str]: +        """Return a mapping of extension names and statuses to their categories.""" +        categories = {} + +        for ext in EXTENSIONS: +            if ext in self.bot.extensions: +                status = Emojis.status_online +            else: +                status = Emojis.status_offline + +            path = ext.split(".") +            if len(path) > BASE_PATH_LEN + 1: +                category = " - ".join(path[BASE_PATH_LEN:-1]) +            else: +                category = "uncategorised" + +            categories.setdefault(category, []).append(f"{status}  {path[-1]}") + +        return categories + +    def batch_manage(self, action: Action, *extensions: str) -> str: +        """ +        Apply an action to multiple extensions and return a message with the results. + +        If only one extension is given, it is deferred to `manage()`. +        """ +        if len(extensions) == 1: +            msg, _ = self.manage(action, extensions[0]) +            return msg + +        verb = action.name.lower() +        failures = {} + +        for extension in extensions: +            _, error = self.manage(action, extension) +            if error: +                failures[extension] = error + +        emoji = ":x:" if failures else ":ok_hand:" +        msg = f"{emoji} {len(extensions) - len(failures)} / {len(extensions)} extensions {verb}ed." + +        if failures: +            failures = "\n".join(f"{ext}\n    {err}" for ext, err in failures.items()) +            msg += f"\nFailures:```{failures}```" + +        log.debug(f"Batch {verb}ed extensions.") + +        return msg + +    def manage(self, action: Action, ext: str) -> t.Tuple[str, t.Optional[str]]: +        """Apply an action to an extension and return the status message and any error message.""" +        verb = action.name.lower() +        error_msg = None + +        try: +            action.value(self.bot, ext) +        except (commands.ExtensionAlreadyLoaded, commands.ExtensionNotLoaded): +            if action is Action.RELOAD: +                # When reloading, just load the extension if it was not loaded. +                return self.manage(Action.LOAD, ext) + +            msg = f":x: Extension `{ext}` is already {verb}ed." +            log.debug(msg[4:]) +        except Exception as e: +            if hasattr(e, "original"): +                e = e.original + +            log.exception(f"Extension '{ext}' failed to {verb}.") + +            error_msg = f"{e.__class__.__name__}: {e}" +            msg = f":x: Failed to {verb} extension `{ext}`:\n```{error_msg}```" +        else: +            msg = f":ok_hand: Extension successfully {verb}ed: `{ext}`." +            log.debug(msg[10:]) + +        return msg, error_msg + +    # This cannot be static (must have a __func__ attribute). +    def cog_check(self, ctx: Context) -> bool: +        """Only allow moderators and core developers to invoke the commands in this cog.""" +        return with_role_check(ctx, *MODERATION_ROLES, Roles.core_developers) + +    # This cannot be static (must have a __func__ attribute). +    async def cog_command_error(self, ctx: Context, error: Exception) -> None: +        """Handle BadArgument errors locally to prevent the help command from showing.""" +        if isinstance(error, commands.BadArgument): +            await ctx.send(str(error)) +            error.handled = True + + +def setup(bot: Bot) -> None: +    """Load the Extensions cog.""" +    bot.add_cog(Extensions(bot)) diff --git a/bot/exts/valentines/be_my_valentine.py b/bot/exts/valentines/be_my_valentine.py index b1258307..4db4d191 100644 --- a/bot/exts/valentines/be_my_valentine.py +++ b/bot/exts/valentines/be_my_valentine.py @@ -99,7 +99,7 @@ class BeMyValentine(commands.Cog):          emoji_1, emoji_2 = self.random_emoji()          lovefest_role = discord.utils.get(ctx.guild.roles, id=Lovefest.role_id) -        channel = self.bot.get_channel(Channels.seasonalbot_commands) +        channel = self.bot.get_channel(Channels.community_bot_commands)          valentine, title = self.valentine_check(valentine_type)          if user is None: diff --git a/bot/exts/valentines/movie_generator.py b/bot/exts/valentines/movie_generator.py index 0843175a..4df9e0d5 100644 --- a/bot/exts/valentines/movie_generator.py +++ b/bot/exts/valentines/movie_generator.py @@ -48,6 +48,8 @@ class RomanceMovieFinder(commands.Cog):                  embed.set_image(url=f"http://image.tmdb.org/t/p/w200/{selected_movie['poster_path']}")                  embed.add_field(name="Release date :clock1:", value=selected_movie["release_date"])                  embed.add_field(name="Rating :star2:", value=selected_movie["vote_average"]) +                embed.set_footer(text="This product uses the TMDb API but is not endorsed or certified by TMDb.") +                embed.set_thumbnail(url="https://i.imgur.com/LtFtC8H.png")                  await ctx.send(embed=embed)              except KeyError:                  warning_message = "A KeyError was raised while fetching information on the movie. The API service" \ diff --git a/bot/exts/valentines/valentine_zodiac.py b/bot/exts/valentines/valentine_zodiac.py index ef9ddc78..2696999f 100644 --- a/bot/exts/valentines/valentine_zodiac.py +++ b/bot/exts/valentines/valentine_zodiac.py @@ -1,7 +1,10 @@ +import calendar +import json  import logging  import random -from json import load +from datetime import datetime  from pathlib import Path +from typing import Tuple, Union  import discord  from discord.ext import commands @@ -19,37 +22,123 @@ class ValentineZodiac(commands.Cog):      def __init__(self, bot: commands.Bot):          self.bot = bot -        self.zodiacs = self.load_json() +        self.zodiacs, self.zodiac_fact = self.load_comp_json()      @staticmethod -    def load_json() -> dict: +    def load_comp_json() -> Tuple[dict, dict]:          """Load zodiac compatibility from static JSON resource.""" -        p = Path("bot/resources/valentines/zodiac_compatibility.json") -        with p.open(encoding="utf8") as json_data: -            zodiacs = load(json_data) -            return zodiacs - -    @commands.command(name="partnerzodiac") -    async def counter_zodiac(self, ctx: commands.Context, zodiac_sign: str) -> None: -        """Provides a counter compatible zodiac sign to the given user's zodiac sign.""" -        try: -            compatible_zodiac = random.choice(self.zodiacs[zodiac_sign.lower()]) -        except KeyError: -            return await ctx.send(zodiac_sign.capitalize() + " zodiac sign does not exist.") - -        emoji1 = random.choice(HEART_EMOJIS) -        emoji2 = random.choice(HEART_EMOJIS) -        embed = discord.Embed( -            title="Zodic Compatibility", -            description=f'{zodiac_sign.capitalize()}{emoji1}{compatible_zodiac["Zodiac"]}\n' -                        f'{emoji2}Compatibility meter : {compatible_zodiac["compatibility_score"]}{emoji2}', -            color=Colours.pink -        ) -        embed.add_field( -            name=f'A letter from Dr.Zodiac {LETTER_EMOJI}', -            value=compatible_zodiac['description'] -        ) +        explanation_file = Path("bot/resources/valentines/zodiac_explanation.json") +        compatibility_file = Path("bot/resources/valentines/zodiac_compatibility.json") +        with explanation_file.open(encoding="utf8") as json_data: +            zodiac_fact = json.load(json_data) +            for zodiac_data in zodiac_fact.values(): +                zodiac_data['start_at'] = datetime.fromisoformat(zodiac_data['start_at']) +                zodiac_data['end_at'] = datetime.fromisoformat(zodiac_data['end_at']) + +        with compatibility_file.open(encoding="utf8") as json_data: +            zodiacs = json.load(json_data) + +        return zodiacs, zodiac_fact + +    def generate_invalidname_embed(self, zodiac: str) -> discord.Embed: +        """Returns error embed.""" +        embed = discord.Embed() +        embed.color = Colours.soft_red +        error_msg = f"**{zodiac}** is not a valid zodiac sign, here is the list of valid zodiac signs.\n" +        names = list(self.zodiac_fact) +        middle_index = len(names) // 2 +        first_half_names = ", ".join(names[:middle_index]) +        second_half_names = ", ".join(names[middle_index:]) +        embed.description = error_msg + first_half_names + ",\n" + second_half_names +        log.info("Invalid zodiac name provided.") +        return embed + +    def zodiac_build_embed(self, zodiac: str) -> discord.Embed: +        """Gives informative zodiac embed.""" +        zodiac = zodiac.capitalize() +        embed = discord.Embed() +        embed.color = Colours.pink +        if zodiac in self.zodiac_fact: +            log.trace("Making zodiac embed.") +            embed.title = f"__{zodiac}__" +            embed.description = self.zodiac_fact[zodiac]["About"] +            embed.add_field(name='__Motto__', value=self.zodiac_fact[zodiac]["Motto"], inline=False) +            embed.add_field(name='__Strengths__', value=self.zodiac_fact[zodiac]["Strengths"], inline=False) +            embed.add_field(name='__Weaknesses__', value=self.zodiac_fact[zodiac]["Weaknesses"], inline=False) +            embed.add_field(name='__Full form__', value=self.zodiac_fact[zodiac]["full_form"], inline=False) +            embed.set_thumbnail(url=self.zodiac_fact[zodiac]["url"]) +        else: +            embed = self.generate_invalidname_embed(zodiac) +        log.trace("Successfully created zodiac information embed.") +        return embed + +    def zodiac_date_verifier(self, query_date: datetime) -> str: +        """Returns zodiac sign by checking date.""" +        for zodiac_name, zodiac_data in self.zodiac_fact.items(): +            if zodiac_data["start_at"].date() <= query_date.date() <= zodiac_data["end_at"].date(): +                log.trace("Zodiac name sent.") +                return zodiac_name + +    @commands.group(name='zodiac', invoke_without_command=True) +    async def zodiac(self, ctx: commands.Context, zodiac_sign: str) -> None: +        """Provides information about zodiac sign by taking zodiac sign name as input.""" +        final_embed = self.zodiac_build_embed(zodiac_sign) +        await ctx.send(embed=final_embed) +        log.trace("Embed successfully sent.") + +    @zodiac.command(name="date") +    async def date_and_month(self, ctx: commands.Context, date: int, month: Union[int, str]) -> None: +        """Provides information about zodiac sign by taking month and date as input.""" +        if isinstance(month, str): +            month = month.capitalize() +            try: +                month = list(calendar.month_abbr).index(month[:3]) +                log.trace('Valid month name entered by user') +            except ValueError: +                log.info('Invalid month name entered by user') +                await ctx.send(f"Sorry, but `{month}` is not a valid month name.") +                return +        if (month == 1 and 1 <= date <= 19) or (month == 12 and 22 <= date <= 31): +            zodiac = "capricorn" +            final_embed = self.zodiac_build_embed(zodiac) +        else: +            try: +                zodiac_sign_based_on_date = self.zodiac_date_verifier(datetime(2020, month, date)) +                log.trace("zodiac sign based on month and date received.") +            except ValueError as e: +                final_embed = discord.Embed() +                final_embed.color = Colours.soft_red +                final_embed.description = f"Zodiac sign could not be found because.\n```{e}```" +                log.info(f'Error in "zodiac date" command:\n{e}.') +            else: +                final_embed = self.zodiac_build_embed(zodiac_sign_based_on_date) + +        await ctx.send(embed=final_embed) +        log.trace("Embed from date successfully sent.") + +    @zodiac.command(name="partnerzodiac", aliases=['partner']) +    async def partner_zodiac(self, ctx: commands.Context, zodiac_sign: str) -> None: +        """Provides a random counter compatible zodiac sign to the given user's zodiac sign.""" +        embed = discord.Embed() +        embed.color = Colours.pink +        zodiac_check = self.zodiacs.get(zodiac_sign.capitalize()) +        if zodiac_check: +            compatible_zodiac = random.choice(self.zodiacs[zodiac_sign.capitalize()]) +            emoji1 = random.choice(HEART_EMOJIS) +            emoji2 = random.choice(HEART_EMOJIS) +            embed.title = "Zodiac Compatibility" +            embed.description = ( +                f'{zodiac_sign.capitalize()}{emoji1}{compatible_zodiac["Zodiac"]}\n' +                f'{emoji2}Compatibility meter : {compatible_zodiac["compatibility_score"]}{emoji2}' +            ) +            embed.add_field( +                name=f'A letter from Dr.Zodiac {LETTER_EMOJI}', +                value=compatible_zodiac['description'] +            ) +        else: +            embed = self.generate_invalidname_embed(zodiac_sign)          await ctx.send(embed=embed) +        log.trace("Embed from date successfully sent.")  def setup(bot: commands.Bot) -> None: | 
