2015-04-23 10 views
12

Chcę używać RxJS wewnątrz mojego socket.on('sense',function(data){});. Utknąłem i byłem zdezorientowany z bardzo małą dostępną dokumentacją i moim brakiem zrozumienia RxJS. Oto mój problem.Jak używać RxJs z Socket.IO na wydarzenie

Mam distSensor.js który ma pingEnd function()

function pingEnd(x){ 
socket.emit("sense", dist); //pingEnd is fired when an Interrupt is generated. 
} 

wewnątrz mojego App.js mam

io.on('connection', function (socket) { 
    socket.on('sense', function (data) { 
     //console.log('sense from App4 was called ' + data); 
    }); 
}); 

Funkcja sens ma dużą liczbę danych czujnika które chcesz filtrować za pomocą RxJS i ja nie wiem, co powinienem zrobić, żeby użyć tutaj RxJs. Wszelkie wskazówki do właściwych dokumentów lub próbki pomogłyby.

Odpowiedz

12

Myślę, że można użyć Rx.Observable.fromEvent (https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/fromevent.md).

Oto jak zrobiłem coś podobnego, używając Bacon.js, który ma bardzo podobną API: https://github.com/raimohanska/bacon-minsk-2015/blob/gh-pages/server.js#L13

Więc w Bacon.js to pójdzie jak

io.on('connection', function(socket){ 
    Bacon.fromEvent(socket, "sense") 
    .filter(function(data) { return true }) 
    .forEach(function(data) { dealWith(data) }) 
}) 

I RxJs wy mieliście zamień Bacon.fromEvent na Rx.Observable.fromEvent.

+0

Proszę usunąć" myślę ", ponieważ działa poprawnie. Zamień też Bacona na 'Rx.Observable', tak jak już opisałeś. – Mick

8

Można tworzyć zauważalny tak:

var senses = Rx.Observable.fromEventPattern(
    function add (h) { 
     socket.on('sense',h); 
    } 
); 

Następnie użyj senses jak każdy inny widoczne.

+0

Jaka jest funkcja metody add (h)? Czy ta funkcja jest częścią RxJs? – Mitul

+1

Po prostu nazwę funkcji, po prostu wykonaj: 'function (h) {...}'. – Xesued

+0

fromEvent nie działa dla mnie. I zamiast tego znalazłem rozwiązanie. W TS to: 'let obs = Observable.fromEventPattern (h => this.socket.on ('my_event', h));' – DarkNeuron

15

doznałem jakieś dziwne problemy z wykorzystaniem metody fromEvent, więc wolę, żeby stworzyć własną Obserwowalne:

function RxfromIO (io, eventName) { 
    return Rx.Observable.create(observer => { 
    io.on(eventName, (data) => { 
     observer.onNext(data) 
    }); 
    return { 
     dispose : io.close 
    } 
}); 

można następnie wykorzystać tak:

let $connection = RxfromIO(io, 'connection'); 
+0

Po prostu użyj fromEvent, nie ma potrzeby pisania tego dodatkowego kodu! const connection $ = Rx.Observable.fromEvent (io, 'connection'); – Mick

0

Można używać rxjs -dom,

Rx.DOM.fromWebSocket(url, protocol, [openObserver], [closeObserver]) 


// an observer for when the socket is open 
var openObserver = Rx.Observer.create(function(e) { 
    console.info('socket open'); 

    // Now it is safe to send a message 
    socket.onNext('test'); 
}); 

// an observer for when the socket is about to close 
var closingObserver = Rx.Observer.create(function() { 
    console.log('socket is about to close'); 
}); 

// create a web socket subject 
socket = Rx.DOM.fromWebSocket(
    'ws://echo.websocket.org', 
    null, // no protocol 
    openObserver, 
    closingObserver); 

// subscribing creates the underlying socket and will emit a stream of incoming 
// message events 
socket.subscribe(
    function(e) { 
    console.log('message: %s', e.data); 
    }, 
    function(e) { 
    // errors and "unclean" closes land here 
    console.error('error: %s', e); 
    }, 
    function() { 
    // the socket has been closed 
    console.info('socket closed'); 
    } 
); 
+0

Wygląda na to, że Rx.DOM.fromWebSocket nie może używać z socket.io-server. Rx.DOM.fromWebSocket używa standardowego interfejsu API w3c. – clark

1

ES6 jedna wkładka, które za pomocą za pomocą ES7 bind składnię:
(czytaj $ jak stream)

import { Observable } from 'rxjs' 

// create socket 

const message$ = Observable.create($ => socket.on('message', ::$.next)) 
// translates to: Observable.create($ => socket.on('message', $.next.bind(this))) 

// filter example 
const subscription = message$ 
    .filter(message => message.text !== 'spam') 
    //or .filter(({ text }) => text !== 'spam') 
    .subscribe(::console.log) 
0

Wystarczy użyć fromEvent(). Oto pełny przykład w Node.js, ale działa tak samo w przeglądarce. Zauważ, że używam first() i takeUntil(), aby zapobiec wyciekowi pamięci: first() tylko słucha jednego zdarzenia, a następnie kończy. Teraz użyj takeUntil() na wszystkich innych wydarzeń gniazdowym słuchać tak obserwable zupełny na Disconnect:

const app = require('express')(); 
const server = require('http').createServer(app); 
const io = require('socket.io')(server); 
const Rx = require('rxjs/Rx'); 

connection$ = Rx.Observable.fromEvent(io, 'connection'); 

connection$.subscribe(socket => { 
    console.log(`Client connected`); 

    // Observables 
    const disconnect$ = Rx.Observable.fromEvent(socket, 'disconnect').first(); 
    const message$ = Rx.Observable.fromEvent(socket, 'message').takeUntil(disconnect$); 

    // Subscriptions 
    message$.subscribe(data => { 
     console.log(`Got message from client with data: ${data}`); 
     io.emit('message', data); // Emit to all clients 
    }); 

    disconnect$.subscribe(() => { 
     console.log(`Client disconnected`); 
    }) 
}); 

server.listen(3000);