What is purpose of RoutingKafkaTemplate and why I should choos producer on runtime?
Jasne, świetne pytanie! Dokumentacja Springa bywa zwięzła, więc rozbijmy to na czynniki pierwsze.
Twoje pytania są bardzo trafne:
- Do czego to służy?
- Dlaczego miałbym wybierać producenta podczas działania aplikacji?
- Jak to w ogóle działa?
Odpowiem na nie po kolei.
1. Do czego służy RoutingKafkaTemplate?
W najprostszych słowach, RoutingKafkaTemplate to "inteligentny" KafkaTemplate, który potrafi używać różnych konfiguracji do wysyłania wiadomości, w zależności od nazwy tematu (topicu), na który wysyłasz.
Pomyśl o tym jak o spedytorze w firmie kurierskiej. Zamiast mieć jednego pracownika, który pakuje wszystkie paczki tak samo, masz jednego "głównego" spedytora, który patrzy na adres docelowy i decyduje:
- "Ta paczka jedzie za granicę, musi być w drewnianej skrzyni" (inna serializacja).
- "Ta paczka jedzie lokalnie, wystarczy karton" (standardowa serializacja).
- "Ta paczka jest bardzo cenna, musi mieć dodatkowe ubezpieczenie i plombę" (inna konfiguracja producenta, np.
acks=all).
RoutingKafkaTemplate jest właśnie takim spedytorem dla Twoich wiadomości Kafki. Jest to jeden, centralny punkt do wysyłania wiadomości, który pod spodem dynamicznie wybiera odpowiedniego "pracownika" (czyli ProducerFactory, który tworzy producentów z określoną konfiguracją).
2. Dlaczego miałbym wybierać producenta w trakcie działania aplikacji?
To jest kluczowe pytanie. Potrzeba taka pojawia się w bardziej złożonych systemach, gdzie jedna aplikacja musi komunikować się z różnymi częściami ekosystemu Kafki, które mają odmienne wymagania.
Oto najczęstsze powody:
- Różne formaty danych (serializacja): To jest główny przypadek użycia pokazany w przykładzie.
- Na topic
user-eventschcesz wysyłać obiektyUserserializowane do formatu JSON. - Na topic
image-uploadschcesz wysyłać surowe dane jako tablicę bajtów (byte[]). - Na topic
legacy-system-logschcesz wysyłać wiadomości jako zwykły String.
Bez RoutingKafkaTemplate, musiałbyś stworzyć i wstrzyknąć do swoich serwisów trzy różne beany KafkaTemplate:
-
KafkaTemplate<String, User> -
KafkaTemplate<String, byte[]> -
KafkaTemplate<String, String>To komplikuje kod. ZRoutingKafkaTemplatemasz jeden bean, a on sam decyduje, jakiego serializatora użyć na podstawie nazwy topicu. -
Różne wymagania dotyczące niezawodności:
-
Dla topicu
orders(zamówienia), chcesz mieć maksymalną gwarancję dostarczenia, więc ustawiaszacks=all. -
Dla topicu
user-tracking(śledzenie kliknięć), zależy Ci na wydajności i akceptujesz ewentualną utratę pojedynczych wiadomości, więc ustawiaszacks=1. -
Komunikacja z różnymi klastrami Kafki:
-
Wiadomości na topici
internal-processing-.*mają iść do wewnętrznego klastra Kafki. -
Wiadomości na topic
public-eventsmają iść do klastra w chmurze, który wymaga innej konfiguracji security (np. SASL/SSL).
We wszystkich tych przypadkach RoutingKafkaTemplate upraszcza kod aplikacji. Zamiast zarządzać wieloma KafkaTemplate, masz jeden, który załatwia wszystko.
3. Jak to w ogóle działa? (Analiza kodu krok po kroku)
Mechanizm działania opiera się na mapie, która wiąże wzorce (wyrażenia regularne) z odpowiednimi fabrykami producentów (ProducerFactory).
Przeanalizujmy kod z dokumentacji:
Krok 1: Przygotowanie "alternatywnej" fabryki producenta
// pf to domyślna ProducerFactory, prawdopodobnie skonfigurowana w application.properties
// z użyciem StringSerializer
ProducerFactory<Object, Object> pf
// 1. Klonujemy konfigurację z domyślnej fabryki
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
// 2. NADPISUJEMY tylko jedną właściwość: serializator wartości
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
// 3. Tworzymy NOWĄ fabrykę producenta z tą zmodyfikowaną konfiguracją
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
W tym momencie mamy dwie fabryki:
pf: Domyślna, używającaStringSerializer.bytesPF: Specjalna, używającaByteArraySerializer.
Krok 2: Stworzenie mapy routingowej
// Używamy LinkedHashMap, ponieważ KOLEJNOŚĆ ma znaczenie!
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
// Reguła 1: Jeśli nazwa topicu pasuje do wzorca "two", użyj fabryki bytesPF
map.put(Pattern.compile("two"), bytesPF);
// Reguła 2: Jeśli nazwa topicu pasuje do wzorca ".+" (czyli cokolwiek), użyj domyślnej fabryki pf
map.put(Pattern.compile(".+"), pf);
Dlaczego kolejność jest ważna? RoutingKafkaTemplate przechodzi przez tę mapę w kolejności dodawania wpisów i używa pierwszej fabryki, której wzorzec pasuje do nazwy topicu.
Gdybyś odwrócił kolejność, wzorzec .+ (dopasuj cokolwiek) pasowałby do każdej nazwy topicu (w tym "two"), więc reguła dla topicu "two" nigdy nie zostałaby użyta. Dlatego bardziej szczegółowe wzorce umieszczamy na początku.
Krok 3: Stworzenie i użycie RoutingKafkaTemplate
// Tworzymy finalny template z naszą mapą reguł
return new RoutingKafkaTemplate(map);
// ... w innym miejscu aplikacji ...
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
// Wywołanie 1
routingTemplate.send("one", "thing1");
// Wywołanie 2
routingTemplate.send("two", "thing2".getBytes());
};
}
Co się dzieje podczas wywołań send?
routingTemplate.send("one", "thing1");
RoutingKafkaTemplatebierze nazwę topicu:"one".- Sprawdza pierwszą regułę w mapie: Czy
"one"pasuje do wzorca"two"? Nie. - Sprawdza drugą regułę: Czy
"one"pasuje do wzorca".+"? Tak. - Wynik: Używa fabryki
pf. Producent stworzony przezpfoczekujeString(bo maStringSerializer), więc wiadomość"thing1"jest serializowana poprawnie.
routingTemplate.send("two", "thing2".getBytes());
RoutingKafkaTemplatebierze nazwę topicu:"two".- Sprawdza pierwszą regułę w mapie: Czy
"two"pasuje do wzorca"two"? Tak. - Wynik: Używa fabryki
bytesPF. Producent stworzony przezbytesPFoczekujebyte[](bo maByteArraySerializer), więc wiadomość"thing2".getBytes()jest obsługiwana poprawnie.
Podsumowanie
RoutingKafkaTemplate to potężne narzędzie do centralizacji i upraszczania logiki wysyłania wiadomości w aplikacjach, które muszą obsługiwać topici o różnych wymaganiach (głównie dotyczących serializacji). Działa jak router, który na podstawie nazwy topicu kieruje wiadomość do odpowiednio skonfigurowanego producenta, dzięki czemu Twój kod biznesowy pozostaje czysty i nie musi martwić się o szczegóły techniczne Kafki.