怎么处理java异步事件的阻塞和非阻塞

技术怎么处理java异步事件的阻塞和非阻塞本篇内容主要讲解“怎么处理java异步事件的阻塞和非阻塞”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么处理java异步事件的阻塞

本文主要解释如何处理java异步事件的阻塞和非阻塞。感兴趣的朋友不妨看看。本文介绍的方法简单、快速、实用。让边肖学习“如何处理java异步事件的阻塞和非阻塞”!

前言

由于多核系统的广泛存在,并发编程的应用无疑比以往任何时候都更加广泛。然而,并发很难正确实现,用户需要新的工具来使用它。很多基于JVM的语言都属于这类开发工具,Scala在这一领域尤为活跃。本系列文章将介绍Java和Scala语言的一些新的并发编程方法。

在任何并发应用程序中,异步事件处理都非常重要。事件的来源可能是不同的计算任务、输入/输出操作或与外部系统的交互。无论来源如何,应用程序代码都必须跟踪事件并协调响应事件所采取的操作。

Java应用程序可以采用两种基本的异步事件处理方法:应用程序有一个协调线程等待事件,然后采取行动,或者事件完成后可以直接执行一个行动(通常是通过执行应用程序提供的代码)。让线程等待事件的方法称为阻塞方法。允许事件执行操作并且线程不需要显式等待事件的方法称为非阻塞方法。

在计算中,根据具体的上下文,阻塞和非阻塞这两个词的用法通常是不同的。例如,共享数据结构的非阻塞算法不需要线程等待访问数据结构。在非阻塞I/O中,应用线程可以启动一个I/O操作,然后离开去做其他事情,操作将异步执行。在本文中,非阻塞指的是在不等待线程的情况下完成执行操作的事件。这些用法中的一个常见概念是阻塞操作需要一个线程来等待某个结果,而非阻塞操作则不需要。

合成事件

等待事件完成很简单:您有一个线程在等待事件,当线程恢复运行时,您将知道事件已经完成。如果你的线程在此期间有其他事情要做,它会完成这些事情并等待。线程甚至可以使用轮询方法,通过该方法中断其他活动来检查事件是否已经完成。但是基本的原理是一样的:当你需要事件的结果时,你就停放线程等待事件结束。

阻塞很简单,只要有一个主线程等待事件完成。当使用多个因相互等待而被阻塞的线程时,您可能会遇到一些问题,例如:

死锁:两个或多个线程分别控制其他线程继续执行所需的资源。饥饿:一些线程可能无法继续执行,因为其他线程贪婪地消耗共享资源。锁定:线程试图相互调整,但最终没有进展。

非阻塞方法为创造性留下了更大的空间。回调是非阻塞事件处理的常用技术。回调是灵活性的象征,因为当事件发生时,您可以执行任何想要的代码。回调的缺点是,当您使用回调来处理许多事件时,您的代码会变得混乱。此外,回调特别难以调试,因为控制流与应用程序中的代码顺序不匹配。

Java CompletableFuture支持阻塞和非阻塞事件处理方法,包括常规回调。CompletableFuture还提供了多种合成和组合事件的方式,实现了回调的灵活性和代码的干净、简单、可读。在本节中,您将看到处理CompletableFuture表示的事件的阻塞和非阻塞方法的示例。

任务和排序

应用程序通常必须在特定操作中执行多个处理步骤。例如,在向用户返回结果之前,网络应用程序可能需要:

1.在数据库中查找用户信息。使用找到的信息执行网络服务调用和另一个数据库查询。3.根据上一步的结果执行数据库更新。

图1说明了这种类型的结构。

图1。应用任务流程

图1将处理分解成四个不同的任务,这些任务由指示顺序相关性的箭头连接。任务1可以直接执行,任务2和任务3在任务1完成后执行,任务4在任务2和任务3完成后执行。这是我在本文中用来演示异步事件处理的任务结构。真实的应用程序(尤其是具有多个移动部件的服务器应用程序)可能要复杂得多,但这个简单的示例仅用于演示所涉及的原理。

建模异步事件

在实际系统中,异步事件的来源通常是并行计算或某种形式的输入/输出操作。然而,用简单的时滞来建模这个系统更容易,这也是本文所用的方法。清单1显示了我用来生成事件的基本定时事件代码,格式为CompletableFuture。

清单1。计时事件代码

导入Java . util . timer;导入Java . util . timertask;导入Java . util . concurrent.completablefuture;公共类TimedEventSupport {私有静态最终定时器Timer=new Timer();/***构建一个未来,在延迟后返回值。** @param延迟* @param值* @返回未来*/p

ublic static <T> CompletableFuture<T> delayedSuccess(int delay, T value) {CompletableFuture<T> future = new CompletableFuture<T>();TimerTask task = new TimerTask() {public void run() {future.complete(value);}};timer.schedule(task, delay * 1000);return future;}/*** Build a future to return a throwable after a delay.* * @param delay* @param t* @return future*/public static <T> CompletableFuture<T> delayedFailure(int delay, Throwable t) {CompletableFuture<T> future = new CompletableFuture<T>();TimerTask task = new TimerTask() {public void run() {future.completeExceptionally(t);}};timer.schedule(task, delay * 1000);return future;}}

为什么不采用 lambda?

清单 1 中的 TimerTask 被实现为一个匿名内部类,仅包含一个 run() 方法。您可能认为这里可以使用 lambda 代替内部类。但是,lambda 仅能用作接口的实例,而 TimerTask 被定义为一种抽象类。除非 lambda 特性的 future 扩展添加了对抽象类的支持(有可能,但由于设计问题,未必行得通),或者为 TimerTask 等情形定义了并行接口,否则您必须继续使用 Java 内部类创建单一方法实现。

清单 1 的代码使用一个 java.util.Timer 来计划 java.util.TimerTask 在一定的延迟后执行。每个 TimerTask 在运行时完成一个有关联的 future。delayedSuccess() 计划一个任务来成功完成一个 CompletableFuture<T> 并将 future 返回调用方。delayedFailure() 计划了一个任务来完成一个 CompletableFuture<T> 并抛出异常,然后将 future 返回给调用方。

清单 2 展示了如何使用 清单 1 中的代码创建 CompletableFuture<Integer> 形式的事件,这些事件与 图 1 中的 4 个任务相匹配。(此代码来自示例代码中的 EventComposition 类。)

清单 2. 示例任务的事件

// task definitionsprivate static CompletableFuture<Integer> task1(int input) {return TimedEventSupport.delayedSuccess(1, input + 1);}private static CompletableFuture<Integer> task2(int input) {return TimedEventSupport.delayedSuccess(2, input + 2);}private static CompletableFuture<Integer> task3(int input) {return TimedEventSupport.delayedSuccess(3, input + 3);}private static CompletableFuture<Integer> task4(int input) {return TimedEventSupport.delayedSuccess(1, input + 4);}

清单 2 中 4 个任务方法中的每一个都为该任务的完成时刻使用了特定的延迟值:task1 为 1 秒,task2 为 2 秒,task3 为 3 秒,task4 重新变为 1 秒。每个任务还接受一个输入值,是该输入加上任务编号作为 future 的(最终)结果值。这些方法都使用了 future 的成功形式;稍后我们将会查看一些使用失败形式的例子。

这些任务要求您按 图 1 中所示的顺序运行它们,向每个任务传递上一个任务返回的结果值(或者对于 task4,传递前两个任务结果的和)。如果中间两个任务是同时执行的,那么总执行时间大约为 5 秒(1 秒 + (2 秒、3 秒中的最大值)+ 1 秒。

如果 task1 的输入为 1,那么结果为 2。如果该结果传递给 task2 和 task3,结果将为 4 和 5。如果这两个结果的和 (9) 作为输入传递给 task4,最终结果将为 13。

阻塞等待

在设置了执行环境后,是时候设置一些操作了。协调 4 个任务的执行的最简单方式是使用阻塞等待:主要线程等待每个任务完成。清单 3(同样来自示例代码中的 EventComposition 类)给出了此方法。

清单 3. 阻塞等待任务执行

private static CompletableFuture<Integer> runBlocking() {Integer i1 = task1(1).join();CompletableFuture<Integer> future2 = task2(i1);CompletableFuture<Integer> future3 = task3(i1);Integer result = task4(future2.join() + future3.join()).join();return CompletableFuture.completedFuture(result);}

清单 3 使用 CompletableFuture 的 join() 方法来完成阻塞等待。join() 等待任务完成,然后,如果成功完成任务,则返回结果值,或者如果失败或被取消,则抛出一个未经检查的异常。该代码首先等待 task1 的结果,然后同时启动 task2 和 task3,并等待两个任务依次返回 future,最后等待 task4 的结果。runBlocking() 返回一个 CompletableFuture,以便与我接下来将展示的非阻塞形式保持一致,但在本例中,future 实际上将在该方法返回之前完成。

合成和组合 future

清单 4(同样来自示例代码中的 EventComposition 类)展示了如何将 future 连接在一起,以便按正确顺序并使用正确的依赖关系执行任务,而不使用阻塞。

清单 4. 非阻塞的合成和组合

private static CompletableFuture<Integer> runNonblocking() {return task1(1).thenCompose(i1 -> ((CompletableFuture<Integer>)task2(i1).thenCombine(task3(i1), (i2,i3) -> i2+i3))).thenCompose(i4 -> task4(i4));}

清单 4 中的代码基本上构造了一个执行计划,指定不同的任务如何执行和它们彼此有何关联。此代码精美而简洁,但是,如果您不熟悉 CompletableFuture 方法,或许难以理解该代码。清单 5 通过将 task2 和 task3 部分分离到一个新方法 runTask2and3 中,将同样的代码重构为更容易理解的形式。

清单 5. 重构后的非阻塞的合成和组合

private static CompletableFuture<Integer> runTask2and3(Integer i1) {CompletableFuture<Integer> task2 = task2(i1);CompletableFuture<Integer> task3 = task3(i1);BiFunction<Integer, Integer, Integer> sum = (a, b) -> a + b;return task2.thenCombine(task3, sum);}private static CompletableFuture<Integer> runNonblockingAlt() {CompletableFuture<Integer> task1 = task1(1);CompletableFuture<Integer> comp123 = task1.thenCompose(EventComposition::runTask2and3);return comp123.thenCompose(EventComposition::task4); }

在 清单 5 中,runTask2and3() 方法表示任务流的中间部分,其中 task2 和 task3 同时执行,然后将它们的结果值组合在一起。此顺序是使用一个 future 上的 thenCombine() 方法来编码的,该方法接受另一个 future 作为它的第一个参数,接受一个二进制函数实例(其输入类型与 future 的结果类型匹配)作为其第二个参数。thenCombine() 返回了第三个 future,表示应用到最初的两个 future 的结果上的函数的值。在本例中,两个 future 是 task2 和 task3,该函数将结果值求和。

runNonblockingAlt() 方法使用在一个 future 上调用了 thenCompose() 方法两次。thenCompose() 的参数是一个函数实例,它接收原始 future 的值类型作为输入,返回另一个 future 作为输出。thenCompose() 的结果是第三个 future,具有与该函数相同的结果类型。这个 future 用作在原始 future 完成后,该函数最终将返回的 future 的占位符。

对 task1.thenCompose() 的调用将会返回一个 future,表示对 task1 的结果应用 runTask2and3() 函数的结果,该结果被保存为 comp123。对 comp123.thenCompose() 的调用返回一个 future,表示对第一个 henCompose() 的结果应用 task4() 函数的结果,这是执行所有任务的总体结果。

试用示例

示例代码包含一个 main() 方法,以便依次运行事件代码的每个版本,并显示完成事件(约 5 秒)和结果 (13) 是正确的。清单 6 显示了从一个控制台运行这个 main() 方法的结果。

清单 6. 运行 main() 方法

dennis@linux-guk3:~/devworks/scala3/code/bin> java com.sosnoski.concur.article3.EventCompositionStarting runBlockingrunBlocking returned 13 in 5008 ms.Starting runNonblockingrunNonblocking returned 13 in 5002 ms.Starting runNonblockingAltrunNonblockingAlt returned 13 in 5001 ms.

不顺利的道路

目前为止,您看到了以 future 形式协调事件的代码,这些代码总是能够成功完成。在真实应用程序中,不能寄希望于事情总是这么顺利。处理任务过程中将发生问题,而且在 Java 术语中,这些问题通常表示为 Throwable。

更改 清单 2 中的任务定义很容易,只需使用 delayedFailure() 代替 delayedSuccess() 方法即可,如这里的 task4 所示:

private static CompletableFuture<Integer> task4(int input) {return TimedEventSupport.delayedFailure(1, new IllegalArgumentException("This won't work!"));}

如果运行 清单 3 并且仅将 task4 修改为完成时抛出异常,那么您会得到 task4 上的 join() 调用所抛出的预期的 IllegalArgumentException。如果在 runBlocking() 方法中没有捕获该问题,该异常会在调用链中一直传递,最终如果仍未捕获问题,则会终止执行线程。幸运的是,修改该代码很容易,因此,如果在任何任务完成时抛出异常,该异常会通过返回的 future 传递给调用方来处理。清单 7 展示了这一更改。

清单 7. 具有异常的阻塞等待

private static CompletableFuture<Integer> runBlocking() {try {Integer i1 = task1(1).join();CompletableFuture<Integer> future2 = task2(i1);CompletableFuture<Integer> future3 = task3(i1);Integer result = task4(future2.join() + future3.join()).join();return CompletableFuture.completedFuture(result);} catch (CompletionException e) {CompletableFuture<Integer> result = new CompletableFuture<Integer>();result.completeExceptionally(e.getCause());return result;}}

清单 7 非常浅显易懂。最初的代码包装在一个 try/catch 中,catch 在返回的 future 完成时传回异常。此方法稍微增加了一点复杂性,但任何 Java 开发人员应该仍然很容易理解它。

清单 4 中的非阻塞代码甚至不需要添加 try/catch。CompletableFuture 合成和组合操作负责自动为您传递异常,以便依赖的 future 也会在完成时抛出异常。

阻塞还是不阻塞

您已经查看了由 CompletableFuture 表示的事件的阻塞和非阻塞处理方法。至少对于本文中建模的基本的任务流,两种方法都非常简单。对于更复杂的任务流,该代码也会更加复杂。

在阻塞情况下,增加的复杂性不是大问题,您仍然只需要等待事件完成。如果要在线程之间执行其他类型的同步,则会遇到线程饥饿问题,甚至是死锁问题。

在非阻塞情况下,事件的完成所触发的代码执行很难调试。在执行许多类型的事件且事件之间存在许多交互时,跟踪哪个事件触发了哪次执行就会变得很难。这种情形基本上就是回调的噩梦,无论是使用传统的回调还是 CompletableFuture 组合和合成操作。

总而言之,阻塞代码通常具有简单性优势。那么为什么有人希望使用非阻塞方法?本节将给出一些重要的理由。

切换的成本

一个线程阻塞时,以前执行该线程的处理器核心会转而执行另一个线程。以前执行的线程的执行状态必须保存到内存中,并加载新线程的状态。这种将核心从运行一个线程切换到运行另一个线程的操作称为上下文切换。

除了直接的上下文切换性能成本,新线程一般会使用来自前一个线程的不同数据。内存访问比处理器时钟慢得多,所以现代系统会在处理器核心与主要内存之间使用多层缓存。尽管比主要内存快得多,但缓存的容量也小得多(一般而言,缓存越快,容量越小),所以任何时刻只能在缓存中保存总内存的小部分。

发生线程切换且一个核心开始执行一个新线程时,新线程需要的内存数据可能不在缓存中,所以该核心必须等待该数据从主要内存加载。

组合的上下文切换和内存访问延迟,会导致直接的显著性能成本。图 2 显示了我使用 Oracle 的 Java 8 for 64-bit Linux 的四核 AMD 系统上的线程切换开销。

此测试使用了可变数量的线程,数量从 1 到 4,096 按 2 的幂次变化,每个线程的内存块大小也是可变的,介于 0 到 64KB 之间。线程依次执行,使用 CompletableFuture 来触发线程执行。每次一个线程执行时,它首先使用针对线程的数据返回一个简单计算结果,以显示将此数据加载到缓存中的开销,然后增加一个共享的静态变量。

最后创建一个新 CompletableFuture 实例来触发它的下一次执行,然后通过完成该线程等待的 CompletableFuture 来启动序列中的下一个线程。最后,如果需要再次执行它,那么该线程会等待新创建的 CompletableFuture 完成。

图 2. 线程切换成本

可以在图 2 的图表中看到线程数量和每个线程的内存的影响。线程数量为 4 个时影响最大,只要特定于线程的数据足够小,两个线程的运行速度几乎与单个线程一样快。线程数量超过 4 个后,对性能的影响相对较小。每个线程的内存量越大,两层缓存就会越快溢出,导致切换成本增高。

图 2 中所示的时间值来自我的有点过时的主要系统。您系统上相应的时间将不同,可能会小得多。但是,曲线的形状应大体相同。

图 2 显示了一次线程切换的微秒级开销,所以即使线程切换的成本达到数万个处理器时钟,但绝对数字并不大。对于中等数量的线程,16KB 数据具有 12.5 微秒的切换时间(图表中的黄线),系统每秒可执行 80,000 次线程切换。与您在任何精心编写的单用户应用程序以及甚至许多服务器应用程序中看到的结果相比,这一线程切换次数可能多得多。但对于每秒处理数千个事件的高性能服务器应用程序,阻塞的开销可能成为影响性能的主要因素。对于这种应用程序,尽可能使用非阻塞代码来最大限度减少线程切换非常重要。

认识到这些时间数据来自最理想的场景也很重要。运行线程切换程序时,会运行足够的 CPU 活动来让所有核心全速运行(至少在我的系统上是这样)。在真实应用程序中,处理负载可能具有更大的突发性。在活动量低的时间,现代处理器将一些核心过渡到休眠状态,以减少总功耗和产生的热量。这个降低功耗的过程的惟一问题是,在需求增多时,它需要时间来将核心从休眠状态唤醒。从深度休眠状态过渡到全速运行所需的时间可能达到微秒级别,而不是在这个线程切换时间示例中看到的毫秒级。

反应式应用程序

对于许多应用程序,不在特定线程上阻塞的另一个原因是,这些线程用于处理需要及时响应的事件。经典的例子就是 UI 线程。如果在 UI 线程中执行会阻塞来等待异步事件完成的代码,那么您会延迟用户输入事件的处理。没有人喜欢等待应用程序响应他们的键入、单击或触控操作,所以 UI 线程中的阻塞可能很快在来自用户的错误报告中反映出来。

UI 线程概念以一种更一般性的原则作为支撑。许多类型的应用程序,甚至非 GUI 应用程序,也必须迅速响应事件,而且在许多情况下,保持较短的响应事件至关重要。对于这些类型的应用程序,阻塞等待不是可接受的选择。

反应式编程 这个名称表示为响应灵敏且可扩展的应用程序采用的编程风格。反应式编程的核心原则是应用程序应能够:

对事件做出反应:应用程序应是事件驱动的,在由异步通信所链接的每个级别上具有松散耦合的组件。  对负载做出反应:应用程序应该是可扩展的,以便可以轻松地升级应用程序来处理增加的需求。  对故障做出反应:应用程序应具有恢复能力,能将故障的影响局部化并迅速更正。  对用户做出反应:应用程序应能迅速响应用户,甚至在具有负载和存在故障的情况下。

使用阻塞式事件处理方法的应用程序无法满足这些原则。线程是有限的资源,所以在阻塞等待中占用它们会限制可伸缩性,还会增加延迟(应用程序响应时间),因为阻塞的线程无法立即响应事件。非阻塞应用程序可更快地响应事件,降低成本,同时减少线程切换开销并改善吞吐量。

反应式编程比非阻塞代码的复杂得多。反应式编程涉及到关注您应用程序中的数据流并将这些数据流实现为异步交互,而不会让接收方负担过重或让发送方积滞。这种对数据流的关注,有助于避免传统的并发编程的许多复杂性。

到此,相信大家对“怎么处理java异步事件的阻塞和非阻塞”有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/105119.html

(0)

相关推荐

  • 如何解决VB.NET注册表权限问题

    技术如何解决VB.NET注册表权限问题这篇文章主要介绍如何解决VB.NET注册表权限问题,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!本实例需要项目引用:Imports Microsoft.Win

    攻略 2021年12月2日
  • 怎么为SQL Server快照snapshot DB建立login访问

    技术怎么为SQL Server快照snapshot DB建立login访问这篇文章主要讲解了“怎么为SQL Server快照snapshot DB建立login访问”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟

    攻略 2021年11月9日
  • Python程序开发问题举例分析

    技术Python程序开发问题举例分析这篇文章主要介绍“Python程序开发问题举例分析”,在日常操作中,相信很多人在Python程序开发问题举例分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家

    攻略 2021年12月1日
  • 美国企业云服务器中的关键功能

    技术美国企业云服务器中的关键功能并非每个美国云服务器都会为您的企业提供相同的功能集。在为您的企业选择完美的美国云服务器环境时,您需要注意某些能够帮助您获得最佳性能、可扩展性和安全性的功能。1. 稳固的服务器网络 美国云服

    礼包 2021年11月9日
  • ps如何添加字体,ps如何添加字体样式文件

    技术ps如何添加字体,ps如何添加字体样式文件1ps如何添加字体、打开PS,新建一个图形文件,并随意输入一行文字2、把该行文字选上,点击如图中位置,可以选择字体样式、大小等。3、在右下侧图层栏把该图像选上,在图像中空白处

    生活 2021年10月20日
  • 微信支付集成工具是什么

    技术微信支付集成工具是什么这篇文章将为大家详细讲解有关微信支付集成工具是什么,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。微信支付集成工具最近老板又安排了新项目,要接入微

    攻略 2021年10月20日