2011-06-24 21 views
5

Przepraszamy za przesłanie tego na listę dyskusyjną użytkownika hadoopa i tutaj, ale jest to dla mnie ważna kwestia.Zestaw skrzyżowań i zestaw różnicy rekordów dwóch plików z hadoopem

Mój problem jest następujący: Mam dwa pliki wejściowe i chcę, aby określić

  • a) liczba linii, które występują tylko w pliku 1
  • b) liczbę wierszy, które wystąpić tylko w pliku 2
  • c) liczba linii wspólnych dla obu (na przykład w odniesieniu do równości string)

przykład:

File 1: 
a 
b 
c 

File 2: 
a 
d 

Pożądany wyjście dla każdego przypadku:

lines_only_in_1: 2   (b, c) 
lines_only_in_2: 1   (d) 
lines_in_both: 1   (a) 

Zasadniczo moje podejście jest następujące: pisałem własne LineRecordReader tak, że odwzorowujący odbiera parę składającą się z linii (tekst) i bajt wskazanie pliku źródłowego (0 lub 1). Mapper zwraca tylko parę ponownie, tak naprawdę nie robi nic. Jednakże skutkiem ubocznym jest, że układ łączący odbiera

Map<Line, Iterable<SourceId>> 

(gdzie sourceid oznacza 0 lub 1).

Teraz dla każdej linii można uzyskać zestaw źródeł wydaje się. W związku z tym, mogę napisać sumator który zlicza dla każdego przypadku (a, b, c) liczbę linii (Listing 1)

Kombinator wyświetla "streszczenie" tylko po oczyszczeniu (czy to bezpieczne?). Więc to zestawienie wygląda następująco:

lines_only_in_1 2531 
lines_only_in_2 3190 
lines_in_both  901 

W reduktora I wtedy tylko zsumować wartości dla tych zestawień. (Tak więc wydajność reduktora wygląda tak, jak w przypadku sumatora).

Jednak głównym problemem jest to, że trzeba traktować oba pliki źródłowe w postaci pojedynczego pliku wirtualnego, które rekordy wydajność forma (linia, sourceid) // sourceid 0 lub 1

i jestem nie wiem, jak to osiągnąć. Pytanie więc, czy uda mi się uniknąć wstępnego przetwarzania i łączenia plików wcześniej i zrobić to w locie z czymś takim jak wirtualnie połączony czytnik plików i niestandardowy czytnik rekordów. Każdy przykład kodu jest bardzo doceniany.

poważaniem, Claus

Listing 1:

public static class SourceCombiner 
    extends Reducer<Text, ByteWritable, Text, LongWritable> { 

    private long countA = 0; 
    private long countB = 0; 
    private long countC = 0; // C = lines (c)ommon to both sources 

    @Override 
    public void reduce(Text key, Iterable<ByteWritable> values, Context context) throws IOException, InterruptedException { 
     Set<Byte> fileIds = new HashSet<Byte>(); 
     for (ByteWritable val : values) { 
      byte fileId = val.get(); 

      fileIds.add(fileId); 
     } 

     if(fileIds.contains((byte)0)) { ++countA; } 
     if(fileIds.contains((byte)1)) { ++countB; } 
     if(fileIds.size() >= 2) { ++countC; } 
    } 

    protected void cleanup(Context context) 
      throws java.io.IOException, java.lang.InterruptedException 
    { 
     context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA)); 
     context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB)); 
     context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC)); 
    } 
} 

Odpowiedz

2

Dobra, muszę przyznać, że tak naprawdę nie złapać istotę tego, co już próbowałem tak daleko, ale mam proste podejście do robienia rzeczy, których możesz potrzebować.

Spójrz na filemapper. Ten otrzyma nazwę pliku i prześle go z każdym wierszem wejścia.

public class FileMapper extends Mapper<LongWritable, Text, Text, Text> { 

     static Text fileName; 

     @Override 
     protected void map(LongWritable key, Text value, Context context) 
       throws IOException, InterruptedException { 
      context.write(value, fileName); 
     } 

     @Override 
     protected void setup(Context context) throws IOException, 
       InterruptedException { 

      String name = ((FileSplit) context.getInputSplit()).getPath().getName(); 
      fileName = new Text(name); 
     } 
    } 

Teraz mamy kilka kluczowych/wartości, które wyglądają następująco (w odniesieniu do przykładu)

a File 1 
    b File 1 
    c File 1 

    a File 2 
    d File 2 

Oczywiście sprowadzając je będzie Ci wejście takiego:

a File 1,File 2 
    b File 1 
    c File 1 
    d File 2 

Co musisz zrobić w swoim reduktorze może wyglądać tak:

public class FileReducer extends Reducer<Text, Text, Text, Text> { 

    enum Counter { 
     LINES_IN_COMMON, LINES_IN_FIRST, LINES_IN_SECOND 
    } 

    @Override 
    protected void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException { 
     HashSet<String> set = new HashSet<String>(); 
     for (Text t : values) { 
      set.add(t.toString()); 
     } 

     // if we have only two files and we have just two records in our hashset 
     // the line is contained in both files 
     if (set.size() == 2) { 
      context.getCounter(Counter.LINES_IN_COMMON).increment(1); 
     } else { 
      // sorry this is a bit dirty... 
      String t = set.iterator().next(); 
      // determine which file it was by checking for the name: 
      if(t.toString().equals("YOUR_FIRST_FILE_NAME")){ 
       context.getCounter(Counter.LINES_IN_FIRST).increment(1); 
      } else { 
       context.getCounter(Counter.LINES_IN_SECOND).increment(1); 
      } 
     } 
    } 

} 

Musisz zastąpić ciąg wewnątrz instrukcji if swoimi nazwami plików.

Myślę, że używanie licznika zadań jest nieco bardziej przejrzyste niż używanie własnych prymitywów i zapisywanie ich w kontekście czyszczenia. Można pobierać liczniki pracy przez wywołanie tej rzeczy po zakończeniu:

Job job = new Job(new Configuration()); 
//setup stuff etc omitted.. 
job.waitForCompletion(true); 
// do the same line with the other enums 
long linesInCommon = job.getCounters().findCounter(Counter.LINES_IN_COMMON).getValue(); 

Nigdy nie mniej, jeśli trzeba numery linii w powszechnym itp w swoim HDFS, a następnie przejść do rozwiązania.

Mam nadzieję, że ci pomogłem.

+0

Witam, byłem nieco niejasny: Chodzi o to, że chcę, aby kombinatory podały tylko podsumowanie (liczbę linii w 1, 2 i wspólnej) do reduktora - nie ma potrzeby, aby wszystkie linie są wysyłane z powrotem do reduktora. Aby to zadziałało, kombinatory muszą zobaczyć rekordy obu plików razem (mój RecordReader już tworzy pary (line, fileId), mapowanie z nazwy pliku na fileId jest przekazywane za pomocą obiektu config). Jednak podczas dodawania plików za pomocą dwóch instrukcji FileInputFormat.addInputPath (zadanie, plik) pliki są przetwarzane indywidualnie, więc kombinatory nie widzą swojego "związku". –

+0

puh to naprawdę dziwna "optymalizacja". Ale dobry punkt. –

+0

Sry za późną odpowiedź; Czy to możliwe, że mój pomysł nie działa: Plik źródłowy jest podzielony, a podziały są wysyłane do węzłów. Węzły następnie odczytują zapisy z odpowiadającego im podziału. Tak więc zduplikowane rekordy w pliku źródłowym mogą znajdować się w kilku partycjach, a zatem rozprzestrzeniać się w wielu węzłach. Dlatego uzyskanie grupowania duplikatów jest możliwe tylko w reduktorze. Czy to jest poprawne? –