Skip to main content

What is event enveloper in avro model design ? Why use it instead of defining type per topic with event header?

Oczywiście, oto szczegółowe omówienie zarządzania schematami Avro w Kafce, koncentrujące się na dwóch głównych podejściach, z uwzględnieniem generowania kodu w Javie i kluczowej różnicy w obsłude typów przez konsumenta (wymagane rzutowanie vs. jego brak).

Wprowadzenie: Dlaczego Zarządzanie Schematami jest Kluczowe?

W ekosystemie Kafki, gdzie producent i konsument są od siebie odseparowani, kluczowe jest zapewnienie, że "rozumieją" oni nawzajem format danych. Bez tego mechanizmu, zmiana w strukturze danych po stronie producenta mogłaby spowodować awarię konsumenta w czasie rzeczywistym.

Avro w połączeniu z Confluent Schema Registry rozwiązuje ten problem.

  • Avro: Format serializacji danych, który dołącza schemat do danych. Schemat w formacie JSON definiuje strukturę, typy pól i ich nazwy.
  • Schema Registry: Scentralizowana usługa do przechowywania i zarządzania wersjami schematów. Producent przed wysłaniem wiadomości rejestruje jej schemat w Registry i otrzymuje unikalne ID. W wiadomości Kafka umieszcza tylko to ID, a nie cały schemat, co oszczędza miejsce. Konsument, odczytując wiadomość, pobiera ID, odpytuje Schema Registry o odpowiedni schemat i na jego podstawie deserializuje dane.

Przejdźmy teraz do dwóch architektonicznych podejść do organizacji tematów i schematów.


Podejście 1: Jeden Typ Eventu na Topic (One-Topic-Per-Event-Type)

Jest to najbardziej intuicyjne i proste podejście. Każdy odrębny typ zdarzenia (np. UserCreated, OrderPlaced, PaymentProcessed) ma swój dedykowany topic w Kafce.

Koncepcja

  • Topic users-created przechowuje tylko eventy typu UserCreated.
  • Topic users-updated przechowuje tylko eventy typu UserUpdated.
  • Każdy topic jest powiązany z dokładnie jednym schematem Avro (lub jego kolejnymi, kompatybilnymi wersjami).

Przykład Schematu Avro (UserCreated.avsc)

{
"namespace": "com.example.events",
"type": "record",
"name": "UserCreated",
"fields": [
{ "name": "userId", "type": "string" },
{ "name": "username", "type": "string" },
{ "name": "email", "type": "string" },
{ "name": "timestamp", "type": "long", "logicalType": "timestamp-millis" }
]
}

Generowanie Typów w Javie

Używając avro-maven-plugin, na podstawie powyższego schematu zostanie wygenerowana klasa com.example.events.UserCreated. Jest to konkretna, silnie typowana klasa POJO (Plain Old Java Object) z getterami, setterami i konstruktorami.

// Wygenerowana klasa (uproszczony szkielet)
package com.example.events;

public class UserCreated extends org.apache.avro.specific.SpecificRecordBase {
private java.lang.String userId;
private java.lang.String username;
// ... gettery i settery
}

Kod Producenta (Java)

Producent jest skonfigurowany do pracy z konkretnym typem. Zwróć uwagę na typy generyczne KafkaProducer<String, UserCreated>.

// Konfiguracja producenta z Avro Serializer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://schema-registry:8081");

KafkaProducer<String, UserCreated> producer = new KafkaProducer<>(props);

UserCreated event = UserCreated.newBuilder()
.setUserId("user-123")
.setUsername("jankowalski")
.setEmail("[email protected]")
.setTimestamp(System.currentTimeMillis())
.build();

ProducerRecord<String, UserCreated> record = new ProducerRecord<>("users-created", event.getUserId(), event);
producer.send(record);
producer.close();

Kod Konsumenta (Java)

To tutaj widać największą zaletę tego podejścia. Konsument subskrybujący topic users-created wie, że otrzyma wyłącznie obiekty typu UserCreated.

// Konfiguracja konsumenta z Avro Deserializer
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-processor");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://schema-registry:8081");
// Ważne: pozwala na deserializację do konkretnego typu, a nie generycznego Avro Record
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

KafkaConsumer<String, UserCreated> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("users-created"));

while (true) {
ConsumerRecords<String, UserCreated> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, UserCreated> record : records) {
// --- KLUCZOWY PUNKT ---
// Rzutowanie nie jest wymagane!
// Metoda record.value() zwraca bezpośrednio obiekt typu UserCreated.
UserCreated userEvent = record.value();

System.out.printf("Otrzymano nowego użytkownika: ID=%s, Nazwa=%s\n",
userEvent.getUserId(), userEvent.getUsername());
}
}

Zalety:

  • Prostota: Łatwe do zrozumienia i wdrożenia.
  • Silne typowanie: Pełne bezpieczeństwo typów na poziomie kompilacji zarówno dla producenta, jak i konsumenta.
  • Czysty kod konsumenta: Brak potrzeby rzutowania (casting) i instrukcji if/else lub switch do sprawdzania typu.
  • Niezależna ewolucja: Zmiana schematu dla jednego typu zdarzenia nie wpływa na inne.

Wady:

  • Proliferacja topików: Może prowadzić do bardzo dużej liczby topików w systemie, co utrudnia zarządzanie.
  • Brak gwarancji kolejności między typami: Jeśli dla encji User istotna jest kolejność zdarzeń UserCreated -> UserUpdated -> UserDeleted, to podejście tego nie gwarantuje, ponieważ zdarzenia te trafiają do różnych topików (i potencjalnie różnych partycji).

Podejście 2: Wiele Typów Eventów w Jednym Topicu (Envelope/Avro Union)

To podejście polega na grupowaniu powiązanych logicznie zdarzeń w jednym topiku, np. user-events. Aby było to możliwe, używa się wzorca "koperty" (Envelope) i typu union w Avro.

Koncepcja

  • Tworzymy jeden główny topic, np. user-events.
  • Definiujemy schemat "koperty", np. UserEventEnvelope.
  • Pole danych w tej kopercie jest typu union, który może zawierać jeden z kilku możliwych typów zdarzeń (UserCreated, UserUpdated itp.).

Przykład Schematów Avro

  1. Schemat UserCreated.avsc (taki sam jak w podejściu 1)
  2. Schemat UserDeleted.avsc
{
"namespace": "com.example.events",
"type": "record",
"name": "UserDeleted",
"fields": [
{ "name": "userId", "type": "string" },
{ "name": "reason", "type": ["null", "string"], "default": null },
{ "name": "timestamp", "type": "long", "logicalType": "timestamp-millis" }
]
}
  1. Schemat "Koperty" UserEventEnvelope.avsc
{
"namespace": "com.example.events",
"type": "record",
"name": "UserEventEnvelope",
"fields": [
{ "name": "eventId", "type": "string" },
{ "name": "eventTimestamp", "type": "long", "logicalType": "timestamp-millis" },
// To jest kluczowy element - pole 'payload' może być jednym z dwóch typów.
{
"name": "payload",
"type": ["com.example.events.UserCreated", "com.example.events.UserDeleted"]
}
]
}

Generowanie Typów w Javie

Plugin Avro wygeneruje trzy klasy: UserCreated, UserDeleted oraz UserEventEnvelope. Klasa UserEventEnvelope będzie miała pole payload typu java.lang.Object, ponieważ kompilator Javy nie wie, który z typów z unii zostanie użyty w danym momencie.

Kod Producenta (Java)

Producent tworzy konkretny event, a następnie opakowuje go w "kopertę".

KafkaProducer<String, UserEventEnvelope> producer = ... // Konfiguracja jak wyżej

// Tworzenie eventu UserCreated
UserCreated createdPayload = UserCreated.newBuilder()...build();

// Opakowanie w kopertę
UserEventEnvelope envelope1 = UserEventEnvelope.newBuilder()
.setEventId(java.util.UUID.randomUUID().toString())
.setEventTimestamp(System.currentTimeMillis())
.setPayload(createdPayload) // Avro-plugin zapewni poprawną obsługę
.build();

producer.send(new ProducerRecord<>("user-events", createdPayload.getUserId(), envelope1));

// Analogicznie dla UserDeleted
UserDeleted deletedPayload = UserDeleted.newBuilder()...build();
UserEventEnvelope envelope2 = UserEventEnvelope.newBuilder()
.setEventId(java.util.UUID.randomUUID().toString())
.setEventTimestamp(System.currentTimeMillis())
.setPayload(deletedPayload)
.build();

producer.send(new ProducerRecord<>("user-events", deletedPayload.getUserId(), envelope2));

Kod Konsumenta (Java)

Konsument subskrybuje topic user-events i zawsze otrzymuje obiekt UserEventEnvelope. Jego zadaniem jest sprawdzenie, co znajduje się w środku.

KafkaConsumer<String, UserEventEnvelope> consumer = ... // Konfiguracja jak wyżej

consumer.subscribe(Collections.singletonList("user-events"));

while (true) {
ConsumerRecords<String, UserEventEnvelope> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, UserEventEnvelope> record : records) {
UserEventEnvelope envelope = record.value();
Object payload = envelope.getPayload(); // Zwraca java.lang.Object

// --- KLUCZOWY PUNKT ---
// Wymagane jest sprawdzenie typu i rzutowanie (casting).

if (payload instanceof UserCreated) {
UserCreated userCreatedEvent = (UserCreated) payload;
System.out.printf("Przetwarzanie UserCreated: ID=%s, Nazwa=%s\n",
userCreatedEvent.getUserId(), userCreatedEvent.getUsername());
} else if (payload instanceof UserDeleted) {
UserDeleted userDeletedEvent = (UserDeleted) payload;
System.out.printf("Przetwarzanie UserDeleted: ID=%s, Powód=%s\n",
userDeletedEvent.getUserId(), userDeletedEvent.getReason());
} else {
// Dobra praktyka: obsługa nieznanych typów
System.err.println("Otrzymano nieznany typ eventu: " + payload.getClass().getName());
}
}
}

Zalety:

  • Mniej topików: Znacząco redukuje liczbę topików do zarządzania.
  • Gwarancja kolejności: Wszystkie zdarzenia dotyczące tego samego klucza (np. userId) trafiają do tej samej partycji tego samego topiku, co gwarantuje ich przetwarzanie w kolejności wysłania. Jest to kluczowe dla wzorców takich jak Event Sourcing.
  • Logiczne grupowanie: Eventy są naturalnie pogrupowane według domeny biznesowej.

Wady:

  • Zwiększona złożoność: Schematy są bardziej skomplikowane (unia), a logika konsumenta wymaga dodatkowego kodu do obsługi różnych typów.
  • Konieczność rzutowania: Kod konsumenta jest mniej "czysty" i podatny na błędy, jeśli zapomnimy obsłużyć nowy typ dodany do unii.
  • Sprzężona ewolucja: Dodanie nowego typu eventu (np. UserUpdated) wymaga modyfikacji schematu koperty (UserEventEnvelope.avsc), co jest zmianą wpływającą na wszystkich konsumentów tego topiku.

Podsumowanie i Rekomendacje

KryteriumPodejście 1 (Topic per Type)Podejście 2 (Envelope/Union)
ZłożonośćNiskaWysoka
Silne typowaniePełne, od producenta do konsumentaTylko dla "koperty", zawartość jest typu Object
Logika konsumentaProsta, bez rzutowaniaZłożona, wymaga instanceof i rzutowania
Zarządzanie topicamiTrudne (dużo topików)Łatwe (mało topików)
Gwarancja kolejnościTylko w obrębie jednego typu eventuGwarantowana dla różnych typów z tym samym kluczem
Ewolucja schematówNiezależna dla każdego typuSprzężona - zmiana w unii wpływa na całość

Kiedy wybrać które podejście?

  • Wybierz Podejście 1 (Jeden Typ na Topic), jeśli:

  • Priorytetem jest prostota i szybkość wdrożenia.

  • Liczba typów zdarzeń jest stosunkowo niewielka i łatwa do zarządzania.

  • Kolejność zdarzeń pomiędzy różnymi typami nie jest krytyczna.

  • Każdy typ eventu jest przetwarzany przez zupełnie inny, niezależny mikroserwis.

  • Wybierz Podejście 2 (Koperta/Unia), jeśli:

  • Bezwzględnie wymagasz gwarancji kolejności dla różnych zdarzeń dotyczących tej samej encji (np. Created -> Updated -> Deleted). Jest to klasyczny przypadek użycia dla Event Sourcing.

  • Chcesz ograniczyć liczbę topików w klastrze Kafki z powodów operacyjnych.

  • Eventy są ze sobą ściśle powiązane logicznie i często przetwarzane razem.