Tutorial

Verwenden von ThreadPoolExecutor in Python 3

PythonDevelopment

Der Autor hat den COVID-19 Relief Fund dazu ausgewählt, eine Spende im Rahmen des Programms Write for DOnations zu erhalten.

Einführung

Python-Threads stellen eine Form von Parallelismus dar, mit der Ihr Programm verschiedene Operationen gleichzeitig ausführen kann. Parallelismus in Python lässt sich auch durch Verwendung mehrerer Prozesse erzielen; Threads eignen sich jedoch besonders gut für die Beschleunigung von Anwendungen, die hohe I/O-Leistung benötigen.

Beispiel: I/O-gerichtete Operationen umfassen die Erstellung von Webanfragen und das Lesen von Daten aus Dateien. Im Gegensatz zu I/O-gerichteten Operationen werden CPU-gerichtete Operationen (wie die Ausführung von Berechnungen mit der Python-Standardbibliothek) von Python-Threads nur wenig profitieren.

Python 3 enthält das Dienstprogramm ThreadPoolExecutor zur Ausführung von Code in einem Thread.

In diesem Tutorial werden wir ThreadPoolExecutor verwenden, um zügige Netzwerkanfragen zu erstellen. Wir werden eine Funktion definieren, die für Aufrufe innerhalb von Threads geeignet ist, ThreadPoolExecutor zur Ausführung dieser Funktion nutzen und Ergebnisse aus diesen Ausführungen verarbeiten.

In diesem Tutorial werden wir Netzwerkanfragen stellen, um die Existenz von Wikipedia-Seiten zu überprüfen.

Anmerkung: Die Tatsache, dass I/O-gerichtete Operationen mehr von Threads profitieren als I/O-orientierte Operationen, hängt mit einer Eigenart von Python zusammen, die_ global interpreter loc_k genannt wird. Wenn Sie möchten, können Sie in der offiziellen Python-Dokumentation mehr über „global interpreter lock“ von Python erfahren.

Voraussetzungen

Für eine optimale Nutzung des Tutorials empfiehlt sich Vertrautheit mit der Programmierung in Python und einer lokalen Python-Programmierumgebung mit installiertem requests-Paket.

Sie können für die notwendigen Hintergrundinformationen diese Tutorials durchsehen:

  • pip install --user requests==2.23.0

Schritt 1 — Definieren einer Funktion zur Ausführung in Threads

Definieren wir zunächst eine Funktion, die wir mithilfe von Threads ausführen möchten.

Mit nano oder Ihrem bevorzugten Texteditor/Ihrer bevorzugten Entwicklungsumgebung können Sie diese Datei öffnen:

  • nano wiki_page_function.py

In diesem Tutorial werden wir eine Funktion schreiben, die ermittelt, ob eine Wikipedia-Seite vorhanden ist oder nicht:

wiki_page_function.py
import requests

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

Die Funktion get_wiki_page_existence akzeptiert zwei Argumente: eine URL zu einer Wikipedia-Seite (wiki_page_url) und eine timeout-Anzahl von Sekunden, während der auf eine Antwort von dieser URL gewartet werden soll.

get_wiki_page_existence nutzt das requests-Paket, um eine Webanfrage an diese URL zu stellen. Je nach Statuscode der HTTP-Antwort wird eine Zeichenfolge zurückgegeben, die beschreibt, ob die Seite vorhanden ist oder nicht. Verschiedene Statuscodes stellen verschiedene Ergebnisse einer HTTP-Anfrage dar. Hier gehen wir davon aus, dass ein 200-Statuscode („Erfolg“) bedeutet, dass die Wikipedia-Seite existiert, und ein 404-Statuscode („Nicht gefunden“) bedeutet, dass die Wikipedia-Seite nicht existiert.

Wie im Abschnitt zu den Voraussetzungen beschrieben, benötigen Sie das installierte requests-Paket, um diese Funktion ausführen zu können.

Versuchen wir, die Funktion auszuführen, indem wir die url und den Funktionsaufruf nach der Funktion get_wiki_page_existence hinzufügen:

wiki_page_function.py
. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))

Nachdem Sie den Code hinzugefügt haben, speichern und schließen Sie die Datei.

Wenn wir diesen Code ausführen:

  • python wiki_page_function.py

Erhalten wir eine Ausgabe wie die folgende:

Output
https://en.wikipedia.org/wiki/Ocean - exists

Bei Aufruf der Funktion get_wiki_page_existence mit einer gültigen Wikipedia-Seite wird eine Zeichenfolge zurückgegeben, die bestätigt, dass die Seite tatsächlich existiert.

Achtung: Im Allgemeinen ist es nicht sicher, Python-Objekte oder -Status zwischen Threads zu teilen, ohne sorgfältig darauf zu achten, dass keine Parallelitätsfehler auftreten. Wenn Sie eine Funktion definieren, die in einem Thread ausgeführt werden soll, ist es am besten, eine Funktion festzulegen, die einen einzelnen Auftrag ausführt und den Status nicht an andere Threads weitergibt oder veröffentlicht. get_wiki_page_existence ist ein Beispiel für eine solche Funktion.

Schritt 2 — Verwenden von ThreadPoolExecutor zur Ausführung einer Funktion in Threads

Nachdem wir nun über eine Funktion verfügen, die sich in Threads aufrufen lässt, können wir ThreadPoolExecutor verwenden, um zügig mehrere Aufrufe dieser Funktion auszuführen.

Fügen Sie Ihrem Programm in wiki_page_function.py den folgenden hervorgehobenen Code hinzu:

wiki_page_function.py
import requests
import concurrent.futures

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())

Werfen wir einen Blick auf die Funktionsweise dieses Codes:

  • concurrent.futures wird importiert, um uns Zugriff auf ThreadPoolExecutor zu gewähren.
  • Eine with-Anweisung dient der Erstellung eines ThreadPoolExecutor-Instanz-Executors, der Threads unmittelbar nach dem Abschluss bereinigt.
  • Vier Aufträge werden dem Executor übergeben: einer für jede der URLs in der Liste wiki_page_urls.
  • Jeder Aufruf an submit gibt eine Future-Instanz zurück, die in der futures-Liste gespeichert ist.
  • Die Funktion as_completed wartet, bis jeder Future get_wiki_page_existence-Aufruf abgeschlossen ist, damit wir das Ergebnis ausgeben können.

Wenn wir dieses Programm mit dem folgenden Befehl erneut ausführen:

  • python wiki_page_function.py

Erhalten wir eine Ausgabe wie die folgende:

Output
https://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists

Diese Ausgabe ergibt Sinn: drei der URLs sind gültige Wikipedia-Seiten, eine nicht (this_page_does_not_exist). Beachten Sie, dass Ihre Ausgabe eine andere Reihenfolge aufweisen kann als diese Ausgabe. Die Funktion concurrent.futures.as_completed in diesem Beispiel gibt Ergebnisse zurück, sobald sie verfügbar sind. Dabei ist es egal, in welcher Reihenfolge die Aufträge übermittelt wurden.

Schritt 3 — Vearbeiten von Ausnahmen bei Funktionsausführungen in Threads

Im vorherigen Schritt hat get_wiki_page_existence bei allen unseren Aufrufen erfolgreich einen Wert zurückgegeben. In diesem Schritt sehen wir, dass ThreadPoolExecutor auch Ausnahmen auslösen kann, die in Threaded-Funktionsaufrufen generiert werden.

Betrachten wir den folgenden beispielhaften Codeblock:

wiki_page_function.py
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status


wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(
            executor.submit(
                get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
            )
        )
    for future in concurrent.futures.as_completed(futures):
        try:
            print(future.result())
        except requests.ConnectTimeout:
            print("ConnectTimeout.")

Dieser Codeblock ist fast identisch mit dem, den wir in Schritt 2 verwendet haben; er weist jedoch zwei wichtige Unterschiede auf:

  • Wir übergeben nun timeout=0.00001 an get_wiki_page_existence. Da das requests-Paket seine Webanfrage an Wikipedia in 0,00001 Sekunden nicht abschließen kann, wird eine ConnectTimeout-Ausnahme ausgelöst.
  • Wir erfassen ConnectTimeout-Ausnahmen, die durch future.result() ausgelöst werden, und drucken dabei jedes Mal eine Zeichenfolge aus.

Wenn wir das Programm erneut ausführen, sehen wir die folgende Ausgabe:

Output
ConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout.

Vier ConnectTimeout-Nachrichten werden ausgegeben (eine für jede unserer vier wiki_page_urls), da keine davon in 0,00001 Sekunden abgeschlossen werden konnte und jede der vier get_wiki_page_existence-Aufrufe eine ConnectTimeout-Ausnahme ausgelöst hat.

Sie haben gesehen, dass wenn ein Funktionsaufruf an einen ThreadPoolExecutor eine Ausnahme auslöst, diese Ausnahme normalerweise durch Aufruf von Future.result ausgelöst werden kann. Ein Aufruf von Future.result bei all Ihren übermittelten Aufrufen stellt sicher, dass Ihr Programm keine Ausnahmen verpasst, die von Ihrer Threaded-Funktion ausgelöst werden.

Schritt 4 — Vergleichen der Ausführungszeit mit und ohne Threads

Überprüfen wir nun, ob die Verwendung von ThreadPoolExecutor Ihr Programm tatsächlich schneller macht.

Lassen Sie uns zunächst die Ausführung von get_wiki_page_existence ohne Threads messen:

wiki_page_function.py
import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
    print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)

Im Codebeispiel rufen wir unsere get_wiki_page_existence-Funktion mit fünfzig verschiedenen URLs von Wikipedia-Seiten hintereinander auf. Wir verwenden die Funktion time.time(), um die Anzahl der Sekunden auszugeben, die für die Ausführung unseres Programms benötigt wurde.

Wenn wir diesen Code wie zuvor erneut ausführen, erhalten wir eine Ausgabe wie die folgende:

Output
Running without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182

Einträge 2 bis 47 in dieser Ausgabe wurden der Kürze halber ausgelassen.

Die Anzahl der Sekunden, die nach Without threads time (Zeit ohne Threads) ausgegeben wird, wird sich bei Ausführung auf Ihrem Computer unterscheiden. Das ist in Ordnung; Sie erhalten einfach eine Baseline-Zahl, die Sie mit einer Lösung vergleichen können, die ThreadPoolExecutor nutzt. In diesem Fall waren es ~5,803 Sekunden.

Führen wir nun die gleichen fünfzig Wikipedia-URLs über get_wiki_page_existence aus, diesmal jedoch mit ThreadPoolExecutor:

wiki_page_function.py
import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running threaded:")
threaded_start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
print("Threaded time:", time.time() - threaded_start)

Der Code ist der gleiche Code, den wir in Schritt 2 erstellt haben; diesmal enthält er jedoch zusätzlich einige Druckanweisungen, um die Anzahl der Sekunden anzuzeigen, die zur Ausführung unseres Codes benötigt wurden.

Wenn wir das Programm erneut ausführen, erhalten wir die folgende Ausgabe:

Output
Running threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543

Auch die Anzahl der Sekunden, die nach Threaded time (Zeit mit Threads) ausgegeben wird, wird sich auf Ihrem Computer unterscheiden (ebenso die Reihenfolge Ihrer Ausgabe).

Jetzt können Sie die Ausführungszeit beim Abrufen der fünfzig URLs von Wikipedia-Seiten mit und ohne Threads miteinander vergleichen.

Auf dem in diesem Tutorial verwendeten Rechner dauerte es ohne Threads ~5,803 Sekunden; mit Threads waren es ~1,220 Sekunden. Unser Programm lief mit Threads also deutlich schneller.

Zusammenfassung

In diesem Tutorial haben Sie erfahren, wie Sie das Dienstprogramm ThreadPoolExecutor in Python 3 verwenden können, um I/O-gerichteten Code effizient auszuführen. Sie haben eine Funktion erstellt, die sich für Aufrufe innerhalb von Threads eignet, gelernt, wie man sowohl Ausgaben als auch Ausnahmen von Threaded-Ausführungen dieser Funktion abruft, und den Leistungsschub beobachten können, der durch Verwendung von Threads entsteht.

Nun können Sie mehr über andere Parallelitätsfunktionen des concurrent.futures-Moduls erfahren.

Creative Commons License