Software Architecture

Responsive System Design with Reactor

Review

@RestController
public class HelloController {
    @GetMapping("/hello/{latency}")
    public String hello(@PathVariable long latency) {
        try {
            TimeUnit.MILLISECONDS.sleep(latency);
        } catch (InterruptedException e) {
            return "Error during thread sleep";
        }
        return "Welcome to reactive world ~";
    }
}

Another Version

@RestController
public class HelloController {
    @GetMapping("/hello/{latency}")
    public Mono<String> hello(@PathVariable int latency) {
        return Mono.just("Welcome to reactive world ~")
        .delayElement(Duration.ofMillis(latency));
    }
}

https://www.bilibili.com/video/BV1dt4y1y7bC?p=1 0:30

sa-spring/spring-webflux

Gatling

class LoadSimulation extends Simulation {
    val baseUrl = System.getProperty("base.url")
    val testPath = System.getProperty("test.path")
    val sim_users = System.getProperty("sim.users").toInt
    val httpConf = http.baseUrl(baseUrl)
    // Define the simulated request, repeated 100 times
    val helloRequest = repeat(100) {
      exec(http("hello-with-latency")
        .get(testPath)).pause(0.5 second, 1 seconds)
    }
    // Define the simulated scenario
    val scn = scenario("hello")
        .exec(helloRequest)
    // Ramp the number of concurrent users up evenly to sim_users over 10 seconds
    setUp(scn.inject(rampUsers(sim_users).during(10 seconds)).protocols(httpConf))
}

Comparing the Results

Why?

The handler blocks its thread for the whole sleep. Serving more concurrent requests then requires more threads, and more hardware resources to run them.

@RestController
public class HelloController {
    @GetMapping("/hello/{latency}")
    public String hello(@PathVariable long latency) {
        try {
            TimeUnit.MILLISECONDS.sleep(latency);
        } ...;
    }
}

Thread Pool

  • Memory overhead per thread: about 1 MB
  • Thread scheduling overhead: 5,000 to 10,000 clock cycles
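The cost of blocking can be made concrete with a small JDK-only sketch. It is not taken from the course repository; the class and method names are hypothetical, and the pool size, task count and latency are arbitrary illustration values. It runs the same simulated load twice: once on a fixed pool where every task sleeps on its worker thread, and once with a single timer thread that completes each result after the delay without parking anything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BlockingVsNonBlocking {

    // Runs `tasks` simulated requests on a pool of `threads` workers; each task sleeps.
    static long blockingMillis(int threads, int tasks, long latencyMs) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        List<CompletableFuture<String>> results = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            results.add(CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(latencyMs); // the worker thread is parked here
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "hello";
            }, pool));
        }
        CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        pool.shutdown();
        return elapsed;
    }

    // Same load, but one timer thread completes each result after the delay.
    static long scheduledMillis(int tasks, long latencyMs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        long start = System.nanoTime();
        List<CompletableFuture<String>> results = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            CompletableFuture<String> result = new CompletableFuture<>();
            timer.schedule(() -> { result.complete("hello"); }, latencyMs, TimeUnit.MILLISECONDS);
            results.add(result);
        }
        CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        timer.shutdown();
        return elapsed;
    }

    public static void main(String[] args) {
        System.out.println("blocking, 2 threads: " + blockingMillis(2, 8, 100) + " ms");
        System.out.println("scheduled, 1 thread: " + scheduledMillis(8, 100) + " ms");
    }
}
```

With 8 tasks of 100 ms each, the two-thread pool needs at least four rounds of sleeping, while the timer version finishes in roughly one latency period because no thread waits on any individual task.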

How can we be more efficient? Asynchronous execution

Asynchronous approaches: Callback

Callbacks

userService.getFavorites(userId, new Callback<List<String>>() { 
    public void onSuccess(List<String> list) { 
      if (list.isEmpty()) { 
        suggestionService.getSuggestions(new Callback<List<Favorite>>() {
          public void onSuccess(List<Favorite> list) { 
            UiUtils.submitOnUiThread(() -> { 
              list.stream().limit(5).forEach(uiList::show); 
              });
          }
          public void onError(Throwable error) { 
            UiUtils.errorPopup(error);
          }
        });
      } else {
        list.stream().limit(5).forEach(favId -> favoriteService.getDetails(favId, 
              new Callback<Favorite>() {
                public void onSuccess(Favorite details) {
                  UiUtils.submitOnUiThread(() -> uiList.show(details));
                }
                public void onError(Throwable error) {
                  UiUtils.errorPopup(error);
                }
              }
            ));
      }
    }
    public void onError(Throwable error) {
      UiUtils.errorPopup(error);
    }
  });

Asynchronous approaches: Future/CompletableFuture

Future/CompletableFuture

CompletableFuture<List<String>> ids = ifhIds(); 

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { 
    Stream<CompletableFuture<String>> zip =
            l.stream().map(i -> { 
                CompletableFuture<String> nameTask = ifhName(i); 
                CompletableFuture<Integer> statTask = ifhStat(i); 

                return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); 
            });
    List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); 
    CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

    CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); 
    return allDone.thenApply(v -> combinationList.stream()
            .map(CompletableFuture::join) 
            .collect(Collectors.toList()));
});

List<String> results = result.join(); 
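The snippet above depends on `ifhIds`, `ifhName` and `ifhStat`, which are not shown. A runnable variant, with hypothetical in-memory stand-ins for those three calls (`fetchIds`, `fetchName`, `fetchStat` are names assumed here, not part of the original), might look like this:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class CombineFutures {

    // Hypothetical stand-ins for the remote calls; each completes asynchronously.
    static CompletableFuture<List<String>> fetchIds() {
        return CompletableFuture.supplyAsync(() -> List.of("a", "b", "c"));
    }

    static CompletableFuture<String> fetchName(String id) {
        return CompletableFuture.supplyAsync(() -> "name-" + id);
    }

    static CompletableFuture<Integer> fetchStat(String id) {
        return CompletableFuture.supplyAsync(() -> id.charAt(0) - 'a' + 101);
    }

    static CompletableFuture<List<String>> combineAll() {
        return fetchIds().thenCompose(ids -> {
            // Start one name/stat pair per id and combine each pair into a line of text.
            List<CompletableFuture<String>> combined = ids.stream()
                    .map(id -> fetchName(id).thenCombine(fetchStat(id),
                            (name, stat) -> "Name " + name + " has stats " + stat))
                    .collect(Collectors.toList());
            // allOf completes when every pair is done, so join() below does not block.
            return CompletableFuture.allOf(combined.toArray(new CompletableFuture[0]))
                    .thenApply(done -> combined.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));
        });
    }

    public static void main(String[] args) {
        combineAll().join().forEach(System.out::println);
    }
}
```

Even in this reduced form, most of the code is bookkeeping (collecting futures, converting to an array, joining after `allOf`) rather than the business logic itself.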

Asynchronous approaches

In essence, all of these approaches control how data is processed step by step.

Reactive









“Reactive,” refers to programming models that are built around reacting to change.

JDK9 java.util.concurrent.Flow
public final class Flow {
    private Flow() {}
    @FunctionalInterface
    public static interface Publisher<T> {
        public void subscribe(Subscriber<? super T> subscriber);
    }
    public static interface Subscriber<T> {
        public void onSubscribe(Subscription subscription);
        public void onNext(T item);
        public void onError(Throwable throwable);
        public void onComplete();
    }
    public static interface Subscription {
        public void request(long n);
        public void cancel();
    }
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}
}
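To see how the four interfaces cooperate, here is a sketch of a hand-written `Publisher` that emits a range of integers, but only as many as the subscriber has requested. `RangePublisher` and `run` are hypothetical names, and the sketch is synchronous and single-threaded, so it is not a fully specification-compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class RangePublisherDemo {

    // Emits the integers [start, start + count), never more than the outstanding demand.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int count;

        RangePublisher(int start, int count) { this.start = start; this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = start;
                private long demand;      // items requested but not yet delivered
                private boolean emitting; // true while the loop below is running
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                    if (emitting) return; // re-entrant call from onNext: the running loop sees the new demand
                    emitting = true;
                    while (!done && demand > 0 && next < start + count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next == start + count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    // Subscribes with a fixed batch size and records every signal in order.
    static List<String> run(int start, int count, int batch) {
        List<String> events = new ArrayList<>();
        new RangePublisher(start, count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int received;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                events.add("request " + batch);
                s.request(batch);
            }
            @Override public void onNext(Integer item) {
                events.add("onNext " + item);
                if (++received % batch == 0) {
                    events.add("request " + batch);
                    subscription.request(batch);
                }
            }
            @Override public void onError(Throwable t) { events.add("onError " + t); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        run(1, 5, 2).forEach(System.out::println);
    }
}
```

The recorded events show the protocol order: `onSubscribe` first, then `onNext` only in response to `request`, then a single terminal `onComplete`.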

Reactive Streams

https://www.reactive-streams.org/

Example

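import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

public class PrintSubscriber implements Subscriber<Integer> {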
    private Subscription subscription;
    @Override
    public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); }
    @Override
    public void onNext(Integer item) { System.out.println("Received item: " + item); subscription.request(1); }
    @Override
    public void onError(Throwable error) { System.out.println("Error occurred: " + error.getMessage()); }
    @Override
    public void onComplete() { System.out.println("PrintSubscriber is complete"); }
}
public class SubmissionPublisherExample {
    public static void main(String... args) throws InterruptedException {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new PrintSubscriber());
        System.out.println("Submitting items...");
        for (int i = 0; i < 10; i++) {
            publisher.submit(i);
        }
        Thread.sleep(1000);
        publisher.close();
    }
}

Output

Submitting items...
Received item: 0
Received item: 1
Received item: 2
Received item: 3
Received item: 4
Received item: 5
Received item: 6
Received item: 7
Received item: 8
Received item: 9
PrintSubscriber is complete

Back Pressure

Resistance or force opposing the desired flow of fluid through pipes.

— Wikipedia

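Back pressure usually refers to the pressure, opposite to the direction of flow, that is exerted on a moving fluid in a closed vessel when obstacles or sharp bends along its path (for example, in pipes or air ducts) impede it.

— Baidu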

Back Pressure









Backpressure is when a downstream component can tell an upstream component to send it less data, so that the downstream is not overwhelmed.

Let's Reflect

In synchronous code, blocking calls serve as a natural form of back pressure that forces the caller to wait.

Balancing Supply and Demand

It becomes important to control the rate of events so that a fast producer does not overwhelm its destination.
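What "overwhelming" means can be sketched with the JDK's `SubmissionPublisher`. In this hypothetical demo (the class name, buffer size and timings are illustration values), a producer calls the non-blocking `offer` as fast as it can while a subscriber consumes one item at a time. When the subscriber's bounded buffer is full, the drop handler runs and the item is discarded. Using `submit` instead would block the producer until space is available.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

public class OverflowDemo {

    // Offers `total` items to a slow subscriber and returns {received, dropped}.
    static int[] run(int total, int bufferSize, long consumeMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger received = new AtomicInteger();
        AtomicInteger dropped = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, bufferSize)) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // signal demand for exactly one item
                }
                @Override public void onNext(Integer item) {
                    try {
                        Thread.sleep(consumeMillis); // simulate slow processing
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    received.incrementAndGet();
                    subscription.request(1); // ready for the next item
                }
                @Override public void onError(Throwable t) { finished.countDown(); }
                @Override public void onComplete() { finished.countDown(); }
            });

            for (int i = 0; i < total; i++) {
                // offer() does not block; returning false from the handler means "do not retry".
                publisher.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet();
                    return false;
                });
            }
        } // close() lets the subscriber drain its buffer and then signals onComplete

        try {
            finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
        return new int[] { received.get(), dropped.get() };
    }

    public static void main(String[] args) {
        int[] counts = run(50, 4, 5);
        System.out.println("received=" + counts[0] + ", dropped=" + counts[1]);
    }
}
```

Every offered item is either delivered or dropped, so the two counters always add up to the number of items offered; how they split depends on timing.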

Revisiting the Example: pub-sub with Back Pressure Control

public class PrintSubscriber implements Subscriber<Integer> {
    private Subscription subscription;
    @Override
    public void onSubscribe(Subscription subscription) { 
      this.subscription = subscription; 
      subscription.request(1); 
    }
    @Override
    public void onNext(Integer item) { 
      System.out.println("Received item: " + item); 
      subscription.request(1); 
    }
    ...
}

Reactive Streams Specification

Project Reactor

Flux, an Asynchronous Sequence of 0-N Items

Project Reactor

Mono, an Asynchronous 0-1 Result

Operators

Reactive Programming

     userService.getFavorites(userId)
                .timeout(Duration.ofMillis(800)) 
                .onErrorResume(error -> cacheService.cachedFavoritesFor(userId))
                .flatMap(favoriteService::getDetails) 
                .switchIfEmpty(suggestionService.getSuggestions())
                .take(5)
                .publishOn(UiUtils.uiThreadScheduler())
                .subscribe(uiList::show, UiUtils::errorPopup);
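The timeout-then-fallback step of this chain has a rough JDK-only analogue in `CompletableFuture`. The sketch below is not Reactor code and handles a single result rather than a stream. `favorites`, `cachedFavorites` and `load` are hypothetical helpers, and the remote call is simulated with a delayed executor.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class TimeoutFallback {

    // Hypothetical slow remote call: the result becomes available after delayMs.
    static CompletableFuture<List<String>> favorites(long delayMs) {
        return CompletableFuture.supplyAsync(
                () -> List.of("fav-1", "fav-2"),
                CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS));
    }

    // Hypothetical local cache used as the fallback.
    static List<String> cachedFavorites() {
        return List.of("cached-1");
    }

    static List<String> load(long delayMs, long timeoutMs) {
        return favorites(delayMs)
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS) // fail if no result arrives in time
                .exceptionally(error -> cachedFavorites())   // on any failure, fall back to the cache
                .join();
    }

    public static void main(String[] args) {
        System.out.println(load(10, 800));  // the call answers in time
        System.out.println(load(500, 50));  // the call is too slow, so the cache is used
    }
}
```

The Reactor version expresses the same two steps as operators on a stream and then continues with `flatMap`, `take` and `publishOn`, which have no direct single-value counterpart here.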

Reactive Programming

Flux<String> ids = ifhrIds(); 

Flux<String> combinations =
        ids.flatMap(id -> { 
            Mono<String> nameTask = ifhrName(id); 
            Mono<Integer> statTask = ifhrStat(id); 

            return nameTask.zipWith(statTask, 
                    (name, stat) -> "Name " + name + " has stats " + stat);
        });

Mono<List<String>> result = combinations.collectList(); 

List<String> results = result.block();

Imperative vs. Declarative

“You know, imperative programming is like how you do something, and declarative programming is more like what you do, or something.”

The pipeline only describes what should happen; nothing actually happens until someone subscribes, and only then is data pushed (message-driven).

A message-driven architecture provides you with an asynchronous boundary that decouples you from time and space.
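The "nothing happens until someone subscribes" behavior can be sketched with the JDK `Flow` interfaces alone. In this hypothetical example, `describe` only builds a publisher; the value is produced inside `request`, which runs only after a subscriber arrives. The names `describe` and `run` are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class LazyPublisherDemo {

    // Builds a single-value publisher. Calling this method produces nothing yet.
    static Flow.Publisher<String> describe(List<String> log) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done || n <= 0) return; // simplified: a full implementation signals onError for n <= 0
                done = true;
                log.add("producing");
                subscriber.onNext("Welcome to reactive world ~");
                subscriber.onComplete();
            }

            @Override
            public void cancel() { done = true; }
        });
    }

    // Records the order in which assembly, subscription and production happen.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flow.Publisher<String> greeting = describe(log);
        log.add("assembled"); // the pipeline exists, but no value has been produced

        greeting.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                log.add("subscribed");
                s.request(1);
            }
            @Override public void onNext(String item) { log.add("onNext " + item); }
            @Override public void onError(Throwable t) { log.add("onError " + t); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the recorded log, "producing" appears only after "subscribed", which is the declarative split between describing a pipeline and running it.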

Spring Web Stacks

  • In Spring MVC, it is assumed that applications may block the current thread and for this reason servlet containers use a large thread pool, to absorb potential blocking during request handling.
  • In Spring WebFlux, it is assumed that applications will not block, and therefore non-blocking servers use a small, fixed-size thread pool (event loop workers) to handle requests.

Spring WebFlux - Client

client.get().uri("/employees").retrieve().bodyToFlux(Employee.class).map(this::doSomeSlowWork)
          .subscribe(new Subscriber<Employee>() {
              private Subscription subscription;
              private Integer count = 0;
              @Override
              public void onNext(Employee t) {
                  count++;
                  if (count >= 2) {
                      count = 0;
                      subscription.request(2); System.out.println("Client requested 2 ");
                  }
                  System.out.println("Client subscribes: " + t);
              }
              @Override
              public void onSubscribe(Subscription subscription) {
                  this.subscription = subscription;
                  subscription.request(2); System.out.println("Client requested 2 ");
              }
              ...
          });

WebFlux - Server

A data repository (acting as Publisher) can produce data that an HTTP server (acting as Subscriber) can then write to the response. The main purpose of Reactive Streams is to let the subscriber control how quickly or how slowly the publisher produces data.

    @GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    private Flux<Employee> getAllEmployees() {
        return employeeRepository.findAllEmployees()
                                .delayElements(Duration.ofMillis(100))
                                .doOnNext(employee -> System.out.println("Server produces:" + employee));
    }

https://www.bilibili.com/video/BV1dt4y1y7bC?p=5 2:00

sa-spring/employee-reactive

R2DBC

The Reactive Relational Database Connectivity (R2DBC) project brings reactive programming APIs to relational databases.


https://r2dbc.io/

The Reactive Manifesto

Organizations working in disparate domains are independently discovering patterns for building software that look the same. These systems are more robust, more resilient, more flexible and better positioned to meet modern demands.


https://www.reactivemanifesto.org/

The Reactive Manifesto -2

These changes are happening because application requirements have changed dramatically in recent years. Only a few years ago a large application had tens of servers, seconds of response time, hours of offline maintenance and gigabytes of data. Today applications are deployed on everything from mobile devices to cloud-based clusters running thousands of multi-core processors. Users expect millisecond response times and 100% uptime. Data is measured in Petabytes. Today's demands are simply not met by yesterday’s software architectures.

The Reactive Manifesto -3

We believe that a coherent approach to systems architecture is needed, and we believe that all necessary aspects are already recognized individually: we want systems that are Responsive, Resilient, Elastic and Message Driven. We call these Reactive Systems.

Reactive System

Systems built as Reactive Systems are more flexible, loosely-coupled and scalable. This makes them easier to develop and amenable to change. They are significantly more tolerant of failure and when failure does occur they meet it with elegance rather than disaster. Reactive Systems are highly responsive, giving users effective interactive feedback.

Elastic

The system stays responsive under varying workload. Reactive Systems can react to changes in the input rate by increasing or decreasing the resources allocated to service these inputs. This implies designs that have no contention points or central bottlenecks, resulting in the ability to shard or replicate components and distribute inputs among them. Reactive Systems support predictive, as well as Reactive, scaling algorithms by providing relevant live performance measures. They achieve elasticity in a cost-effective way on commodity hardware and software platforms.

Message Driven

Reactive Systems rely on asynchronous message-passing to establish a boundary between components that ensures loose coupling, isolation and location transparency. This boundary also provides the means to delegate failures as messages. Employing explicit message-passing enables load management, elasticity, and flow control by shaping and monitoring the message queues in the system and applying back-pressure when necessary. Location transparent messaging as a means of communication makes it possible for the management of failure to work with the same constructs and semantics across a cluster or within a single host. Non-blocking communication allows recipients to only consume resources while active, leading to less system overhead.

Resilient

The system stays responsive in the face of failure. This applies not only to highly-available, mission-critical systems — any system that is not resilient will be unresponsive after a failure. Resilience is achieved by replication, containment, isolation and delegation. Failures are contained within each component, isolating components from each other and thereby ensuring that parts of the system can fail and recover without compromising the system as a whole. Recovery of each component is delegated to another (external) component and high-availability is ensured by replication where necessary. The client of a component is not burdened with handling its failures.

Responsive

The system responds in a timely manner if at all possible. Responsiveness is the cornerstone of usability and utility, but more than that, responsiveness means that problems may be detected quickly and dealt with effectively. Responsive systems focus on providing rapid and consistent response times, establishing reliable upper bounds so they deliver a consistent quality of service. This consistent behavior in turn simplifies error handling, builds end user confidence, and encourages further interaction.