2017-09-28 68 views
5

Mam aplikację java, która musi odczytać dużą ilość danych z MongoDB 3.2 i przenieść ją do Hadoop.Czytaj duże dane mongody

Ta aplikacja wsadowa jest uruchamiana co 4 godziny 6 razy dziennie.

Specyfikacje danych:

  • Dokumenty: 80000 naraz (co 4 godziny)
  • Rozmiar: 3GB

Obecnie używam MongoTemplate i Morphia w aby uzyskać dostęp do MongoDB. jednak otrzymuję wyjątek OOM podczas przetwarzania tych danych za pomocą następujących czynności:

List<MYClass> datalist = datasource.getCollection("mycollection").find().asList(); 

Jaki jest najlepszy sposób, aby odczytać te dane i wypełnić do Hadoop?

  • MongoTemplate::Stream() i pisać do Hadoop jeden po drugim?
  • batchSize(someLimit) i napisać całą partię do Hadoop?
  • Cursor.batch() i pisać do hdfs jeden po drugim?

Odpowiedz

1

Twój problem polega na asList() rozmowy

To zmusza kierowcę do iteracji całej kursora (80,000 Docs kilka koncertów), utrzymując wszystko w pamięci.

batchSize(someLimit) i Cursor.batch() potrzebujesz pomocy podczas przechodzenia przez cały kursor, niezależnie od wielkości partii.

Zamiast można:

1) Iteracja kursor: List<MYClass> datalist = datasource.getCollection("mycollection").find()

2) czytać dokumenty po jednym na raz i paszy dokumentów do bufora (powiedzmy listę)

3) Na każde 1000 dokumentów (powiedzmy) zadzwoń do interfejsu API Hadoop, wyczyść bufor, a następnie rozpocznij ponownie.

0

Wywołanie asList() będzie próbowało załadować całą kolekcję Mongodb do pamięci. Próba ustawienia obiektu w pamięci o wielkości przekraczającej 3 gb.

Iterowanie kolekcji za pomocą kursora rozwiąże ten problem. Możesz to zrobić przy użyciu klasy Datasource, ale wolę bezpieczniejsze abstrakcje typu, które oferuje Morphia z klasami DAO:

class Dao extends BasicDAO<Order, String> { 
    Dao(Datastore ds) { 
     super(Order.class, ds); 
    } 
    } 

    Datastore ds = morphia.createDatastore(mongoClient, DB_NAME); 
    Dao dao = new Dao(ds); 

    Iterator<> iterator = dao.find().fetch(); 
    while (iterator.hasNext()) { 
     Order order = iterator.next; 
     hadoopStrategy.add(order); 
    }