diff options
| -rw-r--r-- | bot/constants.py | 28 | ||||
| -rw-r--r-- | bot/exts/moderation/stream.py | 135 | ||||
| -rw-r--r-- | config-default.yml | 3 | ||||
| -rw-r--r-- | tests/bot/exts/moderation/test_stream.py | 81 |
4 files changed, 247 insertions, 0 deletions
diff --git a/bot/constants.py b/bot/constants.py index 6bb6aacd2..33ed29c39 100644 --- a/bot/constants.py +++ b/bot/constants.py @@ -469,6 +469,7 @@ class Roles(metaclass=YAMLGetter): unverified: int verified: int # This is the Developers role on PyDis, here named verified for readability reasons. voice_verified: int + video: int class Guild(metaclass=YAMLGetter): @@ -704,3 +705,30 @@ ERROR_REPLIES = [ "Noooooo!!", "I can't believe you've done this", ] + +# TIME_FORMATS defines aliases and multipliers for time formats +# key is a standard time unit name like second ,year, decade etc. +# mul is a multiplier where duration of said time unit * multiplier = time in seconds +# eg. 1 day = 1 * multiplier seconds, so mul = 86400 +TIME_FORMATS = { + "second": { + "aliases": ("s", "sec", "seconds", "secs"), + "mul": 1 + }, + "minute": { + "aliases": ("m", "min", "mins", "minutes"), + "mul": 60 + }, + "hour": { + "aliases": ("h", "hr", "hrs", "hours"), + "mul": 3600 + }, + "day": { + "aliases": ("d", "days"), + "mul": 86400 + }, + "year": { + "aliases": ("yr", "yrs", "years", "y"), + "mul": 31536000 + } +} diff --git a/bot/exts/moderation/stream.py b/bot/exts/moderation/stream.py new file mode 100644 index 000000000..a44095273 --- /dev/null +++ b/bot/exts/moderation/stream.py @@ -0,0 +1,135 @@ +import time + +import discord +from async_rediscache import RedisCache +from discord.ext import commands, tasks + +from bot.bot import Bot +from bot.constants import Guild, Roles, STAFF_ROLES, TIME_FORMATS + +# Constant error messages +NO_USER_SPECIFIED = "Please specify a user" +TIME_FORMAT_NOT_VALID = "Please specify a valid time format ex. 10h or 1day" +TIME_LESS_EQ_0 = "Duration can not be a 0 or lower" +USER_ALREADY_ALLOWED_TO_STREAM = "This user can already stream" +USER_ALREADY_NOT_ALLOWED_TO_STREAM = "This user already can't stream" + + +# FORMATS holds a combined list of all allowed time units +# made from TIME_FORMATS constant +FORMATS = [] +for key, entry in TIME_FORMATS.items(): + FORMATS.extend(entry["aliases"]) + FORMATS.append(key) + + +class Stream(commands.Cog): + """Stream class handles giving screen sharing permission with commands.""" + + # Data cache storing userid to unix_time relation + # user id is used to get member who's streaming permission need to be revoked after some time + # unix_time is a time when user's streaming permission needs tp be revoked in unix time notation + user_cache = RedisCache() + + def __init__(self, bot: Bot): + self.bot = bot + self.remove_permissions.start() + self.guild_static = None + + @staticmethod + def _link_from_alias(time_format: str) -> (dict, str): + """Get TIME_FORMATS key and entry by time format or any of its aliases.""" + for format_key, val in TIME_FORMATS.items(): + if format_key == time_format or time_format in val["aliases"]: + return TIME_FORMATS[format_key], format_key + + def _parse_time_to_seconds(self, duration: int, time_format: str) -> int: + """Get time in seconds from duration and time format.""" + return duration * self._link_from_alias(time_format)[0]["mul"] + + @commands.command(aliases=("streaming", "share")) + @commands.has_any_role(*STAFF_ROLES) + async def stream( + self, + ctx: commands.Context, + user: discord.Member = None, + duration: int = 1, + time_format: str = "h", + *_ + ) -> None: + """ + Stream handles <prefix>stream command. + + argument user - required user mention, any errors should be handled by upper level handler + duration - int must be higher than 0 - defaults to 1 + time_format - str defining what time unit you want to use, must be any of FORMATS - defaults to h + Command give user permission to stream and takes it away after provided duration + """ + # Check for required user argument + # if not provided send NO_USER_SPECIFIED message + if not user: + await ctx.send(NO_USER_SPECIFIED) + return + + # Time can't be negative lol + if duration <= 0: + await ctx.send(TIME_LESS_EQ_0) + return + + # Check if time_format argument is a valid time format + # eg. d, day etc are aliases for day time format + if time_format not in FORMATS: + await ctx.send(TIME_FORMAT_NOT_VALID) + return + + # Check if user already has streaming permission + already_allowed = any(Roles.video == role.id for role in user.roles) + if already_allowed: + await ctx.send(USER_ALREADY_ALLOWED_TO_STREAM) + return + + # Set user id - time in redis cache and add streaming permission role + await self.user_cache.set(user.id, time.time() + self._parse_time_to_seconds(duration, time_format)) + await user.add_roles(discord.Object(Roles.video), reason="Temporary streaming access granted") + await ctx.send(f"{user.mention} can now stream for {duration} {self._link_from_alias(time_format)[1]}/s") + + @tasks.loop(seconds=30) + async def remove_permissions(self) -> None: + """Background loop for removing streaming permission.""" + all_entries = await self.user_cache.items() + for user_id, delete_time in all_entries: + if time.time() > delete_time: + member = self.guild_static.fetch_memebr(user_id) + if member: + await member.remove_roles(discord.Object(Roles.video), reason="Temporary streaming access revoked") + await self.user_cache.pop(user_id) + + @remove_permissions.before_loop + async def await_ready(self) -> None: + """Wait for bot to be ready before starting remove_permissions loop and get guild by id.""" + await self.bot.wait_until_ready() + self.guild_static = self.bot.get_guild(Guild.id) + + @commands.command(aliases=("unstream", )) + @commands.has_any_role(*STAFF_ROLES) + async def revokestream( + self, + ctx: commands.Context, + user: discord.Member = None + ) -> None: + """ + Revokestream handles <prefix>revokestream command. + + argument user - required user mention, any errors should be handled by upper level handler + command removes streaming permission from a user + """ + not_allowed = not any(Roles.video == role.id for role in user.roles) + if not_allowed: + await user.remove_roles(discord.Object(Roles.video)) + else: + await ctx.send(USER_ALREADY_NOT_ALLOWED_TO_STREAM) + + +def setup(bot: Bot) -> None: + """Loads the Stream cog.""" + bot.add_cog(Stream(bot)) diff --git a/config-default.yml b/config-default.yml index 60eb437af..042d80408 100644 --- a/config-default.yml +++ b/config-default.yml @@ -254,6 +254,9 @@ guild: jammers: 737249140966162473 team_leaders: 737250302834638889 + # Streaming + video: 764245844798079016 + moderation_roles: - *OWNERS_ROLE - *ADMINS_ROLE diff --git a/tests/bot/exts/moderation/test_stream.py b/tests/bot/exts/moderation/test_stream.py new file mode 100644 index 000000000..467c373aa --- /dev/null +++ b/tests/bot/exts/moderation/test_stream.py @@ -0,0 +1,81 @@ +import asyncio +import unittest + +from async_rediscache import RedisSession + +from bot.constants import Roles +from bot.exts.moderation.stream import Stream +from tests.helpers import MockBot, MockMember, MockRole + +redis_session = None +redis_loop = asyncio.get_event_loop() + + +def setUpModule(): # noqa: N802 + """Create and connect to the fakeredis session.""" + global redis_session + redis_session = RedisSession(use_fakeredis=True) + redis_loop.run_until_complete(redis_session.connect()) + + +def tearDownModule(): # noqa: N802 + """Close the fakeredis session.""" + if redis_session: + redis_loop.run_until_complete(redis_session.close()) + + +class StreamCommandTest(unittest.IsolatedAsyncioTestCase): + + def setUp(self) -> None: + self.bot = MockBot() + self.cog = Stream(self.bot) + + def test_linking_time_format_from_alias_or_key(self): + """ + User provided time format needs to be lined to a proper entry in TIME_FORMATS + This Test checks _link_from_alias method + Checking for whether alias or key exists in TIME_FORMATS is done before calling this function + """ + + test_cases = (("sec", "second"), + ("s", "second"), + ("seconds", "second"), + ("second", "second"), + ("secs", "second"), + ("min", "minute"), + ("m", "minute"), + ("minutes", "minute"), + ("hr", "hour"), + ("hrs", "hour"), + ("hours", "hour"), + ("d", "day"), + ("days", "day"), + ("yr", "year"), + ("yrs", "year"), + ("y", "year")) + + for case in test_cases: + linked = self.cog._link_from_alias(case[0])[1] + self.assertEqual(linked, case[1]) + + def test_parsing_duration_and_time_format_to_seconds(self): + """ + Test calculating time in seconds from duration and time unit + This test is technically dependent on _link_from_alias function, not the best practice but necessary + """ + test_cases = ((1, "minute", 60), (5, "second", 5), (2, "day", 172800)) + for case in test_cases: + time_in_seconds = self.cog._parse_time_to_seconds(case[0], case[1]) + self.assertEqual(time_in_seconds, case[2]) + + def test_checking_if_user_has_streaming_permission(self): + """ + Test searching for video role in Member.roles + """ + user1 = MockMember(roles=[MockRole(id=Roles.video)]) + user2 = MockMember() + already_allowed_user1 = any(Roles.video == role.id for role in user1.roles) + self.assertEqual(already_allowed_user1, True) + + already_allowed_user2 = any(Roles.video == role.id for role in user2.roles) + self.assertEqual(already_allowed_user2, False) |