Рейтинг:1

Нет подключения к Kafka из клиента Faust

флаг pe

У меня возникают проблемы с подключением к машине, на которой работает Kafka, с клиента, на котором запущен сценарий Faust. Сценарий выглядит следующим образом:

Импорт Фауста
журнал импорта
из асинхронного импорта сна


Тест класса (faust.Record):
    сообщение: ул


приложение = faust.App('myapp', broker='kafka://10.0.0.20:9092')
тема = приложение.тема('тест', value_type=тест)


@app.agent(тема)
async def привет (сообщения):
    async для сообщения в сообщениях:
        print(f'Получено {сообщение.msg}')


@app.timer (интервал = 5,0)
асинхронная защита example_sender():
    жду привет.отправить(
        значение = Тест (msg = 'Привет, мир!'),
    )


если __name__ == '__main__':
    приложение.основное()

Когда я запускаю скрипт:

# faust -A myapp worker -l info
âÆaµSâ v0.8.1ââ¬ââââââââââââ ... -  â
— id — мое приложение —
- транспорт - [URL('kafka://10.0.0.20:9092')] -
- хранить - память: -
веб-сайт http://hubbabubba:6066
â log â -stderr- (информация) â
âpid 260765 â
✓ имя хоста ✓ hubbabubba ✓
— платформа — CPython 3.8.10 (Linux x86_64) —
водители â
— транспорт — aiokafka=0.7.2 —
â веб â aiohttp=3.8.1 â
— каталог данных — /Git/faust-kafka/myapp-data —
â appdir /Git/faust-kafka/myapp-data/v1 â
- ... ... -
[2022-01-28 13:09:57,018] [260765] [INFO] [^Worker]: Начало... 
[2022-01-28 13:09:57,021] [260765] [INFO] [^-App]: Запуск... 
[2022-01-28 13:09:57,021] [260765] [INFO] [^--Monitor]: Запуск... 
[2022-01-28 13:09:57,021] [260765] [INFO] [^--Producer]: Начало... 
[2022-01-28 13:09:57,022] [260765] [INFO] [^---ProducerBuffer]: Запуск... 
[2022-01-28 13:09:57,024] [260765] [ОШИБКА] Невозможно подключиться к "10.0.0.20:9092": [Errno 113] Ошибка подключения ('10.0.0.20', 9092) 
[2022-01-28 13:09:57,025] [260765] [ОШИБКА] [^Worker]: Ошибка: KafkaConnectionError("Невозможно выполнить загрузку с [('10.0.0.20', 9092, <AddressFamily.AF_INET: 2>) ]") 
Traceback (последний последний вызов):
  Файл "/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/worker.py", строка 276, в execute_from_commandline
    self.loop.run_until_complete(self._starting_fut)
  Файл "/usr/lib/python3.8/asyncio/base_events.py", строка 616, в run_until_complete
    вернуть будущее.результат()
  Файл "/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/services.py", строка 759, в начале
    ждать self._default_start()
  Файл "/media/eric/DISK3/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/services.py", строка 766, в _default_start
    ожидайте self._actually_start()...
  Файл "/Git/faust-kafka/venv/lib/python3.8/site-packages/aiokafka/client.py", строка 249, в начальной загрузке
    поднять KafkaConnectionError(
kafka.errors.KafkaConnectionError: KafkaConnectionError: невозможно выполнить загрузку с [('10.0.0.20', 9092, <AddressFamily.AF_INET: 2>)]
[2022-01-28 13:09:57,027] [260765] [INFO] [^Worker]: Остановка... 
[2022-01-28 13:09:57,027] [260765] [INFO] [^-App]: Остановка... 
[2022-01-28 13:09:57,027] [260765] [INFO] [^-App]: очистить буфер производителя... 
[2022-01-28 13:09:57,028] [260765] [INFO] [^--TableManager]: остановка... 
[2022-01-28 13:09:57,028] [260765] [INFO] [^---Fetcher]: Остановка... 
[2022-01-28 13:09:57,028] [260765] [ИНФОРМАЦИЯ] [^---Кондуктор]: Остановка... 
[2022-01-28 13:09:57,028] [260765] [INFO] [^--AgentManager]: остановка... 
[2022-01-28 13:09:57,029] [260765] [ИНФОРМАЦИЯ] [^Агент: myapp.hello]: Остановка... 
[2022-01-28 13:09:57,029] [260765] [INFO] [^--ReplyConsumer]: Остановка... 
[2022-01-28 13:09:57,029] [260765] [INFO] [^--LeaderAssignor]: Остановка... 
[2022-01-28 13:09:57,029] [260765] [INFO] [^--Consumer]: Остановка... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^--Web]: Остановка... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^--CacheBackend]: остановка... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^--Producer]: Остановка... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^---ProducerBuffer]: остановка... 
[2022-01-28 13:09:57,031] [260765] [INFO] [^--Monitor]: Остановка... 
[2022-01-28 13:09:57,032] [260765] [INFO] [^Worker]: Сбор сервисных задач... 
[2022-01-28 13:09:57,032] [260765] [ИНФОРМАЦИЯ] [^Worker]: Сбор всех фьючерсов... 
[2022-01-28 13:09:58,033] [260765] [INFO] [^Worker]: закрытие цикла событий

Kafka (v.2.8.1) работает на 10.0.0.20, порт 9092. Конфигурация Kafka выглядит так:

# Лицензия Apache Software Foundation (ASF) под одним или несколькими
# лицензионные соглашения участника. См. файл NOTICE, распространяемый вместе с
# эту работу для получения дополнительной информации об авторских правах.
# ASF предоставляет вам лицензию на этот файл по лицензии Apache версии 2.0.
# («Лицензия»); вы не можете использовать этот файл, кроме как в соответствии с
# Лицензия. Вы можете получить копию Лицензии по адресу
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Если это не требуется применимым законодательством или не согласовано в письменной форме, программное обеспечение
# распространяется по Лицензии распространяется на ОСНОВЕ «КАК ЕСТЬ»,
# БЕЗ КАКИХ-ЛИБО ГАРАНТИЙ ИЛИ УСЛОВИЙ, явных или подразумеваемых.
# См. Лицензию для конкретного языка, управляющего разрешениями и
# ограничения по Лицензии.

# см. kafka.server.KafkaConfig для получения дополнительной информации и значений по умолчанию

############################# Основы сервера #################### ##########

# Идентификатор брокера. Это должно быть установлено в уникальное целое число для каждого брокера.
брокер.id=0

############################# Настройки сокет-сервера ################### ###########

# Адрес, который слушает сервер сокетов. Он получит значение, возвращаемое из 
# java.net.InetAddress.getCanonicalHostName(), если он не настроен.
# ФОРМАТ:
# listeners = listener_name://host_name:port
#   ПРИМЕР:
# слушатели = PLAINTEXT://your.host.name:9092
слушатели=ОБЫЧНЫЙТЕКСТ://:9092

# Имя хоста и порт, которые брокер будет рекламировать производителям и потребителям. Если не установлено, 
# он использует значение для "слушателей", если оно настроено. В противном случае будет использоваться значение
# возвращается из java.net.InetAddress.getCanonicalHostName().
рекламируемый.listeners=PLAINTEXT://localhost:9092

# Сопоставляет имена прослушивателей с протоколами безопасности, по умолчанию они одинаковы. См. документацию по конфигурации для более подробной информации.
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# Количество потоков, которые сервер использует для приема запросов из сети и отправки ответов в сеть
количество.network.threads=3

# Количество потоков, используемых сервером для обработки запросов, которые могут включать дисковый ввод-вывод.
число.ио.потоков=8

# Буфер отправки (SO_SNDBUF), используемый сервером сокетов
socket.send.buffer.bytes=102400

# Буфер приема (SO_RCVBUF), используемый сервером сокетов
сокет.получить.буфер.байты=102400

# Максимальный размер запроса, который примет сокет-сервер (защита от OOM)
сокет.запрос.макс.байты=104857600


############################# Основы журнала #################### ##########

# Список каталогов, разделенных запятыми, в которых будут храниться файлы журналов.
log.dirs=/tmp/kafka-журналы

# Количество разделов журнала по умолчанию на тему. Больше разделов позволяет больше
# параллелизм для потребления, но это также приведет к увеличению количества файлов
# брокеры.
число разделов = 1

# Количество потоков на каталог данных, которое будет использоваться для восстановления журнала при запуске и сброса при завершении работы.
# Это значение рекомендуется увеличить для установок с каталогами данных, расположенными в массиве RAID.
num.recovery.threads.per.data.dir=1

############################# Внутренние настройки темы ################### ###########
# Фактор репликации для внутренних тем групповых метаданных "__consumer_offsets" и "__transaction_state"
# Для всего, кроме тестирования разработки, рекомендуется значение больше 1, чтобы обеспечить доступность, например 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Политика очистки журнала ################### ###########

# Сообщения немедленно записываются в файловую систему, но по умолчанию мы используем только fsync() для синхронизации
# кеш ОС лениво. Следующие конфигурации управляют сбросом данных на диск.
# Здесь есть несколько важных компромиссов:
# 1. Долговечность: Неиспользованные данные могут быть потеряны, если вы не используете репликацию.
# 2. Задержка. Очень большие интервалы сброса могут привести к скачкам задержки, когда сброс произойдет, так как будет много данных для сброса.
# 3. Пропускная способность. Сброс обычно является самой дорогостоящей операцией, и небольшой интервал сбрасывания может привести к избыточному поиску.
# Приведенные ниже параметры позволяют настроить политику сброса для сброса данных через определенный период времени или
# каждые N сообщений (или оба). Это можно сделать глобально и переопределить для каждой темы.

# Количество сообщений, которое необходимо принять перед принудительной записью данных на диск
#log.flush.interval.messages=10000

# Максимальное количество времени, в течение которого сообщение может находиться в журнале, прежде чем мы принудительно сбросим
#log.flush.interval.ms=1000

############################# Политика хранения журналов ################### ###########

# Следующие конфигурации управляют удалением сегментов журнала. Политика может
# быть настроенным на удаление сегментов через определенный период времени или после накопления заданного размера.
# Сегмент будет удален всякий раз, когда будет выполнено *любое* из этих условий. Удаление происходит всегда
# с конца журнала.

# Минимальный возраст файла журнала, который можно удалить из-за возраста
log.retention.hours=168

# Политика хранения журналов на основе размера. Сегменты удаляются из журнала, если оставшиеся
# сегментов становится меньше log.retention.bytes. Функционирует независимо от log.retention.hours.
#log.retention.bytes=1073741824

# Максимальный размер файла сегмента журнала. Когда этот размер будет достигнут, будет создан новый сегмент журнала.
лог.сегмент.байты=1073741824

# Интервал, с которым сегменты журнала проверяются, чтобы увидеть, могут ли они быть удалены в соответствии с
# к политикам хранения
log.retention.check.interval.ms=300000

############################# Работник зоопарка #################### #########

# Строка подключения Zookeeper (подробности см. в документации по zookeeper).
# Это пары хост:порт, разделенные запятыми, каждая из которых соответствует zk
# сервер. например «127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002».
# Вы также можете добавить необязательную строку chroot к URL-адресам, чтобы указать
# корневой каталог для всех узлов kafka.
zookeeper.connect = локальный: 2181

# Тайм-аут в мс для подключения к zookeeper
zookeeper.connection.timeout.ms=18000


############################# Настройки координатора группы ################### ###########

# Следующая конфигурация указывает время в миллисекундах, на которое GroupCoordinator будет задерживать первоначальную перебалансировку потребителей.
# Перебалансировка будет дополнительно отложена на значение group.initial.rebalance.delay.ms по мере присоединения новых участников к группе, максимум до max.poll.interval.ms.
# Значение по умолчанию для этого 3 секунды.
# Здесь мы заменяем это значение на 0, так как это обеспечивает лучший готовый опыт разработки и тестирования.
# Однако в производственных средах больше подходит значение по умолчанию, равное 3 секундам, так как это поможет избежать ненужных и потенциально дорогостоящих перебалансировок во время запуска приложения.
group.initial.rebalance.delay.ms=0

Kafka-broker запускается без сучка и задоринки:

$ sudo bin/kafka-server-start.sh -daemon config/server.properties 

Я понимаю тему:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic test

Затем я проверяю:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
тестовое задание

Вот и думаю, где я накосячил. Кстати: сервер доступен с клиентской машины:

$ пинг -с 5 10.0.0.20 -р 9092
ШАБЛОН: 0x9092
PING 10.0.0.20 (10.0.0.20) 56 (84) байт данных.
64 байта из 10.0.0.20: icmp_seq=1 ttl=64 время=0,468 мс
64 байта из 10.0.0.20: icmp_seq=2 ttl=64 время=0,790 мс
64 байта из 10.0.0.20: icmp_seq=3 ttl=64 время=0,918 мс
64 байта из 10.0.0.20: icmp_seq=4 ttl=64 время=0,453 мс
64 байта из 10.0.0.20: icmp_seq=5 ttl=64 время=0,827 мс

--- 10.0.0.20 статистика пинга ---
5 пакетов передано, 5 получено, 0% потери пакетов, время 4095 мс
rtt min/avg/max/mdev = 0,453/0,691/0,918/0,192 мс
Рейтинг:1
флаг cn

Мне это кажется неправильным, так как это будет означать, что ваш удаленный клиент попытается подключиться к локальный хост как только он поговорит с загрузочным сервером, а не с удаленным адресом вашего экземпляра kafka:

рекламируемый.listeners=PLAINTEXT://localhost:9092

Я бы изменил это на внешний (10.x.x.x) IP-адрес вашего экземпляра kafka, перезапустил все и повторил попытку.

ElToro1966 avatar
флаг pe
Установите для ads.listeners значение 10.0.0.20 и добавьте порт 9092 в разрешенные порты. Все перезапустил. Работает! Спасибо.

Ответить или комментировать

Большинство людей не понимают, что склонность к познанию нового открывает путь к обучению и улучшает межличностные связи. В исследованиях Элисон, например, хотя люди могли точно вспомнить, сколько вопросов было задано в их разговорах, они не чувствовали интуитивно связи между вопросами и симпатиями. В четырех исследованиях, в которых участники сами участвовали в разговорах или читали стенограммы чужих разговоров, люди, как правило, не осознавали, что задаваемый вопрос повлияет — или повлиял — на уровень дружбы между собеседниками.