2017-04-12 63 views
5

Próbuję utworzyć prosty serwer proxy dla połączeń Websocket, używając strumieni Play i akka. przepływ ruchu jest tak:Serwer proxy sieci Web korzystający z kanałów Play 2.6 i akka

(Client) request ->   -> request (Server) 
         Proxy 
(Client) response <-   <- response (Server) 

wpadłem na następujący kod po wykonaniu kilka przykładów:

def socket = WebSocket.accept[String, String] { request => 

val uuid = UUID.randomUUID().toString 

// wsOut - actor that deals with incoming websocket frame from the Client 
// wsIn - publisher of the frame for the Server 
val (wsOut: ActorRef, wsIn: Publisher[String]) = { 
    val source: Source[String, ActorRef] = Source.actorRef[String](10, OverflowStrategy.dropTail) 
    val sink: Sink[String, Publisher[String]] = Sink.asPublisher(fanout = false) 
    source.toMat(sink)(Keep.both).run() 
} 

// sink that deals with the incoming messages from the Server 
val serverIncoming: Sink[Message, Future[Done]] = 
    Sink.foreach[Message] { 
    case message: TextMessage.Strict => 
     println("The server has sent: " + message.text) 
    } 

// source for sending a message over the WebSocket 
val serverOutgoing = Source.fromPublisher(wsIn).map(TextMessage(_)) 

// flow to use (note: not re-usable!) 
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://0.0.0.0:6000")) 

// the materialized value is a tuple with 
// upgradeResponse is a Future[WebSocketUpgradeResponse] that 
// completes or fails when the connection succeeds or fails 
// and closed is a Future[Done] with the stream completion from the incoming sink 
val (upgradeResponse, closed) = 
serverOutgoing 
    .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse] 
    .toMat(serverIncoming)(Keep.both) // also keep the Future[Done] 
    .run() 

// just like a regular http request we can access response status which is available via upgrade.response.status 
// status code 101 (Switching Protocols) indicates that server support WebSockets 
val connected = upgradeResponse.flatMap { upgrade => 
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) { 
    Future.successful(Done) 
    } else { 
    throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") 
    } 
} 

// in a real application you would not side effect here 
connected.onComplete(println) 
closed.foreach(_ => println("closed")) 

val actor = system.actorOf(WebSocketProxyActor.props(wsOut, uuid)) 
val finalFlow = { 
    val sink = Sink.actorRef(actor, akka.actor.Status.Success(())) 
    val source = Source.maybe[String] // what the client receives. How to connect with the serverIncoming sink ??? 
    Flow.fromSinkAndSource(sink, source) 
} 

finalFlow 

Z tym kodem, ruch idzie od klienta do serwera proxy do serwera , z powrotem do Proxy i to wszystko. Nie dociera do klienta. Jak mogę to naprawić? myślę, że trzeba jakoś połączyć serverIncoming opadają na source w finalFlow, ale nie mogę dowiedzieć się, jak to zrobić ...

Albo jestem całkowicie błędne z takim podejściem? Czy lepiej używać urządzenia Bidiflow lub Graph? Jestem nowy w strumieniach akka i wciąż próbuję to rozgryźć.

Odpowiedz

3

Następujące wydaje się działać. Uwaga: Zaimplementowałem zarówno gniazdo serwera, jak i gniazdo proxy w tym samym kontrolerze, ale można je podzielić lub wdrożyć ten sam kontroler w oddzielnych instancjach. Adres URL ws do "wyższej" usługi będzie musiał być zaktualizowany w obu przypadkach.

package controllers 

import javax.inject._ 

import akka.actor.{Actor, ActorRef, ActorSystem, Props} 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse} 
import akka.stream.Materializer 
import akka.stream.scaladsl.Flow 
import play.api.libs.streams.ActorFlow 
import play.api.mvc._ 

import scala.concurrent.{ExecutionContext, Future} 
import scala.language.postfixOps 

@Singleton 
class SomeController @Inject()(implicit exec: ExecutionContext, 
           actorSystem: ActorSystem, 
           materializer: Materializer) extends Controller { 

    /*--- proxy ---*/ 
    def websocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] = 
    Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9000/upper-socket")) 

    def proxySocket: WebSocket = WebSocket.accept[String, String] { _ => 
    Flow[String].map(s => TextMessage(s)) 
     .via(websocketFlow) 
     .map(_.asTextMessage.getStrictText) 
    } 

    /*--- server ---*/ 
    class UpperService(socket: ActorRef) extends Actor { 
    override def receive: Receive = { 
     case s: String => socket ! s.toUpperCase() 
     case _ => 
    } 
    } 

    object UpperService { 
    def props(socket: ActorRef): Props = Props(new UpperService(socket)) 
    } 

    def upperSocket: WebSocket = WebSocket.accept[String, String] { _ => 
    ActorFlow.actorRef(out => UpperService.props(out)) 
    } 
} 

Musisz trasy należy skonfigurować tak:

GET /upper-socket controllers.SomeController.upperSocket 
GET /proxy-socket controllers.SomeController.proxySocket 

można przetestować wysyłając ciąg WS: // localhost: 9000/proxy-gniazdo. Odpowiedź będzie łańcuchem wielkimi literami.

Nie będzie limitu czasu po 1 minucie bezczynności jednak:

akka.stream.scaladsl.TcpIdleTimeoutException: TCP idle-timeout encountered on connection to [localhost:9000], no bytes passed in the last 1 minute 

Ale patrz: http://doc.akka.io/docs/akka-http/current/scala/http/common/timeouts.html w jaki sposób skonfigurować to.

+0

To nie jest proxy. Jest to bardzo prosty serwer, który przesuwa łańcuchy znaków i wysyła je do klienta za pomocą gniazd internetowych. –

+0

@morganfreeman Miałem na myśli, że sam kontroler jest proxy. Górna usługa może być zastąpiona przez aktora, który wywołuje zewnętrzną usługę do rzeczywistego przetwarzania, coś w tym stylu: http://doc.akka.io/docs/akka/2.4/scala/stream/stream-integrations.html#Integrating_with_External_Services . Ale prawdopodobnie nie rozumiem dokładnie twojego problemu. – botkop

+0

Prawda, usługa UpperService mogłaby teoretycznie wysłać ramki za pośrednictwem gniazda sieci Web do serwera. Próbowałem go na początku. To gniazdo sieciowe na serwerze to przepływ. Nie mogłem "połączyć" metody odbioru aktora z Źródłem Przepływu i Zlewem przepływu z powrotem do 'gniazda' (dane wracają do klienta). Mógłbym zainicjować przepływ Web Socket w metodzie receive, ale to otwierałoby połączenie z serwerem za każdym razem. –

2

zapotrzebowanie Pełnomocnik aby zapewnić dwa strumienie (Pełnomocnik przepływu A/B)

(Client) request -> Proxy Flow A -> request (Server) 

(Client) response <- Proxy Flow B <- response (Server) 

jedną z możliwości realizacji takiego przepływu proxy stosuje ActorSubscriber i SourceQueue:

class Subscriber[T](proxy: ActorRef) extends ActorSubscriber { 
    private var queue = Option.empty[SourceQueueWithComplete[T]] 
    def receive = { 
    case Attach(sourceQueue) => queue = Some(sourceQueue) 
    case msg: T => // wait until queue attached and pass forward all msgs to queue and the proxy actor 
    } 
} 

def proxyFlow[T](proxy: ActorRef): Flow[T, ActorRef] = { 
    val sink = Sink.actorSubscriber(Props(new Subscriber[T](proxy))) 
    val source = Source.queue[T](...) 
    Flow.fromSinkAndSourceMat(sink, source){ (ref, queue) => 
    ref ! Attach(queue) 
    ref 
    } 
} 

Następnie można zmontować klient działa jak:

val proxy = actorOf(...) 
val requestFlow = proxyFlow[Request](proxy) 
val responseFlow = proxyFlow[Response](proxy) 
val finalFlow: Flow[Request, Response] = 
    requestFlow.via(webSocketFlow).via(responseFlow) 
2

Przede wszystkim potrzebujesz około akka import:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.ws.WebSocketRequest 
import akka.http.scaladsl.model.ws.Message 
import akka.http.scaladsl.model.HttpRequest 
import akka.http.scaladsl.model.HttpResponse 
import akka.stream.scaladsl.Flow 
import akka.http.scaladsl.server.Directives.{ extractUpgradeToWebSocket, complete } 

To jest przykład App że tworzy WebSocket proxy, wiążące 0.0.0.0 na porcie 80, proxing do ws://echo.websocket.org:

object WebSocketProxy extends App { 
    implicit val system = ActorSystem() 
    implicit val materializer = ActorMaterializer() 

    private[this] def manipulateFlow: Flow[Message, Message, akka.NotUsed] = ??? 

    private[this] def webSocketFlow = 
    Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org")) 

    private[this] val route: Flow[HttpRequest, HttpResponse, Any] = 
    extractUpgradeToWebSocket { upgrade => 
     val webSocketFlowProxy = manipulateFlow via webSocketFlow 
     val handleWebSocketProxy = upgrade.handleMessages(webSocketFlowProxy) 
     complete(handleWebSocketProxy) 
    } 

    private[this] val proxyBindingFuture = 
    Http().bindAndHandle(route, "0.0.0.0", 80) 

    println(s"Server online\nPress RETURN to stop...") 
    Console.readLine() 
} 

Trzeba dostosować go do play i na swój struktura aplikacji.

Uwagi:

  • Pamiętaj, aby rozpiąć proxyBindingFuture i zakończyć system w produkcji;
  • potrzebujesz tylko manipulateFlow, jeśli chcesz manipulować wiadomościami.