计数器限流方式比较粗暴,一次访问就增加一次计数,在系统内设置每 N 秒的访问量,超过访问量的访问直接丢弃,从而实现限流访问。
具体大概是以下步骤:
这种算法的弊端
在开始的时间,访问量被使用完后,1 s 内会有很长时间的真空期是处于接口不可用的状态的,同时也有可能在一秒内出现两倍的访问量。
T窗口的前1/2时间 无流量进入,后1/2时间通过5个请求;
代码实现
private final Semaphore count = new Semaphore(5); @PostConstruct public void init() { //初始化定时任务线程池 ScheduledExecutorService service = new ScheduledThreadPoolExecutor(2, t -> { Thread thread = new Thread(t); thread.setName("limit"); return thread; }); // 每10s执行5次 service.scheduleAtFixedRate(() -> count.release(5), 10, 10, TimeUnit.SECONDS); } /** * 计数器限流 */ public void count() { try { count.acquire(); System.out.println("count"); } catch (InterruptedException e) { e.printStackTrace(); } }
控制并发访问量
具体大概是以下步骤:
代码实现
private final Semaphore flag = new Semaphore(5); /** * 信号量限流 */ public void flag() { try { flag.acquire(); System.out.println("flag"); int i = new Random().nextInt(10); TimeUnit.SECONDS.sleep(i); } catch (InterruptedException e) { e.printStackTrace(); } finally { flag.release(); } }
具体大概是以下步骤:
代码实现
private final AtomicInteger[] window = new AtomicInteger[10]; @PostConstruct public void init() { //初始化定时任务线程池 ScheduledExecutorService service = new ScheduledThreadPoolExecutor(2, t -> { Thread thread = new Thread(t); thread.setName("limit"); return thread; }); // 10个窗口,每次滑动1s Arrays.fill(window, new AtomicInteger(0)); service.scheduleAtFixedRate(() -> { int index = (int) (System.currentTimeMillis() / 1000 % 10); window[index] = new AtomicInteger(0); }, 1, 1, TimeUnit.SECONDS); } /** * 滑动窗口 */ public void window() { int sum = 0; for (int i = 0; i < window.length; i++) { sum += window[i].get(); } if (sum > 10) { return; } System.out.println("window"); int index = (int) (System.currentTimeMillis() / 1000 % 10); window[index].getAndAdd(1); }
具体大概是以下步骤:
代码实现
private final BlockingQueue<Long> queue = new LinkedBlockingDeque<>(5); @PostConstruct public void init() { //初始化定时任务线程池 ScheduledExecutorService service = new ScheduledThreadPoolExecutor(2, t -> { Thread thread = new Thread(t); thread.setName("limit"); return thread; }); // 一恒定的速率执行 service.scheduleAtFixedRate(() -> { try { if (System.currentTimeMillis() - queue.take() > 1000L) { process(); } } catch (InterruptedException e) { e.printStackTrace(); } }, 100, 100, TimeUnit.MILLISECONDS); } /** * 漏桶限流 */ public void bucket() { try { queue.put(System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } } private void process() { System.out.println("process"); }
令牌桶算法是漏斗算法的改进版,为了处理短时间的突发流量而做了优化,令牌桶算法主要由三部分组成:令牌流、数据流、令牌桶。
名词释义:
具体大概是以下步骤:
代码实现
private final BlockingQueue<Integer> token = new LinkedBlockingDeque<>(5); @PostConstruct public void init() { //初始化定时任务线程池 ScheduledExecutorService service = new ScheduledThreadPoolExecutor(2, t -> { Thread thread = new Thread(t); thread.setName("limit"); return thread; }); // 以恒定的速率放入令牌 service.scheduleAtFixedRate(() -> { try { token.put(1); } catch (InterruptedException e) { e.printStackTrace(); } }, 1, 1, TimeUnit.SECONDS); } public void token() { try { token.take(); System.out.println("token"); } catch (InterruptedException e) { e.printStackTrace(); } }
@Resource private LimitDemo demo; @Test public void count() throws InterruptedException { process(() -> demo.count()); } @Test public void flag() throws InterruptedException { process(() -> demo.flag()); } @Test public void window() throws InterruptedException { process(() -> demo.window()); } @Test public void bucket() throws InterruptedException { process(() -> demo.bucket()); } @Test public void token() throws InterruptedException { process(() -> demo.token()); } private void process(Process process) throws InterruptedException { CompletableFuture<?>[] objects = IntStream.range(0, 10).mapToObj(i -> CompletableFuture.runAsync(() -> { while (true) { process.execute(); } })).collect(Collectors.toList()).toArray(new CompletableFuture<?>[] {}); CompletableFuture.allOf(objects); new CountDownLatch(1).await(); } @FunctionalInterface public interface Process { void execute(); }
源码地址 https://github.com/googalAmbition/googol/tree/master/limit