2017-10-17 50 views
5

How to send large files to ActiveMQ with Camel

I'm trying to send a large (X GB) file as a stream to an ActiveMQ queue for processing.

I know ActiveMQ supports streams, as does camel-jms, but nothing I try to set on the queue seems to make any difference. The only thing that changes anything is disabling stream caching, and that just trades the OutOfMemoryError for a "stream closed" exception.

I'm open to using a processor or a custom class (as long as resources get cleaned up), but this should be possible out of the box. How do I correctly process a large file through camel-activemq without getting an OutOfMemoryError?

Using:

  • ServiceMix 7.0.0
  • Camel 2.16.4
  • ActiveMQ 5.14.3

Here is my Camel blueprint:

<?xml version="1.0" encoding="UTF-8"?> 
<blueprint 
xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" 
xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0">

    <!-- just calls exchange.setBody(exchange.getBody(InputStream.class)) -->  
    <bean id="toStreamBody" class="my.test.toInputStream"/> 

    <!-- define a bean of type StreamCachingStrategy which CamelContext will automatically use --> 
    <bean id="streamStrategy" class="org.apache.camel.impl.DefaultStreamCachingStrategy"> 
     <property name="spoolDirectory" value="${java.io.tmpdir}TestTemp/#uuid#/"/> 
     <property name="spoolThreshold" value="131072"/> 
     <property name="spoolUsedHeapMemoryThreshold" value="70"/> 
     <property name="anySpoolRules" value="true"/> 
    </bean> 

    <!-- streamCaching="true" is "not a valid attribute" --> 
    <camelContext streamCache="true" xmlns="http://camel.apache.org/schema/blueprint"> 

     <route id="file_route"> 
      <from uri="file://FileUploadBin?delete=false&amp;moveFailed=.error"/> 
      <!-- just calls exchange.setBody(exchange.getBody(InputStream.class)) --> 
      <bean ref="toStreamBody"/> 
      <to uri="activemq:queue:TestQ"/> 
     </route> 

     <route id="myTestQ"> 
      <from uri="activemq:queue:TestQ?concurrentConsumers=1&amp;maxConcurrentConsumers=64&amp;maxMessagesPerTask=100&amp;asyncConsumer=true&amp;jmsMessageType=Stream&amp;mapJmsMessage=false"/> 
      <bean ref="toStreamBody"/> 
      <log message="FINISHED" loggingLevel="WARN"/> 
     </route> 

    </camelContext> 
</blueprint> 

Here is the error I keep getting:

2017-10-17 08:46:53,859 | ERROR | - RecipientList | DefaultErrorHandler    | 43 - org.apache.camel.camel-core - 2.16.4 | Failed delivery for (MessageId: ID-DESKTOP-H2O66PO-62468-1508242908251-4-4 on ExchangeId: ID-DESKTOP-H2O66PO-62468-1508242908251-4-5). Exhausted after delivery attempt: 1 caught: org.apache.camel.TypeConversionException: Error during type conversion from type: java.lang.String to the required type: byte[] with value [Body is instance of org.apache.camel.StreamCache] due java.lang.OutOfMemoryError: Java heap space 

Message History 
--------------------------------------------------------------------------------------------------------------------------------------- 
RouteId    ProcessorId   Processor                  Elapsed (ms) 
[file_route  ] [file_route  ] [file://FileUploadBin?delete=false&moveFailed=.error       ] [  3764] 

Exchange 
--------------------------------------------------------------------------------------------------------------------------------------- 
Exchange[ 
     Id     ID-DESKTOP-H2O66PO-62468-1508242908251-4-5 
     ExchangePattern  InOnly 
     Headers    {breadcrumbId=ID-DESKTOP-H2O66PO-62468-1508242908251-4-4, fileName=Die.txt} 
     BodyType   org.apache.camel.converter.stream.FileInputStreamCache 
     Body    [Body is instance of org.apache.camel.StreamCache] 
] 

Stacktrace 
--------------------------------------------------------------------------------------------------------------------------------------- 
org.apache.camel.TypeConversionException: Error during type conversion from type: java.lang.String to the required type: byte[] with value [Body is instance of org.apache.camel.StreamCache] due java.lang.OutOfMemoryError: Java heap space 
     at org.apache.camel.impl.converter.BaseTypeConverterRegistry.createTypeConversionException(BaseTypeConverterRegistry.java:610)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:137)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.core.osgi.OsgiTypeConverter.convertTo(OsgiTypeConverter.java:108)[40:org.apache.camel.camel-blueprint:2.16.4] 
     at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:560)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:490)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:303)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:300)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:483)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:426)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:440)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[154:org.apache.servicemix.bundles.spring-jms:3.2.17.RELEASE_1] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:437)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:413)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:367)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:823)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.processor.MulticastProcessor.access$200(MulticastProcessor.java:84)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:319)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:304)[43:org.apache.camel.camel-core:2.16.4] 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)[:1.8.0_121] 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_121] 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_121] 
     at java.lang.Thread.run(Thread.java:745)[:1.8.0_121] 
Caused by: org.apache.camel.RuntimeCamelException: java.lang.OutOfMemoryError: Java heap space 
     at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1652)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1247)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.impl.converter.StaticMethodTypeConverter.convertTo(StaticMethodTypeConverter.java:59)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:293)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:120)[43:org.apache.camel.camel-core:2.16.4] 
     ... 24 more 
Caused by: java.lang.OutOfMemoryError: Java heap space 
     at java.util.Arrays.copyOf(Arrays.java:3236)[:1.8.0_121] 
     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)[:1.8.0_121] 
     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)[:1.8.0_121] 
     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)[:1.8.0_121] 
     at java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)[:1.8.0_121] 
     at sun.nio.ch.FileChannelImpl.transferToArbitraryChannel(FileChannelImpl.java:567)[:1.8.0_121] 
     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:616)[:1.8.0_121] 
     at org.apache.camel.converter.stream.FileInputStreamCache.writeTo(FileInputStreamCache.java:108)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.converter.stream.StreamCacheConverter.convertToByteArray(StreamCacheConverter.java:102)[43:org.apache.camel.camel-core:2.16.4] 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_121] 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)[:1.8.0_121] 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)[:1.8.0_121] 
     at java.lang.reflect.Method.invoke(Method.java:498)[:1.8.0_121] 
     at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1243)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.impl.converter.StaticMethodTypeConverter.convertTo(StaticMethodTypeConverter.java:59)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:293)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:120)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.core.osgi.OsgiTypeConverter.convertTo(OsgiTypeConverter.java:108) 
     at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:560)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:490)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:303)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:300)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:483)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:426)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:440)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[154:org.apache.servicemix.bundles.spring-jms:3.2.17.RELEASE_1] 
     at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:437)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:413)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:367)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)[46:org.apache.camel.camel-jms:2.16.4] 
     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[43:org.apache.camel.camel-core:2.16.4] 
     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[43:org.apache.camel.camel-core:2.16.4] 
2017-10-17 08:46:57,641 | WARN | ://FileUploadBin | GenericFileOnCompletion   | 43 - org.apache.camel.camel-core - 2.16.4 | Rollback file strategy: org.apache[email protected]294a3d48 for file: GenericFile[Die.txt] 
+1

Related: [this question](https://stackoverflow.com/questions/46422970/streaming-using-camel-jms). But it has no answers, I'm not using HornetQ/Fuse, and the question itself isn't helpful. – Tezra

+0

Have you tried camel-stream? http://camel.apache.org/stream.html. Just replace the file component with "stream:file?fileName=fileName.in" and send directly to the queue. – ltsallas

+1

@Itsallas The stream component doesn't work any better. – Tezra

Answer

3

This is not supported in the classic ActiveMQ broker.

However, the next-generation ActiveMQ Artemis broker supports large messages, and we have just added support for this in camel-jms. I wrote a blog post about it: http://www.davsclaus.com/2017/10/working-with-large-messages-using.html

We have also added support for the javax.jms.StreamMessage type in camel-jms. However, that API is not ideal for large messages, so it is of limited use. Still, you can enable it on the component with the new streamMessageTypeEnabled option in Camel 2.21 onwards: if the message body is a streaming type, Camel will then send it as a StreamMessage instead of a BytesMessage.
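
As a rough sketch of what enabling that option might look like, here is the producer route from the question with streamMessageTypeEnabled set on the endpoint URI. This assumes an upgrade to Camel 2.21 or later (the question's Camel 2.16.4 does not have the option), and it reuses the question's toStreamBody bean so that the body is an InputStream when it reaches the JMS producer. It is untested, and given the caveats above it may still not be suitable for multi-gigabyte files on the classic broker.

```xml
<!-- Sketch only: assumes Camel 2.21+; not available in the question's Camel 2.16.4 -->
<route id="file_route">
  <from uri="file://FileUploadBin?delete=false&amp;moveFailed=.error"/>
  <!-- the question's bean: sets the body to exchange.getBody(InputStream.class) -->
  <bean ref="toStreamBody"/>
  <!-- with a stream-typed body, camel-jms should send a javax.jms.StreamMessage instead of a BytesMessage -->
  <to uri="activemq:queue:TestQ?streamMessageTypeEnabled=true"/>
</route>
```

Setting the option on the endpoint keeps the change local to this one route; the answer mentions the component, so it could presumably be set there instead to apply to every endpoint.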