Автор выбрал COVID-19 Relief Fund для получения пожертвования в рамках программы Write for DOnations.
Потоки в Python представляют собой форму параллельного программирования, позволяющую программе выполнять несколько процедур одновременно. Параллелизм в Python также можно реализовать посредством использования нескольких процессов, однако потоки особенно хорошо подходят для ускорения приложений, использующих существенные объемы ввода/вывода.
Например, операции ввода-вывода включают отправку веб-запросов и чтение данных из файлов. В отличие от операций ввода вывода, операции процессора (например, математические операции со стандартной библиотекой Python) не становятся намного эффективнее при использовании потоков Python.
В состав Python 3 входит утилита ThreadPoolExecutor
для выполнения кода в потоке.
В этом обучающем модуле мы используем ThreadPoolExecutor
для ускоренной отправки сетевых запросов. Мы определим функцию, хорошо подходящую для вызова в потоках, используем ThreadPoolExecutor
для выполнения этой функции и обработаем результаты выполнения.
В этом обучающем модуле мы будем составлять сетевые запросы для проверки существования страниц на портале Wikipedia.
Примечание. Тот факт, что операции ввода-вывода получают больше выгод от потоков, чем операции процессора, связан с использованием в Python глобальной блокировки интерпретатора, которая позволяет только одному потоку сохранять контроль над интерпретатором Python. Если хотите, вы можете узнать больше о глобальном блокировке интерпретатора Python в официальной документации по Python.
Для наиболее эффективного прохождения этого обучающего модуля требуется знакомство с программированием на Python и локальной средой программирования Python с requests
.
Необходимую информацию можно получить, пройдя следующие обучающие модули:
Установка Python 3 и настройка среды программирования в Ubuntu 18.04
Чтобы установить пакет requests
в локальную среду программирования Python, запустите следующую команду:
- pip install --user requests==2.23.0
Для начала определим функцию, которую мы хотим выполнить с помощью потоков.
Откройте этот файл, используя nano
или предпочитаемый текстовый редактор или среду разработки:
- nano wiki_page_function.py
Для этого обучающего модуля мы напишем функцию, проверяющую существование страницы на портале Wikipedia:
import requests
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
Функция get_wiki_page_existence
принимает два аргумента: URL страницы Wikipedia (wiki_page_url
) и timeout
— количество секунд ожидания ответа от этого URL.
get_wiki_page_existence
использует пакет requests
для отправки веб-запроса на этот URL. В зависимости от кода состояния ответа
HTTP функция возвращает строку, описывающую наличие или отсутствие страницы. Разные коды состояния соответствуют разным результатам выполнения запроса HTTP. Эта процедура предполагает, что код состояния 200
(успех) означает, что страница Wikipedia существует, а код состояния 404
(не найдено) означает, что страница Wikipedia не существует.
Как указывалось в разделе «Предварительные требования», для запуска этой функции должен быть установлен пакет requests
.
Попробуем запустить функцию, добавив url
и вызов функции после функции get_wiki_page_existence
:
. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))
После добавления кода сохраните и закройте файл.
Если мы запустим этот код:
- python wiki_page_function.py
Результат будет выглядеть примерно следующим образом:
Outputhttps://en.wikipedia.org/wiki/Ocean - exists
Вызов функции get_wiki_page_existence
для существующей страницы Wikipedia возвращает строку, подтверждающую фактическое существование страницы.
Предупреждение. Обычно небезопасно делать объекты или состояния Python доступными для всех потоков, не приняв особых мер для предотвращения ошибок параллельной обработки. При определении функции для выполнения в потоке лучше всего определить функцию, которая выполняет одну задачу и не делится своим состоянием с другими потоками. get_wiki_page_existence
— хороший пример такой функции.
Теперь у нас есть функция, подходящая для вызова в потоках, и мы можем использовать ThreadPoolExecutor
для многократного ускоренного вызова этой функции.
Добавьте следующий выделенный код в свою программу в файле wiki_page_function.py
:
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
for future in concurrent.futures.as_completed(futures):
print(future.result())
Посмотрим, как работает этот код:
concurrent.futures
импортируется, чтобы предоставить нам доступ к ThreadPoolExecutor
.with
используется для создания исполнительного блока
экземпляра ThreadPoolExecutor
, который будет быстро очищать потоки после выполнения.отправляются
в исполнительный блок
: по одному для каждого URL из списка wiki_page_urls
.submit
возвращает экземпляр Future
, хранящийся в списке futures
.as_completed
ожидает каждого вызова Future
get_wiki_page_existence
для выполнения, чтобы дать нам возможность распечатать результат.Если мы снова запустим эту программу с помощью следующей команды:
- python wiki_page_function.py
Результат будет выглядеть примерно следующим образом:
Outputhttps://en.wikipedia.org/wiki/Island - exists
https://en.wikipedia.org/wiki/Ocean - exists
https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
https://en.wikipedia.org/wiki/Shark - exists
Этот вывод имеет смысл: 3 адреса URL указывают на существующие страницы Wikipedia, а один из них this_page_does_not_exist
не существует. Обратите внимание. что вывод может иметь другой порядок, отличающийся от показанного здесь. Функция concurrent.futures.as_completed
в этом примере возвращает результаты сразу же, как только они становятся доступными, вне зависимости от порядка отправки заданий.
На предыдущем шаге функция get_wiki_page_existence
успешно вернула значения во всех случаях вызова. На этом шаге мы увидим, что ThreadPoolExecutor
также может выводить исключения при вызове функций в потоках.
Рассмотрим в качестве примера следующий блок кода:
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(
executor.submit(
get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
)
)
for future in concurrent.futures.as_completed(futures):
try:
print(future.result())
except requests.ConnectTimeout:
print("ConnectTimeout.")
Этот блок кода практически идентичен использованному нами на шаге 2, но имеет два важных отличия:
timeout=0.00001
для функции get_wiki_page_existence
. Поскольку пакет requests
не может выполнить веб-запрос сайта Wikipedia за 0,00001
секунды, он выдаст исключение ConnectTimeout
.ConnectTimeout
, выдаваемые future.result()
, и выводим строку в каждом таком случае.Если мы запустим программу снова, мы получим следующий результат:
OutputConnectTimeout.
ConnectTimeout.
ConnectTimeout.
ConnectTimeout.
Выведено четыре сообщения ConnectTimeout
, по одному для каждого из четырех значений wiki_page_urls
, поскольку ни один запрос не мог быть выполнен за 0,00001
секунды, и каждый из четырех вызовов get_wiki_page_existence
завершился исключением ConnectTimeout
.
Мы увидели, что если вызов функции, отправленный в ThreadPoolExecutor
, завершается исключением, это исключение может быть выведено обычным образом посредством вызова Future.result
. Вызов Future.result
для всех вызванных функций гарантирует, что ваша программа не пропустит никаких исключений при выполнении функции в потоке.
Убедимся, что использование ThreadPoolExecutor
действительно ускоряет нашу программу.
Вначале определим время выполнения функции get_wiki_page_existence
при ее запуске без потоков:
import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)
В этом пример кода мы вызываем функцию get_wiki_page_existence
с пятьюдесятью разными URL страниц Wikipedia по одной. Мы используем функцию time.time()
для вывода количества секунд выполнения нашей программы.
Если мы запустим этот код снова, как и раньше, мы увидим следующий результат:
OutputRunning without threads:
https://en.wikipedia.org/wiki/0 - exists
https://en.wikipedia.org/wiki/1 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Without threads time: 5.803015232086182
Записи 2–47 в выводимых результатах пропущены для краткости.
Количество секунд, выводимое после Without threads time
, будет отличаться для вашего компьютера, и это нормально, ведь это просто базовое число для сравнения с получаемым при использовании ThreadPoolExecutor
. В данном случае мы получили результат ~5,803 секунды
.
Теперь снова пропустим те же пятьдесят URL страниц Wikipedia через функцию get_wiki_page_existence
, но в этот раз с использованием ThreadPoolExecutor
:
import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
print("Running threaded:")
threaded_start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
for future in concurrent.futures.as_completed(futures):
print(future.result())
print("Threaded time:", time.time() - threaded_start)
Это тот же самый код, который мы создали на шаге 2, только в него добавлены выражения print, показывающие время выполнения нашего кода в секундах.
Если мы снова запустим программу, мы увидим следующий результат:
OutputRunning threaded:
https://en.wikipedia.org/wiki/1 - exists
https://en.wikipedia.org/wiki/0 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Threaded time: 1.2201685905456543
Количество секунд после Threaded time
на вашем компьютере будет отличаться (как и порядок вывода).
Теперь вы можете сравнить время выполнения при доставке пятидесяти URL страниц Wikipedia с потоками и без потоков.
На компьютере, использованном для этого обучающего модуля, выполнение операций без потоков заняло ~5,803
секунды, а с потоками — ~1,220
секунды. С потоками наша программа работала значительно быстрее.
В этом обучающем модуле мы научились использовать утилиту ThreadPoolExecutor
в Python 3 для эффективного выполнения кода, связанного с операциями ввода-вывода. Вы создали функцию, хорошо подходящую для вызова в потоках, научились получать результаты и исключения при выполнении этой фукнции в потоках и оценили прирост производительности, достигаемый за счет использования потоков.
Далее вас могут заинтересовать другие функции параллельной обработки, доступные в модуле concurrent.futures
.
Thanks for learning with the DigitalOcean Community. Check out our offerings for compute, storage, networking, and managed databases.
This textbox defaults to using Markdown to format your answer.
You can type !ref in this text area to quickly search our full set of tutorials, documentation & marketplace offerings and insert the link!
Sign up for Infrastructure as a Newsletter.
Working on improving health and education, reducing inequality, and spurring economic growth? We'd like to help.
Get paid to write technical tutorials and select a tech-focused charity to receive a matching donation.