У меня возникают проблемы с подключением к машине, на которой работает Kafka, с клиента, на котором запущен сценарий Faust. Сценарий выглядит следующим образом:
Импорт Фауста
журнал импорта
из асинхронного импорта сна
Тест класса (faust.Record):
сообщение: ул
приложение = faust.App('myapp', broker='kafka://10.0.0.20:9092')
тема = приложение.тема('тест', value_type=тест)
@app.agent(тема)
async def привет (сообщения):
async для сообщения в сообщениях:
print(f'Получено {сообщение.msg}')
@app.timer (интервал = 5,0)
асинхронная защита example_sender():
жду привет.отправить(
значение = Тест (msg = 'Привет, мир!'),
)
если __name__ == '__main__':
приложение.основное()
Когда я запускаю скрипт:
# faust -A myapp worker -l info
âÆaµSâ v0.8.1ââ¬ââââââââââââ ... -  â
— id — мое приложение —
- транспорт - [URL('kafka://10.0.0.20:9092')] -
- хранить - память: -
веб-сайт http://hubbabubba:6066
â log â -stderr- (информация) â
âpid 260765 â
✓ имя хоста ✓ hubbabubba ✓
— платформа — CPython 3.8.10 (Linux x86_64) —
водители â
— транспорт — aiokafka=0.7.2 —
â веб â aiohttp=3.8.1 â
— каталог данных — /Git/faust-kafka/myapp-data —
â appdir /Git/faust-kafka/myapp-data/v1 â
- ... ... -
[2022-01-28 13:09:57,018] [260765] [INFO] [^Worker]: Начало...
[2022-01-28 13:09:57,021] [260765] [INFO] [^-App]: Запуск...
[2022-01-28 13:09:57,021] [260765] [INFO] [^--Monitor]: Запуск...
[2022-01-28 13:09:57,021] [260765] [INFO] [^--Producer]: Начало...
[2022-01-28 13:09:57,022] [260765] [INFO] [^---ProducerBuffer]: Запуск...
[2022-01-28 13:09:57,024] [260765] [ОШИБКА] Невозможно подключиться к "10.0.0.20:9092": [Errno 113] Ошибка подключения ('10.0.0.20', 9092)
[2022-01-28 13:09:57,025] [260765] [ОШИБКА] [^Worker]: Ошибка: KafkaConnectionError("Невозможно выполнить загрузку с [('10.0.0.20', 9092, <AddressFamily.AF_INET: 2>) ]")
Traceback (последний последний вызов):
Файл "/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/worker.py", строка 276, в execute_from_commandline
self.loop.run_until_complete(self._starting_fut)
Файл "/usr/lib/python3.8/asyncio/base_events.py", строка 616, в run_until_complete
вернуть будущее.результат()
Файл "/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/services.py", строка 759, в начале
ждать self._default_start()
Файл "/media/eric/DISK3/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/services.py", строка 766, в _default_start
ожидайте self._actually_start()...
Файл "/Git/faust-kafka/venv/lib/python3.8/site-packages/aiokafka/client.py", строка 249, в начальной загрузке
поднять KafkaConnectionError(
kafka.errors.KafkaConnectionError: KafkaConnectionError: невозможно выполнить загрузку с [('10.0.0.20', 9092, <AddressFamily.AF_INET: 2>)]
[2022-01-28 13:09:57,027] [260765] [INFO] [^Worker]: Остановка...
[2022-01-28 13:09:57,027] [260765] [INFO] [^-App]: Остановка...
[2022-01-28 13:09:57,027] [260765] [INFO] [^-App]: очистить буфер производителя...
[2022-01-28 13:09:57,028] [260765] [INFO] [^--TableManager]: остановка...
[2022-01-28 13:09:57,028] [260765] [INFO] [^---Fetcher]: Остановка...
[2022-01-28 13:09:57,028] [260765] [ИНФОРМАЦИЯ] [^---Кондуктор]: Остановка...
[2022-01-28 13:09:57,028] [260765] [INFO] [^--AgentManager]: остановка...
[2022-01-28 13:09:57,029] [260765] [ИНФОРМАЦИЯ] [^Агент: myapp.hello]: Остановка...
[2022-01-28 13:09:57,029] [260765] [INFO] [^--ReplyConsumer]: Остановка...
[2022-01-28 13:09:57,029] [260765] [INFO] [^--LeaderAssignor]: Остановка...
[2022-01-28 13:09:57,029] [260765] [INFO] [^--Consumer]: Остановка...
[2022-01-28 13:09:57,030] [260765] [INFO] [^--Web]: Остановка...
[2022-01-28 13:09:57,030] [260765] [INFO] [^--CacheBackend]: остановка...
[2022-01-28 13:09:57,030] [260765] [INFO] [^--Producer]: Остановка...
[2022-01-28 13:09:57,030] [260765] [INFO] [^---ProducerBuffer]: остановка...
[2022-01-28 13:09:57,031] [260765] [INFO] [^--Monitor]: Остановка...
[2022-01-28 13:09:57,032] [260765] [INFO] [^Worker]: Сбор сервисных задач...
[2022-01-28 13:09:57,032] [260765] [ИНФОРМАЦИЯ] [^Worker]: Сбор всех фьючерсов...
[2022-01-28 13:09:58,033] [260765] [INFO] [^Worker]: закрытие цикла событий
Kafka (v.2.8.1) работает на 10.0.0.20, порт 9092. Конфигурация Kafka выглядит так:
# Лицензия Apache Software Foundation (ASF) под одним или несколькими
# лицензионные соглашения участника. См. файл NOTICE, распространяемый вместе с
# эту работу для получения дополнительной информации об авторских правах.
# ASF предоставляет вам лицензию на этот файл по лицензии Apache версии 2.0.
# («Лицензия»); вы не можете использовать этот файл, кроме как в соответствии с
# Лицензия. Вы можете получить копию Лицензии по адресу
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Если это не требуется применимым законодательством или не согласовано в письменной форме, программное обеспечение
# распространяется по Лицензии распространяется на ОСНОВЕ «КАК ЕСТЬ»,
# БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ ИЛИ УСЛОВИЙ, явных или подразумеваемых.
# См. Лицензию для конкретного языка, управляющего разрешениями и
# ограничения по Лицензии.
# см. kafka.server.KafkaConfig для получения дополнительной информации и значений по умолчанию
############################# Основы сервера #################### ##########
# Идентификатор брокера. Это должно быть установлено в уникальное целое число для каждого брокера.
брокер.id=0
############################# Настройки сокет-сервера ################### ###########
# Адрес, который слушает сервер сокетов. Он получит значение, возвращаемое из
# java.net.InetAddress.getCanonicalHostName(), если он не настроен.
# ФОРМАТ:
# listeners = listener_name://host_name:port
# ПРИМЕР:
# слушатели = PLAINTEXT://your.host.name:9092
слушатели=ОБЫЧНЫЙТЕКСТ://:9092
# Имя хоста и порт, которые брокер будет рекламировать производителям и потребителям. Если не установлено,
# он использует значение для "слушателей", если оно настроено. В противном случае будет использоваться значение
# возвращается из java.net.InetAddress.getCanonicalHostName().
рекламируемый.listeners=PLAINTEXT://localhost:9092
# Сопоставляет имена прослушивателей с протоколами безопасности, по умолчанию они одинаковы. См. документацию по конфигурации для более подробной информации.
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# Количество потоков, которые сервер использует для приема запросов из сети и отправки ответов в сеть
количество.network.threads=3
# Количество потоков, используемых сервером для обработки запросов, которые могут включать дисковый ввод-вывод.
число.ио.потоков=8
# Буфер отправки (SO_SNDBUF), используемый сервером сокетов
socket.send.buffer.bytes=102400
# Буфер приема (SO_RCVBUF), используемый сервером сокетов
сокет.получить.буфер.байты=102400
# Максимальный размер запроса, который примет сокет-сервер (защита от OOM)
сокет.запрос.макс.байты=104857600
############################# Основы журнала #################### ##########
# Список каталогов, разделенных запятыми, в которых будут храниться файлы журналов.
log.dirs=/tmp/kafka-журналы
# Количество разделов журнала по умолчанию на тему. Больше разделов позволяет больше
# параллелизм для потребления, но это также приведет к увеличению количества файлов
# брокеры.
число разделов = 1
# Количество потоков на каталог данных, которое будет использоваться для восстановления журнала при запуске и сброса при завершении работы.
# Это значение рекомендуется увеличить для установок с каталогами данных, расположенными в массиве RAID.
num.recovery.threads.per.data.dir=1
############################# Внутренние настройки темы ################### ###########
# Фактор репликации для внутренних тем групповых метаданных "__consumer_offsets" и "__transaction_state"
# Для всего, кроме тестирования разработки, рекомендуется значение больше 1, чтобы обеспечить доступность, например 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Политика очистки журнала ################### ###########
# Сообщения немедленно записываются в файловую систему, но по умолчанию мы используем только fsync() для синхронизации
# кеш ОС лениво. Следующие конфигурации управляют сбросом данных на диск.
# Здесь есть несколько важных компромиссов:
# 1. Долговечность: Неиспользованные данные могут быть потеряны, если вы не используете репликацию.
# 2. Задержка. Очень большие интервалы сброса могут привести к скачкам задержки, когда сброс произойдет, так как будет много данных для сброса.
# 3. Пропускная способность. Сброс обычно является самой дорогостоящей операцией, и небольшой интервал сбрасывания может привести к избыточному поиску.
# Приведенные ниже параметры позволяют настроить политику сброса для сброса данных через определенный период времени или
# каждые N сообщений (или оба). Это можно сделать глобально и переопределить для каждой темы.
# Количество сообщений, которое необходимо принять перед принудительной записью данных на диск
#log.flush.interval.messages=10000
# Максимальное количество времени, в течение которого сообщение может находиться в журнале, прежде чем мы принудительно сбросим
#log.flush.interval.ms=1000
############################# Политика хранения журналов ################### ###########
# Следующие конфигурации управляют удалением сегментов журнала. Политика может
# быть настроенным на удаление сегментов через определенный период времени или после накопления заданного размера.
# Сегмент будет удален всякий раз, когда будет выполнено *любое* из этих условий. Удаление происходит всегда
# с конца журнала.
# Минимальный возраст файла журнала, который можно удалить из-за возраста
log.retention.hours=168
# Политика хранения журналов на основе размера. Сегменты удаляются из журнала, если оставшиеся
# сегментов становится меньше log.retention.bytes. Функционирует независимо от log.retention.hours.
#log.retention.bytes=1073741824
# Максимальный размер файла сегмента журнала. Когда этот размер будет достигнут, будет создан новый сегмент журнала.
лог.сегмент.байты=1073741824
# Интервал, с которым сегменты журнала проверяются, чтобы увидеть, могут ли они быть удалены в соответствии с
# к политикам хранения
log.retention.check.interval.ms=300000
############################# Работник зоопарка #################### #########
# Строка подключения Zookeeper (подробности см. в документации по zookeeper).
# Это пары хост:порт, разделенные запятыми, каждая из которых соответствует zk
# сервер. например «127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002».
# Вы также можете добавить необязательную строку chroot к URL-адресам, чтобы указать
# корневой каталог для всех узлов kafka.
zookeeper.connect = локальный: 2181
# Тайм-аут в мс для подключения к zookeeper
zookeeper.connection.timeout.ms=18000
############################# Настройки координатора группы ################### ###########
# Следующая конфигурация указывает время в миллисекундах, на которое GroupCoordinator будет задерживать первоначальную перебалансировку потребителей.
# Перебалансировка будет дополнительно отложена на значение group.initial.rebalance.delay.ms по мере присоединения новых участников к группе, максимум до max.poll.interval.ms.
# Значение по умолчанию для этого 3 секунды.
# Здесь мы заменяем это значение на 0, так как это обеспечивает лучший готовый опыт разработки и тестирования.
# Однако в производственных средах больше подходит значение по умолчанию, равное 3 секундам, так как это поможет избежать ненужных и потенциально дорогостоящих перебалансировок во время запуска приложения.
group.initial.rebalance.delay.ms=0
Kafka-broker запускается без сучка и задоринки:
$ sudo bin/kafka-server-start.sh -daemon config/server.properties
Я понимаю тему:
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic test
Затем я проверяю:
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
тестовое задание
Вот и думаю, где я накосячил. Кстати: сервер доступен с клиентской машины:
$ пинг -с 5 10.0.0.20 -р 9092
ШАБЛОН: 0x9092
PING 10.0.0.20 (10.0.0.20) 56 (84) байт данных.
64 байта из 10.0.0.20: icmp_seq=1 ttl=64 время=0,468 мс
64 байта из 10.0.0.20: icmp_seq=2 ttl=64 время=0,790 мс
64 байта из 10.0.0.20: icmp_seq=3 ttl=64 время=0,918 мс
64 байта из 10.0.0.20: icmp_seq=4 ttl=64 время=0,453 мс
64 байта из 10.0.0.20: icmp_seq=5 ttl=64 время=0,827 мс
--- 10.0.0.20 статистика пинга ---
5 пакетов передано, 5 получено, 0% потери пакетов, время 4095 мс
rtt min/avg/max/mdev = 0,453/0,691/0,918/0,192 мс