Retry Topics - non blocking retries
Doskonale, przechodzimy do jednego z najpotężniejszych i najbardziej użytecznych mechanizmów w Spring for Kafka: Non-Blocking Retries (ponowienia nieblokujące). To nowoczesne podejście do obsługi błędów, które rozwiązuje wiele problemów klasycznego, blokującego ponawiania.
Czym są i dlaczego są lepsze niż blokujące ponowienia?
Problem z klasycznymi (blokującymi) ponowieniami:
Wyobraź sobie, że w metodzie @KafkaListener masz pętlę try-catch z Thread.sleep().
- Odbierasz wiadomość A.
- Przetwarzanie kończy się błędem (np. niedostępne API).
- Twój kod czeka 5 sekund (
Thread.sleep(5000)). - Próbuje ponownie. W tym czasie wątek konsumenta jest zablokowany. Nie może on przetwarzać żadnych innych wiadomości, nawet tych, które przyszły po wiadomości A na tej samej partycji i mogłyby być przetworzone pomyślnie. To prowadzi do zatrzymania całego przetwarzania na partycji i jest bardzo nieefektywne.
Rozwiązanie: Non-Blocking Retries Mechanizm ten działa zupełnie inaczej. Zamiast blokować wątek, robi coś znacznie sprytniejszego:
- Odbierasz wiadomość A.
- Przetwarzanie kończy się błędem.
- Zamiast czekać, Spring Kafka przechwytuje wyjątek i wysyła wiadomość A na specjalny, dedykowany topic ponowień (
retry topic), dodając do niej informację, kiedy powinna być przetworzona następnym razem. - Wątek konsumenta jest natychmiast wolny i może pobrać kolejną wiadomość (np. B) z oryginalnego topicu.
- Osobny listener nasłuchuje na topicu ponowień. Gdy nadejdzie czas (
timestampwiadomości), pobiera wiadomość A i próbuje ją przetworzyć ponownie.
To jest "nieblokujące", ponieważ główny wątek konsumenta nigdy nie czeka.
Implementacja Twojego scenariusza krok po kroku
Scenariusz:
- Obsługa wiadomości
CloudUploadRequest. - Ponowienie tylko w przypadku
SlowCloudUploadException. - Maksymalnie 3 próby.
- Opóźnienia: 5s, 20s, 1 minuta.
- Po nieudanych próbach wiadomość trafia do Dead Letter Topic (DLT).
- Listener DLT tworzy zdarzenie
CancelOperationEvent, aby anulować sagę.
Krok 1: Zdefiniowanie wyjątków i zdarzeń
// Wyjątek, który chcemy ponawiać
public class SlowCloudUploadException extends RuntimeException {
public SlowCloudUploadException(String message) {
super(message);
}
}
// Wyjątek, który NIE powinien być ponawiany (od razu do DLT)
public class InvalidFileFormatException extends RuntimeException {
public InvalidFileFormatException(String message) {
super(message);
}
}
// Model wiadomości przychodzącej
public class CloudUploadRequest {
private String fileId;
private String content;
// getters, setters, constructor...
}
// Zdarzenie anulujące sagę
public class CancelOperationEvent {
private String fileId;
private String reason;
// getters, setters, constructor...
}
Krok 2: Konfiguracja w kodzie (@RetryableTopic)
To najprostszy i najbardziej deklaratywny sposób konfiguracji. Używamy adnotacji @RetryableTopic bezpośrednio na metodzie listenera.
@Service
public class CloudUploadProcessor {
private final KafkaTemplate<String, Object> kafkaTemplate;
// Symulacja zawodnego serwisu
private int attemptCounter = 0;
public CloudUploadProcessor(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@RetryableTopic(
// Opóźnienia w milisekundach: 5s, 20s, 60s
backoff = @Backoff(delay = 5000, multiplier = 4.0, maxDelay = 60000),
// Liczba prób = 1 (pierwsza) + 2 (ponowienia) = 3
attempts = "3",
// Ponawiaj TYLKO dla tego wyjątku
include = { SlowCloudUploadException.class },
// Można też wykluczyć inne: exclude = { InvalidFileFormatException.class }
// Włącza mechanizm DLT
dltStrategy = DltStrategy.FAIL_ON_ERROR
)
@KafkaListener(id = "upload-processor", topics = "cloud-uploads")
public void handleUpload(CloudUploadRequest request) {
System.out.println("Attempting to upload file: " + request.getFileId());
// Symulacja błędu walidacji (nie będzie ponawiany)
if (request.getContent() == null || request.getContent().isEmpty()) {
throw new InvalidFileFormatException("File content is empty for fileId: " + request.getFileId());
}
// Symulacja niestabilnego serwisu chmurowego
attemptCounter++;
if (attemptCounter <= 3) {
System.err.println("Cloud service is slow, throwing SlowCloudUploadException. Attempt: " + attemptCounter);
throw new SlowCloudUploadException("Upload failed for fileId: " + request.getFileId());
}
// Symulacja sukcesu po kilku próbach (ten kod nie zostanie osiągnięty w naszym przykładzie)
System.out.println("File " + request.getFileId() + " uploaded successfully!");
attemptCounter = 0; // Reset for next message
}
// Listener dla Dead Letter Topic
// Domyślna nazwa DLT to <nazwa-topicu>.dlt
@DltHandler
public void handleDlt(CloudUploadRequest failedRequest,
@Header(KafkaHeaders.RECEIVED_TOPIC) String originalTopic,
@Header(KafkaHeaders.EXCEPTION_MESSAGE) String exceptionMessage) {
System.err.println("--- DLT Handler ---");
System.err.println("Received failed message from topic: " + originalTopic);
System.err.println("Failed to upload file: " + failedRequest.getFileId());
System.err.println("Reason: " + exceptionMessage);
// Tworzymy event anulujący sagę
CancelOperationEvent cancelEvent = new CancelOperationEvent(
failedRequest.getFileId(),
"Upload failed after multiple retries."
);
// Wysyłamy event na odpowiedni topic
System.out.println("Publishing CancelOperationEvent for saga compensation...");
kafkaTemplate.send("saga-cancellation-events", cancelEvent);
}
}
Analiza adnotacji @RetryableTopic
backoff = @Backoff(...): Definiuje strategię opóźnień.delay = 5000: Pierwsze opóźnienie to 5 sekund.multiplier = 4.0: Każde kolejne opóźnienie jest mnożone przez ten czynnik (5s * 4 = 20s,20s * 4 = 80s, ale...).maxDelay = 60000: ...ale opóźnienie nigdy nie przekroczy 60 sekund. Więc sekwencja to 5s, 20s, 60s. Idealnie pasuje do Twoich wymagań.attempts = "3": Maksymalna liczba prób (pierwsza + ponowienia).include = { SlowCloudUploadException.class }: Lista wyjątków, które mają uruchomić mechanizm ponowień. Jeśli zostanie rzucony jakikolwiek inny wyjątek (np.InvalidFileFormatException), wiadomość od razu trafi do DLT (lub spowoduje błąd, w zależności od konfiguracji).dltStrategy = DltStrategy.FAIL_ON_ERROR: Ta strategia oznacza, że po wyczerpaniu prób wiadomość jest wysyłana do DLT. Inne opcje to np.ALWAYS_RETRY_ON_ERROR(wiadomość DLT jest też ponawiana).@DltHandler: Magiczna adnotacja Springa. Oznacza metodę, która ma być wywołana dla wiadomości z DLT powiązanego z tym listenerem. Spring automatycznie kieruje tu wiadomości z odpowiedniego topicu DLT.
Co się dzieje pod spodem?
- Spring widzi adnotację
@RetryableTopici automatycznie tworzy dodatkowe topici:
cloud-uploads-retry-0(lub podobna nazwa, z opóźnieniem 5s)cloud-uploads-retry-1(z opóźnieniem 20s)cloud-uploads-retry-2(z opóźnieniem 60s)cloud-uploads-dlt(Dead Letter Topic)- Uwaga: od nowszych wersji Spring Kafka (3.0+), domyślnie tworzony jest jeden topic retry z partycjami per opóźnienie, co jest bardziej efektywne.
- Tworzy również odpowiednich konsumentów dla tych topiców.
Przepływ wiadomości:
- Wiadomość trafia na
cloud-uploads. handleUploadrzucaSlowCloudUploadException.- Spring wysyła wiadomość na topic
cloud-uploads-retry-0. - Po 5 sekundach konsument topicu retry pobiera ją i przekazuje z powrotem do
handleUpload. - Proces się powtarza z opóźnieniem 20s (trafia na
cloud-uploads-retry-1). - Proces się powtarza z opóźnieniem 60s (trafia na
cloud-uploads-retry-2). handleUploadpo raz trzeci rzuca wyjątek. Liczba prób została wyczerpana.- Spring wysyła wiadomość na
cloud-uploads-dlt. - Listener oznaczony
@DltHandler(handleDlt) zostaje wywołany. handleDltpublikujeCancelOperationEvent, co pozwala innym częściom systemu zareagować i wycofać zmiany w ramach sagi.
Ten mechanizm jest niezwykle potężny, bo pozwala w sposób deklaratywny i czysty zaimplementować skomplikowaną logikę obsługi błędów, która jest odporna, wydajna i nie blokuje przetwarzania.