Biblioteka Spring Kafka umożliwia obsługę błędów przez sprawdzony mechanizm exponential back-off. Jednak jeśli błędem jest HTTP 429 „Too Many Requests” (czyli rate limiting), zwykłe wykładnicze zwiększanie przerw między kolejnymi próbami jest trochę jak atak brute force: marnujemy zasoby na próbowanie przed czasem, albo marnujemy czas czekając za długo. Lepiej byłoby czekać dokładnie tyle, ile wysyła serwer.
W dalszej części pokażę, jak podejść do implementacji. W dużym skrócie, wystarczy napisanie jednej nieskomplikowanej klasy.
Spis treści
HTTP 429
Status HTTP 429 znany jako „Too Many Requests” powinien być używany przez typowe implementacje rate limitingu. Gdy wysyłamy za dużo zapytań, zaczną być odrzucane z tym statusem oraz informacją w nagłówku Retry-After, jak długo powinniśmy odczekać przed następną próbą. Mówi o tym RFC 7231.
Dependencje
Krótko: są 2.
Do implementacji nie są potrzebne żadne zewnętrzne biblioteki. Skoro artykuł jest o bibliotece spring-kafka, to taką dependencję przyjmę za oczywistość.
Skoro mamy zderzyć się z problemem rate limitingu przy wysyłaniu zapytań do obcego serwisu, potrzebna będzie biblioteka do ich wykonywania. Pozostając z ekosystemie Springa, przyjąłem że ta biblioteka to spring-web.
spring-web można spiąć z zewnętrzną biblioteką typu Apache Http Client, może też używać klienta HTTP wbudowanego w JVM. Kwestię tego, co jest używane jako konkretna implementacja, pomijam, ponieważ nie wpływa na sposób implementacji pokazany w artykule.
Spring ma też reaktywnego klienta. Wtedy zamiast spring-web, drugą dependencją jest spring-webflux.
Wpięcie w bibliotekę Spring Kafka
Skracając rozdział Handling Exceptions z dokumentacji: zazwyczaj obsługę błędów z wykorzystaniem exponential backoff ustawiamy mniej więcej tak:
@Bean
fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<String, String>) =
ConcurrentKafkaListenerContainerFactory<String, String>().apply {
setConsumerFactory(consumerFactory)
setCommonErrorHandler(
DefaultErrorHandler(
ExponentialBackOffWithMaxRetries(6).apply {
initialInterval = 1_000
multiplier = 2.0
maxInterval = 120_000
}
)
)
}
Pierwszym krokiem jest wybranie bardziej rozbudowanego konstruktora klasy DefaultErrorHandler:
val backOffHandler = TooManyRequestsBackOffHandler(DefaultBackOffHandler()) val defaultErrorHandler = DefaultErrorHandler(null, backOff, backOffHandler)
Tutaj:
-
backOffto znany już namExponentialBackOffWithMaxRetries DefaultBackOffHandlerto klasa dostarczana przez bibliotekę; zajmuje się stosowaniem back-offu (nic nie liczy, robi mniej więcej to, coThread.sleep())TooManyRequestsBackOffHandlerto nasza klasa, która będzie przekazywać dalej zmodyfikowany czas pauzy
Interfejs, który musimy zaimplementować, nie jest przesadnie skomplikowany:
class TooManyRequestsBackOffHandler(private val delegate: BackOffHandler) : BackOffHandler {
override fun onNextBackOff(container: MessageListenerContainer?, exception: Exception, nextBackOff: Long) {
delegate.onNextBackOff(
container,
exception,
maxOf(
// unwrap ListenerExecutionFailedException
backOffFromException(exception.cause!!),
nextBackOff
)
)
}
}
Wpięcie w klienta HTTP
Jeśli używamy spring-web, miłą rzeczą jest, że niezależnie czy używamy starej klasy RestTemplate czy nowej klasy RestClient, czy jako implementacja jest używany klient HTTP z JDK czy klient Apache albo okHttp – zawsze rzucane są te same wyjątki, związane z klasą org.springframework.web.client.HttpClientErrorException.
Nasza implementacja może być więc tak prosta jak:
import org.springframework.web.client.HttpClientErrorException.TooManyRequests
private fun backOffFromException(exception: Throwable): Long {
if (exception !is TooManyRequests)
return Long.MIN_VALUE
return backOffFromHeaders(exception.responseHeaders!!)
}
W reaktywnym kliencie WebClient z spring-webflux hierarchia wyjątków jest osobna (wychodzi od org.springframework.web.reactive.function.client.), ale zasada działania jest dokładnie ta sama.
Parsowanie nagłówka Retry-After
Pewną komplikacją z perspektywy klienta jest, że RFC pozwala serwerowi wysyłać dwie zupełnie różne formy nagłówka:
Retry-After: Fri, 31 Dec 1999 23:59:59 GMT
Retry-After: 120
O ile druga postać nie nastręcza żadnych trudności (przeliczamy sekundy z nagłówka na milisekundy oczekiwane przez bibliotekę Spring Kafka), to przy formacie daty w pierwszym typie nagłówka trzeba się zatrzymać.
RFC opisuje go jako „HTTP-Date”. Niestety, w ekosystemie Javy nie jest to format daty obsługiwany „out of the box” przez bibliotekę standardową, a też znalezienie sposobu jego obsługi w Springu nie jest oczywiste.
Poszukiwania w źródłach klasy HttpHeaders pokazują to, czego nie zawiera JavaDoc: metoda getFirstDate() jest w stanie sparsować ten nagłówek. Oto proponowane użycie:
private fun backOffFromHeaders(httpHeaders: HttpHeaders): Long {
val retryAfterString = httpHeaders.getFirst(HttpHeaders.RETRY_AFTER)
?: return Long.MIN_VALUE
retryAfterString.toLongOrNull()?.let {
return it * 1000 + randomJitterMillis()
}
return try {
httpHeaders.getFirstDate(HttpHeaders.RETRY_AFTER) - System.currentTimeMillis() + randomJitterMillis()
} catch (e: IllegalArgumentException) {
Long.MIN_VALUE
}
}
Jitter i Retry Storm
Ostatnia mikrooptymalizacja to funkcja randomJitterMillis. Służy unikaniu zjawiska „Thundering Herd”/„Retry Storm”.
Problem powstaje, jeśli mamy wiele wątków i wszystkie otrzymują to samo HTTP 429 „po 14:31:00”. Jeśli każdy wątek czeka dokładnie do 14:31:00, a potem wysyła zapytanie, kilka rzeczy może pójść nie tak. Jeśli jeden wątek zawsze odpala się chwilę wcześniej niż długi, może nawet zagłodzić ten wolniejszy, kradnąc tokeny w rate limitingu. Zasoby sieciowe i pamięć dostają nagły spike użycia, a potem nie są używane.
Jitter, czyli mały losowy „szum”, może wygładzić taki spike, rozkładając zapytania na szerszy zakres czasu. Nie będę wchodził w dyskusję, jak to zrobić najlepiej, czy lepszy jest procentowy, czy bezwzględny. Na potrzeby tego artykułu weźmy coś prostego:
private fun randomJitterMillis(): Long =
Random.nextLong(100, 2_500)
Podsumowanie
Jesteśmy w stanie połączyć ze sobą dobre cechy wbudowanego w Spring Kafka mechanizmu exponential backoff (maksymalna liczba prób, próbujemy z coraz większą przerwą) i reagowania na standardowe nagłówki związane z rate limitingiem (nie strzelamy na ślepo z czasem przerwy, ale bierzemy informację udzieloną przez serwer).
Czemu nic takiego nie jest wbudowane w Spring Kafka? Mogę tylko zgadywać. Dobrą zasadą przy budowaniu bibliotek jest minimalizowanie zależności. Spring Kafka nie zawiera kodu związanego z zapytaniami REST-owymi. Byłoby dziwne, żeby twórcy dodali zależność od spring-web tylko po to, by obsługiwać błędy HTTP. Dodatkowo, są różne sposoby wykonywania zapytań: jest jeszcze spring-webflux z osobną hierarchią wyjątków, ktoś może pominąć biblioteki Springa i bezpośrednio używać jakiegoś klienta HTTP z własną obsługą błędów. Nie dziwi mnie więc, że trzeba samodzielnie napisać kod łączący rate limiting z back-offem biblioteki Spring Kafka.