diff options
author | 2021-06-30 02:35:07 +0200 | |
---|---|---|
committer | 2021-07-01 01:16:25 +0200 | |
commit | c301f3e2a5a3ca80f88ff13539bebd86f95a8833 (patch) | |
tree | 202ff1b8b4cf0a9646537e76c5580c68c9a5ca62 | |
parent | Merge pull request #1661 from python-discord/remove-pixels-token-detection (diff) |
Base functionality of tag fetching with groups and in file metadata
The code was restructured to hold tags and their identifiers in
individual classes and some methods moved to function
to detach some of the not directly related functionality from the cog class
-rw-r--r-- | bot/exts/backend/error_handler.py | 12 | ||||
-rw-r--r-- | bot/exts/info/tags.py | 244 |
2 files changed, 167 insertions, 89 deletions
diff --git a/bot/exts/backend/error_handler.py b/bot/exts/backend/error_handler.py index d8de177f5..78822aece 100644 --- a/bot/exts/backend/error_handler.py +++ b/bot/exts/backend/error_handler.py @@ -11,6 +11,7 @@ from bot.bot import Bot from bot.constants import Colours, Icons, MODERATION_ROLES from bot.converters import TagNameConverter from bot.errors import InvalidInfractedUser, LockedResourceError +from bot.exts.info import tags from bot.utils.checks import ContextCheckFailure log = logging.getLogger(__name__) @@ -154,14 +155,21 @@ class ErrorHandler(Cog): return try: - tag_name = await TagNameConverter.convert(ctx, ctx.invoked_with) + tag_identifier = tags.extract_tag_identifier(ctx.message.content) + if tag_identifier.group is not None: + tag_name = await TagNameConverter.convert(ctx, tag_identifier.name) + tag_name_or_group = await TagNameConverter.convert(ctx, tag_identifier.group) + else: + tag_name = None + tag_name_or_group = await TagNameConverter.convert(ctx, tag_identifier.name) + except errors.BadArgument: log.debug( f"{ctx.author} tried to use an invalid command " f"and the fallback tag failed validation in TagNameConverter." ) else: - if await ctx.invoke(tags_get_command, tag_name=tag_name): + if await ctx.invoke(tags_get_command, tag_name_or_group, tag_name): return if not any(role.id in MODERATION_ROLES for role in ctx.author.roles): diff --git a/bot/exts/info/tags.py b/bot/exts/info/tags.py index bb91a8563..3c7b9ea0b 100644 --- a/bot/exts/info/tags.py +++ b/bot/exts/info/tags.py @@ -1,9 +1,13 @@ +from __future__ import annotations + import logging import re import time from pathlib import Path -from typing import Callable, Dict, Iterable, List, Optional +from typing import Callable, Iterable, List, NamedTuple, Optional +import discord +import frontmatter from discord import Colour, Embed, Member from discord.ext.commands import Cog, Context, group @@ -24,90 +28,128 @@ REGEX_NON_ALPHABET = re.compile(r"[^a-z]", re.MULTILINE & re.IGNORECASE) FOOTER_TEXT = f"To show a tag, type {constants.Bot.prefix}tags <tagname>." +class TagIdentifier(NamedTuple): + """Stores the group and name used as an identifier for a tag.""" + + group: Optional[str] + name: str + + def get_fuzzy_score(self, fuzz_tag_identifier: TagIdentifier) -> float: + """Get fuzzy score, using `fuzz_tag_identifier` as the identifier to fuzzy match with.""" + if self.group is None: + if fuzz_tag_identifier.group is None: + # We're only fuzzy matching the name + group_score = 1 + else: + # Ignore tags without groups if the identifier contains a group + return .0 + else: + if fuzz_tag_identifier.group is None: + # Ignore tags with groups if the identifier does not have a group + return .0 + else: + group_score = _fuzzy_search(fuzz_tag_identifier.group, self.group) + + fuzzy_score = group_score * _fuzzy_search(fuzz_tag_identifier.name, self.name) * 100 + if fuzzy_score: + log.trace(f"Fuzzy score {fuzzy_score:=06.2f} for tag {self!r} with fuzz {fuzz_tag_identifier!r}") + return fuzzy_score + + def __str__(self) -> str: + return f"{self.group or ''} {self.name}" + + +class Tag: + """Provide an interface to a tag from resources with `file_content`.""" + + def __init__(self, file_content: str): + post = frontmatter.loads(file_content) + self.content = post.content + self.metadata = post.metadata + self._restricted_to: set[int] = set(self.metadata.get("restricted_to", ())) + + @property + def embed(self) -> Embed: + """Create an embed for the tag.""" + embed = Embed.from_dict(self.metadata.get("embed", {})) + embed.description = self.content + return embed + + def accessible_by(self, member: discord.Member) -> bool: + """Check whether `member` can access the tag.""" + return bool( + not self._restricted_to + or self._restricted_to & {role.id for role in member.roles} + ) + + +def _fuzzy_search(search: str, target: str) -> float: + """A simple scoring algorithm based on how many letters are found / total, with order in mind.""" + current, index = 0, 0 + _search = REGEX_NON_ALPHABET.sub("", search.lower()) + _targets = iter(REGEX_NON_ALPHABET.split(target.lower())) + _target = next(_targets) + try: + while True: + while index < len(_target) and _search[current] == _target[index]: + current += 1 + index += 1 + index, _target = 0, next(_targets) + except (StopIteration, IndexError): + pass + return current / len(_search) + + class Tags(Cog): """Save new tags and fetch existing tags.""" def __init__(self, bot: Bot): self.bot = bot self.tag_cooldowns = {} - self._cache = self.get_tags() - - @staticmethod - def get_tags() -> dict: - """Get all tags.""" - cache = {} + self._tags: dict[TagIdentifier, Tag] = {} + self.initialize_tags() + def initialize_tags(self) -> None: + """Load all tags from resources into `self._tags`.""" base_path = Path("bot", "resources", "tags") + for file in base_path.glob("**/*"): if file.is_file(): - tag_title = file.stem - tag = { - "title": tag_title, - "embed": { - "description": file.read_text(encoding="utf8"), - }, - "restricted_to": None, - "location": f"/bot/{file}" - } - - # Convert to a list to allow negative indexing. - parents = list(file.relative_to(base_path).parents) - if len(parents) > 1: - # -1 would be '.' hence -2 is used as the index. - tag["restricted_to"] = parents[-2].name - - cache[tag_title] = tag - - return cache - - @staticmethod - def check_accessibility(user: Member, tag: dict) -> bool: - """Check if user can access a tag.""" - return not tag["restricted_to"] or tag["restricted_to"].lower() in [role.name.lower() for role in user.roles] - - @staticmethod - def _fuzzy_search(search: str, target: str) -> float: - """A simple scoring algorithm based on how many letters are found / total, with order in mind.""" - current, index = 0, 0 - _search = REGEX_NON_ALPHABET.sub('', search.lower()) - _targets = iter(REGEX_NON_ALPHABET.split(target.lower())) - _target = next(_targets) - try: - while True: - while index < len(_target) and _search[current] == _target[index]: - current += 1 - index += 1 - index, _target = 0, next(_targets) - except (StopIteration, IndexError): - pass - return current / len(_search) * 100 + parent_dir = file.relative_to(base_path).parent - def _get_suggestions(self, tag_name: str, thresholds: Optional[List[int]] = None) -> List[str]: - """Return a list of suggested tags.""" - scores: Dict[str, int] = { - tag_title: Tags._fuzzy_search(tag_name, tag['title']) - for tag_title, tag in self._cache.items() - } + tag_name = file.stem + tag_group = parent_dir.name if parent_dir.name else None + self._tags[TagIdentifier(tag_group, tag_name)] = Tag(file.read_text("utf-8")) + + def _get_suggestions( + self, + tag_identifier: TagIdentifier, + thresholds: Optional[list[int]] = None + ) -> list[tuple[TagIdentifier, Tag]]: + """Return a list of suggested tags for `tag_identifier`.""" thresholds = thresholds or [100, 90, 80, 70, 60] for threshold in thresholds: suggestions = [ - self._cache[tag_title] - for tag_title, matching_score in scores.items() - if matching_score >= threshold + (identifier, tag) + for identifier, tag in self._tags.items() + if identifier.get_fuzzy_score(tag_identifier) >= threshold ] if suggestions: return suggestions return [] - def _get_tag(self, tag_name: str) -> list: - """Get a specific tag.""" - found = [self._cache.get(tag_name.lower(), None)] - if not found[0]: - return self._get_suggestions(tag_name) - return found + def get_fuzzy_matches(self, tag_identifier: TagIdentifier) -> list[tuple[TagIdentifier, Tag]]: + """Get tags with identifiers similar to `tag_identifier`.""" + if tag_identifier.group is None: + suggestions = self._get_suggestions(tag_identifier) + else: + # Try fuzzy matching with only a name first + suggestions = self._get_suggestions(TagIdentifier(None, tag_identifier.group)) + suggestions += self._get_suggestions(tag_identifier) + return suggestions def _get_tags_via_content(self, check: Callable[[Iterable], bool], keywords: str, user: Member) -> list: """ @@ -158,9 +200,14 @@ class Tags(Cog): ) @group(name='tags', aliases=('tag', 't'), invoke_without_command=True) - async def tags_group(self, ctx: Context, *, tag_name: TagNameConverter = None) -> None: + async def tags_group( + self, + ctx: Context, + tag_name_or_group: TagNameConverter = None, + tag_name: TagNameConverter = None, + ) -> None: """Show all known tags, a single tag, or run a subcommand.""" - await self.get_command(ctx, tag_name=tag_name) + await self.get_command(ctx, tag_name_or_group=tag_name_or_group, tag_name=tag_name) @tags_group.group(name='search', invoke_without_command=True) async def search_tag_content(self, ctx: Context, *, keywords: str) -> None: @@ -182,7 +229,7 @@ class Tags(Cog): matching_tags = self._get_tags_via_content(any, keywords or 'any', ctx.author) await self._send_matching_tags(ctx, keywords, matching_tags) - async def display_tag(self, ctx: Context, tag_name: str = None) -> bool: + async def display_tag(self, ctx: Context, tag_identifier: TagIdentifier) -> bool: """ If a tag is not found, display similar tag names as suggestions. @@ -210,45 +257,50 @@ class Tags(Cog): return True return False - if _command_on_cooldown(tag_name): - time_elapsed = time.time() - self.tag_cooldowns[tag_name]["time"] + if _command_on_cooldown(tag_identifier.name): + time_elapsed = time.time() - self.tag_cooldowns[tag_identifier.name]["time"] time_left = constants.Cooldowns.tags - time_elapsed log.info( - f"{ctx.author} tried to get the '{tag_name}' tag, but the tag is on cooldown. " + f"{ctx.author} tried to get the '{tag_identifier.name}' tag, but the tag is on cooldown. " f"Cooldown ends in {time_left:.1f} seconds." ) return True - if tag_name is not None: - temp_founds = self._get_tag(tag_name) - - founds = [] + if tag_identifier.name is not None: - for found_tag in temp_founds: - if self.check_accessibility(ctx.author, found_tag): - founds.append(found_tag) - - if len(founds) == 1: - tag = founds[0] + if (tag := self._tags.get(tag_identifier)) is not None and tag.accessible_by(ctx.author): if ctx.channel.id not in TEST_CHANNELS: - self.tag_cooldowns[tag_name] = { + self.tag_cooldowns[tag_identifier.name] = { "time": time.time(), "channel": ctx.channel.id } - self.bot.stats.incr(f"tags.usages.{tag['title'].replace('-', '_')}") + self.bot.stats.incr( + f"tags.usages" + f"{'.' + tag_identifier.group.replace('-', '_') if tag_identifier.group else ''}" + f".{tag_identifier.name.replace('-', '_')}" + ) await wait_for_deletion( - await ctx.send(embed=Embed.from_dict(tag['embed'])), + await ctx.send(embed=tag.embed), [ctx.author.id], ) return True - elif founds and len(tag_name) >= 3: + + elif len(tag_identifier.name) >= 3: + suggested_tags = self.get_fuzzy_matches(tag_identifier)[:10] + if not suggested_tags: + return False + suggested_tags_text = "\n".join( + str(identifier) + for identifier, tag in suggested_tags + if tag.accessible_by(ctx.author) + ) await wait_for_deletion( await ctx.send( embed=Embed( - title='Did you mean ...', - description='\n'.join(tag['title'] for tag in founds[:10]) + title="Did you mean ...", + description=suggested_tags_text ) ), [ctx.author.id], @@ -281,16 +333,34 @@ class Tags(Cog): return False @tags_group.command(name='get', aliases=('show', 'g')) - async def get_command(self, ctx: Context, *, tag_name: TagNameConverter = None) -> bool: + async def get_command( + self, ctx: Context, + tag_name_or_group: TagNameConverter = None, + tag_name: TagNameConverter = None, + ) -> bool: """ Get a specified tag, or a list of all tags if no tag is specified. Returns True if something can be sent, or if the tag is on cooldown. Returns False if no matches are found. """ - return await self.display_tag(ctx, tag_name) + if tag_name is None: + tag_name = tag_name_or_group + tag_group = None + else: + tag_group = tag_name_or_group + return await self.display_tag(ctx, TagIdentifier(tag_group, tag_name)) def setup(bot: Bot) -> None: """Load the Tags cog.""" bot.add_cog(Tags(bot)) + + +def extract_tag_identifier(string: str) -> TagIdentifier: + """Create a `TagIdentifier` instance from beginning of `string`.""" + split_string = string.removeprefix(constants.Bot.prefix).split(" ", maxsplit=2) + if len(split_string) == 1: + return TagIdentifier(None, split_string[0]) + else: + return TagIdentifier(split_string[0], split_string[1]) |