未验证 提交 93ce4c9b 编写于 作者: D Dan Bader 提交者: GitHub

Merge branch 'master' into jayascript-patch-1

# How to Do a Binary Search in Python?
# How to Do a Binary Search in Python
Code snippets supplementing the [How to Do a Binary Search in Python?](https://realpython.com/binary-search-python/) article on [Real Python](https://realpython.com/).
Code snippets supplementing the [How to Do a Binary Search in Python](https://realpython.com/binary-search-python/) article.
## Usage
### Download Sample Dataset
To download a sample dataset for benchmarking, run the following script:
```shell
$ python download_imdb.py
Fetching data from IMDb...
Created "names.txt" and "sorted_names.txt"
```
### Benchmark
**Note:** The hash-based search algorithm requires a separate data structure, so it's not included here. Feel free to treat it as an exercise.
Find the index of Arnold Schwarzenegger in unsorted names using random search:
```shell
$ python benchmark.py -a random -f names.txt 'Arnold Schwarzenegger'
Loading names...ok
[1/10] Found at index=215 (21.68 s)
[2/10] Found at index=215 (148.10 ms)
[3/10] Found at index=215 (63.49 s)
[4/10] Found at index=215 (15.93 s)
[5/10] Found at index=215 (23.38 s)
[6/10] Found at index=215 (60.98 s)
[7/10] Found at index=215 (10.14 s)
[8/10] Found at index=215 (15.81 s)
[9/10] Found at index=215 (5.10 s)
[10/10] Found at index=215 (13.36 s)
best=148.10 ms, worst=63.49 s, avg=23.00 s, median=15.87 s
```
Find the index of Arnold Schwarzenegger in unsorted names using linear search:
```shell
$ python benchmark.py -a linear -f names.txt 'Arnold Schwarzenegger'
Loading names...ok
[1/10] Found at index=215 (30.95 µs)
[2/10] Found at index=215 (26.86 µs)
[3/10] Found at index=215 (26.26 µs)
[4/10] Found at index=215 (26.55 µs)
[5/10] Found at index=215 (26.24 µs)
[6/10] Found at index=215 (25.88 µs)
[7/10] Found at index=215 (25.77 µs)
[8/10] Found at index=215 (25.89 µs)
[9/10] Found at index=215 (25.91 µs)
[10/10] Found at index=215 (25.99 µs)
best=25.77 µs, worst=30.95 µs, avg=26.63 µs, median=26.11 µs
```
Find the index of Arnold Schwarzenegger in *sorted* names using binary search:
```shell
$ python benchmark.py -a binary -f sorted_names.txt 'Arnold Schwarzenegger'
Loading names...ok
[1/10] Found at index=782431 (18.82 µs)
[2/10] Found at index=782431 (9.19 µs)
[3/10] Found at index=782431 (11.08 µs)
[4/10] Found at index=782431 (9.70 µs)
[5/10] Found at index=782431 (10.14 µs)
[6/10] Found at index=782431 (9.35 µs)
[7/10] Found at index=782431 (9.42 µs)
[8/10] Found at index=782431 (8.79 µs)
[9/10] Found at index=782431 (8.66 µs)
[10/10] Found at index=782431 (8.79 µs)
best=8.66 µs, worst=18.82 µs, avg=10.39 µs, median=9.38 µs
```
......@@ -20,7 +20,7 @@ def read_all():
# Serialize the data for the response
person_schema = PersonSchema(many=True)
data = person_schema.dump(people).data
data = person_schema.dump(people)
return data
......@@ -40,7 +40,7 @@ def read_one(person_id):
# Serialize the data for the response
person_schema = PersonSchema()
data = person_schema.dump(person).data
data = person_schema.dump(person)
return data
# Otherwise, nope, didn't find that person
......@@ -73,14 +73,14 @@ def create(person):
# Create a person instance using the schema and the passed in person
schema = PersonSchema()
new_person = schema.load(person, session=db.session).data
new_person = schema.load(person, session=db.session)
# Add the person to the database
db.session.add(new_person)
db.session.commit()
# Serialize and return the newly created person in the response
data = schema.dump(new_person).data
data = schema.dump(new_person)
return data, 201
......@@ -142,7 +142,7 @@ def update(person_id, person):
# turn the passed in person into a db object
schema = PersonSchema()
update = schema.load(person, session=db.session).data
update = schema.load(person, session=db.session)
# Set the id to the person we want to update
update.person_id = update_person.person_id
......@@ -152,7 +152,7 @@ def update(person_id, person):
db.session.commit()
# return updated person in the response
data = schema.dump(update_person).data
data = schema.dump(update_person)
return data, 200
......
#!/usr/bin/env python3
import cffi_example
if __name__ == "__main__":
# Sample data for our call:
x, y = 6, 2.3
answer = cffi_example.lib.cmult(x, y)
print(f" In Python: int: {x} float {y:.1f} return val {answer:.1f}")
#include <stdio.h>
#include "cmult.h"
float cmult(int int_param, float float_param) {
float return_value = int_param * float_param;
printf(" In cmult : int: %d float %.1f returning %.1f\n", int_param,
float_param, return_value);
return return_value;
}
float cmult(int int_param, float float_param);
#include <iostream>
#include <iomanip>
#include "cppmult.hpp"
float cppmult(int int_param, float float_param) {
float return_value = int_param * float_param;
std::cout << std::setprecision(1) << std::fixed
<< " In cppmul: int: " << int_param
<< " float " << float_param
<< " returning " << return_value
<< std::endl;
return return_value;
}
float cppmult(int int_param, float float_param);
#!/usr/bin/env python
""" Simple examples of calling C functions through ctypes module. """
import ctypes
import pathlib
if __name__ == "__main__":
# Load the shared library into c types.
libname = pathlib.Path().absolute() / "libcmult.so"
c_lib = ctypes.CDLL(libname)
# Sample data for our call:
x, y = 6, 2.3
# This will not work:
# answer = c_lib.cmult(x, y)
# This produces a bad answer:
answer = c_lib.cmult(x, ctypes.c_float(y))
print(f" In Python: int: {x} float {y:.1f} return val {answer:.1f}")
print()
# You need tell ctypes that the function returns a float
c_lib.cmult.restype = ctypes.c_float
answer = c_lib.cmult(x, ctypes.c_float(y))
print(f" In Python: int: {x} float {y:.1f} return val {answer:.1f}")
""" Example cython interface definition """
cdef extern from "cppmult.hpp":
float cppmult(int int_param, float float_param)
def pymult( int_param, float_param ):
return cppmult( int_param, float_param )
#!/usr/bin/env python
import cython_example
# Sample data for our call
x, y = 6, 2.3
answer = cython_example.pymult(x, y)
print(f" In Python: int: {x} float {y:.1f} return val {answer:.1f}")
#!/usr/bin/env python
import pybind11_example
if __name__ == "__main__":
# Sample data for our call:
x, y = 6, 2.3
answer = pybind11_example.cpp_function(x, y)
print(f" In Python: int: {x} float {y:.1f} return val {answer:.1f}")
#include <pybind11/pybind11.h>
#include <cppmult.hpp>
PYBIND11_MODULE(pybind11_example, m) {
m.doc() = "pybind11 example plugin"; // Optional module docstring
m.def("cpp_function", &cppmult, "A function which multiplies two numbers");
}
cffi==1.13.2
Cython==0.29.14
invoke==1.3.0
pybind11==2.4.3
pycparser==2.19
""" Task definitions for invoke command line utility for python bindings
overview article. """
import cffi
import invoke
import pathlib
@invoke.task
def clean(c):
""" Remove any built objects """
for pattern in ["*.o", "*.so", "cffi_example* cython_wrapper.cpp"]:
c.run("rm -rf {}".format(pattern))
def print_banner(msg):
print("==================================================")
print("= {} ".format(msg))
@invoke.task
def build_cmult(c):
""" Build the shared library for the sample C code """
print_banner("Building C Library")
invoke.run("gcc -c -Wall -Werror -fpic cmult.c -I /usr/include/python3.7")
invoke.run("gcc -shared -o libcmult.so cmult.o")
print("* Complete")
@invoke.task(build_cmult)
def test_ctypes(c):
""" Run the script to test ctypes """
print_banner("Testing ctypes Module")
invoke.run("python3 ctypes_test.py", pty=True)
@invoke.task(build_cmult)
def build_cffi(c):
""" Build the CFFI Python bindings """
print_banner("Building CFFI Module")
ffi = cffi.FFI()
this_dir = pathlib.Path().absolute()
h_file_name = this_dir / "cmult.h"
with open(h_file_name) as h_file:
ffi.cdef(h_file.read())
ffi.set_source(
"cffi_example",
# Since we are calling a fully built library directly no custom source
# is necessary. We need to include the .h files, though, because behind
# the scenes cffi generates a .c file which contains a Python-friendly
# wrapper around each of the functions.
'#include "cmult.h"',
# The important thing is to include the pre-built lib in the list of
# libraries we are linking against:
libraries=["cmult"],
library_dirs=[this_dir.as_posix()],
extra_link_args=["-Wl,-rpath,."],
)
ffi.compile()
print("* Complete")
@invoke.task()
def test_cffi(c):
""" Run the script to test CFFI """
print_banner("Testing CFFI Module")
invoke.run("python3 cffi_test.py", pty=True)
@invoke.task()
def build_cppmult(c):
""" Build the shared library for the sample C++ code """
print_banner("Building C++ Library")
invoke.run(
"g++ -O3 -Wall -Werror -shared -std=c++11 -fPIC cppmult.cpp "
"-o libcppmult.so "
)
print("* Complete")
def compile_python_module(cpp_name, extension_name):
invoke.run(
"g++ -O3 -Wall -Werror -shared -std=c++11 -fPIC "
"`python3 -m pybind11 --includes` "
"-I /usr/include/python3.7 -I . "
"{0} "
"-o {1}`python3.7-config --extension-suffix` "
"-L. -lcppmult -Wl,-rpath,.".format(cpp_name, extension_name)
)
@invoke.task(build_cppmult)
def build_pybind11(c):
""" Build the pybind11 wrapper library """
print_banner("Building PyBind11 Module")
compile_python_module("pybind11_wrapper.cpp", "pybind11_example")
print("* Complete")
@invoke.task()
def test_pybind11(c):
""" Run the script to test PyBind11 """
print_banner("Testing PyBind11 Module")
invoke.run("python3 pybind11_test.py", pty=True)
@invoke.task(build_cppmult)
def build_cython(c):
""" Build the cython extension module """
print_banner("Building Cython Module")
# Run cython on the pyx file to create a .cpp file
invoke.run("cython --cplus -3 cython_example.pyx -o cython_wrapper.cpp")
# Compile and link the cython wrapper library
compile_python_module("cython_wrapper.cpp", "cython_example")
print("* Complete")
@invoke.task()
def test_cython(c):
""" Run the script to test Cython """
print_banner("Testing Cython Module")
invoke.run("python3 cython_test.py", pty=True)
@invoke.task(
clean,
build_cmult,
test_ctypes,
build_cffi,
test_cffi,
build_pybind11,
test_pybind11,
build_cython,
test_cython,
)
def all(c):
""" Build and run all tests """
pass
#!/usr/bin/env python3
"""MathREPL, a math expression evaluator using Python's eval() and math."""
import math
__version__ = "1.0"
__author__ = "Leodanis Pozo Ramos"
ALLOWED_NAMES = {
k: v for k, v in math.__dict__.items() if not k.startswith("__")
}
PS1 = "mr>>"
WELCOME = f"""
MathREPL {__version__}, your Python math expressions evaluator!
Enter a valid math expression after the prompt "{PS1}".
Type "help" for more information.
Type "quit" or "exit" to exit.
"""
USAGE = f"""
Usage:
Build math expressions using numeric values and operators.
Use any of the following functions and constants:
{', '.join(ALLOWED_NAMES.keys())}
"""
def evaluate(expression):
"""Evaluate a math expression."""
# Compile the expression eventually raising a SyntaxError
# when the user enters an invalid expression
code = compile(expression, "<string>", "eval")
# Validate allowed names
for name in code.co_names:
if name not in ALLOWED_NAMES:
raise NameError(f"The use of '{name}' is not allowed")
# Evaluate the expression eventually raising a ValueError
# when the user uses a math function with a wrong input value
# e.g. math.sqrt(-10)
return eval(code, {"__builtins__": {}}, ALLOWED_NAMES)
def main():
"""Main loop: Read and evaluate user's input."""
print(WELCOME)
while True:
# Read user's input
try:
expression = input(f"{PS1} ")
except (KeyboardInterrupt, EOFError):
raise SystemExit()
# Handle special commands
if expression.lower() == "help":
print(USAGE)
continue
if expression.lower() in {"quit", "exit"}:
raise SystemExit()
# Evaluate the expression and handle errors
try:
result = evaluate(expression)
except SyntaxError:
# If the user enters an invalid expression
print("Invalid input expression syntax")
continue
except (NameError, ValueError) as err:
# If the user tries to use a name that isn't allowed
# or an invalid value to a given math function
print(err)
continue
# Print the result if no error occurs
print(f"The result is: {result}")
if __name__ == "__main__":
main()
# Scientific Python: Using SciPy for Optimization
The code in this folder is associated with the Real Python tutorial [_Scientific Python: Using SciPy for Optimization_](https://realpython.com/python-scipy-cluster-optimize/).
The dataset for the SMS spam clustering example, contained in the file SMSSpamCollection in this folder, is discussed in following articles:
Almeida, T.A., Gómez Hidalgo, J.M., Yamakami, A. Contributions to the Study of SMS Spam Filtering: New Collection and Results. Proceedings of the 2011 ACM Symposium on Document Engineering (DOCENG'11), Mountain View, CA, USA, 2011\. [preprint](http://www.dt.fee.unicamp.br/~tiago/smsspamcollection/doceng11.pdf)
Gómez Hidalgo, J.M., Almeida, T.A., Yamakami, A. On the Validity of a New SMS Spam Collection. Proceedings of the 11th IEEE International Conference on Machine Learning and Applications (ICMLA'12), Boca Raton, FL, USA, 2012\. [preprint](http://www.dt.fee.unicamp.br/~tiago/smsspamcollection/icmla12.pdf)
Almeida, T.A., Gómez Hidalgo, J.M., Silva, T.P. Towards SMS Spam Filtering: Results under a New Dataset. International Journal of Information Security Science (IJISS), 2(1), 1-18, 2013\. [Invited paper - [full version](http://www.dt.fee.unicamp.br/~tiago/smsspamcollection/IJISS13.pdf)]
The original data is available from the [author's website](http://www.dt.fee.unicamp.br/~tiago/smsspamcollection/).
此差异已折叠。
"""
Clustering example using an SMS spam dataset with SciPy.
Associated with the Real Python article
Scientific Python: Using SciPy for Optimization
Available at: https://realpython.com/python-scipy-cluster-optimize/
"""
from pathlib import Path
import numpy as np
from scipy.cluster.vq import whiten, kmeans, vq
HERE = Path(__file__).parent
data = HERE.joinpath("SMSSpamCollection").read_text().strip().split("\n")
digit_counts = np.empty((len(data), 2), dtype=int)
for i, line in enumerate(data):
case, message = line.split("\t")
num_digits = sum(c.isdigit() for c in message)
digit_counts[i, 0] = 0 if case == "ham" else 1
digit_counts[i, 1] = num_digits
unique_counts = np.unique(digit_counts[:, 1], return_counts=True)
unique_counts = np.transpose(np.vstack(unique_counts))
whitened_counts = whiten(unique_counts)
codebook, _ = kmeans(whitened_counts, 3)
codes, _ = vq(whitened_counts, codebook)
ham_code = codes[0]
spam_code = codes[-1]
unknown_code = list(set(range(3)) ^ set((ham_code, spam_code)))[0]
print("definitely ham:", unique_counts[codes == ham_code][-1])
print("definitely spam:", unique_counts[codes == spam_code][-1])
print("unknown:", unique_counts[codes == unknown_code][-1])
digits = digit_counts[:, 1]
predicted_hams = digits == 0
predicted_spams = digits > 20
predicted_unknowns = np.logical_and(digits > 0, digits <= 20)
ham_cluster = digit_counts[predicted_hams]
spam_cluster = digit_counts[predicted_spams]
unknown_cluster = digit_counts[predicted_unknowns]
print("hams:", np.unique(ham_cluster[:, 0], return_counts=True))
print("spams:", np.unique(spam_cluster[:, 0], return_counts=True))
print("unknowns:", np.unique(unknown_cluster[:, 0], return_counts=True))
"""
Constrained minimization example code using SciPy.
Associated with the Real Python article
Scientific Python: Using SciPy for Optimization
Available at: https://realpython.com/python-scipy-cluster-optimize/
"""
import numpy as np
from scipy.optimize import minimize, LinearConstraint
n_buyers = 10
n_shares = 15
np.random.seed(10)
prices = np.random.random(n_buyers)
money_available = np.random.randint(1, 4, n_buyers)
n_shares_per_buyer = money_available / prices
print(prices, money_available, n_shares_per_buyer, sep="\n")
constraint = LinearConstraint(np.ones(n_buyers), lb=n_shares, ub=n_shares)
bounds = [(0, n) for n in n_shares_per_buyer]
def objective_function(x, prices):
return -x.dot(prices)
res = minimize(
objective_function,
10 * np.random.random(n_buyers),
args=(prices,),
constraints=constraint,
bounds=bounds,
)
print(res)
print("The total number of shares is:", sum(res.x))
print("Leftover money for each buyer:", money_available - res.x * prices)
"""
Scalar function minimization example using SciPy.
Associated with the Real Python article
Scientific Python: Using SciPy for Optimization
Available at: https://realpython.com/python-scipy-cluster-optimize/
"""
from scipy.optimize import minimize_scalar
def objective_function(x):
return 3 * x ** 4 - 2 * x + 1
res = minimize_scalar(objective_function)
print(res)
def objective_function(x):
return x ** 4 - x ** 2
res = minimize_scalar(objective_function, method="brent")
print(res)
res = minimize_scalar(objective_function, method="bounded", bounds=[-1, 0])
print(res)
# Python / Sqlite /SqlAlchemy article
This repository contains the content and example code
for the python/sqlite/sqlaclchemy article I'm writing
for Real Python.
This project was built using Python 3.8.0
## Installing The Project
From the main folder take the following steps:
* Install a Python virtual environment for this project
* Activate the virtual environment
* Install the project:
```shell script
python -m pip install -e .
```
\ No newline at end of file
# Build Files Used By The Examples
The code in this directory builds the various data files used
by the example programs. There are three build programs:
* build_temp_data_csv.py
* build_temp_data_sqlite.py
* build_author_book_publisher_sqlite.py
## Build the Temperature CSV File
The build_temp_data_csv.py file builds a CSV data file
(temp_data.csv) in the data directory
containing temperature samples taken by students in a class.
The top row contains the labels, the students name followed
by a date value for each Wednesday of the week for a year.
It then creates data for each sample based on a table of
temperature data, +/- 10 to make the data look variable
and reasonable.
## Build the Temperature Database File
The build_temp_data_sqlite.py file builds a Sqlite database
from the previously created temp_data.csv file called
temp_data.db in the data directory.
## Build the Author / Book / Publisher Database File
The build_author_book_publisher_sqlite.py file builds
a database from the data/author_book_publisher.csv file.
This database contains the tables necessary to describe
the data and the relationships between the data necessary
for the examples.
## Directory Structure
The directory structure is set up in such a way the
build programs (as well as the examples) can find the
data files needed for each one.
## Executing the Programs
* Activate your Python virtualenv
* cd into the build/code directory
* python build_temp_data_csv.py - builds the csv data file
* python build_temp_data_sqlite.py - builds the temperature data from the csv file
* python build_author_book_publisher_sqlite.py - builds the author_book_publisher.db database file
\ No newline at end of file
"""
This program builds the author_book_publisher Sqlite database from the
author_book_publisher.csv file.
"""
import os
import csv
from pkg_resources import resource_filename
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from project.modules.models import Base
from project.modules.models import Author
from project.modules.models import Book
from project.modules.models import Publisher
def get_author_book_publisher_data(filepath):
"""
This function gets the data from the csv file
"""
with open(filepath) as csvfile:
csv_reader = csv.DictReader(csvfile)
data = [row for row in csv_reader]
return data
def populate_database(session, author_book_publisher_data):
# insert the data
for row in author_book_publisher_data:
author = (
session.query(Author)
.filter(Author.lname == row["lname"])
.one_or_none()
)
if author is None:
author = Author(fname=row["fname"], lname=row["lname"])
session.add(author)
book = (
session.query(Book)
.filter(Book.title == row["title"])
.one_or_none()
)
if book is None:
book = Book(title=row["title"])
session.add(book)
publisher = (
session.query(Publisher)
.filter(Publisher.name == row["publisher"])
.one_or_none()
)
if publisher is None:
publisher = Publisher(name=row["publisher"])
session.add(publisher)
# add the items to the relationships
author.books.append(book)
author.publishers.append(publisher)
publisher.authors.append(author)
publisher.books.append(book)
session.commit()
session.close()
def main():
print("starting")
# get the author/book/publisher data into a dictionary structure
csv_filepath = resource_filename(
"project.data", "author_book_publisher.csv"
)
author_book_publisher_data = get_author_book_publisher_data(csv_filepath)
# get the filepath to the database file
sqlite_filepath = resource_filename(
"project.data", "author_book_publisher.db"
)
# does the database exist?
if os.path.exists(sqlite_filepath):
os.remove(sqlite_filepath)
# create the database
engine = create_engine(f"sqlite:///{sqlite_filepath}")
Base.metadata.create_all(engine)
Session = sessionmaker()
Session.configure(bind=engine)
session = Session()
populate_database(session, author_book_publisher_data)
print("finished")
if __name__ == "__main__":
main()
import csv
import datetime
from random import randint
from pkg_resources import resource_filename
start_date = datetime.datetime.strptime("2019-01-02", "%Y-%m-%d")
students = [
"John",
"Mary",
"Susan",
"Doug",
"Andrew",
"George",
"Martha",
"Paul",
"Helen",
]
temperature_data = [
10,
12,
16,
23,
13,
12,
14,
22,
25,
28,
32,
33,
37,
36,
35,
40,
44,
45,
50,
52,
58,
60,
66,
70,
70,
72,
78,
80,
81,
82,
85,
88,
90,
87,
90,
85,
82,
81,
78,
75,
72,
72,
70,
63,
65,
62,
60,
45,
40,
37,
30,
28,
]
def offset_temp(temperature):
"""
This function modifies the temperature +/- a random
amount up to 10
:param temperature: temperature to modify
:return: modified temperature
"""
return temperature + randint(-10, 10)
def main():
# create the CSV file
csv_filepath = resource_filename("project.data", "temp_data.csv")
with open(csv_filepath, "w") as data_fh:
# create the writer
csv_writer = csv.writer(data_fh)
# write the header
header = ["name"]
for week in range(0, 52):
current_date = start_date + datetime.timedelta(days=week * 7)
header.append(current_date.strftime("%Y-%m-%d"))
csv_writer.writerow(header)
# iterate through the students and write their data
for student in students:
data = [student]
# iterate through the weeks
for week in range(0, 52):
data.append(offset_temp(temperature_data[week]))
csv_writer.writerow(data)
if __name__ == "__main__":
main()
"""
This program gathers information from the temp_data.db file about temperature
"""
import os
import csv
from pkg_resources import resource_filename
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, Date, Float
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class TemperatureData(Base):
__tablename__ = "temperature_data"
temperature_data_id = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
date = Column(Date, nullable=False)
value = Column(Float, nullable=False)
def get_temperature_data(filepath):
"""
This function gets the temperature data from the csv file
"""
with open(filepath) as csvfile:
csv_reader = csv.DictReader(csvfile)
data = {row["name"]: row for row in csv_reader}
for value in data.values():
value.pop("name")
return data
def populate_database(session, temperature_data):
# insert the data
for student, data in temperature_data.items():
for date, value in data.items():
temp_data = TemperatureData(
name=student,
date=datetime.strptime(date, "%Y-%m-%d").date(),
value=value,
)
session.add(temp_data)
session.commit()
session.close()
def main():
print("starting")
# get the temperature data into a dictionary structure
csv_filepath = resource_filename("project.data", "temp_data.csv")
temperature_data = get_temperature_data(csv_filepath)
# get the filepath to the database file
sqlite_filepath = resource_filename("project.data", "temp_data.db")
# does the database exist?
if os.path.exists(sqlite_filepath):
os.remove(sqlite_filepath)
# create and populate the sqlite database
engine = create_engine(f"sqlite:///{sqlite_filepath}")
Base.metadata.create_all(engine)
Session = sessionmaker()
Session.configure(bind=engine)
session = Session()
populate_database(session, temperature_data)
print("finished")
if __name__ == "__main__":
main()
fname,lname,title,publisher
Issac,Asimov,Foundation,Random House
Pearl,Buck,The Good Earth,Random House
Pearl,Buck,The Good Earth,Simon & Schuster
Tom,Clancy,The Hunt For Red October,Berkley
Tom,Clancy,Patriot Games,Simon & Schuster
Stephen,King,It,Random House
Stephen,King,It,Penguin Random House
Stephen,King,Dead Zone,Random House
Stephen,King,The Shinning,Penguin Random House
John,Le Carre,"Tinker, Tailor, Solider, Spy: A George Smiley Novel",Berkley
Alex,Michaelides,The Silent Patient,Simon & Schuster
Carol,Shaben,Into The Abyss,Simon & Schuster
name,2019-01-02,2019-01-09,2019-01-16,2019-01-23,2019-01-30,2019-02-06,2019-02-13,2019-02-20,2019-02-27,2019-03-06,2019-03-13,2019-03-20,2019-03-27,2019-04-03,2019-04-10,2019-04-17,2019-04-24,2019-05-01,2019-05-08,2019-05-15,2019-05-22,2019-05-29,2019-06-05,2019-06-12,2019-06-19,2019-06-26,2019-07-03,2019-07-10,2019-07-17,2019-07-24,2019-07-31,2019-08-07,2019-08-14,2019-08-21,2019-08-28,2019-09-04,2019-09-11,2019-09-18,2019-09-25,2019-10-02,2019-10-09,2019-10-16,2019-10-23,2019-10-30,2019-11-06,2019-11-13,2019-11-20,2019-11-27,2019-12-04,2019-12-11,2019-12-18,2019-12-25
John,11,9,19,28,11,10,20,24,18,32,40,33,46,42,27,32,42,54,50,47,63,57,60,67,62,64,70,88,73,78,93,89,89,96,98,79,90,85,78,79,66,67,75,63,70,63,59,45,44,29,39,19
Mary,20,21,25,32,14,11,11,20,31,22,24,33,28,32,43,50,37,51,42,48,56,67,76,67,66,74,88,86,91,86,85,88,100,84,82,78,73,76,84,82,70,73,77,71,70,58,56,53,40,41,39,31
Susan,6,10,26,21,23,3,13,28,29,33,31,39,31,28,38,48,50,46,41,51,62,62,62,69,77,77,72,82,83,72,79,78,96,87,88,91,75,88,84,73,77,68,78,59,68,66,57,55,48,40,33,36
Doug,3,11,11,26,8,4,18,27,25,21,42,24,35,28,42,38,52,39,60,62,55,59,72,74,63,65,75,82,83,73,84,92,82,93,100,90,92,73,68,71,62,65,62,64,66,68,56,46,31,35,39,19
Andrew,12,21,8,17,12,16,23,12,19,32,27,42,34,44,35,47,45,54,55,60,56,56,62,71,72,62,72,80,85,86,91,98,80,88,84,92,89,88,71,78,63,79,68,67,62,57,54,44,39,30,36,23
George,18,4,19,31,22,2,7,26,34,25,36,38,36,35,34,38,42,55,48,46,60,60,68,62,65,62,87,73,82,77,82,97,97,80,95,92,73,74,79,78,81,82,63,71,74,57,57,38,38,32,25,21
Martha,14,12,6,20,10,12,7,26,32,25,41,31,30,38,42,37,50,47,56,51,58,70,74,63,70,76,73,89,77,82,77,80,88,81,81,77,89,73,87,74,77,72,62,61,57,67,56,46,40,32,25,34
Paul,18,22,12,32,5,8,15,29,16,27,24,25,27,37,37,42,42,36,49,54,59,56,65,62,76,63,83,72,75,89,79,85,97,82,100,76,73,73,81,67,74,73,61,53,59,66,64,44,42,42,25,34
Helen,14,6,13,18,20,11,12,16,31,19,34,41,44,46,39,50,39,38,50,48,51,61,56,73,78,73,73,90,86,90,82,97,100,90,92,80,73,79,72,82,62,76,72,72,68,56,69,52,41,40,37,36
# Example 1
This example uses the raw temp_data.csv file
to get data into the program and run
various Python functions on it.
In particular getting the average temperature
for a date across all the samples, and getting the
average temperature for every week for the entire
year and returning it sorted.
\ No newline at end of file
"""
This program gathers information from the temp_data.csv file about temperature
"""
import csv
from pkg_resources import resource_filename
from datetime import datetime
from datetime import timedelta
from typing import List, Dict
from collections import defaultdict
def get_temperature_data(filepath: str) -> Dict:
"""
This function gets the temperature data from the csv file
"""
with open(filepath) as csvfile:
csv_reader = csv.DictReader(csvfile)
data = {row["name"]: row for row in csv_reader}
for value in data.values():
value.pop("name")
return data
def get_average_temp_by_date(
date_string: str, temperature_data: Dict
) -> float:
"""
This function gets the average temperature for all the samples
taken by the students by date
:param date_string: date to find average temperature for
:param connection: database connection
:return: average temp for date, or None if not found
"""
# Target date
target_date = datetime.strptime(date_string, "%Y-%m-%d").date()
# Iterate through the data and get the data
data = []
for samples in temperature_data.values():
# Iterate through the samples
for sample_date, sample in samples.items():
# Generate a date range for the sample
min_date = datetime.strptime(
sample_date, "%Y-%m-%d"
).date() + timedelta(days=-3)
max_date = datetime.strptime(
sample_date, "%Y-%m-%d"
).date() + timedelta(days=3)
if min_date <= target_date <= max_date:
data.append(float(sample))
# Get the average temp
return sum(data) / len(data)
def get_average_temp_sorted(direction: str, temperature_data: Dict) -> List:
dir = direction.lower()
if dir not in ["asc", "desc"]:
raise Exception(f"Unknown direction: {direction}")
results = defaultdict(int)
for data in temperature_data.values():
for date, value in data.items():
results[date] += float(value)
for date, total in results.items():
results[date] = float(total) / float(len(temperature_data.keys()))
# Convert dictionary to list
results = results.items()
# Sort the list in the appropriate order
return sorted(
results, key=lambda v: v[1], reverse=False if dir == "asc" else True
)
def main():
print("starting")
# Get the temperature data into a dictionary structure
filepath = resource_filename("project.data", "temp_data.csv")
temperature_data = get_temperature_data(filepath)
# Get the average temperature by date
date_string = "2019-02-10"
average_temp = get_average_temp_by_date(date_string, temperature_data)
print(f"Average temp {date_string}: {average_temp:.2f}")
print()
# Get the average temps for the year sorted ascending or descending
average_temps = get_average_temp_sorted("asc", temperature_data)
for date, average_temp in average_temps:
print(f"Date: {date}, average temp: {average_temp:.2f}")
print()
print("finished")
if __name__ == "__main__":
main()
# Example 2
This example uses temp_data.db database
file to get data into the program and run
various functions on it that use Sqlite SQL
to access the data.
In particular getting the average temperature
for a date across all the samples, and getting the
average temperature for every week for the entire
year and returning it sorted.
\ No newline at end of file
"""
This program gathers information from the database file about temperature
"""
from pkg_resources import resource_filename
from datetime import datetime
from datetime import timedelta
import sqlite3
def get_average_temp_by_date(date_string, connection):
"""
This function gets the average temperature for all the samples
taken by the students by date
:param date_string: date to find average temperature for
:param connection: database connection
:return: average temp for date, or None if not found
"""
# Target date
target_date = datetime.strptime(date_string, "%Y-%m-%d").date()
min_date = target_date + timedelta(days=-3)
max_date = target_date + timedelta(days=3)
cursor = connection.cursor()
sql = """
SELECT
avg(value) as average
FROM temperature_data
WHERE date between ? and ?
"""
result = cursor.execute(sql, (min_date, max_date)).fetchone()
return result[0] if result else None
def get_average_temp_sorted(direction: str, connection) -> list:
dir = direction.lower()
if dir not in ["asc", "desc"]:
raise Exception(f"Unknown direction: {direction}")
cursor = connection.cursor()
sql = f"""
SELECT
date,
AVG(value) as average_temp
FROM temperature_data
GROUP BY date
ORDER BY average_temp {dir}
"""
results = cursor.execute(sql).fetchall()
return results
def main():
print("starting")
# Connect to the sqlite database
sqlite_filepath = resource_filename("project.data", "temp_data.db")
connection = sqlite3.connect(sqlite_filepath)
# Get the average temperature by date
date_string = "2019-02-10"
average_temp = get_average_temp_by_date(date_string, connection)
print(f"Average temp {date_string}: {average_temp:.2f}")
print()
# Get the average temps for the year sorted ascending or descending
average_temps = get_average_temp_sorted("asc", connection)
for date, average_temp in average_temps:
print(f"Date: {date}, average temp: {average_temp:.2f}")
print()
print("finished")
if __name__ == "__main__":
main()
# Example 3
This example uses SqlAlchemy to access the temp_data.db
database to get data into the program and run
various SqlAlchemy object methods to get the data.
In particular getting the average temperature
for a date across all the samples, and getting a list
of all the average temps for the year in sorted order.
\ No newline at end of file
"""
This program gathers information from the temp_data.csv file about temperature
"""
from pkg_resources import resource_filename
from datetime import datetime
from datetime import timedelta
from sqlalchemy import create_engine
from sqlalchemy import Column, String, Integer, Float, Date
from sqlalchemy.sql import func, and_, asc, desc
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class TemperatureData(Base):
__tablename__ = "temperature_data"
id = Column(Integer, primary_key=True)
name = Column(String)
date = Column(Date)
value = Column(Float)
def get_average_temp_by_date(date_string, session) -> float:
"""
This function gets the average temperature for all the samples
taken by the students by date
:param date_string: date to find average temperature for
:param session: SqlAlchemy session
:return: average temp for date, or None if not found
"""
target_date = datetime.strptime(date_string, "%Y-%m-%d").date()
min_date = target_date + timedelta(days=-3)
max_date = target_date + timedelta(days=3)
result = (
session.query(func.avg(TemperatureData.value))
.filter(
and_(
TemperatureData.date >= min_date,
TemperatureData.date <= max_date,
)
)
.one()
)
return result[0]
def get_average_temp_sorted(direction: str, session) -> list:
if direction.lower() not in ["asc", "desc"]:
raise Exception(f"Unknown direction: {direction}")
dir = asc if direction.lower() == "asc" else desc
results = (
session.query(
TemperatureData.date,
func.avg(TemperatureData.value).label("average_temp"),
)
.group_by(TemperatureData.date)
.order_by(dir("average_temp"))
.all()
)
return results
def main():
print("starting")
# Connect to the database using SqlAlchemy
sqlite_filepath = resource_filename("project.data", "temp_data.db")
engine = create_engine(f"sqlite:///{sqlite_filepath}")
Session = sessionmaker()
Session.configure(bind=engine)
session = Session()
# Get the average temperature by date
date_string = "2019-02-10"
average_temp = get_average_temp_by_date(date_string, session)
print(f"Average temp {date_string}: {average_temp:.2f}")
print()
# Get the average temps for the year sorted ascending or descending
average_temps = get_average_temp_sorted("asc", session)
for row in average_temps:
print(f"Date: {row.date}, average temp: {row.average_temp:.2f}")
print()
print("finished")
if __name__ == "__main__":
main()
# Example 4
This example uses the raw author_book_publisher.csv file to
get data into the program and run various python functions
on it.
The csv file uses redundant data to hold relationship data
in the file. The main.py program examines this file to use
those reletionships to get different kinds of information
from the file.
\ No newline at end of file
"""
This is the example 4 program file
"""
import csv
import copy
from pkg_resources import resource_filename
from typing import List
from collections import defaultdict
from uuid import uuid4
from treelib import Tree
def get_author_book_publisher_data(filepath: str) -> List:
"""
This function gets the temperature data from the csv file
"""
with open(filepath) as csvfile:
csv_reader = csv.DictReader(csvfile)
data = [row for row in csv_reader]
return data
def get_total_number_of_books_by_publishers(data, direction) -> List:
"""
:param data: author/book/publisher data
:param direction: direction to sort the data by
:return: List of sorted data
"""
if direction not in ["asc", "desc"]:
raise Exception(f"Unknown direction: {direction}")
# Get total number of books for each publisher
publishers = defaultdict(int)
for row in data:
publishers[row["publisher"]] += 1
# Convert the dictionary to a list of tuples and sort it
retval = [(k, v) for k, v in publishers.items()]
return sorted(
retval,
key=lambda v: v[1],
reverse=False if direction == "asc" else True,
)
def get_total_number_of_authors_by_publishers(data, direction: str) -> List:
"""
:param data: author/book/publisher data
:param direction: direction to sort the data by
:return: List of sorted data
"""
if direction not in ["asc", "desc"]:
raise Exception("Unknown direction", direction)
# Get total number of authors for each publisher
publishers = defaultdict(list)
for row in data:
publishers[row["publisher"]] = []
for row in data:
author = f"{row['fname']} {row['lname']}"
if author not in publishers[row["publisher"]]:
publishers[row["publisher"]].append(author)
retval = [(k, len(v)) for k, v in publishers.items()]
return sorted(
retval,
key=lambda v: v[1],
reverse=False if direction == "asc" else True,
)
def get_authors(data) -> List:
"""
This function returns a list of authors, including their hierarchical
data
:param data: author/book/publisher data
:return: list of authors data
"""
# Get the authors
authors = {f"{row['fname']} {row['lname']}": {} for row in data}
# Get the books associated with the authors
for row in data:
author = f"{row['fname']} {row['lname']}"
title = row["title"]
authors[author][title] = set()
# Get the publisher associated with the book
for row in data:
author = f"{row['fname']} {row['lname']}"
title = row["title"]
publisher = row["publisher"]
authors[author][title].add(publisher)
return authors
def add_new_book(data, author_name, book_title, publisher_name):
"""
This function adds a new book to the data
:param data: author/book/publisher data
:param author_name: author's name
:param book_title: book title
:param publisher_name: publishers name
:return: updated data
"""
# Create a new copy of the passed in data
new_data = copy.deepcopy(data)
# Does the book exist?
books = {row["title"] for row in data}
if book_title in books:
raise Exception("Book exists", book_title)
fname, lname = author_name.split(" ")
# Does the author exist?
authors = {f"{row['fname']} {row['lname']}" for row in data}
if author_name not in authors:
raise Exception("No author found", author_name)
# Does the publisher exist?
publishers = {row["publisher"] for row in data}
if publisher_name not in publishers:
raise Exception("No publisher found", publisher_name)
# Add the new book
new_data.append(
{
"fname": fname,
"lname": lname,
"title": book_title,
"publisher": publisher_name,
}
)
return new_data
def output_hierarchical_author_data(authors):
"""
This function outputs the author/book/publisher information in
a hierarchical manner
:param authors: the collection of root author objects
:return: None
"""
authors_tree = Tree()
authors_tree.create_node("Authors", "authors")
for author, books in authors.items():
authors_tree.create_node(author, author, parent="authors")
for book, publishers in books.items():
authors_tree.create_node(book, book, parent=author)
for publisher in publishers:
authors_tree.create_node(publisher, uuid4(), parent=book)
# Output the hierarchical authors data
print(authors_tree.show())
def main():
"""
The main entry point of the program
"""
print("starting")
# Connect to the database using SqlAlchemy
filepath = resource_filename("project.data", "author_book_publisher.csv")
author_book_publisher_data = get_author_book_publisher_data(filepath)
# Get the total number of books printed by each publisher
total_books_by_publisher = get_total_number_of_books_by_publishers(
author_book_publisher_data, "desc"
)
for publisher, total_books in total_books_by_publisher:
print(f"Publisher: {publisher}, total books: {total_books}")
print()
# Get the total number of authors each publisher publishes
total_authors_by_publisher = get_total_number_of_authors_by_publishers(
author_book_publisher_data, "desc"
)
for publisher, total_authors in total_authors_by_publisher:
print(f"Publisher: {publisher}, total authors: {total_authors}")
print()
# Output hierarchical authors data
authors = get_authors(author_book_publisher_data)
output_hierarchical_author_data(authors)
# Add a new book to the data structure
author_book_publisher_data = add_new_book(
author_book_publisher_data,
author_name="Stephen King",
book_title="The Stand",
publisher_name="Random House",
)
# Output the updated hierarchical authors data
authors = get_authors(author_book_publisher_data)
output_hierarchical_author_data(authors)
print("finished")
if __name__ == "__main__":
main()
# Example 5
This example uses Sqlite and SQL queries to access the
author_book_publisher.db database file to
get data into the program and run various python functions
on it.
The database captures the relationships between tables, and
this is used to generate interesting data about the database.
However, the original CSV file is essentially re-created in
the *get_authors()* function in order to create the authors
hierarchical data necessary to generate the tree view
presented by the main program.
"""
Example 5 program file
"""
from pkg_resources import resource_filename
import sqlite3
from uuid import uuid4
from typing import List
from treelib import Tree
def get_total_number_of_books_by_publishers(connection, direction) -> List:
"""
:param connection: connection to the database
:return: List of sorted data
"""
if direction not in ["asc", "desc"]:
raise Exception("Unknown direction", direction)
cursor = connection.cursor()
sql = f"""
SELECT
p.name as publisher_name,
count(b.title) as total_books
FROM publisher p
JOIN book_publisher bp on bp.publisher_id = p.publisher_id
JOIN book b on b.book_id = bp.book_id
GROUP BY publisher_name
ORDER BY total_books {direction};
"""
result = cursor.execute(sql).fetchall()
return result
def get_total_number_of_authors_by_publishers(connection, direction) -> List:
"""
:param connection: connection to the database
:return: List of sorted data
"""
if direction not in ["asc", "desc"]:
raise Exception("Unknown direction", direction)
cursor = connection.cursor()
sql = f"""
SELECT
p.name as publisher_name,
count(a.lname) as total_authors
FROM publisher p
JOIN author_publisher ap on p.publisher_id = ap.publisher_id
JOIN author a on ap.author_id = a.author_id
GROUP BY publisher_name
ORDER BY total_authors {direction};
"""
result = cursor.execute(sql).fetchall()
return result
def get_authors(connection) -> List:
"""
This method gets the authors data an builds a hierarchical data
structure from there for use by the treelib
:param connection: connection to the database
:return: list of authors data
"""
cursor = connection.cursor()
sql = f"""
SELECT
a.fname || ' ' || a.lname as author,
b.title,
p.name
FROM author a
JOIN book b on b.author_id = a.author_id
JOIN author_publisher ap on ap.author_id = a.author_id
JOIN publisher p on p.publisher_id = ap.publisher_id
"""
result = cursor.execute(sql).fetchall()
# Get the authors
authors = {row[0]: {} for row in result}
# Get the books associated with the authors
for row in result:
author = row[0]
title = row[1]
authors[author][title] = set()
# Get the publisher associated with the book
for row in result:
author = row[0]
title = row[1]
publisher = row[2]
authors[author][title].add(publisher)
return authors
def add_new_book(connection, author_name, book_title, publisher_name):
"""
This function adds a new book to the database
:param session: database session to work with
:param author_name: authors full name
:param book_title: book title
:param publisher_name: publisher of book
:return: None
"""
connection.row_factory = sqlite3.Row
cursor = connection.cursor()
# Does the book exist?
sql = f"""SELECT 1 FROM book WHERE title = ?"""
book = cursor.execute(sql, (book_title,)).fetchone()
if book is not None:
raise Exception("Book exists", book_title)
# Get author
sql = f"""
SELECT
author_id
FROM author
WHERE author.fname = ?
AND author.lname = ?
"""
fname, lname = author_name.split(" ")
author = cursor.execute(sql, (fname, lname)).fetchone()
# Did we get an author?
if author is None:
raise Exception("No author found", author_name)
# Get publisher
sql = f"""
SELECT
publisher_id
FROM publisher
WHERE name = ?
"""
publisher = cursor.execute(sql, (publisher_name,)).fetchone()
# Did we get a publisher?
if publisher is None:
raise Exception("No publisher found", publisher_name)
# Add the book
sql = f"""
INSERT INTO book
(title, author_id)
VALUES(?, ?)
"""
book_id = cursor.execute(sql, (book_title, author["author_id"])).lastrowid
# Update the relationships
sql = f"""
INSERT INTO book_publisher
(book_id, publisher_id)
VALUES(?, ?)
"""
cursor.execute(sql, (book_id, publisher["publisher_id"]))
connection.commit()
def output_hierarchical_author_data(authors):
"""
This function outputs the author/book/publisher information in
a hierarchical manner
:param authors: the collection of root author objects
:return: None
"""
authors_tree = Tree()
authors_tree.create_node("Authors", "authors")
for author, books in authors.items():
authors_tree.create_node(author, author, parent="authors")
for book, publishers in books.items():
authors_tree.create_node(book, book, parent=author)
for publisher in publishers:
authors_tree.create_node(publisher, uuid4(), parent=book)
# Output the hierarchical authors data
print(authors_tree.show())
def main():
"""
Main program entry point
"""
print("starting")
# connect to the sqlite database
sqlite_filepath = resource_filename(
"project.data", "author_book_publisher.db"
)
connection = sqlite3.connect(sqlite_filepath)
# Get the total number of books printed by each publisher
total_books_by_publisher = get_total_number_of_books_by_publishers(
connection, "desc"
)
for publisher, total_books in total_books_by_publisher:
print(f"Publisher: {publisher}, total books: {total_books}")
print()
# Get the total number of authors each publisher publishes
total_authors_by_publisher = get_total_number_of_authors_by_publishers(
connection, "desc"
)
for publisher, total_authors in total_authors_by_publisher:
print(f"Publisher: {publisher}, total authors: {total_authors}")
print()
# Output hierarchical authors data
authors = get_authors(connection)
output_hierarchical_author_data(authors)
# add a new book
add_new_book(
connection,
author_name="Stephen King",
book_title="The Stand",
publisher_name="Random House",
)
# Output hierarchical authors data
authors = get_authors(connection)
output_hierarchical_author_data(authors)
print("finished")
if __name__ == "__main__":
main()
# Example 6
This example uses SqlAlcheym to access the author_book_publisher.db
sqlite file to get data into the program and run various python functions
on it.
The main.py program file shows how to use the objects returned
by SqlAlchemy, and the relationships coded into them, to retreive
the same information the previous example programs did.
\ No newline at end of file
"""
This program gathers information from the temp_data.csv file about temperature
"""
from pkg_resources import resource_filename
from typing import List
from uuid import uuid4
from sqlalchemy import create_engine
from sqlalchemy import and_
from sqlalchemy.sql import func, asc, desc
from sqlalchemy.orm import sessionmaker
from treelib import Tree
from project.modules.models import Author
from project.modules.models import Book
from project.modules.models import Publisher
def get_total_number_of_books_by_publishers(session, direction: str) -> List:
"""
:param session: database session to work with
:param direction:
:return:
"""
if direction not in ["asc", "desc"]:
raise Exception(f"Unknown direction: {direction}")
dir = desc if direction == "desc" else asc
results = (
session.query(
Publisher.name, func.count(Book.title).label("total_books")
)
.join(Publisher.books)
.group_by(Publisher.name)
.order_by(dir("total_books"))
)
return results
def get_total_number_of_authors_by_publishers(session, direction: str) -> List:
"""
:param session: database session to work with
:param direction:
:return:
"""
if direction not in ["asc", "desc"]:
raise Exception(f"Unknown direction: {direction}")
dir = desc if direction == "desc" else asc
results = (
session.query(
Publisher.name, func.count(Author.fname).label("total_authors")
)
.join(Publisher.authors)
.group_by(Publisher.name)
.order_by(dir("total_authors"))
)
return results
def get_authors(session) -> List:
"""
This function returns a list of author objects
:param session: database session to work with
:return: list of Author objects
"""
results = session.query(Author).order_by(Author.lname).all()
return results
def add_new_book(session, author_name, book_title, publisher_name):
"""
This function adds a new book to the database
:param session: database session to work with
:param author_name: authors full name
:param book_title: book title
:param publisher_name: publisher of book
:return: None
"""
fname, lname = author_name.split(" ")
# Does the book exist?
book = session.query(Book).filter(Book.title == book_title).one_or_none()
if book is not None:
raise Exception("Book exists", book_title)
# Get author
author = (
session.query(Author)
.filter(and_(Author.fname == fname, Author.lname == lname))
.one_or_none()
)
# Did we get an author?
if author is None:
raise Exception("No author found", author_name)
# Get publisher
publisher = (
session.query(Publisher)
.filter(Publisher.name == publisher_name)
.one_or_none()
)
# Did we get a publisher?
if publisher is None:
raise Exception("No publisher found", publisher)
# Create the book
new_book = Book(title=book_title)
# Insert the book and create the relationships
author.books.append(new_book)
publisher.books.append(new_book)
session.commit()
def output_hierarchical_author_data(authors):
"""
This function outputs the author/book/publisher information in
a hierarchical manner
:param authors: the collection of root author objects
:return: None
"""
authors_tree = Tree()
authors_tree.create_node("Authors", "authors")
for author in authors:
authors_tree.create_node(
f"{author.fname} {author.lname}",
f"{author.fname} {author.lname}",
parent="authors",
)
for book in author.books:
authors_tree.create_node(
f"{book.title}",
f"{book.title}",
parent=f"{author.fname} {author.lname}",
)
for publisher in book.publishers:
authors_tree.create_node(
f"{publisher.name}", uuid4(), parent=f"{book.title}"
)
# Output the hierarchical authors data
print(authors_tree.show())
def main():
"""
Main entry point of program
"""
print("starting")
# Connect to the database using SqlAlchemy
sqlite_filepath = resource_filename(
"project.data", "author_book_publisher.db"
)
engine = create_engine(f"sqlite:///{sqlite_filepath}")
Session = sessionmaker()
Session.configure(bind=engine)
session = Session()
# Get the total number of books printed by each publisher
total_books_by_publisher = get_total_number_of_books_by_publishers(
session, "desc"
)
for row in total_books_by_publisher:
print(f"Publisher: {row.name}, total books: {row.total_books}")
print()
# Get the total number of authors each publisher publishes
total_authors_by_publisher = get_total_number_of_authors_by_publishers(
session, "desc"
)
for row in total_authors_by_publisher:
print(f"Publisher: {row.name}, total authors: {row.total_authors}")
print()
# Output hierarchical authors data
authors = get_authors(session)
output_hierarchical_author_data(authors)
# Add a new book
add_new_book(
session,
author_name="Stephen King",
book_title="The Stand",
publisher_name="Random House",
)
# Output the updated hierarchical authors data
authors = get_authors(session)
output_hierarchical_author_data(authors)
print("finished")
if __name__ == "__main__":
main()
from sqlalchemy import Column, Integer, String, ForeignKey, Table
from sqlalchemy.orm import relationship, backref
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
author_publisher = Table(
"author_publisher",
Base.metadata,
Column("author_id", Integer, ForeignKey("author.author_id")),
Column("publisher_id", Integer, ForeignKey("publisher.publisher_id")),
)
book_publisher = Table(
"book_publisher",
Base.metadata,
Column("book_id", Integer, ForeignKey("book.book_id")),
Column("publisher_id", Integer, ForeignKey("publisher.publisher_id")),
)
class Author(Base):
__tablename__ = "author"
author_id = Column(Integer, primary_key=True)
fname = Column(String)
lname = Column(String)
books = relationship("Book", backref=backref("author"))
publishers = relationship(
"Publisher", secondary=author_publisher, back_populates="authors"
)
class Book(Base):
__tablename__ = "book"
book_id = Column(Integer, primary_key=True)
author_id = Column(Integer, ForeignKey("author.author_id"))
title = Column(String)
publishers = relationship(
"Publisher", secondary=book_publisher, back_populates="books"
)
class Publisher(Base):
__tablename__ = "publisher"
publisher_id = Column(Integer, primary_key=True)
name = Column(String)
authors = relationship(
"Author", secondary=author_publisher, back_populates="publishers"
)
books = relationship(
"Book", secondary=book_publisher, back_populates="publishers"
)
from setuptools import setup, find_packages
setup(
name="project",
version="1.0",
packages=find_packages(),
install_requires=["SQLAlchemy", "treelib"],
)
......@@ -291,9 +291,9 @@ def plot_with_legend(
all_plots = []
for data, label in zip(y_data, legend_labels):
if log:
temp, = plt.loglog(x_range, data, label=label)
(temp,) = plt.loglog(x_range, data, label=label)
else:
temp, = plt.plot(x_range, data, label=label)
(temp,) = plt.plot(x_range, data, label=label)
all_plots.append(temp)
plt.title(title)
......
import queue
from builtins import range
def task(name, queue):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册