微服务异步并行调用优化

微服务,并行,异步

Posted by Deadline on October 24, 2018

我们先来设想一个场景。

有一个 http 的接口 A,该接口内部实际上是由另外三个接口 B、C、D 返回结果的组合,这三个接口不存在相互依赖。我们一般的写法就是 B、C、D 同步顺序执行,依次拿到结果后组装在一起。那么假如这三个接口分别耗时 2 秒,那么 A 接口就要耗时 6 秒。如果可以让 B、C、D 同时执行的话,那么 A 接口理论上只要耗时 2 秒。

当然实际情况肯定复杂的多,如果一个接口内部存在不相互依赖的耗时调用的话,那么我们可以做这样的bingx,响应时间上的减少还是非常明显的。整个接口的响应时间取决于最长的那个内部接口。

那么我们来看看在 Java 中有哪些方法可以达到这样的目的。认真思考下你会发现,如果要并行处理的话,在 Java 中只能用多线程来做。实际情况中每个线程处理完的时间肯定不一样,那么如何让线程先处理完的停下来等最后那个处理完的呢。如果经常用多线程的小伙伴肯定能想到 CountDownLatch 工具类。当然也有直接简单暴力的方法,在空循环里轮询每个线程是否执行完。

那下面就直接上代码了: 假设有个学生服务提供查询学生名字,年龄和家庭信息,每个服务之间没有相互依赖。 我们就简单模拟下来获取学生信息的一个接口。

常规方法

    @RequestMapping("/getStudentInfo")
    public Object getStudentInfo() {
        long start = System.currentTimeMillis();
        Map<String, Object> resultMap = new HashMap<>(10);

        try {
            resultMap.put("studentName", studentService.getStudentName());
            resultMap.put("studentAge", studentService.getSutdentAge());
            resultMap.put("studentFamilyInfo", studentService.getSutdentFamilyInfo());
        } catch (Exception e) {
            resultMap.put("errMsg", e.getMessage());
        }
        resultMap.put("total cost", System.currentTimeMillis() - start);
        return resultMap;
    }

顺序同步执行,耗时 6 秒。

1. Future

 @RequestMapping("/getStudentInfoWithFuture")
    public Object testWhitCallable() {
        long start = System.currentTimeMillis();
        Map<String, Object> resultMap = new HashMap<>(10);

        try {
            CountDownLatch countDownLatch = new CountDownLatch(3);

            Future futureStudentName = es.submit(() -> {
                Object studentName = studentService.getStudentName();
                countDownLatch.countDown();
                return studentName;
            });

            Future futureStudentAge = es.submit(() -> {
                Object studentAge = studentService.getSutdentAge();
                countDownLatch.countDown();
                return studentAge;
            });

            Future futureStudentFamilyInfo = es.submit(() -> {
                Object studentFamilyInfo = studentService.getSutdentFamilyInfo();
                countDownLatch.countDown();
                return studentFamilyInfo;
            });

            //同步等待所有线程执行完之后再继续
            countDownLatch.await();

            resultMap.put("studentName", futureStudentName.get());
            resultMap.put("studentAge", futureStudentAge.get());
            resultMap.put("studentFamilyInfo", futureStudentFamilyInfo.get());
        } catch (Exception e) {
            resultMap.put("errMsg", e.getMessage());
        }

        resultMap.put("total cost", System.currentTimeMillis() - start);

        return resultMap;
    }

2.RxJava

@RequestMapping("/getStudentInfoWithRxJava")
    public Object testWithRxJava() {
        long start = System.currentTimeMillis();
        Map<String, Object> resultMap = new HashMap<>(10);

        try {
            CountDownLatch countDownLatch = new CountDownLatch(1);

            Observable studentNameObservable = Observable.create(observableEmitter -> {
                resultMap.put("studentName", studentService.getStudentName());
                observableEmitter.onComplete();
            }).subscribeOn(Schedulers.io());

            Observable studentAgeObservable = Observable.create(observableEmitter -> {
                resultMap.put("studentAge", studentService.getSutdentAge());
                observableEmitter.onComplete();
            }).subscribeOn(Schedulers.io());

            Observable familyInfoObservable = Observable.create(observableEmitter -> {
                resultMap.put("studentFamilyInfo", studentService.getSutdentFamilyInfo());
                observableEmitter.onComplete();
            }).subscribeOn(Schedulers.io());
            //创建一个下游 Observer
            Observer<Object> observer = new Observer<Object>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(Object o) {
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                    //因为后面用了 merge 操作符,所以会合并后发射,那么只要 countdown 一次就行了。
                    countDownLatch.countDown();
                }
            };
            //建立连接,
            Observable.merge(studentNameObservable, studentAgeObservable, familyInfoObservable).subscribe(observer);
            //等待异步线程完成
            countDownLatch.await();

        } catch (Exception e) {
            resultMap.put("errMsg", e.getMessage());
        }
        resultMap.put("total cost", System.currentTimeMillis() - start);
        return resultMap;
    }

对于 RxJava 我不熟,我也是临时学习的,不知道这种写法是不是最佳的。

3.CompletableFutures

    @RequestMapping("/getStudentInfoWithCompletableFuture")
    public Object getStudentInfoWithCompletableFuture() {
        long start = System.currentTimeMillis();
        Map<String, Object> resultMap = new HashMap<>(10);

        try {
            CompletableFuture<Object> completableFutureStudentName = CompletableFuture.supplyAsync(() -> {
                try {
                    return studentService.getStudentName();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return null;
            });

            CompletableFuture<Object> completableFutureSutdentAge = CompletableFuture.supplyAsync(() -> {
                try {
                    return studentService.getSutdentAge();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return null;
            });

            CompletableFuture<Object> completableFutureFamilyInfo = CompletableFuture.supplyAsync(() -> {
                try {
                    return studentService.getSutdentFamilyInfo();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return null;
            });

            CompletableFuture.allOf(completableFutureStudentName, completableFutureSutdentAge, completableFutureFamilyInfo).join();

            resultMap.put("studentName", completableFutureStudentName.get());
            resultMap.put("studentAge", completableFutureSutdentAge.get());
            resultMap.put("studentFamilyInfo", completableFutureFamilyInfo.get());

        } catch (Exception e) {
            resultMap.put("errMsg", e.getMessage());
        }

        resultMap.put("total cost", System.currentTimeMillis() - start);

        return resultMap;
    }

自带最后的同步等待,不需要 CountDownLatch。CompletableFuture 还有很多其他好用的方法。

有兴趣的可以自己来实验下。 github 项目地址 reactive-programming-sample

参考资料

这是我在 v2ex 上提问的一个帖子,感谢网友提供的帮助。 Processing streaming data with Spring WebFlux 这文章虽然是说 webflux 的,但是列出了很多其他方案。 RxJava Essentials 中文翻译版

  • Java 5 提供的 Futures
  • Java 8 提供的 CompletableFutures
  • 第三方库 RxJava
  • Spring 5 提供的 Reactive Streams, Reactor 实现的。
  • Spring 5 中的 Webflux。

There are no comments on this post.