2016-02-28 30 views
7

Po przeczytaniu kilku stron dokumentacji Apache Flink (official documentation, dataartisans), a także przykładów podanych w official repository, wciąż widząc przykłady gdzie oni użyć jako źródła danych dla streamming plik już pobrane, łącząc się zawsze z localhostem.Get elementy JSON ze wstęgi z Apache Flink

Próbuję użyć Apache Flink do pobrania plików JSON zawierających dynamiczne dane. Moim zamiarem jest próba ustawienia adresu URL, w którym mogę uzyskać dostęp do pliku JSON jako źródła wejściowego Apache Flink, zamiast pobierania go za pomocą innego systemu i przetwarzania pobranego pliku za pomocą Apache Flink.

Czy można ustanowić to połączenie sieciowe z Apache Flink?

Odpowiedz

4

Możesz zdefiniować adresy URL, które chcesz pobrać jako dane wejściowe DataStream, a następnie pobrać dokumenty z poziomu MapFunction. Poniższy kod demonstruje to:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

DataStream<String> inputURLs = env.fromElements("http://www.json.org/index.html"); 

inputURLs.map(new MapFunction<String, String>() { 
    @Override 
    public String map(String s) throws Exception { 
     URL url = new URL(s); 
     InputStream is = url.openStream(); 

     BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is)); 

     StringBuilder builder = new StringBuilder(); 
     String line; 

     try { 
      while ((line = bufferedReader.readLine()) != null) { 
       builder.append(line + "\n"); 
      } 
     } catch (IOException ioe) { 
      ioe.printStackTrace(); 
     } 

     try { 
      bufferedReader.close(); 
     } catch (IOException ioe) { 
      ioe.printStackTrace(); 
     } 

     return builder.toString(); 
    } 
}).print(); 

env.execute("URL download job"); 
+0

Uruchamiam przykładowy kod, ale uruchamiany jest tylko raz i odczytuje cały plik. Jednak Iit nie jest streaming, myślałem, że będzie contiune czytać, gdy jest incease w pliku json. – zt1983811

+0

W tym celu musiałbyś użyć 'ContinuousFileMonitoringFunction'. Przesyłanie strumieniowe jako takie nie oznacza, że ​​zadanie będzie działać nieskończenie długo. Dzieje się tak tylko wtedy, gdy masz nieskończone źródło. Ale w tym przypadku funkcja 'env.fromElements' generuje skończone źródło strumieniowe. Gdy to źródło osiągnie koniec, program się kończy. –