aboutsummaryrefslogtreecommitdiffstats
path: root/pysite/database.py
diff options
context:
space:
mode:
Diffstat (limited to 'pysite/database.py')
-rw-r--r--pysite/database.py562
1 files changed, 0 insertions, 562 deletions
diff --git a/pysite/database.py b/pysite/database.py
deleted file mode 100644
index ddf79a31..00000000
--- a/pysite/database.py
+++ /dev/null
@@ -1,562 +0,0 @@
-import logging
-import os
-from typing import Any, Callable, Dict, Iterator, List, Optional, Union
-import re
-
-import rethinkdb
-from rethinkdb.ast import RqlMethodQuery, Table, UserError
-from rethinkdb.net import DefaultConnection
-from werkzeug.exceptions import ServiceUnavailable
-
-from pysite.tables import TABLES
-
-STRIP_REGEX = re.compile(r"<[^<]+?>")
-WIKI_TABLE = "wiki"
-
-
-class RethinkDB:
-
- def __init__(self, loop_type: Optional[str] = "gevent"):
- self.host = os.environ.get("RETHINKDB_HOST", "127.0.0.1")
- self.port = os.environ.get("RETHINKDB_PORT", "28015")
- self.database = os.environ.get("RETHINKDB_DATABASE", "pythondiscord")
- self.log = logging.getLogger(__name__)
- self.conn = None
-
- if loop_type:
- rethinkdb.set_loop_type(loop_type)
-
- with self.get_connection() as self.conn:
- try:
- rethinkdb.db_create(self.database).run(self.conn)
- self.log.debug(f"Database created: '{self.database}'")
- except rethinkdb.RqlRuntimeError:
- self.log.debug(f"Database found: '{self.database}'")
-
- def create_tables(self) -> List[str]:
- """
- Creates whichever tables exist in the TABLES
- constant if they don't already exist in the database.
-
- :return: a list of the tables that were created.
- """
- created = []
-
- for table, obj in TABLES.items():
- if self.create_table(table, obj.primary_key):
- created.append(table)
-
- return created
-
- def get_connection(self, connect_database: bool = True) -> DefaultConnection:
- """
- Grab a connection to the RethinkDB server, optionally without selecting a database
-
- :param connect_database: Whether to immediately connect to the database or not
- """
-
- if connect_database:
- return rethinkdb.connect(host=self.host, port=self.port, db=self.database)
- else:
- return rethinkdb.connect(host=self.host, port=self.port)
-
- def before_request(self):
- """
- Flask pre-request callback to set up a connection for the duration of the request
- """
-
- try:
- self.conn = self.get_connection()
- except rethinkdb.RqlDriverError:
- raise ServiceUnavailable("Database connection could not be established.")
-
- def teardown_request(self, _):
- """
- Flask post-request callback to close a previously set-up connection
-
- :param _: Exception object, not used here
- """
-
- try:
- self.conn.close()
- except AttributeError:
- pass
-
- # region: Convenience wrappers
-
- def create_table(self, table_name: str, primary_key: str = "id", durability: str = "hard", shards: int = 1,
- replicas: Union[int, Dict[str, int]] = 1, primary_replica_tag: Optional[str] = None) -> bool:
- """
- Attempt to create a new table on the current database
-
- :param table_name: The name of the table to create
- :param primary_key: The name of the primary key - defaults to "id"
- :param durability: "hard" (the default) to write the change immediately, "soft" otherwise
- :param shards: The number of shards to span the table over - defaults to 1
- :param replicas: See the RethinkDB documentation relating to replicas
- :param primary_replica_tag: See the RethinkDB documentation relating to replicas
-
- :return: True if the table was created, False if it already exists
- """
-
- with self.get_connection() as conn:
- all_tables = rethinkdb.db(self.database).table_list().run(conn)
- self.log.debug(f"Call to table_list returned the following list of tables: {all_tables}")
-
- if table_name in all_tables:
- self.log.debug(f"Table found: '{table_name}' ({len(all_tables)} tables in total)")
- return False
-
- # Use a kwargs dict because the driver doesn't check the value
- # of `primary_replica_tag` properly; None is not handled
- kwargs = {
- "primary_key": primary_key,
- "durability": durability,
- "shards": shards,
- "replicas": replicas
- }
-
- if primary_replica_tag is not None:
- kwargs["primary_replica_tag"] = primary_replica_tag
-
- rethinkdb.db(self.database).table_create(table_name, **kwargs).run(conn)
-
- self.log.debug(f"Table created: '{table_name}'")
- return True
-
- def delete(self,
- table_name: str,
- primary_key: Union[str, None] = None,
- durability: str = "hard",
- return_changes: Union[bool, str] = False) -> dict:
- """
- Delete one or all documents from a table. This can only delete
- either the contents of an entire table, or a single document.
- For more complex delete operations, please use self.query.
-
- :param table_name: The name of the table to delete from. This must be provided.
- :param primary_key: The primary_key to delete from that table. This is optional.
- :param durability: "hard" (the default) to write the change immediately, "soft" otherwise
- :param return_changes: Whether to return a list of changed values or not - defaults to False
- :return: if return_changes is True, returns a dict containing all changes. Else, returns None.
- """
-
- if primary_key:
- query = self.query(table_name).get(primary_key).delete(
- durability=durability, return_changes=return_changes
- )
- else:
- query = self.query(table_name).delete(
- durability=durability, return_changes=return_changes
- )
-
- if return_changes:
- return self.run(query, coerce=dict)
- self.run(query)
-
- def drop_table(self, table_name: str):
- """
- Attempt to drop a table from the database, along with its data
-
- :param table_name: The name of the table to drop
- :return: True if the table was dropped, False if the table doesn't exist
- """
-
- with self.get_connection() as conn:
- all_tables = rethinkdb.db(self.database).table_list().run(conn)
-
- if table_name not in all_tables:
- return False
-
- rethinkdb.db(self.database).table_drop(table_name).run(conn)
- return True
-
- def query(self, table_name: str) -> Table:
- """
- Get a RethinkDB table object that you can run queries against
-
- >>> db = RethinkDB()
- >>> query = db.query("my_table")
- >>> db.run(query.insert({"key": "value"}), coerce=dict)
- {
- "deleted": 0,
- "errors": 0,
- "inserted": 1,
- "replaced": 0,
- "skipped": 0,
- "unchanged": 0
- }
-
- :param table_name: Name of the table to query against
- :return: The RethinkDB table object for the table
- """
-
- if table_name not in TABLES:
- self.log.warning(f"Table not declared in tables.py: {table_name}")
-
- return rethinkdb.table(table_name)
-
- def run(self, query: Union[RqlMethodQuery, Table], *, new_connection: bool = False,
- connect_database: bool = True, coerce: type = None) -> Union[rethinkdb.Cursor, List, Dict, object]:
- """
- Run a query using a table object obtained from a call to `query()`
-
- >>> db = RethinkDB()
- >>> query = db.query("my_table")
- >>> db.run(query.insert({"key": "value"}), coerce=dict)
- {
- "deleted": 0,
- "errors": 0,
- "inserted": 1,
- "replaced": 0,
- "skipped": 0,
- "unchanged": 0
- }
-
- Note that result coercion is very basic, and doesn't really do any magic. If you want to be able to work
- directly with the result of your query, then don't specify the `coerce` argument - the object that you'd
- usually get from the RethinkDB API will be returned instead.
-
- :param query: The full query to run
- :param new_connection: Whether to create a new connection or use the current request-bound one
- :param connect_database: If creating a new connection, whether to connect to the database immediately
- :param coerce: Optionally, an object type to attempt to coerce the result to
-
- :return: The result of the operation
- """
-
- if not new_connection:
- try:
- result = query.run(self.conn)
- except rethinkdb.ReqlDriverError as e:
- if e.message == "Connection is closed.":
- self.log.warning("Connection was closed, attempting with a new connection...")
- result = query.run(self.get_connection(connect_database))
- else:
- raise
- else:
- result = query.run(self.get_connection(connect_database))
-
- if coerce:
- return coerce(result) if result else coerce()
- return result
-
- # endregion
-
- # region: RethinkDB wrapper functions
-
- def between(self, table_name: str, *, lower: Any = rethinkdb.minval, upper: Any = rethinkdb.maxval,
- index: Optional[str] = None, left_bound: str = "closed", right_bound: str = "open") -> List[
- Dict[str, Any]]:
- """
- Get all documents between two keys
-
- >>> db = RethinkDB()
- >>> db.between("users", upper=10, index="conquests")
- [
- {"username": "gdude", "conquests": 2},
- {"username": "joseph", "conquests": 5}
- ]
- >>> db.between("users", lower=10, index="conquests")
- [
- {"username": "lemon", "conquests": 15}
- ]
- >>> db.between("users", lower=2, upper=10, index="conquests" left_bound="open")
- [
- {"username": "gdude", "conquests": 2},
- {"username": "joseph", "conquests": 5}
- ]
-
- :param table_name: The table to get documents from
- :param lower: The lower-bounded value, leave blank to ignore
- :param upper: The upper-bounded value, leave blank to ignore
- :param index: The key or index to check on each document
- :param left_bound: "open" to include documents that exactly match the lower bound, "closed" otherwise
- :param right_bound: "open" to include documents that exactly match the upper bound, "closed" otherwise
-
- :return: A list of matched documents; may be empty
- """
- return self.run( # pragma: no cover
- self.query(table_name).between(lower, upper, index=index, left_bound=left_bound, right_bound=right_bound),
- coerce=list
- )
-
- def changes(self, table_name: str, squash: Union[bool, int] = False, changefeed_queue_size: int = 100_000,
- include_initial: Optional[bool] = None, include_states: bool = False,
- include_types: bool = False) -> Iterator[Dict[str, Any]]:
- """
- A complicated function allowing you to follow a changefeed for a specific table
-
- This function will not allow you to specify a set of conditions for your changefeed, so you'll
- have to write your own query and run it with `run()` if you need that. If not, you'll just get every
- change for the specified table.
-
- >>> db = RethinkDB()
- >>> for document in db.changes("my_table", squash=True):
- ... print(document.get("new_val", {}))
-
- Documents take the form of a dict with `old_val` and `new_val` fields by default. These are set to a copy of
- the document before and after the change being represented was made, respectively. The format of these dicts
- can change depending on the arguments you pass to the function, however.
-
- If a changefeed must be aborted (for example, if the table was deleted), a ReqlRuntimeError will be
- raised.
-
- Note: This function always creates a new connection. This is to prevent you from losing your changefeed
- when the connection used for a request context is closed.
-
- :param table_name: The name of the table to watch for changes on
-
- :param squash: How to deal with batches of changes to a single document - False (the default) to send changes
- as they happen, True to squash changes for single objects together and send them as a single change,
- or an int to specify how many seconds to wait for an object to change before batching it
-
- :param changefeed_queue_size: The number of changes the server will buffer between client reads before it
- starts to drop changes and issues errors - defaults to 100,000
-
- :param include_initial: If True, the changefeed will start with the initial values of all the documents in
- the table; the results will have `new_val` fields ONLY to start with if this is the case. Note that
- the old values may be intermixed with new changes if you're still iterating through the old values, but
- only as long as the old value for that field has already been sent. If the order of a document you've
- already seen moves it to a part of the group you haven't yet seen, an "unitial" notification is sent, which
- is simply a dict with an `old_val` field set, and not a `new_val` field set. This option defaults to
- False.
-
- :param include_states: Whether to send special state documents to the changefeed as its state changes. This
- comprises of special documents with only a `state` field, set to a string - the state of the feed. There
- are currently two states - "initializing" and "ready". This option defaults to False.
-
- :param include_types: If True, each document generated will include a `type` field which states what type
- of change the document represents. This may be "add", "remove", "change", "initial", "uninitial" or
- "state". This option defaults to False.
-
- :return: A special iterator that will iterate over documents in the changefeed as they're sent. If there is
- no document waiting, this will block the function until there is.
- """
- return self.run( # pragma: no cover
- self.query(table_name).changes(
- squash=squash, changefeed_queue_size=changefeed_queue_size, include_initial=include_initial,
- include_states=include_states, include_offsets=False, include_types=include_types
- ),
- new_connection=True
- )
-
- def filter(self, table_name: str, predicate: Callable[[Dict[str, Any]], bool],
- default: Union[bool, UserError] = False) -> List[Dict[str, Any]]:
- """
- Return all documents in a table for which `predicate` returns true.
-
- The `predicate` argument should be a function that takes a single argument - a single document to check - and
- it should return True or False depending on whether the document should be included.
-
- >>> def many_conquests(doc):
- ... '''Return documents with at least 10 conquests'''
- ... return doc["conquests"] >= 10
- ...
- >>> db = RethinkDB()
- >>> db.filter("users", many_conquests)
- [
- {"username": "lemon", "conquests": 15}
- ]
-
- :param table_name: The name of the table to get documents for
- :param predicate: The callable to use to filter the documents
- :param default: What to do if a document is missing fields; True to include them, `rethink.error()` to raise
- aa ReqlRuntimeError, or False to skip over the document (the default)
- :return: A list of documents that match the predicate; may be empty
- """
-
- return self.run( # pragma: no cover
- self.query(table_name).filter(predicate, default=default),
- coerce=list
- )
-
- def get(self, table_name: str, key: Any) -> Optional[Dict[str, Any]]:
- """
- Get a single document from a table by primary key
-
- :param table_name: The name of the table to get the document from
- :param key: The value of the primary key belonging to the document you want
-
- :return: The document, or None if it wasn't found
- """
-
- result = self.run( # pragma: no cover
- self.query(table_name).get(key)
- )
-
- return dict(result) if result else None # pragma: no cover
-
- def get_all(self, table_name: str, *keys: str, index: str = "id") -> List[Any]:
- """
- Get a list of documents matching a set of keys, on a specific index
-
- :param table_name: The name of the table to get documents from
- :param keys: The key values to match against
- :param index: The name of the key or index to match on
-
- :return: A list of matching documents; may be empty if no matches were made
- """
-
- if keys:
- return self.run( # pragma: no cover
- self.query(table_name).get_all(*keys, index=index),
- coerce=list
- )
- else:
- return self.run(
- self.query(table_name),
- coerce=list
- )
-
- def insert(self, table_name: str, *objects: Dict[str, Any],
- durability: str = "hard",
- return_changes: Union[bool, str] = False,
- conflict: Union[ # Any of...
- str, Callable[ # ...str, or a callable that...
- [Dict[str, Any], Dict[str, Any]], # ...takes two dicts with string keys and any values...
- Dict[str, Any] # ...and returns a dict with string keys and any values
- ]
- ] = "error") -> Dict[str, Any]: # flake8: noqa
- """
- Insert an object or a set of objects into a table
-
- :param table_name: The name of the table to insert into
- :param objects: The objects to be inserted into the table
- :param durability: "hard" (the default) to write the change immediately, "soft" otherwise
- :param return_changes: Whether to return a list of changed values or not - defaults to False
- :param conflict: What to do in the event of a conflict - "error", "replace" and "update" are included, but
- you can also provide your own function in order to handle conflicts yourself. If you do this, the function
- should take two arguments (the old document and the new one), and return a single document to replace both.
-
- :return: A dict detailing the operations run
- """
-
- query = self.query(table_name).insert(
- objects, durability=durability, return_changes=return_changes, conflict=conflict
- )
-
- return self.run(query, coerce=dict)
-
- def map(self, table_name: str, func: Callable):
- """
- Map a function over every document in a table, with the possibility of modifying it
-
- As an example, you could do the following to rename the "id" field to "user_id" for all documents
- in the "users" table.
-
- >>> db = RethinkDB()
- >>> db.map(
- ... "users",
- ... lambda doc: doc.merge({"user_id": doc["id"]}).without("id")
- ... )
-
- :param table_name: The name of the table to map the function over
- :param func: A callable that takes a single argument
-
- :return: Unknown, needs more testing
- """
-
- return self.run( # pragma: no cover
- self.query(table_name).map(func),
- coerce=list
- )
-
- def pluck(self, table_name: str, *selectors: Union[str, Dict[str, Union[List, Dict]]]) -> List[Dict[str, Any]]:
- """
- Get a list of values for a specific set of keys for every document in the table; this can include
- nested values
-
- >>> db = RethinkDB()
- >>> db.pluck("users", "username", "password") # Select a flat document
- [
- {"username": "lemon", "password": "hunter2"}
- ]
- >>> db.pluck("users", {"posts": ["title"]}) # Select from nested documents
- [
- {
- "posts": [
- {"title": "New website!"}
- ]
- }
- ]
-
- :param table_name: The table to get values from
- :param selectors: The set of keys to get values for
- :return: A list containing the requested documents, with only the keys requested
- """
-
- return self.run( # pragma: no cover
- self.query(table_name).pluck(*selectors),
- coerce=list
- )
-
- def sample(self, table_name: str, sample_size: int) -> List[Dict[str, Any]]:
- """
- Select a given number of elements from a table at random.
-
- :param table_name: The name of the table to select from.
- :param sample_size: The number of elements to select.
- If this number is higher than the total amount of items in
- the table, this will return the entire table in random order.
-
- :return: A list of items from the table.
- """
- return self.run( # pragma: no cover
- self.query(table_name).sample(sample_size),
- coerce=list
- )
-
- def sync(self, table_name: str) -> bool:
- """
- Following a set of edits with durability set to "soft", this must be called to save those edits
-
- :param table_name: The name of the table to sync
-
- :return: True if the sync was successful; False otherwise
- """
- result = self.run( # pragma: no cover
- self.query(table_name).sync(),
- coerce=dict
- )
-
- return result.get("synced", 0) > 0 # pragma: no cover
-
- def wait(self, table_name: str, wait_for: str = "all_replicas_ready", timeout: int = 0) -> bool:
- """
- Wait until an operation has happened on a specific table; will block the current function
-
- :param table_name: The name of the table to wait against
- :param wait_for: The operation to wait for; may be "ready_for_outdated_reads",
- "ready_for_reads", "ready_for_writes" or "all_replicas_ready", which is the default
- :param timeout: How long to wait before returning; defaults to 0 (forever)
-
- :return: True; but may return False if the timeout was reached
- """
-
- result = self.run( # pragma: no cover
- self.query(table_name).wait(wait_for=wait_for, timeout=timeout),
- coerce=dict
- )
-
- return result.get("ready", 0) > 0
-
- def without(self, table_name: str, *selectors: Union[str, Dict[str, Union[List, Dict]]]):
- """
- The functional opposite of `pluck()`, returning full documents without the specified selectors
-
- >>> db = RethinkDB()
- >>> db.without("users", "posts")
- [
- {"username": "lemon", "password": "hunter2"}
- ]
-
- :param table_name: The table to get values from
- :param selectors: The set of keys to exclude
- :return: A list containing the requested documents, without the keys requested
- """
-
- return self.run( # pragma: no cover
- self.query(table_name).without(*selectors)
- )
- # endregion