Future用来获取某个并发操作的结果,这个结果可以同步(阻塞)或异步(非阻塞)的方式访问。
执行上下文
Future 需要一个ExecutionContext, 它与java.util.concurrent.Executor 很相像. 如果你在作用域内有一个 ActorSystem , 它可以用system.dispatcher()作 ExecutionContext。你也可以用ExecutionContext 伴生对象提供的工厂方法来将 Executors 和 ExecutorServices 进行包裹, 或者甚至创建自己的实例.
//执行上下文可以自己指定线程池类型 ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool())
Future的创建方法 Future<String> f1 = Futures.successful("
f1", ec); Future<String> f2 = Futures.future(
new Callable() { @Override Object call() {
return "
f2" } }, ec) Future<String> f3 = Futures.successful("
f3", ActorSystem.create("
test").dispatcher()); //使用actor的ask方法发送消息是也能创建一个Future Future f4 = akka.pattern.Patterns.ask(actor, "msg", 1000 * 60)
函数式 Future Akka 的 Future 有一些与Scala集合所使用的非常相似的 monadic 方法. 这使你可以构造出结果可以传递的‘管道’ 或 ‘数据流’.
map(对未来返回的结果进行处理)
让Future以函数式风格工作的第一个方法是 map. 它需要一个函数来对Future的结果进行处理, 返回一个新的结果。map 方法的返回值是包含新结果的另一个 Future:
static void map()
throws Exception { ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool()); Future<String> f1 = Futures.successful("
fof1o", ec);
//map的作用是:对Futrue:f1的返回结果进行处理,返回一个新的结果 Future<Integer> f2 = f1.map(
new Mapper<String, Integer>() {
public Integer apply(String s) {
return s.length(); } });
//这里对未来f1返回的字符串计算其长度 对Future完成结果的处理方法 //System.out.println(Await.result(f2, Duration.create(5, "s"))); //阻塞式,当前线程在此等待 //下面是非阻塞式,异步返回 f2.onComplete(
new OnComplete<Integer>() { @Override
public void onComplete(Throwable failure, Integer success) { System.out.println("
f2返回结果:" + success + "
,failure:" + failure); } }); f2.onSuccess(
new OnSuccess<Integer>() { @Override
public void onSuccess(Integer result) { System.out.println("
f2返回结果:" + result); } }); f2.onFailure(
new OnFailure() { @Override
public void onFailure(Throwable failure) { System.out.println("
f2返回failure:" + failure); } }); }
flatMap(对多个Future返回的结果进行处理)
static void flatMap()
throws Exception {
final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool()); Future<String> f1 = Futures.successful("
hello", ec);
// Future<Integer> fr = f1.flatMap(
new Mapper<String, Future<Integer>>() {
public Future<Integer> apply(
final String s) {
return Futures.future(
new Callable<Integer>() {
public Integer call() {
return s.length(); } }, ec); } });
// System.out.println(Await.result(fr, Duration.create(5, "
s")));
//阻塞式 }
//对两个Future的结果处理 static void flatMap_concat2()
throws Exception {
final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
final Future<String> f1 = Futures.successful("
hello", ec);
final Future<String> f2 = Futures.successful("
world", ec);
//如果要对多个Future的结果进行处理,需要用flatMap //本例中对f1和f2返回的结果用空格连接成“hello world” Future<String> fr = f1.flatMap(
new Mapper<String, Future<String>>() {
public Future<String> apply(
final String s) {
return f2.map(
new Mapper<String, String>() { @Override
public String apply(String v) {
return s + "
" + v; } }); } }); System.out.println(Await.result(fr, Duration.create(5, "
s")));
//阻塞式 }
//对三个Future的结果处理 static void flatMap_concat3()
throws Exception {
final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
final Future<String> f1 = Futures.successful("
How", ec);
final Future<String> f2 = Futures.successful("
are", ec);
final Future<String> f3 = Futures.successful("
you", ec);
//如果要对多个Future的结果进行处理,需要用flatMap //本例中对f1、f2、f3返回的结果用空格连接成“How are you” Future<String> fr = f1.flatMap(
new Mapper<String, Future<String>>() {
public Future<String> apply(
final String s) {
return f2.flatMap(
new Mapper<String, Future<String>>() { @Override
public Future<String> apply(
final String s2) {
return f3.map(
new Mapper<String, String>() { @Override
public String apply(String s3) {
return s + "
" + s2 + "
" + s3; } }); } }); } });
/*用java写比较繁琐,用scala的话就简单多了 val future1 = for { a: String <- actor ? "How" // returns How b: String <- actor ? "are" // returns "are" c: String <- actor ? "you" // returns "you" } yield a + " " + b + "" + c*/ System.out.println(Await.result(fr, Duration.create(5, "
s")));
//阻塞式 }
filter(对Future进行条件筛选)
static void filter()
throws Exception { ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool()); Future<String> f1 = Futures.successful("
fof1o", ec); Future<String> f2 = Futures.successful("
fo", ec);
//map的作用是:对Futrue:f1的返回结果进行处理,返回一个新的结果 Future<String> fs = f1.filter(Filter.filterOf(
new Function<String, Boolean>() { @Override
public Boolean apply(String param) {
return param.length() == 5; } })); System.out.println(Await.result(fs, Duration.create(5, "
s"))); Future<String> ff = f2.filter(Filter.filterOf(
new Function<String, Boolean>() { @Override
public Boolean apply(String param) {
return param.length() == 5; } }));
//不匹配的话会抛scala.MatchError异常 System.out.println(Await.result(ff, Duration.create(5, "
s"))); }
组合Futures
如果Future的数目较多,用flatMap组合的话代码就过于复杂。可以使用sequence和traverse。
sequence(将 T[Future[A]] 转换为 Future[T[A]])
public static void sequence()
throws Exception {
//将 T[Future[A]] 转换为 Future[T[A]] //简化了用flatMap来组合 final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
final Future<String> f1 = Futures.successful("
How", ec);
final Future<String> f2 = Futures.successful("
are", ec);
final Future<String> f3 = Futures.successful("
you", ec); List<Future<String>> futureList =
new ArrayList<Future<String>>(); futureList.add(f1); futureList.add(f2); futureList.add(f3);
//这里将List<Future<String>> 组合成一个Future:Future<Iterable<String>> Future<Iterable<String>> future = Futures.sequence(futureList, ec); Future<String> fr = future.map(
new Mapper<Iterable<String>, String>() { @Override
public String apply(Iterable<String> parameter) { String result = "";
for (String s : parameter) { result += s + "
"; }
return result; } }); System.out.println(Await.result(fr, Duration.create(5, "
s"))); }
traverse(将 T[A] 转换为 Future[T[A]]) public static void traverse()
throws Exception {
//将 T[A] 转换为 Future[T[A]] final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool()); Iterable<String> list = Arrays.asList("
How", "
are", "
you");
//这里将List<String> 组合成一个Future:Future<Iterable<String>> ,对list中的每个元素做加工处理 Future<Iterable<String>> future = Futures.traverse(list,
new Function<String, Future<String>>() { @Override
public Future<String> apply(
final String param) {
return Futures.future(
new Callable<String>() { @Override
public String call()
throws Exception {
return param.toUpperCase(); } }, ec); } }, ec); Future<String> fr = future.map(
new Mapper<Iterable<String>, String>() { @Override
public String apply(Iterable<String> parameter) { String result = "";
for (String s : parameter) { result += s + "
"; }
return result; } }); System.out.println(Await.result(fr, Duration.create(5, "
s"))); }
fold(从一个初始值开始递归地对Future序列进行处理(它将sequence和map操作合并成一步了)) public static void fold()
throws Exception {
//fold从一个初始值开始递归地对Future序列进行处理(它将sequence和map操作合并成一步了) final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
final Future<String> f1 = Futures.successful("
How", ec);
final Future<String> f2 = Futures.successful("
are", ec);
final Future<String> f3 = Futures.successful("
you", ec); List<Future<String>> futureList =
new ArrayList<Future<String>>(); futureList.add(f1); futureList.add(f2); futureList.add(f3);
//本例从初始值“Init”开始,递归地对futureList的返回值用"_"连接,返回“Init_How_are_you” Future<String> fr = Futures.fold("
Init", futureList,
new Function2<String, String, String>() { @Override
public String apply(String arg1, String arg2) { System.out.println("
arg1----" + arg1);
//第一次为Init,第二次为Init_How ,第三次为Init_How_are System.out.println("
arg2----" + arg2);
//第一次为How ,第二次为are 第三次为you return arg1 + "
_" + arg2; } }, ec);
//如果futureList为空列表,则返回初始值“Init” System.out.println(Await.result(fr, Duration.create(5, "
s")));
//结果为Init_How_are_you }
reduce(如果不想从给定的初始值开始递归,而想从future序列的第一个开始,则用reduce(它将sequence和map合并成一步了)) public static void reduce()
throws Exception {
//如果不想从给定的初始值开始递归,而想从future序列的第一个开始,则用reduce(它将sequence和map合并成一步了) final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
final Future<String> f1 = Futures.successful("
How", ec);
final Future<String> f2 = Futures.successful("
are", ec);
final Future<String> f3 = Futures.successful("
you", ec); List<Future<String>> futureList =
new ArrayList<Future<String>>(); futureList.add(f1); futureList.add(f2); futureList.add(f3);
//本例从初始值“How”开始,递归地对futureList的返回值用"_"连接,返回“How_are_you” Future<String> fr = Futures.reduce(futureList,
new Function2<String, String, String>() { @Override
public String apply(String arg1, String arg2) { System.out.println("
arg1----" + arg1);
//第一次为How ,第二次为How_are System.out.println("
arg2----" + arg2);
//第一次为are ,第二次为you return arg1 + "
_" + arg2; } }, ec);
//如果futureList为空列表,则返回初始值“Init” System.out.println(Await.result(fr, Duration.create(5, "
s")));
//结果为Init_How_are_you }
andThen(由于回调的执行是无序的,而且可能是并发执行的, 当你需要一组有序操作的时候需要一些技巧。)
public static void andThen()
throws Exception {
//如果要对Future的结果分多次依次处理,需要使用andThen final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool()); Future<String> future = Futures.successful("
hello", ec).andThen(
new OnComplete<String>() { @Override
public void onComplete(Throwable failure, String success) { System.out.println("
先收到:" + success); } }).andThen(
new OnComplete<String>() { @Override
public void onComplete(Throwable failure, String success) { System.out.println("
又收到:" + success); } }).andThen(
new OnSuccess<Either<Throwable, String>>() { @Override
public void onSuccess(Either<Throwable, String> result) { System.out.println("
收到onSuccess:" + result); } }); }
fallbackTo(将两个 Futures 合并成一个新的 Future, 如果第一个Future失败了,它将持有第二个 Future 的成功值) public static void fallbackTo()
throws Exception {
final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool()); Future<String> f1 = Futures.failed(
new RuntimeException("
ex1"), ec); Future<String> f2 = Futures.failed(
new RuntimeException("
ex2"), ec); Future<String> f3 = Futures.successful("
ok", ec);
//fallbackTo 将两个 Futures 合并成一个新的 Future, 如果第一个Future失败了,它将持有第二个 Future 的成功值 Future<String> fr = f1.fallbackTo(f2).fallbackTo(f3); System.out.println(Await.result(fr, Duration.create(5, "
s"))); }
zip(操作将两个 Futures 组合压缩成一个新的Future,返回的新的Future持hold一个tuple实例,它包含二者成功的结果)
public static void zip()
throws Exception {
final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool()); Future<String> f1 = Futures.future(
new Callable<String>() { @Override
public String call()
throws Exception { System.out.println("
f1---" + Thread.currentThread().getName()); Thread.sleep(1000 * 10);
return "
hello"; } }, ec); Future<String> f2 = Futures.future(
new Callable<String>() { @Override
public String call()
throws Exception { System.out.println("
f2---" + Thread.currentThread().getName()); Thread.sleep(1000 * 5);
return "
world"; } }, ec);
//zip操作将两个 Futures 组合压缩成一个新的Future,返回的新的Future持hold一个tuple实例,它包含二者成功的结果 Future<String> fr = f1.zip(f2).map(
new Mapper<Tuple2<String, String>, String>() { @Override
public String apply(Tuple2<String, String> ziped) { System.out.println("
zip---" + Thread.currentThread().getName());
return ziped._1() + "
" + ziped._2();
//f1和f2的返回结果包含在zipped对象中 } }); System.out.println("
主线程----" + Thread.currentThread().getName()); System.out.println(Await.result(fr, Duration.create(15, "
s"))); }
public static void zip2()
throws Exception {
final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool()); Future<String> f1 = Futures.successful("
hello", ec); Future<String> f2 = Futures.future(
new Callable<String>() { @Override
public String call()
throws Exception { System.out.println("
f2---" + Thread.currentThread().getName()); Thread.sleep(1000 * 10);
return (1 / 0) + ""; } }, ec);
//zip操作将两个 Futures 组合压缩成一个新的Future,返回的新的Future持hold一个tuple实例,它包含二者成功的结果 Future<String> fr = f1.zip(f2).map(
new Mapper<Tuple2<String, String>, String>() { @Override
public String apply(Tuple2<String, String> ziped) { System.out.println("
zip----" + Thread.currentThread().getName());
return ziped._1() + "
" + ziped._2();
//f1和f2的返回结果包含在zipped对象中 } }); System.out.println("
主线程----" + Thread.currentThread().getName()); System.out.println(Await.result(fr, Duration.create(15, "
s"))); }
recover(对Future的异常进行处理,相当于try..catch中对捕获异常后的处理)
public static void recover()
throws Exception {
final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
//recover对Future的异常进行处理,相当于try..catch中对捕获异常后的处理 Future<Integer> future = Futures.future(
new Callable<Integer>() {
public Integer call() {
return 1 / 0; } }, ec).recover(
new Recover<Integer>() {
public Integer recover(Throwable problem)
throws Throwable { System.out.println("
捕获到异常:" + problem);
// if (problem instanceof RuntimeException) { // return 0; // } else { // throw problem; // } return -2;
//这里捕获到异常后直接返回新值了,并没有再抛出异常,所以后面的recover不会再收到异常 } }).recover(
new Recover<Integer>() {
public Integer recover(Throwable problem)
throws Throwable { System.out.println("
捕获到异常:" + problem);
if (problem
instanceof ArithmeticException) {
//捕获异常并处理,捕获到后,后面得到的result将会是-1 return -1; }
else {
throw problem; } } });
int result = Await.result(future, Duration.create(1, TimeUnit.SECONDS)); System.out.println("
result----" + result); }
recoverWith(和recover很类似,只是捕获到异常后返回Future,使其能够异步并发处理)
public static void recoverWith()
throws Exception {
final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());
//recoverWith和recover很类似,只是捕获到异常后返回Future,使其能够异步并发处理 Future<Integer> future = Futures.future(
new Callable<Integer>() {
public Integer call() {
return 1 / 0; } }, ec).recoverWith(
new Recover<Future<Integer>>() { @Override
public Future<Integer> recover(Throwable failure)
throws Throwable {
if (failure
instanceof ArithmeticException) {
return Futures.future(
new Callable<Integer>() { @Override
public Integer call()
throws Exception {
return 0; } }, ec); }
else throw failure; } });
int result = Await.result(future, Duration.create(1, TimeUnit.SECONDS)); System.out.println("
result----" + result); }