欧美1区2区3区激情无套,两个女人互添下身视频在线观看,久久av无码精品人妻系列,久久精品噜噜噜成人,末发育娇小性色xxxx

Sentinel源碼—7.參數(shù)限流和注解的實(shí)現(xiàn)

大綱

1.參數(shù)限流的原理和源碼

2.@SentinelResource注解的使用和實(shí)現(xiàn)

1.參數(shù)限流的原理和源碼

參數(shù)限流規(guī)則ParamFlowRule的配置Demo

(2)ParamFlowSlot根據(jù)參數(shù)限流規(guī)則驗(yàn)證請(qǐng)求

(1)參數(shù)限流規(guī)則ParamFlowRule的配置Demo

一.參數(shù)限流的應(yīng)用場(chǎng)景

二.參數(shù)限流規(guī)則的屬性

三.參數(shù)限流規(guī)則的配置Demo

一.參數(shù)限流的應(yīng)用場(chǎng)景

傳統(tǒng)的流量控制,一般是通過(guò)資源維度來(lái)限制某接口或方法的調(diào)用頻率。但有時(shí)需要更細(xì)粒度地控制不同參數(shù)條件下的訪問(wèn)速率,即參數(shù)限流。參數(shù)限流允許根據(jù)不同的參數(shù)條件設(shè)置不同的流量控制規(guī)則,這種方式非常適合處理特定條件下的請(qǐng)求,因?yàn)槟芨泳?xì)地管理流量。

假設(shè)有一個(gè)在線電影訂票系統(tǒng),某個(gè)接口允許用戶查詢電影的放映時(shí)間。但只希望每個(gè)用戶每10秒只能查詢接口1次,以避免過(guò)多的查詢請(qǐng)求。這時(shí)如果直接將接口的QPS限制為5是不能滿足要求的,因?yàn)樾枨笫敲總€(gè)用戶每5分鐘只能查詢1次,而不是每秒一共只能查詢5次,因此參數(shù)限流就能派上用場(chǎng)了。

可以設(shè)置一個(gè)規(guī)則,根據(jù)用戶ID來(lái)限制每個(gè)用戶的查詢頻率,將限流的維度從資源維度細(xì)化到參數(shù)維度,從而實(shí)現(xiàn)每個(gè)用戶每10秒只能查詢接口1次。比如希望影院工作人員可以每秒查詢10次,老板可以每秒查詢100次,而購(gòu)票者則只能每10秒查詢一次,其中工作人員的userId值為100和200,老板的userId值為9999,那么可以如下配置:需要注意限流閾值是以秒為單位的,所以需要乘以統(tǒng)計(jì)窗口時(shí)長(zhǎng)10。

二.參數(shù)限流規(guī)則的屬性

public class ParamFlowRule extends AbstractRule {
    ...
    //The threshold type of flow control (0: thread count, 1: QPS).
    //流量控制的閾值類型(0表示線程數(shù),1表示QPS)
    private int grade = RuleConstant.FLOW_GRADE_QPS;

    //Parameter index.
    //參數(shù)下標(biāo)
    private Integer paramIdx;

    //The threshold count.
    //閾值
    private double count;

    //Original exclusion items of parameters.
    //針對(duì)特定參數(shù)的流量控制規(guī)則列表
    private List<ParamFlowItem> paramFlowItemList = new ArrayList<ParamFlowItem>();
    
    //Indicating whether the rule is for cluster mode.
    //是否集群
    private boolean clusterMode = false;
    ...
}

//針對(duì)特定參數(shù)的流量控制規(guī)則
public class ParamFlowItem {
    private String object;
    private Integer count;
    private String classType;
    ...
}

三.參數(shù)限流規(guī)則的配置Demo

//This demo demonstrates flow control by frequent ("hot spot") parameters.
public class ParamFlowQpsDemo {
    private static final int PARAM_A = 1;
    private static final int PARAM_B = 2;
    private static final int PARAM_C = 3;
    private static final int PARAM_D = 4;

    //Here we prepare different parameters to validate flow control by parameters.
    private static final Integer[] PARAMS = new Integer[] {PARAM_A, PARAM_B, PARAM_C, PARAM_D};
    private static final String RESOURCE_KEY = "resA";
    
    public static void main(String[] args) throws Exception {
        initParamFlowRules();
        final int threadCount = 20;
        ParamFlowQpsRunner<Integer> runner = new ParamFlowQpsRunner<>(PARAMS, RESOURCE_KEY, threadCount, 120);
        runner.tick();
        Thread.sleep(1000);
        runner.simulateTraffic();
    }

    private static void initParamFlowRules() {
        //QPS mode, threshold is 5 for every frequent "hot spot" parameter in index 0 (the first arg).
        ParamFlowRule rule = new ParamFlowRule(RESOURCE_KEY)
            .setParamIdx(0)
            .setGrade(RuleConstant.FLOW_GRADE_QPS)
            .setCount(5);
        //We can set threshold count for specific parameter value individually.
        //Here we add an exception item. That means: 
        //QPS threshold of entries with parameter `PARAM_B` (type: int) in index 0 will be 10, rather than the global threshold (5).
        ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(PARAM_B))
            .setClassType(int.class.getName())
            .setCount(10);
        rule.setParamFlowItemList(Collections.singletonList(item));

        //ParamFlowRuleManager類加載的一個(gè)時(shí)機(jī)是:它的靜態(tài)方法被調(diào)用了
        //所以下面會(huì)先初始化ParamFlowRuleManager,再執(zhí)行l(wèi)oadRules()方法
        ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
    }
}

public final class ParamFlowRuleManager {
    private static final Map<String, List<ParamFlowRule>> PARAM_FLOW_RULES = new ConcurrentHashMap<>();
    private final static RulePropertyListener PROPERTY_LISTENER = new RulePropertyListener();
    private static SentinelProperty<List<ParamFlowRule>> currentProperty = new DynamicSentinelProperty<>();
    static {
        currentProperty.addListener(PROPERTY_LISTENER);
    }

    //Load parameter flow rules. Former rules will be replaced.
    public static void loadRules(List<ParamFlowRule> rules) {
        try {
            //設(shè)置規(guī)則的值為rules
            currentProperty.updateValue(rules);
        } catch (Throwable e) {
            RecordLog.info("[ParamFlowRuleManager] Failed to load rules", e);
        }
    }

    static class RulePropertyListener implements PropertyListener<List<ParamFlowRule>> {
        @Override
        public void configUpdate(List<ParamFlowRule> list) {
            Map<String, List<ParamFlowRule>> rules = aggregateAndPrepareParamRules(list);
            if (rules != null) {
                PARAM_FLOW_RULES.clear();
                PARAM_FLOW_RULES.putAll(rules);
            }
            RecordLog.info("[ParamFlowRuleManager] Parameter flow rules received: {}", PARAM_FLOW_RULES);
        }

        @Override
        public void configLoad(List<ParamFlowRule> list) {
            Map<String, List<ParamFlowRule>> rules = aggregateAndPrepareParamRules(list);
            if (rules != null) {
                PARAM_FLOW_RULES.clear();
                PARAM_FLOW_RULES.putAll(rules);
            }
            RecordLog.info("[ParamFlowRuleManager] Parameter flow rules received: {}", PARAM_FLOW_RULES);
        }
        ...
    }
    ...
}

public class DynamicSentinelProperty<T> implements SentinelProperty<T> {
    protected Set<PropertyListener<T>> listeners = new CopyOnWriteArraySet<>();
    private T value = null;
    public DynamicSentinelProperty() {

    }

    //添加監(jiān)聽(tīng)器到集合
    @Override
    public void addListener(PropertyListener<T> listener) {
        listeners.add(listener);
        //回調(diào)監(jiān)聽(tīng)器的configLoad()方法初始化規(guī)則配置
        listener.configLoad(value);
    }

    //更新值
    @Override
    public boolean updateValue(T newValue) {
        //如果值沒(méi)變化,直接返回
        if (isEqual(value, newValue)) {
            return false;
        }
        RecordLog.info("[DynamicSentinelProperty] Config will be updated to: {}", newValue);
        //如果值發(fā)生了變化,則遍歷監(jiān)聽(tīng)器,回調(diào)監(jiān)聽(tīng)器的configUpdate()方法更新對(duì)應(yīng)的值
        value = newValue;
        for (PropertyListener<T> listener : listeners) {
            listener.configUpdate(newValue);
        }
        return true;
    }
    ...
}

//A traffic runner to simulate flow for different parameters.
class ParamFlowQpsRunner<T> {
    private final T[] params;
    private final String resourceName;
    private int seconds;
    private final int threadCount;
    private final Map<T, AtomicLong> passCountMap = new ConcurrentHashMap<>();
    private final Map<T, AtomicLong> blockCountMap = new ConcurrentHashMap<>();
    private volatile boolean stop = false;

    public ParamFlowQpsRunner(T[] params, String resourceName, int threadCount, int seconds) {
        this.params = params;
        this.resourceName = resourceName;
        this.seconds = seconds;
        this.threadCount = threadCount;
        for (T param : params) {
            passCountMap.putIfAbsent(param, new AtomicLong());
            blockCountMap.putIfAbsent(param, new AtomicLong());
        }
    }

    public void tick() {
        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-task");
        timer.start();
    }

    public void simulateTraffic() {
        for (int i = 0; i < threadCount; i++) {
            Thread t = new Thread(new RunTask());
            t.setName("sentinel-simulate-traffic-task-" + i);
            t.start();
        }
    }

    final class TimerTask implements Runnable {
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            System.out.println("Begin to run! Go go go!");
            System.out.println("See corresponding metrics.log for accurate statistic data");
            Map<T, Long> map = new HashMap<>(params.length);
            for (T param : params) {
                map.putIfAbsent(param, 0L);
            }
            while (!stop) {
                sleep(1000);
                //There may be a mismatch for time window of internal sliding window.
                //See corresponding `metrics.log` for accurate statistic log.
                for (T param : params) {
                    System.out.println(String.format(
                        "[%d][%d] Parameter flow metrics for resource %s: pass count for param <%s> is %d, block count: %d",
                        seconds, TimeUtil.currentTimeMillis(), resourceName, param,
                        passCountMap.get(param).getAndSet(0), blockCountMap.get(param).getAndSet(0)
                    ));
                }
                System.out.println("=============================");
                if (seconds-- <= 0) {
                    stop = true;
                }
            }
            long cost = System.currentTimeMillis() - start;
            System.out.println("Time cost: " + cost + " ms");
            System.exit(0);
        }
    }

    final class RunTask implements Runnable {
        @Override
        public void run() {
            while (!stop) {
                Entry entry = null;
                T param = generateParam();
                try {
                    entry = SphU.entry(resourceName, EntryType.IN, 1, param);
                    //Add pass for parameter.
                    passFor(param);
                } catch (BlockException e) {
                    //block.incrementAndGet();
                    blockFor(param);
                } catch (Exception ex) {
                    //biz exception
                    ex.printStackTrace();
                } finally {
                    //total.incrementAndGet();
                    if (entry != null) {
                        entry.exit(1, param);
                    }
                }
                sleep(ThreadLocalRandom.current().nextInt(0, 10));
            }
        }
    }

    //Pick one of provided parameters randomly.
    private T generateParam() {
        int i = ThreadLocalRandom.current().nextInt(0, params.length);
        return params[i];
    }

    private void passFor(T param) {
        passCountMap.get(param).incrementAndGet();
    }

    private void blockFor(T param) {
        blockCountMap.get(param).incrementAndGet();
    }

    private void sleep(int timeMs) {
        try {
            TimeUnit.MILLISECONDS.sleep(timeMs);
        } catch (InterruptedException e) {
        }
    }
}

(2)ParamFlowSlot根據(jù)參數(shù)限流規(guī)則驗(yàn)證請(qǐng)求

一.ParamFlowSlot的entry()方法的邏輯

二.不同限流類型 + 閾值類型 + 流控效果的處理

三.流控效果為排隊(duì)等待和直接拒絕的實(shí)現(xiàn)

四.參數(shù)限流是如何進(jìn)行數(shù)據(jù)統(tǒng)計(jì)

五.參數(shù)限流驗(yàn)證請(qǐng)求的流程圖總結(jié)

一.ParamFlowSlot的entry()方法的邏輯

ParamFlowSlot的entry()方法主要干了三件事:參數(shù)驗(yàn)證、獲取當(dāng)前資源的全部參數(shù)限流規(guī)則、循環(huán)每一個(gè)參數(shù)限流規(guī)則并判斷此次請(qǐng)求是否被允許通過(guò)(如果不允許則直接拋出異常)。其中對(duì)每一條獲取到的參數(shù)限流規(guī)則,都會(huì)通過(guò)ParamFlowChecker的passCheck()方法進(jìn)行判斷。

@Spi(order = -3000)
public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
        boolean prioritized, Object... args) throws Throwable {
        //1.如果沒(méi)配置參數(shù)限流規(guī)則,直接觸發(fā)下一個(gè)Slot
        if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
            return;
        }
        //2.如果配置了參數(shù)限流規(guī)則,則調(diào)用ParamFlowSlot的checkFlow()方法,該方法執(zhí)行完成后再觸發(fā)下一個(gè)Slot
        checkFlow(resourceWrapper, count, args);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }

    void applyRealParamIdx(/*@NonNull*/ ParamFlowRule rule, int length) {
        int paramIdx = rule.getParamIdx();
        if (paramIdx < 0) {
            if (-paramIdx <= length) {
                rule.setParamIdx(length + paramIdx);
            } else {
                //Illegal index, give it a illegal positive value, latter rule checking will pass.
                rule.setParamIdx(-paramIdx);
            }
        }
    }

    void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        //1.如果沒(méi)傳遞參數(shù),則直接放行,代表不做參數(shù)限流邏輯
        if (args == null) {
            return;
        }
        //2.如果沒(méi)給resourceWrapper這個(gè)資源配置參數(shù)限流規(guī)則,則直接放行
        if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
            return;
        }
        //3.獲取此資源的全部參數(shù)限流規(guī)則,規(guī)則可能會(huì)有很多個(gè),所以是個(gè)List
        List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
        //4.遍歷獲取到的參數(shù)限流規(guī)則
        for (ParamFlowRule rule : rules) {
            //進(jìn)行參數(shù)驗(yàn)證
            applyRealParamIdx(rule, args.length);
            //Initialize the parameter metrics.
            ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
            //進(jìn)行驗(yàn)證的核心方法:檢查當(dāng)前規(guī)則是否允許通過(guò)此請(qǐng)求,如果不允許,則拋出ParamFlowException異常
            if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
                String triggeredParam = "";
                if (args.length > rule.getParamIdx()) {
                    Object value = args[rule.getParamIdx()];
                    //Assign actual value with the result of paramFlowKey method
                    if (value instanceof ParamFlowArgument) {
                        value = ((ParamFlowArgument) value).paramFlowKey();
                    }
                    triggeredParam = String.valueOf(value);
                }
                throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
            }
        }
    }
}

二.不同限流類型 + 閾值類型 + 流控效果的處理

在ParamFlowChecker的passCheck()方法中,參數(shù)值驗(yàn)證通過(guò)之后,會(huì)判斷限流類型。如果是集群限流,則執(zhí)行ParamFlowChecker的passClusterCheck()方法。如果是單機(jī)限流,則執(zhí)行ParamFlowChecker的passLocalCheck()方法。

在ParamFlowChecker的passLocalCheck()方法中,則會(huì)根據(jù)不同的參數(shù)類型調(diào)用ParamFlowChecker的passSingleValueCheck()方法。根據(jù)該方法可以知道,參數(shù)限流支持兩種閾值類型:一種是QPS,另一種是線程數(shù)。而QPS類型還支持兩種流控效果,分別是排隊(duì)等待和直接拒絕,但不支持Warm Up。

//Rule checker for parameter flow control.
public final class ParamFlowChecker {
    public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count, Object... args) {
        if (args == null) {
            return true;
        }
        //1.判斷參數(shù)索引是否合法,這個(gè)就是配置參數(shù)限流時(shí)設(shè)置的下標(biāo),從0開(kāi)始,也就是對(duì)應(yīng)args里的下標(biāo)
        //比如0就代表args數(shù)組里的第一個(gè)參數(shù),如果參數(shù)不合法直接放行,相當(dāng)于參數(shù)限流沒(méi)生效  
        int paramIdx = rule.getParamIdx();
        if (args.length <= paramIdx) {
            return true;
        }
        //2.判斷參數(shù)值是不是空,如果是空直接放行
        //Get parameter value.
        Object value = args[paramIdx];
        //Assign value with the result of paramFlowKey method
        if (value instanceof ParamFlowArgument) {
            value = ((ParamFlowArgument) value).paramFlowKey();
        }
        //If value is null, then pass
        if (value == null) {
            return true;
        }
        //3.集群限流
        if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            return passClusterCheck(resourceWrapper, rule, count, value);
        }
        //4.單機(jī)限流
        return passLocalCheck(resourceWrapper, rule, count, value);
    }

    private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
        try {
            if (Collection.class.isAssignableFrom(value.getClass())) {//基本類型
                for (Object param : ((Collection)value)) {
                    if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
                        return false;
                    }
                }
            } else if (value.getClass().isArray()) {//數(shù)組類型
                int length = Array.getLength(value);
                for (int i = 0; i < length; i++) {
                    Object param = Array.get(value, i);
                    if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
                        return false;
                    }
                }
            } else {//其他類型,也就是引用類型
                return passSingleValueCheck(resourceWrapper, rule, count, value);
            }
        } catch (Throwable e) {
            RecordLog.warn("[ParamFlowChecker] Unexpected error", e);
        }
        return true;
    }

    static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, Object value) {
        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {//類型是QPS
            if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
                //流控效果為排隊(duì)等待
                return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
            } else {
                //流控效果為直接拒絕
                return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
            }
        } else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {//類型是Thread
            Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
            long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
            if (exclusionItems.contains(value)) {
                int itemThreshold = rule.getParsedHotItems().get(value);
                return ++threadCount <= itemThreshold;
            }
            long threshold = (long)rule.getCount();
            return ++threadCount <= threshold;
        }
        return true;
    }
    ...
}

三.流控效果為排隊(duì)等待和直接拒絕的實(shí)現(xiàn)

當(dāng)設(shè)置了QPS類型的流控效果為排隊(duì)等待時(shí),會(huì)調(diào)用ParamFlowChecker的passThrottleLocalCheck()方法。該方法實(shí)現(xiàn)排隊(duì)等待效果的原理和流控規(guī)則FlowSlot通過(guò)RateLimiterController實(shí)現(xiàn)排隊(duì)等待效果的原理是一樣的。

//Rule checker for parameter flow control.
public final class ParamFlowChecker {
    ...
    static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, Object value) {
        ParameterMetric metric = getParameterMetric(resourceWrapper);
        CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule);
        if (timeRecorderMap == null) {
            return true;
        }
        //Calculate max token count (threshold)
        Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
        long tokenCount = (long)rule.getCount();
        if (exclusionItems.contains(value)) {
            tokenCount = rule.getParsedHotItems().get(value);
        }
        if (tokenCount == 0) {
            return false;
        }
        long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
        while (true) {
            long currentTime = TimeUtil.currentTimeMillis();
            AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime));
            if (timeRecorder == null) {
                return true;
            }
            //AtomicLong timeRecorder = timeRecorderMap.get(value);
            long lastPassTime = timeRecorder.get();
            long expectedTime = lastPassTime + costTime;
            if (expectedTime <= currentTime || expectedTime - currentTime < rule.getMaxQueueingTimeMs()) {
                AtomicLong lastPastTimeRef = timeRecorderMap.get(value);
                if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) {
                    long waitTime = expectedTime - currentTime;
                    if (waitTime > 0) {
                        lastPastTimeRef.set(expectedTime);
                        try {
                            TimeUnit.MILLISECONDS.sleep(waitTime);
                        } catch (InterruptedException e) {
                            RecordLog.warn("passThrottleLocalCheck: wait interrupted", e);
                        }
                    }
                    return true;
                } else {
                    Thread.yield();
                }
            } else {
                return false;
            }
        }
    }

    private static ParameterMetric getParameterMetric(ResourceWrapper resourceWrapper) {
        //Should not be null.
        return ParameterMetricStorage.getParamMetric(resourceWrapper);
    }
}

當(dāng)設(shè)置了QPS類型的流控效果為直接拒絕時(shí),會(huì)調(diào)用ParamFlowChecker的passDefaultLocalCheck()方法。該方法采取令牌桶的方式來(lái)實(shí)現(xiàn):控制每個(gè)時(shí)間窗口只生產(chǎn)一次token令牌,且將令牌放入桶中,每個(gè)請(qǐng)求都從桶中取令牌,當(dāng)可以獲取到令牌時(shí),則正常放行,反之直接拒絕。

//Rule checker for parameter flow control.
public final class ParamFlowChecker {
    ...
    static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount, Object value) {
        ParameterMetric metric = getParameterMetric(resourceWrapper);
        CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
        CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
        if (tokenCounters == null || timeCounters == null) {
            return true;
        }
        //Calculate max token count (threshold)
        Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
        long tokenCount = (long)rule.getCount();
        if (exclusionItems.contains(value)) {
            tokenCount = rule.getParsedHotItems().get(value);
        }
        if (tokenCount == 0) {
            return false;
        }
        long maxCount = tokenCount + rule.getBurstCount();
        if (acquireCount > maxCount) {
            return false;
        }
        while (true) {
            long currentTime = TimeUtil.currentTimeMillis();
            AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
            if (lastAddTokenTime == null) {
                //Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
                tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
                return true;
            }
            //Calculate the time duration since last token was added.
            long passTime = currentTime - lastAddTokenTime.get();
            //A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
            if (passTime > rule.getDurationInSec() * 1000) {
                AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
                if (oldQps == null) {
                    //Might not be accurate here.
                    lastAddTokenTime.set(currentTime);
                    return true;
                } else {
                    long restQps = oldQps.get();
                    long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
                    long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
                        : (restQps + toAddCount - acquireCount);
                    if (newQps < 0) {
                        return false;
                    }
                    if (oldQps.compareAndSet(restQps, newQps)) {
                        lastAddTokenTime.set(currentTime);
                        return true;
                    }
                    Thread.yield();
                }
            } else {
                AtomicLong oldQps = tokenCounters.get(value);
                if (oldQps != null) {
                    long oldQpsValue = oldQps.get();
                    if (oldQpsValue - acquireCount >= 0) {
                        if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
                            return true;
                        }
                    } else {
                        return false;
                    }
                }
                Thread.yield();
            }
        }
    }
}

四.參數(shù)限流是如何進(jìn)行數(shù)據(jù)統(tǒng)計(jì)

由于參數(shù)限流的數(shù)據(jù)統(tǒng)計(jì)需要細(xì)化到參數(shù)值的維度,所以使用參數(shù)限流時(shí)需要注意OOM問(wèn)題。比如根據(jù)用戶ID進(jìn)行限流,且用戶數(shù)量有幾千萬(wàn),那么CacheMap將會(huì)包含幾千萬(wàn)個(gè)不會(huì)被移除的鍵值對(duì),而且會(huì)隨著進(jìn)程運(yùn)行時(shí)間的增長(zhǎng)而不斷增加,最后可能會(huì)導(dǎo)致OOM。

public final class ParameterMetricStorage {
    private static final Map<String, ParameterMetric> metricsMap = new ConcurrentHashMap<>();
    //Lock for a specific resource.
    private static final Object LOCK = new Object();

    //Init the parameter metric and index map for given resource.
    //該方法在ParamFlowSlot的checkFlow()方法中被調(diào)用
    public static void initParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule) {
        if (resourceWrapper == null || resourceWrapper.getName() == null) {
            return;
        }
        String resourceName = resourceWrapper.getName();
        ParameterMetric metric;
        //Assume that the resource is valid.
        if ((metric = metricsMap.get(resourceName)) == null) {
            synchronized (LOCK) {
                if ((metric = metricsMap.get(resourceName)) == null) {
                    metric = new ParameterMetric();
                    metricsMap.put(resourceWrapper.getName(), metric);
                    RecordLog.info("[ParameterMetricStorage] Creating parameter metric for: {}", resourceWrapper.getName());
                }
            }
        }
        metric.initialize(rule);
    }

    //該方法在ParamFlowChecker的passThrottleLocalCheck()和passDefaultLocalCheck()方法執(zhí)行g(shù)etParameterMetric()方法時(shí)被調(diào)用
    public static ParameterMetric getParamMetric(ResourceWrapper resourceWrapper) {
        if (resourceWrapper == null || resourceWrapper.getName() == null) {
            return null;
        }
        return metricsMap.get(resourceWrapper.getName());
    }
    ...
}

//Metrics for frequent ("hot spot") parameters.
public class ParameterMetric {
    private static final int THREAD_COUNT_MAX_CAPACITY = 4000;
    private static final int BASE_PARAM_MAX_CAPACITY = 4000;
    private static final int TOTAL_MAX_CAPACITY = 20_0000;
    private final Object lock = new Object();
    //Format: (rule, (value, timeRecorder))
    private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();
    //Format: (rule, (value, tokenCounter))
    private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();
    private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();
    
    public void initialize(ParamFlowRule rule) {
        if (!ruleTimeCounters.containsKey(rule)) {
            synchronized (lock) {
                if (ruleTimeCounters.get(rule) == null) {
                    long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
                    ruleTimeCounters.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
                }
            }
        }
        if (!ruleTokenCounter.containsKey(rule)) {
            synchronized (lock) {
                if (ruleTokenCounter.get(rule) == null) {
                    long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
                    ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
                }
            }
        }
        if (!threadCountMap.containsKey(rule.getParamIdx())) {
            synchronized (lock) {
                if (threadCountMap.get(rule.getParamIdx()) == null) {
                    threadCountMap.put(rule.getParamIdx(), new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(THREAD_COUNT_MAX_CAPACITY));
                }
            }
        }
    }

    //Get the token counter for given parameter rule.
    //@param rule valid parameter rule
    //@return the associated token counter
    public CacheMap<Object, AtomicLong> getRuleTokenCounter(ParamFlowRule rule) {
        return ruleTokenCounter.get(rule);
    }

    //Get the time record counter for given parameter rule.
    //@param rule valid parameter rule
    //@return the associated time counter
    public CacheMap<Object, AtomicLong> getRuleTimeCounter(ParamFlowRule rule) {
        return ruleTimeCounters.get(rule);
    }

    public long getThreadCount(int index, Object value) {
        CacheMap<Object, AtomicInteger> cacheMap = threadCountMap.get(index);
        if (cacheMap == null) {
            return 0;
        }
        AtomicInteger count = cacheMap.get(value);
        return count == null ? 0L : count.get();
    }
    ...
}

五.參數(shù)限流驗(yàn)證請(qǐng)求的流程圖總結(jié)

2.@SentinelResource注解的使用和實(shí)現(xiàn)

@SentinelResource注解的使用

@SentinelResource注解和實(shí)現(xiàn)

@SentinelResource注解的使用

一.引入Sentinel Spring Boot Starter依賴

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-sentinel</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>

二.為方法添加@SentinelResource注解

下面的代碼為sayHello()方法添加了@SentinelResource注解,并指定了資源名稱為sayHello以及熔斷降級(jí)時(shí)的回調(diào)方法fallback()。這樣在請(qǐng)求sayHello()方法后,就可以在Sentinel Dashboard上看到此資源,然后就可以針對(duì)此資源進(jìn)行一系列的規(guī)則配置了。

@Service
public class MyService {
    @SentinelResource(value = "sayHello", fallback = "fallback")
    public String sayHello(String name) {
        return "Hello, " + name;
    }

    public String fallback(String name, Throwable throwable) {
        return "Fallback: " + name + ", reason: " + throwable.getMessage();
    }
}

(2)@SentinelResource注解和實(shí)現(xiàn)

利用Spring AOP攔截@SentinelResource注解,最后調(diào)用SphU.entry()方法來(lái)進(jìn)行處理。

//Aspect for methods with {@link SentinelResource} annotation.
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
    //SentinelResource注解
    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        //獲取方法
        Method originMethod = resolveMethod(pjp);
        //獲取方法上的SentinelResource注解,有了這個(gè)注解,就可以獲取到注解的各種屬性值了
        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        if (annotation == null) {
            //Should not go through here.
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }
        //獲取資源名稱
        String resourceName = getResourceName(annotation.value(), originMethod);
        //獲取資源類型
        EntryType entryType = annotation.entryType();
        int resourceType = annotation.resourceType();
        //創(chuàng)建一個(gè)Entry對(duì)象,通過(guò)SphU.entry(resourceName)將當(dāng)前方法納入Sentinel的保護(hù)體系
        //如果當(dāng)前資源的調(diào)用未觸發(fā)任何Sentinel規(guī)則,則正常執(zhí)行被攔截的方法,否則將執(zhí)行對(duì)應(yīng)的限流、熔斷降級(jí)等處理邏輯
        Entry entry = null;
        try {
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            return pjp.proceed();
        } catch (BlockException ex) {
            //發(fā)生異常時(shí),通過(guò)反射執(zhí)行在注解中設(shè)置的降級(jí)方法
            return handleBlockException(pjp, annotation, ex);
        } catch (Throwable ex) {
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            //The ignore list will be checked first.
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                traceException(ex);
                return handleFallback(pjp, annotation, ex);
            }
            //No fallback function can handle the exception, so throw it out.
            throw ex;
        } finally {
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }  
    }
}

//Some common functions for Sentinel annotation aspect.
public abstract class AbstractSentinelAspectSupport {
    ...
    protected Object handleBlockException(ProceedingJoinPoint pjp, SentinelResource annotation, BlockException ex) throws Throwable {
        //Execute block handler if configured.
        Method blockHandlerMethod = extractBlockHandlerMethod(pjp, annotation.blockHandler(), annotation.blockHandlerClass());
        if (blockHandlerMethod != null) {
            Object[] originArgs = pjp.getArgs();
            //Construct args.
            Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);
            args[args.length - 1] = ex;
            return invoke(pjp, blockHandlerMethod, args);
        }
        //If no block handler is present, then go to fallback.
        return handleFallback(pjp, annotation, ex);
    }

    private Object invoke(ProceedingJoinPoint pjp, Method method, Object[] args) throws Throwable {
        try {
            if (!method.isAccessible()) {
                makeAccessible(method);
            }
            if (isStatic(method)) {
                return method.invoke(null, args);
            }
            return method.invoke(pjp.getTarget(), args);
        } catch (InvocationTargetException e) {
            //throw the actual exception
            throw e.getTargetException();
        }
    }
    ...
}

詳細(xì)介紹后端技術(shù)棧的基礎(chǔ)內(nèi)容,包括但不限于:MySQL原理和優(yōu)化、Redis原理和應(yīng)用、JVM和G1原理和優(yōu)化、RocketMQ原理應(yīng)用及源碼、Kafka原理應(yīng)用及源碼、ElasticSearch原理應(yīng)用及源碼、JUC源碼、Netty源碼、zk源碼、Dubbo源碼、Spring源碼、Spring Boot源碼、SCA源碼、分布式鎖源碼、分布式事務(wù)、分庫(kù)分表和TiDB、大型商品系統(tǒng)、大型訂單系統(tǒng)等

全部評(píng)論

相關(guān)推薦

游戲行業(yè)的高薪和'帶薪打游戲',讓大伙擠破了頭,&nbsp;想進(jìn)“鵝豬米”這類一線大廠。但現(xiàn)實(shí)是大廠校招堪比“游戲界高考”,沒(méi)兩把刷子簡(jiǎn)歷都過(guò)不去。別光盯著大廠當(dāng)“舔狗”游戲圈里還有很多寶藏公司,改好簡(jiǎn)歷和作品集迅速上車吧!說(shuō)不定你就是下一個(gè)爆款游戲的幕后大佬!一、用利他思維打造你的&quot;求職通關(guān)簡(jiǎn)歷&quot;&nbsp;&nbsp;簡(jiǎn)歷不是自嗨的成就清單,而是一份&quot;企業(yè)需求解決方案書&quot;。你需要站在HR和用人部門的角度思考:他們需要什么樣的人才?你能為他們解決什么問(wèn)題?&nbsp;&nbsp;一份高轉(zhuǎn)化率的簡(jiǎn)歷應(yīng)該做到:&nbsp;&nbsp;?&nbsp;價(jià)值可視化——不堆砌經(jīng)歷,而是展示你能為團(tuán)隊(duì)帶來(lái)的實(shí)際貢獻(xiàn)&nbsp;&nbsp;?&nbsp;機(jī)會(huì)敲門磚——用精準(zhǔn)匹配的證明,撬開(kāi)面試室的大門&nbsp;&nbsp;記住:好簡(jiǎn)歷不是說(shuō)&quot;我有多優(yōu)秀&quot;,而是告訴企業(yè)&quot;選我能解決你的痛點(diǎn)&quot;。也就是展示自我價(jià)值獲取HR好感獲得面試機(jī)會(huì)二、精準(zhǔn)狙擊崗位需求的簡(jiǎn)歷秘訣①&nbsp;先做&quot;崗位解碼器&quot;-&nbsp;像分析游戲任務(wù)說(shuō)明一樣拆解JD&nbsp;&nbsp;-&nbsp;標(biāo)出關(guān)鍵詞:必備技能、核心職責(zé)、隱性需求&nbsp;②&nbsp;啟動(dòng)人崗匹配系統(tǒng)-&nbsp;你的技能庫(kù)&nbsp;→&nbsp;對(duì)接企業(yè)需求池&nbsp;&nbsp;-&nbsp;用項(xiàng)目經(jīng)驗(yàn)證明:你正好能解決他們最頭疼的問(wèn)題&nbsp;&nbsp;(效果:把被動(dòng)投遞變成精準(zhǔn)打擊,用游戲化語(yǔ)言增強(qiáng)代入感)三、簡(jiǎn)歷構(gòu)成1、個(gè)人資料(姓名,聯(lián)系方式,教育經(jīng)歷,照片,專業(yè)等)2、實(shí)習(xí)經(jīng)歷/項(xiàng)目簡(jiǎn)歷(采用STAR法則能夠讓你的實(shí)習(xí)經(jīng)歷描述更加結(jié)構(gòu)化和有說(shuō)服力;①情景:實(shí)習(xí)公司,時(shí)間,部門②任務(wù):實(shí)習(xí)期間主要任務(wù)③行動(dòng):如何解決問(wèn)題,使用工具④結(jié)果:用數(shù)據(jù)成果說(shuō)明價(jià)值)3、技能、證書、自我評(píng)價(jià)4、作品集(作品集和簡(jiǎn)歷分開(kāi))作品集要重質(zhì)量而非數(shù)量,3-4個(gè)完整項(xiàng)目>10個(gè)零散的作品。要保證項(xiàng)目的完整性,像不成體系的設(shè)計(jì)稿、沒(méi)有明確目標(biāo)的項(xiàng)目、過(guò)時(shí)的作品等可以酌情刪除。四、網(wǎng)申過(guò)程中能寫游戲經(jīng)歷的一定要寫上去;比如&nbsp;:1、自己熟悉了解的游戲按照類別列舉&nbsp;&nbsp;&nbsp;2、這款游戲有哪里做的比較吸引人&nbsp;&nbsp;3、有哪里不足可優(yōu)化&nbsp;4、中間的玩法,關(guān)卡設(shè)置對(duì)比#我的求職精神狀態(tài)##游戲求職進(jìn)展匯總##我的上岸簡(jiǎn)歷長(zhǎng)這樣##簡(jiǎn)歷中的項(xiàng)目經(jīng)歷要怎么寫##簡(jiǎn)歷被掛麻了,求建議##最后再改一次簡(jiǎn)歷#
點(diǎn)贊 評(píng)論 收藏
分享
評(píng)論
1
收藏
分享

創(chuàng)作者周榜

更多
正在熱議
更多
# 面試問(wèn)題記錄 #
60811次瀏覽 873人參與
# 硬件人的簡(jiǎn)歷怎么寫 #
254311次瀏覽 2884人參與
# 京東TGT #
47553次瀏覽 174人參與
# 你遇到過(guò)哪些神仙同事 #
71724次瀏覽 641人參與
# 我的2024小目標(biāo) #
58090次瀏覽 390人參與
# 工作中,你有沒(méi)有遇到非常愛(ài)罵人的領(lǐng)導(dǎo)? #
17006次瀏覽 126人參與
# 百度工作體驗(yàn) #
204784次瀏覽 1907人參與
# 實(shí)習(xí)生應(yīng)該準(zhǔn)時(shí)下班嗎 #
201617次瀏覽 1314人參與
# 上班到公司第一件事做什么? #
37850次瀏覽 347人參與
# 國(guó)企和大廠硬件兄弟怎么選? #
120105次瀏覽 1656人參與
# 工作一周年分享 #
19185次瀏覽 111人參與
# 沒(méi)有合適的工作,你會(huì)先找個(gè)干著,還是考公考研 #
104390次瀏覽 1076人參與
# 面試吐槽bot #
14211次瀏覽 87人參與
# 互聯(lián)網(wǎng)行業(yè)現(xiàn)在還值得去嗎 #
6379次瀏覽 42人參與
# 面試經(jīng)驗(yàn)談 #
39051次瀏覽 509人參與
# 拼多多工作體驗(yàn) #
16707次瀏覽 152人參與
# 入職第五天,你被拉進(jìn)了幾個(gè)工作群 #
17956次瀏覽 80人參與
# 假如我穿越到了媽媽的18歲 #
6308次瀏覽 43人參與
# 國(guó)企vs私企,你更想去? #
217556次瀏覽 2072人參與
# 你們的畢業(yè)論文什么進(jìn)度了 #
999333次瀏覽 9398人參與
# 機(jī)械人,你的第一份感謝信是誰(shuí)給的 #
26817次瀏覽 296人參與
??途W(wǎng)
??推髽I(yè)服務(wù)