Lagom学习 六 Akka Stream

lagom中的stream 流数据处理是基于akka stream的,异步的处理流数据的。如下看代码:

流式service好处是:   

         A: 并行:  hellos.mapAsync(8, name -> helloService.hello(name).invoke())),  八个线程并行处理;

        B: 异步: 返回completedFuture, 使用基于Web Socket的方式。

        C: 全双工: 

package com.example.hello.stream.impl;

import akka.NotUsed;
import akka.stream.javadsl.Source;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.example.hello.hello.api.HelloService;
import com.example.hello.stream.api.StreamService;

import javax.inject.Inject;

import static java.util.concurrent.CompletableFuture.completedFuture;

/**
 * Implementation of the HelloString.
 */
public class StreamServiceImpl implements StreamService {

  private final HelloService helloService;
  private final StreamRepository repository;

  @Inject
  public StreamServiceImpl(HelloService helloService, StreamRepository repository) {
    this.helloService = helloService;
    this.repository = repository;
  }

  @Override
  public ServiceCall<Source<String, NotUsed>, Source<String, NotUsed>> directStream() {
    return hellos -> completedFuture(
      hellos.mapAsync(8, name ->  helloService.hello(name).invoke()));
  }

  @Override
  public ServiceCall<Source<String, NotUsed>, Source<String, NotUsed>> autonomousStream() {
    return hellos -> completedFuture(
        hellos.mapAsync(8, name -> repository.getMessage(name).thenApply( message ->
            String.format("%s, %s!", message.orElse("Hello"), name)
        ))
    );
  }
}

调用streamed service 接口的方式:

  Source<String, ?> response = await(streamService.directStream().invoke(
                Source.from(Arrays.asList("a", "b", "c"))
                        .concat(Source.maybe())));
        List<String> messages = await(response.take(3).runWith(Sink.seq(), mat));
        assertEquals(Arrays.asList("Hello, a!", "Hello, b!", "Hello, c!"), messages);

private <T> T await(CompletionStage<T> future) throws Exception {   //等待10秒 拿结果
return future.toCompletableFuture().get(10, TimeUnit.SECONDS);
}
原文地址:https://www.cnblogs.com/liufei1983/p/8486082.html