2012-06-14 8 views
14

Chciałbym skompresować strumień wejściowy w java przy użyciu kompresji Gzip.Kompresuj InputStream z gzipem

Załóżmy, że mamy strumień wejściowy (1 GB danych ..) nie został skompresowany. Chcę wskutek skompresowany InputStream od źródła:

public InputStream getCompressedStream(InputStream unCompressedStream) { 

    // Not working because it's uncompressing the stream, I want the opposite. 
    return new GZIPInputStream(unCompressedStream); 

} 

Odpowiedz

3

Do kompresowania danych potrzebny jest GZIPOutputStream. Ale ponieważ musisz odczytywać dane tak, jakby z InputStream, musisz przekonwertować OutputStream na InputStream. Można użyć getBytes(), aby to zrobić:

GZIPOutputStream gout = new GZIPOutputStream(); 
//... Code to read from your original uncompressed data and write to gout. 

//Convert to InputStream. 
new ByteArrayInputStream(gout.getBytes()); 

Jednak metoda ta ma ograniczenia, które trzeba odczytać wszystkie dane w pierwszym - a to oznacza, że ​​trzeba mieć wystarczająco dużo pamięci, aby uznać, że bufor.

alternatywne podejścia wykorzystujące rury są wymienione w tym wątku - How to convert OutputStream to InputStream?

+1

@Fabien - przeczytaj poniższy komentarz - jeśli masz 1 TB danych do odczytu ze strumienia wejściowego - nie używaj powyższej metody !! O ile nie masz 1TB pamięci do oszczędzania. Użyj metod rur. – kjp

+0

Nie istnieje "dobry" standardowy sposób, aby OutputStream był InputStream. Są tylko 2 sposoby, aby to zrobić. Albo cache cały OutputStream gdzieś lub użyć wątków. Oba mają swoje wady. Użyj strumienia InputStream, jeśli możesz. Zobacz mój post poniżej. –

+3

@kjp - GZIPOutputStream nie ma domyślnego konstruktora. –

2

nie należy szukać w GZIPOutputStream w tym przypadku?

public OutputStream getCompressedStream(InputStream input) { 
    OutputStream output = new GZIPOutputStream(new ByteArrayOutputStream()); 
    IOUtils.copy(input, output); 
    return output; 
} 
+0

GZIPOutputStream zwraca OutputStream. Ale chcę InputStream. – Fabien

+0

Nie można utworzyć wejścia z wejścia. Możesz tworzyć tylko dane wyjściowe. – adarshr

+3

Ok, Wyobraź sobie, że masz 1 tera bajtów danych jako InputStream ... – Fabien

1

Nie ma DeflatingGZIPInputStream w JRE. Aby opróżnić z „deflate” formatu kompresji, użyj java.util.zip.DeflaterInputStream i java.util.zip.DeflaterOutputStream:

public InputStream getCompressedStream(InputStream unCompressedStream) { 
    return new DeflaterInputStream(unCompressedStream); 
} 

Można czerpać klasę od java.util.zip.DeflaterInputStream że opada w formacie GZIP patrząc na źródło java.util.zip.GZIPOutputStream.

9

DeflaterInputStream nie jest tym, czego potrzebujesz, ponieważ nie ma w nim nagłówka/przyczepy gzip i używa nieco innej kompresji.

Jeśli zmienisz z OutputStream (push) na InputStream (pull), musisz zrobić coś innego.

Co GZIPOutputStream jest:

  • napisać statycznej nagłówku gzip
  • napisać przebicia strumień korzystając DeflaterOutputStream. Podczas zapisywania strumienia suma kontrolna CRC32 jest budowana na podstawie nieskompresowanych danych, a liczba bajtów to
  • napisanie zwiastuna zawierającego sumę kontrolną CRC32 i liczbę bajtów.

Jeśli chcesz zrobić to samo z InputStreams, potrzebny jest strumień, który zawiera:

  • nagłówku
  • opróżniony z powietrza zawartość
  • przyczepa

Najlepszym sposobem na to jest zapewnienie 3 różnych strumieni i połączenie ich w jedno. Na szczęście jest SequenceInputStream, który łączy twoje strumienie.

Oto moja realizacja plus prosty test jednostka:

import java.io.ByteArrayInputStream; 
import java.io.FileInputStream; 
import java.io.FilterInputStream; 
import java.io.IOException; 
import java.io.InputStream; 
import java.io.SequenceInputStream; 
import java.util.Enumeration; 
import java.util.zip.CRC32; 
import java.util.zip.Deflater; 
import java.util.zip.DeflaterInputStream; 
import java.util.zip.DeflaterOutputStream; 

/** 
* @author mwyraz 
* Wraps an input stream and compresses it's contents. Similiar to DeflateInputStream but adds GZIP-header and trailer 
* See GzipOutputStream for details. 
* LICENSE: Free to use. Contains some lines from GzipOutputStream, so oracle's license might apply as well! 
*/ 
public class GzipCompressingInputStream extends SequenceInputStream 
{ 
    public GzipCompressingInputStream(InputStream in) throws IOException 
    { 
     this(in,512); 
    } 
    public GzipCompressingInputStream(InputStream in, int bufferSize) throws IOException 
    { 
     super(new StatefullGzipStreamEnumerator(in,bufferSize)); 
    } 

    static enum StreamState 
    { 
     HEADER, 
     CONTENT, 
     TRAILER 
    } 

    protected static class StatefullGzipStreamEnumerator implements Enumeration<InputStream> 
    { 

     protected final InputStream in; 
     protected final int bufferSize; 
     protected StreamState state; 

     public StatefullGzipStreamEnumerator(InputStream in, int bufferSize) 
     { 
      this.in=in; 
      this.bufferSize=bufferSize; 
      state=StreamState.HEADER; 
     } 

     public boolean hasMoreElements() 
     { 
      return state!=null; 
     } 
     public InputStream nextElement() 
     { 
      switch (state) 
      { 
       case HEADER: 
        state=StreamState.CONTENT; 
        return createHeaderStream(); 
       case CONTENT: 
        state=StreamState.TRAILER; 
        return createContentStream(); 
       case TRAILER: 
        state=null; 
        return createTrailerStream(); 
      } 
      return null; 
     } 

     static final int GZIP_MAGIC = 0x8b1f; 
     static final byte[] GZIP_HEADER=new byte[] { 
       (byte) GZIP_MAGIC,  // Magic number (short) 
       (byte)(GZIP_MAGIC >> 8), // Magic number (short) 
       Deflater.DEFLATED,  // Compression method (CM) 
       0,      // Flags (FLG) 
       0,      // Modification time MTIME (int) 
       0,      // Modification time MTIME (int) 
       0,      // Modification time MTIME (int) 
       0,      // Modification time MTIME (int) 
       0,      // Extra flags (XFLG) 
       0       // Operating system (OS) 
     }; 
     protected InputStream createHeaderStream() 
     { 
      return new ByteArrayInputStream(GZIP_HEADER); 
     } 
     protected InternalGzipCompressingInputStream contentStream; 
     protected InputStream createContentStream() 
     { 
      contentStream=new InternalGzipCompressingInputStream(new CRC32InputStream(in), bufferSize); 
      return contentStream; 
     } 
     protected InputStream createTrailerStream() 
     { 
      return new ByteArrayInputStream(contentStream.createTrailer()); 
     } 
    } 

    /** 
    * Internal stream without header/trailer 
    */ 
    protected static class CRC32InputStream extends FilterInputStream 
    { 
     protected CRC32 crc = new CRC32(); 
     protected long byteCount; 
     public CRC32InputStream(InputStream in) 
     { 
      super(in); 
     } 

     @Override 
     public int read() throws IOException 
     { 
      int val=super.read(); 
      if (val>=0) 
      { 
       crc.update(val); 
       byteCount++; 
      } 
      return val; 
     } 
     @Override 
     public int read(byte[] b, int off, int len) throws IOException 
     { 
      len=super.read(b, off, len); 
      if (len>=0) 
      { 
       crc.update(b,off,len); 
       byteCount+=len; 
      } 
      return len; 
     } 
     public long getCrcValue() 
     { 
      return crc.getValue(); 
     } 
     public long getByteCount() 
     { 
      return byteCount; 
     } 
    } 

    /** 
    * Internal stream without header/trailer 
    */ 
    protected static class InternalGzipCompressingInputStream extends DeflaterInputStream 
    { 
     protected final CRC32InputStream crcIn; 
     public InternalGzipCompressingInputStream(CRC32InputStream in, int bufferSize) 
     { 
      super(in, new Deflater(Deflater.DEFAULT_COMPRESSION, true),bufferSize); 
      crcIn=in; 
     } 
     public void close() throws IOException 
     { 
      if (in != null) 
      { 
       try 
       { 
        def.end(); 
        in.close(); 
       } 
       finally 
       { 
        in = null; 
       } 
      } 
     } 

     protected final static int TRAILER_SIZE = 8; 

     public byte[] createTrailer() 
     { 
      byte[] trailer= new byte[TRAILER_SIZE]; 
      writeTrailer(trailer, 0); 
      return trailer; 
     } 

     /* 
     * Writes GZIP member trailer to a byte array, starting at a given 
     * offset. 
     */ 
     private void writeTrailer(byte[] buf, int offset) 
     { 
      writeInt((int)crcIn.getCrcValue(), buf, offset); // CRC-32 of uncompr. data 
      writeInt((int)crcIn.getByteCount(), buf, offset + 4); // Number of uncompr. bytes 
     } 

     /* 
     * Writes integer in Intel byte order to a byte array, starting at a 
     * given offset. 
     */ 
     private void writeInt(int i, byte[] buf, int offset) 
     { 
      writeShort(i & 0xffff, buf, offset); 
      writeShort((i >> 16) & 0xffff, buf, offset + 2); 
     } 

     /* 
     * Writes short integer in Intel byte order to a byte array, starting 
     * at a given offset 
     */ 
     private void writeShort(int s, byte[] buf, int offset) 
     { 
      buf[offset] = (byte)(s & 0xff); 
      buf[offset + 1] = (byte)((s >> 8) & 0xff); 
     } 
    } 

} 

import static org.junit.Assert.*; 

import java.io.ByteArrayInputStream; 
import java.io.IOException; 
import java.io.InputStream; 
import java.util.Arrays; 
import java.util.zip.CRC32; 
import java.util.zip.GZIPInputStream; 

import org.junit.Test; 

public class TestGzipCompressingInputStream 
{ 

    @Test 
    public void test() throws Exception 
    { 
     testCompressor("test1 test2 test3"); 
     testCompressor("1MB binary data",createTestPattern(1024*1024)); 
     for (int i=0;i<4096;i++) 
     { 
      testCompressor(i+" bytes of binary data",createTestPattern(i)); 
     } 
    } 

    protected byte[] createTestPattern(int size) 
    { 
     byte[] data=new byte[size]; 
     byte pattern=0; 
     for (int i=0;i<size;i++) 
     { 
      data[i]=pattern++; 
     } 
     return data; 
    } 

    protected void testCompressor(String data) throws IOException 
    { 
     testCompressor("String: "+data,data.getBytes()); 
    } 
    protected void testCompressor(String dataInfo, byte[] data) throws IOException 
    { 
     InputStream uncompressedIn=new ByteArrayInputStream(data); 
     InputStream compressedIn=new GzipCompressingInputStream(uncompressedIn); 
     InputStream uncompressedOut=new GZIPInputStream(compressedIn); 

     byte[] result=StreamHelper.readBinaryStream(uncompressedOut); 

     assertTrue("Test failed for: "+dataInfo,Arrays.equals(data,result)); 

    } 

} 
+0

To zadziałało! Dzięki! – chaimp

1

przykład roboczych z kompresji strumienia wejściowego można znaleźć w popularnym open source ESB Mule: GZIPCompressorInputStream.

Wykorzystuje on DeflaterInputStream dostarczony przez środowisko JRE do kompresji, przedrostek gzip i dołącza do niego zwiastun gzip (zwany również stopką).

Niestety, jest pod CPA License, który nie wydaje się być bardzo często. Ponadto wydaje się, że nie ma testu jednostki.

+1

Kompresowanie Apache Commons ma teraz [GzipCompressorInputStream] (https://commons.apache.org/proper/commons-compress/javadocs/api-1.12/index.html) – user1585916

+1

@ user1585916: Implementacja commons Apache ma tę samą nazwę, jednak ** nie kompresuje ** - z JavaDoc: 'Strumień wejściowy, który dekompresuje pliki .gz. – Robert

4

Jeśli nie chcesz, aby załadować zawartość do dużej tablicy bajtów i potrzebują prawdziwego rozwiązania Streaming:

package x.y.z; 

import org.apache.commons.io.IOUtils; 

import java.io.*; 
import java.util.Arrays; 
import java.util.List; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.zip.GZIPInputStream; 
import java.util.zip.GZIPOutputStream; 
import java.util.zip.ZipOutputStream; 

/** 
* Stream Compression Utility 
* 
* @author Thamme Gowda N 
*/ 
public enum CompressionUtil { 
    INSTANCE; 

    public static final int NUM_THREADS = 5; 
    private final ExecutorService pool; 

    CompressionUtil(){ 
     this.pool = Executors.newFixedThreadPool(NUM_THREADS); 
    } 

    public static CompressionUtil getInstance(){ 
     return INSTANCE; 
    } 

    /** 
    * Supported compression type names 
    */ 
    public static enum CompressionType { 
     GZIP, 
     ZIP 
    } 

    /** 
    * Wraps the given stream in a Compressor stream based on given type 
    * @param sourceStream : Stream to be wrapped 
    * @param type   : Compression type 
    * @return source stream wrapped in a compressor stream 
    * @throws IOException when some thing bad happens 
    */ 
    public static OutputStream getCompressionWrapper(OutputStream sourceStream, 
            CompressionType type) throws IOException { 

     switch (type) { 
      case GZIP: 
       return new GZIPOutputStream(sourceStream); 
      case ZIP: 
       return new ZipOutputStream(sourceStream); 
      default: 
       throw new IllegalArgumentException("Possible values :" 
         + Arrays.toString(CompressionType.values())); 
     } 
    } 

    /** 
    * Gets Compressed Stream for given input Stream 
    * @param sourceStream : Input Stream to be compressed to 
    * @param type: Compression types such as GZIP 
    * @return Compressed Stream 
    * @throws IOException when some thing bad happens 
    */ 
    public static InputStream getCompressedStream(final InputStream sourceStream, 
            CompressionType type) throws IOException { 

     if(sourceStream == null) { 
      throw new IllegalArgumentException("Source Stream cannot be NULL"); 
     } 

     /** 
     * sourceStream --> zipperOutStream(->intermediateStream -)--> resultStream 
     */ 
     final PipedInputStream resultStream = new PipedInputStream(); 
     final PipedOutputStream intermediateStream = new PipedOutputStream(resultStream); 
     final OutputStream zipperOutStream = getCompressionWrapper(intermediateStream, type); 

     Runnable copyTask = new Runnable() { 

      @Override 
      public void run() { 
       try { 
        int c; 
        while((c = sourceStream.read()) >= 0) { 
         zipperOutStream.write(c); 
        } 
        zipperOutStream.flush(); 
       } catch (IOException e) { 
        IOUtils.closeQuietly(resultStream); // close it on error case only 
        throw new RuntimeException(e); 
       } finally { 
        // close source stream and intermediate streams 
        IOUtils.closeQuietly(sourceStream); 
        IOUtils.closeQuietly(zipperOutStream); 
        IOUtils.closeQuietly(intermediateStream); 
       } 
      } 
     }; 
     getInstance().pool.submit(copyTask); 
     return resultStream; 
    } 

    public static void main(String[] args) throws IOException { 
     String input = "abcdefghij"; 
     InputStream sourceStream = new ByteArrayInputStream(input.getBytes()); 
     InputStream compressedStream = 
       getCompressedStream(sourceStream, CompressionType.GZIP); 

     GZIPInputStream decompressedStream = new GZIPInputStream(compressedStream); 
     List<String> lines = IOUtils.readLines(decompressedStream); 
     String output = lines.get(0); 
     System.out.println("test passed ? " + input.equals(output)); 

    } 
} 
2

Wydaje się spóźniłem na okres 3 lat, ale być może będzie przydatna dla kogoś. Moje rozwiązanie jest podobne do rozwiązania @Michael Wyraz za, jedyną różnicą jest to, że moje rozwiązanie jest oparte na FilterInputStream

import java.io.ByteArrayInputStream; 
import java.io.FilterInputStream; 
import java.io.IOException; 
import java.io.InputStream; 
import java.util.zip.CRC32; 
import java.util.zip.Deflater; 

public class GZipInputStreamDeflater extends FilterInputStream { 

    private static enum Stage { 
     HEADER, 
     DATA, 
     FINALIZATION, 
     TRAILER, 
     FINISH 
    } 

    private GZipInputStreamDeflater.Stage stage = Stage.HEADER; 

    private final Deflater deflater = new Deflater(Deflater.DEFLATED, true); 
    private final CRC32 crc = new CRC32(); 

    /* GZIP header magic number */ 
    private final static int GZIP_MAGIC = 0x8b1f; 

    private ByteArrayInputStream trailer = null; 
    private ByteArrayInputStream header = new ByteArrayInputStream(new byte[] { 
     (byte) GZIP_MAGIC, // Magic number (short) 
     (byte) (GZIP_MAGIC >> 8), // Magic number (short) 
     Deflater.DEFLATED, // Compression method (CM) 
     0, // Flags (FLG) 
     0, // Modification time MTIME (int) 
     0, // Modification time MTIME (int) 
     0, // Modification time MTIME (int) 
     0, // Modification time MTIME (int) 
     0, // Extra flags (XFLG) 
     0, // Operating system (OS) 
    }); 

    public GZipInputStreamDeflater(InputStream in) { 
     super(in); 
     crc.reset(); 
    } 

    @Override 
    public int read(byte[] b, int off, int len) throws IOException { 
     int read = -1; 

     switch(stage) { 
      case FINISH: 
       return -1; 
      case HEADER: 
       read = header.read(b, off, len); 
       if(header.available() == 0) { 
        stage = Stage.DATA; 
       } 
       return read; 
      case DATA: 
       byte[] b2 = new byte[len]; 
       read = super.read(b2, 0, len); 
       if(read <= 0) { 
        stage = Stage.FINALIZATION; 
        deflater.finish(); 
        return 0; 
       } 
       else { 
        deflater.setInput(b2, 0, read); 
        crc.update(b2, 0, read); 
        read = 0; 
        while(!deflater.needsInput() && len - read > 0) { 
         read += deflater.deflate(b, off + read, len - read, Deflater.NO_FLUSH); 
        } 
        return read; 
       } 
      case FINALIZATION: 
       if(deflater.finished()) { 
        stage = Stage.TRAILER; 

        int crcVaue = (int) crc.getValue(); 
        int totalIn = deflater.getTotalIn(); 

        trailer = new ByteArrayInputStream(new byte[] { 
         (byte) (crcVaue >> 0), 
         (byte) (crcVaue >> 8), 
         (byte) (crcVaue >> 16), 
         (byte) (crcVaue >> 24), 

         (byte) (totalIn >> 0), 
         (byte) (totalIn >> 8), 
         (byte) (totalIn >> 16), 
         (byte) (totalIn >> 24), 
        }); 

        return 0; 
       } 
       else { 
        read = deflater.deflate(b, off, len, Deflater.FULL_FLUSH); 
        return read; 
       } 
      case TRAILER: 
       read = trailer.read(b, off, len); 
       if(trailer.available() == 0) { 
        stage = Stage.FINISH; 
       } 
       return read; 
     } 
     return -1; 
    } 

    @Override 
    public void close() throws IOException { 
     super.close(); 
     deflater.end(); 
     if(trailer != null) { 
      trailer.close(); 
     } 
     header.close(); 
    } 
} 

Wykorzystanie:

AmazonS3Client s3client = new AmazonS3Client(...); 
try (InputStream in = new GZipInputStreamDeflater(new URL("http://....../very-big-file.csv").openStream());) { 
    PutObjectRequest putRequest = new PutObjectRequest("BUCKET-NAME", "/object/key", in, new ObjectMetadata()); 
    s3client.putObject(putRequest); 
} 
+0

Nicolai, czy rozważałeś opublikowanie tego jako biblioteki? –

+0

Dziękuję, @JoshLemer. Nie myślałem o opublikowaniu tego jako biblioteki. Czy uważasz, że byłaby to użyteczna biblioteka?Zawiera tylko jedną klasę i jest bardzo mały dla biblioteki, prawda? – Nicolai

1

Można użyć EasyStream.

try(final InputStreamFromOutputStream<Void> isOs = new InputStreamFromOutputStream<Void>() { 
    @Override 
    protected void produce(final OutputStream dataSink) throws Exception { 
     InputStream in = new GZIPInputStream(unCompressedStream); 
     IOUtils.copy(in, dataSink); 
    } 
}) {   

    //You can use the compressed input stream here 

} catch (final IOException e) { 
    //Handle exceptions here 
} 
1

PipedOutputStream pozwala pisać do GZIPOutputStream i udostępniać te dane za pomocą metody InputStream. Ma stały koszt pamięci, w przeciwieństwie do innych rozwiązań, które buforują cały strumień danych do tablicy lub pliku. Jedynym problemem jest to, że nie możesz czytać i pisać z tego samego wątku, musisz użyć oddzielnego.

private InputStream gzipInputStream(InputStream in) throws IOException { 
    PipedInputStream zipped = new PipedInputStream(); 
    PipedOutputStream pipe = new PipedOutputStream(zipped); 
    new Thread(
      () -> { 
       try(OutputStream zipper = new GZIPOutputStream(pipe)){ 
        IOUtils.copy(in, zipper); 
       } catch (IOException e) { 
        e.printStackTrace(); 
       } 
      } 
    ).start(); 
    return zipped; 
} 
+0

Ładne rozwiązanie. Jedyną rzeczą, która sprawia mi ból głowy, jest to, jak wyciągnąć potencjalny "wyjątek IOEx" z wątku, aby nie został zignorowany. – Robert

+0

Myślę, że w tym momencie sięgnęłbym po [EasyStream OutputStreamToInputStream] (http://io-tools.sourceforge.net/easystream/apidocs/com/gc/iotools/stream/os/OutputStreamToInputStream.html) jako sugestie @smartwjw w jego odpowiedzi. Zasadniczo działa to pod maską, ale dokumentacja mówi, że ujawnia wszelkie wyjątki za pośrednictwem [getResult()] (http://io-tools.sourceforge.net/easystream/apidocs/com/gc/iotools/stream/os/OutputStreamToInputStream .html # getResult--) – MikeFHay