Tutorial

Использование ThreadPoolExecutor в Python 3

Published on August 26, 2020
Default avatar

By DavidMuller

Author of Intuitive Python

Русский
Использование ThreadPoolExecutor в Python 3

Автор выбрал COVID-19 Relief Fund для получения пожертвования в рамках программы Write for DOnations.

Введение

Потоки в Python представляют собой форму параллельного программирования, позволяющую программе выполнять несколько процедур одновременно. Параллелизм в Python также можно реализовать посредством использования нескольких процессов, однако потоки особенно хорошо подходят для ускорения приложений, использующих существенные объемы ввода/вывода.

Например, операции ввода-вывода включают отправку веб-запросов и чтение данных из файлов. В отличие от операций ввода вывода, операции процессора (например, математические операции со стандартной библиотекой Python) не становятся намного эффективнее при использовании потоков Python.

В состав Python 3 входит утилита ThreadPoolExecutor для выполнения кода в потоке.

В этом обучающем модуле мы используем ThreadPoolExecutor для ускоренной отправки сетевых запросов. Мы определим функцию, хорошо подходящую для вызова в потоках, используем ThreadPoolExecutor для выполнения этой функции и обработаем результаты выполнения.

В этом обучающем модуле мы будем составлять сетевые запросы для проверки существования страниц на портале Wikipedia.

Примечание. Тот факт, что операции ввода-вывода получают больше выгод от потоков, чем операции процессора, связан с использованием в Python глобальной блокировки интерпретатора, которая позволяет только одному потоку сохранять контроль над интерпретатором Python. Если хотите, вы можете узнать больше о глобальном блокировке интерпретатора Python в официальной документации по Python.

Предварительные требования

Для наиболее эффективного прохождения этого обучающего модуля требуется знакомство с программированием на Python и локальной средой программирования Python с requests.

Необходимую информацию можно получить, пройдя следующие обучающие модули:

  1. pip install --user requests==2.23.0

Шаг 1 — Определение функции для выполнения в потоках

Для начала определим функцию, которую мы хотим выполнить с помощью потоков.

Откройте этот файл, используя nano или предпочитаемый текстовый редактор или среду разработки:

  1. nano wiki_page_function.py

Для этого обучающего модуля мы напишем функцию, проверяющую существование страницы на портале Wikipedia:

wiki_page_function.py
import requests

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

Функция get_wiki_page_existence принимает два аргумента: URL страницы Wikipedia (wiki_page_url) и timeout — количество секунд ожидания ответа от этого URL.

get_wiki_page_existence использует пакет requests для отправки веб-запроса на этот URL. В зависимости от кода состояния ответа HTTP функция возвращает строку, описывающую наличие или отсутствие страницы. Разные коды состояния соответствуют разным результатам выполнения запроса HTTP. Эта процедура предполагает, что код состояния 200 (успех) означает, что страница Wikipedia существует, а код состояния 404 (не найдено) означает, что страница Wikipedia не существует.

Как указывалось в разделе «Предварительные требования», для запуска этой функции должен быть установлен пакет requests.

Попробуем запустить функцию, добавив url и вызов функции после функции get_wiki_page_existence:

wiki_page_function.py
. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))

После добавления кода сохраните и закройте файл.

Если мы запустим этот код:

  1. python wiki_page_function.py

Результат будет выглядеть примерно следующим образом:

Output
https://en.wikipedia.org/wiki/Ocean - exists

Вызов функции get_wiki_page_existence для существующей страницы Wikipedia возвращает строку, подтверждающую фактическое существование страницы.

Предупреждение. Обычно небезопасно делать объекты или состояния Python доступными для всех потоков, не приняв особых мер для предотвращения ошибок параллельной обработки. При определении функции для выполнения в потоке лучше всего определить функцию, которая выполняет одну задачу и не делится своим состоянием с другими потоками. get_wiki_page_existence — хороший пример такой функции.

Шаг 2 — Использование ThreadPoolExecutor для выполнения функции в потоках

Теперь у нас есть функция, подходящая для вызова в потоках, и мы можем использовать ThreadPoolExecutor для многократного ускоренного вызова этой функции.

Добавьте следующий выделенный код в свою программу в файле wiki_page_function.py:

wiki_page_function.py
import requests
import concurrent.futures

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())

Посмотрим, как работает этот код:

  • concurrent.futures импортируется, чтобы предоставить нам доступ к ThreadPoolExecutor.
  • Выражение with используется для создания исполнительного блока экземпляра ThreadPoolExecutor, который будет быстро очищать потоки после выполнения.
  • Четыре задания отправляются в исполнительный блок: по одному для каждого URL из списка wiki_page_urls.
  • Каждый вызов submit возвращает экземпляр Future, хранящийся в списке futures.
  • Функция as_completed ожидает каждого вызова Future get_wiki_page_existence для выполнения, чтобы дать нам возможность распечатать результат.

Если мы снова запустим эту программу с помощью следующей команды:

  1. python wiki_page_function.py

Результат будет выглядеть примерно следующим образом:

Output
https://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists

Этот вывод имеет смысл: 3 адреса URL указывают на существующие страницы Wikipedia, а один из них this_page_does_not_exist не существует. Обратите внимание. что вывод может иметь другой порядок, отличающийся от показанного здесь. Функция concurrent.futures.as_completed в этом примере возвращает результаты сразу же, как только они становятся доступными, вне зависимости от порядка отправки заданий.

Шаг 3 — Обработка исключений функций, выполняемых в потоках

На предыдущем шаге функция get_wiki_page_existence успешно вернула значения во всех случаях вызова. На этом шаге мы увидим, что ThreadPoolExecutor также может выводить исключения при вызове функций в потоках.

Рассмотрим в качестве примера следующий блок кода:

wiki_page_function.py
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status


wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(
            executor.submit(
                get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
            )
        )
    for future in concurrent.futures.as_completed(futures):
        try:
            print(future.result())
        except requests.ConnectTimeout:
            print("ConnectTimeout.")

Этот блок кода практически идентичен использованному нами на шаге 2, но имеет два важных отличия:

  • Теперь мы передаем аргумент timeout=0.00001 для функции get_wiki_page_existence. Поскольку пакет requests не может выполнить веб-запрос сайта Wikipedia за 0,00001 секунды, он выдаст исключение ConnectTimeout.
  • Мы собираем исключения ConnectTimeout, выдаваемые future.result(), и выводим строку в каждом таком случае.

Если мы запустим программу снова, мы получим следующий результат:

Output
ConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout.

Выведено четыре сообщения ConnectTimeout, по одному для каждого из четырех значений wiki_page_urls, поскольку ни один запрос не мог быть выполнен за 0,00001 секунды, и каждый из четырех вызовов get_wiki_page_existence завершился исключением ConnectTimeout.

Мы увидели, что если вызов функции, отправленный в ThreadPoolExecutor, завершается исключением, это исключение может быть выведено обычным образом посредством вызова Future.result. Вызов Future.result для всех вызванных функций гарантирует, что ваша программа не пропустит никаких исключений при выполнении функции в потоке.

Шаг 4 — Сравнение времени исполнения с потоками и без потоков

Убедимся, что использование ThreadPoolExecutor действительно ускоряет нашу программу.

Вначале определим время выполнения функции get_wiki_page_existence при ее запуске без потоков:

wiki_page_function.py
import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
    print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)

В этом пример кода мы вызываем функцию get_wiki_page_existence с пятьюдесятью разными URL страниц Wikipedia по одной. Мы используем функцию time.time() для вывода количества секунд выполнения нашей программы.

Если мы запустим этот код снова, как и раньше, мы увидим следующий результат:

Output
Running without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182

Записи 2–47 в выводимых результатах пропущены для краткости.

Количество секунд, выводимое после Without threads time, будет отличаться для вашего компьютера, и это нормально, ведь это просто базовое число для сравнения с получаемым при использовании ThreadPoolExecutor. В данном случае мы получили результат ~5,803 секунды.

Теперь снова пропустим те же пятьдесят URL страниц Wikipedia через функцию get_wiki_page_existence, но в этот раз с использованием ThreadPoolExecutor:

wiki_page_function.py
import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running threaded:")
threaded_start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
print("Threaded time:", time.time() - threaded_start)

Это тот же самый код, который мы создали на шаге 2, только в него добавлены выражения print, показывающие время выполнения нашего кода в секундах.

Если мы снова запустим программу, мы увидим следующий результат:

Output
Running threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543

Количество секунд после Threaded time на вашем компьютере будет отличаться (как и порядок вывода).

Теперь вы можете сравнить время выполнения при доставке пятидесяти URL страниц Wikipedia с потоками и без потоков.

На компьютере, использованном для этого обучающего модуля, выполнение операций без потоков заняло ~5,803 секунды, а с потоками — ~1,220 секунды. С потоками наша программа работала значительно быстрее.

Заключение

В этом обучающем модуле мы научились использовать утилиту ThreadPoolExecutor в Python 3 для эффективного выполнения кода, связанного с операциями ввода-вывода. Вы создали функцию, хорошо подходящую для вызова в потоках, научились получать результаты и исключения при выполнении этой фукнции в потоках и оценили прирост производительности, достигаемый за счет использования потоков.

Далее вас могут заинтересовать другие функции параллельной обработки, доступные в модуле concurrent.futures.

Thanks for learning with the DigitalOcean Community. Check out our offerings for compute, storage, networking, and managed databases.

Learn more about us


About the authors
Default avatar

Author of Intuitive Python

Check out Intuitive Python: Productive Development for Projects that Last

https://pragprog.com/titles/dmpython/intuitive-python/



Still looking for an answer?

Ask a questionSearch for more help

Was this helpful?
 
Leave a comment


This textbox defaults to using Markdown to format your answer.

You can type !ref in this text area to quickly search our full set of tutorials, documentation & marketplace offerings and insert the link!

Try DigitalOcean for free

Click below to sign up and get $200 of credit to try our products over 60 days!

Sign up

Join the Tech Talk
Success! Thank you! Please check your email for further details.

Please complete your information!

Get our biweekly newsletter

Sign up for Infrastructure as a Newsletter.

Hollie's Hub for Good

Working on improving health and education, reducing inequality, and spurring economic growth? We'd like to help.

Become a contributor

Get paid to write technical tutorials and select a tech-focused charity to receive a matching donation.

Welcome to the developer cloud

DigitalOcean makes it simple to launch in the cloud and scale up as you grow — whether you're running one virtual machine or ten thousand.

Learn more
DigitalOcean Cloud Control Panel