Автор выбрал фонд Free and Open Source Fund для получения пожертвования в рамках программы Write for DOnations.
Apache Kafka — это популярный распределённый программный брокер сообщений, предназначенный для эффективного управления большим объемом данных в реальном времени. Кластер Kafka отличается не только масштабируемостью и устойчивостью к ошибкам, но имеет гораздо большую пропускную способность по сравнению с другими брокерами сообщений, такими как ActiveMQ и RabbitMQ. Хотя он обычно используется в качестве системы рассылки сообщений издатель/подписчик, многие организации также используют этого брокера для агрегации логов, поскольку он предоставляет надежное хранение для публикуемых сообщений.
Система рассылки сообщений издатель/подписчик позволяет одному или нескольким производителям публиковать сообщения без учета количества потребителей или того, как эти потребители будут обрабатывать сообщения. Подписанные клиенты уведомляются автоматически об обновлениях и создании новых сообщений. Эта система является более эффективной и масштабируемой по сравнению с системами, где клиенты периодически выполняют опрос, чтобы определить, доступны ли новые сообщения.
В этом руководстве вы установите и будете использовать Apache Kafka 2.1 на Ubuntu 18.04.
Чтобы выполнить описанные ниже шаги, вам потребуется следующее:
Поскольку Kafka может обрабатывать запросы через сеть, вам следует создать для нее специального пользователя. Это позволит снизить ущерб для вашей машины на Ubuntu, если сервер Kafka будет скомпрометирован. Мы создадим специального пользователя kafka на этом шаге, но вам нужно создать другого пользователя без прав root для выполнения других задач на этом сервере после завершения настройки Kafka.
Выполните вход с помощью пользователя без прав root с привилегиями sudo и создайте пользователя kafka с помощью команды useradd
:
- sudo useradd kafka -m
Флаг -m
гарантирует, что для пользователя будет создана домашняя директория. Эта домашняя директория /home/kafka
будет выполнять роль директории нашего рабочего пространства для выполнения команд, описанных ниже.
Установите пароль с помощью команды passwd
:
- sudo passwd kafka
Добавьте пользователя kafka в группу sudo
с помощью команды adduser
, чтобы предоставить ему права на установку зависимостей Kafka:
- sudo adduser kafka sudo
Ваш пользователь kafka готов к работе. Выполните вход в учетную запись с помощью команды su
:
- su -l kafka
Теперь, когда мы создали пользователя для брокера Kafka, мы можем перейти к загрузке и извлечению двоичных файлов Kafka.
Давайте загрузим и извлечем двоичные файлы Kafka в специальные папки в домашней директории пользователя kafka.
Сначала создайте директорию в /home/kafka
с названием Downloads
, чтобы сохранить там загруженные данные:
- mkdir ~/Downloads
Воспользуйтесь curl
для загрузки двоичных файлов Kafka:
- curl "https://www.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz" -o ~/Downloads/kafka.tgz
Создайте директорию с названием kafka
и замените ее на эту директорию. Она будет служить базовой директорией для установки Kafka:
- mkdir ~/kafka && cd ~/kafka
Извлеките архив, который вы загрузили, с помощью команды tar
:
- tar -xvzf ~/Downloads/kafka.tgz --strip 1
Мы указали флаг --strip 1
для обеспечения того, что содержимое архива будет извлечено в саму директорию ~/kafka/
и ни в какую другую директорию (например, ~/kafka/kafka_2.11-2.1.1/
) внутри нее.
Теперь, когда мы загрузили и извлекли двоичные файлы, мы можем перейти к настройке на Kafka, чтобы позволить удаление тем.
Поведение Kafka по умолчанию не позволяет нам удалять название темы, категории, группы или ветки, где было опубликованы сообщения. Чтобы изменить это, мы изменим файл конфигурации.
Опции конфигурации Kafka указаны в файле server.properties
. Откройте файл в nano
или вашем любимом редакторе:
- nano ~/kafka/config/server.properties
Давайте добавим настройку, которая позволит нам удалять темы Kafka. Добавьте следующую строку внизу файла:
delete.topic.enable = true
Сохраните файл и закройте nano
. Теперь, когда мы настроили Kafka, мы можем перейти к созданию файлов systemd.unit для запуска и их активации при запуске службы.
В этом шаге мы создадим файлы systemd.unit для службы Kafka. Это поможет нам выполнять такие действия, как запуск, остановка и перезапуск Kafka, совместимым с другими службами Linux образом.
Zookeeper — это служба, которую использует Kafka для управления состоянием кластера и конфигурациями. Она широко используется во многих распределенных системах в качестве неотъемлемого компонента. Если вы хотите подробнее познакомиться с Zookeeper, изучите официальную документацию по Zookeeper.
Создайте unit-файл для zookeeper
:
- sudo nano /etc/systemd/system/zookeeper.service
Введите следующее определение unit в файле:
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
В разделе [Unit]
указывается, что для начала работы Zookeeper требуется подготовить сетевое подключение и файловую систему.
В разделе [Service]
указывается, что systemd использует файлы оболочки zookeeper-server-start.sh
и zookeeper-server-stop.sh
для запуска и остановки службы. В нем также указывается, что Zookeeper автоматически перезапускается в случае аварийного завершения работы.
Затем создайте системный сервисный файл для kafka
:
- sudo nano /etc/systemd/system/kafka.service
Введите следующее определение unit в файле:
[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
В разделе [Unit]
указывается, что этот unit является зависимым от zookeeper.service
. Это позволит автоматически запускать zookeeper
после запуска службы kafka.
В разделе [Service]
указывается, что systemd использует файлы оболочки kafka-server-start.sh
и kafka-server-stop.sh
для запуска и остановки службы. В нем также указывается, что Kafka автоматически перезапускается в случае аварийного завершения работы.
После определения всех юнитов выполните запуск Kafka с помощью следующей команды:
- sudo systemctl start kafka
Чтобы убедиться, что сервер успешно запущен, проверьте логи журнала для юнита kafka
:
- sudo journalctl -u kafka
Вывод должен выглядеть примерно следующим образом:
OutputJul 17 18:38:59 kafka-ubuntu systemd[1]: Started kafka.service.
Теперь у вас есть сервер Kafka, который подключен к порту 9092
.
Хотя мы и запустили службу kafka
, но если нам потребуется перезапустить наш сервер, она не будет запускаться автоматически. Чтобы активировать запуск kafka
при загрузке сервера, выполните следующую команду:
- sudo systemctl enable kafka
Теперь, когда мы запустили и активировали службы, нужно проверить установку.
Давайте опубликуем и получим сообщение Hello World, чтобы убедиться, что сервер Kafka ведет себя корректно. Для публикации сообщений в Kafka требуется следующее:
Сначала создайте тему с названием TutorialTopic
, введя следующую команду:
- ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
Вы можете создать издателя из командной строки с помощью скрипта kafka-console-producer.sh
. Ему требуется имя хоста сервера Kafka, порт и название темы в качестве аргументов.
Добавьте строку "Hello, World"
в тему TutorialTopic
, введя следующую команду:
- echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
Затем вы можете создать подписчика Kafka с помощью скрипта kafka-console-consumer.sh
. Ему требуется имя хоста сервера ZooKeeper, порт и название темы в качестве аргументов.
Следующая команда получает сообщения из TutorialTopic
. Обратите внимание, что использование флага --from-beginning
позволяет получать сообщения, которые были опубликованы до запуска подписчика:
- ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
Если проблемы конфигурации отсутствуют, вы должны увидеть Hello, World
в своем терминале:
OutputHello, World
Скрипт продолжит работать и будет ждать, когда будут опубликованы новые сообщения для темы. Вы можете открыть новый терминал и запустить издателя для нескольких новых сообщений. Вы должны увидеть все сообщения в выводе подписчика.
После окончания тестирование нажмите CTRL+C
, чтобы остановить скрипт подписчика. Теперь, когда мы протестировали установку, можно переходить к установке KafkaT.
KafkaT — это инструмент от Airbnb, который упрощает просмотр данных о кластере Kafka и выполняет некоторые административные задачи из командной строки. Поскольку он написан на Ruby, для его использования вам потребуется Ruby. Также вам потребуется пакет build-essential
для создания других «бриллиантов», используемых в качестве зависимостей. Выполните установку с помощью apt
:
- sudo apt install ruby ruby-dev build-essential
Теперь вы можете выполнить установку KafkaT с помощью команды gem:
- sudo gem install kafkat
KafkaT использует .kafkatcfg
в качестве файла конфигурации для определения директорий и логов на вашем сервере Kafka. Также нужно добавить запись, указывающую KafkaT путь к экземпляру ZooKeeper.
Создайте новый файл .kafkatcfg
:
- nano ~/.kafkatcfg
Добавьте следующие строки для указания требуемой информации о вашем сервере Kafka и экземпляре Zookeeper:
{
"kafka_path": "~/kafka",
"log_path": "/tmp/kafka-logs",
"zk_path": "localhost:2181"
}
Теперь вы можете использовать KafkaT. Для начала продемонстрируем, как вы можете использовать его, чтобы просмотреть подробную информацию обо всех частях Kafka:
- kafkat partitions
Вывод должен выглядеть следующим образом:
OutputTopic Partition Leader Replicas ISRs
TutorialTopic 0 0 [0] [0]
__consumer_offsets 0 0 [0] [0]
...
...
Вы увидите TutorialTopic
, а также __consumer_offsets
, т.е. внутреннюю тему, которая используется в Kafka для хранения информации о клиентах. Вы можете спокойно игнорировать строки, начинающиеся с __consumer_offsets
.
Чтобы узнать больше о KafkaT, перейдите в соответствующий репозиторий на GitHub.
Если вы хотите создать кластер с несколькими брокерами, использующий несколько серверов на Ubuntu 18.04, повторите шаги 1, 4 и 5 для каждой новой машины. Кроме того, вам нужно внести следующие изменения в файл server.properties
:
Значение свойства broker.id
должно быть уникальным внутри кластера. Это свойство служит уникальным идентификатором каждого сервера в кластере и может содержать любую строку в качестве значения. Например, server1
, server2
и т.д.
Значение свойства zookeeper.connect
необходимо изменить таким образом, чтобы все узлы указывали на один экземпляр ZooKeeper. Это свойство указывает адрес экземпляра в Zookeeper и соответствует формату <HOSTNAME/IP_ADDRESS>:<PORT>
. Например, «203.0.113.0:2181»
, «203.0.113.1:2181»
и т. д.
Если вы хотите использовать несколько экземпляров ZooKeeper для вашего кластера, то значение свойства zookeeper.connect
должно представлять собой одинаковую, разделенную запятыми строку со списком IP-адресов и номеров портов для всех экземпляров ZooKeeper.
Теперь, когда все установки выполнены, вы можете удалить права администратора пользователя kafka. Прежде чем сделать это, выйдите и войдите снова с помощью любого пользователя без прав root с привилегиями sudo. Если вы все еще используете один сеанс командной строки, которой вы запустили в начале прохождения руководства, просто введите exit
.
Удалите пользователя kafka из группы sudo:
- sudo deluser kafka sudo
Чтобы дополнительно повысить безопасность вашего сервера Kafka, заблокируйте пароль пользователя kafka с помощью команды passwd
. Это гарантирует, что никто не сможет напрямую выполнить вход на сервер с помощью этой учетной записи:
- sudo passwd kafka -l
На данный момент только пользователь root или sudo может выполнить вход как пользователь kafka
, воспользовавшись следующей командой:
- sudo su - kafka
В будущем, если вы захотите разблокировать его, используйте passwd
с флагом -u
:
- sudo passwd kafka -u
Вы успешно ограничили права администратора пользователя kafka.
Теперь на вашем сервере Ubuntu безопасно запущена служба Apache Kafka. Вы можете использовать ее в своих проектах, создав издателей и подписчиков Kafka с помощью клиентов Kafka, которые доступны для большинства языков программирования. Чтобы узнать больше о Kafka, вы можете ознакомиться с документацией.
Thanks for learning with the DigitalOcean Community. Check out our offerings for compute, storage, networking, and managed databases.
This textbox defaults to using Markdown to format your answer.
You can type !ref in this text area to quickly search our full set of tutorials, documentation & marketplace offerings and insert the link!
Sign up for Infrastructure as a Newsletter.
Working on improving health and education, reducing inequality, and spurring economic growth? We'd like to help.
Get paid to write technical tutorials and select a tech-focused charity to receive a matching donation.