Programming Use Cases with Python

Course

Intro Video

Photo of Keith Thompson

Keith Thompson

DevOps Training Architect II in Content

Length

03:56:44

Difficulty

Intermediate

Course Details

The best ways to improve as programmers are to 1) read a lot of code and 2) exercise our programming skills by solving problems. In this completely project-based course, we'll work through various projects from start to finish by breaking down problems and solving them using Python. Along the way, we'll learn about some intermediate to advanced Python topics and see how we can apply concepts we've already learned to solve completely different problems.

Syllabus

Introduction

Getting Started

Course Introduction

00:00:44

Lesson Description:

Python is one of the most versatile and widely used programming languages that exists today. Whether you work in server administration, web development, or data science, you've likely interacted with a tool written in Python or been asked to write some Python yourself. Like any skill, the only way to get better at programming is to practice. Throughout this course, we'll gain some programming practice by using Python to solve problems as we build complete projects.

About the Course Author

00:00:28

Lesson Description:

In this video, you'll learn a little about me, Keith Thompson.

Environment Setup

Installing Python 3.7 on a Cloud Playground

00:05:56

Lesson Description:

In this lesson, we will learn how to install Python 3 from source on CentOS 7 and Debian-based machines. Note: This course uses Python 3.7, and you will definitely run into issues if you are using Python < 3.7.

Download and Install Python 3 from Source on CentOS 7

Here are the commands we will run to build and install Python 3.7 on CentOS 7:

sudo -i
yum groupinstall -y "Development Tools"
yum install -y zlib-devel
cd /usr/src
wget https://python.org/ftp/python/3.7.3/Python-3.7.3.tar.xz
tar xf Python-3.7.3.tar.xz
cd Python-3.7.3
./configure --enable-optimizations --with-ensurepip=install
make altinstall
exit

Note: The make altinstall command prevents the built-in Python executable from being replaced.

Download and Install Python 3 from Source on Debian

Here are the commands we will run to build and install Python 3.7 on a Debian-based machine:

sudo -i
apt update -y
apt install -y wget build-essential libffi-dev libgdbm-dev libc6-dev libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev libncurses5-dev libncursesw5-dev xz-utils tk-dev
cd /usr/src
wget https://www.python.org/ftp/python/3.7.3/Python-3.7.3.tar.xz
tar xf Python-3.7.3.tar.xz
cd Python-3.7.3
./configure --enable-optimizations --with-ensurepip=install
make altinstall
exit

Note: The make altinstall command prevents the built-in Python executable from being replaced.

Ensure Python 3 Works with sudo

Make sure secure_path in the /etc/sudoers file includes /usr/local/bin. The line should look something like this:

Defaults secure_path = /sbin:/bin:/usr/sbin:/usr/bin:/usr/local/bin

Upgrade Pip

Note: This is not always necessary. The version of pip we have might already be up to date, but it's good practice to try to update it after installation. Because we are working with Python 3, we need to use the pip3.7 executable, and we will use sudo so we can write files under the /usr/local directory.

sudo pip3.7 install --upgrade pip
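Once the build finishes, a quick optional sanity check (not part of the lesson's steps) is to confirm from Python itself that the interpreter meets the course requirement; the file name here is just an example:

# sanity_check.py - run with: python3.7 sanity_check.py
import sys

# The course assumes Python >= 3.7 (asyncio features used later depend on it).
if sys.version_info < (3, 7):
    raise SystemExit(f"Python >= 3.7 is required, found {sys.version.split()[0]}")

print("Python version OK:", sys.version.split()[0])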

Picking a Text Editor or IDE

00:06:17

Lesson Description:

Before we start writing code, we should think about the tools we're going to use to do the development. Having a well-configured text editor can make the programming experience a lot more enjoyable. Much like in carpentry, having sharp tools leads to a more productive and creative experience.

Documentation for This Lesson: Vim, Emacs, Nano, Atom, VS Code, SublimeText, Notepad++, PyCharm

Terminal-Based Editors

There are a few different terminal editors we can work with. The main benefit of using a terminal-based editor is that we can run it on a server that we are connected to. This allows us to stay in the terminal to carry out programming tasks, from developing the code to debugging and deploying. Here are two terminal-based editors that are quite popular:

Vim: Modal editor, extremely customizable.
Emacs: Unbelievably customizable, not modal (at least not by default).

Both of these tools are either pre-installed or readily available on all major Linux distros. The third option is Nano/Pico, which is a tool that should only be used if nothing else is available.

GUI-Based Editors

GUI-based editors can be extremely powerful and are more aesthetically pleasing than terminal-based editors. This list consists of classic text editors, but most of them can be enhanced using plugins that add additional functionality. We can divide them into two groups: native applications and Electron applications (built using JavaScript). This may seem like an odd distinction, but many people don't like the resource overhead that running Electron-based applications requires.

Native

SublimeText: A multi-platform text editor that provides excellent performance and can be extended using Python 3.
Notepad++: A Windows-only text editor that is not as robust as the others, but is a great one to start with.

Electron-Based

Atom: The original Electron-based editor, which is aesthetically pleasing and very extendable through plugins.
VS Code: The most popular GUI-based editor, which has a vast ecosystem of plugins and a built-in debugger. This is what I will use throughout the course.

IDEs

The primary IDE used in the Python community is PyCharm. There is a free community edition and a paid edition. To connect to a remote server for editing tasks, we will need to have the paid version.

Setting Up VS Code for Remote Python Development

00:15:05

Lesson Description:

In this lesson, we're going to set up Visual Studio (VS) Code for remote development. By the time we're finished, we'll have VS Code properly configured to enable us to use Cloud Playground as our development environment while still being able to use nice development tools on our workstations.

Documentation for This Lesson: VS Code, VS Code - Python Extension, VS Code - Remote Development Extension, VS Code - Pyright Extension

Installing VS Code

VS Code is probably the most popular text editor used by programmers today, and thankfully it's installable on all major operating systems. To follow along with this lesson, we must have VS Code installed on our workstation.

Installing Helpful Python Extensions

One of the best features of VS Code is the vast number of high-quality extensions we can install and customize to make our development environment our own. To work with Python, we are going to install a few different Python-specific extensions:

Python: This is an official extension maintained by Microsoft that adds a lot of Python functionality. With this extension, we can have automated linting, run our tests, debug Python code, run Python files, or even run a line of Python directly in a read-eval-print loop (REPL) — all from within the editor.
Pyright: This extension adds support to VS Code to handle the type hints we can add to our Python code (using Python >= 3.5). It also tells us if we are using functions or classes with improper types.

There are other extensions we could add for working with other Python-related projects (like Django), but for now, having these two installed is enough for a powerful Python IDE.

Setting Up Remote Development

If we are working on Python projects located on a development server, then the Remote Development extension is a fantastic choice. Now that we have set up our Python development cloud server, we can configure a host for the server that will make it easier to connect from within VS Code. In order to follow along, you'll need to be able to do the following:

Connect to a remote host using SSH
Generate SSH keys (ssh-keygen)
Copy SSH keys (e.g., ssh-copy-id)

Let's start by creating an SSH key we'll only use for connecting to our remote SSH servers (so we don't use keys that have access to other servers). Here are the commands we need to run from our workstation:

ssh-keygen -t rsa -b 4096 -C "me@example.com" -f /home/cloud_user/.ssh/id_rsa-remote-ssh
Generating public/private rsa key pair.
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /home/cloud_user/.ssh/id_rsa-remote-ssh.
Your public key has been saved in /home/cloud_user/.ssh/id_rsa-remote-ssh.pub.
The key fingerprint is:
SHA256:ISPyzUc8F+A5CbMgSpBcHlYTi5ML9KtAiU5v/7TI87s me@example.com
The key's randomart image is:
+---[RSA 4096]----+
|++o+o++ ...      |
|=o=.+.o* o .     |
|o=.*..+ X .      |
|+ oo++ + =       |
|.. =. o S        |
|. o . .          |
| . . .           |
|  ..+ .          |
|   ooEo          |
+----[SHA256]-----+

Next, we'll use ssh-copy-id to copy the SSH public key to our Cloud Playground:

ssh-copy-id -i ~/.ssh/id_rsa-remote-ssh.pub cloud_user@SERVER_ID.mylabserver.com
/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/home/cloud_user/.ssh/id_rsa-remote-ssh.pub"
The authenticity of host 'SERVER_ID.mylabserver.com (18.191.205.57)' can't be established.
ECDSA key fingerprint is SHA256:ltRgmgobKpTm0KaXg1RN23JDEkItBtLv+wE3wuwy+o0.
Are you sure you want to continue connecting (yes/no)? yes
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
cloud_user@SERVER_ID.mylabserver.com's password:
Number of key(s) added: 1
Now try logging into the machine, with: "ssh 'cloud_user@SERVER_ID.mylabserver.com'"
and check to make sure that only the key(s) you wanted were added.

Finally, let's add the following Host entry to our SSH client config:

~/.ssh/config

Host python-server
    User cloud_user
    HostName SERVER_ID.mylabserver.com
    IdentityFile ~/.ssh/id_rsa-remote-ssh

Now from within VS Code we should be able to remotely connect to this server using the Remote Development extension. This will take a little longer to get started the first time as it sets up the VS Code server on the Cloud Playground, but when it's finished, we will be in a new window connected to the remote host (indicated in the bottom left corner). If we select Extensions in the sidebar (the odd square icon), we will now see the extensions that were installed locally, plus the extensions installed on the server. There aren't any remote extensions yet; we should install the Python-related ones to the server by finding them in the list of locally installed extensions and clicking the Install on SSH: python-server button. This button doesn't exist for the extensions that run purely in the client because they will work without being on the server.

Special macOS Instructions

On macOS there are a few more steps to ensuring a smooth experience when working with SSH in general:

Add a Host configuration to ensure that the Keychain is used instead of prompting for the passphrase
Add each identity file to the Keychain

The block we need in the ~/.ssh/config file looks like this; we'll add it to the top of the file:

~/.ssh/config (partial)

Host *
    AddKeysToAgent yes
    UseKeychain yes
    IdentitiesOnly yes

With that configuration block added, we need to specify that our new SSH key should work with the Keychain:

$ ssh-add -K ~/.ssh/id_rsa-remote-ssh

Now the SSH experience from macOS should be similar to a Linux environment.

Building Sharp Tools

Powerful CLIs

Project Overview and Setup: Load-Testing CLI

00:11:13

Lesson Description:

Python is an amazing scripting language, and one way that we can use it is to create scripts and command line tools. For our first command line project, we're going to build a CLI that will allow us to load-test a website to see how many requests can be handled in a set amount of time. In this lesson, we'll cover the layout of our project and discuss how we want it to be used.

Documentation for This Video: setup.py for Humans, Pipenv, Python .gitignore

Starting with README.md

Before building a project, it's a good idea to conceptualize how it should be used. This prevents us from building features that aren't really needed. An interesting way to do this is to write the README for the project first. Our project is called assault, so let's create a directory with a README.md in it and a directory to eventually hold our package's modules:

$ mkdir -p assault/assault
$ cd assault
$ touch assault/__init__.py
$ touch README.md

Our tool needs to do a few things:

Take a URL to make requests to.
Make a number of requests (this should be configurable).
Make requests with a certain amount of concurrency (this should be configurable).
Output some general stats about the requests. It should optionally allow for JSON file output of this information.

Here's an example of what it will look like to make 3000 requests:

$ assault -r 3000 -c 10 https://example.com
.... Done!
--- Results ---
Successful requests     3000
Slowest                 0.010s
Fastest                 0.001s
Average                 0.003s
Total time              2.400s
Requests Per Minute     90000
Requests Per Second     1250

Here's what our README.md will look like:

assault/README.md

# assault

A simple CLI load testing tool.

## Installation

Install using `pip`:

```
$ pip install assault
```

## Usage

The simplest usage of `assault` requires only a URL to test against and makes 500 requests synchronously (one at a time). This is what it would look like:

```
$ assault https://example.com
.... Done!
--- Results ---
Successful requests     500
Slowest                 0.010s
Fastest                 0.001s
Average                 0.003s
Total time              0.620s
Requests Per Minute     48360
Requests Per Second     806
```

If we want to add concurrency, we'll use the `-c` option, and we can use the `-r` option to specify how many requests we'd like to make:

```
$ assault -r 3000 -c 10 https://example.com
.... Done!
--- Results ---
Successful requests     3000
Slowest                 0.010s
Fastest                 0.001s
Average                 0.003s
Total time              2.400s
Requests Per Minute     90000
Requests Per Second     1250
```

If you'd like to see these results in JSON format, you can use the `-j` option with a path to a JSON file:

```
$ assault -r 3000 -c 10 -j output.json https://example.com
.... Done!
```

## Development

For working on `assault`, you'll need to have Python >= 3.7 (because we'll use `asyncio`) and [`pipenv`][1] installed. With those installed, run the following command to create a virtualenv for the project and fetch the dependencies:

```
$ pipenv install --dev
...
```

Next, activate the virtualenv and get to work:

```
$ pipenv shell
...
(assault) $
```

[1]: https://docs.pipenv.org/en/latest/

With our documentation in place, we at least have something to come back to if we lose track of what we should be working toward.

The setup.py

Some of the other files that we'll want to have before we dig into the code are the setup.py and the .gitignore. These files can be written by hand, but there are some pretty great starting points out there. For the setup.py, we can use the setup.py for Humans. We'll need to make some modifications, but this file will save us a lot of time. Let's download the file and start modifying it:

$ curl -O https://raw.githubusercontent.com/navdeep-G/setup.py/master/setup.py

As for our modifications, we'll want to change things in the # Package meta-data section to be about assault:

setup.py (partial)

# Package meta-data.
NAME = 'assault'
DESCRIPTION = 'A Python based web load testing tool.'
URL = 'https://github.com/example/assault'
EMAIL = 'me@example.com'
AUTHOR = 'Example Person'
REQUIRES_PYTHON = '>=3.7.0'
VERSION = '0.1.0'

We'll also want to change any mention of Python 3.6.0 to Python 3.7.0.

The .gitignore

For our .gitignore file, we're going to use the one for Python maintained by GitHub. We can pull it down using the following curl command:

$ curl https://raw.githubusercontent.com/github/gitignore/master/Python.gitignore -o .gitignore

At this point it makes sense to also initialize our project as a Git repository, so let's do that:

$ git init

Using Pipenv for Our Virtual Environment

Finally, we're going to use Pipenv to manage our virtual environment and development dependencies. Since we're creating an installable library, we'll also need to add dependencies to the setup.py later on, but Pipenv is still useful for us while we're developing. Let's initialize our environment using Python 3.7 and install twine as a development dependency (as specified by the setup.py) to get the python setup.py upload feature:

$ pipenv install --python python3.7 twine --dev
...

Now we're ready to make our first commit and then start developing our tool:

$ git add --all .
$ git commit -m 'Initial commit'
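If you'd prefer not to start from the full template, a stripped-down setup.py along these lines covers only the fields this project uses. This is a sketch based on the values above, not the template the lesson downloads; install_requires and entry_points get filled in during later lessons:

# setup.py (minimal alternative sketch)
from setuptools import find_packages, setup

setup(
    name="assault",
    version="0.1.0",
    description="A Python based web load testing tool.",
    python_requires=">=3.7.0",
    packages=find_packages(exclude=("tests",)),
    install_requires=[],  # click and requests are added in later lessons
    entry_points={"console_scripts": ["assault=assault.cli:cli"]},
)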

(Optional) Configuring a Project in VS Code

00:08:21

Lesson Description:

VS Code has great options for customizing how it runs based on the project that we're working in. In this lesson, we'll set up VS Code with some customizations for our assault project.

Documentation for This Video: Black, Pylint

The Project Workspace

To focus on what we're doing, we're first going to close any open windows that we have and open a new remote development session. With the new window open, we'll open the ~/code/assault directory so we only see files that are part of our project. From here, we can set which Python interpreter to use by opening the command palette with Shift + Ctrl + P and then running the "Python: Select Interpreter" command. In the list that is displayed, we should see an option for our project's virtualenv—we'll select that. Now we should have a .vscode directory with a settings.json file in it. This file is where we'll be putting our project's configuration. Let's modify this file a little more so that it looks like this:

Note: Your pythonPath value will be different.

assault/.vscode/settings.json

{
    "python.pythonPath": "/home/cloud_user/.local/share/virtualenvs/assault-F3hjvTUZ/bin/python",
    "python.linting.enabled": true,
    "editor.formatOnSave": true,
    "python.formatting.provider": "black"
}

Next we need to open the setup.py file, and when we save it, we should be prompted to install Black and Pylint. Select Yes for both, and VS Code will install them by adding them to our Pipfile as development dependencies. Now when we save files that we're working in, Black will automatically adjust the formatting, and Pylint will let us know if we're breaking any of its linting rules.

Designing the CLI: argparse or click

00:11:53

Lesson Description:

To get started, we're going to write the code that actually presents the CLI. There are a few ways that we could do this. We could use argparse from the standard library, but we're going to use the popular third-party package click.

Documentation for This Video: argparse, click

Installing click

We'll be using click to create our CLI, so it needs to be a real dependency of our tool. We're going to add this to the Pipfile using Pipenv:

$ pipenv install click
...

Additionally, let's add this to our setup.py in the REQUIRED list so that it will be installed when someone installs our package:

setup.py (partial)

REQUIRED = [
    'click'
]

Building the CLI

Now that we have click installed, we're ready to use it by creating a cli module:

assault/cli.py

import click


@click.command()
def cli():
    pass


if __name__ == "__main__":
    cli()

We've placed the "__main__" portion in there so that we can easily test this. Now we can test our CLI from within our virtualenv by executing this file:

$ pipenv shell
(assault) $ python assault/cli.py --help
Usage: cli.py [OPTIONS]

Options:
  --help  Show this message and exit.

The click.command decorator gives us automatic help page generation and makes it easy for us to develop and define subcommands. Our next step is to add our 3 options using the click.option decorator and the URL argument using click.argument:

assault/cli.py

import click


@click.command()
@click.option("--requests", "-r", default=500, help="Number of requests")
@click.option("--concurrency", "-c", default=1, help="Number of concurrent requests")
@click.option("--json-file", "-j", default=None, help="Path to output JSON file")
@click.argument("url")
def cli(requests, concurrency, json_file, url):
    print(f"Requests: {requests}")
    print(f"Concurrency: {concurrency}")
    print(f"JSON File: {json_file}")
    print(f"URL: {url}")


if __name__ == "__main__":
    cli()

When we take a look at the help text, we see a lot more information:

(assault) $ python assault/cli.py --help
Usage: cli.py [OPTIONS] URL

Options:
  -r, --requests INTEGER     Number of requests
  -c, --concurrency INTEGER  Number of concurrent requests
  -j, --json-file TEXT       Path to output JSON file
  --help                     Show this message and exit.

Let's see what happens when we run the command without the URL argument:

(assault) $ python assault/cli.py
Usage: cli.py [OPTIONS] URL
Try "cli.py --help" for help.

Error: Missing argument "URL".

Finally, let's run it with a URL:

(assault) $ python assault/cli.py https://example.com
Requests: 500
Concurrency: 1
JSON File: None
URL: https://example.com

That's all we need to do to get the information from the user that we can then pass to the business logic of our tool.

Adding the CLI in setup.py

The boilerplate text for the setup.py that we're using already has an entry_points section in it (although commented out). We need to uncomment that section and adjust the boilerplate text:

setup.py (partial)

entry_points={
    'console_scripts': ['assault=assault.cli:cli'],
},

We can now test this by running pip install -e .:

(assault) $ pip install -e .
(assault) $ assault
Usage: assault [OPTIONS] URL
Try "assault --help" for help.

Error: Missing argument "URL".

Besides the output that we need to display after we make our requests, our CLI is mostly complete. Let's commit and move on to something else.

(assault) $ git add --all .
(assault) $ git commit -m 'Add click and create CLI'
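Because the CLI is a regular click command, it can also be exercised without a terminal using click's built-in CliRunner. This is an optional smoke-test sketch (the file name is made up), not one of the lesson's steps:

# test_cli_smoke.py - quick smoke test using click's test utilities
from click.testing import CliRunner

from assault.cli import cli

runner = CliRunner()
result = runner.invoke(cli, ["https://example.com"])

# At this stage the command only echoes its inputs, so we just check it ran cleanly.
assert result.exit_code == 0
assert "URL: https://example.com" in result.output
print(result.output)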

Concurrent Programming with Async & Await — Part 1

00:17:01

Lesson Description:

Supporting concurrent requests will be the most difficult part of this project. Thankfully, Python 3.7 includes the asyncio package, which we can use to do work concurrently.

Documentation for This Video: asyncio, Requests, asyncio.Queue

Laying Out the http Module

Before we actually write any of our code, let's think about how we want to divide things up. We're going to put the code that makes HTTP requests into a new module named http. We'll begin by outlining a few functions:

assault/http.py

# Make the request and return the results
def fetch(url):
    pass


# A function to take unmade requests from a queue, perform the work, and add the result to the results list
def worker(name, queue, results):
    pass


# Divide up work into batches and collect final results
def distribute_work(url, requests, concurrency, results):
    pass


# Entrypoint to making requests
def assault(url, requests, concurrency):
    pass

Ideally, we'll only need to run http.assault(url, requests, concurrency) from our cli function, and we'll pass the results from this function to a statistics module that we'll write later. We need to do a few things:

Create an asynchronous queue to hold unmade requests (the size maps to our requests variable).
Create worker tasks to match our concurrency value.
Start the workers on the values in the queue.
Wait for the queue to be processed.
Return the results list.

To achieve this, we'll have to learn quite a bit about the asyncio module.

Running Asynchronous Code Using Async and Await

We'll implement our functions from the bottom up, starting with the assault function. This function is synchronous, so we'll write it like we've written any other function up to this point. The distribute_work function is going to be asynchronous, so we'll need to call it using the asyncio.run function. Here's what our assault function will look like:

assault/http.py

import asyncio

# remainder of functions omitted

# Entrypoint to making requests
def assault(url, requests, concurrency):
    results = []
    asyncio.run(distribute_work(url, requests, concurrency, results))
    print(results)

First, we're going to create a list that we can have our asynchronous code add information to. This is not a thread-safe approach, but since we don't care about the order of the results and we're only going to add information to the list, this approach is fine. When we call asyncio.run, we pass in the result of calling the distribute_work function with our information. This is a little weird because calling a function normally causes it to execute, but we're going to modify distribute_work so that it instead returns a coroutine that asyncio can schedule and run. Let's make distribute_work asynchronous to flesh out this idea:

assault/http.py

# remainder of functions omitted

# Divide up work into batches and collect final results
async def distribute_work(url, requests, concurrency, results):
    pass


# Entrypoint to making requests
def assault(url, requests, concurrency):
    results = []
    asyncio.run(distribute_work(url, requests, concurrency, results))
    print(results)

By adding the async keyword before the function definition, we're specifying that the distribute_work function is a coroutine that can only be run in a couple of special ways:

It's executed by asyncio.run, OR
It's "waited on" in another coroutine using the await keyword.

Since assault is a synchronous function, we need to use asyncio.run to be able to execute distribute_work. The async and await keywords allow us to create code that we intend to run asynchronously, but when we're using the coroutines within our code, they read and behave synchronously. We'll learn more about async and await as we progress through this file.

Distributing Work

The distribute_work function is the most complicated function that we'll need to define because it has to do quite a few things to orchestrate all of the requests that we want to make. We'll start by creating an asyncio.Queue that we can add our URL to once for each request that we'd like to make:

assault/http.py

# remainder of functions omitted

# Divide up work into batches and collect final results
async def distribute_work(url, requests, concurrency, results):
    queue = asyncio.Queue()

    # Add an item to the queue for each request we want to make
    for _ in range(requests):
        queue.put_nowait(url)


# Entrypoint to making requests
def assault(url, requests, concurrency):
    results = []
    asyncio.run(distribute_work(url, requests, concurrency, results))
    print(results)

The queue that we've created is meant to be used in async programming, and we want it to have an item for every request that we want to make. The simplest way to do this is to use the put_nowait method and pass in the url. This method puts the item at the end of the queue without blocking and immediately moves on to the next line of code.

Now that we have a queue, we want to create our concurrent "workers". For this, we're going to create a task using the worker function to match our concurrency value. Let's create these tasks and store them in a list:

assault/http.py (partial)

# Divide up work into batches and collect final results
async def distribute_work(url, requests, concurrency, results):
    queue = asyncio.Queue()

    # Add an item to the queue for each request we want to make
    for _ in range(requests):
        queue.put_nowait(url)

    # Create workers to match the concurrency
    tasks = []
    for i in range(concurrency):
        task = asyncio.create_task(worker(f"worker-{i+1}", queue, results))
        tasks.append(task)

When we create the task using asyncio.create_task, it will start the worker coroutine. These workers will be responsible for making our requests. The remainder of this function revolves around waiting for the items in the queue to be processed:

assault/http.py

import asyncio
import time

# Divide up work into batches and collect final results
async def distribute_work(url, requests, concurrency, results):
    queue = asyncio.Queue()

    # Add an item to the queue for each request we want to make
    for _ in range(requests):
        queue.put_nowait(url)

    # Create workers to match the concurrency
    tasks = []
    for i in range(concurrency):
        task = asyncio.create_task(worker(f"worker-{i+1}", queue, results))
        tasks.append(task)

    started_at = time.monotonic()
    await queue.join()
    total_time = time.monotonic() - started_at

    for task in tasks:
        task.cancel()

    print("---")
    print(
        f"{concurrency} workers took {total_time:.2f} seconds to complete {len(results)} requests"
    )

Now we need to record our start time and wait for every item in the asyncio.Queue to be processed by calling:

await queue.join()

This will start all of the tasks, which will each take an item from the queue, process it, and mark it as completed. The last thing we need to do is go through the tasks list and call .cancel() on each one. We need to do this because we're going to have these worker coroutines be infinite loops. We'll cover the worker and fetch functions in the next lesson.
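To see the queue/worker/join/cancel pattern in isolation before adding HTTP, here is a small self-contained sketch that swaps the request for asyncio.sleep; the names (toy_worker, main) are made up for illustration:

import asyncio


async def toy_worker(name, queue, results):
    # Loop forever; the coordinating code cancels us once the queue is drained.
    while True:
        item = await queue.get()
        await asyncio.sleep(0.01)  # stand-in for a real request
        results.append((name, item))
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    results = []

    # Enqueue 20 pieces of work up front.
    for i in range(20):
        queue.put_nowait(i)

    # Three concurrent workers, mirroring a concurrency of 3.
    tasks = [asyncio.create_task(toy_worker(f"w{n}", queue, results)) for n in range(3)]

    await queue.join()  # wait until task_done() has been called for every item
    for task in tasks:
        task.cancel()   # the workers are infinite loops, so cancel them explicitly

    print(f"processed {len(results)} items with {len(tasks)} workers")


asyncio.run(main())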

Concurrent Programming with Async & Await — Part 2

00:13:14

Lesson Description:

We've distributed our work across various workers, but those workers don't actually do anything yet. In this lesson, we'll tackle the worker and making HTTP requests.

Documentation for This Video: asyncio, Requests, asyncio.Queue, asyncio.loop.run_in_executor

Implementing the worker Function

We have one worker for each concurrent request we want to be able to make, but each worker will work its way through more than one request. To handle more than one request, we're going to run an infinite loop within each of our workers that will wait for a new item to be added to the queue. We're going to add the entire implementation for the worker up front and then break down each of the parts. Let's add it now:

assault/http.py (partial)

import asyncio
import os
import time

# Function to continue to process work from queue
async def worker(name, queue, results):
    loop = asyncio.get_event_loop()
    while True:
        url = await queue.get()
        if os.getenv("DEBUG"):
            print(f"{name} - Fetching {url}")
        future_result = loop.run_in_executor(None, fetch, url)
        result = await future_result
        results.append(result)
        queue.task_done()

The first thing that we're going to do is get the event loop that our current asynchronous code is running within. We're going to use this event loop within our while loop to asynchronously execute our fetch function. Moving into the while loop, the first thing that we need to do is get the URL from the queue. Since our queue is designed to be used asynchronously, when we call queue.get we need to use the await keyword to say that we want to wait for a value to be returned to us. Then, we just have a little debug statement so that we can see which worker is making a request when we actually run this code.

Next, we're going to use loop.run_in_executor to take our fetch function and run it as a coroutine on our current event loop. This allows us to run a function that we know has some blocking code in it (such as a network request) on the event loop. The requests library isn't written to be used with asyncio, but running our fetch function on the event loop allows us to mostly get around that. We receive an asyncio.Future object from this function, which we can use await with to get the actual value back.

Lastly, we'll add the result to our results list, and then we get to mark the item from the queue as complete by calling queue.task_done(). By doing this, we let the queue know that the item was processed and it can be considered fully removed. This is important because when we called queue.join(), we were saying that we wanted to wait until this method has been called for every item that was in the queue.

Adding requests and Implementing fetch

The last function that we need to implement is the fetch function. Thankfully, this function is going to be incredibly simple once we've pulled in the requests package.

(assault) $ pipenv install requests
...

We'll also add this to the setup.py, since end users of this package would need to have it installed.

setup.py (partial)

# What packages are required for this module to be executed?
REQUIRED = ["click", "requests"]

Finally, let's implement our fetch function:

assault/http.py (partial)

import asyncio
import os
import requests
import time

# Make the actual HTTP request and gather results
def fetch(url):
    started_at = time.monotonic()
    response = requests.get(url)
    request_time = time.monotonic() - started_at
    return {"status_code": response.status_code, "request_time": request_time}

The only pieces of information that we care about here are the amount of time that it took to perform the request (request_time) and the status code, so we'll put those into a simple dictionary to return as the result. Let's use the assault function within our CLI to test this:

assault/cli.py

import click

from .http import assault


@click.command()
@click.option("--requests", "-r", default=500, help="Number of requests")
@click.option("--concurrency", "-c", default=1, help="Number of concurrent requests")
@click.option("--json-file", "-j", default=None, help="Path to output JSON file")
@click.argument("url")
def cli(requests, concurrency, json_file, url):
    print(f"Requests: {requests}")
    print(f"Concurrency: {concurrency}")
    print(f"JSON File: {json_file}")
    print(f"URL: {url}")
    assault(url, requests, concurrency)

Now we're ready to test. Since we ran pip install -e . earlier, the console script for assault that is in our path when our virtualenv is active will always be using our newest code. Let's run it with the DEBUG value set:

(assault) $ DEBUG=true assault -r 100 -c 10 https://google.com
Requests: 100
Concurrency: 10
JSON File: None
URL: https://google.com
worker-1 - Fetching https://google.com
worker-2 - Fetching https://google.com
worker-3 - Fetching https://google.com
worker-4 - Fetching https://google.com
worker-5 - Fetching https://google.com
...
worker-1 - Fetching https://google.com
worker-10 - Fetching https://google.com
worker-6 - Fetching https://google.com
worker-4 - Fetching https://google.com
worker-8 - Fetching https://google.com
worker-2 - Fetching https://google.com
worker-3 - Fetching https://google.com
worker-7 - Fetching https://google.com
worker-9 - Fetching https://google.com
---
10 workers took 2.56 seconds to complete 100 requests
[{'status_code': 200, 'request_time': 0.27036608600000006}, {'status_code': 200, 'request_time': 0.276928557}, {'status_code': 200, 'request_time': 0.287770405}, ...]

Most of the requests were omitted, but we can see that after the first batch, our workers will each make new requests as soon as they are available to. We've successfully implemented the most complicated portion of our tool!

Doctests and Types

00:17:47

Lesson Description:

Now that we've collected the information about all of the requests that we've made to our target, we're ready to calculate some statistics. In this lesson, we're going to start working on a class to calculate our results by using doctests and type hinting to add additional information to the class.

Documentation for This Video: The doctest module, The typing module, mypy, VS Code Pyright Plugin, Pydoc

Writing Doctests and Type Hints

If we run our tool, we're going to be able to collect some data that we can use in calculations, but ideally, we'd like to be able to write our code and test it without needing to assault a web server. To do this, we're going to write some automated tests and documentation for our statistics functions. For this to work, we're going to create data that we can easily pre-calculate the expected outputs for and then write our code to ensure that it returns the expected results. Let's get started by creating a file at assault/stats.py, laying out a Results class and some methods, and then writing some doctests for them. Additionally, we're going to set up some type hints using the typing module:

assault/stats.py

from typing import List, Dict


class Results:
    """
    Results handles calculating statistics based on a list of
    requests that were made. Here's an example of what the information
    will look like:

    Successful requests     3000
    Slowest                 0.010s
    Fastest                 0.001s
    Average                 0.003s
    Total time              2.400s
    Requests Per Minute     90000
    Requests Per Second     125
    """

    def __init__(self, total_time: float, requests: List[Dict]):
        self.total_time = total_time
        self.requests = requests

    def slowest(self) -> float:
        pass

    def fastest(self) -> float:
        pass

    def average_time(self) -> float:
        pass

    def total_time(self) -> float:
        pass

    def successful_requests(self) -> int:
        pass

This looks a little weird, but what we've added to our method definitions are type hints using the typing module. Python is a dynamic language, so we can pass in arguments of any type to a function/method, and that function/method can return values of any type. Type hints allow us to specify what a method is supposed to take as an argument and what it will return. The language itself isn't going to benefit from these too much, but we can use external tools like mypy to run type analysis on our code base to see if we're ever using a method incorrectly based on the type hints. If you're using PyCharm, then you already have type checking baked into the IDE, so you'll see additional error messages. In VS Code, you can add the Pyright extension by Microsoft to get these types of errors if a function/method doesn't return the right type, or if you create a class or use a function with arguments that don't match the type hints.

Now let's move on to writing some tests. Doctests work by allowing us to add what looks like a REPL session into our docstrings, and those will be run through the interpreter. This gives us a good way to show how we expect our code to work and also lets us verify that it works properly given a shorter set of inputs. Let's write a doctest for the slowest method:

assault/stats.py (partial)

from typing import List, Dict


class Results:
    # previous code omitted

    def slowest(self) -> float:
        """
        Returns the slowest request's completion time

        >>> results = Results(10.6, [{
        ...     'status_code': 200,
        ...     'request_time': 3.4
        ... }, {
        ...     'status_code': 500,
        ...     'request_time': 6.1
        ... }, {
        ...     'status_code': 200,
        ...     'request_time': 1.04
        ... }])
        >>> results.slowest()
        6.1
        """
        pass

    # remaining code omitted

We need to create an instance of the Results class to start. Our next line starting with >>> will call the slowest method, and then the line after that will display what we're expecting as the result. This is the basic setup that we'll use for all of the tests. It's worth noting that by adding docstrings we're actually giving ourselves a way to generate documentation using the pydoc module. Let's add the rest of our tests, and then we'll look at how to run them.

assault/stats.py

from typing import List, Dict


class Results:
    """
    Results handles calculating statistics based on a list of
    requests that were made. Here's an example of what the information
    will look like:

    Successful requests     3000
    Slowest                 0.010s
    Fastest                 0.001s
    Average                 0.003s
    Total time              2.400s
    Requests Per Minute     90000
    Requests Per Second     125
    """

    def __init__(self, total_time: float, requests: List[Dict]):
        self.total_time = total_time
        self.requests = requests

    def slowest(self) -> float:
        """
        Returns the slowest request's completion time

        >>> results = Results(10.6, [{
        ...     'status_code': 200,
        ...     'request_time': 3.4
        ... }, {
        ...     'status_code': 500,
        ...     'request_time': 6.1
        ... }, {
        ...     'status_code': 200,
        ...     'request_time': 1.04
        ... }])
        >>> results.slowest()
        6.1
        """
        pass

    def fastest(self) -> float:
        """
        Returns the fastest request's completion time

        >>> results = Results(10.6, [{
        ...     'status_code': 200,
        ...     'request_time': 3.4
        ... }, {
        ...     'status_code': 500,
        ...     'request_time': 6.1
        ... }, {
        ...     'status_code': 200,
        ...     'request_time': 1.04
        ... }])
        >>> results.fastest()
        1.04
        """
        pass

    def average_time(self) -> float:
        """
        Returns the average request completion time

        >>> results = Results(10.6, [{
        ...     'status_code': 200,
        ...     'request_time': 3.4
        ... }, {
        ...     'status_code': 500,
        ...     'request_time': 6.1
        ... }, {
        ...     'status_code': 200,
        ...     'request_time': 1.04
        ... }])
        >>> results.average_time()
        9.846666667
        """
        pass

    def successful_requests(self) -> int:
        """
        Returns the number of successful requests

        >>> results = Results(10.6, [{
        ...     'status_code': 200,
        ...     'request_time': 3.4
        ... }, {
        ...     'status_code': 500,
        ...     'request_time': 6.1
        ... }, {
        ...     'status_code': 200,
        ...     'request_time': 1.04
        ... }])
        >>> results.successful_requests()
        2
        """
        pass

To run our tests, we'll load the doctest module using the -m flag to the python executable and then pass the path to the file:

(assault) $ python -m doctest assault/stats.py
**********************************************************************
File "assault/stats.py", line 74, in stats.Results.average_time
Failed example:
    results.average_time()
Expected:
    9.846666667
Got nothing
**********************************************************************
File "assault/stats.py", line 55, in stats.Results.fastest
Failed example:
    results.fastest()
Expected:
    1.04
Got nothing
**********************************************************************
File "assault/stats.py", line 36, in stats.Results.slowest
Failed example:
    results.slowest()
Expected:
    6.1
Got nothing
**********************************************************************
File "assault/stats.py", line 93, in stats.Results.successful_requests
Failed example:
    results.successful_requests()
Expected:
    2
Got nothing
**********************************************************************
4 items had failures:
   1 of   2 in stats.Results.average_time
   1 of   2 in stats.Results.fastest
   1 of   2 in stats.Results.slowest
   1 of   2 in stats.Results.successful_requests
***Test Failed*** 4 failures.

If we take a look at the average_time test, we see that it expected 9.846666667 as a result but Got nothing, because the method isn't implemented yet.
Now we can implement our methods to get these tests to pass in the next lesson.
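If doctests themselves are new, here is a tiny standalone example (separate from the project) showing the same mechanics with doctest.testmod; the add function is made up for illustration:

# doctest_demo.py - run with: python doctest_demo.py -v
def add(a: int, b: int) -> int:
    """
    Return the sum of two integers.

    >>> add(2, 3)
    5
    >>> add(-1, 1)
    0
    """
    return a + b


if __name__ == "__main__":
    import doctest

    doctest.testmod()  # same behavior as `python -m doctest doctest_demo.py`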

Calculating Statistics

00:09:04

Lesson Description:

We've laid out the Results class for calculating stats and made it easy to work with using type hints and doctests. Now we're ready to implement its functionality. In this lesson, we're going to calculate the stats that we want from the list of requests that we've made.

Documentation for This Video: The statistics module, The doctest module, The typing module, The sorted function, Lambdas

Determining the Slowest and Fastest Request

Our list of requests is going to be given to us in the order that the requests were completed, and that's not the most useful ordering for us. It would be more helpful if we received them in order of how long they took. Thankfully, to achieve this we can use the built-in sorted function to sort the list based on the request_time value of the dictionaries. We'll need this for a few different methods on our class, so to prevent ourselves from needing to repeat this calculation, we'll do it in __init__ before setting the property on the instance:

assault/stats.py (partial)

from typing import List, Dict


class Results:
    def __init__(self, total_time: float, requests: List[Dict]):
        self.total_time = total_time
        self.requests = sorted(requests, key=lambda r: r["request_time"])

    # remaining code omitted

Since we're working with dictionaries and we want to sort by a key's value, we need to use a lambda. Lambdas are anonymous function expressions, meaning they are single-expression functions that have an implicit return statement. We can pass a function as the key value on sorted, and we want a function that takes a dictionary item and returns the request_time value. The equivalent named function for this lambda would be this:

def request_time(request_dict):
    return request_dict['request_time']

Now the requests property on our instance will be sorted from fastest to slowest request time, and we can implement our fastest and slowest functions.

assault/stats.py (partial)

from typing import List, Dict


class Results:
    # previous code omitted

    def slowest(self) -> float:
        """
        Returns the slowest request's completion time

        >>> results = Results(10.6, [{
        ...     'status_code': 200,
        ...     'request_time': 3.4
        ... }, {
        ...     'status_code': 500,
        ...     'request_time': 6.1
        ... }, {
        ...     'status_code': 200,
        ...     'request_time': 1.04
        ... }])
        >>> results.slowest()
        6.1
        """
        return self.requests[-1]["request_time"]

    def fastest(self) -> float:
        """
        Returns the fastest request's completion time

        >>> results = Results(10.6, [{
        ...     'status_code': 200,
        ...     'request_time': 3.4
        ... }, {
        ...     'status_code': 500,
        ...     'request_time': 6.1
        ... }, {
        ...     'status_code': 200,
        ...     'request_time': 1.04
        ... }])
        >>> results.fastest()
        1.04
        """
        return self.requests[0]["request_time"]

    # remaining code omitted

We simply need to get the first item to get the fastest response time, and the last item to get the slowest. Now if we run our doctests again, we should see that we have no failures for fastest and slowest:

(assault) $ python -m doctest assault/stats.py
**********************************************************************
File "assault/stats.py", line 74, in stats.Results.average_time
Failed example:
    results.average_time()
Expected:
    9.846666667
Got nothing
**********************************************************************
File "assault/stats.py", line 93, in stats.Results.successful_requests
Failed example:
    results.successful_requests()
Expected:
    2
Got nothing
**********************************************************************
2 items had failures:
   1 of   2 in stats.Results.average_time
   1 of   2 in stats.Results.successful_requests
***Test Failed*** 2 failures.

Calculating the Average Request Time

Calculating the average request time isn't really that complicated, but we don't even need to implement the algorithm to do it because we can leverage the standard library's statistics module. From the statistics module, we'll use the mean function and return the result from our average_time method. We will need to provide this function with a list of numbers rather than a list of dictionaries, though, so we'll use a list comprehension:

assault/stats.py (partial)

from typing import List, Dict
from statistics import mean


class Results:
    # previous code omitted

    def average_time(self) -> float:
        """
        Returns the average request completion time

        >>> results = Results(10.6, [{
        ...     'status_code': 200,
        ...     'request_time': 3.4
        ... }, {
        ...     'status_code': 500,
        ...     'request_time': 6.1
        ... }, {
        ...     'status_code': 200,
        ...     'request_time': 1.04
        ... }])
        >>> results.average_time()
        9.846666667
        """
        return mean([r["request_time"] for r in self.requests])

    # remaining code omitted

The list comprehension allows us to extract information from one list and return a new list with the information that we'd like. In this case, we only want the request_time value from each dictionary. Let's run our tests and see what happens:

(assault) $ python -m doctest assault/stats.py
**********************************************************************
File "assault/stats.py", line 75, in stats.Results.average_time
Failed example:
    results.average_time()
Expected:
    9.846666667
Got:
    3.513333333333333
**********************************************************************
File "assault/stats.py", line 94, in stats.Results.successful_requests
Failed example:
    results.successful_requests()
Expected:
    2
Got nothing
**********************************************************************
2 items had failures:
   1 of   2 in stats.Results.average_time
   1 of   2 in stats.Results.successful_requests
***Test Failed*** 2 failures.

The test still fails! This is because we actually wrote our doctest to expect the total time instead of the average time. In this case, we need to fix the test instead of fixing the code. It's good to verify that our expectations are valid before writing an implementation so we don't end up in this situation. Here, we're going to copy the value that was returned and put it back in the test as the expectation:

assault/stats.py (partial)

from typing import List, Dict
from statistics import mean


class Results:
    # previous code omitted

    def average_time(self) -> float:
        """
        Returns the average request completion time

        >>> results = Results(10.6, [{
        ...     'status_code': 200,
        ...     'request_time': 3.4
        ... }, {
        ...     'status_code': 500,
        ...     'request_time': 6.1
        ... }, {
        ...     'status_code': 200,
        ...     'request_time': 1.04
        ... }])
        >>> results.average_time()
        3.513333333333333
        """
        return mean([r["request_time"] for r in self.requests])

    # remaining code omitted

Running the tests one more time, we should see that there's only one failure remaining.

Counting Successful Requests

The last number that we need to calculate is the total number of successful requests. We're going to define "successful" as a request where the status_code value is in the range of 200-299. To do this, we can use another list comprehension with an added if segment and then count the number of items in the new list.

assault/stats.py (partial)

from typing import List, Dict
from statistics import mean


class Results:
    # previous code omitted

    def successful_requests(self) -> int:
        """
        Returns the number of successful requests

        >>> results = Results(10.6, [{
        ...     'status_code': 200,
        ...     'request_time': 3.4
        ... }, {
        ...     'status_code': 500,
        ...     'request_time': 6.1
        ... }, {
        ...     'status_code': 200,
        ...     'request_time': 1.04
        ... }])
        >>> results.successful_requests()
        2
        """
        return len([r for r in self.requests if r["status_code"] in range(200, 300)])

List comprehensions can read a lot like English and describe what's happening pretty well. This one says:

Return `r` for each `r` in the `self.requests` list if the `r`'s `status_code` is within the range 200 - 299

If we run our tests again, we should see that there is no output because all of the tests now pass:

(assault) $ python -m doctest assault/stats.py
(assault) $
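As an aside, the same sort key can be built without a lambda by using operator.itemgetter. Here's a small standalone sketch on the sample data from the doctests; it isn't part of the project code:

from operator import itemgetter
from statistics import mean

requests = [
    {"status_code": 200, "request_time": 3.4},
    {"status_code": 500, "request_time": 6.1},
    {"status_code": 200, "request_time": 1.04},
]

# itemgetter("request_time") builds the same key function as lambda r: r["request_time"]
by_time = sorted(requests, key=itemgetter("request_time"))

print(by_time[0]["request_time"])   # 1.04 (fastest)
print(by_time[-1]["request_time"])  # 6.1  (slowest)
print(mean(r["request_time"] for r in requests))  # 3.513333333333333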

Presenting Results

00:17:23

Lesson Description:

The last feature that we need to implement is the display of results or exporting to JSON. In this lesson, we'll tie the pieces together and add the last of the remaining logic.

Documentation for This Video: The json module

Tying the Pieces Together

First, let's modify the main function of cli.py so that it actually uses the code that we've written. We need to use http.assault and stats.Results, and then we'll be able to start worrying about how the results are presented. Here's what main looks like:

assault/cli.py

import click

from .http import assault
from .stats import Results


@click.command()
@click.option("--requests", "-r", default=500, help="Number of requests")
@click.option("--concurrency", "-c", default=1, help="Number of concurrent requests")
@click.option("--json-file", "-j", default=None, help="Path to output JSON file")
@click.argument("url")
def cli(requests, concurrency, json_file, url):
    total_time, request_dicts = assault(url, requests, concurrency)
    results = Results(total_time, request_dicts)

Note: We removed the __name__ == "__main__" portion.

This brings up a couple of issues, and our editor may display them to us:

We aren't actually returning anything from http.assault yet.
Within the http module, we're never exposing the total_time value.

Here are the modifications that we need to make to assault/http.py:

assault/http.py (partial)

# previous code omitted

async def distribute_work(url, requests, concurrency, results):
    queue = asyncio.Queue()

    # Add an item to the queue for each request we want to make
    for _ in range(requests):
        queue.put_nowait(url)

    # Create workers to match the concurrency
    tasks = []
    for i in range(concurrency):
        task = asyncio.create_task(worker(f"worker-{i+1}", queue, results))
        tasks.append(task)

    started_at = time.monotonic()
    await queue.join()
    total_time = time.monotonic() - started_at

    for task in tasks:
        task.cancel()

    return total_time


# Entrypoint to making requests
def assault(url, requests, concurrency):
    results = []
    total_time = asyncio.run(distribute_work(url, requests, concurrency, results))
    return (total_time, results)

Now the code that we've written within assault/cli.py should be good.

Displaying the Results

We have the results within assault/cli.py now, and we're going to create another function that will display that information based on whether there is a json_file value:

assault/cli.py

import click

from .http import assault
from .stats import Results


@click.command()
@click.option("--requests", "-r", default=500, help="Number of requests")
@click.option("--concurrency", "-c", default=1, help="Number of concurrent requests")
@click.option("--json-file", "-j", default=None, help="Path to output JSON file")
@click.argument("url")
def cli(requests, concurrency, json_file, url):
    total_time, request_dicts = assault(url, requests, concurrency)
    results = Results(total_time, request_dicts)
    display(results, json_file)


def display(results, json_file):
    if json_file:
        # Write to a file
        print("We're writing to a JSON file")
    else:
        # Print to screen
        print(".... Done!")
        print("--- Results ---")
        print(f"Successful Requests\t{results.successful_requests()}")
        print(f"Slowest \t{results.slowest()}s")
        print(f"Fastest \t{results.fastest()}s")
        print(f"Total time \t{results.total_time}s")
        print(f"Requests Per Minute\t{results.requests_per_minute()}")
        print(f"Requests Per Second\t{results.requests_per_second()}")

There are more complex ways that we could print out the information while lining things up, but since our output isn't localized, this is good enough. We do have another issue, though; we forgot about requests_per_minute and requests_per_second. Let's go add those to the stats.Results class:

assault/stats.py (partial)

# imports omitted

class Results:
    # previous code omitted

    def requests_per_minute(self) -> int:
        """
        Returns the number of requests made per minute

        >>> results = Results(10.6, [{
        ...     'status_code': 200,
        ...     'request_time': 3.4
        ... }, {
        ...     'status_code': 500,
        ...     'request_time': 6.1
        ... }, {
        ...     'status_code': 200,
        ...     'request_time': 1.04
        ... }])
        >>> results.requests_per_minute()
        17
        """
        # 3 / 10.6 = x / 60
        # 60 * 3 / 10.6 = x
        return round(60 * len(self.requests) / self.total_time)

    def requests_per_second(self) -> int:
        """
        Returns the number of requests made per second

        >>> results = Results(3.5, [{
        ...     'status_code': 200,
        ...     'request_time': 3.4
        ... }, {
        ...     'status_code': 500,
        ...     'request_time': 2.9
        ... }, {
        ...     'status_code': 200,
        ...     'request_time': 1.04
        ... }, {
        ...     'status_code': 200,
        ...     'request_time': 0.4
        ... }])
        >>> results.requests_per_second()
        1
        """
        # 4 / 3.5 = x / 1
        return round(len(self.requests) / self.total_time)

Our tests should now pass, and we're ready to see if things display properly.

(assault) $ assault -c 10 -r 100 https://google.com
.... Done!
--- Results ---
Successful Requests     100
Slowest                 0.3454575660000001s
Fastest                 0.20539223699999987s
Total time              2.560716678s
Requests Per Minute     2343
Requests Per Second     39

This is looking pretty good! It would be nice to have some sort of progress indicator while the requests are being made, but that's something for a different time.

Writing JSON to a File

Exporting to JSON isn't going to be too difficult because of the json module. In the JSON branch of our display function, we're going to call json.dump. If a json_file value exists, we need to make sure that we can write to the file before we ever make requests. Here's what our final assault/cli.py file is going to look like:

assault/cli.py

import click
import sys
import json
from typing import TextIO

from .http import assault
from .stats import Results


@click.command()
@click.option("--requests", "-r", default=500, help="Number of requests")
@click.option("--concurrency", "-c", default=1, help="Number of concurrent requests")
@click.option("--json-file", "-j", default=None, help="Path to output JSON file")
@click.argument("url")
def cli(requests, concurrency, json_file, url):
    output_file = None
    if json_file:
        try:
            output_file = open(json_file, "w")
        except:
            print(f"Unable to open file {json_file}")
            sys.exit(1)
    total_time, request_dicts = assault(url, requests, concurrency)
    results = Results(total_time, request_dicts)
    display(results, output_file)


def display(results: Results, json_file: TextIO):
    if json_file:
        # Write to a file
        json.dump(
            {
                "successful_requests": results.successful_requests(),
                "slowest": results.slowest(),
                "fastest": results.fastest(),
                "total_time": results.total_time,
                "requests_per_minute": results.requests_per_minute(),
                "requests_per_second": results.requests_per_second(),
            },
            json_file,
        )
        json_file.close()
        print(".... Done!")
    else:
        # Print to screen
        print(".... Done!")
        print("--- Results ---")
        print(f"Successful Requests\t{results.successful_requests()}")
        print(f"Slowest \t{results.slowest()}s")
        print(f"Fastest \t{results.fastest()}s")
        print(f"Total time \t{results.total_time}s")
        print(f"Requests Per Minute\t{results.requests_per_minute()}")
        print(f"Requests Per Second\t{results.requests_per_second()}")

We've successfully created the minimum viable version of our load-testing CLI.
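If you ever do want the labels to line up regardless of their length, f-string format specs can pad them. This is an optional sketch with a hypothetical helper, not part of the course's implementation:

# Hypothetical helper showing left-aligned labels via f-string format specs.
def print_row(label, value, width=22):
    print(f"{label:<{width}}{value}")

print_row("Successful Requests", 100)
print_row("Slowest", "0.345s")
print_row("Requests Per Minute", 2343)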

Easy Data Transformation

Project Overview and Setup: Database Export

00:03:28

Lesson Description:

We frequently have to work with data that we receive from a database or file. Knowing how to convert data from one format to another is an incredibly useful skill for our day-to-day work. In this lesson, we're going to lay out a project to map data from a database into Python objects so that we can easily export the information in different formats like CSV and JSON.

Documentation for This Video: Pipenv, Python .gitignore, SQLAlchemy, psycopg2-binary

Project Setup

This project is mostly going to be a library of tools for us to use either in the REPL or within a different script. We want our library to provide a few things:

Classes representing database tables (Product & Review)
JSON and CSV export helpers for each class

Let's create a project directory with a package and a models module:

$ mkdir -p dbexport/dbexport
$ cd dbexport
$ touch dbexport/{__init__,models}.py

Now that we have the files we need, we're ready to finalize the project with Git and a virtualenv.

The .gitignore File

For our .gitignore file, we're going to use the one for Python maintained by GitHub. We can pull it down using the following curl command:

$ curl https://raw.githubusercontent.com/github/gitignore/master/Python.gitignore -o .gitignore

At this point it makes sense to also initialize our project as a Git repository, so let's do that:

$ git init

Using Pipenv for Our Virtual Environment

Finally, we're going to use Pipenv to manage our virtual environment and development dependencies. We need SQLAlchemy to map our database tables to our model classes. Let's initialize our environment using Python 3.7 and install SQLAlchemy and psycopg2-binary as dependencies:

$ pipenv install --python python3.7 SQLAlchemy psycopg2-binary
...

Now we're ready to make our first commit and start developing our tool:

$ git add --all .
$ git commit -m 'Initial commit'
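To make the goal concrete, here is a rough sketch of the kind of declarative models dbexport/models.py could eventually hold; the table and column names are assumptions for illustration, not the course's final schema:

# dbexport/models.py (illustrative sketch -- column names are assumptions)
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

Base = declarative_base()


class Product(Base):
    __tablename__ = "products"

    id = Column(Integer, primary_key=True)
    name = Column(String)
    reviews = relationship("Review", back_populates="product")


class Review(Base):
    __tablename__ = "reviews"

    id = Column(Integer, primary_key=True)
    product_id = Column(Integer, ForeignKey("products.id"))
    rating = Column(Integer)
    comment = Column(String)
    product = relationship("Product", back_populates="reviews")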

Setting Up a Database Cloud Playground

00:03:44

Lesson Description:

Before we can write anything useful to transform our data, we need a database to work with. In this lecture, we're going to use a Cloud Playground to run a PostgreSQL database with products and product reviews. Documentation for This Video The db_setup.sh Script Setting Up the Server For our PostgreSQL server, we'll use a CentOS 7 Cloud Playground, and all we need to do is pull down our db_setup.sh script and run it. This script will install Docker and run a database container for us that is automatically populated with some information. During the process, we'll need to put in our sudo password a few times and also set a database user and password. For this example, I'll be using admin and password: $ curl -O https://raw.githubusercontent.com/linuxacademy/content-python-use-cases/master/helpers/db_setup.sh $ chmod +x db_setup.sh $ ./db_setup.sh ...Now we can access our database using the postgres:// URL scheme with the following format: postgres://USER:PASSWORD@PUBLIC_IP:PORT/DB_NAMEOnly certain ports are open publicly for Cloud Playgrounds, so if we're connecting to this machine from something other than a Cloud Playground, we'll need to use the public IP address and port 80. From a different Cloud Playground, we can use the private IP address and the standard port of 5432.
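If you happen to have the psql client installed and want to sanity-check the database before writing any Python (this step is optional and not part of the lesson), a quick query against that URL should work. PUBLIC_IP is a placeholder, admin/password are the credentials chosen during db_setup.sh, and reviews is the database the script creates (we'll use the same URL in the next lesson):

$ psql "postgres://admin:password@PUBLIC_IP:80/reviews" -c "SELECT count(*) FROM products;"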

Configuring a SQLAlchemy Connection

00:16:20

Lesson Description:

Now that our database is configured, we're ready to start interacting with it from within our Python code. We've already added SQLAlchemy to our project, and now we'll configure it to connect to our database server. Documentation for This Video SQLAlchemySQLAlchemy Engine ConfigurationCreating SQLAlchemy Sessionsfunctools.lru_cache Configuring a Connection We want our tool to be able to create a connection to a database based on either a string that is passed in or by fetching a connection from an environment variable. There are a few ways to do this. We can create a "connection" to run SQL queries directly, or if we want to work with the ORM (Object Relational Mapper), we can use a "session". We're going to add support for both in our library. Ideally, we'll run the connection code once when we run our program, and we'll put it in a config module so that it's obvious where we're doing the database configuration. We'll call the primary function get_connection, and we'll create an engine function to configure the engine (which our session will need also): dbexport/config.py import os from functools import lru_cache from sqlalchemy import create_engine @lru_cache(maxsize=32) def engine(db_url=None): db_url = db_url or os.getenv("DB_URL") if not db_url: raise ValueError("database URL is required") print(f"Returning an engine for {db_url}") return create_engine(db_url) def get_connection(db_url=None): return engine(db_url).connect() We're doing a few different things here: We're caching the result of engine so that it will only configure the engine value once based on a given database URL, and subsequent calls will return the same object from the cache. This is done by using the functools.lru_cache decorator.If there is no DB_URL environment variable and no string is manually passed in, then we'll raise an error because there is absolutely no way that we can connect to the database.The sqlalchemy.create_engine function will give us an engine configured to interact with a specific type of database (PostgreSQL, in this case), but we won't be able to interact with the database until we get a connection by using engine.connect. Let's give this a try in the REPL by connecting to our reviews database: (dbexport) $ DB_URL="postgres://admin:password@PUBLIC_IP:80/reviews" PYTHONPATH=. python >>> from dbexport.config import engine, get_connection >>> db = get_connection() Returning an engine for postgres://admin:password@PUBLIC_IP:80/reviews >>> engine() is engine() Returning an engine for postgres://admin:password@PUBLIC_IP:80/reviews True >>> engine() is engine(None) False >>> result = db.execute("SELECT count(id) FROM reviews") >>> row = result.first() >>> row[0] 2997Notice that although we call engine several times, the message only prints the first time the function is called with a given set of arguments: once through get_connection (which calls engine(None)) and once for the first bare engine() call. When we compare two bare engine() calls using is, they are the same object because the lru_cache decorator cached the result of the first call. engine() and engine(None) are functionally equivalent, but lru_cache keys its cache on the arguments, so a call with no arguments and a call with None are cached separately, which is why engine() is engine(None) evaluates to False. We're now able to create a database connection using an environment variable, but does it still work if we don't set DB_URL in the environment? Let's exit the REPL and start it back up without setting the variable: (dbexport) $ PYTHONPATH=. python >>> from dbexport.config import get_connection >>> db_url = "postgres://admin:password@PUBLIC_IP:80/reviews" >>> db = get_connection() ... 
ValueError: database URL is required >>> db = get_connection(db_url) Returning an engine for postgres://admin:password@PUBLIC_IP:80/reviewsWe're successfully raising an error if we have no URL, and we can also see that the lru_cache decorator depends on the arguments passed to the function. Creating a Session To work with the ORM (Object Relational Mapper), we will need to create a sessionmaker and then use sessions to interact with the database. A nice thing about sessions is that we get the benefit of transactions automatically and we can work with our eventual model objects as simple Python objects until we need to interact with the database. The sessionmaker function will create a new class for us that will be configured to interact with our database using the engine that we generate. Let's build on our engine function to add a session_class function and a default Session class (we'll also drop the debug print from engine now that we've seen how the caching behaves): dbexport/config.py import os from functools import lru_cache from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker @lru_cache(maxsize=32) def engine(db_url=None): db_url = db_url or os.getenv("DB_URL") if not db_url: raise ValueError("database URL is required") return create_engine(db_url) def get_connection(db_url=None): return engine(db_url).connect() @lru_cache(maxsize=32) def session_class(db_url=None): return sessionmaker(bind=engine(db_url)) try: Session = session_class() except: print("Failed to create default Session class") Now we have a function for generating a Session class. When the file is loaded for the first time, we'll attempt to generate a default Session class assuming that the user is utilizing the DB_URL configuration value. Let's load our module into the REPL without an environment variable set: (dbexport) $ PYTHONPATH=. python >>> from dbexport import config Failed to create default Session classWe're seeing this message because we can't create the default engine (it's raising a ValueError). Since the creation of the default Session class is just for convenience, we'll need to implement some error handling to prevent a crash. Let's remove this print statement and load the module one last time with an environment variable: dbexport/config.py import os from functools import lru_cache from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker @lru_cache(maxsize=32) def engine(db_url=None): db_url = db_url or os.getenv("DB_URL") if not db_url: raise ValueError("database URL is required") return create_engine(db_url) def get_connection(db_url=None): return engine(db_url).connect() @lru_cache(maxsize=32) def session_class(db_url=None): return sessionmaker(bind=engine(db_url)) try: Session = session_class() except: pass (dbexport) $ DB_URL="postgres://admin:password@PUBLIC_IP:80/reviews" PYTHONPATH=. python >>> from dbexport.config import Session >>> session = Session() >>> session <sqlalchemy.orm.session.Session object at 0x10c0c6f28> >>> session.bind Engine(postgres://admin:***@keiththomps2c.mylabserver.com:80/reviews)Now we have an easy way to get a Session class to create sessions that automatically connect to our database using the DB_URL. With all of this configuration in place, we're ready to start defining our models.
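Before moving on, here's a minimal standalone sketch (nothing to do with SQLAlchemy) of why engine() and engine(None) end up cached separately: functools.lru_cache keys its cache on the exact arguments passed.

>>> from functools import lru_cache
>>> @lru_cache(maxsize=32)
... def build(value=None):
...     print(f"building for {value!r}")
...     return object()
...
>>> build() is build()
building for None
True
>>> build() is build(None)
building for None
False

The two calls produce the same result, but () and (None,) are different cache keys, which is exactly what we observed with engine.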

Modeling Products and Reviews

00:13:37

Lesson Description:

Now that we have a database connection, we're ready to start modeling our database using Python classes. In this lesson, we'll create the models for Product and Review. Documentation for This Video Creating SQLAlchemy MappingSQLAlchemy QueryingSQLAlchemy Relationships Understanding Our Database Schema Before we can map our database tables to models, we need to know what the database tables look like. We have two database tables that we want to map: products — The various items that our organization sells.reviews — Reviews for the products that our organization sells. These database tables are relatively simple. Here's the schema for each table in SQL: create table products ( id SERIAL PRIMARY KEY, name VARCHAR(50) UNIQUE NOT NULL, level INTEGER NOT NULL, published BOOLEAN NOT NULL DEFAULT false, created_on TIMESTAMP NOT NULL DEFAULT NOW() ); alter table products ADD CONSTRAINT level_check CHECK ( level >= 0 AND level <= 2 ); create table reviews ( id SERIAL PRIMARY KEY, product_id INTEGER REFERENCES products(id), rating INTEGER NOT NULL, comment TEXT, created_on TIMESTAMP NOT NULL DEFAULT NOW() ); alter table reviews add constraint rating_check CHECK ( rating > 0 AND rating <= 5 ); We need to create a class for each of our tables, and for each column, we'll need to specify an attribute using the Column class provided by SQLAlchemy. Before all of that, though, we need to create a model base class using the declarative_base function. Here's our starting point: dbexport/models.py from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, Boolean, TIMESTAMP, ForeignKey Base = declarative_base() class Product(Base): __tablename__ = "products" id = Column(Integer, primary_key=True) name = Column(String(50), nullable=False, unique=True) level = Column(Integer, nullable=False) published = Column(Boolean, nullable=False) created_on = Column(TIMESTAMP) class Review(Base): __tablename__ = "reviews" id = Column(Integer, primary_key=True) product_id = Column(Integer, ForeignKey("products.id")) rating = Column(Integer, nullable=False) comment = Column(Text) created_on = Column(TIMESTAMP) Notice that we're not doing anything with our database configuration. Our models can be used with any database that has these tables regardless of the database driver (PostgreSQL, MySQL, etc). We're lacking the interaction between our models, so let's work on that relationship now. Defining Model Relationships When looking at our models, we can say that a product has many reviews. This is known as a "one-to-many" relationship. The other relationship types are "one-to-one" and "many-to-many". Because the Product class has a "one-to-many" relationship with the Review class, it would make sense for us to be able to ask a Product instance for its reviews. From the database standpoint, there's nothing on the products table that gives any indication that there are reviews. Instead, each row in the reviews database points to the associated products table using the product_id column. 
Let's use the relationship capabilities of the SQLAlchemy ORM to define the relationship on both classes: dbexport/models.py from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, Boolean, TIMESTAMP, ForeignKey from sqlalchemy.orm import relationship Base = declarative_base() class Product(Base): __tablename__ = "products" id = Column(Integer, primary_key=True) name = Column(String(50), nullable=False, unique=True) level = Column(Integer, nullable=False) published = Column(Boolean, nullable=False) created_on = Column(TIMESTAMP) reviews = relationship("Review", order_by="Review.rating", back_populates="product") class Review(Base): __tablename__ = "reviews" id = Column(Integer, primary_key=True) product_id = Column(Integer, ForeignKey("products.id")) rating = Column(Integer, nullable=False) comment = Column(Text) created_on = Column(TIMESTAMP) product = relationship("Product", back_populates="reviews") Now we have a great way to interact with our information, and we just need to learn how to utilize the SQLAlchemy querying interface. Our First SQLAlchemy Query The Session class that we generate in the config module is what will be doing the bulk of the work of interacting with the database. Our models are really just here to make it easier for us to conceptually work with the data. Let's load our code back into the REPL and take a look at how we can interact with the information: (dbexport) $ DB_URL="postgres://admin:password@PUBLIC_IP:80/reviews" PYTHONPATH=. python >>> from dbexport.config import Session >>> from dbexport.models import Review, Product >>> session = Session() >>> from sqlalchemy import func >>> session.query(func.count(Product.id)) <sqlalchemy.orm.query.Query object at 0x1028fe630> >>> session.query(func.count(Product.id)).all() [(999,)] >>> products = session.query(Product).limit(5).all() >>> products [<dbexport.models.Product object at 0x10294cda0>, <dbexport.models.Product object at 0x10294ce10>, <dbexport.models.Product object at 0x10294ce80>, <dbexport.models.Product object at 0x10294cef0>, <dbexport.models.Product object at 0x10294cf60>] >>> for product in products: ... print(product.name) ... unactability sporadically actinostomal unsaturation exocrine >>> products[0].reviews [<dbexport.models.Review object at 0x1029c78d0>, <dbexport.models.Review object at 0x1029c7940>]Each time we make a query using the session.query function, it will return a query object. It will not interact with the database until we run some specific functions on the Query class, such as all.
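The relationship works from the Review side too. Here's a short sketch (assuming the same DB_URL environment variable and that the tables contain data, as they do in our seeded database):

from dbexport.config import Session
from dbexport.models import Review

session = Session()

# Grab any review and walk the relationship in both directions
review = session.query(Review).first()
print(review.product.name)          # the product this review belongs to
print(len(review.product.reviews))  # all reviews for that product, ordered by rating

Because both relationship calls use back_populates, review.product and product.reviews stay in sync with each other.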

Utilize the Library to Export CSV

00:12:50

Lesson Description:

Our library for interacting with our database is effectively complete and can be used by people on our team to export data. In this lesson, we'll write a custom script requested by someone else in our company to export the data in CSV format. Documentation for This Video SQLAlchemy QueryingSQLAlchemy Query ClassPython csv Module Adding a setup.py We skipped the step where we add a setup.py to our project, but we want to be able to install our project so that it's easy to reference from scripts that we write. For the setup.py, we can use setup.py for Humans. We'll need to make some modifications, but this file will save us a lot of time. Let's download the file and start modifying it: $ curl -O https://raw.githubusercontent.com/navdeep-G/setup.py/master/setup.pyWe need to change things in the # Package meta-data section to be about dbexport: setup.py (partial) # Package meta-data. NAME = "dbexport" DESCRIPTION = "Internal library for interacting with Products and Reviews database" URL = "https://github.com/example/dbexport" EMAIL = "me@example.com" AUTHOR = "Awesome Soul" REQUIRES_PYTHON = ">=3.6.0" VERSION = "0.1.0" # What packages are required for this module to be executed? REQUIRED = ["sqlalchemy", "psycopg2-binary"] Now we can install our package: (dbexport) $ pip install -e .Writing a Script to Export CSV Our library is very limited in scope; it only handles connecting to the database using the DB_URL environment variable convention and provides some models that map to the shape of our database tables. For everything else that we want to do, we'll most likely just use this library in small one-off scripts. One of our coworkers has requested a CSV file that has one line for each product that includes some standard information and also some aggregate review information. Here's an example CSV file with a header and a single row: name,level,published,created_on,review_count,avg_rating Product 1,1,True,2019-07-10,10,4.3 The last two columns, review_count and avg_rating, will be calculated as part of our query. Let's start working on a script called product_csv.py. This script will have slightly more advanced SQL in it, but we'll work our way through it: product_csv.py from dbexport.config import Session from dbexport.models import Product, Review from sqlalchemy.sql import func session = Session() reviews_statement = ( session.query( Review.product_id, func.count("*").label("review_count"), func.avg(Review.rating).label("avg_rating"), ) .group_by(Review.product_id) .subquery() ) for product, review_count, avg_rating in ( session.query( Product, reviews_statement.c.review_count, reviews_statement.c.avg_rating ) .outerjoin(reviews_statement, Product.id == reviews_statement.c.product_id) .limit(6) ): print(product) print(review_count) print(avg_rating) Let's break down what we're doing after we create our session: We create a subquery that will calculate the average rating and count of the reviews. We then add this to the final query that we're going to make.We create our products query so that it returns the Product models and the calculated values for the review information. Because this query is returning more information than we defined in the Product model, SQLAlchemy will return a tuple for each row returned. With our final query, we're leveraging the fact that a query is a generator by utilizing it directly in a for loop and unpacking the returned tuples. For our first run of this script, we've set a limit, but we'll remove this after we make sure that it can run. 
(dbexport) $ DB_URL=$DB_URL python product_csv.py <dbexport.models.Product object at 0x102172b38> 6 3.8333333333333333 <dbexport.models.Product object at 0x1021e7240> 6 2.1666666666666667 <dbexport.models.Product object at 0x1021e7358> 2 3.0000000000000000 <dbexport.models.Product object at 0x1021e73c8> 3 2.6666666666666667 <dbexport.models.Product object at 0x1021e7438> 3 3.0000000000000000 <dbexport.models.Product object at 0x1021e74a8> 2 1.5000000000000000Exporting CSV This looks pretty good, and now we're ready to export this information as CSV using the standard library's csv module. product_csv.py from dbexport.config import Session from dbexport.models import Product, Review from sqlalchemy.sql import func import csv csv_file = open("product_ratings.csv", mode="w") fields = ["name", "level", "published", "created_on", "review_count", "avg_rating"] csv_writer = csv.DictWriter(csv_file, fieldnames=fields) csv_writer.writeheader() session = Session() reviews_statement = ( session.query( Review.product_id, func.count("*").label("review_count"), func.avg(Review.rating).label("avg_rating"), ) .group_by(Review.product_id) .subquery() ) for product, review_count, avg_rating in session.query( Product, reviews_statement.c.review_count, reviews_statement.c.avg_rating ).outerjoin(reviews_statement, Product.id == reviews_statement.c.product_id): csv_writer.writerow( { "name": product.name, "level": product.level, "published": product.published, "created_on": product.created_on.date(), "review_count": review_count or 0, "avg_rating": round(float(avg_rating), 4) if avg_rating else 0, } ) csv_file.close() We're able to use a csv.DictWriter to write a row for each of our query rows. We need to manipulate some of the returned information (e.g., add a default if there are no reviews for a given product). We also only want to return the date for created_on instead of the full datetime.
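To double-check the export, here's a short sketch that reads the file back with csv.DictReader (the filename matches the one our script writes):

import csv

with open("product_ratings.csv") as csv_file:
    reader = csv.DictReader(csv_file)
    for row in list(reader)[:3]:
        # Every value comes back as a string, keyed by the header names
        print(row["name"], row["review_count"], row["avg_rating"])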

Exporting Data as JSON

00:04:23

Lesson Description:

The last thing that we want to do with our dbexport project is to write a script to export to JSON instead of CSV. In this lesson, we'll build a modified version of the CSV script that uses the same queries but writes the output with the json module from the standard library. Documentation for This Video The json ModuleThe json.dump Function Exporting Data as JSON The requirements for our JSON output are the same as for the CSV, except that we want to write out an array of JSON objects instead of rows. Let's start by copying the product_csv.py to product_json.py: (dbexport) $ cp product_{csv,json}.pyNext, we're going to remove the CSV-related logic and instead build up a list of dictionaries and write them to a JSON file using the json.dump function: product_json.py from dbexport.config import Session from dbexport.models import Product, Review from sqlalchemy.sql import func import json session = Session() reviews_statement = ( session.query( Review.product_id, func.count("*").label("review_count"), func.avg(Review.rating).label("avg_rating"), ) .group_by(Review.product_id) .subquery() ) products = [] for product, review_count, avg_rating in session.query( Product, reviews_statement.c.review_count, reviews_statement.c.avg_rating ).outerjoin(reviews_statement, Product.id == reviews_statement.c.product_id): products.append({ "name": product.name, "level": product.level, "published": product.published, "created_on": str(product.created_on.date()), "review_count": review_count or 0, "avg_rating": round(float(avg_rating), 4) if avg_rating else 0, }) with open("product_ratings.json", "w") as f: json.dump(products, f) The only other change we made was in how we write out the created_on value. A date object is not JSON serializable, so we need a str instead: we convert the datetime to a date and then convert that date to a str.
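As an aside, json.dump and json.dumps also accept a default callable that is invoked for any object they can't serialize natively, so passing default=str is another way to handle date values. A tiny sketch of that behavior (not what the script above does, just an alternative):

import json
from datetime import date

# default=str is called for the date object, so it's written as "2019-07-10"
print(json.dumps({"created_on": date(2019, 7, 10)}, default=str))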

Web Scraping

Project Overview and Setup: Data Checker

00:04:43

Lesson Description:

Web scraping is useful for gathering data for all sorts of things, from data science to simple change notifications. Python is a pretty common language to use for web scraping. For this project, we'll build a web scraper that will monitor an open data set registry. As with all of our projects, we'll start with a blank slate and build up our project structure before digging into the code. Documentation for This Video PipenvPython .gitignoreScrapy Project Setup This project will mostly be one file that will scrape Data.gov's catalog and send us a daily email with information about the additions and removals. Since this is more of an application than an installable library or project, we're not going to have a setup.py for it. Let's create a project directory: $ mkdir data_checker $ cd data_checkerUsing Pipenv for our Virtual Environment We're going to use Pipenv to manage our virtual environment and development dependencies. We need Scrapy to provide us some nice tools for scraping a web page for information that we care about. Let's initialize our environment using Python 3.7 and install Scrapy as a dependency: $ pipenv install --python python3.7 scrapy ...Now we'll start the virtualenv: $ pipenv shell ... (data_checker) $Letting Scrapy Generate the Rest of the Project Scrapy is an opinionated tool that comes with some generators to help us maintain a consistent project structure. Since we've already installed Scrapy to our virtualenv, we can use the scrapy startproject command to build out the rest of our file and directory structure. We've already created the outer directory, so we'll specify that we want the project to be created within our current directory. (data_checker) $ scrapy startproject data_checker .Now we have a lot more files to work with, but for the most part, we'll only be working with spiders. Creating the Git Repository and Committing For our .gitignore file, we're going to utilize the one for Python maintained by GitHub. We can pull this down using the following curl command: $ curl https://raw.githubusercontent.com/github/gitignore/master/Python.gitignore -o .gitignoreAt this point it makes sense to also initialize our project as a Git repository, so let's do that: $ git initFinally, we're ready to make our first commit and start developing our scraper: $ git add --all . $ git commit -m 'Initial commit - generated scrapy project'
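For reference, the generated project should look roughly like this (the exact set of files can vary a bit between Scrapy versions):

data_checker/
├── scrapy.cfg
└── data_checker/
    ├── __init__.py
    ├── items.py
    ├── middlewares.py
    ├── pipelines.py
    ├── settings.py
    └── spiders/
        └── __init__.py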

Creating a Spider

00:13:10

Lesson Description:

Scrapy allows us to build a spider class that will scrape the information off of a page and even move it onto other pages if we specify that there is pagination. In this lesson, we'll learn about Python generators and break down the HTML markup for Data.gov's catalog page as we write our web scraper. Documentation for This Video ScrapyScrapy DocumentationData.gov CatalogPython Generators Creating Our Spider Here's the general flow that our program will have: Our DatasetSpider will crawl Data.gov's catalog and extract Dataset items.Scrapy will automatically export the Dataset items as JSON after we set some configuration. The Dataset item will go in the items.py that was generated for us (we'll rename the class that's already in there). Let's create our spider. Thankfully, Scrapy provides a generator that will do this for us. (data_checker) $ scrapy genspider dataset catalog.data.gov Created spider 'dataset' using template 'basic' in module: data_checker.spiders.datasetIf we take a look at data_checker/spiders/dataset.py, we can see what we're starting with. We'll change the start_urls to point to the catalog URL: data_checker/spiders/dataset.py # -*- coding: utf-8 -*- import scrapy class DatasetSpider(scrapy.Spider): name = 'dataset' allowed_domains = ['catalog.data.gov'] start_urls = ['https://catalog.data.gov/dataset/'] def parse(self, response): pass The main thing that we'll be working with here is the parse method. Inspecting the HTML Using scrapy shell Scrapy comes with a tool that allows us to interact with markup directly from a REPL. Let's run scrapy shell: (data_checker) $ scrapy shell https://catalog.data.gov/dataset/ ... 2019-08-07 11:16:14 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://catalog.data.gov/robots.txt> (referer: None) 2019-08-07 11:16:14 [scrapy.downloadermiddlewares.robotstxt] DEBUG: Forbidden by robots.txt: <GET https://catalog.data.gov/dataset/> [s] Available Scrapy objects: [s] scrapy scrapy module (contains scrapy.Request, scrapy.Selector, etc) [s] crawler <scrapy.crawler.Crawler object at 0x10cda5c50> [s] item {} [s] request <GET https://catalog.data.gov/dataset/> [s] settings <scrapy.settings.Settings object at 0x10f1f6810> [s] Useful shortcuts: [s] fetch(url[, redirect=True]) Fetch URL and update local objects (by default, redirects are followed) [s] fetch(req) Fetch a scrapy.Request and update local objects [s] shelp() Shell help (print this help) [s] view(response) View response in a browser >>> response >>>We have no response, which is a bit of a problem. If we scroll up a little, we can see this output: DEBUG: Forbidden by robots.txtBy default, Scrapy will respect a website's robots.txt. Unfortunately, this prevents our spider from getting any information from the site. This might seem like it would completely shut down our project, but we can change this in our Scrapy project settings. All we have to do is edit the following line in the data_checker/settings.py: data_checker/settings.py (partial) # Obey robots.txt rules ROBOTSTXT_OBEY = False Now when we run our shell command we can interact with the response. (data_checker) $ scrapy shell https://catalog.data.gov/dataset/ ... 
2019-08-07 11:21:07 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://catalog.data.gov/dataset> (referer: None) [s] Available Scrapy objects: [s] scrapy scrapy module (contains scrapy.Request, scrapy.Selector, etc) [s] crawler <scrapy.crawler.Crawler object at 0x106c29510> [s] item {} [s] request <GET https://catalog.data.gov/dataset/> [s] response <200 https://catalog.data.gov/dataset> [s] settings <scrapy.settings.Settings object at 0x106c29710> [s] spider <DefaultSpider 'default' at 0x10714ca10> [s] Useful shortcuts: [s] fetch(url[, redirect=True]) Fetch URL and update local objects (by default, redirects are followed) [s] fetch(req) Fetch a scrapy.Request and update local objects [s] shelp() Shell help (print this help) [s] view(response) View response in a browser >>> response <200 https://catalog.data.gov/dataset>Now we can interact with the response object and inspect the markup using XPaths or CSS identifiers. Poking around the web page in the browser, it looks like the CSS class of dataset-content should give us the container for each of the data set items on the page. There are 20 data sets shown on each page, so we can check our query by checking the length: >>> len(response.css(".dataset-content")) 20That looks like a good start. We'll want to query each result a little more to get a few things: The URL of the data setThe name of the data setThe organization that provides the data set Let's take a single item and see what other queries we can use to get these values. Take a look at the selectors documentation for more information on other ways that we could query for this information: Note: Your values may not be the same if the order of the data sets changes. >>> dataset = response.css(".dataset-content")[0] >>> dataset.css("h3.dataset-heading > a::text").get() 'Demographic Statistics By Zip Code' >>> dataset.css("h3.dataset-heading > a::attr(href)").get() '/dataset/demographic-statistics-by-zip-code-acfc9' >>> dataset.css(".dataset-organization::text").get() 'City of New York —'Now that we have this information, we're ready to start working on our spider's parse method. Generators and the yield Keyword The parse method is interesting because it's expected to be implemented as a generator. This means that this method can be started and stopped. When first run, the method will execute until it hits yield, and then it will stop. Then the method can be called again, and it will continue from where it previously yielded. In our case, we'll be yielding each of the items that we create. Let's customize our Dataset item before we implement parse: data_checker/items.py import scrapy class Dataset(scrapy.Item): name = scrapy.Field() link = scrapy.Field() organization = scrapy.Field() This is a simple class that functions as a named dictionary type. 
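Here's a minimal sketch of that start-and-stop behavior with a plain Python generator (nothing Scrapy-specific):

def numbers():
    for n in [1, 2, 3]:
        # Execution pauses here each time and resumes on the next request for a value
        yield n * 10

gen = numbers()
print(next(gen))   # 10
print(list(gen))   # [20, 30] -- the generator picks up right where it left off

Our parse method will do the same thing, yielding one Dataset item at a time as Scrapy iterates over it.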
Now let's create some of these while our spider is parsing the page: data_checker/spiders/dataset.py # -*- coding: utf-8 -*- import scrapy from data_checker.items import Dataset class DatasetSpider(scrapy.Spider): name = 'dataset' allowed_domains = ['catalog.data.gov'] start_urls = ['http://catalog.data.gov/dataset/'] def parse(self, response): host = response.url.split("/dataset")[0] for dataset in response.css(".dataset-content"): yield Dataset( name=dataset.css("h3.dataset-heading > a::text").get(), link=host + dataset.css("h3.dataset-heading > a::attr(href)").get(), organization=dataset.css(".dataset-organization::text").get() ) A few things to note: We noticed that the href attributes were relative to the domain, so we pull the host from the response.url.We don't use return; we instead yield the Dataset for every iteration of our loop. Let's test this out and see what is scraped from the page using the scrapy crawl command: (data_checker) $ scrapy crawl dataset ... {'link': 'https://catalog.data.gov/dataset/mile-markers', 'name': 'Mile Markers', 'organization': 'State of North Dakota —'} 2019-08-07 14:25:36 [scrapy.core.scraper] DEBUG: Scraped from <200 https://catalog.data.gov/dataset> {'link': 'https://catalog.data.gov/dataset/global-surface-summary-of-the-day-gsod', 'name': 'Global Surface Summary of the Day - GSOD', 'organization': 'National Oceanic and Atmospheric Administration, Department ' 'of Commerce —'} 2019-08-07 14:25:36 [scrapy.core.engine] INFO: Closing spider (finished) 2019-08-07 14:25:36 [scrapy.statscollectors] INFO: Dumping Scrapy stats: {'downloader/request_bytes': 944, 'downloader/request_count': 4, 'downloader/request_method_count/GET': 4, 'downloader/response_bytes': 198701, 'downloader/response_count': 4, 'downloader/response_status_count/200': 1, 'downloader/response_status_count/301': 3, 'elapsed_time_seconds': 2.175434, 'finish_reason': 'finished', 'finish_time': datetime.datetime(2019, 8, 7, 18, 25, 36, 691346), 'item_scraped_count': 20, 'log_count/DEBUG': 24, 'log_count/INFO': 10, 'memusage/max': 51769344, 'memusage/startup': 51769344, 'response_received_count': 1, 'scheduler/dequeued': 4, 'scheduler/dequeued/memory': 4, 'scheduler/enqueued': 4, 'scheduler/enqueued/memory': 4, 'start_time': datetime.datetime(2019, 8, 7, 18, 25, 34, 515912)} 2019-08-07 14:25:36 [scrapy.core.engine] INFO: Spider closed (finished)We've truncated the results, but it scraped 20 items correctly. The organization names in the results all end with a — (that's an em dash, not a hyphen), so let's strip that off. data_checker/spiders/dataset.py # -*- coding: utf-8 -*- import scrapy from data_checker.items import Dataset class DatasetSpider(scrapy.Spider): name = 'dataset' allowed_domains = ['catalog.data.gov'] start_urls = ['http://catalog.data.gov/dataset/'] def parse(self, response): host = response.url.split("/dataset")[0] for dataset in response.css(".dataset-content"): yield Dataset( name=dataset.css("h3.dataset-heading > a::text").get(), link=host + dataset.css("h3.dataset-heading > a::attr(href)").get(), organization=dataset.css(".dataset-organization::text").get().strip(" —") ) We're successfully scraping the items off of the page, but there are more pages to scrape. In the next lecture, we'll add pagination to our parse method.

Adding Pagination to the Spider

00:06:35

Lesson Description:

We've successfully extracted the dataset information off of a single page, and now we're ready to see what it would take to extract the information off of all of the pages. In this lesson, we'll add pagination to our spider's parse method. Documentation for This Video ScrapyScrapy DocumentationData.gov CatalogFollowing Links Navigating to the Next Page As it stands right now, we'll continue to yield Dataset items from within our parse method until no more are found on the page. But if we want all of the items, we need to continue on to the next page. Thankfully, after the last item is yielded, the parse generator resumes and continues through the rest of the method, so we can add logic there that navigates to the next page. To do this, we'll use the response.follow method. We can grab the paginator at the bottom of the page using the pagination class in CSS, and we can then use the link present within the last li element. data_checker/spiders/dataset.py # -*- coding: utf-8 -*- import scrapy from data_checker.items import Dataset class DatasetSpider(scrapy.Spider): name = 'dataset' allowed_domains = ['catalog.data.gov'] start_urls = ['http://catalog.data.gov/dataset/'] def parse(self, response): host = response.url.split("/dataset")[0] for dataset in response.css(".dataset-content"): yield Dataset( name=dataset.css("h3.dataset-heading > a::text").get(), link=host + dataset.css("h3.dataset-heading > a::attr(href)").get(), organization=dataset.css(".dataset-organization::text").get().strip(" —") ) for link in response.css(".pagination > ul > li:last-child:not(.active) > a"): yield response.follow(link, callback=self.parse) Our selector is pretty complicated because we need to handle the case where we're on the last page. The :not(.active) portion skips the last li element when it has the active class, which is what happens once we're on the last page. Doing this keeps us from infinitely re-crawling the last page. If we run this now, we're going to encounter a potential issue. It will run for a very long time because there are a lot of pages. This means we've succeeded, but letting it run on 11k+ pages will take too long for our purposes, so let's give ourselves a max_pages limit. data_checker/spiders/dataset.py # -*- coding: utf-8 -*- import scrapy from data_checker.items import Dataset class DatasetSpider(scrapy.Spider): name = 'dataset' allowed_domains = ['catalog.data.gov'] start_urls = ['http://catalog.data.gov/dataset/'] max_pages = 5 def parse(self, response): host = response.url.split("/dataset")[0] for dataset in response.css(".dataset-content"): yield Dataset( name=dataset.css("h3.dataset-heading > a::text").get(), link=host + dataset.css("h3.dataset-heading > a::attr(href)").get(), organization=dataset.css(".dataset-organization::text").get().strip(" —") ) for link in response.css(".pagination > ul > li:last-child:not(.active) > a"): page_number = int(link.attrib['href'].split('=')[1]) if page_number > self.max_pages: break yield response.follow(link, callback=self.parse) Now when we run scrapy crawl dataset, we will get 100 items.
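The page_number parsing above leans on the shape of the paginator's href values. Assuming a link like /dataset?page=2 (the exact query string is an assumption based on the split in the code above), the parsing works like this:

>>> "/dataset?page=2".split("=")
['/dataset?page', '2']
>>> int("/dataset?page=2".split("=")[1])
2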

Exporting Items and Sending Emails — Part 1

00:14:34

Lesson Description:

We've scraped items from the dataset catalog, and now we're ready to store the information, compare it with the previous day's results, and send notification emails. In this lesson, we'll take a look at how Scrapy feed exports work, and we'll write an extension to send an email. Documentation for This Video ScrapyScrapy DocumentationData.gov CatalogScrapy Feed ExportsWriting a Scrapy Extension Storing Dataset Items Using a Feed Exporter Scrapy provides various types of feed exports. When we run our spider, we always want to store the results in a JSON file specified by the current date. Then we can have an extension that we'll write compare the file from yesterday to today's file and notify us via email if there has been a change. The first step is to store the data by configuring our FEED_FORMAT in our spider's custom_settings: data_checker/spiders/dataset.py # -*- coding: utf-8 -*- import scrapy from data_checker.items import Dataset class DatasetSpider(scrapy.Spider): name = 'dataset' allowed_domains = ['catalog.data.gov'] start_urls = ['http://catalog.data.gov/dataset/'] max_pages = 5 # Enable Feed Storage custom_settings = { 'FEED_FORMAT': 'json', 'FEED_URI': 'file:///tmp/%(time)s.json' } def parse(self, response): host = response.url.split("/dataset")[0] for dataset in response.css(".dataset-content"): yield Dataset( name=dataset.css("h3.dataset-heading > a::text").get(), link=host + dataset.css("h3.dataset-heading > a::attr(href)").get(), organization=dataset.css(".dataset-organization::text").get().strip(" —") ) for link in response.css(".pagination > ul > li:last-child:not(.active) > a"): page_number = int(link.attrib['href'].split('=')[1]) if page_number > self.max_pages: break yield response.follow(link, callback=self.parse) Now when we run our spider, the results will be written into the /tmp directory with the timestamp of when we ran the spider. This will allow us to grab previous runs based on a specific filename format. With this stored, we're ready to write an extension to read these files and determine if we should send an email. Writing a Custom Scrapy Extension Scrapy extensions are simple Python classes that implement the from_crawler class method. Additional methods can be implemented to tie into various signals that are emitted during a crawler run. We want to do something at the very end of the run, so for that, we'll need to tie into the engine_stopped signal. To start, let's create a new data_checker/extensions.py file to hold onto our new extension that we'll call EmailOnChange: data_checker/extensions.py from scrapy import signals from scrapy.exceptions import NotConfigured class EmailOnChange(object): @classmethod def from_crawler(cls, crawler): if not crawler.settings.getbool("EMAIL_ON_CHANGE_ENABLED"): raise NotConfigured # Create an instance of our extension extension = cls() crawler.signals.connect(extension.engine_stopped, signal=signals.engine_stopped) return extension def engine_stopped(self): pass The from_crawler class method will allow us to create an instance of our extension that is configured by the crawler's settings. We've also created a placeholder engine_stopped method that we will run when the engine_stopped signal has been emitted (this is just a naming convention). To enable our extension, we need to do a few things: Add our module to the EXTENSIONS setting in settings.py.Add our custom configuration value for EMAIL_ON_CHANGE_ENABLED to settings.py and set it to True. 
This is what that section of the settings.py will look like: data_checker/settings.py (partial) # Enable or disable extensions # See https://docs.scrapy.org/en/latest/topics/extensions.html EXTENSIONS = { 'data_checker.extensions.EmailOnChange': 500, } # EmailOnChange Settings EMAIL_ON_CHANGE_ENABLED = True Comparing Previous Results To compare our previous results, we're going to glob the files that match the pattern of a timestamp.json file and see if there is a difference between the two files using the glob and filecmp modules: data_checker/extensions.py import glob import filecmp from scrapy import signals from scrapy.exceptions import NotConfigured class EmailOnChange(object): @classmethod def from_crawler(cls, crawler): if not crawler.settings.getbool("EMAIL_ON_CHANGE_ENABLED"): raise NotConfigured # Create an instance of our extension extension = cls() crawler.signals.connect(extension.engine_stopped, signal=signals.engine_stopped) return extension def engine_stopped(self): runs = sorted(glob.glob("/tmp/[0-9]*-[0-9]*-[0-9]*T[0-9]*-[0-9]*-[0-9]*.json"), reverse=True) if len(runs) < 2: # We can't compare if there's only 1 run return current_file, previous_file = runs[0:2] if not filecmp.cmp(current_file, previous_file): print("\n\nTHE FILES ARE DIFFERENT\n\n") else: print("\n\nNO CHANGE\n\n") If we run scrapy crawl dataset, we should see that there has been NO CHANGE. To test that our change detection logic works, we can open the most recent JSON file that was created and modify one of the strings. After we've done that, if we run scrapy crawl dataset again, we'll see THE FILES ARE DIFFERENT. In the next lesson, we'll finish our extension by implementing the email sending logic.
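The comparison relies on the timestamped filenames sorting newest-first when reversed. Here's a small sketch with hypothetical filenames in the %(time)s format (your actual timestamps will differ):

>>> runs = sorted([
...     "/tmp/2019-08-06T14-25-34.json",
...     "/tmp/2019-08-07T18-25-34.json",
... ], reverse=True)
>>> runs[0]   # the most recent run sorts first
'/tmp/2019-08-07T18-25-34.json'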

Exporting Items and Sending Emails — Part 2

00:08:51

Lesson Description:

Our extension can handle determining whether or not the datasets changed between two spider runs, and now we're ready to add the logic that will send notification emails. Note: This is a continuation from Part 1. Documentation for This Video ScrapyScrapy DocumentationData.gov CatalogScrapy Feed ExportsSending EmailsWriting a Scrapy Extension Sending an Email Now that we can determine when there has been a change, we want to send an email to a specified email address to notify the recipient of the change. To accomplish this, we need to do a few things: Deploy a simple SMTP server to allow us to send emails.Set the mail settings in our settings.py if we need to customize anything.Use the scrapy.mail.MailSender class to send the email. Connecting to a real SMTP server (such as Gmail's) is outside the scope of this tutorial, but we can install a module that will run an SMTP server for us to test with. Let's install aiosmtpd and start a server in a different shell: $ cd data_checker $ pipenv install aiosmtpd $ pipenv shell (data_checker) $ python -m aiosmtpd -n -l localhost:8025We won't see anything right away, but when we send a message, we'll see some debug output. This server will bind to port 8025, so we'll need to adjust our settings.py to handle this. We'll also add EMAIL_ON_CHANGE_DESTINATION so that we can configure where to send the email: data_checker/settings.py (partial) # Enable or disable extensions # See https://docs.scrapy.org/en/latest/topics/extensions.html EXTENSIONS = { 'data_checker.extensions.EmailOnChange': 500, } # EmailOnChange Settings EMAIL_ON_CHANGE_ENABLED = True EMAIL_ON_CHANGE_DESTINATION = 'user@example.com' # Mail Settings MAIL_PORT = 8025 Next, we'll use the scrapy.mail.MailSender to send the mail from our extension: data_checker/extensions.py import glob import filecmp from scrapy import signals from scrapy.exceptions import NotConfigured from scrapy.mail import MailSender class EmailOnChange(object): def __init__(self, destination, mailer): self.destination = destination self.mailer = mailer @classmethod def from_crawler(cls, crawler): if not crawler.settings.getbool("EMAIL_ON_CHANGE_ENABLED"): raise NotConfigured destination = crawler.settings.get("EMAIL_ON_CHANGE_DESTINATION") if not destination: raise NotConfigured("EMAIL_ON_CHANGE_DESTINATION must be provided") mailer = MailSender.from_settings(crawler.settings) # Create an instance of our extension extension = cls(destination, mailer) crawler.signals.connect(extension.engine_stopped, signal=signals.engine_stopped) return extension def engine_stopped(self): runs = sorted(glob.glob("/tmp/[0-9]*-[0-9]*-[0-9]*T[0-9]*-[0-9]*-[0-9]*.json"), reverse=True) if len(runs) < 2: # We can't compare if there's only been 1 run return current_file, previous_file = runs[0:2] if not filecmp.cmp(current_file, previous_file): print("\n\nTHE FILES ARE DIFFERENT\n\n") with open(current_file) as f: self.mailer.send( to=[self.destination], subject="Datasets Changed", body="Changes in datasets detected, see attachment for current datasets", attachs=[(current_file.split('/')[-1], 'application/json', f)] ) else: print("\n\nNO CHANGE\n\n") If we modify the last crawl's JSON file and run scrapy crawl dataset again, the output from the terminal running aiosmtpd should indicate that a message was sent. 
It should look something like this: ---------- MESSAGE FOLLOWS ---------- Content-Type: multipart/mixed; boundary="===============9047154112257235713==" MIME-Version: 1.0 From: scrapy@localhost To: user@example.com Date: Thu, 08 Aug 2019 14:17:42 -0400 Subject: Datasets Changed X-Peer: ('127.0.0.1', 63408) ...
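Since MailSender.from_settings reads its configuration from Scrapy's standard mail settings, pointing the extension at a real SMTP server later is mostly a settings change. Here's a hedged sketch of what that section of settings.py could look like; the host, user, and password values are placeholders, not something from this course: data_checker/settings.py (partial)

# Mail Settings
MAIL_HOST = "localhost"         # defaults to localhost
MAIL_PORT = 8025                # our aiosmtpd server from above
MAIL_FROM = "scrapy@localhost"  # the default From: address we saw in the output
# For a real SMTP server you would typically also set:
# MAIL_USER = "smtp-user"
# MAIL_PASS = "smtp-password"
# MAIL_TLS = True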

Conclusion

Final Steps

What's Next?

00:00:49

Lesson Description:

Thank you for taking the time to go through this course! I hope that you learned a lot, and I want to hear about it. If you could, please take a moment to rate the course—it will help me figure out what is working and what isn't. Now that you've completed this course, here are some additional courses I recommend that will leverage your knowledge of Python while teaching you something new: Google App Engine Deep DiveGoogle Kubernetes Engine Deep DiveCloud Functions Deep DiveAWS Lambda Deep Dive Remember that programming is a skill that needs to be practiced, so find problems to solve and keep pushing yourself. And let me know what you build in the community!