aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGravatar Leon Sandøy <[email protected]>2018-04-20 22:12:03 +0200
committerGravatar GitHub <[email protected]>2018-04-20 22:12:03 +0200
commit35e0f4466677602e9ec6db614e8ea881dbf656cb (patch)
tree43ebdcc530b57211e45e4fe96bb2811e334640f0
parentAdded image URLs for each of these famous rappers. (#55) (diff)
[#1eeu1] Hiphopify (#54)
* Changed the dev-mode logic to be the same as prod for creating new tables if they don't exist. Also added a new feature where a table can be initialized with data if you create a JSON file in the pysite/database/table_init/ folder and fill it with a list of dicts where each dict represents a row in your table. Included a hiphoppers json so that I can actually test if it works in production. It will only init the table if the table is empty. * Not sure if this will solve it, but I think so. * Renamed the tables and primary keys, and alphabetized the dict. Now complies with the gdudes holy wishes. * Almost done with the initial build for this. Implemented GET and DELETE, in order to finish POST I need to expand the database.py interface class. * Alphabetized database convenience wrappers. * Fixed a few typehints and added the sample convenience wrapper to the database class. * Finishing up the POST method and adding a duration parser to the utils folder so we can handle strings like 2w3d and turn them into a timestamp. * Fixed API blueprint loading, which was broken by the setup method in the DBMixIn. I'd forgotten to remove the check for table_name attribute. Also adde some logging and got the DELETE route working. * Added timezone-sensitivity to the duration parser so it will work with rethink. renamed the json and fixed some bugs in the hiphopify API. * Added a utility to test if rdb timestamps are expired, and only returning data from the GET calls if it isn't expired. * changed some log wording * Setting up Lil Joseph as default image. Adding some rappers to the list. * Adding a bunch of logging * These tests no longer apply. New tests must be written in the long run, removing them for now. * Addressing review comments left by Volcyy * Fixed misleading comment.
-rw-r--r--app_test.py22
-rw-r--r--pysite/constants.py4
-rw-r--r--pysite/database.py345
-rw-r--r--pysite/database/table_init/hiphopify_namelist.json12
-rw-r--r--pysite/mixins.py3
-rw-r--r--pysite/utils/time.py62
-rw-r--r--pysite/views/api/bot/hiphopify.py178
-rw-r--r--tox.ini2
8 files changed, 432 insertions, 196 deletions
diff --git a/app_test.py b/app_test.py
index 20293db9..b9a71292 100644
--- a/app_test.py
+++ b/app_test.py
@@ -292,28 +292,6 @@ class Utilities(SiteTest):
class MixinTests(SiteTest):
""" Test cases for mixins """
- def test_dbmixin_runtime_error(self):
- """ Check that wrong values for error view setup raises runtime error """
- from pysite.mixins import DBMixin
-
- dbm = DBMixin()
- try:
- dbm.setup('sdf', 'sdfsdf')
- except RuntimeError:
- return True
- raise Exception('Expected runtime error on setup() when giving wrongful arguments')
-
- def test_dbmixin_table_property(self):
- """ Check the table property returns correctly """
- from pysite.mixins import DBMixin
-
- try:
- dbm = DBMixin()
- dbm.table_name = 'Table'
- self.assertEqual(dbm.table, 'Table')
- except AttributeError:
- pass
-
def test_handler_5xx(self):
""" Check error view returns error message """
from werkzeug.exceptions import InternalServerError
diff --git a/pysite/constants.py b/pysite/constants.py
index ca18a288..4eff3606 100644
--- a/pysite/constants.py
+++ b/pysite/constants.py
@@ -22,7 +22,6 @@ class ValidationTypes(Enum):
DEBUG_MODE = "FLASK_DEBUG" in environ
# All snowflakes should be strings as RethinkDB rounds them as ints
-
OWNER_ROLE = "267627879762755584"
ADMIN_ROLE = "267628507062992896"
MODERATOR_ROLE = "267629731250176001"
@@ -44,7 +43,7 @@ DISCORD_OAUTH_SECRET = environ.get('DISCORD_OAUTH_SECRET', '')
DISCORD_OAUTH_SCOPE = 'identify email guilds.join'
OAUTH_DATABASE = "oauth_data"
-PREFERRED_URL_SCHEME = environ.get("PREFERRED_URL_SCHEME", "https") # Change this in testing to "http"
+PREFERRED_URL_SCHEME = environ.get("PREFERRED_URL_SCHEME", "http")
ERROR_DESCRIPTIONS = {
# 5XX
@@ -78,7 +77,6 @@ DATADOG_ADDRESS = environ.get("DATADOG_ADDRESS") or None
DATADOG_PORT = int(environ.get("DATADOG_PORT") or 0)
# CSRF
-
CSRF = CSRFProtect()
# GitHub Token
diff --git a/pysite/database.py b/pysite/database.py
index 8903fbf4..86c8685d 100644
--- a/pysite/database.py
+++ b/pysite/database.py
@@ -100,7 +100,7 @@ class RethinkDB:
table_data = json.load(json_file)
self.log.trace(f"Loading the json file into the table. "
- f"The json file contains {len(table_data)} rows.")
+ f"The json file contains {len(table_data)} items.")
for row in table_data:
self.insert(
@@ -302,104 +302,42 @@ class RethinkDB:
# region: RethinkDB wrapper functions
- def insert(self, table_name: str, *objects: Dict[str, Any],
- durability: str = "hard",
- return_changes: Union[bool, str] = False,
- conflict: Union[ # Any of...
- str, Callable[ # ...str, or a callable that...
- [Dict[str, Any], Dict[str, Any]], # ...takes two dicts with string keys and any values...
- Dict[str, Any] # ...and returns a dict with string keys and any values
- ]
- ] = "error") -> Union[List, Dict]: # flake8: noqa
- """
- Insert an object or a set of objects into a table
-
- :param table_name: The name of the table to insert into
- :param objects: The objects to be inserted into the table
- :param durability: "hard" (the default) to write the change immediately, "soft" otherwise
- :param return_changes: Whether to return a list of changed values or not - defaults to False
- :param conflict: What to do in the event of a conflict - "error", "replace" and "update" are included, but
- you can also provide your own function in order to handle conflicts yourself. If you do this, the function
- should take two arguments (the old document and the new one), and return a single document to replace both.
-
- :return: A list of changes if `return_changes` is True; a dict detailing the operations run otherwise
- """
-
- query = self.query(table_name).insert(
- objects, durability=durability, return_changes=return_changes, conflict=conflict
- )
-
- if return_changes:
- return self.run(query, coerce=list)
- else:
- return self.run(query, coerce=dict)
-
- def get(self, table_name: str, key: str) -> Union[Dict[str, Any], None]:
- """
- Get a single document from a table by primary key
-
- :param table_name: The name of the table to get the document from
- :param key: The value of the primary key belonging to the document you want
-
- :return: The document, or None if it wasn't found
+ def between(self, table_name: str, *, lower: Any = rethinkdb.minval, upper: Any = rethinkdb.maxval,
+ index: Optional[str] = None, left_bound: str = "closed", right_bound: str = "open") -> List[
+ Dict[str, Any]]:
"""
+ Get all documents between two keys
- result = self.run( # pragma: no cover
- self.query(table_name).get(key)
- )
-
- return dict(result) if result else None # pragma: no cover
-
- def get_all(self, table_name: str, *keys: str, index: str = "id") -> List[Any]:
- """
- Get a list of documents matching a set of keys, on a specific index
+ >>> db = RethinkDB()
+ >>> db.between("users", upper=10, index="conquests")
+ [
+ {"username": "gdude", "conquests": 2},
+ {"username": "joseph", "conquests": 5}
+ ]
+ >>> db.between("users", lower=10, index="conquests")
+ [
+ {"username": "lemon", "conquests": 15}
+ ]
+ >>> db.between("users", lower=2, upper=10, index="conquests" left_bound="open")
+ [
+ {"username": "gdude", "conquests": 2},
+ {"username": "joseph", "conquests": 5}
+ ]
- :param table_name: The name of the table to get documents from
- :param keys: The key values to match against
- :param index: The name of the key or index to match on
+ :param table_name: The table to get documents from
+ :param lower: The lower-bounded value, leave blank to ignore
+ :param upper: The upper-bounded value, leave blank to ignore
+ :param index: The key or index to check on each document
+ :param left_bound: "open" to include documents that exactly match the lower bound, "closed" otherwise
+ :param right_bound: "open" to include documents that exactly match the upper bound, "closed" otherwise
- :return: A list of matching documents; may be empty if no matches were made
+ :return: A list of matched documents; may be empty
"""
-
return self.run( # pragma: no cover
- self.query(table_name).get_all(*keys, index=index),
+ self.query(table_name).between(lower, upper, index=index, left_bound=left_bound, right_bound=right_bound),
coerce=list
)
- def wait(self, table_name: str, wait_for: str = "all_replicas_ready", timeout: int = 0) -> bool:
- """
- Wait until an operation has happened on a specific table; will block the current function
-
- :param table_name: The name of the table to wait against
- :param wait_for: The operation to wait for; may be "ready_for_outdated_reads",
- "ready_for_reads", "ready_for_writes" or "all_replicas_ready", which is the default
- :param timeout: How long to wait before returning; defaults to 0 (forever)
-
- :return: True; but may return False if the timeout was reached
- """
-
- result = self.run( # pragma: no cover
- self.query(table_name).wait(wait_for=wait_for, timeout=timeout),
- coerce=dict
- )
-
- return result.get("ready", 0) > 0
-
- def sync(self, table_name: str) -> bool:
- """
- Following a set of edits with durability set to "soft", this must be called to save those edits
-
- :param table_name: The name of the table to sync
-
- :return: True if the sync was successful; False otherwise
- """
- result = self.run( # pragma: no cover
- self.query(table_name).sync(),
- coerce=dict
- )
-
- return result.get("synced", 0) > 0 # pragma: no cover
-
def changes(self, table_name: str, squash: Union[bool, int] = False, changefeed_queue_size: int = 100_000,
include_initial: Optional[bool] = None, include_states: bool = False,
include_types: bool = False) -> Iterator[Dict[str, Any]]:
@@ -460,90 +398,100 @@ class RethinkDB:
new_connection=True
)
- def pluck(self, table_name: str, *selectors: Union[str, Dict[str, Union[List, Dict]]]):
+ def filter(self, table_name: str, predicate: Callable[[Dict[str, Any]], bool],
+ default: Union[bool, UserError] = False) -> List[Dict[str, Any]]:
"""
- Get a list of values for a specific set of keys for every document in the table; this can include
- nested values
+ Return all documents in a table for which `predicate` returns true.
+ The `predicate` argument should be a function that takes a single argument - a single document to check - and
+ it should return True or False depending on whether the document should be included.
+
+ >>> def many_conquests(doc):
+ ... '''Return documents with at least 10 conquests'''
+ ... return doc["conquests"] >= 10
+ ...
>>> db = RethinkDB()
- >>> db.pluck("users", "username", "password") # Select a flat document
- [
- {"username": "lemon", "password": "hunter2"}
- ]
- >>> db.pluck("users", {"posts": ["title"]}) # Select from nested documents
+ >>> db.filter("users", many_conquests)
[
- {
- "posts": [
- {"title": "New website!"}
- ]
- }
+ {"username": "lemon", "conquests": 15}
]
- :param table_name: The table to get values from
- :param selectors: The set of keys to get values for
- :return: A list containing the requested documents, with only the keys requested
+ :param table_name: The name of the table to get documents for
+ :param predicate: The callable to use to filter the documents
+ :param default: What to do if a document is missing fields; True to include them, `rethink.error()` to raise
+ aa ReqlRuntimeError, or False to skip over the document (the default)
+ :return: A list of documents that match the predicate; may be empty
"""
return self.run( # pragma: no cover
- self.query(table_name).pluck(*selectors),
+ self.query(table_name).filter(predicate, default=default),
coerce=list
)
- def without(self, table_name: str, *selectors: Union[str, Dict[str, Union[List, Dict]]]):
+ def get(self, table_name: str, key: str) -> Optional[Dict[str, Any]]:
"""
- The functional opposite of `pluck()`, returning full documents without the specified selectors
+ Get a single document from a table by primary key
- >>> db = RethinkDB()
- >>> db.without("users", "posts")
- [
- {"username": "lemon", "password": "hunter2}
- ]
+ :param table_name: The name of the table to get the document from
+ :param key: The value of the primary key belonging to the document you want
- :param table_name: The table to get values from
- :param selectors: The set of keys to exclude
- :return: A list containing the requested documents, without the keys requested
+ :return: The document, or None if it wasn't found
"""
- return self.run( # pragma: no cover
- self.query(table_name).without(*selectors)
+ result = self.run( # pragma: no cover
+ self.query(table_name).get(key)
)
- def between(self, table_name: str, *, lower: Any = rethinkdb.minval, upper: Any = rethinkdb.maxval,
- index: Optional[str] = None, left_bound: str = "closed", right_bound: str = "open") -> List[
- Dict[str, Any]]:
- """
- Get all documents between two keys
+ return dict(result) if result else None # pragma: no cover
- >>> db = RethinkDB()
- >>> db.between("users", upper=10, index="conquests")
- [
- {"username": "gdude", "conquests": 2},
- {"username": "joseph", "conquests": 5}
- ]
- >>> db.between("users", lower=10, index="conquests")
- [
- {"username": "lemon", "conquests": 15}
- ]
- >>> db.between("users", lower=2, upper=10, index="conquests" left_bound="open")
- [
- {"username": "gdude", "conquests": 2},
- {"username": "joseph", "conquests": 5}
- ]
+ def get_all(self, table_name: str, *keys: str, index: str = "id") -> List[Any]:
+ """
+ Get a list of documents matching a set of keys, on a specific index
- :param table_name: The table to get documents from
- :param lower: The lower-bounded value, leave blank to ignore
- :param upper: The upper-bounded value, leave blank to ignore
- :param index: The key or index to check on each document
- :param left_bound: "open" to include documents that exactly match the lower bound, "closed" otherwise
- :param right_bound: "open" to include documents that exactly match the upper bound, "closed" otherwise
+ :param table_name: The name of the table to get documents from
+ :param keys: The key values to match against
+ :param index: The name of the key or index to match on
- :return: A list of matched documents; may be empty
+ :return: A list of matching documents; may be empty if no matches were made
"""
+
return self.run( # pragma: no cover
- self.query(table_name).between(lower, upper, index=index, left_bound=left_bound, right_bound=right_bound),
+ self.query(table_name).get_all(*keys, index=index),
coerce=list
)
+ def insert(self, table_name: str, *objects: Dict[str, Any],
+ durability: str = "hard",
+ return_changes: Union[bool, str] = False,
+ conflict: Union[ # Any of...
+ str, Callable[ # ...str, or a callable that...
+ [Dict[str, Any], Dict[str, Any]], # ...takes two dicts with string keys and any values...
+ Dict[str, Any] # ...and returns a dict with string keys and any values
+ ]
+ ] = "error") -> Union[List, Dict]: # flake8: noqa
+ """
+ Insert an object or a set of objects into a table
+
+ :param table_name: The name of the table to insert into
+ :param objects: The objects to be inserted into the table
+ :param durability: "hard" (the default) to write the change immediately, "soft" otherwise
+ :param return_changes: Whether to return a list of changed values or not - defaults to False
+ :param conflict: What to do in the event of a conflict - "error", "replace" and "update" are included, but
+ you can also provide your own function in order to handle conflicts yourself. If you do this, the function
+ should take two arguments (the old document and the new one), and return a single document to replace both.
+
+ :return: A list of changes if `return_changes` is True; a dict detailing the operations run otherwise
+ """
+
+ query = self.query(table_name).insert(
+ objects, durability=durability, return_changes=return_changes, conflict=conflict
+ )
+
+ if return_changes:
+ return self.run(query, coerce=list)
+ else:
+ return self.run(query, coerce=dict)
+
def map(self, table_name: str, func: Callable):
"""
Map a function over every document in a table, with the possibility of modifying it
@@ -571,34 +519,101 @@ class RethinkDB:
coerce=list
)
- def filter(self, table_name: str, predicate: Callable[[Dict[str, Any]], bool],
- default: Union[bool, UserError] = False) -> List[Dict[str, Any]]:
+ def pluck(self, table_name: str, *selectors: Union[str, Dict[str, Union[List, Dict]]]) -> List[Dict[str, Any]]:
"""
- Return all documents in a table for which `predicate` returns true.
-
- The `predicate` argument should be a function that takes a single argument - a single document to check - and
- it should return True or False depending on whether the document should be included.
+ Get a list of values for a specific set of keys for every document in the table; this can include
+ nested values
- >>> def many_conquests(doc):
- ... '''Return documents with at least 10 conquests'''
- ... return doc["conquests"] >= 10
- ...
>>> db = RethinkDB()
- >>> db.filter("users", many_conquests)
+ >>> db.pluck("users", "username", "password") # Select a flat document
[
- {"username": "lemon", "conquests": 15}
+ {"username": "lemon", "password": "hunter2"}
+ ]
+ >>> db.pluck("users", {"posts": ["title"]}) # Select from nested documents
+ [
+ {
+ "posts": [
+ {"title": "New website!"}
+ ]
+ }
]
- :param table_name: The name of the table to get documents for
- :param predicate: The callable to use to filter the documents
- :param default: What to do if a document is missing fields; True to include them, `rethink.error()` to raise
- aa ReqlRuntimeError, or False to skip over the document (the default)
- :return: A list of documents that match the predicate; may be empty
+ :param table_name: The table to get values from
+ :param selectors: The set of keys to get values for
+ :return: A list containing the requested documents, with only the keys requested
"""
return self.run( # pragma: no cover
- self.query(table_name).filter(predicate, default=default),
+ self.query(table_name).pluck(*selectors),
+ coerce=list
+ )
+
+ def sample(self, table_name: str, sample_size: int) -> List[Dict[str, Any]]:
+ """
+ Select a given number of elements from a table at random.
+
+ :param table_name: The name of the table to select from.
+ :param sample_size: The number of elements to select.
+ If this number is higher than the total amount of items in
+ the table, this will return the entire table in random order.
+
+ :return: A list of items from the table.
+ """
+ return self.run( # pragma: no cover
+ self.query(table_name).sample(sample_size),
coerce=list
)
+ def sync(self, table_name: str) -> bool:
+ """
+ Following a set of edits with durability set to "soft", this must be called to save those edits
+
+ :param table_name: The name of the table to sync
+
+ :return: True if the sync was successful; False otherwise
+ """
+ result = self.run( # pragma: no cover
+ self.query(table_name).sync(),
+ coerce=dict
+ )
+
+ return result.get("synced", 0) > 0 # pragma: no cover
+
+ def wait(self, table_name: str, wait_for: str = "all_replicas_ready", timeout: int = 0) -> bool:
+ """
+ Wait until an operation has happened on a specific table; will block the current function
+
+ :param table_name: The name of the table to wait against
+ :param wait_for: The operation to wait for; may be "ready_for_outdated_reads",
+ "ready_for_reads", "ready_for_writes" or "all_replicas_ready", which is the default
+ :param timeout: How long to wait before returning; defaults to 0 (forever)
+
+ :return: True; but may return False if the timeout was reached
+ """
+
+ result = self.run( # pragma: no cover
+ self.query(table_name).wait(wait_for=wait_for, timeout=timeout),
+ coerce=dict
+ )
+
+ return result.get("ready", 0) > 0
+
+ def without(self, table_name: str, *selectors: Union[str, Dict[str, Union[List, Dict]]]):
+ """
+ The functional opposite of `pluck()`, returning full documents without the specified selectors
+
+ >>> db = RethinkDB()
+ >>> db.without("users", "posts")
+ [
+ {"username": "lemon", "password": "hunter2"}
+ ]
+
+ :param table_name: The table to get values from
+ :param selectors: The set of keys to exclude
+ :return: A list containing the requested documents, without the keys requested
+ """
+
+ return self.run( # pragma: no cover
+ self.query(table_name).without(*selectors)
+ )
# endregion
diff --git a/pysite/database/table_init/hiphopify_namelist.json b/pysite/database/table_init/hiphopify_namelist.json
index 888051b1..28d6242a 100644
--- a/pysite/database/table_init/hiphopify_namelist.json
+++ b/pysite/database/table_init/hiphopify_namelist.json
@@ -1581,7 +1581,7 @@
},
{
"name": "Fonzworth Bentley",
- "image_url": "https://www.vegasnews.com/wp-content/uploads/32809-fonzworth-bentley-2-588.jpg"
+ "image_url": "https://i.ytimg.com/vi/PKT8_mXk1-g/maxresdefault.jpg"
},
{
"name": "Fort Minor",
@@ -2660,6 +2660,10 @@
"image_url": "http://s3.amazonaws.com/rapgenius/lil-jon-w.jpg"
},
{
+ "name": "Lil' Joseph",
+ "image_url": "http://beardfist.com/images/lil_joseph.png"
+ },
+ {
"name": "Lil' Mama",
"image_url": "http://images2.fanpop.com/image/photos/11900000/Lil-Mama-3-female-rappers-11934192-440-348.jpg"
},
@@ -3741,7 +3745,7 @@
},
{
"name": "RZA",
- "image_url": "http://www.sosoactive.com/wp-content/uploads/2014/04/rza-2.jpg"
+ "image_url": "https://static01.nyt.com/images/2012/10/21/arts/21RZA1_SPAN/21RZA1_SPAN-jumbo.jpg"
},
{
"name": "R. Kelly",
@@ -5004,6 +5008,10 @@
"image_url": "http://www.xxlmag.com/files/2015/03/x-clan-feat2.jpg"
},
{
+ "name": "Yolandi Visser",
+ "image_url": "https://i.pinimg.com/originals/d0/fd/b2/d0fdb22ac606edd8f1bd15abd8d67faa.jpg"
+ },
+ {
"name": "Young Jeezy",
"image_url": "http://pennylibertygbow.files.wordpress.com/2012/02/youngjeezy3.jpg"
},
diff --git a/pysite/mixins.py b/pysite/mixins.py
index 0a9402a9..5108a9a1 100644
--- a/pysite/mixins.py
+++ b/pysite/mixins.py
@@ -47,9 +47,6 @@ class DBMixin:
if hasattr(super(), "setup"):
super().setup(manager, blueprint) # pragma: no cover
- if not cls.table_name:
- raise RuntimeError("Routes using DBViewMixin must define `table_name`")
-
cls._db = ref(manager.db)
@property
diff --git a/pysite/utils/time.py b/pysite/utils/time.py
new file mode 100644
index 00000000..334408a4
--- /dev/null
+++ b/pysite/utils/time.py
@@ -0,0 +1,62 @@
+from datetime import datetime, timedelta
+
+from rethinkdb import make_timezone
+
+
+UNITS = {
+ 's': lambda value: value,
+ 'm': lambda value: value * 60,
+ 'h': lambda value: value * 60 * 60,
+ 'd': lambda value: value * 60 * 60 * 24,
+ 'w': lambda value: value * 60 * 60 * 24 * 7
+}
+
+
+def parse_duration(duration: str) -> datetime:
+ """
+ Parses a string like '3w' into a datetime 3 weeks from now.
+
+ Also supports strings like 1w2d or 1h25m.
+
+ This function is adapted from a bot called ROWBOAT, written by b1naryth1ef.
+ See https://github.com/b1naryth1ef/rowboat/blob/master/rowboat/util/input.py
+
+ :param duration: a string containing the number and a time unit shorthand.
+ :return: A datetime representing now + the duration
+ """
+
+ if not duration:
+ raise ValueError("No duration provided.")
+
+ value = 0
+ digits = ''
+
+ for char in duration:
+
+ # Add all numbers to the digits string
+ if char.isdigit():
+ digits += char
+ continue
+
+ # If it's not a number and not one of the letters in UNITS, it must be invalid.
+ if char not in UNITS or not digits:
+ raise ValueError("Invalid duration")
+
+ # Otherwise, call the corresponding lambda to convert the value, and keep iterating.
+ value += UNITS[char](int(digits))
+ digits = ''
+
+ return datetime.now(make_timezone("00:00")) + timedelta(seconds=value + 1)
+
+
+def is_expired(rdb_datetime: datetime) -> bool:
+ """
+ Takes a rethinkdb datetime (timezone aware) and
+ figures out if it has expired yet.
+
+ Always compares with UTC 00:00
+
+ :param rdb_timestamp: A datetime as stored in rethinkdb.
+ :return: True if the datetime is in the past.
+ """
+ return datetime.now(make_timezone("00:00")) > rdb_datetime
diff --git a/pysite/views/api/bot/hiphopify.py b/pysite/views/api/bot/hiphopify.py
new file mode 100644
index 00000000..7ebf47e1
--- /dev/null
+++ b/pysite/views/api/bot/hiphopify.py
@@ -0,0 +1,178 @@
+# coding=utf-8
+import datetime
+import logging
+
+from flask import jsonify
+from schema import Optional, Schema
+
+from pysite.base_route import APIView
+from pysite.constants import ValidationTypes
+from pysite.decorators import api_key, api_params
+from pysite.mixins import DBMixin
+from pysite.utils.time import is_expired, parse_duration
+
+log = logging.getLogger(__name__)
+
+GET_SCHEMA = Schema([
+ {
+ "user_id": str
+ }
+])
+
+POST_SCHEMA = Schema([
+ {
+ "user_id": str,
+ "duration": str,
+ Optional("forced_nick"): str
+ }
+])
+
+DELETE_SCHEMA = Schema([
+ {
+ "user_id": str
+ }
+])
+
+
+class HiphopifyView(APIView, DBMixin):
+ path = "/hiphopify"
+ name = "hiphopify"
+ prison_table = "hiphopify"
+ name_table = "hiphopify_namelist"
+
+ @api_key
+ @api_params(schema=GET_SCHEMA, validation_type=ValidationTypes.params)
+ def get(self, params=None):
+ """
+ Check if the user is currently in hiphop-prison.
+
+ If user is currently servin' his sentence in the big house,
+ return the name stored in the forced_nick column of prison_table.
+
+ If user cannot be found in prison, or
+ if his sentence has expired, return nothing.
+
+ Data must be provided as params.
+ API key must be provided as header.
+ """
+
+ user_id = params[0].get("user_id")
+
+ log.debug(f"Checking if user ({user_id}) is permitted to change their nickname.")
+ data = self.db.get(self.prison_table, user_id) or {}
+
+ if data and data.get("end_timestamp"):
+ log.trace("User exists in the prison_table.")
+ end_time = data.get("end_timestamp")
+ if is_expired(end_time):
+ log.trace("...But their sentence has already expired.")
+ data = {} # Return nothing if the sentence has expired.
+
+ return jsonify(data)
+
+ @api_key
+ @api_params(schema=POST_SCHEMA, validation_type=ValidationTypes.json)
+ def post(self, json_data):
+ """
+ Imprisons a user in hiphop-prison.
+
+ If a forced_nick was provided by the caller, the method will force
+ this nick. If not, a random hiphop nick will be selected from the
+ name_table.
+
+ Data must be provided as JSON.
+ API key must be provided as header.
+ """
+
+ user_id = json_data[0].get("user_id")
+ duration = json_data[0].get("duration")
+ forced_nick = json_data[0].get("forced_nick")
+
+ log.debug(f"Attempting to imprison user ({user_id}).")
+
+ # Get random name and picture if no forced_nick was provided.
+ if not forced_nick:
+ log.trace("No forced_nick provided. Fetching a random rapper name and image.")
+ rapper_data = self.db.sample(self.name_table, 1)[0]
+ forced_nick = rapper_data.get('name')
+
+ # If forced nick was provided, try to look up the forced_nick in the database.
+ # If a match cannot be found, just default to Lil' Jon for the image.
+ else:
+ log.trace(f"Forced nick provided ({forced_nick}). Trying to match it with the database.")
+ rapper_data = (
+ self.db.get(self.name_table, forced_nick)
+ or self.db.get(self.name_table, "Lil' Joseph")
+ )
+
+ image_url = rapper_data.get('image_url')
+ log.trace(f"Using the nickname {forced_nick} and the image_url {image_url}.")
+
+ # Convert duration to valid timestamp
+ try:
+ log.trace("Parsing the duration and converting it to a timestamp")
+ end_timestamp = parse_duration(duration)
+ except ValueError:
+ log.warning(f"The duration could not be parsed, or was invalid. The duration was '{duration}'.")
+ return jsonify({
+ "success": False,
+ "error_message": "Invalid duration"
+ })
+
+ log.debug("Everything seems to be in order, inserting the data into the prison_table.")
+ self.db.insert(
+ self.prison_table,
+ {
+ "user_id": user_id,
+ "end_timestamp": end_timestamp,
+ "forced_nick": forced_nick
+ },
+ conflict="update" # If it exists, update it.
+ )
+
+ return jsonify({
+ "success": True,
+ "end_timestamp": end_timestamp,
+ "forced_nick": forced_nick,
+ "image_url": image_url
+ })
+
+ @api_key
+ @api_params(schema=DELETE_SCHEMA, validation_type=ValidationTypes.json)
+ def delete(self, json_data):
+ """
+ Releases a user from hiphop-prison.
+
+ Data must be provided as JSON.
+ API key must be provided as header.
+ """
+
+ user_id = json_data[0].get("user_id")
+
+ log.debug(f"Attempting to release user ({user_id}) from hiphop-prison.")
+ prisoner_data = self.db.get(self.prison_table, user_id)
+ sentence_expired = None
+
+ log.trace(f"Checking if the user ({user_id}) is currently in hiphop-prison.")
+ if prisoner_data and prisoner_data.get("end_datetime"):
+ sentence_expired = datetime.datetime.now() > prisoner_data.get("end_datetime")
+
+ if prisoner_data and not sentence_expired:
+ log.debug("User is currently in hiphop-prison. Deleting the record and releasing the prisoner.")
+ self.db.delete(
+ self.prison_table,
+ user_id
+ )
+ return jsonify({"success": True})
+ elif not prisoner_data:
+ log.warning(f"User ({user_id}) is not currently in hiphop-prison.")
+ return jsonify({
+ "success": False,
+ "error_message": "User is not currently in hiphop-prison!"
+ })
+ elif sentence_expired:
+ log.warning(f"User ({user_id}) was in hiphop-prison, but has already been released.")
+ return jsonify({
+ "success": False,
+ "error_message": "User has already been released from hiphop-prison!"
+ })
diff --git a/tox.ini b/tox.ini
index 42abe576..f55dbae2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,6 +1,6 @@
[flake8]
max-line-length=120
application_import_names=pysite
-ignore=P102
+ignore=P102,B311,W503,E226,S311
exclude=__pycache__, venv, app_test.py
import-order-style=pycharm \ No newline at end of file