| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
 | import datetime
import html
import re
from flask import redirect, request, url_for
from werkzeug.exceptions import BadRequest
from pysite.base_route import RouteView
from pysite.constants import BotEventTypes, CHANNEL_MOD_LOG, DEBUG_MODE, EDITOR_ROLES
from pysite.decorators import csrf, require_roles
from pysite.mixins import DBMixin, RMQMixin
from pysite.rst import render
STRIP_REGEX = re.compile(r"<[^<]+?>")
class EditView(RouteView, DBMixin, RMQMixin):
    path = "/edit/<path:page>"  # "path" means that it accepts slashes
    name = "edit"
    table_name = "wiki"
    revision_table_name = "wiki_revisions"
    @require_roles(*EDITOR_ROLES)
    def get(self, page):
        rst = ""
        title = ""
        preview = "<p>Preview will appear here.</p>"
        obj = self.db.get(self.table_name, page)
        if obj:
            rst = obj.get("rst", "")
            title = obj.get("title", "")
            preview = obj.get("html", preview)
            if obj.get("lock_expiry") and obj.get("lock_user") != self.user_data.get("user_id"):
                lock_time = datetime.datetime.fromtimestamp(obj["lock_expiry"])
                if datetime.datetime.utcnow() < lock_time:
                    return self.render("wiki/page_in_use.html", page=page, can_edit=True)
        lock_expiry = datetime.datetime.utcnow() + datetime.timedelta(minutes=5)
        # There are a couple of cases where we will not need to lock a page. One of these is if the application is
        # current set to debug mode. The other of these cases is if the page is empty, because if the page is empty
        # we will only have a partially filled out page if the user quits before saving.
        if obj:
            if not DEBUG_MODE and obj.get("rst"):
                self.db.insert(
                    self.table_name,
                    {
                        "slug": page,
                        "lock_expiry": lock_expiry.timestamp(),
                        "lock_user": self.user_data.get("user_id")
                    },
                    conflict="update"
                )
        return self.render("wiki/page_edit.html", page=page, rst=rst, title=title, preview=preview, can_edit=True)
    @require_roles(*EDITOR_ROLES)
    @csrf
    def post(self, page):
        rst = request.form.get("rst")
        title = request.form["title"]
        if not rst or not rst.strip():
            raise BadRequest()
        if not title or not title.strip():
            raise BadRequest()
        rendered = render(rst)
        obj = {
            "slug": page,
            "title": request.form["title"],
            "rst": rst,
            "html": rendered["html"],
            "text": html.unescape(STRIP_REGEX.sub("", rendered["html"]).strip()),
            "headers": rendered["headers"]
        }
        self.db.insert(
            self.table_name,
            obj,
            conflict="replace"
        )
        if not DEBUG_MODE:
            # Add the post to the revisions table
            revision_payload = {
                "slug": page,
                "post": obj,
                "date": datetime.datetime.utcnow().timestamp(),
                "user": self.user_data.get("user_id")
            }
            del revision_payload["post"]["slug"]
            current_revisions = self.db.filter(self.revision_table_name, lambda rev: rev["slug"] == page)
            sorted_revisions = sorted(current_revisions, key=lambda rev: rev["date"], reverse=True)
            if len(sorted_revisions) > 0:
                old_rev = sorted_revisions[0]
            else:
                old_rev = None
            new_rev = self.db.insert(self.revision_table_name, revision_payload)["generated_keys"][0]
        self.audit_log(page, new_rev, old_rev, obj)
        return redirect(url_for("wiki.page", page=page), code=303)  # Redirect, ensuring a GET
    @require_roles(*EDITOR_ROLES)
    @csrf
    def patch(self, page):
        current = self.db.get(self.table_name, page)
        if not current:
            return "", 404
        if current.get("lock_expiry"):  # If there is a lock present
            # If user patching is not the user with the lock end here
            if current["lock_user"] != self.user_data.get("user_id"):
                return "", 400
            new_lock = datetime.datetime.utcnow() + datetime.timedelta(minutes=5)  # New lock time, 5 minutes in future
            self.db.insert(self.table_name, {
                "slug": page,
                "lock_expiry": new_lock.timestamp()
            }, conflict="update")  # Update with new lock time
        return "", 204
    def audit_log(self, page, new_id, old_data, new_data):
        if not old_data:
            link = f"https://wiki.pythondiscord.com/source/{page}"
        else:
            link = f"https://wiki.pythondiscord.com/history/compare/{old_data['id']}/{new_id}"
        self.rmq_bot_event(
            BotEventTypes.send_embed,
            {
                "target": CHANNEL_MOD_LOG,
                "title": "Page Edit",
                "description": f"**{new_data['title']}** edited by **{self.user_data.get('username')}**. "
                               f"[View the diff here]({link})",
                "colour": 0x3F8DD7,  # Light blue
                "timestamp": datetime.datetime.now().isoformat()
            }
        )
 |