博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊rsocket load balancer的Ewma
阅读量:6228 次
发布时间:2019-06-21

本文共 6130 字,大约阅读时间需要 20 分钟。

  hot3.png

本文主要研究一下rsocket load balancer的Ewma

Moving Average

SMA

SMA(Simple Moving Average),即简单移动平均,其公式如下:

SMAt = (Pt + Pt-1 + Pt-2 + Pt-3 + ... + Pt-n+1) / n

这里的Pt到为Pt-n+1为最近的n个数据

WMA

WMA(Weighted Moving Average),即加权移动平均,其公式如下:

WMAt = (Pt * Wt) + (Pt-1 * Wt-1) + ... + (Pt-n+1 * Wt-n+1)

WMA会给最近的n个数据加上权重,其中这些权重加起来和为1,一般是较近的数据权重比较大

EMA或EWMA

EMA(Exponentially Moving Average)指数移动平均或EWMA(Exponentially Weighted Moving Average)指数加权移动平均,其公式如下:

EMAt = (Pt * S) + (1- S) * EMAt-1

它有一个S参数为平滑指数,一般是取2/(N+1)

Ewma

rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/stat/Ewma.java

public class Ewma {  private final long tau;  private volatile long stamp;  private volatile double ewma;  public Ewma(long halfLife, TimeUnit unit, double initialValue) {    this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);    stamp = 0L;    ewma = initialValue;  }  public synchronized void insert(double x) {    long now = Clock.now();    double elapsed = Math.max(0, now - stamp);    stamp = now;    double w = Math.exp(-elapsed / tau);    ewma = w * ewma + (1.0 - w) * x;  }  public synchronized void reset(double value) {    stamp = 0L;    ewma = value;  }  public double value() {    return ewma;  }  @Override  public String toString() {    return "Ewma(value=" + ewma + ", age=" + (Clock.now() - stamp) + ")";  }}
  • Ewma的构造器需要指定halfLife、timeunit、initialValue(ewma初始值)参数;ewma = w * ewma + (1.0 - w) * x,其中x为当前值,w为权重
  • 权重w = Math.exp(-elapsed / tau),即e的-elapsed / tau次方;elapsed为距离上次计算的时间长度;tau(希腊字母)为EWMA的时间常量
  • 这里的tau = halfLife / Math.log(2)根据timeunit转换后的值;其中halfLife参数,代表speed of convergence,即收敛的速度

RSocketSupplier

rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/client/filter/RSocketSupplier.java

public class RSocketSupplier implements Availability, Supplier
>, Closeable { private static final double EPSILON = 1e-4; private Supplier
> rSocketSupplier; private final MonoProcessor
onClose; private final long tau; private long stamp; private final Ewma errorPercentage; public RSocketSupplier(Supplier
> rSocketSupplier, long halfLife, TimeUnit unit) { this.rSocketSupplier = rSocketSupplier; this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit); this.stamp = Clock.now(); this.errorPercentage = new Ewma(halfLife, unit, 1.0); this.onClose = MonoProcessor.create(); } public RSocketSupplier(Supplier
> rSocketSupplier) { this(rSocketSupplier, 5, TimeUnit.SECONDS); } @Override public double availability() { double e = errorPercentage.value(); if (Clock.now() - stamp > tau) { // If the window is expired artificially increase the availability double a = Math.min(1.0, e + 0.5); errorPercentage.reset(a); } if (e < EPSILON) { e = 0.0; } else if (1.0 - EPSILON < e) { e = 1.0; } return e; } private synchronized void updateErrorPercentage(double value) { errorPercentage.insert(value); stamp = Clock.now(); } @Override public Mono
get() { return rSocketSupplier .get() .doOnNext(o -> updateErrorPercentage(1.0)) .doOnError(t -> updateErrorPercentage(0.0)) .map(AvailabilityAwareRSocketProxy::new); } @Override public void dispose() { onClose.onComplete(); } @Override public boolean isDisposed() { return onClose.isDisposed(); } @Override public Mono
onClose() { return onClose; } private class AvailabilityAwareRSocketProxy extends RSocketProxy { public AvailabilityAwareRSocketProxy(RSocket source) { super(source); onClose.doFinally(signalType -> source.dispose()).subscribe(); } @Override public Mono
fireAndForget(Payload payload) { return source .fireAndForget(payload) .doOnError(t -> errorPercentage.insert(0.0)) .doOnSuccess(v -> updateErrorPercentage(1.0)); } @Override public Mono
requestResponse(Payload payload) { return source .requestResponse(payload) .doOnError(t -> errorPercentage.insert(0.0)) .doOnSuccess(p -> updateErrorPercentage(1.0)); } @Override public Flux
requestStream(Payload payload) { return source .requestStream(payload) .doOnError(th -> errorPercentage.insert(0.0)) .doOnComplete(() -> updateErrorPercentage(1.0)); } @Override public Flux
requestChannel(Publisher
payloads) { return source .requestChannel(payloads) .doOnError(th -> errorPercentage.insert(0.0)) .doOnComplete(() -> updateErrorPercentage(1.0)); } @Override public Mono
metadataPush(Payload payload) { return source .metadataPush(payload) .doOnError(t -> errorPercentage.insert(0.0)) .doOnSuccess(v -> updateErrorPercentage(1.0)); } @Override public double availability() { // If the window is expired set success and failure to zero and return // the child availability if (Clock.now() - stamp > tau) { updateErrorPercentage(1.0); } return source.availability() * errorPercentage.value(); } }}
  • RSocketSupplier实现了Availability、Supplier、Closeable接口,其中它定义了errorPercentage变量,其类型为Ewma;如果没有指定halfLife值,则RSocketSupplier默认halfLife为5秒,ewma的初始值为1.0
  • RSocketSupplier定义了一个常量EPSILON = 1e-4,其availability方法会先计算availability,然后在距离上次计算时间stamp超过tau值时会重置errorPercentage;之后当availability小于EPSILON时返回0,当availability + EPSILON大于1时返回1.0
  • updateErrorPercentage方法用于往ewma插入新值,同时更新stamp;get方法的doOnNext方法updateErrorPercentage值为1.0,doOnError方法updateErrorPercentage值为0.0;map会将RSocket转换为AvailabilityAwareRSocketProxy;AvailabilityAwareRSocketProxy对目标RSocket进行代理,对相关方法的doOnError及doOnSuccess都织入errorPercentage的统计

小结

  • Moving Average有好几种算法,包括SMA(Simple Moving Average)、WMA(Weighted Moving Average)、EMA(Exponentially Moving Average)或EWMA(Exponentially Weighted Moving Average)
  • Ewma的构造器需要指定halfLife、timeunit、initialValue(ewma初始值)参数;ewma = w * ewma + (1.0 - w) * x,其中x为当前值,w为权重;权重w = Math.exp(-elapsed / tau),即e的-elapsed / tau次方;elapsed为距离上次计算的时间长度;tau(希腊字母)为EWMA的时间常量;这里的tau = halfLife / Math.log(2)根据timeunit转换后的值;其中halfLife参数,代表speed of convergence,即收敛的速度
  • rsocket load balancer使用了Ewma了统计服务的availability;其中RSocketSupplier实现了Availability、Supplier、Closeable接口,其中它定义了errorPercentage变量,其类型为Ewma;如果没有指定halfLife值,则RSocketSupplier默认halfLife为5秒,ewma的初始值为1.0;RSocketSupplier的get方法会将RSocket转换为AvailabilityAwareRSocketProxy,而AvailabilityAwareRSocketProxy则会对目标RSocket进行代理,对相关方法的doOnError及doOnSuccess都织入errorPercentage的统计

doc

转载于:https://my.oschina.net/go4it/blog/3036091

你可能感兴趣的文章
UWP: 实现 UWP 应用自启动
查看>>
Windows内核之进程的终止和子进程
查看>>
Python 文件 readline() 方法
查看>>
String,到底创建了多少个对象?
查看>>
linux查找目录下的所有文件中是否含有某个字符串
查看>>
UWP 手绘视频创作工具技术分享系列 - 有 AI 的手绘视频
查看>>
各行业最受欢迎的编程语言,硬件最青睐C和C++
查看>>
监听用户的后退键,解决部分浏览器回退的bug
查看>>
Vivado+FPGA:如何使用Debug Cores(ILA)在线调试(烧录到flash里可以直接启动)
查看>>
[Preference] How to avoid Forced Synchronous Layout or FSL to improve site preference
查看>>
【laravel5.4】php artisan migrate报错:Specified key was too long; max key length is 767 bytes
查看>>
[转]外贸出口流程图
查看>>
微信小程序onLaunch修改globalData的值
查看>>
php实现简单算法3
查看>>
Always run a program in administrator mode in Windows 10
查看>>
打陀螺
查看>>
tcp echo server libuv
查看>>
Random Processes
查看>>
操作argc, argv的经典写法
查看>>
phpStudy中升级MySQL版本到5.7.17的方法步骤
查看>>