Я написал потребителю Kafka, чтобы он потреблял поток зашифрованных данных (~ 1 МБ) и расшифровывал их перед добавлением в корзину S3. Обработка 1000 записей занимает ~ 20 минут, и если я удалю логику шифрования и запущу то же самое, обработка 1000 записей займет менее 3 минут.
Ниже приведены конфигурации, которые я использую в настоящее время.
разрешить.auto.create.topics = истина
auto.commit.interval.ms = 5000
auto.offset.reset = последний
проверка.crcs = истина
client.dns.lookup = use_all_dns_ips
соединений.макс.ожидание.мс = 540000
default.api.timeout.ms = 60000
включить.auto.commit = правда
exclude.internal.topics = истина
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
группа.экземпляр.id = ноль
сердцебиение.интервал.мс = 3000
перехватчик.классы = []
внутренняя.выйти.группа.на.закрыть = истина
внутренний.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = класс org.apache.kafka.common.serialization.StringDeserializer
макс.partition.fetch.bytes = 1000000
макс.интервал опроса.мс = 300000
макс.опрос.записей = 500
метаданные.макс.возраст.мс = 300000
metric.reporters = []
метрика.количество.выборок = 2
metrics.recording.level = ИНФОРМАЦИЯ
metrics.sample.window.ms = 30000
partition.assignment.strategy = [класс org.apache.kafka.clients.consumer.RangeAssignor]
получить.буфер.байты = 655360
переподключение.откат.макс.мс = 1000
переподключение.откат.мс = 50
запрос.время ожидания.мс = 30000
retry.backoff.ms = 100
send.buffer.bytes = 131072
сеанс.время ожидания.мс = 10000
value.deserializer = класс org.apache.kafka.common.serialization.StringDeserializer
В теме 10 разделов. Я потреблял с несколькими потребителями (1-10), назначая их в одну и ту же группу потребителей. Но независимо от того, сколько потребителей я использую, он потребляет одинаковое количество данных в заданное время.
Как сделать потребителей быстрее? И может ли в этом помочь Apache Spark?