2016-02-04 23 views
5

Aktualnie używamy Cassandra (http://cassandra.apache.org/) dla danych z serii czasowej. Cassandra czyta bardzo szybko, ale zanim ją przedstawimy, musimy wykonać serię obliczeń na naszych danych (skutecznie naśladujemy SUMA i GRUPĘ WEDŁUG funkcjonalności SQL - coś, czego Cassandra nie obsługuje po wyjęciu z pudełka)Duże zbiory danych w Pythonie - jak radzić sobie z bardzo dużymi tablicami?

Znamy Pythonie (do pewnego stopnia) i postanowił zbudować skrypt do zapytania nasz klaster Cassandra jak również wykonywać matematyki i przedstawić wyniki w formacie JSON:

query = (
    "SELECT query here...") 

startTimeQuery = time.time() 

# Executes cassandra query 
rslt = cassession.execute(query) 

print("--- %s seconds to query ---" % (time.time() - startTimeQuery)) 

tally = {} 

startTimeCalcs = time.time() 
for row in rslt: 
    userid = row.site_user_id 

    revenue = (int(row.revenue) - int(row.reversals_revenue or 0)) 
    accepted = int(row.accepted or 0) 
    reversals_revenue = int(row.reversals_revenue or 0) 
    error = int(row.error or 0) 
    impressions_negative = int(row.impressions_negative or 0) 
    impressions_positive = int(row.impressions_positive or 0) 
    rejected = int(row.rejected or 0) 
    reversals_rejected = int(row.reversals_rejected or 0) 

    if tally.has_key(userid): 
     tally[userid]["revenue"] += revenue 
     tally[userid]["accepted"] += accepted 
     tally[userid]["reversals_revenue"] += reversals_revenue 
     tally[userid]["error"] += error 
     tally[userid]["impressions_negative"] += impressions_negative 
     tally[userid]["impressions_positive"] += impressions_positive 
     tally[userid]["rejected"] += rejected 
     tally[userid]["reversals_rejected"] += reversals_rejected 
    else: 
     tally[userid] = { 
      "accepted": accepted, 
      "error": error, 
      "impressions_negative": impressions_negative, 
      "impressions_positive": impressions_positive, 
      "rejected": rejected, 
      "revenue": revenue, 
      "reversals_rejected": reversals_rejected, 
      "reversals_revenue": reversals_revenue 
     } 


print("--- %s seconds to calculate results ---" % (time.time() - startTimeCalcs)) 

startTimeJson = time.time() 
jsonOutput =json.dumps(tally) 
print("--- %s seconds for json dump ---" % (time.time() - startTimeJson)) 

print("--- %s seconds total ---" % (time.time() - startTimeQuery)) 

print "Array Size: " + str(len(tally)) 

jest to rodzaj wyjścia otrzymamy:

--- 0.493520975113 seconds to query --- 
--- 23.1472680569 seconds to calculate results --- 
--- 0.546246051788 seconds for json dump --- 
--- 24.1871240139 seconds total --- 
Array Size: 198124 

Dużo czasu poświęcamy na nasze obliczenia, wiemy, że problem nie dotyczy samych sum i samych grup: to tylko rozmiar tablicy, która jest problemem.

Słyszeliśmy kilka dobrych rzeczy o numpy, ale natura naszych danych sprawia, że ​​rozmiar matrycy jest nieznany.

Szukamy wskazówek, jak się do tego podejść. W tym zupełnie inne podejście do programowania.

+2

Pakiet goto python dla danych timeseries to 'pandas', który używa' numpy' pod maską. Czy sprawdziłeś to? –

+2

Jak duży jest "duży"? –

+0

deserializacja? –

Odpowiedz

0

Cassandra 2.2 i nowsze wersje pozwalają użytkownikom definiować funkcje zagregowane. Możesz go użyć do przeprowadzenia kolizji kolumn po stronie kassandra. Proszę zobaczyć DataStax article dla danych o User Defined Aggregates

1

Zrobiłem bardzo podobne przetwarzanie i martwiłem się również o czas przetwarzania. Myślę, że nie wyjaśniasz czegoś ważnego: obiekt wynikowy otrzymany z kassandra jako zwrot funkcji execute() nie zawiera wszystkich linii, które chcesz. zamiast tego zawiera paginowany wynik i otrzyma linie podczas zamiatania obiektu znajdującego się na liście for. Opiera się to na osobistej obserwacji, ale nie znam więcej technicznych szczegółów, aby o tym wspomnieć.

Proponuję wyizolować zapytanie i przetwarzanie wyników przez dodanie prostej rslt = list(rslt) zaraz po komendzie execute, która zmusiłaby Pythona do przejścia przez wszystkie wiersze w wynikach przed wykonaniem przetwarzania, również zmuszając kierowcę Kasandra do uzyskania wszystkich linie, które chcesz przed przejściem do przetwarzania.

Wydaje mi się, że większość czasu potrzebnego na przetwarzanie zapytań została faktycznie zamaskowana przez kierowcę za pomocą paginacji.