| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
 | """The Pods cog helps with managing Kubernetes pods."""
from textwrap import dedent
from discord.ext import commands
from tabulate import tabulate
from arthur.apis.kubernetes import pods
from arthur.bot import KingArthur
from arthur.utils import generate_error_message
class Pods(commands.Cog):
    """Commands for working with Kubernetes Pods."""
    def __init__(self, bot: KingArthur) -> None:
        self.bot = bot
    @commands.group(name="pods", aliases=["pod"], invoke_without_command=True)
    async def pods_cmd(self, ctx: commands.Context) -> None:
        """Commands for working with Kubernetes Pods."""
        await ctx.send_help(ctx.command)
    @pods_cmd.command(name="list", aliases=["ls"])
    async def pods_list(self, ctx: commands.Context, namespace: str = "default") -> None:
        """List pods in the selected namespace (defaults to default)."""
        pod_list = await pods.list_pods(namespace)
        table_data = []
        if len(pod_list.items) == 0:
            return await ctx.send(
                generate_error_message(
                    description="No pods found, check the namespace exists."
                )
            )
        for pod in pod_list.items:
            match pod.status.phase:
                case "Running":
                    emote = ":green_circle:"
                case "Pending":
                    emote = ":yellow_circle:"
                case "Succeeded":
                    emote = ":white_check_mark:"
                case "Failed":
                    emote = ":x:"
                case "Unknown":
                    emote = ":question:"
                case _:
                    emote = ":grey_question:"
            table_data.append([
                emote,
                pod.metadata.name,
                pod.status.phase,
                pod.status.pod_ip,
                pod.spec.node_name,
            ])
        table = tabulate(
            table_data,
            headers=["Status", "Pod", "Phase", "IP", "Node"],
            tablefmt="psql",
            colalign=("center", "left", "center", "center", "center")
        )
        return_message = dedent("""
            **Pods in namespace `{0}`**
            ```
            {1}
            ```
            """)
        await ctx.send(return_message.format(namespace, table))
        return None
async def setup(bot: KingArthur) -> None:
    """Add the extension to the bot."""
    await bot.add_cog(Pods(bot))
 |