当前位置 : 首页 » 互动问答 » 正文

使用Actor将数据发送到Akka websockets

分类 : 互动问答 | 发布时间 : 2017-10-06 11:03:32 | 评论 : 1 | 浏览 : 37 | 喜欢 : 0

I am using Akka websockets 将数据推送到某个客户端

这是我迄今为止所做的:

 import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

进口akka.Not未使用;
导入akka.actor.ActorSystem;
导入akka.http.javadsl.ConnectHttp;
导入akka.http.javadsl.Http;
导入akka.http.javadsl.ServerBinding;
导入akka.http.javadsl.model.HttpRequest;
导入akka.http.javadsl.model.HttpResponse;
导入akka.http.javadsl.model.ws.Message;
导入akka.http.javadsl.model.ws.WebSocket;
导入akka.japi.Function;
导入akka.stream.ActorMaterializer;
进口akka.stream.Materializer;
导入akka.stream.javadsl.Flow;
导入akka.stream.javadsl.Sink;
导入akka.stream.javadsl.Source;

公共类Server {

  公共静态HttpResponse handleRequest(HttpRequest请求){
    System.out.println(“处理请求到”+ request.getUri());
    if(request.getUri().path()。equals(“/ greeter”)){
      最终流程<消息,消息,NotUsed> greeterFlow = greeterHello();
      返回WebSocket.handleWebSocketRequestWith(request,greeterFlow);
    } else {
      返回HttpResponse.create()。withStatus(404);
    }
  }

  公共静态无效的主要(字符串[]参数)抛出异常{
    ActorSystem system = ActorSystem.create();

    尝试{
      final Materializer materializer = ActorMaterializer.create(system);

      final函数<HttpRequest,HttpResponse> handler = request  - > handleRequest(request);
      CompletionStage <ServerBinding> serverBindingFuture = Http.get(system).bindAndHandleSync(handler,
          ConnectHttp.toHost(“localhost”,8080),materializer);

      如果绑定失败,//会抛出
      serverBindingFuture.toCompletableFuture()。get(1,TimeUnit.SECONDS);
      System.out.println(“按ENTER停止。”);
      新的BufferedReader(新的InputStreamReader(System.in))。readLine();
    } finally {
      system.terminate();
    }
  }

  public static Flow <Message,Message,NotUsed> greeterHello(){
    返回Flow.fromSinkAndSource(Sink.ignore(),
        Source.single(new akka.http.scaladsl.model.ws.TextMessage.Strict(“Hello!”)));
  }
}

在客户端,我成功地收到了'Hello!'信息。 但是,现在我想动态地发送数据(最好来自Actor),如下所示:

 import akka.actor.ActorRef;
import akka.actor.UntypedActor;

公共类PushActor扩展UntypedActor {
  @覆盖
  public void onReceive(Object message){
    if(message instanceof String){
      字符串statusChangeMes​​sage =(String)消息;
      //如何将此消息推送到套接字?
    } else {
      System.out.println(String.format(“'%s':\ n接收未知消息'%s'!”,selfActorPath,message));
    }
  }

}

我无法找到关于这个在线的任何示例

以下是正在使用的软件堆栈:

回答(1)

  • 1楼
  • 一个 - 不一定非常优雅 - 这样做的方式是使用 Source.actorRef 并根据您的需求将物化actor(可能是路由器actor)发送到其他地方(根据您的需要)。

     public static Flow <Message,消息,NotUsed> greeterHello(){
        返回Flow.fromSinkAndSourceMat(Sink.ignore(),
            Source.actorRef(100,OverflowStrategy.fail()),
            Keep.right())。mapMaterializedValue(/ *发送你的actorRef到路由器?* /);
    }
    

    接收连接客户端的actorRefs的人员必须负责将消息路由到他们。

相关阅读: