2014-05-13 6 views
6

Muszę napisać wylewkę burzową do odczytu danych z portu. Chciałem się dowiedzieć, czy to było logicznie możliwe.Storm: Wylewka do odczytu danych z portu

Mając to na uwadze, zaprojektowałem prostą topologię zaprojektowaną tak samo z jedną wylewką i jedną śrubą. Dziób zbierałby żądania HTTP wysyłane za pomocą wget, a rygiel wyświetlałby żądanie - właśnie to.

Moja struktura wylewka jest następujący:

public class ProxySpout extends BaseRichSpout{ 
     //The O/P collector 
     SpoutOutputCollector sc; 
     //The socket 
     Socket clientSocket; 
     //The server socket 
     ServerSocket sc; 

     public ProxySpout(int port){ 
      this.sc=new ServerSocket(port); 
      try{ 
       clientSocket=sc.accept(); 
      }catch(IOException ex){ 
       //Handle it 
      } 
     } 

     public void nextTuple(){ 
      try{ 
       InputStream ic=clientSocket.getInputStream(); 
       byte b=new byte[8196]; 
       int len=ic.read(b); 

       sc.emit(new Values(b)); 
       ic.close(); 
      }catch(//){ 
       //Handle it 
      }finally{ 
       clientSocket.close(); 
      } 
     } 
} 

I wprowadziły resztę metod zbyt.

Kiedy przekształcić topologii i uruchomić go, otrzymuję komunikat o błędzie, kiedy wysyła pierwsze żądanie:

java.lang.RuntimeException: java.io.NotSerializableException: java.net.Socket

Wystarczy wiedzieć, czy coś jest nie tak ze sposobem, w jaki implementuję ten dziobek. Czy nawet wylewka może zbierać dane z portu? Lub aby wylewka działała jako instancja proxy?

Edit

Got to działa.

Kod jest:

public class ProxySpout extends BaseRichSpout{ 
     //The O/P collector 
     static SpoutOutputCollector _collector; 
     //The socket 
     static Socket _clientSocket; 
     static ServerSocket _serverSocket; 
     static int _port; 

     public ProxySpout(int port){ 
      _port=port; 
     } 

     public void open(Map conf,TopologyContext context, SpoutOutputCollector collector){ 
      _collector=collector; 
      _serverSocket=new ServerSocket(_port); 
     } 

     public void nextTuple(){ 
      _clientSocket=_serverSocket.accept(); 
      InputStream incomingIS=_clientSocket.getInputStream(); 
      byte[] b=new byte[8196]; 
      int len=b.incomingIS.read(b); 
      _collector.emit(new Values(b)); 
    } 
} 

Zgodnie @ sugestia Shawa, próbował inicjowanie _serverSocket w metodzie open() i _clientSocket działa w nextTuple() sposobu słuchania żądań.

Dunno z metrices wydajności to jedno, ale to działa .. :-)

Odpowiedz

6

W konstruktorze wystarczy przypisać zmienne. Spróbuj utworzyć instancję ServerSocket w metodzie przygotowywania, nie pisz żadnych nowych ... w konstruktorze. Zmieniaj nazwy zmiennych, masz dwie zmienne sc.

public class ProxySpout extends BaseRichSpout{ 

    int port; 

    public ProxySpout(int port){ 
     this.port=port; 
    } 

    @Override 
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 
     //new ServerSocket 
    } 

    @Override 
    public void nextTuple() { 

    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 

    } 
} 

Jeśli umieścisz go w przygotowanie metody następnie zostanie wywołana tylko raz wylewka jest już wdrożony, więc nie musi być w odcinkach, a zostanie ona wywołana tylko raz w życiu wylewki, więc nie jest to nieefektywne.

+0

Więc jest to możliwe? Aby funkcja wylewki działała jak proxy? –

+4

Tak, ale nextTuple() będzie wywoływane co chwilę i musisz to zarządzać, jeśli dzióbek nic nie otrzyma, błędy ... – gasparms