# Source code for pycoalescence.sqlite_connection

"""
Safely open, close and fetch data from an sqlite connection.

:class:`.SQLiteConnection` contains context management for opening sql connections, plus basic functionality for
detecting existence and structure of databases.
"""
import sqlite3


class SQLiteConnection:
    """
    Context manager for safely opening and closing sqlite3 connections.

    The object supplied can be a string containing the path to a database file, an already-open
    :class:`sqlite3.Connection`, or a :class:`sqlite3.Cursor`. Only connections opened by this class are committed and
    closed on exit; connections and cursors supplied by the caller are left untouched. This provides several points of
    entry to the system with the same interface.
    """

    def __init__(self, filename):
        """
        Initialises the SQLiteConnection object.

        :param str/sqlite3.Connection/sqlite3.Cursor filename: Should be either a string containing a path to a file
                                                               to open, or an already-open sqlite3.Connection object.
                                                               If the latter, the connection will not be closed on
                                                               exit. Can also be a sqlite3.Cursor object, in which
                                                               case it will simply be returned on entry.
        """
        self.filename = filename
        # True only while this object owns a connection that it opened itself.
        self.opened_here = False
        self.database = None

    def __enter__(self):
        """
        Opens the connection to the file (if required) and returns an active cursor.

        :raises sqlite3.Error: if the database file cannot be opened

        :return: Active cursor pointing to the database
        :rtype: sqlite3.Cursor
        """
        if isinstance(self.filename, sqlite3.Connection):
            # Borrowed connection: use it, but never close it.
            self.database = self.filename
        elif isinstance(self.filename, sqlite3.Cursor):
            # Borrowed cursor: hand it straight back, there is no connection to manage.
            return self.filename
        else:
            self.database = sqlite3.connect(self.filename)
            # Only claim ownership once the connection has actually been opened, so that a failed connect (of any
            # exception type) can never leave the flag set with no connection behind it.
            self.opened_here = True
        return self.database.cursor()

    def __exit__(self, *args):
        """
        Commits and closes the connection, but only if it was opened by this object.

        Exceptions raised inside the ``with`` block are not suppressed.
        """
        if not self.opened_here:
            return
        try:
            self.database.commit()
        finally:
            # Always release the connection, even if the commit fails, so the file handle is not leaked.
            self.database.close()
            self.database = None
def get_table_names(database):
    """
    Lists the names of every table stored in the database.

    :param str/sqlite3.Connection database: the path to the database, or an already-open database object

    :return: a list of all table names from the database
    :rtype: list
    """
    query = "SELECT name FROM sqlite_master WHERE type='table'"
    with SQLiteConnection(database) as cursor:
        rows = cursor.execute(query).fetchall()
        # Each row is a 1-tuple holding the table name.
        return [row[0] for row in rows]
def check_sql_table_exist(database, table_name):
    """
    Tests whether a table with the given name is present in the database.

    :param str/sqlite3.Connection database: the database to check existence in
    :param str table_name: the table name to check for

    :return: true if the table exists
    :rtype: bool
    """
    return table_name in get_table_names(database)
def check_sql_column_exists(database, table_name, column_name):
    """
    Checks if the column exists in the given table of the database.

    The table name is passed to sqlite as a double-quoted identifier, so names containing spaces, punctuation or SQL
    keywords are looked up literally rather than being parsed as SQL. A table that does not exist has no columns, so
    the result is False in that case.

    :param str/sqlite3.Connection database: the database to check existence in
    :param str table_name: the (unquoted) table name to check within
    :param str column_name: the column name to check for (compared case-sensitively)

    :return: true if the column exists.
    :rtype: bool
    """
    # Identifiers cannot be bound as parameters; quote the name and double any embedded quote characters instead.
    quoted_table = '"{}"'.format(str(table_name).replace('"', '""'))
    with SQLiteConnection(database) as cursor:
        rows = cursor.execute("PRAGMA table_info({})".format(quoted_table)).fetchall()
        # Column 1 of each table_info row is the column name.
        return any(row[1] == column_name for row in rows)
def fetch_table_from_sql(database, table_name, column_names=False):
    """
    Returns a list of the data contained by the provided table in the database.

    The table name is first checked against the tables that actually exist in the database (protecting against SQL
    injection), and is then passed to sqlite as a double-quoted identifier so that existing tables whose names contain
    spaces, punctuation or SQL keywords can be read.

    :raises IOError: if the table is not contained in the database, or if any other sqlite3 error occurs whilst
                     reading from it (all sqlite3.Error exceptions are re-raised as IOError).

    :param str/sqlite3.Connection database: the database to obtain from
    :param str table_name: the table name to fetch data from
    :param bool column_names: if true, return the column names as the first row in the output

    :return: a list of lists, containing all data within the provided table in the database
    :rtype: list
    """
    try:
        with SQLiteConnection(database) as cursor:
            if not check_sql_table_exist(cursor, table_name):
                raise sqlite3.Error("Table {} does not exist in database.".format(table_name))
            # The name is now known to match a real table exactly, so quoting it cannot change which table is read.
            quoted_table = '"{}"'.format(str(table_name).replace('"', '""'))
            cursor.execute("SELECT * FROM {}".format(quoted_table))
            output = []
            if column_names:
                # The first entry of each description tuple is the column name.
                output.append([column[0] for column in cursor.description])
            output.extend(list(row) for row in cursor.fetchall())
            return output
    except sqlite3.Error as soe:  # pragma: no cover
        raise IOError("Cannot fetch {} from database {}: {}".format(table_name, database, soe))
def sql_get_max_from_column(database, table_name, column_name):
    """
    Returns the maximum value from the specified column.

    Both the table and the column are checked for existence before the query is built. The names are then passed to
    sqlite as double-quoted identifiers in the final query, so that existing columns whose names contain spaces,
    punctuation or SQL keywords can be queried.

    :raises sqlite3.Error: if the table does not exist in the database, or the column does not exist in the table

    :param str/sqlite3.Connection database: the database to fetch from
    :param str table_name: the table name to attain
    :param str column_name: the column name to obtain from

    :return: the largest value stored in the column, or None if the column holds no non-NULL values
    """
    with SQLiteConnection(database) as cursor:
        if not check_sql_table_exist(cursor, table_name):
            raise sqlite3.Error("Table {} does not exist in database.".format(table_name))
        if not check_sql_column_exists(cursor, table_name, column_name):
            raise sqlite3.Error("Column {} does not exist in {}.".format(column_name, table_name))
        # Both names have been validated above; quote them so they are always parsed as identifiers.
        quoted_column = '"{}"'.format(str(column_name).replace('"', '""'))
        quoted_table = '"{}"'.format(str(table_name).replace('"', '""'))
        cursor.execute("SELECT max({}) FROM {}".format(quoted_column, quoted_table))
        return cursor.fetchone()[0]