Sentinel源碼—7.參數(shù)限流和注解的實(shí)現(xiàn)
大綱
1.參數(shù)限流的原理和源碼
2.@SentinelResource注解的使用和實(shí)現(xiàn)
1.參數(shù)限流的原理和源碼
參數(shù)限流規(guī)則ParamFlowRule的配置Demo
(2)ParamFlowSlot根據(jù)參數(shù)限流規(guī)則驗(yàn)證請(qǐng)求
(1)參數(shù)限流規(guī)則ParamFlowRule的配置Demo
一.參數(shù)限流的應(yīng)用場(chǎng)景
二.參數(shù)限流規(guī)則的屬性
三.參數(shù)限流規(guī)則的配置Demo
一.參數(shù)限流的應(yīng)用場(chǎng)景
傳統(tǒng)的流量控制,一般是通過(guò)資源維度來(lái)限制某接口或方法的調(diào)用頻率。但有時(shí)需要更細(xì)粒度地控制不同參數(shù)條件下的訪問(wèn)速率,即參數(shù)限流。參數(shù)限流允許根據(jù)不同的參數(shù)條件設(shè)置不同的流量控制規(guī)則,這種方式非常適合處理特定條件下的請(qǐng)求,因?yàn)槟芨泳?xì)地管理流量。
假設(shè)有一個(gè)在線電影訂票系統(tǒng),某個(gè)接口允許用戶查詢電影的放映時(shí)間。但只希望每個(gè)用戶每10秒只能查詢接口1次,以避免過(guò)多的查詢請(qǐng)求。這時(shí)如果直接將接口的QPS限制為5是不能滿足要求的,因?yàn)樾枨笫敲總€(gè)用戶每5分鐘只能查詢1次,而不是每秒一共只能查詢5次,因此參數(shù)限流就能派上用場(chǎng)了。
可以設(shè)置一個(gè)規(guī)則,根據(jù)用戶ID來(lái)限制每個(gè)用戶的查詢頻率,將限流的維度從資源維度細(xì)化到參數(shù)維度,從而實(shí)現(xiàn)每個(gè)用戶每10秒只能查詢接口1次。比如希望影院工作人員可以每秒查詢10次,老板可以每秒查詢100次,而購(gòu)票者則只能每10秒查詢一次,其中工作人員的userId值為100和200,老板的userId值為9999,那么可以如下配置:需要注意限流閾值是以秒為單位的,所以需要乘以統(tǒng)計(jì)窗口時(shí)長(zhǎng)10。
二.參數(shù)限流規(guī)則的屬性
public class ParamFlowRule extends AbstractRule { ... //The threshold type of flow control (0: thread count, 1: QPS). //流量控制的閾值類型(0表示線程數(shù),1表示QPS) private int grade = RuleConstant.FLOW_GRADE_QPS; //Parameter index. //參數(shù)下標(biāo) private Integer paramIdx; //The threshold count. //閾值 private double count; //Original exclusion items of parameters. //針對(duì)特定參數(shù)的流量控制規(guī)則列表 private List<ParamFlowItem> paramFlowItemList = new ArrayList<ParamFlowItem>(); //Indicating whether the rule is for cluster mode. //是否集群 private boolean clusterMode = false; ... } //針對(duì)特定參數(shù)的流量控制規(guī)則 public class ParamFlowItem { private String object; private Integer count; private String classType; ... }
三.參數(shù)限流規(guī)則的配置Demo
//This demo demonstrates flow control by frequent ("hot spot") parameters. public class ParamFlowQpsDemo { private static final int PARAM_A = 1; private static final int PARAM_B = 2; private static final int PARAM_C = 3; private static final int PARAM_D = 4; //Here we prepare different parameters to validate flow control by parameters. private static final Integer[] PARAMS = new Integer[] {PARAM_A, PARAM_B, PARAM_C, PARAM_D}; private static final String RESOURCE_KEY = "resA"; public static void main(String[] args) throws Exception { initParamFlowRules(); final int threadCount = 20; ParamFlowQpsRunner<Integer> runner = new ParamFlowQpsRunner<>(PARAMS, RESOURCE_KEY, threadCount, 120); runner.tick(); Thread.sleep(1000); runner.simulateTraffic(); } private static void initParamFlowRules() { //QPS mode, threshold is 5 for every frequent "hot spot" parameter in index 0 (the first arg). ParamFlowRule rule = new ParamFlowRule(RESOURCE_KEY) .setParamIdx(0) .setGrade(RuleConstant.FLOW_GRADE_QPS) .setCount(5); //We can set threshold count for specific parameter value individually. //Here we add an exception item. That means: //QPS threshold of entries with parameter `PARAM_B` (type: int) in index 0 will be 10, rather than the global threshold (5). ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(PARAM_B)) .setClassType(int.class.getName()) .setCount(10); rule.setParamFlowItemList(Collections.singletonList(item)); //ParamFlowRuleManager類加載的一個(gè)時(shí)機(jī)是:它的靜態(tài)方法被調(diào)用了 //所以下面會(huì)先初始化ParamFlowRuleManager,再執(zhí)行l(wèi)oadRules()方法 ParamFlowRuleManager.loadRules(Collections.singletonList(rule)); } } public final class ParamFlowRuleManager { private static final Map<String, List<ParamFlowRule>> PARAM_FLOW_RULES = new ConcurrentHashMap<>(); private final static RulePropertyListener PROPERTY_LISTENER = new RulePropertyListener(); private static SentinelProperty<List<ParamFlowRule>> currentProperty = new DynamicSentinelProperty<>(); static { currentProperty.addListener(PROPERTY_LISTENER); } //Load parameter flow rules. Former rules will be replaced. public static void loadRules(List<ParamFlowRule> rules) { try { //設(shè)置規(guī)則的值為rules currentProperty.updateValue(rules); } catch (Throwable e) { RecordLog.info("[ParamFlowRuleManager] Failed to load rules", e); } } static class RulePropertyListener implements PropertyListener<List<ParamFlowRule>> { @Override public void configUpdate(List<ParamFlowRule> list) { Map<String, List<ParamFlowRule>> rules = aggregateAndPrepareParamRules(list); if (rules != null) { PARAM_FLOW_RULES.clear(); PARAM_FLOW_RULES.putAll(rules); } RecordLog.info("[ParamFlowRuleManager] Parameter flow rules received: {}", PARAM_FLOW_RULES); } @Override public void configLoad(List<ParamFlowRule> list) { Map<String, List<ParamFlowRule>> rules = aggregateAndPrepareParamRules(list); if (rules != null) { PARAM_FLOW_RULES.clear(); PARAM_FLOW_RULES.putAll(rules); } RecordLog.info("[ParamFlowRuleManager] Parameter flow rules received: {}", PARAM_FLOW_RULES); } ... } ... } public class DynamicSentinelProperty<T> implements SentinelProperty<T> { protected Set<PropertyListener<T>> listeners = new CopyOnWriteArraySet<>(); private T value = null; public DynamicSentinelProperty() { } //添加監(jiān)聽(tīng)器到集合 @Override public void addListener(PropertyListener<T> listener) { listeners.add(listener); //回調(diào)監(jiān)聽(tīng)器的configLoad()方法初始化規(guī)則配置 listener.configLoad(value); } //更新值 @Override public boolean updateValue(T newValue) { //如果值沒(méi)變化,直接返回 if (isEqual(value, newValue)) { return false; } RecordLog.info("[DynamicSentinelProperty] Config will be updated to: {}", newValue); //如果值發(fā)生了變化,則遍歷監(jiān)聽(tīng)器,回調(diào)監(jiān)聽(tīng)器的configUpdate()方法更新對(duì)應(yīng)的值 value = newValue; for (PropertyListener<T> listener : listeners) { listener.configUpdate(newValue); } return true; } ... } //A traffic runner to simulate flow for different parameters. class ParamFlowQpsRunner<T> { private final T[] params; private final String resourceName; private int seconds; private final int threadCount; private final Map<T, AtomicLong> passCountMap = new ConcurrentHashMap<>(); private final Map<T, AtomicLong> blockCountMap = new ConcurrentHashMap<>(); private volatile boolean stop = false; public ParamFlowQpsRunner(T[] params, String resourceName, int threadCount, int seconds) { this.params = params; this.resourceName = resourceName; this.seconds = seconds; this.threadCount = threadCount; for (T param : params) { passCountMap.putIfAbsent(param, new AtomicLong()); blockCountMap.putIfAbsent(param, new AtomicLong()); } } public void tick() { Thread timer = new Thread(new TimerTask()); timer.setName("sentinel-timer-task"); timer.start(); } public void simulateTraffic() { for (int i = 0; i < threadCount; i++) { Thread t = new Thread(new RunTask()); t.setName("sentinel-simulate-traffic-task-" + i); t.start(); } } final class TimerTask implements Runnable { @Override public void run() { long start = System.currentTimeMillis(); System.out.println("Begin to run! Go go go!"); System.out.println("See corresponding metrics.log for accurate statistic data"); Map<T, Long> map = new HashMap<>(params.length); for (T param : params) { map.putIfAbsent(param, 0L); } while (!stop) { sleep(1000); //There may be a mismatch for time window of internal sliding window. //See corresponding `metrics.log` for accurate statistic log. for (T param : params) { System.out.println(String.format( "[%d][%d] Parameter flow metrics for resource %s: pass count for param <%s> is %d, block count: %d", seconds, TimeUtil.currentTimeMillis(), resourceName, param, passCountMap.get(param).getAndSet(0), blockCountMap.get(param).getAndSet(0) )); } System.out.println("============================="); if (seconds-- <= 0) { stop = true; } } long cost = System.currentTimeMillis() - start; System.out.println("Time cost: " + cost + " ms"); System.exit(0); } } final class RunTask implements Runnable { @Override public void run() { while (!stop) { Entry entry = null; T param = generateParam(); try { entry = SphU.entry(resourceName, EntryType.IN, 1, param); //Add pass for parameter. passFor(param); } catch (BlockException e) { //block.incrementAndGet(); blockFor(param); } catch (Exception ex) { //biz exception ex.printStackTrace(); } finally { //total.incrementAndGet(); if (entry != null) { entry.exit(1, param); } } sleep(ThreadLocalRandom.current().nextInt(0, 10)); } } } //Pick one of provided parameters randomly. private T generateParam() { int i = ThreadLocalRandom.current().nextInt(0, params.length); return params[i]; } private void passFor(T param) { passCountMap.get(param).incrementAndGet(); } private void blockFor(T param) { blockCountMap.get(param).incrementAndGet(); } private void sleep(int timeMs) { try { TimeUnit.MILLISECONDS.sleep(timeMs); } catch (InterruptedException e) { } } }
(2)ParamFlowSlot根據(jù)參數(shù)限流規(guī)則驗(yàn)證請(qǐng)求
一.ParamFlowSlot的entry()方法的邏輯
二.不同限流類型 + 閾值類型 + 流控效果的處理
三.流控效果為排隊(duì)等待和直接拒絕的實(shí)現(xiàn)
四.參數(shù)限流是如何進(jìn)行數(shù)據(jù)統(tǒng)計(jì)
五.參數(shù)限流驗(yàn)證請(qǐng)求的流程圖總結(jié)
一.ParamFlowSlot的entry()方法的邏輯
ParamFlowSlot的entry()方法主要干了三件事:參數(shù)驗(yàn)證、獲取當(dāng)前資源的全部參數(shù)限流規(guī)則、循環(huán)每一個(gè)參數(shù)限流規(guī)則并判斷此次請(qǐng)求是否被允許通過(guò)(如果不允許則直接拋出異常)。其中對(duì)每一條獲取到的參數(shù)限流規(guī)則,都會(huì)通過(guò)ParamFlowChecker的passCheck()方法進(jìn)行判斷。
@Spi(order = -3000) public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { //1.如果沒(méi)配置參數(shù)限流規(guī)則,直接觸發(fā)下一個(gè)Slot if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) { fireEntry(context, resourceWrapper, node, count, prioritized, args); return; } //2.如果配置了參數(shù)限流規(guī)則,則調(diào)用ParamFlowSlot的checkFlow()方法,該方法執(zhí)行完成后再觸發(fā)下一個(gè)Slot checkFlow(resourceWrapper, count, args); fireEntry(context, resourceWrapper, node, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); } void applyRealParamIdx(/*@NonNull*/ ParamFlowRule rule, int length) { int paramIdx = rule.getParamIdx(); if (paramIdx < 0) { if (-paramIdx <= length) { rule.setParamIdx(length + paramIdx); } else { //Illegal index, give it a illegal positive value, latter rule checking will pass. rule.setParamIdx(-paramIdx); } } } void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { //1.如果沒(méi)傳遞參數(shù),則直接放行,代表不做參數(shù)限流邏輯 if (args == null) { return; } //2.如果沒(méi)給resourceWrapper這個(gè)資源配置參數(shù)限流規(guī)則,則直接放行 if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) { return; } //3.獲取此資源的全部參數(shù)限流規(guī)則,規(guī)則可能會(huì)有很多個(gè),所以是個(gè)List List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName()); //4.遍歷獲取到的參數(shù)限流規(guī)則 for (ParamFlowRule rule : rules) { //進(jìn)行參數(shù)驗(yàn)證 applyRealParamIdx(rule, args.length); //Initialize the parameter metrics. ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule); //進(jìn)行驗(yàn)證的核心方法:檢查當(dāng)前規(guī)則是否允許通過(guò)此請(qǐng)求,如果不允許,則拋出ParamFlowException異常 if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) { String triggeredParam = ""; if (args.length > rule.getParamIdx()) { Object value = args[rule.getParamIdx()]; //Assign actual value with the result of paramFlowKey method if (value instanceof ParamFlowArgument) { value = ((ParamFlowArgument) value).paramFlowKey(); } triggeredParam = String.valueOf(value); } throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule); } } } }
二.不同限流類型 + 閾值類型 + 流控效果的處理
在ParamFlowChecker的passCheck()方法中,參數(shù)值驗(yàn)證通過(guò)之后,會(huì)判斷限流類型。如果是集群限流,則執(zhí)行ParamFlowChecker的passClusterCheck()方法。如果是單機(jī)限流,則執(zhí)行ParamFlowChecker的passLocalCheck()方法。
在ParamFlowChecker的passLocalCheck()方法中,則會(huì)根據(jù)不同的參數(shù)類型調(diào)用ParamFlowChecker的passSingleValueCheck()方法。根據(jù)該方法可以知道,參數(shù)限流支持兩種閾值類型:一種是QPS,另一種是線程數(shù)。而QPS類型還支持兩種流控效果,分別是排隊(duì)等待和直接拒絕,但不支持Warm Up。
//Rule checker for parameter flow control. public final class ParamFlowChecker { public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count, Object... args) { if (args == null) { return true; } //1.判斷參數(shù)索引是否合法,這個(gè)就是配置參數(shù)限流時(shí)設(shè)置的下標(biāo),從0開(kāi)始,也就是對(duì)應(yīng)args里的下標(biāo) //比如0就代表args數(shù)組里的第一個(gè)參數(shù),如果參數(shù)不合法直接放行,相當(dāng)于參數(shù)限流沒(méi)生效 int paramIdx = rule.getParamIdx(); if (args.length <= paramIdx) { return true; } //2.判斷參數(shù)值是不是空,如果是空直接放行 //Get parameter value. Object value = args[paramIdx]; //Assign value with the result of paramFlowKey method if (value instanceof ParamFlowArgument) { value = ((ParamFlowArgument) value).paramFlowKey(); } //If value is null, then pass if (value == null) { return true; } //3.集群限流 if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) { return passClusterCheck(resourceWrapper, rule, count, value); } //4.單機(jī)限流 return passLocalCheck(resourceWrapper, rule, count, value); } private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) { try { if (Collection.class.isAssignableFrom(value.getClass())) {//基本類型 for (Object param : ((Collection)value)) { if (!passSingleValueCheck(resourceWrapper, rule, count, param)) { return false; } } } else if (value.getClass().isArray()) {//數(shù)組類型 int length = Array.getLength(value); for (int i = 0; i < length; i++) { Object param = Array.get(value, i); if (!passSingleValueCheck(resourceWrapper, rule, count, param)) { return false; } } } else {//其他類型,也就是引用類型 return passSingleValueCheck(resourceWrapper, rule, count, value); } } catch (Throwable e) { RecordLog.warn("[ParamFlowChecker] Unexpected error", e); } return true; } static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, Object value) { if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {//類型是QPS if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) { //流控效果為排隊(duì)等待 return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value); } else { //流控效果為直接拒絕 return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value); } } else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {//類型是Thread Set<Object> exclusionItems = rule.getParsedHotItems().keySet(); long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value); if (exclusionItems.contains(value)) { int itemThreshold = rule.getParsedHotItems().get(value); return ++threadCount <= itemThreshold; } long threshold = (long)rule.getCount(); return ++threadCount <= threshold; } return true; } ... }
三.流控效果為排隊(duì)等待和直接拒絕的實(shí)現(xiàn)
當(dāng)設(shè)置了QPS類型的流控效果為排隊(duì)等待時(shí),會(huì)調(diào)用ParamFlowChecker的passThrottleLocalCheck()方法。該方法實(shí)現(xiàn)排隊(duì)等待效果的原理和流控規(guī)則FlowSlot通過(guò)RateLimiterController實(shí)現(xiàn)排隊(duì)等待效果的原理是一樣的。
//Rule checker for parameter flow control. public final class ParamFlowChecker { ... static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, Object value) { ParameterMetric metric = getParameterMetric(resourceWrapper); CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule); if (timeRecorderMap == null) { return true; } //Calculate max token count (threshold) Set<Object> exclusionItems = rule.getParsedHotItems().keySet(); long tokenCount = (long)rule.getCount(); if (exclusionItems.contains(value)) { tokenCount = rule.getParsedHotItems().get(value); } if (tokenCount == 0) { return false; } long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount); while (true) { long currentTime = TimeUtil.currentTimeMillis(); AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime)); if (timeRecorder == null) { return true; } //AtomicLong timeRecorder = timeRecorderMap.get(value); long lastPassTime = timeRecorder.get(); long expectedTime = lastPassTime + costTime; if (expectedTime <= currentTime || expectedTime - currentTime < rule.getMaxQueueingTimeMs()) { AtomicLong lastPastTimeRef = timeRecorderMap.get(value); if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) { long waitTime = expectedTime - currentTime; if (waitTime > 0) { lastPastTimeRef.set(expectedTime); try { TimeUnit.MILLISECONDS.sleep(waitTime); } catch (InterruptedException e) { RecordLog.warn("passThrottleLocalCheck: wait interrupted", e); } } return true; } else { Thread.yield(); } } else { return false; } } } private static ParameterMetric getParameterMetric(ResourceWrapper resourceWrapper) { //Should not be null. return ParameterMetricStorage.getParamMetric(resourceWrapper); } }
當(dāng)設(shè)置了QPS類型的流控效果為直接拒絕時(shí),會(huì)調(diào)用ParamFlowChecker的passDefaultLocalCheck()方法。該方法采取令牌桶的方式來(lái)實(shí)現(xiàn):控制每個(gè)時(shí)間窗口只生產(chǎn)一次token令牌,且將令牌放入桶中,每個(gè)請(qǐng)求都從桶中取令牌,當(dāng)可以獲取到令牌時(shí),則正常放行,反之直接拒絕。
//Rule checker for parameter flow control. public final class ParamFlowChecker { ... static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, Object value) { ParameterMetric metric = getParameterMetric(resourceWrapper); CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule); CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule); if (tokenCounters == null || timeCounters == null) { return true; } //Calculate max token count (threshold) Set<Object> exclusionItems = rule.getParsedHotItems().keySet(); long tokenCount = (long)rule.getCount(); if (exclusionItems.contains(value)) { tokenCount = rule.getParsedHotItems().get(value); } if (tokenCount == 0) { return false; } long maxCount = tokenCount + rule.getBurstCount(); if (acquireCount > maxCount) { return false; } while (true) { long currentTime = TimeUtil.currentTimeMillis(); AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime)); if (lastAddTokenTime == null) { //Token never added, just replenish the tokens and consume {@code acquireCount} immediately. tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount)); return true; } //Calculate the time duration since last token was added. long passTime = currentTime - lastAddTokenTime.get(); //A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed. if (passTime > rule.getDurationInSec() * 1000) { AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount)); if (oldQps == null) { //Might not be accurate here. lastAddTokenTime.set(currentTime); return true; } else { long restQps = oldQps.get(); long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000); long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount) : (restQps + toAddCount - acquireCount); if (newQps < 0) { return false; } if (oldQps.compareAndSet(restQps, newQps)) { lastAddTokenTime.set(currentTime); return true; } Thread.yield(); } } else { AtomicLong oldQps = tokenCounters.get(value); if (oldQps != null) { long oldQpsValue = oldQps.get(); if (oldQpsValue - acquireCount >= 0) { if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) { return true; } } else { return false; } } Thread.yield(); } } } }
四.參數(shù)限流是如何進(jìn)行數(shù)據(jù)統(tǒng)計(jì)
由于參數(shù)限流的數(shù)據(jù)統(tǒng)計(jì)需要細(xì)化到參數(shù)值的維度,所以使用參數(shù)限流時(shí)需要注意OOM問(wèn)題。比如根據(jù)用戶ID進(jìn)行限流,且用戶數(shù)量有幾千萬(wàn),那么CacheMap將會(huì)包含幾千萬(wàn)個(gè)不會(huì)被移除的鍵值對(duì),而且會(huì)隨著進(jìn)程運(yùn)行時(shí)間的增長(zhǎng)而不斷增加,最后可能會(huì)導(dǎo)致OOM。
public final class ParameterMetricStorage { private static final Map<String, ParameterMetric> metricsMap = new ConcurrentHashMap<>(); //Lock for a specific resource. private static final Object LOCK = new Object(); //Init the parameter metric and index map for given resource. //該方法在ParamFlowSlot的checkFlow()方法中被調(diào)用 public static void initParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule) { if (resourceWrapper == null || resourceWrapper.getName() == null) { return; } String resourceName = resourceWrapper.getName(); ParameterMetric metric; //Assume that the resource is valid. if ((metric = metricsMap.get(resourceName)) == null) { synchronized (LOCK) { if ((metric = metricsMap.get(resourceName)) == null) { metric = new ParameterMetric(); metricsMap.put(resourceWrapper.getName(), metric); RecordLog.info("[ParameterMetricStorage] Creating parameter metric for: {}", resourceWrapper.getName()); } } } metric.initialize(rule); } //該方法在ParamFlowChecker的passThrottleLocalCheck()和passDefaultLocalCheck()方法執(zhí)行g(shù)etParameterMetric()方法時(shí)被調(diào)用 public static ParameterMetric getParamMetric(ResourceWrapper resourceWrapper) { if (resourceWrapper == null || resourceWrapper.getName() == null) { return null; } return metricsMap.get(resourceWrapper.getName()); } ... } //Metrics for frequent ("hot spot") parameters. public class ParameterMetric { private static final int THREAD_COUNT_MAX_CAPACITY = 4000; private static final int BASE_PARAM_MAX_CAPACITY = 4000; private static final int TOTAL_MAX_CAPACITY = 20_0000; private final Object lock = new Object(); //Format: (rule, (value, timeRecorder)) private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>(); //Format: (rule, (value, tokenCounter)) private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>(); private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>(); public void initialize(ParamFlowRule rule) { if (!ruleTimeCounters.containsKey(rule)) { synchronized (lock) { if (ruleTimeCounters.get(rule) == null) { long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY); ruleTimeCounters.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size)); } } } if (!ruleTokenCounter.containsKey(rule)) { synchronized (lock) { if (ruleTokenCounter.get(rule) == null) { long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY); ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size)); } } } if (!threadCountMap.containsKey(rule.getParamIdx())) { synchronized (lock) { if (threadCountMap.get(rule.getParamIdx()) == null) { threadCountMap.put(rule.getParamIdx(), new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(THREAD_COUNT_MAX_CAPACITY)); } } } } //Get the token counter for given parameter rule. //@param rule valid parameter rule //@return the associated token counter public CacheMap<Object, AtomicLong> getRuleTokenCounter(ParamFlowRule rule) { return ruleTokenCounter.get(rule); } //Get the time record counter for given parameter rule. //@param rule valid parameter rule //@return the associated time counter public CacheMap<Object, AtomicLong> getRuleTimeCounter(ParamFlowRule rule) { return ruleTimeCounters.get(rule); } public long getThreadCount(int index, Object value) { CacheMap<Object, AtomicInteger> cacheMap = threadCountMap.get(index); if (cacheMap == null) { return 0; } AtomicInteger count = cacheMap.get(value); return count == null ? 0L : count.get(); } ... }
五.參數(shù)限流驗(yàn)證請(qǐng)求的流程圖總結(jié)
2.@SentinelResource注解的使用和實(shí)現(xiàn)
@SentinelResource注解和實(shí)現(xiàn)
@SentinelResource注解的使用
一.引入Sentinel Spring Boot Starter依賴
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-sentinel</artifactId> <version>2.2.1.RELEASE</version> </dependency>
二.為方法添加@SentinelResource注解
下面的代碼為sayHello()方法添加了@SentinelResource注解,并指定了資源名稱為sayHello以及熔斷降級(jí)時(shí)的回調(diào)方法fallback()。這樣在請(qǐng)求sayHello()方法后,就可以在Sentinel Dashboard上看到此資源,然后就可以針對(duì)此資源進(jìn)行一系列的規(guī)則配置了。
@Service public class MyService { @SentinelResource(value = "sayHello", fallback = "fallback") public String sayHello(String name) { return "Hello, " + name; } public String fallback(String name, Throwable throwable) { return "Fallback: " + name + ", reason: " + throwable.getMessage(); } }
(2)@SentinelResource注解和實(shí)現(xiàn)
利用Spring AOP攔截@SentinelResource注解,最后調(diào)用SphU.entry()方法來(lái)進(jìn)行處理。
//Aspect for methods with {@link SentinelResource} annotation. @Aspect public class SentinelResourceAspect extends AbstractSentinelAspectSupport { //SentinelResource注解 @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)") public void sentinelResourceAnnotationPointcut() { } @Around("sentinelResourceAnnotationPointcut()") public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable { //獲取方法 Method originMethod = resolveMethod(pjp); //獲取方法上的SentinelResource注解,有了這個(gè)注解,就可以獲取到注解的各種屬性值了 SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); if (annotation == null) { //Should not go through here. throw new IllegalStateException("Wrong state for SentinelResource annotation"); } //獲取資源名稱 String resourceName = getResourceName(annotation.value(), originMethod); //獲取資源類型 EntryType entryType = annotation.entryType(); int resourceType = annotation.resourceType(); //創(chuàng)建一個(gè)Entry對(duì)象,通過(guò)SphU.entry(resourceName)將當(dāng)前方法納入Sentinel的保護(hù)體系 //如果當(dāng)前資源的調(diào)用未觸發(fā)任何Sentinel規(guī)則,則正常執(zhí)行被攔截的方法,否則將執(zhí)行對(duì)應(yīng)的限流、熔斷降級(jí)等處理邏輯 Entry entry = null; try { entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()); return pjp.proceed(); } catch (BlockException ex) { //發(fā)生異常時(shí),通過(guò)反射執(zhí)行在注解中設(shè)置的降級(jí)方法 return handleBlockException(pjp, annotation, ex); } catch (Throwable ex) { Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore(); //The ignore list will be checked first. if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { throw ex; } if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { traceException(ex); return handleFallback(pjp, annotation, ex); } //No fallback function can handle the exception, so throw it out. throw ex; } finally { if (entry != null) { entry.exit(1, pjp.getArgs()); } } } } //Some common functions for Sentinel annotation aspect. public abstract class AbstractSentinelAspectSupport { ... protected Object handleBlockException(ProceedingJoinPoint pjp, SentinelResource annotation, BlockException ex) throws Throwable { //Execute block handler if configured. Method blockHandlerMethod = extractBlockHandlerMethod(pjp, annotation.blockHandler(), annotation.blockHandlerClass()); if (blockHandlerMethod != null) { Object[] originArgs = pjp.getArgs(); //Construct args. Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1); args[args.length - 1] = ex; return invoke(pjp, blockHandlerMethod, args); } //If no block handler is present, then go to fallback. return handleFallback(pjp, annotation, ex); } private Object invoke(ProceedingJoinPoint pjp, Method method, Object[] args) throws Throwable { try { if (!method.isAccessible()) { makeAccessible(method); } if (isStatic(method)) { return method.invoke(null, args); } return method.invoke(pjp.getTarget(), args); } catch (InvocationTargetException e) { //throw the actual exception throw e.getTargetException(); } } ... }
詳細(xì)介紹后端技術(shù)棧的基礎(chǔ)內(nèi)容,包括但不限于:MySQL原理和優(yōu)化、Redis原理和應(yīng)用、JVM和G1原理和優(yōu)化、RocketMQ原理應(yīng)用及源碼、Kafka原理應(yīng)用及源碼、ElasticSearch原理應(yīng)用及源碼、JUC源碼、Netty源碼、zk源碼、Dubbo源碼、Spring源碼、Spring Boot源碼、SCA源碼、分布式鎖源碼、分布式事務(wù)、分庫(kù)分表和TiDB、大型商品系統(tǒng)、大型訂單系統(tǒng)等