Przepraszamy za przesłanie tego na listę dyskusyjną użytkownika hadoopa i tutaj, ale jest to dla mnie ważna kwestia.Zestaw skrzyżowań i zestaw różnicy rekordów dwóch plików z hadoopem
Mój problem jest następujący: Mam dwa pliki wejściowe i chcę, aby określić
- a) liczba linii, które występują tylko w pliku 1
- b) liczbę wierszy, które wystąpić tylko w pliku 2
- c) liczba linii wspólnych dla obu (na przykład w odniesieniu do równości string)
przykład:
File 1:
a
b
c
File 2:
a
d
Pożądany wyjście dla każdego przypadku:
lines_only_in_1: 2 (b, c)
lines_only_in_2: 1 (d)
lines_in_both: 1 (a)
Zasadniczo moje podejście jest następujące: pisałem własne LineRecordReader tak, że odwzorowujący odbiera parę składającą się z linii (tekst) i bajt wskazanie pliku źródłowego (0 lub 1). Mapper zwraca tylko parę ponownie, tak naprawdę nie robi nic. Jednakże skutkiem ubocznym jest, że układ łączący odbiera
Map<Line, Iterable<SourceId>>
(gdzie sourceid oznacza 0 lub 1).
Teraz dla każdej linii można uzyskać zestaw źródeł wydaje się. W związku z tym, mogę napisać sumator który zlicza dla każdego przypadku (a, b, c) liczbę linii (Listing 1)
Kombinator wyświetla "streszczenie" tylko po oczyszczeniu (czy to bezpieczne?). Więc to zestawienie wygląda następująco:
lines_only_in_1 2531
lines_only_in_2 3190
lines_in_both 901
W reduktora I wtedy tylko zsumować wartości dla tych zestawień. (Tak więc wydajność reduktora wygląda tak, jak w przypadku sumatora).
Jednak głównym problemem jest to, że trzeba traktować oba pliki źródłowe w postaci pojedynczego pliku wirtualnego, które rekordy wydajność forma (linia, sourceid) // sourceid 0 lub 1
i jestem nie wiem, jak to osiągnąć. Pytanie więc, czy uda mi się uniknąć wstępnego przetwarzania i łączenia plików wcześniej i zrobić to w locie z czymś takim jak wirtualnie połączony czytnik plików i niestandardowy czytnik rekordów. Każdy przykład kodu jest bardzo doceniany.
poważaniem, Claus
Listing 1:
public static class SourceCombiner
extends Reducer<Text, ByteWritable, Text, LongWritable> {
private long countA = 0;
private long countB = 0;
private long countC = 0; // C = lines (c)ommon to both sources
@Override
public void reduce(Text key, Iterable<ByteWritable> values, Context context) throws IOException, InterruptedException {
Set<Byte> fileIds = new HashSet<Byte>();
for (ByteWritable val : values) {
byte fileId = val.get();
fileIds.add(fileId);
}
if(fileIds.contains((byte)0)) { ++countA; }
if(fileIds.contains((byte)1)) { ++countB; }
if(fileIds.size() >= 2) { ++countC; }
}
protected void cleanup(Context context)
throws java.io.IOException, java.lang.InterruptedException
{
context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA));
context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB));
context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC));
}
}
Witam, byłem nieco niejasny: Chodzi o to, że chcę, aby kombinatory podały tylko podsumowanie (liczbę linii w 1, 2 i wspólnej) do reduktora - nie ma potrzeby, aby wszystkie linie są wysyłane z powrotem do reduktora. Aby to zadziałało, kombinatory muszą zobaczyć rekordy obu plików razem (mój RecordReader już tworzy pary (line, fileId), mapowanie z nazwy pliku na fileId jest przekazywane za pomocą obiektu config). Jednak podczas dodawania plików za pomocą dwóch instrukcji FileInputFormat.addInputPath (zadanie, plik) pliki są przetwarzane indywidualnie, więc kombinatory nie widzą swojego "związku". –
puh to naprawdę dziwna "optymalizacja". Ale dobry punkt. –
Sry za późną odpowiedź; Czy to możliwe, że mój pomysł nie działa: Plik źródłowy jest podzielony, a podziały są wysyłane do węzłów. Węzły następnie odczytują zapisy z odpowiadającego im podziału. Tak więc zduplikowane rekordy w pliku źródłowym mogą znajdować się w kilku partycjach, a zatem rozprzestrzeniać się w wielu węzłach. Dlatego uzyskanie grupowania duplikatów jest możliwe tylko w reduktorze. Czy to jest poprawne? –