aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bot/__main__.py1
-rw-r--r--bot/cogs/infractions.py262
-rw-r--r--bot/cogs/moderation.py268
-rw-r--r--bot/utils/moderation.py29
4 files changed, 309 insertions, 251 deletions
diff --git a/bot/__main__.py b/bot/__main__.py
index f25693734..019550a89 100644
--- a/bot/__main__.py
+++ b/bot/__main__.py
@@ -57,6 +57,7 @@ bot.load_extension("bot.cogs.defcon")
bot.load_extension("bot.cogs.eval")
bot.load_extension("bot.cogs.free")
bot.load_extension("bot.cogs.information")
+bot.load_extension("bot.cogs.infractions")
bot.load_extension("bot.cogs.jams")
bot.load_extension("bot.cogs.moderation")
bot.load_extension("bot.cogs.off_topic_names")
diff --git a/bot/cogs/infractions.py b/bot/cogs/infractions.py
new file mode 100644
index 000000000..17e5ab094
--- /dev/null
+++ b/bot/cogs/infractions.py
@@ -0,0 +1,262 @@
+import asyncio
+import logging
+import textwrap
+import typing as t
+
+import discord
+from discord.ext import commands
+from discord.ext.commands import Context
+
+from bot import constants
+from bot.cogs.moderation import Moderation
+from bot.cogs.modlog import ModLog
+from bot.converters import Duration, InfractionSearchQuery
+from bot.pagination import LinePaginator
+from bot.utils import time
+from bot.utils.checks import with_role_check
+from bot.utils.moderation import Infraction, proxy_user
+
+log = logging.getLogger(__name__)
+
+UserConverter = t.Union[discord.User, proxy_user]
+
+
+def permanent_duration(expires_at: str) -> str:
+ """Only allow an expiration to be 'permanent' if it is a string."""
+ expires_at = expires_at.lower()
+ if expires_at != "permanent":
+ raise commands.BadArgument
+ else:
+ return expires_at
+
+
+class Infractions(commands.Cog):
+ """Management of infractions."""
+
+ def __init__(self, bot: commands.Bot):
+ self.bot = bot
+
+ @property
+ def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
+ return self.bot.get_cog("ModLog")
+
+ @property
+ def mod_cog(self) -> Moderation:
+ """Get currently loaded Moderation cog instance."""
+ return self.bot.get_cog("Moderation")
+
+ # region: Edit infraction commands
+
+ @commands.group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True)
+ async def infraction_group(self, ctx: Context) -> None:
+ """Infraction manipulation commands."""
+ await ctx.invoke(self.bot.get_command("help"), "infraction")
+
+ @infraction_group.command(name='edit')
+ async def infraction_edit(
+ self,
+ ctx: Context,
+ infraction_id: int,
+ expires_at: t.Union[Duration, permanent_duration, None],
+ *,
+ reason: str = None
+ ) -> None:
+ """
+ Edit the duration and/or the reason of an infraction.
+
+ Durations are relative to the time of updating.
+ Use "permanent" to mark the infraction as permanent.
+ """
+ if expires_at is None and reason is None:
+ # Unlike UserInputError, the error handler will show a specified message for BadArgument
+ raise commands.BadArgument("Neither a new expiry nor a new reason was specified.")
+
+ # Retrieve the previous infraction for its information.
+ old_infraction = await self.bot.api_client.get(f'bot/infractions/{infraction_id}')
+
+ request_data = {}
+ confirm_messages = []
+ log_text = ""
+
+ if expires_at == "permanent":
+ request_data['expires_at'] = None
+ confirm_messages.append("marked as permanent")
+ elif expires_at is not None:
+ request_data['expires_at'] = expires_at.isoformat()
+ expiry = expires_at.strftime(time.INFRACTION_FORMAT)
+ confirm_messages.append(f"set to expire on {expiry}")
+ else:
+ confirm_messages.append("expiry unchanged")
+
+ if reason:
+ request_data['reason'] = reason
+ confirm_messages.append("set a new reason")
+ log_text += f"""
+ Previous reason: {old_infraction['reason']}
+ New reason: {reason}
+ """.rstrip()
+ else:
+ confirm_messages.append("reason unchanged")
+
+ # Update the infraction
+ new_infraction = await self.bot.api_client.patch(
+ f'bot/infractions/{infraction_id}',
+ json=request_data,
+ )
+
+ # Re-schedule infraction if the expiration has been updated
+ if 'expires_at' in request_data:
+ self.mod_cog.cancel_task(new_infraction['id'])
+ loop = asyncio.get_event_loop()
+ self.mod_cog.schedule_task(loop, new_infraction['id'], new_infraction)
+
+ log_text += f"""
+ Previous expiry: {old_infraction['expires_at'] or "Permanent"}
+ New expiry: {new_infraction['expires_at'] or "Permanent"}
+ """.rstrip()
+
+ await ctx.send(f":ok_hand: Updated infraction: {' & '.join(confirm_messages)}")
+
+ # Get information about the infraction's user
+ user_id = new_infraction['user']
+ user = ctx.guild.get_member(user_id)
+
+ if user:
+ user_text = f"{user.mention} (`{user.id}`)"
+ thumbnail = user.avatar_url_as(static_format="png")
+ else:
+ user_text = f"`{user_id}`"
+ thumbnail = None
+
+ # The infraction's actor
+ actor_id = new_infraction['actor']
+ actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`"
+
+ await self.mod_log.send_log_message(
+ icon_url=constants.Icons.pencil,
+ colour=discord.Colour.blurple(),
+ title="Infraction edited",
+ thumbnail=thumbnail,
+ text=textwrap.dedent(f"""
+ Member: {user_text}
+ Actor: {actor}
+ Edited by: {ctx.message.author}{log_text}
+ """)
+ )
+
+ # endregion
+ # region: Search infractions
+
+ @infraction_group.group(name="search", invoke_without_command=True)
+ async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None:
+ """Searches for infractions in the database."""
+ if isinstance(query, discord.User):
+ await ctx.invoke(self.search_user, query)
+ else:
+ await ctx.invoke(self.search_reason, query)
+
+ @infraction_search_group.command(name="user", aliases=("member", "id"))
+ async def search_user(self, ctx: Context, user: UserConverter) -> None:
+ """Search for infractions by member."""
+ infraction_list = await self.bot.api_client.get(
+ 'bot/infractions',
+ params={'user__id': str(user.id)}
+ )
+ embed = discord.Embed(
+ title=f"Infractions for {user} ({len(infraction_list)} total)",
+ colour=discord.Colour.orange()
+ )
+ await self.send_infraction_list(ctx, embed, infraction_list)
+
+ @infraction_search_group.command(name="reason", aliases=("match", "regex", "re"))
+ async def search_reason(self, ctx: Context, reason: str) -> None:
+ """Search for infractions by their reason. Use Re2 for matching."""
+ infraction_list = await self.bot.api_client.get(
+ 'bot/infractions',
+ params={'search': reason}
+ )
+ embed = discord.Embed(
+ title=f"Infractions matching `{reason}` ({len(infraction_list)} total)",
+ colour=discord.Colour.orange()
+ )
+ await self.send_infraction_list(ctx, embed, infraction_list)
+
+ # endregion
+ # region: Utility functions
+
+ async def send_infraction_list(
+ self,
+ ctx: Context,
+ embed: discord.Embed,
+ infractions: t.Iterable[Infraction]
+ ) -> None:
+ """Send a paginated embed of infractions for the specified user."""
+ if not infractions:
+ await ctx.send(f":warning: No infractions could be found for that query.")
+ return
+
+ lines = tuple(
+ self.infraction_to_string(infraction)
+ for infraction in infractions
+ )
+
+ await LinePaginator.paginate(
+ lines,
+ ctx=ctx,
+ embed=embed,
+ empty=True,
+ max_lines=3,
+ max_size=1000
+ )
+
+ def infraction_to_string(self, infraction_object: Infraction) -> str:
+ """Convert the infraction object to a string representation."""
+ actor_id = infraction_object["actor"]
+ guild = self.bot.get_guild(constants.Guild.id)
+ actor = guild.get_member(actor_id)
+ active = infraction_object["active"]
+ user_id = infraction_object["user"]
+ hidden = infraction_object["hidden"]
+ created = time.format_infraction(infraction_object["inserted_at"])
+ if infraction_object["expires_at"] is None:
+ expires = "*Permanent*"
+ else:
+ expires = time.format_infraction(infraction_object["expires_at"])
+
+ lines = textwrap.dedent(f"""
+ {"**===============**" if active else "==============="}
+ Status: {"__**Active**__" if active else "Inactive"}
+ User: {self.bot.get_user(user_id)} (`{user_id}`)
+ Type: **{infraction_object["type"]}**
+ Shadow: {hidden}
+ Reason: {infraction_object["reason"] or "*None*"}
+ Created: {created}
+ Expires: {expires}
+ Actor: {actor.mention if actor else actor_id}
+ ID: `{infraction_object["id"]}`
+ {"**===============**" if active else "==============="}
+ """)
+
+ return lines.strip()
+
+ # endregion
+
+ # This cannot be static (must have a __func__ attribute).
+ def cog_check(self, ctx: Context) -> bool:
+ """Only allow moderators to invoke the commands in this cog."""
+ return with_role_check(ctx, *constants.MODERATION_ROLES)
+
+ # This cannot be static (must have a __func__ attribute).
+ async def cog_command_error(self, ctx: Context, error: Exception) -> None:
+ """Send a notification to the invoking context on a Union failure."""
+ if isinstance(error, commands.BadUnionArgument):
+ if discord.User in error.converters:
+ await ctx.send(str(error.errors[0]))
+ error.handled = True
+
+
+def setup(bot: commands.Bot) -> None:
+ """Load the Infractions cog."""
+ bot.add_cog(Infractions(bot))
+ log.info("Cog loaded: Infractions")
diff --git a/bot/cogs/moderation.py b/bot/cogs/moderation.py
index a6e48fa59..35bc24195 100644
--- a/bot/cogs/moderation.py
+++ b/bot/cogs/moderation.py
@@ -1,25 +1,23 @@
-import asyncio
import logging
import textwrap
from datetime import datetime
-from typing import Awaitable, Dict, Iterable, Optional, Union
+from typing import Awaitable, Optional, Union
from discord import (
Colour, Embed, Forbidden, Guild, HTTPException, Member, NotFound, Object, User
)
-from discord.ext.commands import (
- BadArgument, BadUnionArgument, Bot, Cog, Context, command, group
-)
+from discord.ext.commands import BadUnionArgument, Bot, Cog, Context, command
from bot import constants
from bot.cogs.modlog import ModLog
from bot.constants import Colours, Event, Icons, MODERATION_ROLES
-from bot.converters import Duration, InfractionSearchQuery
+from bot.converters import Duration
from bot.decorators import respect_role_hierarchy, with_role
-from bot.pagination import LinePaginator
-from bot.utils.moderation import already_has_active_infraction, post_infraction
+from bot.utils.moderation import (
+ Infraction, MemberObject, already_has_active_infraction, post_infraction, proxy_user
+)
from bot.utils.scheduling import Scheduler
-from bot.utils.time import INFRACTION_FORMAT, format_infraction, wait_until
+from bot.utils.time import format_infraction, wait_until
log = logging.getLogger(__name__)
@@ -34,30 +32,7 @@ RULES_URL = "https://pythondiscord.com/pages/rules"
APPEALABLE_INFRACTIONS = ("ban", "mute")
-def proxy_user(user_id: str) -> Object:
- """Create a proxy user for the provided user_id for situations where a Member or User object cannot be resolved."""
- try:
- user_id = int(user_id)
- except ValueError:
- raise BadArgument
- user = Object(user_id)
- user.mention = user.id
- user.avatar_url_as = lambda static_format: None
- return user
-
-
-def permanent_duration(expires_at: str) -> str:
- """Only allow an expiration to be 'permanent' if it is a string."""
- expires_at = expires_at.lower()
- if expires_at != "permanent":
- raise BadArgument
- else:
- return expires_at
-
-
-UserConverter = Union[Member, User, proxy_user]
-UserObject = Union[Member, User, Object]
-Infraction = Dict[str, Union[str, int, bool]]
+MemberConverter = Union[Member, User, proxy_user]
class Moderation(Scheduler, Cog):
@@ -88,7 +63,7 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
- async def warn(self, ctx: Context, user: UserConverter, *, reason: str = None) -> None:
+ async def warn(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None:
"""Create a warning infraction in the database for a user."""
infraction = await post_infraction(ctx, user, type="warning", reason=reason)
if infraction is None:
@@ -113,7 +88,7 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
@respect_role_hierarchy()
- async def ban(self, ctx: Context, user: UserConverter, *, reason: str = None) -> None:
+ async def ban(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None:
"""Create a permanent ban infraction for a user with the provided reason."""
if await already_has_active_infraction(ctx=ctx, user=user, type="ban"):
return
@@ -154,7 +129,7 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
@respect_role_hierarchy()
- async def tempban(self, ctx: Context, user: UserConverter, duration: Duration, *, reason: str = None) -> None:
+ async def tempban(self, ctx: Context, user: MemberConverter, duration: Duration, *, reason: str = None) -> None:
"""
Create a temporary ban infraction for a user with the provided expiration and reason.
@@ -178,7 +153,7 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command(hidden=True)
- async def note(self, ctx: Context, user: UserConverter, *, reason: str = None) -> None:
+ async def note(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None:
"""
Create a private infraction note in the database for a user with the provided reason.
@@ -211,7 +186,7 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command(hidden=True, aliases=['shadowban', 'sban'])
@respect_role_hierarchy()
- async def shadow_ban(self, ctx: Context, user: UserConverter, *, reason: str = None) -> None:
+ async def shadow_ban(self, ctx: Context, user: MemberConverter, *, reason: str = None) -> None:
"""
Create a permanent ban infraction for a user with the provided reason.
@@ -261,7 +236,7 @@ class Moderation(Scheduler, Cog):
@command(hidden=True, aliases=["shadowtempban, stempban"])
@respect_role_hierarchy()
async def shadow_tempban(
- self, ctx: Context, user: UserConverter, duration: Duration, *, reason: str = None
+ self, ctx: Context, user: MemberConverter, duration: Duration, *, reason: str = None
) -> None:
"""
Create a temporary ban infraction for a user with the provided reason.
@@ -288,7 +263,7 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
- async def unmute(self, ctx: Context, user: UserConverter) -> None:
+ async def unmute(self, ctx: Context, user: MemberConverter) -> None:
"""Deactivates the active mute infraction for a user."""
try:
# check the current active infraction
@@ -366,7 +341,7 @@ class Moderation(Scheduler, Cog):
@with_role(*MODERATION_ROLES)
@command()
- async def unban(self, ctx: Context, user: UserConverter) -> None:
+ async def unban(self, ctx: Context, user: MemberConverter) -> None:
"""Deactivates the active ban infraction for a user."""
try:
# check the current active infraction
@@ -426,174 +401,9 @@ class Moderation(Scheduler, Cog):
await ctx.send(":x: There was an error removing the infraction.")
# endregion
- # region: Edit infraction commands
-
- @with_role(*MODERATION_ROLES)
- @group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True)
- async def infraction_group(self, ctx: Context) -> None:
- """Infraction manipulation commands."""
- await ctx.invoke(self.bot.get_command("help"), "infraction")
-
- @with_role(*MODERATION_ROLES)
- @infraction_group.command(name='edit')
- async def infraction_edit(
- self,
- ctx: Context,
- infraction_id: int,
- expires_at: Union[Duration, permanent_duration, None],
- *,
- reason: str = None
- ) -> None:
- """
- Edit the duration and/or the reason of an infraction.
-
- Durations are relative to the time of updating.
- Use "permanent" to mark the infraction as permanent.
- """
- if expires_at is None and reason is None:
- # Unlike UserInputError, the error handler will show a specified message for BadArgument
- raise BadArgument("Neither a new expiry nor a new reason was specified.")
-
- # Retrieve the previous infraction for its information.
- old_infraction = await self.bot.api_client.get(f'bot/infractions/{infraction_id}')
-
- request_data = {}
- confirm_messages = []
- log_text = ""
-
- if expires_at == "permanent":
- request_data['expires_at'] = None
- confirm_messages.append("marked as permanent")
- elif expires_at is not None:
- request_data['expires_at'] = expires_at.isoformat()
- confirm_messages.append(f"set to expire on {expires_at.strftime(INFRACTION_FORMAT)}")
- else:
- confirm_messages.append("expiry unchanged")
-
- if reason:
- request_data['reason'] = reason
- confirm_messages.append("set a new reason")
- log_text += f"""
- Previous reason: {old_infraction['reason']}
- New reason: {reason}
- """.rstrip()
- else:
- confirm_messages.append("reason unchanged")
-
- # Update the infraction
- new_infraction = await self.bot.api_client.patch(
- f'bot/infractions/{infraction_id}',
- json=request_data,
- )
-
- # Re-schedule infraction if the expiration has been updated
- if 'expires_at' in request_data:
- self.cancel_task(new_infraction['id'])
- loop = asyncio.get_event_loop()
- self.schedule_task(loop, new_infraction['id'], new_infraction)
-
- log_text += f"""
- Previous expiry: {old_infraction['expires_at'] or "Permanent"}
- New expiry: {new_infraction['expires_at'] or "Permanent"}
- """.rstrip()
-
- await ctx.send(f":ok_hand: Updated infraction: {' & '.join(confirm_messages)}")
-
- # Get information about the infraction's user
- user_id = new_infraction['user']
- user = ctx.guild.get_member(user_id)
-
- if user:
- user_text = f"{user.mention} (`{user.id}`)"
- thumbnail = user.avatar_url_as(static_format="png")
- else:
- user_text = f"`{user_id}`"
- thumbnail = None
-
- # The infraction's actor
- actor_id = new_infraction['actor']
- actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`"
-
- await self.mod_log.send_log_message(
- icon_url=Icons.pencil,
- colour=Colour.blurple(),
- title="Infraction edited",
- thumbnail=thumbnail,
- text=textwrap.dedent(f"""
- Member: {user_text}
- Actor: {actor}
- Edited by: {ctx.message.author}{log_text}
- """)
- )
-
- # endregion
- # region: Search infractions
-
- @with_role(*MODERATION_ROLES)
- @infraction_group.group(name="search", invoke_without_command=True)
- async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None:
- """Searches for infractions in the database."""
- if isinstance(query, User):
- await ctx.invoke(self.search_user, query)
-
- else:
- await ctx.invoke(self.search_reason, query)
-
- @with_role(*MODERATION_ROLES)
- @infraction_search_group.command(name="user", aliases=("member", "id"))
- async def search_user(self, ctx: Context, user: Union[User, proxy_user]) -> None:
- """Search for infractions by member."""
- infraction_list = await self.bot.api_client.get(
- 'bot/infractions',
- params={'user__id': str(user.id)}
- )
- embed = Embed(
- title=f"Infractions for {user} ({len(infraction_list)} total)",
- colour=Colour.orange()
- )
- await self.send_infraction_list(ctx, embed, infraction_list)
-
- @with_role(*MODERATION_ROLES)
- @infraction_search_group.command(name="reason", aliases=("match", "regex", "re"))
- async def search_reason(self, ctx: Context, reason: str) -> None:
- """Search for infractions by their reason. Use Re2 for matching."""
- infraction_list = await self.bot.api_client.get(
- 'bot/infractions', params={'search': reason}
- )
- embed = Embed(
- title=f"Infractions matching `{reason}` ({len(infraction_list)} total)",
- colour=Colour.orange()
- )
- await self.send_infraction_list(ctx, embed, infraction_list)
- # endregion
# region: Utility functions
- async def send_infraction_list(
- self,
- ctx: Context,
- embed: Embed,
- infractions: Iterable[Infraction]
- ) -> None:
- """Send a paginated embed of infractions for the specified user."""
- if not infractions:
- await ctx.send(f":warning: No infractions could be found for that query.")
- return
-
- lines = tuple(
- self._infraction_to_string(infraction)
- for infraction in infractions
- )
-
- await LinePaginator.paginate(
- lines,
- ctx=ctx,
- embed=embed,
- empty=True,
- max_lines=3,
- max_size=1000
- )
-
def cancel_expiration(self, infraction_id: str) -> None:
"""Un-schedules a task set to expire a temporary infraction."""
task = self.scheduled_tasks.get(infraction_id)
@@ -662,42 +472,12 @@ class Moderation(Scheduler, Cog):
json={"active": False}
)
- def _infraction_to_string(self, infraction_object: Infraction) -> str:
- """Convert the infraction object to a string representation."""
- actor_id = infraction_object["actor"]
- guild: Guild = self.bot.get_guild(constants.Guild.id)
- actor = guild.get_member(actor_id)
- active = infraction_object["active"]
- user_id = infraction_object["user"]
- hidden = infraction_object["hidden"]
- created = format_infraction(infraction_object["inserted_at"])
- if infraction_object["expires_at"] is None:
- expires = "*Permanent*"
- else:
- expires = format_infraction(infraction_object["expires_at"])
-
- lines = textwrap.dedent(f"""
- {"**===============**" if active else "==============="}
- Status: {"__**Active**__" if active else "Inactive"}
- User: {self.bot.get_user(user_id)} (`{user_id}`)
- Type: **{infraction_object["type"]}**
- Shadow: {hidden}
- Reason: {infraction_object["reason"] or "*None*"}
- Created: {created}
- Expires: {expires}
- Actor: {actor.mention if actor else actor_id}
- ID: `{infraction_object["id"]}`
- {"**===============**" if active else "==============="}
- """)
-
- return lines.strip()
-
async def notify_infraction(
- self,
- user: UserObject,
- infr_type: str,
- expires_at: Optional[str] = None,
- reason: Optional[str] = None
+ self,
+ user: MemberObject,
+ infr_type: str,
+ expires_at: Optional[str] = None,
+ reason: Optional[str] = None
) -> bool:
"""
Attempt to notify a user, via DM, of their fresh infraction.
@@ -725,7 +505,7 @@ class Moderation(Scheduler, Cog):
async def notify_pardon(
self,
- user: UserObject,
+ user: MemberObject,
title: str,
content: str,
icon_url: str = Icons.user_verified
@@ -744,7 +524,7 @@ class Moderation(Scheduler, Cog):
return await self.send_private_embed(user, embed)
- async def send_private_embed(self, user: UserObject, embed: Embed) -> bool:
+ async def send_private_embed(self, user: MemberObject, embed: Embed) -> bool:
"""
A helper method for sending an embed to a user's DMs.
@@ -767,7 +547,7 @@ class Moderation(Scheduler, Cog):
self,
ctx: Context,
infraction: Infraction,
- user: UserObject,
+ user: MemberObject,
action_coro: Optional[Awaitable] = None
) -> None:
"""Apply an infraction to the user, log the infraction, and optionally notify the user."""
diff --git a/bot/utils/moderation.py b/bot/utils/moderation.py
index 7860f14a1..48ebe422c 100644
--- a/bot/utils/moderation.py
+++ b/bot/utils/moderation.py
@@ -1,27 +1,42 @@
import logging
+import typing as t
from datetime import datetime
-from typing import Optional, Union
-from discord import Member, Object, User
+import discord
+from discord.ext import commands
from discord.ext.commands import Context
from bot.api import ResponseCodeError
-from bot.constants import Keys
log = logging.getLogger(__name__)
-HEADERS = {"X-API-KEY": Keys.site_api}
+MemberObject = t.Union[discord.Member, discord.User, discord.Object]
+Infraction = t.Dict[str, t.Union[str, int, bool]]
+
+
+def proxy_user(user_id: str) -> discord.Object:
+ """Create a proxy user for the provided user_id for situations where a Member or User object cannot be resolved."""
+ try:
+ user_id = int(user_id)
+ except ValueError:
+ raise commands.BadArgument
+
+ user = discord.Object(user_id)
+ user.mention = user.id
+ user.avatar_url_as = lambda static_format: None
+
+ return user
async def post_infraction(
ctx: Context,
- user: Union[Member, Object, User],
+ user: MemberObject,
type: str,
reason: str,
expires_at: datetime = None,
hidden: bool = False,
active: bool = True,
-) -> Optional[dict]:
+) -> t.Optional[dict]:
"""Posts an infraction to the API."""
payload = {
"actor": ctx.message.author.id,
@@ -52,7 +67,7 @@ async def post_infraction(
return response
-async def already_has_active_infraction(ctx: Context, user: Union[Member, Object, User], type: str) -> bool:
+async def already_has_active_infraction(ctx: Context, user: MemberObject, type: str) -> bool:
"""Checks if a user already has an active infraction of the given type."""
active_infractions = await ctx.bot.api_client.get(
'bot/infractions',