1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
  | 
"""The zones cog helps with managing Cloudflare zones."""
import discord
from discord.ext import commands
from kubernetes_asyncio.client.models import V1CronJobList
from arthur.apis.kubernetes import jobs
from arthur.bot import KingArthur
from arthur.config import CONFIG
class CronJobView(discord.ui.View):
    """This view allows users to select and trigger a CronJob."""
    def __init__(self, cron_jobs: V1CronJobList) -> None:
        super().__init__()
        self.cron_jobs = cron_jobs
        for cron_job in self.cron_jobs.items:
            cj = cron_job.metadata.name
            ns = cron_job.metadata.namespace
            self.children[0].add_option(
                label=cron_job.metadata.name, value=f"{ns}/{cj}", description=ns, emoji="🛠️"
            )
    def disable_select(self) -> None:
        """Disable the select button."""
        self.children[0].disabled = True
    async def interaction_check(self, interaction: discord.Interaction) -> bool:
        """Ensure the user has the DevOps role."""
        return CONFIG.devops_role in [r.id for r in interaction.user.roles]
    @discord.ui.select(
        placeholder="Select a CronJob to trigger...",
    )
    async def select_job(
        self, interaction: discord.Interaction, dropdown: discord.ui.Select
    ) -> None:
        """Drop down menu contains the list of cronjobsb."""
        cronjob_namespace, cronjob_name = dropdown.values[0].split("/")
        cronjob = await jobs.get_cronjob(cronjob_namespace, cronjob_name)
        new_job = await jobs.create_job(
            cronjob_namespace,
            f"{cronjob_name}-{interaction.message.id}",
            cronjob.spec.job_template.spec,
        )
        self.disable_select()
        await interaction.message.edit(view=self)
        await interaction.response.send_message(f"🌬️ Spawned job `{new_job.metadata.name}`")
class Jobs(commands.Cog):
    """Commands for working with Kubernetes Jobs & CronJobs."""
    def __init__(self, bot: KingArthur) -> None:
        self.bot = bot
    @commands.group(name="cronjob", aliases=["cronjobs", "cj"], invoke_without_command=True)
    async def cronjob(self, ctx: commands.Context) -> None:
        """Commands for working with Kubernetes CronJobs."""
        await ctx.send_help(ctx.command)
    @cronjob.command(name="trigger")
    async def trigger(self, ctx: commands.Context) -> None:
        """Command to trigger a Kubernetes cronjob now."""
        cronjobs = await jobs.list_cronjobs()
        view = CronJobView(cronjobs)
        await ctx.send(":tools: Pick a CronJob to trigger", view=view)
async def setup(bot: KingArthur) -> None:
    """Add the extension to the bot."""
    await bot.add_cog(Jobs(bot))
 
  |