Skip to main content

What is purpose of RoutingKafkaTemplate and why I should choos producer on runtime?

Jasne, świetne pytanie! Dokumentacja Springa bywa zwięzła, więc rozbijmy to na czynniki pierwsze.

Twoje pytania są bardzo trafne:

  1. Do czego to służy?
  2. Dlaczego miałbym wybierać producenta podczas działania aplikacji?
  3. Jak to w ogóle działa?

Odpowiem na nie po kolei.


1. Do czego służy RoutingKafkaTemplate?

W najprostszych słowach, RoutingKafkaTemplate to "inteligentny" KafkaTemplate, który potrafi używać różnych konfiguracji do wysyłania wiadomości, w zależności od nazwy tematu (topicu), na który wysyłasz.

Pomyśl o tym jak o spedytorze w firmie kurierskiej. Zamiast mieć jednego pracownika, który pakuje wszystkie paczki tak samo, masz jednego "głównego" spedytora, który patrzy na adres docelowy i decyduje:

  • "Ta paczka jedzie za granicę, musi być w drewnianej skrzyni" (inna serializacja).
  • "Ta paczka jedzie lokalnie, wystarczy karton" (standardowa serializacja).
  • "Ta paczka jest bardzo cenna, musi mieć dodatkowe ubezpieczenie i plombę" (inna konfiguracja producenta, np. acks=all).

RoutingKafkaTemplate jest właśnie takim spedytorem dla Twoich wiadomości Kafki. Jest to jeden, centralny punkt do wysyłania wiadomości, który pod spodem dynamicznie wybiera odpowiedniego "pracownika" (czyli ProducerFactory, który tworzy producentów z określoną konfiguracją).


2. Dlaczego miałbym wybierać producenta w trakcie działania aplikacji?

To jest kluczowe pytanie. Potrzeba taka pojawia się w bardziej złożonych systemach, gdzie jedna aplikacja musi komunikować się z różnymi częściami ekosystemu Kafki, które mają odmienne wymagania.

Oto najczęstsze powody:

  • Różne formaty danych (serializacja): To jest główny przypadek użycia pokazany w przykładzie.
  • Na topic user-events chcesz wysyłać obiekty User serializowane do formatu JSON.
  • Na topic image-uploads chcesz wysyłać surowe dane jako tablicę bajtów (byte[]).
  • Na topic legacy-system-logs chcesz wysyłać wiadomości jako zwykły String.

Bez RoutingKafkaTemplate, musiałbyś stworzyć i wstrzyknąć do swoich serwisów trzy różne beany KafkaTemplate:

  • KafkaTemplate<String, User>

  • KafkaTemplate<String, byte[]>

  • KafkaTemplate<String, String> To komplikuje kod. Z RoutingKafkaTemplate masz jeden bean, a on sam decyduje, jakiego serializatora użyć na podstawie nazwy topicu.

  • Różne wymagania dotyczące niezawodności:

  • Dla topicu orders (zamówienia), chcesz mieć maksymalną gwarancję dostarczenia, więc ustawiasz acks=all.

  • Dla topicu user-tracking (śledzenie kliknięć), zależy Ci na wydajności i akceptujesz ewentualną utratę pojedynczych wiadomości, więc ustawiasz acks=1.

  • Komunikacja z różnymi klastrami Kafki:

  • Wiadomości na topici internal-processing-.* mają iść do wewnętrznego klastra Kafki.

  • Wiadomości na topic public-events mają iść do klastra w chmurze, który wymaga innej konfiguracji security (np. SASL/SSL).

We wszystkich tych przypadkach RoutingKafkaTemplate upraszcza kod aplikacji. Zamiast zarządzać wieloma KafkaTemplate, masz jeden, który załatwia wszystko.


3. Jak to w ogóle działa? (Analiza kodu krok po kroku)

Mechanizm działania opiera się na mapie, która wiąże wzorce (wyrażenia regularne) z odpowiednimi fabrykami producentów (ProducerFactory).

Przeanalizujmy kod z dokumentacji:

Krok 1: Przygotowanie "alternatywnej" fabryki producenta

// pf to domyślna ProducerFactory, prawdopodobnie skonfigurowana w application.properties
// z użyciem StringSerializer
ProducerFactory<Object, Object> pf

// 1. Klonujemy konfigurację z domyślnej fabryki
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());

// 2. NADPISUJEMY tylko jedną właściwość: serializator wartości
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

// 3. Tworzymy NOWĄ fabrykę producenta z tą zmodyfikowaną konfiguracją
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);

W tym momencie mamy dwie fabryki:

  • pf: Domyślna, używająca StringSerializer.
  • bytesPF: Specjalna, używająca ByteArraySerializer.

Krok 2: Stworzenie mapy routingowej

// Używamy LinkedHashMap, ponieważ KOLEJNOŚĆ ma znaczenie!
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();

// Reguła 1: Jeśli nazwa topicu pasuje do wzorca "two", użyj fabryki bytesPF
map.put(Pattern.compile("two"), bytesPF);

// Reguła 2: Jeśli nazwa topicu pasuje do wzorca ".+" (czyli cokolwiek), użyj domyślnej fabryki pf
map.put(Pattern.compile(".+"), pf);

Dlaczego kolejność jest ważna? RoutingKafkaTemplate przechodzi przez tę mapę w kolejności dodawania wpisów i używa pierwszej fabryki, której wzorzec pasuje do nazwy topicu. Gdybyś odwrócił kolejność, wzorzec .+ (dopasuj cokolwiek) pasowałby do każdej nazwy topicu (w tym "two"), więc reguła dla topicu "two" nigdy nie zostałaby użyta. Dlatego bardziej szczegółowe wzorce umieszczamy na początku.

Krok 3: Stworzenie i użycie RoutingKafkaTemplate

// Tworzymy finalny template z naszą mapą reguł
return new RoutingKafkaTemplate(map);

// ... w innym miejscu aplikacji ...
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
// Wywołanie 1
routingTemplate.send("one", "thing1");
// Wywołanie 2
routingTemplate.send("two", "thing2".getBytes());
};
}

Co się dzieje podczas wywołań send?

  1. routingTemplate.send("one", "thing1");
  • RoutingKafkaTemplate bierze nazwę topicu: "one".
  • Sprawdza pierwszą regułę w mapie: Czy "one" pasuje do wzorca "two"? Nie.
  • Sprawdza drugą regułę: Czy "one" pasuje do wzorca ".+"? Tak.
  • Wynik: Używa fabryki pf. Producent stworzony przez pf oczekuje String (bo ma StringSerializer), więc wiadomość "thing1" jest serializowana poprawnie.
  1. routingTemplate.send("two", "thing2".getBytes());
  • RoutingKafkaTemplate bierze nazwę topicu: "two".
  • Sprawdza pierwszą regułę w mapie: Czy "two" pasuje do wzorca "two"? Tak.
  • Wynik: Używa fabryki bytesPF. Producent stworzony przez bytesPF oczekuje byte[] (bo ma ByteArraySerializer), więc wiadomość "thing2".getBytes() jest obsługiwana poprawnie.

Podsumowanie

RoutingKafkaTemplate to potężne narzędzie do centralizacji i upraszczania logiki wysyłania wiadomości w aplikacjach, które muszą obsługiwać topici o różnych wymaganiach (głównie dotyczących serializacji). Działa jak router, który na podstawie nazwy topicu kieruje wiadomość do odpowiednio skonfigurowanego producenta, dzięki czemu Twój kod biznesowy pozostaje czysty i nie musi martwić się o szczegóły techniczne Kafki.