LINUX.ORG.RU

Помогите настроить kafka consumer

 , ,


0

1

Приветствую всех.

Собственно сабж.

Одно приложение публикует сообщение, второе его принимает, и отправляет ответ.

То, которое отправляет ответ, не стартует. Пишет:

java.lang.IllegalStateException: a KafkaTemplate is required to support replies

Вот настройки consumera:

@Configuration
public class KafkaConfig {

    @Value("${kafka.group.id}")
    private String groupId;

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return props;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> requestConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>());
    }

    @Bean
    public ProducerFactory<String, Reply> replyProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> requestReplyListenerContainerFactory() {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(requestConsumerFactory());
        factory.setReplyTemplate(replyTemplate());
        return factory;
    }

    @Bean
    public KafkaTemplate<String, Reply> replyTemplate() {
        return new KafkaTemplate<>(replyProducerFactory());
    }

}

Бин kafkaTemplate в контексте есть.

Кто подскажет как настроить правильно потребитель?


Предыдущий оратор прав. Уже не говоря о том, что ты проперти-мэпы зачем-то бинами объявил. Смотри в сторону spring-kafka. Как прозреешь, можешь ещё взглянуть на spring-cloud-stream-binder-kafka, но вряд ли в ближайшем будущем тебе это может понадобиться.

bytecode ★★ ()
Ответ на: комментарий от slyjoeh

А как правильно?

Вот такую задачу пытаюсь решить:

У меня есть один производитель и два потребителя.

Производитель отправляет в первый потребитель fooDto1, в ответ получает barDto1. И, соответственно, во второй fooDto2, ответ - barDto2. Ответы получаю, пометив, потребителей @SendTo

Вот тут лежит код

Там падает исключение как в первом сообщении темы. Посмотри, пожалуйста, а то чуть в сторону от примеров из документации, и всё - затык, а подсказать не кому.

pol01 ()
Ответ на: комментарий от bytecode

Так я и пытаюсь использовать spring-kafka. Только пример пытаюсь сделать чуть сложнее, чем в документации. Можешь поделиться работающим примером обмена dto`шками, где kafka транспортом выступает? Ссылку на мой код (неработающий) я выше привел.

pol01 ()