aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bot/exts/moderation/watchchannels/_watchchannel.py78
-rw-r--r--bot/exts/recruitment/__init__.py0
-rw-r--r--bot/exts/recruitment/talentpool/__init__.py8
-rw-r--r--bot/exts/recruitment/talentpool/_cog.py (renamed from bot/exts/moderation/watchchannels/talentpool.py)83
-rw-r--r--bot/exts/recruitment/talentpool/_review.py321
-rw-r--r--bot/utils/time.py8
6 files changed, 467 insertions, 31 deletions
diff --git a/bot/exts/moderation/watchchannels/_watchchannel.py b/bot/exts/moderation/watchchannels/_watchchannel.py
index 0793a66af..9f26c34f2 100644
--- a/bot/exts/moderation/watchchannels/_watchchannel.py
+++ b/bot/exts/moderation/watchchannels/_watchchannel.py
@@ -5,9 +5,8 @@ import textwrap
from abc import abstractmethod
from collections import defaultdict, deque
from dataclasses import dataclass
-from typing import Optional
+from typing import Any, Dict, Optional
-import dateutil.parser
import discord
from discord import Color, DMChannel, Embed, HTTPException, Message, errors
from discord.ext.commands import Cog, Context
@@ -20,7 +19,7 @@ from bot.exts.filters.webhook_remover import WEBHOOK_URL_RE
from bot.exts.moderation.modlog import ModLog
from bot.pagination import LinePaginator
from bot.utils import CogABCMeta, messages
-from bot.utils.time import time_since
+from bot.utils.time import get_time_delta
log = logging.getLogger(__name__)
@@ -136,7 +135,10 @@ class WatchChannel(metaclass=CogABCMeta):
if not await self.fetch_user_cache():
await self.modlog.send_log_message(
title=f"Warning: Failed to retrieve user cache for the {self.__class__.__name__} watch channel",
- text="Could not retrieve the list of watched users from the API and messages will not be relayed.",
+ text=(
+ "Could not retrieve the list of watched users from the API. "
+ "Messages will not be relayed, and reviews not rescheduled."
+ ),
ping_everyone=True,
icon_url=Icons.token_removed,
colour=Color.red()
@@ -280,7 +282,7 @@ class WatchChannel(metaclass=CogABCMeta):
actor = actor.display_name if actor else self.watched_users[user_id]['actor']
inserted_at = self.watched_users[user_id]['inserted_at']
- time_delta = self._get_time_delta(inserted_at)
+ time_delta = get_time_delta(inserted_at)
reason = self.watched_users[user_id]['reason']
@@ -308,35 +310,61 @@ class WatchChannel(metaclass=CogABCMeta):
The optional kwarg `update_cache` specifies whether the cache should
be refreshed by polling the API.
"""
- if update_cache:
- if not await self.fetch_user_cache():
- await ctx.send(f":x: Failed to update {self.__class__.__name__} user cache, serving from cache")
- update_cache = False
+ watched_data = await self.prepare_watched_users_data(ctx, oldest_first, update_cache)
- lines = []
- for user_id, user_data in self.watched_users.items():
- inserted_at = user_data['inserted_at']
- time_delta = self._get_time_delta(inserted_at)
- lines.append(f"• <@{user_id}> (added {time_delta})")
-
- if oldest_first:
- lines.reverse()
+ if update_cache and not watched_data["updated"]:
+ await ctx.send(f":x: Failed to update {self.__class__.__name__} user cache, serving from cache")
- lines = lines or ("There's nothing here yet.",)
+ lines = watched_data["info"].values() or ("There's nothing here yet.",)
embed = Embed(
- title=f"{self.__class__.__name__} watched users ({'updated' if update_cache else 'cached'})",
+ title=watched_data["title"],
color=Color.blue()
)
await LinePaginator.paginate(lines, ctx, embed, empty=False)
- @staticmethod
- def _get_time_delta(time_string: str) -> str:
- """Returns the time in human-readable time delta format."""
- date_time = dateutil.parser.isoparse(time_string).replace(tzinfo=None)
- time_delta = time_since(date_time, precision="minutes", max_units=1)
+ async def prepare_watched_users_data(
+ self, ctx: Context, oldest_first: bool = False, update_cache: bool = True
+ ) -> Dict[str, Any]:
+ """
+ Prepare overview information of watched users to list.
+
+ The optional kwarg `oldest_first` orders the list by oldest entry.
+
+ The optional kwarg `update_cache` specifies whether the cache should
+ be refreshed by polling the API.
+
+ Returns a dictionary with a "title" key for the list's title, and a "info" key with
+ information about each user.
+
+ The dictionary additionally has an "updated" field which is true if a cache update was
+ requested and it succeeded.
+ """
+ list_data = {}
+ if update_cache:
+ if not await self.fetch_user_cache():
+ update_cache = False
+ list_data["updated"] = update_cache
+
+ watched_iter = self.watched_users.items()
+ if oldest_first:
+ watched_iter = reversed(watched_iter)
+
+ list_data["info"] = {}
+ for user_id, user_data in watched_iter:
+ member = ctx.guild.get_member(user_id)
+ line = f"• `{user_id}`"
+ if member:
+ line += f" ({member.name}#{member.discriminator})"
+ inserted_at = user_data['inserted_at']
+ line += f", added {get_time_delta(inserted_at)}"
+ if not member: # Cross off users who left the server.
+ line = f"~~{line}~~"
+ list_data["info"][user_id] = line
+
+ list_data["title"] = f"{self.__class__.__name__} watched users ({'updated' if update_cache else 'cached'})"
- return time_delta
+ return list_data
def _remove_user(self, user_id: int) -> None:
"""Removes a user from a watch channel."""
diff --git a/bot/exts/recruitment/__init__.py b/bot/exts/recruitment/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/bot/exts/recruitment/__init__.py
diff --git a/bot/exts/recruitment/talentpool/__init__.py b/bot/exts/recruitment/talentpool/__init__.py
new file mode 100644
index 000000000..52d27eb99
--- /dev/null
+++ b/bot/exts/recruitment/talentpool/__init__.py
@@ -0,0 +1,8 @@
+from bot.bot import Bot
+
+
+def setup(bot: Bot) -> None:
+ """Load the TalentPool cog."""
+ from bot.exts.recruitment.talentpool._cog import TalentPool
+
+ bot.add_cog(TalentPool(bot))
diff --git a/bot/exts/moderation/watchchannels/talentpool.py b/bot/exts/recruitment/talentpool/_cog.py
index d75688fa6..b809cea17 100644
--- a/bot/exts/moderation/watchchannels/talentpool.py
+++ b/bot/exts/recruitment/talentpool/_cog.py
@@ -11,6 +11,7 @@ from bot.bot import Bot
from bot.constants import Channels, Guild, MODERATION_ROLES, STAFF_ROLES, Webhooks
from bot.converters import FetchedMember
from bot.exts.moderation.watchchannels._watchchannel import WatchChannel
+from bot.exts.recruitment.talentpool._review import Reviewer
from bot.pagination import LinePaginator
from bot.utils import time
@@ -33,6 +34,9 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):
disable_header=True,
)
+ self.reviewer = Reviewer(self.__class__.__name__, bot, self)
+ self.bot.loop.create_task(self.reviewer.reschedule_reviews())
+
@group(name='talentpool', aliases=('tp', 'talent', 'nomination', 'n'), invoke_without_command=True)
@has_any_role(*MODERATION_ROLES)
async def nomination_group(self, ctx: Context) -> None:
@@ -42,7 +46,10 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):
@nomination_group.command(name='watched', aliases=('all', 'list'), root_aliases=("nominees",))
@has_any_role(*MODERATION_ROLES)
async def watched_command(
- self, ctx: Context, oldest_first: bool = False, update_cache: bool = True
+ self,
+ ctx: Context,
+ oldest_first: bool = False,
+ update_cache: bool = True
) -> None:
"""
Shows the users that are currently being monitored in the talent pool.
@@ -54,6 +61,47 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):
"""
await self.list_watched_users(ctx, oldest_first=oldest_first, update_cache=update_cache)
+ async def list_watched_users(
+ self,
+ ctx: Context,
+ oldest_first: bool = False,
+ update_cache: bool = True
+ ) -> None:
+ """
+ Gives an overview of the nominated users list.
+
+ It specifies the users' mention, name, how long ago they were nominated, and whether their
+ review was scheduled or already posted.
+
+ The optional kwarg `oldest_first` orders the list by oldest entry.
+
+ The optional kwarg `update_cache` specifies whether the cache should
+ be refreshed by polling the API.
+ """
+ # TODO Once the watch channel is removed, this can be done in a smarter way, without splitting and overriding
+ # the list_watched_users function.
+ watched_data = await self.prepare_watched_users_data(ctx, oldest_first, update_cache)
+
+ if update_cache and not watched_data["updated"]:
+ await ctx.send(f":x: Failed to update {self.__class__.__name__} user cache, serving from cache")
+
+ lines = []
+ for user_id, line in watched_data["info"].items():
+ if self.watched_users[user_id]['reviewed']:
+ line += " *(reviewed)*"
+ elif user_id in self.reviewer:
+ line += " *(scheduled)*"
+ lines.append(line)
+
+ if not lines:
+ lines = ("There's nothing here yet.",)
+
+ embed = Embed(
+ title=watched_data["title"],
+ color=Color.blue()
+ )
+ await LinePaginator.paginate(lines, ctx, embed, empty=False)
+
@nomination_group.command(name='oldest')
@has_any_role(*MODERATION_ROLES)
async def oldest_command(self, ctx: Context, update_cache: bool = True) -> None:
@@ -115,7 +163,9 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):
resp.raise_for_status()
self.watched_users[user.id] = response_data
- msg = f":white_check_mark: The nomination for {user} has been added to the talent pool"
+
+ if user.id not in self.reviewer:
+ self.reviewer.schedule_review(user.id)
history = await self.bot.api_client.get(
self.api_endpoint,
@@ -126,6 +176,7 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):
}
)
+ msg = f"✅ The nomination for {user} has been added to the talent pool"
if history:
msg += f"\n\n({len(history)} previous nominations in total)"
@@ -249,6 +300,24 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):
await self.fetch_user_cache() # Update cache.
await ctx.send(":white_check_mark: Updated the end reason of the nomination!")
+ @nomination_group.command(aliases=('mr',))
+ @has_any_role(*MODERATION_ROLES)
+ async def mark_reviewed(self, ctx: Context, user_id: int) -> None:
+ """Mark a user's nomination as reviewed and cancel the review task."""
+ if not await self.reviewer.mark_reviewed(ctx, user_id):
+ return
+ await ctx.send(f"✅ The user with ID `{user_id}` was marked as reviewed.")
+
+ @nomination_group.command(aliases=('review',))
+ @has_any_role(*MODERATION_ROLES)
+ async def post_review(self, ctx: Context, user_id: int) -> None:
+ """Post the automatic review for the user ahead of time."""
+ if not await self.reviewer.mark_reviewed(ctx, user_id):
+ return
+
+ await self.reviewer.post_review(user_id, update_database=False)
+ await ctx.message.add_reaction("✅")
+
@Cog.listener()
async def on_member_ban(self, guild: Guild, user: Union[User, Member]) -> None:
"""Remove `user` from the talent pool after they are banned."""
@@ -277,6 +346,8 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):
)
self._remove_user(user_id)
+ self.reviewer.cancel(user_id)
+
return True
def _nomination_to_string(self, nomination_object: dict) -> str:
@@ -329,7 +400,7 @@ class TalentPool(WatchChannel, Cog, name="Talentpool"):
return lines.strip()
-
-def setup(bot: Bot) -> None:
- """Load the TalentPool cog."""
- bot.add_cog(TalentPool(bot))
+ def cog_unload(self) -> None:
+ """Cancels all review tasks on cog unload."""
+ super().cog_unload()
+ self.reviewer.cancel_all()
diff --git a/bot/exts/recruitment/talentpool/_review.py b/bot/exts/recruitment/talentpool/_review.py
new file mode 100644
index 000000000..c2c1312d9
--- /dev/null
+++ b/bot/exts/recruitment/talentpool/_review.py
@@ -0,0 +1,321 @@
+import asyncio
+import logging
+import random
+import textwrap
+import typing
+from collections import Counter
+from datetime import datetime, timedelta
+from typing import List, Optional, Union
+
+from dateutil.parser import isoparse
+from dateutil.relativedelta import relativedelta
+from discord import Emoji, Member, Message, TextChannel
+from discord.ext.commands import Context
+
+from bot.api import ResponseCodeError
+from bot.bot import Bot
+from bot.constants import Channels, Guild, Roles
+from bot.utils.scheduling import Scheduler
+from bot.utils.time import get_time_delta, humanize_delta, time_since
+
+if typing.TYPE_CHECKING:
+ from bot.exts.recruitment.talentpool._cog import TalentPool
+
+log = logging.getLogger(__name__)
+
+# Maximum amount of days before an automatic review is posted.
+MAX_DAYS_IN_POOL = 30
+
+# Maximum amount of characters allowed in a message
+MAX_MESSAGE_SIZE = 2000
+
+
+class Reviewer:
+ """Schedules, formats, and publishes reviews of helper nominees."""
+
+ def __init__(self, name: str, bot: Bot, pool: 'TalentPool'):
+ self.bot = bot
+ self._pool = pool
+ self._review_scheduler = Scheduler(name)
+
+ def __contains__(self, user_id: int) -> bool:
+ """Return True if the user with ID user_id is scheduled for review, False otherwise."""
+ return user_id in self._review_scheduler
+
+ async def reschedule_reviews(self) -> None:
+ """Reschedule all active nominations to be reviewed at the appropriate time."""
+ log.trace("Rescheduling reviews")
+ await self.bot.wait_until_guild_available()
+ # TODO Once the watch channel is removed, this can be done in a smarter way, e.g create a sync function.
+ await self._pool.fetch_user_cache()
+
+ for user_id, user_data in self._pool.watched_users.items():
+ if not user_data["reviewed"]:
+ self.schedule_review(user_id)
+
+ def schedule_review(self, user_id: int) -> None:
+ """Schedules a single user for review."""
+ log.trace(f"Scheduling review of user with ID {user_id}")
+
+ user_data = self._pool.watched_users[user_id]
+ inserted_at = isoparse(user_data['inserted_at']).replace(tzinfo=None)
+ review_at = inserted_at + timedelta(days=MAX_DAYS_IN_POOL)
+
+ # If it's over a day overdue, it's probably an old nomination and shouldn't be automatically reviewed.
+ if datetime.utcnow() - review_at < timedelta(days=1):
+ self._review_scheduler.schedule_at(review_at, user_id, self.post_review(user_id, update_database=True))
+
+ async def post_review(self, user_id: int, update_database: bool) -> None:
+ """Format a generic review of a user and post it to the mod announcements channel."""
+ log.trace(f"Posting the review of {user_id}")
+
+ nomination = self._pool.watched_users[user_id]
+ if not nomination:
+ log.trace(f"There doesn't appear to be an active nomination for {user_id}")
+ return
+
+ guild = self.bot.get_guild(Guild.id)
+ channel = guild.get_channel(Channels.mod_announcements)
+ member = guild.get_member(user_id)
+
+ if update_database:
+ await self.bot.api_client.patch(f"{self._pool.api_endpoint}/{nomination['id']}", json={"reviewed": True})
+
+ if not member:
+ await channel.send(
+ f"I tried to review the user with ID `{user_id}`, but they don't appear to be on the server 😔"
+ )
+ return
+
+ opening = f"<@&{Roles.moderators}> <@&{Roles.admins}>\n{member.mention} ({member}) for Helper!"
+
+ current_nominations = "\n\n".join(
+ f"**<@{entry['actor']}>:** {entry['reason'] or '*no reason given*'}" for entry in nomination['entries']
+ )
+ current_nominations = f"**Nominated by:**\n{current_nominations}"
+
+ review_body = await self._construct_review_body(member)
+
+ seen_emoji = self._random_ducky(guild)
+ vote_request = (
+ "*Refer to their nomination and infraction histories for further details*.\n"
+ f"*Please react {seen_emoji} if you've seen this post."
+ " Then react 👍 for approval, or 👎 for disapproval*."
+ )
+
+ review = "\n\n".join(part for part in (opening, current_nominations, review_body, vote_request))
+
+ message = (await self._bulk_send(channel, review))[-1]
+ for reaction in (seen_emoji, "👍", "👎"):
+ await message.add_reaction(reaction)
+
+ async def _construct_review_body(self, member: Member) -> str:
+ """Formats the body of the nomination, with details of activity, infractions, and previous nominations."""
+ activity = await self._activity_review(member)
+ infractions = await self._infractions_review(member)
+ prev_nominations = await self._previous_nominations_review(member)
+
+ body = f"{activity}\n\n{infractions}"
+ if prev_nominations:
+ body += f"\n\n{prev_nominations}"
+ return body
+
+ async def _activity_review(self, member: Member) -> str:
+ """
+ Format the activity of the nominee.
+
+ Adds details on how long they've been on the server, their total message count,
+ and the channels they're the most active in.
+ """
+ log.trace(f"Fetching the metricity data for {member.id}'s review")
+ try:
+ user_activity = await self.bot.api_client.get(f"bot/users/{member.id}/metricity_review_data")
+ except ResponseCodeError as e:
+ if e.status == 404:
+ log.trace(f"The user {member.id} seems to have no activity logged in Metricity.")
+ messages = "no"
+ channels = ""
+ else:
+ log.trace(f"An unexpected error occured while fetching information of user {member.id}.")
+ raise
+ else:
+ log.trace(f"Activity found for {member.id}, formatting review.")
+ messages = user_activity["total_messages"]
+ # Making this part flexible to the amount of expected and returned channels.
+ first_channel = user_activity["top_channel_activity"][0]
+ channels = f", with {first_channel[1]} messages in {first_channel[0]}"
+
+ if len(user_activity["top_channel_activity"]) > 1:
+ channels += ", " + ", ".join(
+ f"{count} in {channel}" for channel, count in user_activity["top_channel_activity"][1: -1]
+ )
+ last_channel = user_activity["top_channel_activity"][-1]
+ channels += f", and {last_channel[1]} in {last_channel[0]}"
+
+ time_on_server = humanize_delta(relativedelta(datetime.utcnow(), member.joined_at), max_units=2)
+ review = (
+ f"{member.name} has been on the server for **{time_on_server}**"
+ f" and has **{messages} messages**{channels}."
+ )
+
+ return review
+
+ async def _infractions_review(self, member: Member) -> str:
+ """
+ Formats the review of the nominee's infractions, if any.
+
+ The infractions are listed by type and amount, and it is stated how long ago the last one was issued.
+ """
+ log.trace(f"Fetching the infraction data for {member.id}'s review")
+ infraction_list = await self.bot.api_client.get(
+ 'bot/infractions/expanded',
+ params={'user__id': str(member.id), 'ordering': '-inserted_at'}
+ )
+
+ log.trace(f"{len(infraction_list)} infractions found for {member.id}, formatting review.")
+ if not infraction_list:
+ return "They have no infractions."
+
+ # Count the amount of each type of infraction.
+ infr_stats = list(Counter(infr["type"] for infr in infraction_list).items())
+
+ # Format into a sentence.
+ infractions = ", ".join(
+ f"{count} {self._format_infr_name(infr_type, count)}"
+ for infr_type, count in infr_stats[:-1]
+ )
+ if len(infr_stats) > 1:
+ last_infr, last_count = infr_stats[-1]
+ infractions += f", and {last_count} {self._format_infr_name(last_infr, last_count)}"
+
+ infractions = f"**{infractions}**"
+
+ # Show when the last one was issued.
+ if len(infraction_list) == 1:
+ infractions += ", issued "
+ else:
+ infractions += ", with the last infraction issued "
+
+ # Infractions were ordered by time since insertion descending.
+ infractions += get_time_delta(infraction_list[0]['inserted_at'])
+
+ return f"They have {infractions}."
+
+ @staticmethod
+ def _format_infr_name(infr_type: str, count: int) -> str:
+ """
+ Format the infraction type in a way readable in a sentence.
+
+ Underscores are replaced with spaces, as well as *attempting* to show the appropriate plural form if necessary.
+ This function by no means covers all rules of grammar.
+ """
+ formatted = infr_type.replace("_", " ")
+ if count > 1:
+ if infr_type.endswith(('ch', 'sh')):
+ formatted += "e"
+ formatted += "s"
+
+ return formatted
+
+ async def _previous_nominations_review(self, member: Member) -> Optional[str]:
+ """
+ Formats the review of the nominee's previous nominations.
+
+ The number of previous nominations and unnominations are shown, as well as the reason the last one ended.
+ """
+ log.trace(f"Fetching the nomination history data for {member.id}'s review")
+ history = await self.bot.api_client.get(
+ self._pool.api_endpoint,
+ params={
+ "user__id": str(member.id),
+ "active": "false",
+ "ordering": "-inserted_at"
+ }
+ )
+
+ log.trace(f"{len(history)} previous nominations found for {member.id}, formatting review.")
+ if not history:
+ return
+
+ num_entries = sum(len(nomination["entries"]) for nomination in history)
+
+ nomination_times = f"{num_entries} times" if num_entries > 1 else "once"
+ rejection_times = f"{len(history)} times" if len(history) > 1 else "once"
+ end_time = time_since(isoparse(history[0]['ended_at']).replace(tzinfo=None), max_units=2)
+
+ review = (
+ f"They were nominated **{nomination_times}** before"
+ f", but their nomination was called off **{rejection_times}**."
+ f"\nThe last one ended {end_time} with the reason: {history[0]['end_reason']}"
+ )
+
+ return review
+
+ @staticmethod
+ def _random_ducky(guild: Guild) -> Union[Emoji, str]:
+ """Picks a random ducky emoji to be used to mark the vote as seen. If no duckies found returns 👀."""
+ duckies = [emoji for emoji in guild.emojis if emoji.name.startswith("ducky")]
+ if not duckies:
+ return "👀"
+ return random.choice(duckies)
+
+ @staticmethod
+ async def _bulk_send(channel: TextChannel, text: str) -> List[Message]:
+ """
+ Split a text into several if necessary, and post them to the channel.
+
+ Returns the resulting message objects.
+ """
+ messages = textwrap.wrap(text, width=MAX_MESSAGE_SIZE, replace_whitespace=False)
+ log.trace(f"The provided string will be sent to the channel {channel.id} as {len(messages)} messages.")
+
+ results = []
+ for message in messages:
+ await asyncio.sleep(1)
+ results.append(await channel.send(message))
+
+ return results
+
+ async def mark_reviewed(self, ctx: Context, user_id: int) -> bool:
+ """
+ Mark an active nomination as reviewed, updating the database and canceling the review task.
+
+ Returns True if the user was successfully marked as reviewed, False otherwise.
+ """
+ log.trace(f"Updating user {user_id} as reviewed")
+ await self._pool.fetch_user_cache()
+ if user_id not in self._pool.watched_users:
+ log.trace(f"Can't find a nominated user with id {user_id}")
+ await ctx.send(f"❌ Can't find a currently nominated user with id `{user_id}`")
+ return False
+
+ nomination = self._pool.watched_users[user_id]
+ if nomination["reviewed"]:
+ await ctx.send("❌ This nomination was already reviewed, but here's a cookie 🍪")
+ return False
+
+ await self.bot.api_client.patch(f"{self._pool.api_endpoint}/{nomination['id']}", json={"reviewed": True})
+ if user_id in self._review_scheduler:
+ self._review_scheduler.cancel(user_id)
+
+ return True
+
+ def cancel(self, user_id: int) -> None:
+ """
+ Cancels the review of the nominee with ID `user_id`.
+
+ It's important to note that this applies only until reschedule_reviews is called again.
+ To permanently cancel someone's review, either remove them from the pool, or use mark_reviewed.
+ """
+ log.trace(f"Canceling the review of user {user_id}.")
+ self._review_scheduler.cancel(user_id)
+
+ def cancel_all(self) -> None:
+ """
+ Cancels all reviews.
+
+ It's important to note that this applies only until reschedule_reviews is called again.
+ To permanently cancel someone's review, either remove them from the pool, or use mark_reviewed.
+ """
+ log.trace("Canceling all reviews.")
+ self._review_scheduler.cancel_all()
diff --git a/bot/utils/time.py b/bot/utils/time.py
index f862e40f7..466f0adc2 100644
--- a/bot/utils/time.py
+++ b/bot/utils/time.py
@@ -85,6 +85,14 @@ def humanize_delta(delta: relativedelta, precision: str = "seconds", max_units:
return humanized
+def get_time_delta(time_string: str) -> str:
+ """Returns the time in human-readable time delta format."""
+ date_time = dateutil.parser.isoparse(time_string).replace(tzinfo=None)
+ time_delta = time_since(date_time, precision="minutes", max_units=1)
+
+ return time_delta
+
+
def parse_duration_string(duration: str) -> Optional[relativedelta]:
"""
Converts a `duration` string to a relativedelta object.