What is event enveloper in avro model design ? Why use it instead of defining type per topic with event header?
Oczywiście, oto szczegółowe omówienie zarządzania schematami Avro w Kafce, koncentrujące się na dwóch głównych podejściach, z uwzględnieniem generowania kodu w Javie i kluczowej różnicy w obsłude typów przez konsumenta (wymagane rzutowanie vs. jego brak).
Wprowadzenie: Dlaczego Zarządzanie Schematami jest Kluczowe?
W ekosystemie Kafki, gdzie producent i konsument są od siebie odseparowani, kluczowe jest zapewnienie, że "rozumieją" oni nawzajem format danych. Bez tego mechanizmu, zmiana w strukturze danych po stronie producenta mogłaby spowodować awarię konsumenta w czasie rzeczywistym.
Avro w połączeniu z Confluent Schema Registry rozwiązuje ten problem.
- Avro: Format serializacji danych, który dołącza schemat do danych. Schemat w formacie JSON definiuje strukturę, typy pól i ich nazwy.
- Schema Registry: Scentralizowana usługa do przechowywania i zarządzania wersjami schematów. Producent przed wysłaniem wiadomości rejestruje jej schemat w Registry i otrzymuje unikalne ID. W wiadomości Kafka umieszcza tylko to ID, a nie cały schemat, co oszczędza miejsce. Konsument, odczytując wiadomość, pobiera ID, odpytuje Schema Registry o odpowiedni schemat i na jego podstawie deserializuje dane.
Przejdźmy teraz do dwóch architektonicznych podejść do organizacji tematów i schematów.
Podejście 1: Jeden Typ Eventu na Topic (One-Topic-Per-Event-Type)
Jest to najbardziej intuicyjne i proste podejście. Każdy odrębny typ zdarzenia (np. UserCreated, OrderPlaced, PaymentProcessed) ma swój dedykowany topic w Kafce.
Koncepcja
- Topic
users-createdprzechowuje tylko eventy typuUserCreated. - Topic
users-updatedprzechowuje tylko eventy typuUserUpdated. - Każdy topic jest powiązany z dokładnie jednym schematem Avro (lub jego kolejnymi, kompatybilnymi wersjami).
Przykład Schematu Avro (UserCreated.avsc)
{
"namespace": "com.example.events",
"type": "record",
"name": "UserCreated",
"fields": [
{ "name": "userId", "type": "string" },
{ "name": "username", "type": "string" },
{ "name": "email", "type": "string" },
{ "name": "timestamp", "type": "long", "logicalType": "timestamp-millis" }
]
}
Generowanie Typów w Javie
Używając avro-maven-plugin, na podstawie powyższego schematu zostanie wygenerowana klasa com.example.events.UserCreated. Jest to konkretna, silnie typowana klasa POJO (Plain Old Java Object) z getterami, setterami i konstruktorami.
// Wygenerowana klasa (uproszczony szkielet)
package com.example.events;
public class UserCreated extends org.apache.avro.specific.SpecificRecordBase {
private java.lang.String userId;
private java.lang.String username;
// ... gettery i settery
}
Kod Producenta (Java)
Producent jest skonfigurowany do pracy z konkretnym typem. Zwróć uwagę na typy generyczne KafkaProducer<String, UserCreated>.
// Konfiguracja producenta z Avro Serializer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://schema-registry:8081");
KafkaProducer<String, UserCreated> producer = new KafkaProducer<>(props);
UserCreated event = UserCreated.newBuilder()
.setUserId("user-123")
.setUsername("jankowalski")
.setEmail("[email protected]")
.setTimestamp(System.currentTimeMillis())
.build();
ProducerRecord<String, UserCreated> record = new ProducerRecord<>("users-created", event.getUserId(), event);
producer.send(record);
producer.close();
Kod Konsumenta (Java)
To tutaj widać największą zaletę tego podejścia. Konsument subskrybujący topic users-created wie, że otrzyma wyłącznie obiekty typu UserCreated.
// Konfiguracja konsumenta z Avro Deserializer
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-processor");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://schema-registry:8081");
// Ważne: pozwala na deserializację do konkretnego typu, a nie generycznego Avro Record
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
KafkaConsumer<String, UserCreated> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("users-created"));
while (true) {
ConsumerRecords<String, UserCreated> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, UserCreated> record : records) {
// --- KLUCZOWY PUNKT ---
// Rzutowanie nie jest wymagane!
// Metoda record.value() zwraca bezpośrednio obiekt typu UserCreated.
UserCreated userEvent = record.value();
System.out.printf("Otrzymano nowego użytkownika: ID=%s, Nazwa=%s\n",
userEvent.getUserId(), userEvent.getUsername());
}
}
Zalety:
- Prostota: Łatwe do zrozumienia i wdrożenia.
- Silne typowanie: Pełne bezpieczeństwo typów na poziomie kompilacji zarówno dla producenta, jak i konsumenta.
- Czysty kod konsumenta: Brak potrzeby rzutowania (
casting) i instrukcjiif/elselubswitchdo sprawdzania typu. - Niezależna ewolucja: Zmiana schematu dla jednego typu zdarzenia nie wpływa na inne.
Wady:
- Proliferacja topików: Może prowadzić do bardzo dużej liczby topików w systemie, co utrudnia zarządzanie.
- Brak gwarancji kolejności między typami: Jeśli dla encji
Useristotna jest kolejność zdarzeńUserCreated->UserUpdated->UserDeleted, to podejście tego nie gwarantuje, ponieważ zdarzenia te trafiają do różnych topików (i potencjalnie różnych partycji).
Podejście 2: Wiele Typów Eventów w Jednym Topicu (Envelope/Avro Union)
To podejście polega na grupowaniu powiązanych logicznie zdarzeń w jednym topiku, np. user-events. Aby było to możliwe, używa się wzorca "koperty" (Envelope) i typu union w Avro.
Koncepcja
- Tworzymy jeden główny topic, np.
user-events. - Definiujemy schemat "koperty", np.
UserEventEnvelope. - Pole danych w tej kopercie jest typu
union, który może zawierać jeden z kilku możliwych typów zdarzeń (UserCreated,UserUpdateditp.).
Przykład Schematów Avro
- Schemat
UserCreated.avsc(taki sam jak w podejściu 1) - Schemat
UserDeleted.avsc
{
"namespace": "com.example.events",
"type": "record",
"name": "UserDeleted",
"fields": [
{ "name": "userId", "type": "string" },
{ "name": "reason", "type": ["null", "string"], "default": null },
{ "name": "timestamp", "type": "long", "logicalType": "timestamp-millis" }
]
}
- Schemat "Koperty"
UserEventEnvelope.avsc
{
"namespace": "com.example.events",
"type": "record",
"name": "UserEventEnvelope",
"fields": [
{ "name": "eventId", "type": "string" },
{ "name": "eventTimestamp", "type": "long", "logicalType": "timestamp-millis" },
// To jest kluczowy element - pole 'payload' może być jednym z dwóch typów.
{
"name": "payload",
"type": ["com.example.events.UserCreated", "com.example.events.UserDeleted"]
}
]
}
Generowanie Typów w Javie
Plugin Avro wygeneruje trzy klasy: UserCreated, UserDeleted oraz UserEventEnvelope. Klasa UserEventEnvelope będzie miała pole payload typu java.lang.Object, ponieważ kompilator Javy nie wie, który z typów z unii zostanie użyty w danym momencie.
Kod Producenta (Java)
Producent tworzy konkretny event, a następnie opakowuje go w "kopertę".
KafkaProducer<String, UserEventEnvelope> producer = ... // Konfiguracja jak wyżej
// Tworzenie eventu UserCreated
UserCreated createdPayload = UserCreated.newBuilder()...build();
// Opakowanie w kopertę
UserEventEnvelope envelope1 = UserEventEnvelope.newBuilder()
.setEventId(java.util.UUID.randomUUID().toString())
.setEventTimestamp(System.currentTimeMillis())
.setPayload(createdPayload) // Avro-plugin zapewni poprawną obsługę
.build();
producer.send(new ProducerRecord<>("user-events", createdPayload.getUserId(), envelope1));
// Analogicznie dla UserDeleted
UserDeleted deletedPayload = UserDeleted.newBuilder()...build();
UserEventEnvelope envelope2 = UserEventEnvelope.newBuilder()
.setEventId(java.util.UUID.randomUUID().toString())
.setEventTimestamp(System.currentTimeMillis())
.setPayload(deletedPayload)
.build();
producer.send(new ProducerRecord<>("user-events", deletedPayload.getUserId(), envelope2));
Kod Konsumenta (Java)
Konsument subskrybuje topic user-events i zawsze otrzymuje obiekt UserEventEnvelope. Jego zadaniem jest sprawdzenie, co znajduje się w środku.
KafkaConsumer<String, UserEventEnvelope> consumer = ... // Konfiguracja jak wyżej
consumer.subscribe(Collections.singletonList("user-events"));
while (true) {
ConsumerRecords<String, UserEventEnvelope> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, UserEventEnvelope> record : records) {
UserEventEnvelope envelope = record.value();
Object payload = envelope.getPayload(); // Zwraca java.lang.Object
// --- KLUCZOWY PUNKT ---
// Wymagane jest sprawdzenie typu i rzutowanie (casting).
if (payload instanceof UserCreated) {
UserCreated userCreatedEvent = (UserCreated) payload;
System.out.printf("Przetwarzanie UserCreated: ID=%s, Nazwa=%s\n",
userCreatedEvent.getUserId(), userCreatedEvent.getUsername());
} else if (payload instanceof UserDeleted) {
UserDeleted userDeletedEvent = (UserDeleted) payload;
System.out.printf("Przetwarzanie UserDeleted: ID=%s, Powód=%s\n",
userDeletedEvent.getUserId(), userDeletedEvent.getReason());
} else {
// Dobra praktyka: obsługa nieznanych typów
System.err.println("Otrzymano nieznany typ eventu: " + payload.getClass().getName());
}
}
}
Zalety:
- Mniej topików: Znacząco redukuje liczbę topików do zarządzania.
- Gwarancja kolejności: Wszystkie zdarzenia dotyczące tego samego klucza (np.
userId) trafiają do tej samej partycji tego samego topiku, co gwarantuje ich przetwarzanie w kolejności wysłania. Jest to kluczowe dla wzorców takich jak Event Sourcing. - Logiczne grupowanie: Eventy są naturalnie pogrupowane według domeny biznesowej.
Wady:
- Zwiększona złożoność: Schematy są bardziej skomplikowane (unia), a logika konsumenta wymaga dodatkowego kodu do obsługi różnych typów.
- Konieczność rzutowania: Kod konsumenta jest mniej "czysty" i podatny na błędy, jeśli zapomnimy obsłużyć nowy typ dodany do unii.
- Sprzężona ewolucja: Dodanie nowego typu eventu (np.
UserUpdated) wymaga modyfikacji schematu koperty (UserEventEnvelope.avsc), co jest zmianą wpływającą na wszystkich konsumentów tego topiku.
Podsumowanie i Rekomendacje
| Kryterium | Podejście 1 (Topic per Type) | Podejście 2 (Envelope/Union) |
|---|---|---|
| Złożoność | Niska | Wysoka |
| Silne typowanie | Pełne, od producenta do konsumenta | Tylko dla "koperty", zawartość jest typu Object |
| Logika konsumenta | Prosta, bez rzutowania | Złożona, wymaga instanceof i rzutowania |
| Zarządzanie topicami | Trudne (dużo topików) | Łatwe (mało topików) |
| Gwarancja kolejności | Tylko w obrębie jednego typu eventu | Gwarantowana dla różnych typów z tym samym kluczem |
| Ewolucja schematów | Niezależna dla każdego typu | Sprzężona - zmiana w unii wpływa na całość |
Kiedy wybrać które podejście?
-
Wybierz Podejście 1 (Jeden Typ na Topic), jeśli:
-
Priorytetem jest prostota i szybkość wdrożenia.
-
Liczba typów zdarzeń jest stosunkowo niewielka i łatwa do zarządzania.
-
Kolejność zdarzeń pomiędzy różnymi typami nie jest krytyczna.
-
Każdy typ eventu jest przetwarzany przez zupełnie inny, niezależny mikroserwis.
-
Wybierz Podejście 2 (Koperta/Unia), jeśli:
-
Bezwzględnie wymagasz gwarancji kolejności dla różnych zdarzeń dotyczących tej samej encji (np.
Created->Updated->Deleted). Jest to klasyczny przypadek użycia dla Event Sourcing. -
Chcesz ograniczyć liczbę topików w klastrze Kafki z powodów operacyjnych.
-
Eventy są ze sobą ściśle powiązane logicznie i często przetwarzane razem.