响应式编程(一)

1. 响应式编程

Reactor是反应式编程范例的实现,可以总结如下:反应式编程是一种异步编程范式,涉及数据流和变化的传播。这意味着可以通过所采用的编程语言轻松地表达静态(例如数组)或动态(例如事件发射器)数据流。

作为响应式编程方向的第一步,Microsoft在.NET生态系统中创建了响应式扩展(Rx)库。然后RxJava在JVM上实现了反应式编程。随着时间的流逝,通过Reactive Streams的努力出现了Java的标准化,该规范定义了JVM上的响应库的一组接口和交互规则。它的接口已在Flow类下集成到Java 9中。

反应式编程范例通常以面向对象的语言表示,作为Observer设计模式的扩展。您还可以将主要的反应流模式与熟悉的Iterator设计模式进行比较,因为所有这些库中的Iterable-Iterator对都有双重性。一个主要的区别是,虽然Iterator是基于pull的,但是反应流却是基于push的。

使用迭代器是命令式编程模式,即使访问值的方法仅由Iterable负责。实际上,由开发人员决定何时选择序列中的next()项。在反应式流中,上述一对等效于Publisher-Subscriber。但是是发布者在新可用值到来时通知订阅者,而此推送方面是做出反应的关键。同样,应用于推入值的操作以声明方式而不是命令方式表示:程序员表示计算的逻辑,而不是描述其确切的控制流程。

除了推送值之外,还以明确定义的方式涵盖了错误处理和完成方面。发布者可以将新值推送到其订阅服务器(通过调用onNext),但也可以发出错误信号(通过调用onError)或完成(通过调用onComplete)。错误和完成都会终止序列。可以总结如下:onNext x 0..N [onError | onComplete]

这种方法非常灵活。该模式支持没有值,一个值或n个值(包括无限的值序列,例如时钟的连续滴答声)的用例。

1.1 阻塞

现代应用程序可以吸引大量并发用户,即使现代硬件的功能不断提高,现代软件的性能仍然是关键问题。

广义上讲,有两种方法可以提高程序的性能:

  • 并行使用更多线程和更多硬件资源。
  • 在使用现有资源方面寻求更高的效率。

通常,Java开发人员通过使用阻塞代码来编写程序。除非存在性能瓶颈,否则这种做法很好。然后是时候引入其他线程,运行类似的阻塞代码了。但是,资源利用的这种扩展会迅速引入争用和并发问题。

更糟糕的是,阻塞会浪费资源。如果仔细观察,程序一旦遇到一些延迟(特别是I / O,例如数据库请求或网络调用),就会浪费资源,因为线程(可能有很多线程)现在处于空闲状态,等待数据。

因此,并行化方法不是灵丹妙药。有必要访问硬件的全部功能,但是推理和资源浪费也很复杂。

1.2 异步

前面提到的第二种方法,寻求更高的效率,可以解决资源浪费的问题。通过编写异步的非阻塞代码,您可以将执行切换到使用相同基础资源的另一个活动任务,并在异步处理完成后返回到当前进程。

但是如何在JVM上生成异步代码? Java提供了两种异步编程模型:

  • Callbacks:异步方法没有返回值,但是带有一个额外的回调参数(lambda或匿名类),该参数在结果可用时被调用。一个著名的例子是Swing的EventListener层次结构。
  • Futures:异步方法立即返回Future 。异步过程计算T值,但是Future对象包装对其的访问。该值不是立即可用的,并且可以轮询该对象,直到该值可用为止。例如,运行Callable 任务的ExecutorService使用Future对象。

这些技术够好吗?并非针对每个用例,这两种方法都有局限性。

Callbacks 很难组合在一起,很快导致难以阅读和维护的代码(称为“回调地狱”)。

考虑一个示例:在用户界面上显示用户的前五个收藏夹,如果没有收藏夹则显示建议。这需要三项服务(一项提供最爱ID,第二项获取最喜欢的详细信息,第三项提供带有详细信息的建议),如下所示:

userService.getFavorites(userId, new Callback<List<String>>() { 
  public void onSuccess(List<String> list) { 
    if (list.isEmpty()) { 
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }
        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});

使用响应式编程:

userService.getFavorites(userId) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions()) 
           .take(5) 
           .publishOn(UiUtils.uiThreadScheduler()) 
           .subscribe(uiList::show, UiUtils::errorPopup); 

如果您想确保在少于800毫秒的时间内检索到喜欢的ID,或者如果花费更长的时间从缓存中获取它们,该怎么办?在基于回调的代码中,这是一项复杂的任务。在Reactor中,就像在链中添加超时操作符一样容易,如下所示:

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) 
           .onErrorResume(cacheService.cachedFavoritesFor(userId)) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);

Future 对象比 Callbacks 要好一些,但是尽管CompletableFuture对Java 8进行了改进,但它们在组合方面仍然表现不佳。一起编排多个Future对象是可行的,但并不容易。另外,未来还有其他问题:

  • 通过调用get() 方法很容易以Future对象结束另一个阻塞情况。
  • 它们不支持惰性计算。
  • 他们缺乏对多个值和高级错误处理的支持。

再看一个例子:我们得到一个ID列表,我们要从中获取一个名称和一个统计信息,并将它们成对组合,所有这些都是异步的。下面的示例使用CompletableFuture类型的列表执行此操作:

CompletableFuture<List<String>> ids = ifhIds(); 

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { 
    Stream<CompletableFuture<String>> zip =
            l.stream().map(i -> { 
                CompletableFuture<String> nameTask = ifhName(i); 
                CompletableFuture<Integer> statTask = ifhStat(i); 

                return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); 
            });
    List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); 
    CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

    CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); 
    return allDone.thenApply(v -> combinationList.stream()
            .map(CompletableFuture::join) 
            .collect(Collectors.toList()));
});

List<String> results = result.join(); 
assertThat(results).contains(
        "Name NameJoe has stats 103",
        "Name NameBart has stats 104",
        "Name NameHenry has stats 105",
        "Name NameNicole has stats 106",
        "Name NameABSLAJNFOAJNFOANFANSF has stats 121");

由于Reactor提供了更多的组合运算符,因此可以简化此过程,如下所示:

Flux<String> ids = ifhrIds(); 

Flux<String> combinations =
        ids.flatMap(id -> { 
            Mono<String> nameTask = ifhrName(id); 
            Mono<Integer> statTask = ifhrStat(id); 

            return nameTask.zipWith(statTask, 
                    (name, stat) -> "Name " + name + " has stats " + stat);
        });

Mono<List<String>> result = combinations.collectList(); 

List<String> results = result.block(); 
assertThat(results).containsExactly( 
        "Name NameJoe has stats 103",
        "Name NameBart has stats 104",
        "Name NameHenry has stats 105",
        "Name NameNicole has stats 106",
        "Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);

使用Callbacks和Future对象的风险是相似的,并且是响应式编程在Publisher-Subscriber对中的作用。

1.3 从命令式编程到反应式编程

反应性库(例如Reactor)旨在解决JVM上“经典”异步方法的这些缺点,同时还着重于其他一些方面:

  • 可组合性和可读性
  • 以丰富的运算符词汇操纵数据流
  • 订阅之前没有任何反应
  • 背压或消费者向生产者发出排放速率过高信号的能力
  • 并发不可知的高级但高价值的抽象

1.3.1 可组合性和可读性

所谓“可组合性”,是指协调多个异步任务的能力,其中我们使用先前任务的结果将输入反馈给后续任务。或者,我们可以以fork-join样式运行多个任务。另外,我们可以将异步任务重用为更高级别系统中的离散组件。

编排任务的能力与代码的可读性和可维护性紧密相关。随着异步过程的层数和复杂度的增加,能够编写和读取代码变得越来越困难。如我们所见,回调模型很简单,但是它的主要缺点之一是,对于复杂的流程,您需要从一个回调中执行一个回调,它本身嵌套在另一个回调中,依此类推。这种混乱被称为“回调地狱”。您可以猜测(或从经验中学到),很难找到这样的代码并进行推理。

Reactor提供了丰富的组合选项,其中代码反映了抽象过程的组织,并且通常将所有内容保持在同一级别(将嵌套最小化)。

1.3.2 流式风格

您可以将反应式应用程序处理的数据视为在组装线中移动。反应堆既是传送带又是工作站。原材料从来源(原始发布者)倾泻而出,最终成为准备好推向消费者(或订阅者)的成品。

原材料可以经过各种转换和其他中间步骤,也可以成为将中间零件聚集在一起的较大装配线的一部分。如果某一点出现故障或堵塞(也许装箱的产品花费的时间过长),那么受灾的工作站可以向上游发出信号,以限制原材料的流动。

1.3.3 Operators

在Reactor中,Operator是我们装配类比中的工作站。每个Operator都会向Publisher添加行为,并将上一步的Publisher包装到新实例中。因此,整个链被链接在一起,这样数据就从第一个发布者起源并向下移动,并由每个链接转换。最终,订户完成了该过程。请记住,直到订阅者订阅发布者为止,什么都不会发生,正如我们很快看到的那样。

尽管反应式流规范根本没有指定运算符,但是反应式库的最佳附加值之一(例如Reactor)是它们提供的运算符的丰富词汇表。从简单的转换和过滤到复杂的编排和错误处理,这些内容涉及很多领域。

1.3.4 订阅之后才有数据

在Reactor中,当您编写发布者链时,默认情况下不会开始将数据泵入其中。相反,您将创建异步过程的抽象描述(这有助于重用和组合)。

通过订阅,您将发布者与订阅者绑定在一起,从而触发了整个链中的数据流。这是通过从订阅服务器发出的单个请求信号在内部实现的,该请求信号一直向上游传播,一直返回到源Publisher。

1.3.5 背压

上游传播的信号也用于实现背压,我们在组装流水线中将其描述为当工作站的处理速度比上游工作站慢时,沿生产线向上发送的反馈信号。

Reactive Streams规范定义的实际机制非常类似于此类比:订户可以以无界模式工作,并让源以最快可获得的速率推送所有数据,或者可以使用请求机制向源发出信号,告知它准备处理最多n个元素。

中级Operator还可以在途中更改请求。想象一下一个缓冲运算符,它以十个为一组对元素进行分组。如果订户请求一个缓冲区,则源产生十个元素是可以接受的。一些运算符还实现了预取策略,避免了request(1)往返,如果在请求之前生成元素的成本不太高的话,这将是有益的。

这将推模型转换为推挽混合模型,如果下游元素很容易获得,下游可以从上游拉取n个元素。但是,如果元素尚未准备就绪,则每当它们被生产时就会被上游推。

1.3.6 冷热对比

Rx反应库的家族将反应序列分为两大类:热和冷。这种区别主要与反应流对订户的反应有关:

  • 对于每个订户,包括在数据源处,冷序列都会重新开始。例如,如果源包装了HTTP调用,则将为每个预订发出一个新的HTTP请求。
  • 并非每个订阅者都可以从头开始热序列。相反,迟到的订户会在订阅后接收发射的信号。但是请注意,某些热反应流可以全部或部分缓存或重放排放历史。从一般的角度来看,即使没有订阅者在收听,热序列甚至会发出(“订阅之前什么也没有发生”规则的例外)。

文章作者: HoldDie
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 HoldDie !
评论
  目录