1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
"""The jobs cog helps with managing Kubernetes Jobs & CronJobs."""
import discord
from discord.ext import commands
from kubernetes_asyncio.client.models import V1CronJobList
from arthur.apis.kubernetes import jobs
from arthur.bot import KingArthur
from arthur.config import CONFIG
class CronJobView(discord.ui.View):
    """View with a single dropdown for picking a CronJob to trigger.

    One option is added to the dropdown per entry in ``cron_jobs.items``.
    When an option is picked, the matching CronJob is fetched, its job
    template spec is passed to ``jobs.create_job``, the dropdown is disabled
    and the name of the returned job is posted.
    """

    def __init__(self, cron_jobs: V1CronJobList) -> None:
        """Store ``cron_jobs`` and add one dropdown option per CronJob."""
        super().__init__()
        self.cron_jobs = cron_jobs

        # The decorated ``select_job`` dropdown is the only component declared
        # on this view, so it is reachable as ``self.children[0]``.
        # NOTE(review): Discord limits a select menu to 25 options and
        # ``add_option`` raises once that is exceeded -- confirm the cluster
        # stays below that, or paginate here.
        for cron_job in self.cron_jobs.items:
            name = cron_job.metadata.name
            namespace = cron_job.metadata.namespace
            self.children[0].add_option(
                label=name,
                # Encoded as "<namespace>/<name>"; ``select_job`` splits it again.
                value=f"{namespace}/{name}",
                description=namespace,
                emoji="🛠️",
            )

    def disable_select(self) -> None:
        """Disable the select menu so no further job can be picked from it."""
        self.children[0].disabled = True

    async def interaction_check(self, interaction: discord.Interaction) -> bool:
        """Return True only when the interacting user has the DevOps role.

        A user object without a ``roles`` attribute is treated as having no
        roles and is rejected, rather than raising ``AttributeError``.
        """
        user_roles = getattr(interaction.user, "roles", ())
        return any(role.id == CONFIG.devops_role for role in user_roles)

    @discord.ui.select(
        placeholder="Select a CronJob to trigger...",
    )
    async def select_job(
        self, interaction: discord.Interaction, dropdown: discord.ui.Select
    ) -> None:
        """Handle a dropdown choice by creating a job from the picked CronJob."""
        # Option values have the form "<namespace>/<name>" (built in __init__).
        cronjob_namespace, cronjob_name = dropdown.values[0].split("/")
        cronjob = await jobs.get_cronjob(cronjob_namespace, cronjob_name)

        # The new job's name is the CronJob name suffixed with the id of the
        # message that carries this view.
        new_job = await jobs.create_job(
            cronjob_namespace,
            f"{cronjob_name}-{interaction.message.id}",
            cronjob.spec.job_template.spec,
        )

        # Grey out the dropdown on the original message, then announce the job.
        self.disable_select()
        await interaction.message.edit(view=self)
        await interaction.response.send_message(f"🌬️ Spawned job `{new_job.metadata.name}`")
class Jobs(commands.Cog):
    """Commands for working with Kubernetes Jobs & CronJobs."""

    def __init__(self, bot: KingArthur) -> None:
        # Keep a reference to the bot this cog is constructed with.
        self.bot = bot

    @commands.group(
        name="cronjob",
        aliases=["cronjobs", "cj"],
        invoke_without_command=True,
    )
    async def cronjob(self, ctx: commands.Context) -> None:
        """Commands for working with Kubernetes CronJobs."""
        # The group callback itself only sends the help for this command.
        await ctx.send_help(ctx.command)

    @cronjob.command(name="trigger")
    async def trigger(self, ctx: commands.Context) -> None:
        """Command to trigger a Kubernetes cronjob now."""
        # Build the picker from the current CronJob listing and post it.
        picker = CronJobView(await jobs.list_cronjobs())
        await ctx.send(":tools: Pick a CronJob to trigger", view=picker)
async def setup(bot: KingArthur) -> None:
    """Create the Jobs cog for ``bot`` and add it to the bot."""
    jobs_cog = Jobs(bot)
    await bot.add_cog(jobs_cog)
|