Muszę napisać wylewkę burzową do odczytu danych z portu. Chciałem się dowiedzieć, czy to było logicznie możliwe.Storm: Wylewka do odczytu danych z portu
Mając to na uwadze, zaprojektowałem prostą topologię zaprojektowaną tak samo z jedną wylewką i jedną śrubą. Dziób zbierałby żądania HTTP wysyłane za pomocą wget, a rygiel wyświetlałby żądanie - właśnie to.
Moja struktura wylewka jest następujący:
public class ProxySpout extends BaseRichSpout{
//The O/P collector
SpoutOutputCollector sc;
//The socket
Socket clientSocket;
//The server socket
ServerSocket sc;
public ProxySpout(int port){
this.sc=new ServerSocket(port);
try{
clientSocket=sc.accept();
}catch(IOException ex){
//Handle it
}
}
public void nextTuple(){
try{
InputStream ic=clientSocket.getInputStream();
byte b=new byte[8196];
int len=ic.read(b);
sc.emit(new Values(b));
ic.close();
}catch(//){
//Handle it
}finally{
clientSocket.close();
}
}
}
I wprowadziły resztę metod zbyt.
Kiedy przekształcić topologii i uruchomić go, otrzymuję komunikat o błędzie, kiedy wysyła pierwsze żądanie:
java.lang.RuntimeException: java.io.NotSerializableException: java.net.Socket
Wystarczy wiedzieć, czy coś jest nie tak ze sposobem, w jaki implementuję ten dziobek. Czy nawet wylewka może zbierać dane z portu? Lub aby wylewka działała jako instancja proxy?
Edit
Got to działa.
Kod jest:
public class ProxySpout extends BaseRichSpout{
//The O/P collector
static SpoutOutputCollector _collector;
//The socket
static Socket _clientSocket;
static ServerSocket _serverSocket;
static int _port;
public ProxySpout(int port){
_port=port;
}
public void open(Map conf,TopologyContext context, SpoutOutputCollector collector){
_collector=collector;
_serverSocket=new ServerSocket(_port);
}
public void nextTuple(){
_clientSocket=_serverSocket.accept();
InputStream incomingIS=_clientSocket.getInputStream();
byte[] b=new byte[8196];
int len=b.incomingIS.read(b);
_collector.emit(new Values(b));
}
}
Zgodnie @ sugestia Shawa, próbował inicjowanie _serverSocket
w metodzie open()
i _clientSocket
działa w nextTuple()
sposobu słuchania żądań.
Dunno z metrices wydajności to jedno, ale to działa .. :-)
Więc jest to możliwe? Aby funkcja wylewki działała jak proxy? –
Tak, ale nextTuple() będzie wywoływane co chwilę i musisz to zarządzać, jeśli dzióbek nic nie otrzyma, błędy ... – gasparms