aboutsummaryrefslogtreecommitdiffstats
path: root/bot/cogs/moderation.py
diff options
context:
space:
mode:
authorGravatar Akarys42 <[email protected]>2019-09-23 16:16:10 +0200
committerGravatar Akarys42 <[email protected]>2019-09-23 16:16:10 +0200
commitf4f3b2a15ecdce5291ef2d2d98b0af6d77fbc228 (patch)
tree250036c9d5162c6ee62d1a7bd6c999a03a2caad5 /bot/cogs/moderation.py
parentChange log.error to log.exception (diff)
parentMerge branch 'master' of https://github.com/python-discord/bot into python-di... (diff)
Merge branch 'python-discord-master'
Diffstat (limited to '')
-rw-r--r--bot/cogs/moderation.py816
1 files changed, 367 insertions, 449 deletions
diff --git a/bot/cogs/moderation.py b/bot/cogs/moderation.py
index 73359c88c..81b3864a7 100644
--- a/bot/cogs/moderation.py
+++ b/bot/cogs/moderation.py
@@ -1,25 +1,25 @@
import asyncio
import logging
import textwrap
-from typing import Union
+from datetime import datetime
+from typing import Dict, Union
-from aiohttp import ClientError
from discord import (
- Colour, Embed, Forbidden, Guild, HTTPException, Member, Object, User
+ Colour, Embed, Forbidden, Guild, HTTPException, Member, NotFound, Object, User
)
from discord.ext.commands import (
- BadArgument, BadUnionArgument, Bot, Context, command, group
+ BadArgument, BadUnionArgument, Bot, Cog, Context, command, group
)
from bot import constants
from bot.cogs.modlog import ModLog
-from bot.constants import Colours, Event, Icons, Keys, MODERATION_ROLES, URLs
-from bot.converters import InfractionSearchQuery
+from bot.constants import Colours, Event, Icons, MODERATION_ROLES
+from bot.converters import ExpirationDate, InfractionSearchQuery
from bot.decorators import with_role
from bot.pagination import LinePaginator
-from bot.utils.moderation import post_infraction
+from bot.utils.moderation import already_has_active_infraction, post_infraction
from bot.utils.scheduling import Scheduler, create_task
-from bot.utils.time import parse_rfc1123, wait_until
+from bot.utils.time import wait_until
log = logging.getLogger(__name__)
@@ -28,11 +28,12 @@ INFRACTION_ICONS = {
"Kick": Icons.sign_out,
"Ban": Icons.user_ban
}
-RULES_URL = "https://pythondiscord.com/about/rules"
+RULES_URL = "https://pythondiscord.com/pages/rules"
APPEALABLE_INFRACTIONS = ("Ban", "Mute")
def proxy_user(user_id: str) -> Object:
+ """Create a proxy user for the provided user_id for situations where a Member or User object cannot be resolved."""
try:
user_id = int(user_id)
except ValueError:
@@ -46,54 +47,41 @@ def proxy_user(user_id: str) -> Object:
UserTypes = Union[Member, User, proxy_user]
-class Moderation(Scheduler):
- """
- Server moderation tools.
- """
+class Moderation(Scheduler, Cog):
+ """Server moderation tools."""
def __init__(self, bot: Bot):
self.bot = bot
- self.headers = {"X-API-KEY": Keys.site_api}
self._muted_role = Object(constants.Roles.muted)
super().__init__()
@property
def mod_log(self) -> ModLog:
+ """Get currently loaded ModLog cog instance."""
return self.bot.get_cog("ModLog")
- async def on_ready(self):
+ @Cog.listener()
+ async def on_ready(self) -> None:
+ """Schedule expiration for previous infractions."""
# Schedule expiration for previous infractions
- response = await self.bot.http_session.get(
- URLs.site_infractions,
- params={"dangling": "true"},
- headers=self.headers
+ infractions = await self.bot.api_client.get(
+ 'bot/infractions', params={'active': 'true'}
)
- infraction_list = await response.json()
- for infraction_object in infraction_list:
- if infraction_object["expires_at"] is not None:
- self.schedule_task(self.bot.loop, infraction_object["id"], infraction_object)
+ for infraction in infractions:
+ if infraction["expires_at"] is not None:
+ self.schedule_task(self.bot.loop, infraction["id"], infraction)
# region: Permanent infractions
@with_role(*MODERATION_ROLES)
@command()
- async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None):
- """
- Create a warning infraction in the database for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the warning.
- """
-
- response_object = await post_infraction(ctx, user, type="warning", reason=reason)
- if response_object is None:
+ async def warn(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
+ """Create a warning infraction in the database for a user."""
+ infraction = await post_infraction(ctx, user, type="warning", reason=reason)
+ if infraction is None:
return
- notified = await self.notify_infraction(
- user=user,
- infr_type="Warning",
- reason=reason
- )
+ notified = await self.notify_infraction(user=user, infr_type="Warning", reason=reason)
dm_result = ":incoming_envelope: " if notified else ""
action = f"{dm_result}:ok_hand: warned {user.mention}"
@@ -122,33 +110,23 @@ class Moderation(Scheduler):
Reason: {reason}
"""),
content=log_content,
- footer=f"ID {response_object['infraction']['id']}"
+ footer=f"ID {infraction['id']}"
)
@with_role(*MODERATION_ROLES)
@command()
- async def kick(self, ctx: Context, user: Member, *, reason: str = None):
- """
- Kicks a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the kick.
- """
-
+ async def kick(self, ctx: Context, user: Member, *, reason: str = None) -> None:
+ """Kicks a user with the provided reason."""
if not await self.respect_role_hierarchy(ctx, user, 'kick'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
return
- response_object = await post_infraction(ctx, user, type="kick", reason=reason)
- if response_object is None:
+ infraction = await post_infraction(ctx, user, type="kick", reason=reason)
+ if infraction is None:
return
- notified = await self.notify_infraction(
- user=user,
- infr_type="Kick",
- reason=reason
- )
+ notified = await self.notify_infraction(user=user, infr_type="Kick", reason=reason)
self.mod_log.ignore(Event.member_remove, user.id)
@@ -182,32 +160,28 @@ class Moderation(Scheduler):
Reason: {reason}
"""),
content=log_content,
- footer=f"ID {response_object['infraction']['id']}"
+ footer=f"ID {infraction['id']}"
)
@with_role(*MODERATION_ROLES)
@command()
- async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None):
- """
- Create a permanent ban infraction in the database for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the ban.
- """
-
+ async def ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
+ """Create a permanent ban infraction for a user with the provided reason."""
if not await self.respect_role_hierarchy(ctx, user, 'ban'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
return
- response_object = await post_infraction(ctx, user, type="ban", reason=reason)
- if response_object is None:
+ if await already_has_active_infraction(ctx=ctx, user=user, type="ban"):
+ return
+
+ infraction = await post_infraction(ctx, user, type="ban", reason=reason)
+ if infraction is None:
return
notified = await self.notify_infraction(
user=user,
infr_type="Ban",
- duration="Permanent",
reason=reason
)
@@ -246,21 +220,18 @@ class Moderation(Scheduler):
Reason: {reason}
"""),
content=log_content,
- footer=f"ID {response_object['infraction']['id']}"
+ footer=f"ID {infraction['id']}"
)
@with_role(*MODERATION_ROLES)
@command()
- async def mute(self, ctx: Context, user: Member, *, reason: str = None):
- """
- Create a permanent mute infraction in the database for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the mute.
- """
+ async def mute(self, ctx: Context, user: Member, *, reason: str = None) -> None:
+ """Create a permanent mute infraction for a user with the provided reason."""
+ if await already_has_active_infraction(ctx=ctx, user=user, type="mute"):
+ return
- response_object = await post_infraction(ctx, user, type="mute", reason=reason)
- if response_object is None:
+ infraction = await post_infraction(ctx, user, type="mute", reason=reason)
+ if infraction is None:
return
self.mod_log.ignore(Event.member_update, user.id)
@@ -269,7 +240,7 @@ class Moderation(Scheduler):
notified = await self.notify_infraction(
user=user,
infr_type="Mute",
- duration="Permanent",
+ expires_at="Permanent",
reason=reason
)
@@ -300,7 +271,7 @@ class Moderation(Scheduler):
Reason: {reason}
"""),
content=log_content,
- footer=f"ID {response_object['infraction']['id']}"
+ footer=f"ID {infraction['id']}"
)
# endregion
@@ -308,19 +279,19 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@command()
- async def tempmute(self, ctx: Context, user: Member, duration: str, *, reason: str = None):
+ async def tempmute(self, ctx: Context, user: Member, duration: ExpirationDate, *, reason: str = None) -> None:
"""
- Create a temporary mute infraction in the database for a user.
+ Create a temporary mute infraction for a user with the provided expiration and reason.
- **`user`:** Accepts user mention, ID, etc.
- **`duration`:** The duration for the temporary mute infraction
- **`reason`:** The reason for the temporary mute.
+ Duration strings are parsed per: http://strftime.org/
"""
+ expiration = duration
- response_object = await post_infraction(
- ctx, user, type="mute", reason=reason, duration=duration
- )
- if response_object is None:
+ if await already_has_active_infraction(ctx=ctx, user=user, type="mute"):
+ return
+
+ infraction = await post_infraction(ctx, user, type="mute", reason=reason, expires_at=expiration)
+ if infraction is None:
return
self.mod_log.ignore(Event.member_update, user.id)
@@ -329,14 +300,17 @@ class Moderation(Scheduler):
notified = await self.notify_infraction(
user=user,
infr_type="Mute",
- duration=duration,
+ expires_at=expiration,
reason=reason
)
- infraction_object = response_object["infraction"]
- infraction_expiration = infraction_object["expires_at"]
+ infraction_expiration = (
+ datetime
+ .fromisoformat(infraction["expires_at"][:-1])
+ .strftime('%c')
+ )
- self.schedule_task(ctx.bot.loop, infraction_object["id"], infraction_object)
+ self.schedule_task(ctx.bot.loop, infraction["id"], infraction)
dm_result = ":incoming_envelope: " if notified else ""
action = f"{dm_result}:ok_hand: muted {user.mention} until {infraction_expiration}"
@@ -363,41 +337,38 @@ class Moderation(Scheduler):
Actor: {ctx.message.author}
DM: {dm_status}
Reason: {reason}
- Duration: {duration}
Expires: {infraction_expiration}
"""),
content=log_content,
- footer=f"ID {response_object['infraction']['id']}"
+ footer=f"ID {infraction['id']}"
)
@with_role(*MODERATION_ROLES)
@command()
- async def tempban(
- self, ctx: Context, user: UserTypes, duration: str, *, reason: str = None
- ):
+ async def tempban(self, ctx: Context, user: UserTypes, duration: ExpirationDate, *, reason: str = None) -> None:
"""
- Create a temporary ban infraction in the database for a user.
+ Create a temporary ban infraction for a user with the provided expiration and reason.
- **`user`:** Accepts user mention, ID, etc.
- **`duration`:** The duration for the temporary ban infraction
- **`reason`:** The reason for the temporary ban.
+ Duration strings are parsed per: http://strftime.org/
"""
+ expiration = duration
if not await self.respect_role_hierarchy(ctx, user, 'tempban'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
return
- response_object = await post_infraction(
- ctx, user, type="ban", reason=reason, duration=duration
- )
- if response_object is None:
+ if await already_has_active_infraction(ctx=ctx, user=user, type="ban"):
+ return
+
+ infraction = await post_infraction(ctx, user, type="ban", reason=reason, expires_at=expiration)
+ if infraction is None:
return
notified = await self.notify_infraction(
user=user,
infr_type="Ban",
- duration=duration,
+ expires_at=expiration,
reason=reason
)
@@ -410,10 +381,13 @@ class Moderation(Scheduler):
except Forbidden:
action_result = False
- infraction_object = response_object["infraction"]
- infraction_expiration = infraction_object["expires_at"]
+ infraction_expiration = (
+ datetime
+ .fromisoformat(infraction["expires_at"][:-1])
+ .strftime('%c')
+ )
- self.schedule_task(ctx.bot.loop, infraction_object["id"], infraction_object)
+ self.schedule_task(ctx.bot.loop, infraction["id"], infraction)
dm_result = ":incoming_envelope: " if notified else ""
action = f"{dm_result}:ok_hand: banned {user.mention} until {infraction_expiration}"
@@ -439,11 +413,10 @@ class Moderation(Scheduler):
Actor: {ctx.message.author}
DM: {dm_status}
Reason: {reason}
- Duration: {duration}
Expires: {infraction_expiration}
"""),
content=log_content,
- footer=f"ID {response_object['infraction']['id']}"
+ footer=f"ID {infraction['id']}"
)
# endregion
@@ -451,18 +424,14 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@command(hidden=True, aliases=['shadowwarn', 'swarn', 'shadow_warn'])
- async def note(self, ctx: Context, user: UserTypes, *, reason: str = None):
+ async def note(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
"""
- Create a private infraction note in the database for a user.
+ Create a private infraction note in the database for a user with the provided reason.
- **`user`:** accepts user mention, ID, etc.
- **`reason`:** The reason for the warning.
+ This does not send the user a notification
"""
-
- response_object = await post_infraction(
- ctx, user, type="warning", reason=reason, hidden=True
- )
- if response_object is None:
+ infraction = await post_infraction(ctx, user, type="warning", reason=reason, hidden=True)
+ if infraction is None:
return
if reason is None:
@@ -480,26 +449,24 @@ class Moderation(Scheduler):
Actor: {ctx.message.author}
Reason: {reason}
"""),
- footer=f"ID {response_object['infraction']['id']}"
+ footer=f"ID {infraction['id']}"
)
@with_role(*MODERATION_ROLES)
@command(hidden=True, aliases=['shadowkick', 'skick'])
- async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None):
+ async def shadow_kick(self, ctx: Context, user: Member, *, reason: str = None) -> None:
"""
- Kicks a user.
+ Kick a user for the provided reason.
- **`user`:** accepts user mention, ID, etc.
- **`reason`:** The reason for the kick.
+ This does not send the user a notification.
"""
-
if not await self.respect_role_hierarchy(ctx, user, 'shadowkick'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
return
- response_object = await post_infraction(ctx, user, type="kick", reason=reason, hidden=True)
- if response_object is None:
+ infraction = await post_infraction(ctx, user, type="kick", reason=reason, hidden=True)
+ if infraction is None:
return
self.mod_log.ignore(Event.member_remove, user.id)
@@ -533,26 +500,27 @@ class Moderation(Scheduler):
Reason: {reason}
"""),
content=log_content,
- footer=f"ID {response_object['infraction']['id']}"
+ footer=f"ID {infraction['id']}"
)
@with_role(*MODERATION_ROLES)
@command(hidden=True, aliases=['shadowban', 'sban'])
- async def shadow_ban(self, ctx: Context, user: UserTypes, *, reason: str = None):
+ async def shadow_ban(self, ctx: Context, user: UserTypes, *, reason: str = None) -> None:
"""
- Create a permanent ban infraction in the database for a user.
+ Create a permanent ban infraction for a user with the provided reason.
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the ban.
+ This does not send the user a notification.
"""
-
if not await self.respect_role_hierarchy(ctx, user, 'shadowban'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
return
- response_object = await post_infraction(ctx, user, type="ban", reason=reason, hidden=True)
- if response_object is None:
+ if await already_has_active_infraction(ctx=ctx, user=user, type="ban"):
+ return
+
+ infraction = await post_infraction(ctx, user, type="ban", reason=reason, hidden=True)
+ if infraction is None:
return
self.mod_log.ignore(Event.member_ban, user.id)
@@ -587,21 +555,22 @@ class Moderation(Scheduler):
Reason: {reason}
"""),
content=log_content,
- footer=f"ID {response_object['infraction']['id']}"
+ footer=f"ID {infraction['id']}"
)
@with_role(*MODERATION_ROLES)
@command(hidden=True, aliases=['shadowmute', 'smute'])
- async def shadow_mute(self, ctx: Context, user: Member, *, reason: str = None):
+ async def shadow_mute(self, ctx: Context, user: Member, *, reason: str = None) -> None:
"""
- Create a permanent mute infraction in the database for a user.
+ Create a permanent mute infraction for a user with the provided reason.
- **`user`:** Accepts user mention, ID, etc.
- **`reason`:** The reason for the mute.
+ This does not send the user a notification.
"""
+ if await already_has_active_infraction(ctx=ctx, user=user, type="mute"):
+ return
- response_object = await post_infraction(ctx, user, type="mute", reason=reason, hidden=True)
- if response_object is None:
+ infraction = await post_infraction(ctx, user, type="mute", reason=reason, hidden=True)
+ if infraction is None:
return
self.mod_log.ignore(Event.member_update, user.id)
@@ -622,7 +591,7 @@ class Moderation(Scheduler):
Actor: {ctx.message.author}
Reason: {reason}
"""),
- footer=f"ID {response_object['infraction']['id']}"
+ footer=f"ID {infraction['id']}"
)
# endregion
@@ -631,29 +600,34 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@command(hidden=True, aliases=["shadowtempmute, stempmute"])
async def shadow_tempmute(
- self, ctx: Context, user: Member, duration: str, *, reason: str = None
- ):
+ self, ctx: Context, user: Member, duration: ExpirationDate, *, reason: str = None
+ ) -> None:
"""
- Create a temporary mute infraction in the database for a user.
+ Create a temporary mute infraction for a user with the provided reason.
- **`user`:** Accepts user mention, ID, etc.
- **`duration`:** The duration for the temporary mute infraction
- **`reason`:** The reason for the temporary mute.
+ Duration strings are parsed per: http://strftime.org/
+
+ This does not send the user a notification.
"""
+ expiration = duration
- response_object = await post_infraction(
- ctx, user, type="mute", reason=reason, duration=duration, hidden=True
- )
- if response_object is None:
+ if await already_has_active_infraction(ctx=ctx, user=user, type="mute"):
+ return
+
+ infraction = await post_infraction(ctx, user, type="mute", reason=reason, expires_at=expiration, hidden=True)
+ if infraction is None:
return
self.mod_log.ignore(Event.member_update, user.id)
await user.add_roles(self._muted_role, reason=reason)
- infraction_object = response_object["infraction"]
- infraction_expiration = infraction_object["expires_at"]
+ infraction_expiration = (
+ datetime
+ .fromisoformat(infraction["expires_at"][:-1])
+ .strftime('%c')
+ )
- self.schedule_expiration(ctx.bot.loop, infraction_object)
+ self.schedule_task(ctx.bot.loop, infraction["id"], infraction)
if reason is None:
await ctx.send(f":ok_hand: muted {user.mention} until {infraction_expiration}.")
@@ -671,34 +645,35 @@ class Moderation(Scheduler):
Member: {user.mention} (`{user.id}`)
Actor: {ctx.message.author}
Reason: {reason}
- Duration: {duration}
Expires: {infraction_expiration}
"""),
- footer=f"ID {response_object['infraction']['id']}"
+ footer=f"ID {infraction['id']}"
)
@with_role(*MODERATION_ROLES)
@command(hidden=True, aliases=["shadowtempban, stempban"])
async def shadow_tempban(
- self, ctx: Context, user: UserTypes, duration: str, *, reason: str = None
- ):
+ self, ctx: Context, user: UserTypes, duration: ExpirationDate, *, reason: str = None
+ ) -> None:
"""
- Create a temporary ban infraction in the database for a user.
+ Create a temporary ban infraction for a user with the provided reason.
- **`user`:** Accepts user mention, ID, etc.
- **`duration`:** The duration for the temporary ban infraction
- **`reason`:** The reason for the temporary ban.
+ Duration strings are parsed per: http://strftime.org/
+
+ This does not send the user a notification.
"""
+ expiration = duration
if not await self.respect_role_hierarchy(ctx, user, 'shadowtempban'):
# Ensure ctx author has a higher top role than the target user
# Warning is sent to ctx by the helper method
return
- response_object = await post_infraction(
- ctx, user, type="ban", reason=reason, duration=duration, hidden=True
- )
- if response_object is None:
+ if await already_has_active_infraction(ctx=ctx, user=user, type="ban"):
+ return
+
+ infraction = await post_infraction(ctx, user, type="ban", reason=reason, expires_at=expiration, hidden=True)
+ if infraction is None:
return
self.mod_log.ignore(Event.member_ban, user.id)
@@ -710,10 +685,13 @@ class Moderation(Scheduler):
except Forbidden:
action_result = False
- infraction_object = response_object["infraction"]
- infraction_expiration = infraction_object["expires_at"]
+ infraction_expiration = (
+ datetime
+ .fromisoformat(infraction["expires_at"][:-1])
+ .strftime('%c')
+ )
- self.schedule_expiration(ctx.bot.loop, infraction_object)
+ self.schedule_task(ctx.bot.loop, infraction["id"], infraction)
if reason is None:
await ctx.send(f":ok_hand: banned {user.mention} until {infraction_expiration}.")
@@ -739,11 +717,10 @@ class Moderation(Scheduler):
Member: {user.mention} (`{user.id}`)
Actor: {ctx.message.author}
Reason: {reason}
- Duration: {duration}
Expires: {infraction_expiration}
"""),
content=log_content,
- footer=f"ID {response_object['infraction']['id']}"
+ footer=f"ID {infraction['id']}"
)
# endregion
@@ -751,40 +728,32 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@command()
- async def unmute(self, ctx: Context, user: Member):
- """
- Deactivates the active mute infraction for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- """
-
+ async def unmute(self, ctx: Context, user: UserTypes) -> None:
+ """Deactivates the active mute infraction for a user."""
try:
# check the current active infraction
- response = await self.bot.http_session.get(
- URLs.site_infractions_user_type_current.format(
- user_id=user.id,
- infraction_type="mute"
- ),
- headers=self.headers
+ response = await self.bot.api_client.get(
+ 'bot/infractions',
+ params={
+ 'active': 'true',
+ 'type': 'mute',
+ 'user__id': user.id
+ }
)
+ if len(response) > 1:
+ log.warning("Found more than one active mute infraction for user `%d`", user.id)
- response_object = await response.json()
- if "error_code" in response_object:
- return await ctx.send(
- ":x: There was an error removing the infraction: "
- f"{response_object['error_message']}"
- )
-
- infraction_object = response_object["infraction"]
- if infraction_object is None:
+ if not response:
# no active infraction
- return await ctx.send(
+ await ctx.send(
f":x: There is no active mute infraction for user {user.mention}."
)
+ return
- await self._deactivate_infraction(infraction_object)
- if infraction_object["expires_at"] is not None:
- self.cancel_expiration(infraction_object["id"])
+ for infraction in response:
+ await self._deactivate_infraction(infraction)
+ if infraction["expires_at"] is not None:
+ self.cancel_expiration(infraction["id"])
notified = await self.notify_pardon(
user=user,
@@ -804,61 +773,82 @@ class Moderation(Scheduler):
await ctx.send(f"{dm_emoji}:ok_hand: Un-muted {user.mention}.")
+ embed_text = textwrap.dedent(
+ f"""
+ Member: {user.mention} (`{user.id}`)
+ Actor: {ctx.message.author}
+ DM: {dm_status}
+ """
+ )
+
+ if len(response) > 1:
+ footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}"
+ title = "Member unmuted"
+ embed_text += "Note: User had multiple **active** mute infractions in the database."
+ else:
+ infraction = response[0]
+ footer = f"Infraction ID: {infraction['id']}"
+ title = "Member unmuted"
+
# Send a log message to the mod log
await self.mod_log.send_log_message(
icon_url=Icons.user_unmute,
colour=Colour(Colours.soft_green),
- title="Member unmuted",
+ title=title,
thumbnail=user.avatar_url_as(static_format="png"),
- text=textwrap.dedent(f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.message.author}
- Intended expiry: {infraction_object['expires_at']}
- DM: {dm_status}
- """),
- footer=infraction_object["id"],
+ text=embed_text,
+ footer=footer,
content=log_content
)
-
- except Exception as e:
- log.exception("There was an error removing an infraction.", exc_info=e)
+ except Exception:
+ log.exception("There was an error removing an infraction.")
await ctx.send(":x: There was an error removing the infraction.")
@with_role(*MODERATION_ROLES)
@command()
- async def unban(self, ctx: Context, user: UserTypes):
- """
- Deactivates the active ban infraction for a user.
-
- **`user`:** Accepts user mention, ID, etc.
- """
-
+ async def unban(self, ctx: Context, user: UserTypes) -> None:
+ """Deactivates the active ban infraction for a user."""
try:
# check the current active infraction
- response = await self.bot.http_session.get(
- URLs.site_infractions_user_type_current.format(
- user_id=user.id,
- infraction_type="ban"
- ),
- headers=self.headers
+ response = await self.bot.api_client.get(
+ 'bot/infractions',
+ params={
+ 'active': 'true',
+ 'type': 'ban',
+ 'user__id': str(user.id)
+ }
)
- response_object = await response.json()
- if "error_code" in response_object:
- return await ctx.send(
- ":x: There was an error removing the infraction: "
- f"{response_object['error_message']}"
+ if len(response) > 1:
+ log.warning(
+ "More than one active ban infraction found for user `%d`.",
+ user.id
)
- infraction_object = response_object["infraction"]
- if infraction_object is None:
+ if not response:
# no active infraction
- return await ctx.send(
+ await ctx.send(
f":x: There is no active ban infraction for user {user.mention}."
)
+ return
+
+ for infraction in response:
+ await self._deactivate_infraction(infraction)
+ if infraction["expires_at"] is not None:
+ self.cancel_expiration(infraction["id"])
- await self._deactivate_infraction(infraction_object)
- if infraction_object["expires_at"] is not None:
- self.cancel_expiration(infraction_object["id"])
+ embed_text = textwrap.dedent(
+ f"""
+ Member: {user.mention} (`{user.id}`)
+ Actor: {ctx.message.author}
+ """
+ )
+
+ if len(response) > 1:
+ footer = f"Infraction IDs: {', '.join(str(infr['id']) for infr in response)}"
+ embed_text += "Note: User had multiple **active** ban infractions in the database."
+ else:
+ infraction = response[0]
+ footer = f"Infraction ID: {infraction['id']}"
await ctx.send(f":ok_hand: Un-banned {user.mention}.")
@@ -868,11 +858,8 @@ class Moderation(Scheduler):
colour=Colour(Colours.soft_green),
title="Member unbanned",
thumbnail=user.avatar_url_as(static_format="png"),
- text=textwrap.dedent(f"""
- Member: {user.mention} (`{user.id}`)
- Actor: {ctx.message.author}
- Intended expiry: {infraction_object['expires_at']}
- """)
+ text=embed_text,
+ footer=footer,
)
except Exception:
log.exception("There was an error removing an infraction.")
@@ -883,70 +870,68 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@group(name='infraction', aliases=('infr', 'infractions', 'inf'), invoke_without_command=True)
- async def infraction_group(self, ctx: Context):
+ async def infraction_group(self, ctx: Context) -> None:
"""Infraction manipulation commands."""
-
await ctx.invoke(self.bot.get_command("help"), "infraction")
@with_role(*MODERATION_ROLES)
@infraction_group.group(name='edit', invoke_without_command=True)
- async def infraction_edit_group(self, ctx: Context):
+ async def infraction_edit_group(self, ctx: Context) -> None:
"""Infraction editing commands."""
-
await ctx.invoke(self.bot.get_command("help"), "infraction", "edit")
@with_role(*MODERATION_ROLES)
@infraction_edit_group.command(name="duration")
- async def edit_duration(self, ctx: Context, infraction_id: str, duration: str):
+ async def edit_duration(
+ self, ctx: Context,
+ infraction_id: int, expires_at: Union[ExpirationDate, str]
+ ) -> None:
"""
- Sets the duration of the given infraction, relative to the time of
- updating.
+ Sets the duration of the given infraction, relative to the time of updating.
- **`infraction_id`:** The ID (UUID) of the infraction.
- **`duration`:** The new duration of the infraction, relative to the
- time of updating. Use "permanent" to the infraction as permanent.
+ Duration strings are parsed per: http://strftime.org/, use "permanent" to mark the infraction as permanent.
"""
+ if isinstance(expires_at, str) and expires_at != 'permanent':
+ raise BadArgument(
+ "If `expires_at` is given as a non-datetime, "
+ "it must be `permanent`."
+ )
+ if expires_at == 'permanent':
+ expires_at = None
try:
- previous = await self.bot.http_session.get(
- URLs.site_infractions_by_id.format(
- infraction_id=infraction_id
- ),
- headers=self.headers
+ previous_infraction = await self.bot.api_client.get(
+ 'bot/infractions/' + str(infraction_id)
)
- previous_object = await previous.json()
-
- if duration == "permanent":
- duration = None
# check the current active infraction
- response = await self.bot.http_session.patch(
- URLs.site_infractions,
+ infraction = await self.bot.api_client.patch(
+ 'bot/infractions/' + str(infraction_id),
json={
- "id": infraction_id,
- "duration": duration
- },
- headers=self.headers
+ 'expires_at': (
+ expires_at.isoformat()
+ if expires_at is not None
+ else None
+ )
+ }
)
- response_object = await response.json()
- if "error_code" in response_object or response_object.get("success") is False:
- return await ctx.send(
- ":x: There was an error updating the infraction: "
- f"{response_object['error_message']}"
- )
- infraction_object = response_object["infraction"]
# Re-schedule
- self.cancel_task(infraction_id)
+ self.cancel_task(infraction['id'])
loop = asyncio.get_event_loop()
- self.schedule_task(loop, infraction_object["id"], infraction_object)
+ self.schedule_task(loop, infraction['id'], infraction)
- if duration is None:
+ if expires_at is None:
await ctx.send(f":ok_hand: Updated infraction: marked as permanent.")
else:
+ human_expiry = (
+ datetime
+ .fromisoformat(infraction['expires_at'][:-1])
+ .strftime('%c')
+ )
await ctx.send(
":ok_hand: Updated infraction: set to expire on "
- f"{infraction_object['expires_at']}."
+ f"{human_expiry}."
)
except Exception:
@@ -954,10 +939,8 @@ class Moderation(Scheduler):
await ctx.send(":x: There was an error updating the infraction.")
return
- prev_infraction = previous_object["infraction"]
-
# Get information about the infraction's user
- user_id = int(infraction_object["user"]["user_id"])
+ user_id = infraction["user"]
user = ctx.guild.get_member(user_id)
if user:
@@ -968,7 +951,7 @@ class Moderation(Scheduler):
thumbnail = None
# The infraction's actor
- actor_id = int(infraction_object["actor"]["user_id"])
+ actor_id = infraction["actor"]
actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`"
await self.mod_log.send_log_message(
@@ -980,55 +963,33 @@ class Moderation(Scheduler):
Member: {member_text}
Actor: {actor}
Edited by: {ctx.message.author}
- Previous expiry: {prev_infraction['expires_at']}
- New expiry: {infraction_object['expires_at']}
+ Previous expiry: {previous_infraction['expires_at']}
+ New expiry: {infraction['expires_at']}
""")
)
@with_role(*MODERATION_ROLES)
@infraction_edit_group.command(name="reason")
- async def edit_reason(self, ctx: Context, infraction_id: str, *, reason: str):
- """
- Sets the reason of the given infraction.
- **`infraction_id`:** The ID (UUID) of the infraction.
- **`reason`:** The new reason of the infraction.
- """
-
+ async def edit_reason(self, ctx: Context, infraction_id: int, *, reason: str) -> None:
+ """Edit the reason of the given infraction."""
try:
- previous = await self.bot.http_session.get(
- URLs.site_infractions_by_id.format(
- infraction_id=infraction_id
- ),
- headers=self.headers
+ old_infraction = await self.bot.api_client.get(
+ 'bot/infractions/' + str(infraction_id)
)
- previous_object = await previous.json()
-
- response = await self.bot.http_session.patch(
- URLs.site_infractions,
- json={
- "id": infraction_id,
- "reason": reason
- },
- headers=self.headers
+ updated_infraction = await self.bot.api_client.patch(
+ 'bot/infractions/' + str(infraction_id),
+ json={'reason': reason}
)
- response_object = await response.json()
- if "error_code" in response_object or response_object.get("success") is False:
- return await ctx.send(
- ":x: There was an error updating the infraction: "
- f"{response_object['error_message']}"
- )
-
await ctx.send(f":ok_hand: Updated infraction: set reason to \"{reason}\".")
+
except Exception:
log.exception("There was an error updating an infraction.")
- return await ctx.send(":x: There was an error updating the infraction.")
-
- new_infraction = response_object["infraction"]
- prev_infraction = previous_object["infraction"]
+ await ctx.send(":x: There was an error updating the infraction.")
+ return
# Get information about the infraction's user
- user_id = int(new_infraction["user"]["user_id"])
+ user_id = updated_infraction['user']
user = ctx.guild.get_member(user_id)
if user:
@@ -1039,7 +1000,7 @@ class Moderation(Scheduler):
thumbnail = None
# The infraction's actor
- actor_id = int(new_infraction["actor"]["user_id"])
+ actor_id = updated_infraction['actor']
actor = ctx.guild.get_member(actor_id) or f"`{actor_id}`"
await self.mod_log.send_log_message(
@@ -1051,8 +1012,8 @@ class Moderation(Scheduler):
Member: {user_text}
Actor: {actor}
Edited by: {ctx.message.author}
- Previous reason: {prev_infraction['reason']}
- New reason: {new_infraction['reason']}
+ Previous reason: {old_infraction['reason']}
+ New reason: {updated_infraction['reason']}
""")
)
@@ -1061,11 +1022,8 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@infraction_group.group(name="search", invoke_without_command=True)
- async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery):
- """
- Searches for infractions in the database.
- """
-
+ async def infraction_search_group(self, ctx: Context, query: InfractionSearchQuery) -> None:
+ """Searches for infractions in the database."""
if isinstance(query, User):
await ctx.invoke(self.search_user, query)
@@ -1074,72 +1032,44 @@ class Moderation(Scheduler):
@with_role(*MODERATION_ROLES)
@infraction_search_group.command(name="user", aliases=("member", "id"))
- async def search_user(self, ctx: Context, user: Union[User, proxy_user]):
- """
- Search for infractions by member.
- """
-
- try:
- response = await self.bot.http_session.get(
- URLs.site_infractions_user.format(
- user_id=user.id
- ),
- params={"hidden": "True"},
- headers=self.headers
- )
- infraction_list = await response.json()
- except ClientError:
- log.exception(f"Failed to fetch infractions for user {user} ({user.id}).")
- await ctx.send(":x: An error occurred while fetching infractions.")
- return
-
+ async def search_user(self, ctx: Context, user: Union[User, proxy_user]) -> None:
+ """Search for infractions by member."""
+ infraction_list = await self.bot.api_client.get(
+ 'bot/infractions',
+ params={'user__id': str(user.id)}
+ )
embed = Embed(
title=f"Infractions for {user} ({len(infraction_list)} total)",
colour=Colour.orange()
)
-
await self.send_infraction_list(ctx, embed, infraction_list)
@with_role(*MODERATION_ROLES)
@infraction_search_group.command(name="reason", aliases=("match", "regex", "re"))
- async def search_reason(self, ctx: Context, reason: str):
- """
- Search for infractions by their reason. Use Re2 for matching.
- """
-
- try:
- response = await self.bot.http_session.get(
- URLs.site_infractions,
- params={"search": reason, "hidden": "True"},
- headers=self.headers
- )
- infraction_list = await response.json()
- except ClientError:
- log.exception(f"Failed to fetch infractions matching reason `{reason}`.")
- await ctx.send(":x: An error occurred while fetching infractions.")
- return
-
+ async def search_reason(self, ctx: Context, reason: str) -> None:
+ """Search for infractions by their reason. Use Re2 for matching."""
+ infraction_list = await self.bot.api_client.get(
+ 'bot/infractions', params={'search': reason}
+ )
embed = Embed(
title=f"Infractions matching `{reason}` ({len(infraction_list)} total)",
colour=Colour.orange()
)
-
await self.send_infraction_list(ctx, embed, infraction_list)
# endregion
# region: Utility functions
- async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list):
-
+ async def send_infraction_list(self, ctx: Context, embed: Embed, infractions: list) -> None:
+ """Send a paginated embed of infractions for the specified user."""
if not infractions:
await ctx.send(f":warning: No infractions could be found for that query.")
return
- lines = []
- for infraction in infractions:
- lines.append(
- self._infraction_to_string(infraction)
- )
+ lines = tuple(
+ self._infraction_to_string(infraction)
+ for infraction in infractions
+ )
await LinePaginator.paginate(
lines,
@@ -1153,14 +1083,10 @@ class Moderation(Scheduler):
# endregion
# region: Utility functions
- def schedule_expiration(self, loop: asyncio.AbstractEventLoop, infraction_object: dict):
- """
- Schedules a task to expire a temporary infraction.
-
- :param loop: the asyncio event loop
- :param infraction_object: the infraction object to expire at the end of the task
- """
-
+ def schedule_expiration(
+ self, loop: asyncio.AbstractEventLoop, infraction_object: Dict[str, Union[str, int, bool]]
+ ) -> None:
+ """Schedules a task to expire a temporary infraction."""
infraction_id = infraction_object["id"]
if infraction_id in self.scheduled_tasks:
return
@@ -1169,12 +1095,8 @@ class Moderation(Scheduler):
self.scheduled_tasks[infraction_id] = task
- def cancel_expiration(self, infraction_id: str):
- """
- Un-schedules a task set to expire a temporary infraction.
- :param infraction_id: the ID of the infraction in question
- """
-
+ def cancel_expiration(self, infraction_id: str) -> None:
+ """Un-schedules a task set to expire a temporary infraction."""
task = self.scheduled_tasks.get(infraction_id)
if task is None:
log.warning(f"Failed to unschedule {infraction_id}: no task found.")
@@ -1183,19 +1105,17 @@ class Moderation(Scheduler):
log.debug(f"Unscheduled {infraction_id}.")
del self.scheduled_tasks[infraction_id]
- async def _scheduled_task(self, infraction_object: dict):
+ async def _scheduled_task(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None:
"""
- A co-routine which marks an infraction as expired after the delay from the time of
- scheduling to the time of expiration. At the time of expiration, the infraction is
- marked as inactive on the website, and the expiration task is cancelled.
+ Marks an infraction expired after the delay from time of scheduling to time of expiration.
- :param infraction_object: the infraction in question
+ At the time of expiration, the infraction is marked as inactive on the website, and the
+ expiration task is cancelled. The user is then notified via DM.
"""
-
infraction_id = infraction_object["id"]
# transform expiration to delay in seconds
- expiration_datetime = parse_rfc1123(infraction_object["expires_at"])
+ expiration_datetime = datetime.fromisoformat(infraction_object["expires_at"][:-1])
await wait_until(expiration_datetime)
log.debug(f"Marking infraction {infraction_id} as inactive (expired).")
@@ -1204,7 +1124,7 @@ class Moderation(Scheduler):
self.cancel_task(infraction_object["id"])
# Notify the user that they've been unmuted.
- user_id = int(infraction_object["user"]["user_id"])
+ user_id = infraction_object["user"]
guild = self.bot.get_guild(constants.Guild.id)
await self.notify_pardon(
user=guild.get_member(user_id),
@@ -1213,16 +1133,14 @@ class Moderation(Scheduler):
icon_url=Icons.user_unmute
)
- async def _deactivate_infraction(self, infraction_object):
+ async def _deactivate_infraction(self, infraction_object: Dict[str, Union[str, int, bool]]) -> None:
"""
- A co-routine which marks an infraction as inactive on the website. This co-routine does
- not cancel or un-schedule an expiration task.
+ A co-routine which marks an infraction as inactive on the website.
- :param infraction_object: the infraction in question
+ This co-routine does not cancel or un-schedule an expiration task.
"""
-
guild: Guild = self.bot.get_guild(constants.Guild.id)
- user_id = int(infraction_object["user"]["user_id"])
+ user_id = infraction_object["user"]
infraction_type = infraction_object["type"]
if infraction_type == "mute":
@@ -1235,24 +1153,29 @@ class Moderation(Scheduler):
log.warning(f"Failed to un-mute user: {user_id} (not found)")
elif infraction_type == "ban":
user: Object = Object(user_id)
- await guild.unban(user)
-
- await self.bot.http_session.patch(
- URLs.site_infractions,
- headers=self.headers,
- json={
- "id": infraction_object["id"],
- "active": False
- }
+ try:
+ await guild.unban(user)
+ except NotFound:
+ log.info(f"Tried to unban user `{user_id}`, but Discord does not have an active ban registered.")
+
+ await self.bot.api_client.patch(
+ 'bot/infractions/' + str(infraction_object['id']),
+ json={"active": False}
)
- def _infraction_to_string(self, infraction_object):
- actor_id = int(infraction_object["actor"]["user_id"])
+ def _infraction_to_string(self, infraction_object: Dict[str, Union[str, int, bool]]) -> str:
+ """Convert the infraction object to a string representation."""
+ actor_id = infraction_object["actor"]
guild: Guild = self.bot.get_guild(constants.Guild.id)
actor = guild.get_member(actor_id)
- active = infraction_object["active"] is True
- user_id = int(infraction_object["user"]["user_id"])
- hidden = infraction_object.get("hidden", False) is True
+ active = infraction_object["active"]
+ user_id = infraction_object["user"]
+ hidden = infraction_object["hidden"]
+ created = datetime.fromisoformat(infraction_object["inserted_at"][:-1]).strftime("%Y-%m-%d %H:%M")
+ if infraction_object["expires_at"] is None:
+ expires = "*Permanent*"
+ else:
+ expires = datetime.fromisoformat(infraction_object["expires_at"][:-1]).strftime("%Y-%m-%d %H:%M")
lines = textwrap.dedent(f"""
{"**===============**" if active else "==============="}
@@ -1261,8 +1184,8 @@ class Moderation(Scheduler):
Type: **{infraction_object["type"]}**
Shadow: {hidden}
Reason: {infraction_object["reason"] or "*None*"}
- Created: {infraction_object["inserted_at"]}
- Expires: {infraction_object["expires_at"] or "*Permanent*"}
+ Created: {created}
+ Expires: {expires}
Actor: {actor.mention if actor else actor_id}
ID: `{infraction_object["id"]}`
{"**===============**" if active else "==============="}
@@ -1271,28 +1194,24 @@ class Moderation(Scheduler):
return lines.strip()
async def notify_infraction(
- self, user: Union[User, Member], infr_type: str, duration: str = None,
- reason: str = None
- ):
+ self,
+ user: Union[User, Member],
+ infr_type: str,
+ expires_at: Union[datetime, str] = 'N/A',
+ reason: str = "No reason provided."
+ ) -> bool:
"""
- Notify a user of their fresh infraction :)
+ Attempt to notify a user, via DM, of their fresh infraction.
- :param user: The user to send the message to.
- :param infr_type: The type of infraction, as a string.
- :param duration: The duration of the infraction.
- :param reason: The reason for the infraction.
+ Returns a boolean indicator of whether the DM was successful.
"""
-
- if duration is None:
- duration = "N/A"
-
- if reason is None:
- reason = "No reason provided."
+ if isinstance(expires_at, datetime):
+ expires_at = expires_at.strftime('%c')
embed = Embed(
description=textwrap.dedent(f"""
**Type:** {infr_type}
- **Duration:** {duration}
+ **Expires:** {expires_at}
**Reason:** {reason}
"""),
colour=Colour(Colours.soft_red)
@@ -1309,18 +1228,17 @@ class Moderation(Scheduler):
return await self.send_private_embed(user, embed)
async def notify_pardon(
- self, user: Union[User, Member], title: str, content: str,
- icon_url: str = Icons.user_verified
- ):
+ self,
+ user: Union[User, Member],
+ title: str,
+ content: str,
+ icon_url: str = Icons.user_verified
+ ) -> bool:
"""
- Notify a user that an infraction has been lifted.
+ Attempt to notify a user, via DM, of their expired infraction.
- :param user: The user to send the message to.
- :param title: The title of the embed.
- :param content: The content of the embed.
- :param icon_url: URL for the title icon.
+ Optionally returns a boolean indicator of whether the DM was successful.
"""
-
embed = Embed(
description=content,
colour=Colour(Colours.soft_green)
@@ -1330,16 +1248,14 @@ class Moderation(Scheduler):
return await self.send_private_embed(user, embed)
- async def send_private_embed(self, user: Union[User, Member], embed: Embed):
+ async def send_private_embed(self, user: Union[User, Member], embed: Embed) -> bool:
"""
A helper method for sending an embed to a user's DMs.
- :param user: The user to send the embed to.
- :param embed: The embed to send.
+ Returns a boolean indicator of DM success.
"""
-
# sometimes `user` is a `discord.Object`, so let's make it a proper user.
- user = await self.bot.get_user_info(user.id)
+ user = await self.bot.fetch_user(user.id)
try:
await user.send(embed=embed)
@@ -1351,7 +1267,8 @@ class Moderation(Scheduler):
)
return False
- async def log_notify_failure(self, target: str, actor: Member, infraction_type: str):
+ async def log_notify_failure(self, target: str, actor: Member, infraction_type: str) -> None:
+ """Send a mod log entry if an attempt to DM the target user has failed."""
await self.mod_log.send_log_message(
icon_url=Icons.token_removed,
content=actor.mention,
@@ -1365,23 +1282,23 @@ class Moderation(Scheduler):
# endregion
- async def __error(self, ctx, error):
+ @staticmethod
+ async def cog_command_error(ctx: Context, error: Exception) -> None:
+ """Send a notification to the invoking context on a Union failure."""
if isinstance(error, BadUnionArgument):
if User in error.converters:
await ctx.send(str(error.errors[0]))
+ error.handled = True
- async def respect_role_hierarchy(self, ctx: Context, target: UserTypes, infr_type: str) -> bool:
+ @staticmethod
+ async def respect_role_hierarchy(ctx: Context, target: UserTypes, infr_type: str) -> bool:
"""
Check if the highest role of the invoking member is greater than that of the target member.
+
If this check fails, a warning is sent to the invoking ctx.
Returns True always if target is not a discord.Member instance.
-
- :param ctx: The command context when invoked.
- :param target: The target of the infraction.
- :param infr_type: The type of infraction.
"""
-
if not isinstance(target, Member):
return True
@@ -1400,6 +1317,7 @@ class Moderation(Scheduler):
return target_is_lower
-def setup(bot):
+def setup(bot: Bot) -> None:
+ """Moderation cog load."""
bot.add_cog(Moderation(bot))
log.info("Cog loaded: Moderation")