使用 guava-retrying 實(shí)現(xiàn)靈活的重試機(jī)制
在業(yè)務(wù)中可能會(huì)出現(xiàn)接口調(diào)用失敗、網(wǎng)絡(luò)擁塞超時(shí)、任務(wù)執(zhí)行失敗、系統(tǒng)錯(cuò)誤等異常情況,需要進(jìn)行重試操作。但某些場(chǎng)景下我們對(duì)重試有特殊要求,比如延遲重試、降頻重試等,此時(shí)自己編寫重試代碼會(huì)很繁瑣,在 Java 中,可以使用 guava-retrying 幫我們實(shí)現(xiàn)靈活的重試機(jī)制。
guava-retrying 簡介
guava-retrying 是一個(gè)線程安全的 Java 重試類庫,提供了一種通用方法去處理任意需要重試的代碼,可以方便靈活地控制重試次數(shù)、重試時(shí)機(jī)、重試頻率、停止時(shí)機(jī)等,并具有異常處理功能。
GitHub地址:https://github.com/rholder/guava-retrying
有意思的是,這個(gè)項(xiàng)目最初源于 Jean-Baptiste Nizet 在 guava 倉庫下的評(píng)論。
guava-retrying 入門
下面通過一個(gè)場(chǎng)景幫助大家快速入門 guava-retrying,再具體講解其更多用法。
作者在 GitHub 提供了入門代碼,先通過 maven 或 gradle 引入:
maven:
<dependency> <groupId>com.github.rholder</groupId> <artifactId>guava-retrying</artifactId> <version>2.0.0</version> </dependency>
gradle:
compile "com.github.rholder:guava-retrying:2.0.0"
假定我們需要調(diào)用一個(gè)qps限制很低的第三方接口,如果調(diào)用失敗,需要依次在失敗后的第10s、30s、60s進(jìn)行降頻重試。
如果不使用框架,實(shí)現(xiàn)邏輯大致如下:
// 調(diào)用接口 boolean result; AtomicInteger atomicInteger = new AtomicInteger(0); int sleepNum = 10000; while(!result && atomicInteger.get() < 4) { atomicInteger.incrementAndGet(); result = thirdApi.invoke(); Thread.sleep(sleepNum); sleepNum += sleepNum * atomicInteger.get(); }
雖然看起來代碼行數(shù)并不多,只需要自己定義計(jì)數(shù)器、計(jì)算休眠時(shí)間等,但是再考慮到異常處理、異步等情況,重試邏輯的代碼占整體代碼的比重太大了(真正的業(yè)務(wù)邏輯只有 thirdApi.invoke 對(duì)么?)。如果業(yè)務(wù)中多處需要重試,還要反復(fù)編寫類似的代碼,而這不應(yīng)該是開發(fā)者關(guān)心的。
guava-retrying 為我們封裝了一套很好的通用重試方法,來試下用它實(shí)現(xiàn)上述邏輯:
Callable<Boolean> callable = () -> { return thirdApi.invoke(); // 業(yè)務(wù)邏輯 }; // 定義重試器 Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder() .retryIfResult(Predicates.<Boolean>isNull()) // 如果結(jié)果為空則重試 .retryIfExceptionOfType(IOException.class) // 發(fā)生IO異常則重試 .retryIfRuntimeException() // 發(fā)生運(yùn)行時(shí)異常則重試 .withWaitStrategy(WaitStrategies.incrementingWait(10, TimeUnit.SECONDS, 10, TimeUnit.SECONDS)) // 等待 .withStopStrategy(StopStrategies.stopAfterAttempt(4)) // 允許執(zhí)行4次(首次執(zhí)行 + 最多重試3次) .build(); try { retryer.call(callable); // 執(zhí)行 } catch (RetryException | ExecutionException e) { // 重試次數(shù)超過閾值或被強(qiáng)制中斷 e.printStackTrace(); }
分析上述代碼:
首先定義了一個(gè) Callable 任務(wù),其中執(zhí)行我們需要重試的業(yè)務(wù)邏輯。
通過 RetryerBuilder 構(gòu)造重試器,構(gòu)造包含如下部分:
重試條件 retryIfResult、retryIfExceptionOfType、retryIfRuntimeException
重試等待策略(延遲)withWaitStrategy
重試停止策略 withStopStrategy
阻塞策略、超時(shí)限制、注冊(cè)重試***(上述代碼未使用)
通過 retryer.call 執(zhí)行任務(wù)
當(dāng)重試次數(shù)超過設(shè)定值或者被強(qiáng)制中斷時(shí),會(huì)拋出異常,需要捕獲處理
通過上述代碼我們定義了一個(gè)重試器來實(shí)現(xiàn)降頻重試機(jī)制。顯然這種方式相較自己實(shí)現(xiàn)重試來說具有如下優(yōu)點(diǎn):
- 對(duì)代碼的侵入性更小
- 更直觀,改動(dòng)方便
- 可復(fù)用重試器至多個(gè)任務(wù)(代碼段)
RetryerBuilder 方法介紹
RetryerBuilder 用于構(gòu)造重試器,是整個(gè) guava-retrying 庫的核心,決定了重試的行為,下面詳細(xì)介紹 RetryerBuilder 的方法。
通過 newBuilder 方法獲取 RetryerBuilder 實(shí)例,通過 build 方法構(gòu)造 Retryer:
RetryerBuilder<V> newBuilder() Retryer<V> build()
可以通過下面的方法改變重試器的行為:
重試條件
根據(jù)執(zhí)行結(jié)果判斷是否重試 retryIfResult :
RetryerBuilder<V> retryIfResult(@Nonnull Predicate<V> resultPredicate)
發(fā)生異常時(shí)重試
// 發(fā)生任何異常都重試 retryIfException() // 發(fā)生 Runtime 異常都重試 RetryerBuilder<V> retryIfRuntimeException() // 發(fā)生指定 type 異常時(shí)重試 RetryerBuilder<V> retryIfExceptionOfType(@Nonnull Class<? extends Throwable> exceptionClass) // 匹配到指定類型異常時(shí)重試 RetryerBuilder<V> retryIfException(@Nonnull Predicate<Throwable> exceptionPredicate)
等待策略
等待策略可以控制重試的時(shí)間間隔,通過 withWaitStrategy 方法注冊(cè)等待策略:
RetryerBuilder<V> withWaitStrategy(@Nonnull WaitStrategy waitStrategy) throws IllegalStateException
WaitStrategy 是等待策略接口,可通過 WaitStrategies 的方法生成該接口的策略實(shí)現(xiàn)類,共有7種策略:
FixedWaitStrategy:固定等待時(shí)長策略,比如每次重試等待5s
// 參數(shù):等待時(shí)間,時(shí)間單位 WaitStrategy fixedWait(long sleepTime, @Nonnull TimeUnit timeUnit) throws IllegalStateException
RandomWaitStrategy:隨機(jī)等待時(shí)長策略,每次重試等待指定區(qū)間的隨機(jī)時(shí)長
// 參數(shù):隨機(jī)上限,時(shí)間單位 WaitStrategy randomWait(long maximumTime, @Nonnull TimeUnit timeUnit) // 參數(shù):隨機(jī)下限,下限時(shí)間單位,隨機(jī)上限,上限時(shí)間單位 WaitStrategy randomWait(long minimumTime, @Nonnull TimeUnit minimumTimeUnit, long maximumTime, @Nonnull TimeUnit maximumTimeUnit)
IncrementingWaitStrategy:遞增等待時(shí)長策略,指定初始等待值,然后重試間隔隨次數(shù)等差遞增,比如依次等待10s、30s、60s(遞增值為10)
// 參數(shù):初始等待時(shí)長,初始值時(shí)間單位,遞增值,遞增值時(shí)間單位 WaitStrategy incrementingWait(long initialSleepTime, @Nonnull TimeUnit initialSleepTimeUnit, long increment, @Nonnull TimeUnit incrementTimeUnit)
ExponentialWaitStrategy:指數(shù)等待時(shí)長策略,指定初始值,然后每次重試間隔乘2(即間隔為2的冪次方),如依次等待 2s、6s、14s??梢栽O(shè)置最大等待時(shí)長,達(dá)到最大值后每次重試將等待最大時(shí)長。
// 無參數(shù)(默認(rèn)初始值為1) WaitStrategy exponentialWait() // 參數(shù):最大等待時(shí)長,最大等待時(shí)間單位(默認(rèn)初始值為1) WaitStrategy exponentialWait(long maximumTime, @Nonnull TimeUnit maximumTimeUnit) // 參數(shù):初始值,最大等待時(shí)長,最大等待時(shí)間單位 WaitStrategy exponentialWait(long multiplier, long maximumTime, @Nonnull TimeUnit maximumTimeUnit)
FibonacciWaitStrategy :斐波那契等待時(shí)長策略,類似指數(shù)等待時(shí)長策略,間隔時(shí)長為斐波那契數(shù)列。
// 無參數(shù)(默認(rèn)初始值為1) WaitStrategy fibonacciWait() // 參數(shù):最大等待時(shí)長,最大等待時(shí)間單位(默認(rèn)初始值為1) WaitStrategy fibonacciWait(long maximumTime, @Nonnull TimeUnit maximumTimeUnit) // 參數(shù):最大等待時(shí)長,最大等待時(shí)間單位(默認(rèn)初始值為1) WaitStrategy fibonacciWait(long multiplier, long maximumTime, @Nonnull TimeUnit maximumTimeUnit)
ExceptionWaitStrategy:異常時(shí)長等待策略,根據(jù)出現(xiàn)的異常類型決定等待的時(shí)長
// 參數(shù):異常類型,計(jì)算等待時(shí)長的函數(shù) <T extends Throwable> WaitStrategy exceptionWait(@Nonnull Class<T> exceptionClass, @Nonnull Function<T, Long> function)
CompositeWaitStrategy :復(fù)合時(shí)長等待策略,可以組合多個(gè)等待策略,基本可以滿足所有等待時(shí)長的需求
// 參數(shù):等待策略數(shù)組 WaitStrategy join(WaitStrategy... waitStrategies)
阻塞策略
阻塞策略控制當(dāng)前重試結(jié)束至下次重試開始前的行為,通過 withBlockStrategy 方法注冊(cè)阻塞策略:
RetryerBuilder<V> withBlockStrategy(@Nonnull BlockStrategy blockStrategy) throws IllegalStateException
BlockStrategy 是等待策略接口,可通過 BlockStrategies 的方法生成實(shí)現(xiàn)類,默認(rèn)只提供一種策略 ThreadSleepStrategy:
@Immutable private static class ThreadSleepStrategy implements BlockStrategy { @Override public void block(long sleepTime) throws InterruptedException { Thread.sleep(sleepTime); } }
很好理解,除了睡眠,阻塞著啥也不干。
停止策略
停止策略決定了何時(shí)停止重試,比如限制次數(shù)、時(shí)間等,通過 withStopStrategy 方法注冊(cè)等待策略:
RetryerBuilder<V> withStopStrategy(@Nonnull StopStrategy stopStrategy) throws IllegalStateException
可通過 StopStrategies 的方法生成 StopStrategy 接口的策略實(shí)現(xiàn)類,共有3種策略:
- NeverStopStrategy:永不停止,直到重試成功
- StopAfterAttemptStrategy:指定最多重試次數(shù),超過次數(shù)拋出 RetryException 異常
- StopAfterDelayStrategy:指定最長重試時(shí)間,超時(shí)則中斷當(dāng)前任務(wù)執(zhí)行且不再重試,并拋出 RetryException 異常
超時(shí)限制
通過 withAttemptTimeLimiter 方法為任務(wù)添加單次執(zhí)行時(shí)間限制,超時(shí)則中斷執(zhí)行,繼續(xù)重試。
RetryerBuilder<V> withAttemptTimeLimiter(@Nonnull AttemptTimeLimiter<V> attemptTimeLimiter)
默認(rèn)提供了兩種 AttemptTimeLimiter:
- NoAttemptTimeLimit:不限制執(zhí)行時(shí)間
- FixedAttemptTimeLimit:限制執(zhí)行時(shí)間為固定值
***
可以通過 withRetryListener 方法為重試器注冊(cè)***,每次重試結(jié)束后,會(huì)按注冊(cè)順序依次回調(diào) Listener 的 onRetry 方法,可在其中獲取到當(dāng)前執(zhí)行的信息,比如重試次數(shù)等。
示例代碼如下:
import com.github.rholder.retry.Attempt; import com.github.rholder.retry.RetryListener; public class MyRetryListener<T> implements RetryListener { @Override public <T> void onRetry(Attempt<T> attempt) { // 第幾次重試,(注意:第一次重試其實(shí)是第一次調(diào)用) System.out.print("[retry]time=" + attempt.getAttemptNumber()); // 距離第一次重試的延遲 System.out.print(",delay=" + attempt.getDelaySinceFirstAttempt()); // 重試結(jié)果: 是異常終止, 還是正常返回 System.out.print(",hasException=" + attempt.hasException()); System.out.print(",hasResult=" + attempt.hasResult()); // 是什么原因?qū)е庐惓? if (attempt.hasException()) { System.out.print(",causeBy=" + attempt.getExceptionCause().toString()); } else { // 正常返回時(shí)的結(jié)果 System.out.print(",result=" + attempt.getResult()); } } }
看下原理
顧名思義,guava-retrying 依賴 guava 庫,如作者所說,源碼中大量依賴 guava 的 Predicates(斷言)來判斷是否繼續(xù)重試。
通過方法、對(duì)象名也可以看出,該庫主要使用了策略模式、構(gòu)造器模式和觀察者模式(Listener),對(duì)調(diào)用方非常友好。
從哪兒開始執(zhí)行任務(wù)就從哪兒開始看,直接打開 Retryer 類的 call 方法:
/** * Executes the given callable. If the rejection predicate * accepts the attempt, the stop strategy is used to decide if a new attempt * must be made. Then the wait strategy is used to decide how much time to sleep * and a new attempt is made. * * @param callable the callable task to be executed * @return the computed result of the given callable * @throws ExecutionException if the given callable throws an exception, and the * rejection predicate considers the attempt as successful. The original exception * is wrapped into an ExecutionException. * @throws RetryException if all the attempts failed before the stop strategy decided * to abort, or the thread was interrupted. Note that if the thread is interrupted, * this exception is thrown and the thread's interrupt status is set. */ public V call(Callable<V> callable) throws ExecutionException, RetryException { long startTime = System.nanoTime(); // 1. 記錄開始時(shí)間,用于后續(xù)的時(shí)間計(jì)算 for (int attemptNumber = 1; ; attemptNumber++) { Attempt<V> attempt; try { V result = attemptTimeLimiter.call(callable); // 2. 執(zhí)行callable任務(wù),得到attempt attempt = new ResultAttempt<V>(result, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); } catch (Throwable t) { attempt = new ExceptionAttempt<V>(t, attemptNumber, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); } for (RetryListener listener : listeners) { // 3. 如果有***,通知 listener.onRetry(attempt); } if (!rejectionPredicate.apply(attempt)) { // 4. 如果執(zhí)行callable出現(xiàn)異常,則返回異常的attempt return attempt.get(); } if (stopStrategy.shouldStop(attempt)) { // 5. 根據(jù)停止策略判斷是否停止重試 throw new RetryException(attemptNumber, attempt); // 若停止,拋出異常 } else { long sleepTime = waitStrategy.computeSleepTime(attempt); // 6. 根據(jù)等待策略計(jì)算休眠時(shí)間 try { blockStrategy.block(sleepTime); // 7. 根據(jù)阻塞策略決定休眠行為,默認(rèn)為sleep } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RetryException(attemptNumber, attempt); } } } }
這個(gè)方法邏輯很清晰,可以結(jié)合作者的注釋閱讀,主要流程如下:
記錄開始時(shí)間,便于后續(xù)判斷是否超過限制時(shí)間
通過 attemptTimeLimiter 執(zhí)行 callable 任務(wù),得到 attempt。attempt 代表著每次執(zhí)行,記錄了如執(zhí)行結(jié)果、執(zhí)行次數(shù)、距離第一次執(zhí)行的延遲時(shí)間、異常原因等信息。
- 如果 attemptTimeLimiter 是 NoAttemptTimeLimit,則直接調(diào)用 callable.call 執(zhí)行
- 如果 attemptTimeLimiter 是 FixedAttemptTimeLimit,則調(diào)用 timeLimiter.callWithTimeout 限制執(zhí)行時(shí)間
通知***,進(jìn)行一些回調(diào)操作
rejectionPredicate 默認(rèn)為 alwaysFalse,如果執(zhí)行 callable 出現(xiàn)異常,則 rejectionPredicate 會(huì)返回異常的 attempt
rejectionPredicate = Predicates.or(rejectionPredicate, new ExceptionClassPredicate<V>(RuntimeException.class));
根據(jù)停止策略判斷是否停止重試,若停止,拋出 RetryException 異常表示最終重試失敗
根據(jù)等待策略計(jì)算休眠時(shí)間
根據(jù)阻塞策略決定休眠行為,默認(rèn)為 Thread.sleep(躺著啥也不干)
大概就是這樣,該庫能夠?qū)崿F(xiàn)靈活的重試,并不復(fù)雜,有興趣的同學(xué)可以去看下源碼~
#Java#