2015-06-24 20 views
5

Mam następujący kod napisany w języku C#, ale zgodnie z tym, migracja danych z bazy danych Oracle do Elasticsearch zajmie mi 4-5 dni. Wstawiam rekordy w partiach po 100. Czy istnieje inny sposób, w którym migracja 4 milionów rekordów odbywa się szybciej (prawdopodobnie w mniej niż dzień, jeśli to możliwe)?Jak szybciej wstawić 4 miliony rekordów z Oracle do tabeli Elasticsearch przy użyciu C#?

public static void Selection() 
     { 
      for(int i = 1; i < 4000000; i += 1000) 
      { 
       for(int j = i; j < (i+1000); j += 100) 
       { 
        OracleCommand cmd = new OracleCommand(BuildQuery(j), 
                oracle_connection); 
        OracleDataReader reader = cmd.ExecuteReader(); 
        List<Record> list=CreateRecordList(reader); 
        insert(list); 
       } 
      } 
     } 

    private static List<Record> CreateRecordList(OracleDataReader reader) 
     { 
      List<Record> l = new List<Record>(); 
      string[] str = new string[7]; 
      try 
      { 
       while (reader.Read()) 
       { 
        for (int i = 0; i < 7; i++) 
        { 
         str[i] = reader[i].ToString(); 
        } 

        Record r = new Record(str[0], str[1], str[2], str[3],        
           str[4], str[5], str[6]); 
        l.Add(r); 
       } 
      } 
      catch (Exception er) 
      { 
       string msg = er.Message; 
      } 
      return l; 
     } 

    private static string BuildQuery(int from) 
     { 
      int to = from + change - 1; 
      StringBuilder builder = new StringBuilder(); 
      builder.AppendLine(@"select * from"); 
      builder.AppendLine("("); 
      builder.AppendLine("select FIELD_1, FIELD_2, 
      FIELD_3, FIELD_4, FIELD_5, FIELD_6, 
      FIELD_7, "); 
      builder.Append(" row_number() over(order by FIELD_1) 
      rn"); 
      builder.AppendLine(" from tablename"); 
      builder.AppendLine(")"); 
      builder.AppendLine(string.Format("where rn between {0} and {1}", 
      from, to)); 
      builder.AppendLine("order by rn"); 
      return builder.ToString(); 
     } 

    public static void insert(List<Record> l) 
     { 
      try 
      { 
       foreach(Record r in l) 
        client.Index<Record>(r, "index", "type"); 
      } 
      catch (Exception er) 
      { 
       string msg = er.Message; 
      } 
     } 
+0

Wymień 'client.Index' z 'client.IndexMany (..)' i spróbuj ustalić optymalny rozmiar porcji dla wkładki zbiorczej https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html#_how_big_is_too_big – Rob

+1

* weź 4-5 dni * ... czy uruchomiłeś i sprawdź, czy migracja 4 milionów wierszy zajmuje 4/5 dni? – Rahul

+2

Funkcja 'ROW_NUMBER()' wpłynie negatywnie na wydajność, a ty będziesz ją uruchamiał tysiące razy. Używasz już 'OracleDataReader' - nie będzie on pobierał wszystkich czterech milionów wierszy na raz, w zasadzie przesyła je po jednej lub kilku naraz. Zamiast tego powinieneś mieć jedno zapytanie, a podczas budowania obiektów "Record", co 100 lub 500 lub 1000 (np. Zachowaj "count", który zwiększa liczbę każdej pętli), zatwierdz je (np. W 'count% 500 == 0'). To musi być wykonalne w ciągu kilku minut, a nie dni. –

Odpowiedz

4

Funkcja ROW_NUMBER() ma negatywny wpływ na wydajność, a używasz go tysiące razy. Już używasz OracleDataReader - nie spowoduje to od razu wszystkich czterech milionów wierszy na komputerze, w zasadzie przesyła je po jednej lub kilku naraz.

To musi być wykonalne w kilka minut lub godzin, a nie dni - mamy kilka procesów, które przenoszą miliony rekordów między serwerem Sybase i SQL w podobny sposób i trwa mniej niż pięć minut.

Może dać to strzał:

OracleCommand cmd = new OracleCommand("SELECT ... FROM TableName", oracle_connection); 
int batchSize = 500;  
using (OracleDataReader reader = cmd.ExecuteReader()) 
{ 
    List<Record> l = new List<Record>(batchSize); 
    string[] str = new string[7]; 
    int currentRow = 0; 

    while (reader.Read()) 
    { 
     for (int i = 0; i < 7; i++) 
     { 
      str[i] = reader[i].ToString(); 
     } 

     l.Add(new Record(str[0], str[1], str[2], str[3], str[4], str[5], str[6])); 

     // Commit every time batchSize records have been read 
     if (++currentRow == batchSize) 
     { 
      Commit(l); 
      l.Clear(); 
      currentRow = 0; 
     } 
    } 

    // commit remaining records 
    Commit(l); 
} 

Oto co Commit może wyglądać następująco:

public void Commit(IEnumerable<Record> records) 
{ 
    // TODO: Use ES's BULK features, I don't know the exact syntax 

    client.IndexMany<Record>(records, "index", "type"); 
    // client.Bulk(b => b.IndexMany(records))... something like this 
} 
+0

Wielkie dzięki za pomoc! Użyłem funkcji ES Bulk, o czym wspomniałeś. Zaczęło się wstawianie rekordów z bardzo dużą szybkością, ale po pewnym czasie argument Argument nie mógł być pustym błędem. Odpowiednio zredagowałem pytanie. –

+0

@AakritiMittal: Nie powinieneś nadpisywać swojego pytania - jeśli masz nowy problem, zadaj nowe pytanie (podaj to, jeśli chcesz). Jest to witryna z pytaniami i odpowiedziami, a zmieniając pytanie, unieważniasz inne odpowiedzi. Wycofałem twoje zmiany. –

+0

Przepraszamy za nadpisanie, ale rozwiązałem ten błąd.Chciałbym także zapytać, jak to możliwe, że aplikacja przenosi miliony rekordów między serwerem Sybase i SQL w mniej niż 5 minut? Moja nowa aplikacja nadal zajmuje 2 dni, aby przenieść 4 miliony z Oracle do Elasticsearch, ponieważ ma dużą liczbę kolumn w tabeli. –

3

Ale nie wkładając w partiach po 100
W końcu jesteś wkładając jedną w czas
(i może to nie być nawet poprawny kod do wstawienia)

foreach(Record r in l) 
    client.Index<Record>(r, "index", "type"); 

Wszystkie te girations Read zrobić nic, jeśli wkładka jest jeden wiersz naraz
Jesteś po prostu wprowadzającej opóźnienie podczas dostać następny partia
Czytaj jest (prawie) zawsze szybciej niż pisać

OracleCommand cmd = new OracleCommand(BuildQuery(all), oracle_connection); 
OracleDataReader reader = cmd.ExecuteReader(); 
while (reader.Read()) 
{ 
    client.Index<Record>(new Record(reader.GetSting(0), 
         reader.GetSting(1), reader.GetSting(2), reader.GetSting(3),  
         reader.GetSting(4), reader.GetSting(5), reader.GetSting(6), 
         "index", "type"); 
} 
reader.Close(); 

można użyć BlockingCollection jeśli chcesz czytać i pisać w równoległym
Ale używać max rozmiar czytać nie zbyt daleko od zapisu