What is EIP (Enterprise Integration Patterns)?
Doskonały wybór! Wzorce Integracyjne (Enterprise Integration Patterns - EIP) to temat-rzeka, ale niezwykle ważny, zwłaszcza w świecie mikroserwisów i systemów rozproszonych. To one pozwalają zapanować nad chaosem komunikacji między różnymi aplikacjami.
Wzorce Integracyjne (Enterprise Integration Patterns - EIP)
Czym są i po co?
Wyobraź sobie, że budujesz system składający się z wielu niezależnych aplikacji (mikroserwisów, systemów legacy, zewnętrznych API). Każda z nich może:
- Mówić w innym "języku" (np. jedna używa JSON, inna XML).
- Korzystać z innego protokołu (HTTP, kolejki JMS, pliki FTP, bazy danych).
- Mieć inną dostępność (jedna odpowiada natychmiast, inna przetwarza dane wsadowo).
EIP to zbiór sprawdzonych rozwiązań i uniwersalny, wizualny język do opisywania, projektowania i implementowania przepływu danych między takimi systemami.
Książka autorstwa Gregora Hohpe i Bobby'ego Woolfa, "Enterprise Integration Patterns", jest biblią w tym temacie. Wprowadzili oni prostą i genialną analogię, która wszystko wyjaśnia.
Analogia: System Pocztowy
Pomyśl o integracji systemów jak o globalnym systemie pocztowym:
- Wiadomość (Message): To jak list lub paczka. Zawiera dane, które chcesz przesłać.
- Kanał (Channel): To jak skrzynka pocztowa lub taśmociąg na sortowni. Miejsce, przez które przepływają wiadomości.
- Nadawca i Odbiorca (Sender/Receiver): To Twoje aplikacje.
- Router, Filtr, Tłumacz (Router, Filter, Translator): To pracownicy poczty i maszyny na sortowni, które decydują, co zrobić z paczką.
EIP to właśnie opis ról tych "pracowników poczty" i zasad organizacji całej "sortowni".
Kluczowe Komponenty i Wzorce
Zanim przejdziemy do wzorców, poznajmy podstawowe elementy składowe:
- Wiadomość (Message): Podstawowa jednostka danych. Składa się z:
- Nagłówka (Header): Metadane o wiadomości (np. ID, adres zwrotny, typ danych).
- Ciała (Body / Payload): Właściwe dane, które przesyłamy (np. obiekt zamówienia w formacie JSON).
-
Kanał (Channel): "Rura", którą płyną wiadomości z jednego punktu do drugiego. Może to być np. kolejka komunikatów (RabbitMQ, Kafka) lub prosty bufor w pamięci.
-
Punkt Końcowy (Endpoint): "Brama" przez którą aplikacja łączy się z systemem integracyjnym, aby wysyłać lub odbierać wiadomości. Może to być kontroler REST, listener kolejki JMS, czy katalog na serwerze FTP.
A teraz najważniejsze wzorce, czyli co można z tymi wiadomościami robić:
1. Wzorce Przesyłania Wiadomości (Messaging Patterns)
- Point-to-Point: Jedna wiadomość ma dokładnie jednego odbiorcę. Jak list polecony. Idealne do zadań, które muszą być wykonane raz (np. przetworzenie płatności).
- Publish-Subscribe (Pub/Sub): Jeden nadawca wysyła wiadomość do kanału (zwanego "tematem"), a wszyscy zainteresowani subskrybenci otrzymują jej kopię. Jak gazeta lub newsletter. Idealne do rozgłaszania zdarzeń (np. "użytkownik się zarejestrował").
2. Wzorce Trasowania (Routing Patterns)
Decydują, dokąd ma trafić wiadomość.
-
Content-Based Router (Router oparty na treści): "Sortownik". Patrzy na treść wiadomości (lub jej nagłówki) i na tej podstawie kieruje ją do odpowiedniego kanału.
-
Przykład: Zamówienia od klientów VIP trafiają do kanału priorytetowego, a standardowe do zwykłego.
-
Message Filter (Filtr): "Ochroniarz". Decyduje, czy wiadomość może przejść dalej, czy ma być odrzucona.
-
Przykład: Przepuszczaj tylko wiadomości o statusie "ZATWIERDZONE", a resztę ignoruj.
-
Recipient List (Lista odbiorców): Podobne do routera, ale jedna wiadomość może być wysłana do wielu odbiorców na podstawie określonych reguł.
-
Przykład: Wiadomość o nowym produkcie jest wysyłana jednocześnie do serwisu inwentaryzacji, serwisu marketingu i serwisu analitycznego.
-
Splitter (Rozdzielacz): Dzieli jedną złożoną wiadomość na wiele mniejszych.
-
Przykład: Jedno duże zamówienie zawierające 10 produktów jest dzielone na 10 osobnych wiadomości, każda z jednym produktem, aby przetwarzać je niezależnie.
-
Aggregator (Agregator): "Kolekcjoner". Odwrotność Splittera. Czeka na serię powiązanych wiadomości i łączy je w jedną, spójną wiadomość.
-
Przykład: Po przetworzeniu wszystkich 10 wiadomości o produktach, agregator zbiera wyniki i tworzy jedną wiadomość z potwierdzeniem całego zamówienia.
3. Wzorce Transformacji (Transformation Patterns)
Zmieniają zawartość lub strukturę wiadomości.
-
Translator / Mapper (Tłumacz): Tłumaczy dane z jednego formatu na inny (np. z XML na JSON). Nie zmienia sensu biznesowego.
-
Przykład: Serwis A wysyła dane klienta w formacie XML, a serwis B oczekuje ich w formacie JSON. Translator dokonuje konwersji.
-
Content Enricher (Wzbogacacz treści): Pobiera dane z zewnętrznego źródła i dodaje je do wiadomości.
-
Przykład: Wiadomość zawiera tylko ID klienta. Wzbogacacz odpytuje bazę danych o pełne dane klienta (imię, nazwisko, adres) i dołącza je do wiadomości.
4. Wzorce Obsługi Błędów
- Dead-Letter Channel (Kanał martwych listów): Jeśli wiadomość nie może być przetworzona (np. z powodu błędu), zamiast ją odrzucać, jest przenoszona do specjalnego kanału "martwych listów". Dzięki temu można ją później przeanalizować, naprawić i ponownie przetworzyć. Niezwykle ważne dla niezawodności systemów.
Jak to wygląda w praktyce w Javie i Springu?
Nie musisz implementować tych wzorców od zera! Istnieją do tego potężne frameworki. Dwa najpopularniejsze w ekosystemie Javy to:
- Apache Camel: To szwajcarski scyzoryk do integracji. Udostępnia gotowe komponenty dla niemal każdego EIP i setki konektorów do różnych systemów (HTTP, Kafka, FTP, Salesforce, itd.). Definiujesz w nim "trasy" (routes), które opisują przepływ wiadomości.
// Przykład trasy w Apache Camel (Java DSL)
from("file:data/inbox") // Endpoint: pobierz pliki z katalogu
.unmarshal().jacksonxml(Order.class) // Translator: z XML do obiektu Java
.filter(simple("${body.isVip}")) // Filter: przepuść tylko VIPów
.to("jms:queue:vip_orders"); // Endpoint: wyślij do kolejki JMS
- Spring Integration: To część ekosystemu Spring. Pozwala na budowanie przepływów integracyjnych z wykorzystaniem komponentów Springa (beanów). Jest głęboko zintegrowany ze Springiem, co jest jego dużą zaletą, jeśli cała aplikacja jest oparta na tym frameworku. Konfiguracja jest bardzo podobna, choć często bardziej oparta na adnotacjach i beanach.
Podsumowanie korzyści
Stosowanie EIP daje Ci:
- Wspólny Język: Wszyscy w zespole (nawet architekci i analitycy) mogą rozmawiać o integracji, używając tych samych, dobrze zdefiniowanych pojęć.
- Luźne Powiązanie (Loose Coupling): Aplikacje nie wiedzą o sobie nawzajem. Komunikują się przez abstrakcyjne kanały, co ułatwia ich wymianę i modyfikację.
- Elastyczność i Skalowalność: Łatwo jest dodawać nowe kroki w przepływie, zmieniać logikę routingu czy skalować poszczególne części systemu niezależnie.
- Niezawodność: Wzorce takie jak Dead-Letter Channel pozwalają budować systemy odporne na błędy.
- Reużywalność: Gotowe komponenty z frameworków (Camel, Spring Integration) przyspieszają pracę i gwarantują, że używasz sprawdzonych rozwiązań.
Spring Integration Example
Oczywiście. Połączenie Spring Integration z wzorcem Outbox, Kafką i bazą danych (PostgreSQL) to doskonały, zaawansowany przykład pokazujący, jak budować niezawodne systemy rozproszone.
Zanim przejdziemy do kodu, wyjaśnijmy problem, który rozwiązuje wzorzec Outbox.
Problem: Atomowość operacji w systemach rozproszonych
Wyobraź sobie prostą operację: "Użytkownik tworzy zamówienie". W systemie musi się zadziać:
- Zapisanie zamówienia w bazie danych (PostgreSQL).
- Wysłanie zdarzenia
OrderCreateddo Kafki, aby inne serwisy (np. od płatności, wysyłki) mogły na nie zareagować.
Co się stanie, jeśli:
- Zapis do bazy się powiedzie, ale wysłanie do Kafki zawiedzie? → Masz "ciche" zamówienie w systemie, o którym nikt inny nie wie. Błąd.
- Wysłanie do Kafki się powiedzie, ale transakcja bazodanowa zostanie wycofana? → Inne serwisy zaczną przetwarzać zamówienie, które formalnie nie istnieje. Jeszcze gorszy błąd.
Nie da się objąć tych dwóch operacji (zapisu do bazy i wysłania do brokera) jedną, atomową transakcją.
Rozwiązanie: Wzorzec Transakcyjny Outbox
Wzorzec Outbox rozwiązuje ten problem w genialnie prosty sposób:
- W ramach jednej transakcji bazodanowej robisz dwie rzeczy:
- Zapisujesz dane biznesowe (np.
Orderw tabeliorders). - Zapisujesz zdarzenie, które chcesz wysłać, w specjalnej tabeli
outbox(np. jako JSON).
- Ponieważ obie operacje są w tej samej transakcji, albo obie się powiodą, albo obie zostaną wycofane. System pozostaje spójny.
- Osobny, asynchroniczny proces (zwany pollerem lub przekaźnikiem) regularnie odpytuje tabelę
outboxo nieprzetworzone zdarzenia. - Gdy poller znajdzie nowe zdarzenie, publikuje je w Kafce.
- Po pomyślnej publikacji, poller oznacza zdarzenie w tabeli
outboxjako "przetworzone".
Tutaj właśnie wkracza Spring Integration – idealnie nadaje się do zaimplementowania kroku 3, 4 i 5.
Przykład: Implementacja w Spring Boot
1. Zależności (pom.xml)
Potrzebujemy JPA, sterownika PostgreSQL, Kafki i Spring Integration (z modułami JPA i Kafka).
<dependencies>
<!-- Web i JPA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Spring Integration & Kafka -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
</dependency>
<!-- Narzędzia pomocnicze -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
2. Konfiguracja (application.properties)
# Konfiguracja bazy danych PostgreSQL
spring.datasource.url=jdbc:postgresql://localhost:5432/mydatabase
spring.datasource.username=user
spring.datasource.password=password
# Pokazuje SQL generowany przez Hibernate
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update
# Konfiguracja Kafki
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
3. Encje: Order i OutboxEvent
Nasza encja biznesowa i encja dla tabeli outbox.
// Encja biznesowa
@Entity
@Table(name = "orders")
@Data // Lombok
@NoArgsConstructor
public class Order {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String product;
private int quantity;
private Instant createdAt;
}
// Encja dla wzorca Outbox
@Entity
@Table(name = "outbox")
@Data // Lombok
@Builder // Lombok
@NoArgsConstructor
@AllArgsConstructor
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.UUID)
private UUID id;
private String aggregateType; // Np. "Order"
private String aggregateId; // Np. ID zamówienia
private String eventType; // Np. "OrderCreated"
private String payload; // Zdarzenie jako JSON
private Instant createdAt;
@Builder.Default
private boolean processed = false;
private Instant processedAt;
}
4. Serwis Biznesowy (Krok 1 i 2 z opisu)
Serwis, który w jednej transakcji zapisuje zamówienie i zdarzenie w tabeli outbox.
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxEventRepository outboxEventRepository;
private final ObjectMapper objectMapper; // Do serializacji do JSON
@Transactional
public Order createOrder(String product, int quantity) throws JsonProcessingException {
// 1. Stwórz i zapisz encję biznesową
Order order = new Order();
order.setProduct(product);
order.setQuantity(quantity);
order.setCreatedAt(Instant.now());
Order savedOrder = orderRepository.save(order);
// 2. Stwórz i zapisz zdarzenie w tabeli outbox w tej samej transakcji
String payload = objectMapper.writeValueAsString(savedOrder);
OutboxEvent event = OutboxEvent.builder()
.aggregateType("Order")
.aggregateId(savedOrder.getId().toString())
.eventType("OrderCreated")
.payload(payload)
.createdAt(Instant.now())
.build();
outboxEventRepository.save(event);
return savedOrder;
}
}
5. Konfiguracja Spring Integration (Krok 3, 4 i 5)
To jest serce mechanizmu. Tworzymy przepływ (Integration Flow), który:
- Odpytuje bazę danych o nieprzetworzone zdarzenia.
- Wysyła je do Kafki.
- Oznacza jako przetworzone.
@Configuration
@EnableIntegration
@RequiredArgsConstructor
public class OutboxIntegrationConfig {
private final EntityManagerFactory entityManagerFactory;
private final KafkaTemplate<String, String> kafkaTemplate;
private final OutboxEventRepository outboxEventRepository;
// Definiujemy kanał, którym będą płynąć wiadomości z bazy danych
@Bean
public MessageChannel outboxChannel() {
return new DirectChannel();
}
// 1. INBOUND ADAPTER: Poller odpytujący bazę danych
@Bean
public JpaInboundChannelAdapter jpaInboundAdapter() {
JpaInboundChannelAdapter adapter = new JpaInboundChannelAdapter(entityManagerFactory);
adapter.setQuery("SELECT e FROM OutboxEvent e WHERE e.processed = false ORDER BY e.createdAt ASC");
adapter.setMaxResults(10); // Pobieraj maksymalnie 10 zdarzeń na raz
adapter.setExpectSingleResult(false);
adapter.setEntityClass(OutboxEvent.class);
// Po udanym przetworzeniu całego przepływu, wykonaj zapytanie aktualizujące
adapter.setUpdateQuery("UPDATE OutboxEvent e SET e.processed = true, e.processedAt = CURRENT_TIMESTAMP WHERE e.id IN (:id)");
adapter.setUpdateParameterSourceFactory(new BeanPropertyParameterSourceFactory()); // Automatycznie mapuj 'id' z obiektu
return adapter;
}
// 2. INTEGRATION FLOW: Definicja całego przepływu
@Bean
public IntegrationFlow outboxFlow() {
return IntegrationFlows
// Zaczynamy od poller'a bazodanowego, który działa co 5 sekund
.from(jpaInboundAdapter(), config -> config.poller(Pollers.fixedDelay(5000)))
// Dzielimy listę zdarzeń na pojedyncze wiadomości
.split()
// Przekierowujemy każdą wiadomość do kanału `outboxChannel`
.channel(outboxChannel())
.get();
}
// 3. SERVICE ACTIVATOR: Komponent nasłuchujący na kanale i wysyłający do Kafki
@Bean
@ServiceActivator(inputChannel = "outboxChannel")
public MessageHandler kafkaMessageHandler() {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate);
// Temat Kafki jest dynamicznie pobierany z typu agregatu (np. "Order" -> temat "orders")
handler.setTopicExpression(new SpelExpressionParser().parseExpression("payload.aggregateType.toLowerCase() + 's'"));
// Klucz wiadomości to ID agregatu (dla partycjonowania)
handler.setMessageKeyExpression(new SpelExpressionParser().parseExpression("payload.aggregateId"));
// Ciało wiadomości to payload (JSON)
handler.setPayloadExpression(new SpelExpressionParser().parseExpression("payload.payload"));
return handler;
}
}
Podsumowanie przepływu
- Zewnętrzny klient wywołuje
orderService.createOrder(...). @Transactionalrozpoczyna transakcję.- Do tabeli
orderstrafia nowy wiersz. - Do tabeli
outboxtrafia nowy wiersz zprocessed = false. - Transakcja jest zatwierdzana. Dane są bezpieczne w PostgreSQL.
- Niezależnie, co 5 sekund,
jpaInboundAdapter(poller) ze Spring Integration wykonuje zapytanieSELECT ... WHERE processed = false. - Znajduje nasz nowy
OutboxEventi wysyła go jako wiadomość do przepływu. .split()zapewnia, że jeśli poller pobrał kilka zdarzeń, każde będzie przetwarzane osobno.- Wiadomość trafia na
outboxChannel. kafkaMessageHandler(Service Activator) odbiera wiadomość, wyciąga z niej temat, klucz i payload, a następnie wysyła do Kafki.- Jeśli wysyłka do Kafki się powiedzie,
jpaInboundAdapterwykonuje zdefiniowane zapytanieUPDATE ... SET processed = true, oznaczając zdarzenie jako zakończone. Jeśli wysyłka się nie powiedzie, zdarzenie nie zostanie zaktualizowane i będzie podjęta próba ponownej wysyłki w następnym cyklu pollera.
W ten sposób, używając Spring Integration, stworzyliśmy solidny, odporny na błędy i gwarantujący spójność danych mechanizm do komunikacji między serwisami.