aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Numerlor <[email protected]>2021-06-30 02:35:07 +0200
committerGravatar Numerlor <[email protected]>2021-07-01 01:16:25 +0200
commitc301f3e2a5a3ca80f88ff13539bebd86f95a8833 (patch)
tree202ff1b8b4cf0a9646537e76c5580c68c9a5ca62
parentMerge pull request #1661 from python-discord/remove-pixels-token-detection (diff)
Base functionality of tag fetching with groups and in file metadata
The code was restructured to hold tags and their identifiers in individual classes and some methods moved to function to detach some of the not directly related functionality from the cog class
-rw-r--r--bot/exts/backend/error_handler.py12
-rw-r--r--bot/exts/info/tags.py244
2 files changed, 167 insertions, 89 deletions
diff --git a/bot/exts/backend/error_handler.py b/bot/exts/backend/error_handler.py
index d8de177f5..78822aece 100644
--- a/bot/exts/backend/error_handler.py
+++ b/bot/exts/backend/error_handler.py
@@ -11,6 +11,7 @@ from bot.bot import Bot
from bot.constants import Colours, Icons, MODERATION_ROLES
from bot.converters import TagNameConverter
from bot.errors import InvalidInfractedUser, LockedResourceError
+from bot.exts.info import tags
from bot.utils.checks import ContextCheckFailure
log = logging.getLogger(__name__)
@@ -154,14 +155,21 @@ class ErrorHandler(Cog):
return
try:
- tag_name = await TagNameConverter.convert(ctx, ctx.invoked_with)
+ tag_identifier = tags.extract_tag_identifier(ctx.message.content)
+ if tag_identifier.group is not None:
+ tag_name = await TagNameConverter.convert(ctx, tag_identifier.name)
+ tag_name_or_group = await TagNameConverter.convert(ctx, tag_identifier.group)
+ else:
+ tag_name = None
+ tag_name_or_group = await TagNameConverter.convert(ctx, tag_identifier.name)
+
except errors.BadArgument:
log.debug(
f"{ctx.author} tried to use an invalid command "
f"and the fallback tag failed validation in TagNameConverter."
)
else:
- if await ctx.invoke(tags_get_command, tag_name=tag_name):
+ if await ctx.invoke(tags_get_command, tag_name_or_group, tag_name):
return
if not any(role.id in MODERATION_ROLES for role in ctx.author.roles):
diff --git a/bot/exts/info/tags.py b/bot/exts/info/tags.py
index bb91a8563..3c7b9ea0b 100644
--- a/bot/exts/info/tags.py
+++ b/bot/exts/info/tags.py
@@ -1,9 +1,13 @@
+from __future__ import annotations
+
import logging
import re
import time
from pathlib import Path
-from typing import Callable, Dict, Iterable, List, Optional
+from typing import Callable, Iterable, List, NamedTuple, Optional
+import discord
+import frontmatter
from discord import Colour, Embed, Member
from discord.ext.commands import Cog, Context, group
@@ -24,90 +28,128 @@ REGEX_NON_ALPHABET = re.compile(r"[^a-z]", re.MULTILINE & re.IGNORECASE)
FOOTER_TEXT = f"To show a tag, type {constants.Bot.prefix}tags <tagname>."
+class TagIdentifier(NamedTuple):
+ """Stores the group and name used as an identifier for a tag."""
+
+ group: Optional[str]
+ name: str
+
+ def get_fuzzy_score(self, fuzz_tag_identifier: TagIdentifier) -> float:
+ """Get fuzzy score, using `fuzz_tag_identifier` as the identifier to fuzzy match with."""
+ if self.group is None:
+ if fuzz_tag_identifier.group is None:
+ # We're only fuzzy matching the name
+ group_score = 1
+ else:
+ # Ignore tags without groups if the identifier contains a group
+ return .0
+ else:
+ if fuzz_tag_identifier.group is None:
+ # Ignore tags with groups if the identifier does not have a group
+ return .0
+ else:
+ group_score = _fuzzy_search(fuzz_tag_identifier.group, self.group)
+
+ fuzzy_score = group_score * _fuzzy_search(fuzz_tag_identifier.name, self.name) * 100
+ if fuzzy_score:
+ log.trace(f"Fuzzy score {fuzzy_score:=06.2f} for tag {self!r} with fuzz {fuzz_tag_identifier!r}")
+ return fuzzy_score
+
+ def __str__(self) -> str:
+ return f"{self.group or ''} {self.name}"
+
+
+class Tag:
+ """Provide an interface to a tag from resources with `file_content`."""
+
+ def __init__(self, file_content: str):
+ post = frontmatter.loads(file_content)
+ self.content = post.content
+ self.metadata = post.metadata
+ self._restricted_to: set[int] = set(self.metadata.get("restricted_to", ()))
+
+ @property
+ def embed(self) -> Embed:
+ """Create an embed for the tag."""
+ embed = Embed.from_dict(self.metadata.get("embed", {}))
+ embed.description = self.content
+ return embed
+
+ def accessible_by(self, member: discord.Member) -> bool:
+ """Check whether `member` can access the tag."""
+ return bool(
+ not self._restricted_to
+ or self._restricted_to & {role.id for role in member.roles}
+ )
+
+
+def _fuzzy_search(search: str, target: str) -> float:
+ """A simple scoring algorithm based on how many letters are found / total, with order in mind."""
+ current, index = 0, 0
+ _search = REGEX_NON_ALPHABET.sub("", search.lower())
+ _targets = iter(REGEX_NON_ALPHABET.split(target.lower()))
+ _target = next(_targets)
+ try:
+ while True:
+ while index < len(_target) and _search[current] == _target[index]:
+ current += 1
+ index += 1
+ index, _target = 0, next(_targets)
+ except (StopIteration, IndexError):
+ pass
+ return current / len(_search)
+
+
class Tags(Cog):
"""Save new tags and fetch existing tags."""
def __init__(self, bot: Bot):
self.bot = bot
self.tag_cooldowns = {}
- self._cache = self.get_tags()
-
- @staticmethod
- def get_tags() -> dict:
- """Get all tags."""
- cache = {}
+ self._tags: dict[TagIdentifier, Tag] = {}
+ self.initialize_tags()
+ def initialize_tags(self) -> None:
+ """Load all tags from resources into `self._tags`."""
base_path = Path("bot", "resources", "tags")
+
for file in base_path.glob("**/*"):
if file.is_file():
- tag_title = file.stem
- tag = {
- "title": tag_title,
- "embed": {
- "description": file.read_text(encoding="utf8"),
- },
- "restricted_to": None,
- "location": f"/bot/{file}"
- }
-
- # Convert to a list to allow negative indexing.
- parents = list(file.relative_to(base_path).parents)
- if len(parents) > 1:
- # -1 would be '.' hence -2 is used as the index.
- tag["restricted_to"] = parents[-2].name
-
- cache[tag_title] = tag
-
- return cache
-
- @staticmethod
- def check_accessibility(user: Member, tag: dict) -> bool:
- """Check if user can access a tag."""
- return not tag["restricted_to"] or tag["restricted_to"].lower() in [role.name.lower() for role in user.roles]
-
- @staticmethod
- def _fuzzy_search(search: str, target: str) -> float:
- """A simple scoring algorithm based on how many letters are found / total, with order in mind."""
- current, index = 0, 0
- _search = REGEX_NON_ALPHABET.sub('', search.lower())
- _targets = iter(REGEX_NON_ALPHABET.split(target.lower()))
- _target = next(_targets)
- try:
- while True:
- while index < len(_target) and _search[current] == _target[index]:
- current += 1
- index += 1
- index, _target = 0, next(_targets)
- except (StopIteration, IndexError):
- pass
- return current / len(_search) * 100
+ parent_dir = file.relative_to(base_path).parent
- def _get_suggestions(self, tag_name: str, thresholds: Optional[List[int]] = None) -> List[str]:
- """Return a list of suggested tags."""
- scores: Dict[str, int] = {
- tag_title: Tags._fuzzy_search(tag_name, tag['title'])
- for tag_title, tag in self._cache.items()
- }
+ tag_name = file.stem
+ tag_group = parent_dir.name if parent_dir.name else None
+ self._tags[TagIdentifier(tag_group, tag_name)] = Tag(file.read_text("utf-8"))
+
+ def _get_suggestions(
+ self,
+ tag_identifier: TagIdentifier,
+ thresholds: Optional[list[int]] = None
+ ) -> list[tuple[TagIdentifier, Tag]]:
+ """Return a list of suggested tags for `tag_identifier`."""
thresholds = thresholds or [100, 90, 80, 70, 60]
for threshold in thresholds:
suggestions = [
- self._cache[tag_title]
- for tag_title, matching_score in scores.items()
- if matching_score >= threshold
+ (identifier, tag)
+ for identifier, tag in self._tags.items()
+ if identifier.get_fuzzy_score(tag_identifier) >= threshold
]
if suggestions:
return suggestions
return []
- def _get_tag(self, tag_name: str) -> list:
- """Get a specific tag."""
- found = [self._cache.get(tag_name.lower(), None)]
- if not found[0]:
- return self._get_suggestions(tag_name)
- return found
+ def get_fuzzy_matches(self, tag_identifier: TagIdentifier) -> list[tuple[TagIdentifier, Tag]]:
+ """Get tags with identifiers similar to `tag_identifier`."""
+ if tag_identifier.group is None:
+ suggestions = self._get_suggestions(tag_identifier)
+ else:
+ # Try fuzzy matching with only a name first
+ suggestions = self._get_suggestions(TagIdentifier(None, tag_identifier.group))
+ suggestions += self._get_suggestions(tag_identifier)
+ return suggestions
def _get_tags_via_content(self, check: Callable[[Iterable], bool], keywords: str, user: Member) -> list:
"""
@@ -158,9 +200,14 @@ class Tags(Cog):
)
@group(name='tags', aliases=('tag', 't'), invoke_without_command=True)
- async def tags_group(self, ctx: Context, *, tag_name: TagNameConverter = None) -> None:
+ async def tags_group(
+ self,
+ ctx: Context,
+ tag_name_or_group: TagNameConverter = None,
+ tag_name: TagNameConverter = None,
+ ) -> None:
"""Show all known tags, a single tag, or run a subcommand."""
- await self.get_command(ctx, tag_name=tag_name)
+ await self.get_command(ctx, tag_name_or_group=tag_name_or_group, tag_name=tag_name)
@tags_group.group(name='search', invoke_without_command=True)
async def search_tag_content(self, ctx: Context, *, keywords: str) -> None:
@@ -182,7 +229,7 @@ class Tags(Cog):
matching_tags = self._get_tags_via_content(any, keywords or 'any', ctx.author)
await self._send_matching_tags(ctx, keywords, matching_tags)
- async def display_tag(self, ctx: Context, tag_name: str = None) -> bool:
+ async def display_tag(self, ctx: Context, tag_identifier: TagIdentifier) -> bool:
"""
If a tag is not found, display similar tag names as suggestions.
@@ -210,45 +257,50 @@ class Tags(Cog):
return True
return False
- if _command_on_cooldown(tag_name):
- time_elapsed = time.time() - self.tag_cooldowns[tag_name]["time"]
+ if _command_on_cooldown(tag_identifier.name):
+ time_elapsed = time.time() - self.tag_cooldowns[tag_identifier.name]["time"]
time_left = constants.Cooldowns.tags - time_elapsed
log.info(
- f"{ctx.author} tried to get the '{tag_name}' tag, but the tag is on cooldown. "
+ f"{ctx.author} tried to get the '{tag_identifier.name}' tag, but the tag is on cooldown. "
f"Cooldown ends in {time_left:.1f} seconds."
)
return True
- if tag_name is not None:
- temp_founds = self._get_tag(tag_name)
-
- founds = []
+ if tag_identifier.name is not None:
- for found_tag in temp_founds:
- if self.check_accessibility(ctx.author, found_tag):
- founds.append(found_tag)
-
- if len(founds) == 1:
- tag = founds[0]
+ if (tag := self._tags.get(tag_identifier)) is not None and tag.accessible_by(ctx.author):
if ctx.channel.id not in TEST_CHANNELS:
- self.tag_cooldowns[tag_name] = {
+ self.tag_cooldowns[tag_identifier.name] = {
"time": time.time(),
"channel": ctx.channel.id
}
- self.bot.stats.incr(f"tags.usages.{tag['title'].replace('-', '_')}")
+ self.bot.stats.incr(
+ f"tags.usages"
+ f"{'.' + tag_identifier.group.replace('-', '_') if tag_identifier.group else ''}"
+ f".{tag_identifier.name.replace('-', '_')}"
+ )
await wait_for_deletion(
- await ctx.send(embed=Embed.from_dict(tag['embed'])),
+ await ctx.send(embed=tag.embed),
[ctx.author.id],
)
return True
- elif founds and len(tag_name) >= 3:
+
+ elif len(tag_identifier.name) >= 3:
+ suggested_tags = self.get_fuzzy_matches(tag_identifier)[:10]
+ if not suggested_tags:
+ return False
+ suggested_tags_text = "\n".join(
+ str(identifier)
+ for identifier, tag in suggested_tags
+ if tag.accessible_by(ctx.author)
+ )
await wait_for_deletion(
await ctx.send(
embed=Embed(
- title='Did you mean ...',
- description='\n'.join(tag['title'] for tag in founds[:10])
+ title="Did you mean ...",
+ description=suggested_tags_text
)
),
[ctx.author.id],
@@ -281,16 +333,34 @@ class Tags(Cog):
return False
@tags_group.command(name='get', aliases=('show', 'g'))
- async def get_command(self, ctx: Context, *, tag_name: TagNameConverter = None) -> bool:
+ async def get_command(
+ self, ctx: Context,
+ tag_name_or_group: TagNameConverter = None,
+ tag_name: TagNameConverter = None,
+ ) -> bool:
"""
Get a specified tag, or a list of all tags if no tag is specified.
Returns True if something can be sent, or if the tag is on cooldown.
Returns False if no matches are found.
"""
- return await self.display_tag(ctx, tag_name)
+ if tag_name is None:
+ tag_name = tag_name_or_group
+ tag_group = None
+ else:
+ tag_group = tag_name_or_group
+ return await self.display_tag(ctx, TagIdentifier(tag_group, tag_name))
def setup(bot: Bot) -> None:
"""Load the Tags cog."""
bot.add_cog(Tags(bot))
+
+
+def extract_tag_identifier(string: str) -> TagIdentifier:
+ """Create a `TagIdentifier` instance from beginning of `string`."""
+ split_string = string.removeprefix(constants.Bot.prefix).split(" ", maxsplit=2)
+ if len(split_string) == 1:
+ return TagIdentifier(None, split_string[0])
+ else:
+ return TagIdentifier(split_string[0], split_string[1])