飞道的博客

[享学Netflix] 十六、Hystrix断路器:初体验及RxJava简介

712人阅读  评论(0)

减少与单点的交互,是存在单点的系统的核心优化方向。常见的方法有批量写、客户端缓存等等

–> 返回专栏总目录 <–
代码下载地址:https://github.com/f641385712/netflix-learning

前言

Hystrix也是Netflix OSS的一部分,它是一个断路器,用于保护你的微服务。随着微服务的流行,熔断作为其中一项很重要的技术也广为人知。当微服务的运行质量低于某个临界值时(静态阈值的实现方式),启动熔断机制,暂停微服务调用一段时间,以保障后端的微服务不会因为持续过负荷而宕机(熔断、限流)。

在分布式系统中,单个应用通常会有多个不同类型的外部依赖服务,内部通常依赖于各种RPC服务(当然也可能是Http实现),外部则依赖于各种HTTP服务。这些依赖服务不可避免的会出现调用失败,比如超时、异常等情况,如何在外部依赖出问题的情况,仍然保证自身应用的稳定,就是Hystrix这类服务保障框架的工作了,这便是隔离的概念,当然还有防止雪崩等功能。

简单说:某一个功能不可用,是永远优于全部不可用的。这个特点就像每家每户的保险丝:某家用大功率电器导致停电了,并不会影响到其它住户。


正文


源码地址:https://github.com/Netflix/Hystrix
hystrix中文释义:豪猪。它大概长这样(豪猪身上有很多刺,这些刺可以很好的保护自己,估计开发团队给取这个名字也是这个意思):

Hystrix是一个延迟和容错库,旨在隔离对远程系统,服务和第三方库的访问点,停止级联故障,并在不可避免发生故障的复杂分布式系统中实现弹性。

Hystrix的目标就是能够在1个或多个依赖出现问题时,系统依然可以稳定的运行,其手段包括隔离、限流和降级等。顺道复习一下高可用常用的7种手段:

  1. 隔离
  2. 限流
    1. 限流:即限制流量的最大值,是流控的一种方式
  3. 降级fallback
  4. 负载均衡
  5. 超时与重试
  6. 回滚
  7. 压测与预案

如果做一个简单的限流功能,那是很容易的,但如果想做更精准的控制、处理后的细分和快速恢复,还有大量的工作需要做。很多RPC框架一般都自带流控、熔断的能力,但一般都不够强大,离自动化还有距离,这就是为何这块要专门拿出来做的原因(因为很很很很重要)。


Netflix Hystrix

<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-core</artifactId>
    <version>1.5.18</version>
</dependency>

Hystrix(1.5.18版,2018.11发布)已经足够稳定,可以满足Netflix现有应用程序的需求。

官方认为,接下来它们的重心是要转向对应用程序的实时性能做出反应的自适应性实现,而不是预先配置的设置。也就是说限流使用动态、弹性值,而非事先设定好的阈值来实现。针对于此,若你想有这样的诉求,官方推荐你使用resilience4j来做。

说明:resilience4j是受Hystrix启发而做的熔断器,通过管理远程调用的容错处理来帮助实现一个健壮的系统。resilience4j提供了更好用的API,并且提供了很多其他功能比如Rate Limiter(限流器),Bulkhead(舱壁隔离)。

现状

Hystrix不再处于主动开发阶段,目前处于维护模式。Netflix Hystrix现在正式处于维护模式,它已经于2018.11发布了最后一个版本1.5.18,后期也不会再接口社区的pull request,简单的说就是不会再升级了。

当然,若你觉得有能力扛起这面大旗,你可以给它们团队发邮件:hystrixoss@googlegroups.com,让它重新回到活动状态~


hystrix-core的依赖项截图如下:


这里有值的一说的两个核心依赖项:

  • Archaius:配置管理库。这不就是该系列前十几篇文章讲述的重点麽,这里就用到了,很激动有木有
  • rxjava:响应式编程库。这个也是Netflix开源出来的一套异步编程库,问下会有介绍

HdrHistogram这个依赖,用于分析采样的数据,方便画直方图等等,和它的metric有关,一般不用太关注


有何用?

说了这么多,你可能还不知道Hystrix有何用,这里罗列它的作用如下:

  • 延迟和容错:停止级联故障。fallabck和优雅的降级,Fail fast和快速恢复。使用circuit breakers通过线程 或者 信号量隔离。
  • 实时操作:实时监控和配置更改,可以让属性被修改后能够立刻生效(很显然,这种能力由archauis提供支持)。得到提醒,做出决定,影响改变,并在几秒钟内看到结果。
  • 并发:并行执行。支持并发的请求缓存。自动批处理(通过请求合并)。

断路器:HystrixCircuitBreaker是整个Hystrix里一个很重要的抽象,后面也会当作重点详细说明。
Hystrix包含限流、熔断等功能的库类,它能给系统提供快速失败快速恢复的能力,让其更具“弹性”。

说明:流控、熔断和快速恢复是现在大型分布式系统中各个服务节点应该具备的基本抗灾容错能力


工作流程

Hystrix相较于其它组件,属于比较复杂的。官网里有一张描述其工作流程的图示,因为过于复杂本人决定不引用(容易懵逼),而引用一大神的自绘图,个人觉得把核心、关键节点均圈出来了,供以参考:

每个请求都会被包装成一个Command对象来执行,该图示展示的一个请求执行的关键流程。


快速示例

public class CommandHelloWorld extends HystrixCommand<String> {
    private final String name;

    // 指定一个HystrixCommandGroupKey,这样熔断策略会按照此组执行
    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("MyAppGroup"));
        this.name = name;
    }

    @Override
    protected String run() {
        if(name == null){
            throw new NullPointerException();
        }
        return "Hello " + name + "!";
    }

    @Override
    protected String getFallback() {
        // super.getFallback():No fallback available.
        return "this is fallback msg";
    }
}

测试代码:

@Test
public void fun2() {
    // 三种执行方式:

    // 1、普通方式
    String s = new CommandHelloWorld("Bob").execute();
    System.out.println(s); // Hello Bob!

    String fallbackValue = new CommandHelloWorld(null).execute();
    // 说明:若你没有提供fallback函数,那结果是:
    // com.netflix.hystrix.exception.HystrixRuntimeException: CommandHelloWorld failed and no fallback available.
    System.out.println(fallbackValue); // "this is fallback msg"

    // 2、异步方式。什么时候需要时候什么时候get
    // Future<String> s = new CommandHelloWorld("Bob").queue();
    // System.out.println(s.get()); // Hello Bob!

    // 3、RxJava方式。吞吐量更高,但对程序员的要求更高
    // Observable<String> s = new CommandHelloWorld("Bob").observe();
    // s.subscribe(d -> System.out.println(d)); // Hello Bob!
}

实例中使用三种方式来执行,均是可以的,各位可自行选择。


RxJava有话说

由于hystrixy-core依赖于RxJava构建,因此需要做个简单了解

那么什么是RxJava呢?

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.3.8</version>
</dependency>

这是RxJava的1.x版本(1.x现已停更,于2018.5发布发布最后一版1.3.8

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.18</version>
</dependency>

说明:1.x和2.x的GAV、包名均不同,所以可以和平共处

现行流行版本为2.x分支,若你想单独使用,推荐使用2.x。但是,但是,但是1.x还存在较大的存量市场,Netflix套件依赖均为1.x,所以依旧存在学习的价值,不容忽视。

当年的Netflix也是为了增加服务器的性能和吞吐量来编写RxJava并开源,简单的说它是一个对响应式编程提供支持的库,在Android中使用得极多,但实际在Java Server端使用得很少。

RxJava的实质是一个异步操作库,用于简化异步开发。本文学习的Hystrix虽有涉及到,但并不会深究。


核心概念

注意:以下讲解、示例均基于1.x版本

它的核心思想和Java的观察者模式非常像:被观察者和观察者通过订阅产生一种关系,当被观察者发生一些改变,通知观察者,观察者对应做出相应的回应。

主要有三个关键词需要记住:被观察者(Observable),订阅(subscribe),观察者(Observer)。

  • Observable(被观察者,也就是数据发射器):public class Observable<T>代表一个被观察对象
  • Observer(观察者,也就是数据接收器) :public interface Observer<T>实现此接口便是一个观察者,有onCompleted/onError/onNext的监听方法
  • subscribe(订阅,也就是把发射器和接收器关联起来):Observable#subscribe(action),订阅此被观察者。

线程调控Scheduler

RxJava很优势的一个方面就是他的线程切换,基本是依靠ObserveOnSubscribeOn这两个操作符来完成的。

  • subscribeOn:指定上游事件发射器所用的线程,若多次设定,则只有一次起作用
  • observeOn:指定下游操作所使用的线程,若多次指定则每次均起作用

Scheduler种类

  • Schedulers.io():用于IO密集型的操作,例如读取SD卡文件、查询数据库、访问网络等,具有线程缓存机制
  • Schedulers.newThread():在每执行一次任务时创建一个新的线程,不具有线程缓存机制,效率比Scheduler.io()低,并发性很高,一般不建议使用
  • Schedulers.computation():用于CPU密集型计算任务,即不会被I/O等操作限制性能的耗时操作,例如xml,json文件解析,Bitmap图片压缩取样等。具有固定的线程池,大小为CPU的核数。不可以用于I/O操作,因为I/O操作的等待时间浪费CPU。
  • Schedulers.trampoline()在当前线程立即执行任务,如果当前线程有任务在这执行,则将其停止,等插入进来的任务执行完成之后,在将未执行完成的任务接着执行。
  • Schedulers.immediate():注意此类型在2.x版本已经被废弃,效果同2.x中的Schedulers.trampoline()
  • Schedulers.from(@NonNull Executor executor):用户自己指定一个线程调度器,由此调度器来控制任务的执行策略
  • Schedulers.test():用于你debug的时候使用

操作符

RxJava操作符:其实质是函数式编程中的高阶函数(帮你定义好一些处理逻辑,无需自行实现),方便你操作数据流。

  • 创建:
    • create:使用OnSubscribe从头创建一个Observable实例:Observable.create(new Observable.OnSubscribe<String>()
    • from:将一个Iterable, 一个Future, 或者一个数组,内部通过代理的方式转换成一个实例:Observable.from(list)
    • just:将一个或多个对象变为一个实例:Observable.just(1, 2, 3, 4, 5, 6)
    • empty:创建一个什么都不做直接通知完成的实例
    • error:创建一个什么都不做直接通知错误的实例
    • never:创建一个什么都不做的实例
    • timer:创建一个在给定的延时之后发射数据项为0的实例Observable<Long>
    • interval:按照给定的时间间隔发射从0开始的整数序列的Observable<Long>
    • range:
    • defer:
  • 过滤:
    • filter:过滤数据。
    • ofType:过滤指定类型的数据,与filter类似
    • take:只发射开始的N项数据或者一定时间内的数据
    • takeLast:
    • takeFirst:
    • firstOrDefault:
    • last/lastOrDefault:
    • skip:
    • skipLast:
    • elementAt/elementAtOrDefault:
    • ignoreElements:
    • distinct:
    • timeout:
    • distinctUntilChanged:
    • throttleFirst:
    • throttleWithTimeout/debounce:
  • 组合/合并:
    • concat:按顺序连接多个Observables:Observable.concat(a,b)/a.concatWith(b)
    • startWith:在数据序列的开头增加一项数据
    • merge:将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接。
    • zip:使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果
    • combineLatest
  • 变换:map/flatMap/cast/flatMapIterable/groupBy
  • 聚合:reduce/collect/count/countLong
  • 转换:toList/toSortedList/toMap/toMultiMap
  • 错误处理/重试机制:onErrorResumeNext/onExceptionResumeNext/onErrorReturn/retry/retryWhen…

操作符实在太多了,但是最为常用的不会太多,掌握即可。这里就不用给使用示例了,因为对于已经能够很熟练使用Java Stream API的你,这都是小意思~


背压Backpressure

被压策略有很多种,比如:

  • ERROR:生产比消费快,那就抛错
  • DROP:生产比消费快,丢弃新生产的数据

使用示例

@Test
public void fun1() {
    // 自定义一个线程池:用于处理消费者任务
    ExecutorService myDiyThreadExe = Executors.newFixedThreadPool(1, r -> {
        Thread thread = new Thread(r);
        thread.setName("myDiyThread");
        return thread;
    });

    // Observable.just(1, 2, 3, 4, 5, 6)
    Observable.from(new Integer[]{1, 2, 3, 4, 5, 6})
            .subscribeOn(Schedulers.io()) //(发送事件的线程所在地,只能生效一次)
            .observeOn(Schedulers.immediate()) // 设置下面的Map操作,在当前线程立马执行(可生效多次)
            .map(i -> {
                System.out.println("map操作执行线程:" + Thread.currentThread().getName());
                return i * i;
            })
            .observeOn(Schedulers.newThread()) // 因为这是新线程,所以控制台的日志打印换乱串~~~
            .filter(i -> {
                System.out.println("filter操作执行线程:" + Thread.currentThread().getName());
                return i > 10;
            })

            .observeOn(Schedulers.from(myDiyThreadExe)) // 任务在自定义的线程池里执行
            // 处理事件:订阅:使用Action处理
            .subscribe(i -> System.out.printf("subscribe订阅处理线程 %s,值为%s \n", Thread.currentThread().getName(), i));

    // hold主线程
    while (true) {
    }

}

运行程序,控制台输出:

map操作执行线程:RxIoScheduler-2
map操作执行线程:RxIoScheduler-2
filter操作执行线程:RxNewThreadScheduler-1
filter操作执行线程:RxNewThreadScheduler-1
map操作执行线程:RxIoScheduler-2
map操作执行线程:RxIoScheduler-2
filter操作执行线程:RxNewThreadScheduler-1
map操作执行线程:RxIoScheduler-2
filter操作执行线程:RxNewThreadScheduler-1
map操作执行线程:RxIoScheduler-2
filter操作执行线程:RxNewThreadScheduler-1
filter操作执行线程:RxNewThreadScheduler-1
subscribe订阅处理线程 myDiyThread,值为16 
subscribe订阅处理线程 myDiyThread,值为25 
subscribe订阅处理线程 myDiyThread,值为36 

说明:因为filter操作使用的是新线程RxNewThreadScheduler,所以它的日志打印会乱串哦。

当你自己写main/单测测试异步程序的时候,请务必hold住主线程,否则你将看不到效果,这是初学者常犯的一个小错误,此处提醒你一下。


关于RxJava的介绍就先到这,这是一个极简介绍而已,这里我贴出几篇文章,有兴趣者可前往阅读:

异步、响应式编程从来都不是件容易的事,实操起来更是利弊共存,请大家在实际生产中酌情选型。


总结

关于Netflix Hystrix断路器:初体验及RxJava简介就先介绍到这,通过本文能了解到如下两部分知识:

  1. Hystrix是什么,有何用,怎么用?
  2. RxJava是什么,有何用,怎么用?

当然,怎么用是个较大的话题,关于RxJava怎么用就先止步于此,有兴趣的小朋友自行研究。接下来的文章将重点分析Hystrix的使用方式,以及深度掌握其工作原理,这也能为使用、理解resilience4j奠定一个基础。

声明

原创不易,码字不易,多谢你的点赞、收藏、关注。把本文分享到你的朋友圈是被允许的,但拒绝抄袭。你也可【左边扫码/或加wx:fsx641385712】邀请你加入我的 Java高工、架构师 系列群大家庭学习和交流。


转载:https://blog.csdn.net/f641385712/article/details/104505957
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场