Tutorial

Como usar o ThreadPoolExecutor em Python 3

PythonDevelopment

O autor selecionou a COVID-19 Relief Fund​​​​​ para receber uma doação como parte do programa Write for DOnations.

Introdução

Os threads em Python são uma forma de paralelismo que permitem que seu programa execute vários procedimentos ao mesmo tempo. O paralelismo em Python também pode ser alcançado usando vários processos, mas os threads são particularmente adequados para acelerar aplicativos que envolvam quantidades significativas de E/S (entrada/saída).

Alguns exemplo de operações limitadas por E/S incluem realizar solicitações Web e ler dados de arquivos. Em contraste com as operações limitadas por E/S, as operações limitadas por CPU (como realizar operações matemáticas com a biblioteca padrão do Python) não serão tão beneficiadas com os threads em Python.

O Python 3 inclui o utilitário ThreadPoolExecutor para executar o código em um thread.

Neste tutorial, usaremos o ThreadPoolExecutor para fazer solicitações de rede de forma conveniente. Definiremos uma função adequada para a invocação dentro de threads, usaremos o ThreadPoolExecutor para executar essa função e processaremos os resultados dessas execuções.

Para este tutorial, faremos solicitações de rede para verificar a existência de páginas da Wikipédia.

Nota: o fato de as operações limitadas por E/S se beneficiarem mais dos threads do que as operações limitadas por CPU tem origem em uma idiossincrasia em Python chamada global interpreter lock. Saiba mais sobre o global interpreter lock do Python na documentação oficial do Python.

Pré-requisitos

Para aproveitar ao máximo este tutorial, é recomendado ter alguma familiaridade com a programação em Python e a um ambiente de programação local do Python com requests (solicitações) instaladas.

Você pode revisar estes tutoriais para as informações básicas necessárias:

  • pip install --user requests==2.23.0

Passo 1 — Definindo uma função para ser executada em threads

Vamos começar definindo uma função que gostaríamos de executar com a ajuda dos threads.

Usando o nano ou seu editor de texto/ambiente de desenvolvimento preferido, abra este arquivo:

  • nano wiki_page_function.py

Para este tutorial, vamos escrever uma função que determina se uma página da Wikipédia existe ou não:

wiki_page_function.py
import requests

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

A função get_wiki_page_existence aceita dois argumentos: uma URL de uma página da Wikipédia (wiki_page_url) e um número de segundos timeout para se esperar por uma resposta dessa URL.

A get_wiki_page_existence usa o pacote requests para fazer uma solicitação Web a essa URL. Dependendo do código de status da response (resposta) HTTP, uma string que descreve se a página existe ou não é retornada. Códigos de status diferentes representam resultados diferentes de uma solicitação HTTP. Este procedimento pressupõe que um código de status 200 de “sucesso” significa que a página da Wikipédia existe e um código de status 404 “não encontrado” significa que a página da Wikipédia não existe.

Conforme descrito na seção Pré-requisitos, você precisará do pacote requests instalado para executar esta função.

Vamos tentar executar a função adicionando a url e a chamada de função após a função get_wiki_page_existence:

wiki_page_function.py
. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))

Uma vez adicionado o código, salve e feche o arquivo.

Se executarmos este código:

  • python wiki_page_function.py

Veremos um resultado como o seguinte:

Output
https://en.wikipedia.org/wiki/Ocean - exists

Chamar a função get_wiki_page_existence com uma página da Wikipédia válida retorna uma string que confirma que a página, de fato, existe.

Aviso: em geral, não é seguro compartilhar o estado ou objetos Python entre threads sem tomar cuidados especiais para evitar erros de simultaneidade. Ao definir uma função a ser executada em um thread, é melhor definir uma função que execute uma tarefa única e não compartilhe ou publique o estado em outros threads. A get_wiki_page_existence é um exemplo de uma função como essa.

Passo 2 — Usando o ThreadPoolExecutor para executar uma função em threads

Agora que temos uma função adequada à invocação com threads, podemos usar o ThreadPoolExecutor para realizar várias invocações dessa função de maneira conveniente.

Vamos adicionar o seguinte código destacado ao seu programa em wiki_page_function.py:

wiki_page_function.py
import requests
import concurrent.futures

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())

Vamos dar uma olhada em como esse código funciona:

  • O concurrent.futures é importado para nos dar acesso ao ThreadPoolExecutor.
  • A declaração with é usada para criar um executor de instância do ThreadPoolExecutor que irá esvaziar os threads imediatamente após a conclusão.
  • Quatro tarefas são submitted (submetidas) ao executor: uma para cada uma das URLs na lista wiki_page_urls.
  • Cada chamada a submit retorna uma instância Future que está armazenada na lista futures.
  • A função as_completed espera cada chamada get_wiki_page_existence Future ser concluída para podermos imprimir seu resultado.

Se executarmos esse programa novamente com o seguinte comando:

  • python wiki_page_function.py

Veremos um resultado como o seguinte:

Output
https://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists

Esse resultado faz sentido: 3 das URLs são páginas válidas da Wikipédia, e uma delas, a this_page_does_not_exist, não é. Observe que seu resultado pode estar ordenado de maneira diferente do que este. A função concurrent.futures.as_completed nesse exemplo retorna resultados assim que eles estiverem disponíveis, independentemente da ordem em que as tarefas foram enviadas.

Passo 3 — Processando exceções de execuções de funções em threads

No passo anterior, get_wiki_page_existence retornou com sucesso um valor para todas as nossas invocações. Neste passo, veremos que o ThreadPoolExecutor também pode apurar exceções geradas em invocações de função em threads.

Vamos considerar o seguinte bloco de código de exemplo:

wiki_page_function.py
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status


wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(
            executor.submit(
                get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
            )
        )
    for future in concurrent.futures.as_completed(futures):
        try:
            print(future.result())
        except requests.ConnectTimeout:
            print("ConnectTimeout.")

Este bloco de código é quase idêntico ao que usamos no Passo 2, mas possui duas diferenças chave:

  • Agora, passamos timeout=0.001 para get_wiki_page_existence. Como o pacote requests não será capaz de completar sua solicitação Web à Wikipédia em 0.00001 segundos, ele criará uma exceção ConnectTimeout.
  • Nós capturamos exceções ConnectTimeout geradas pelo future.result() e imprimimos uma string cada vez que fazemos isso.

Se executarmos o programa novamente, veremos o seguinte resultado:

Output
ConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout.

Quatro mensagens ConnectTimeout são impressas — uma para cada uma de nossas quatro wiki_page_urls, uma vez que nenhuma delas pôde ser concluída em 0.00001 segundos e cada uma das quatro chamadas get_wiki_page_existence gerou a exceção ConnectTimeout.

Agora, você viu que se uma chamada de função submetida a um ThreadPoolExecutor gera uma exceção, então essa exceção pode ser apurada normalmente chamando o Future.result. Chamar o Future.result em todas as suas invocações enviadas garante que seu programa não perca nenhuma exceção gerada em sua função em threads.

Passo 4 — Comparando o tempo de execução com e sem threads

Agora, vamos verificar se usar o ThreadPoolExecutor realmente torna seu programa mais rápido.

Primeiro, vamos cronometrar o get_wiki_page_existence se executarmos ele sem threads:

wiki_page_function.py
import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status

wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
    print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)

Nesse exemplo de código, chamamos nossa função get_wiki_page_existence com cinquenta URLs de páginas diferentes da Wikipedia uma a uma. Usamos a função time.time() para imprimir o número de segundos que nosso programa leva para ser executado.

Se executarmos esse código novamente como antes, veremos um resultado como o seguinte:

Output
Running without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182

As entradas 2-47 nesse resultado foram omitidas para maior concisão.

O número de segundos impressos depois de Without threads time será diferente quando você executar o código em sua máquina – não tem problema, você só está recebendo um número que servirá como base para se comparar com uma solução que usa o ThreadPoolExecutor. Neste caso, foram ~5.803 segundos.

Vamos executar as mesmas cinquenta URLs da Wikipedia através do get_wiki_page_existence, mas desta vez usando o ThreadPoolExecutor:

wiki_page_function.py
import time
import requests
import concurrent.futures


def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)

    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"

    return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]

print("Running threaded:")
threaded_start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
print("Threaded time:", time.time() - threaded_start)

O código é o mesmo que criamos no Passo 2, apenas com a adição de algumas declarações de impressão que nos mostram o número de segundos que o nosso código leva para ser executado.

Se executarmos o programa novamente, veremos o seguinte:

Output
Running threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543

Novamente, o número de segundos impressos após Threaded time será diferente em seu computador (assim como a ordem do seu resultado).

Agora, compare o tempo de execução para obter as cinquenta URLs de páginas da Wikipédia com e sem threads.

Na máquina usada neste tutorial, o processo sem threads levou ~5.803 segundos e com threads levou ~1.220 segundos. Nosso programa foi executado de maneira significativamente mais rápida com threads.

Conclusão

Neste tutorial, você aprendeu como usar o utilitário ThreadPoolExecutor em Python 3 para executar eficientemente códigos limitados por E/S. Você criou uma função adequada à invocação dentro de threads, aprendeu como recuperar tanto o resultado quanto as exceções de execuções em threads dessa função e observou o ganho de desempenho obtido usando threads.

A partir daqui, você pode aprender mais sobre outras funções de simultaneidade oferecidas pelo módulo concurrent.futures.

Creative Commons License