Próbuję wysłać duży plik X? -GB jako strumień do kolejki ActiveMQ w celu przetworzenia.Jak wysłać duże pliki do ActiveMQ za pomocą wielbłąda
Znam ActiveMQ supports streams, podobnie jak camel-jms, ale nic, co próbuję ustawić w kolejce, wydaje się mieć jakiekolwiek znaczenie. Jedyną rzeczą, która się zmienia, jest wyłączenie buforowania wyników strumieniowych w zamian za wyjątek "strumień zamknięty".
Jestem otwarty na używanie procesora lub niestandardowej klasy (o ile zasoby zostaną oczyszczone), ale powinno to być możliwe od samego projektu. Jak poprawnie przetworzyć duży plik za pośrednictwem camel-activemq bez uzyskania OutOfMemoryError?
Korzystanie
- ServiceMix-7.0.0
- wielbłądziej 2.16.4
- ActiveMQ-5.14.3
Oto mój wielbłąd plan
<?xml version="1.0" encoding="UTF-8"?>
<blueprint
xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0"
<!-- just calls exchange.setBody(exchange.getBody(InputStream.class)) -->
<bean id="toStreamBody" class="my.test.toInputStream"/>
<!-- define a bean of type StreamCachingStrategy which CamelContext will automaticly use -->
<bean id="streamStrategy" class="org.apache.camel.impl.DefaultStreamCachingStrategy">
<property name="spoolDirectory" value="${java.io.tmpdir}TestTemp/#uuid#/"/>
<property name="spoolThreshold" value="131072"/>
<property name="spoolUsedHeapMemoryThreshold" value="70"/>
<property name="anySpoolRules" value="true"/>
</bean>
<!-- streamCaching="true" is "not a valid attribute" -->
<camelContext streamCache="true" xmlns="http://camel.apache.org/schema/blueprint">
<route id="file_route">
<from uri="file://FileUploadBin?delete=false&moveFailed=.error"/>
<!-- just calls exchange.setBody(exchange.getBody(InputStream.class)) -->
<bean ref="toStreamBody"/>
<to uri="activemq:queue:TestQ"/>
</route>
<route id="myTestQ">
<from uri="activemq:queue:TestQ?concurrentConsumers=1&maxConcurrentConsumers=64&maxMessagesPerTask=100&asyncConsumer=true&jmsMessageType=Stream&mapJmsMessage=false"/>
<bean ref="toStreamBody"/>
<log message="FINISHED" loggingLevel="WARN"/>
</route>
</camelContext>
</blueprint>
Oto błąd, który ciągle dostaję
2017-10-17 08:46:53,859 | ERROR | - RecipientList | DefaultErrorHandler | 43 - org.apache.camel.camel-core - 2.16.4 | Failed delivery for (MessageId: ID-DESKTOP-H2O66PO-62468-1508242908251-4-4 on ExchangeId: ID-DESKTOP-H2O66PO-62468-1508242908251-4-5). Exhausted after delivery attempt: 1 caught: org.apache.camel.TypeConversionException: Error during type conversion from type: java.lang.String to the required type: byte[] with value [Body is instance of org.apache.camel.StreamCache] due java.lang.OutOfMemoryError: Java heap space
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[file_route ] [file_route ] [file://FileUploadBin?delete=false&moveFailed=.error ] [ 3764]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-DESKTOP-H2O66PO-62468-1508242908251-4-5
ExchangePattern InOnly
Headers {breadcrumbId=ID-DESKTOP-H2O66PO-62468-1508242908251-4-4, fileName=Die.txt}
BodyType org.apache.camel.converter.stream.FileInputStreamCache
Body [Body is instance of org.apache.camel.StreamCache]
]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.TypeConversionException: Error during type conversion from type: java.lang.String to the required type: byte[] with value [Body is instance of org.apache.camel.StreamCache] due java.lang.OutOfMemoryError: Java heap space
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.createTypeConversionException(BaseTypeConverterRegistry.java:610)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:137)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.core.osgi.OsgiTypeConverter.convertTo(OsgiTypeConverter.java:108)[40:org.apache.camel.camel-blueprint:2.16.4]
at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:560)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:490)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:303)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:300)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:483)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:426)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:440)[46:org.apache.camel.camel-jms:2.16.4]
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[154:org.apache.servicemix.bundles.spring-jms:3.2.17.RELEASE_1]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:437)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:413)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:367)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:823)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.processor.MulticastProcessor.access$200(MulticastProcessor.java:84)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:319)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:304)[43:org.apache.camel.camel-core:2.16.4]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)[:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_121]
at java.lang.Thread.run(Thread.java:745)[:1.8.0_121]
Caused by: org.apache.camel.RuntimeCamelException: java.lang.OutOfMemoryError: Java heap space
at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1652)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1247)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.impl.converter.StaticMethodTypeConverter.convertTo(StaticMethodTypeConverter.java:59)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:293)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:120)[43:org.apache.camel.camel-core:2.16.4]
... 24 more
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)[:1.8.0_121]
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)[:1.8.0_121]
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)[:1.8.0_121]
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)[:1.8.0_121]
at java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)[:1.8.0_121]
at sun.nio.ch.FileChannelImpl.transferToArbitraryChannel(FileChannelImpl.java:567)[:1.8.0_121]
at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:616)[:1.8.0_121]
at org.apache.camel.converter.stream.FileInputStreamCache.writeTo(FileInputStreamCache.java:108)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.converter.stream.StreamCacheConverter.convertToByteArray(StreamCacheConverter.java:102)[43:org.apache.camel.camel-core:2.16.4]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_121]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)[:1.8.0_121]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)[:1.8.0_121]
at java.lang.reflect.Method.invoke(Method.java:498)[:1.8.0_121]
at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1243)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.impl.converter.StaticMethodTypeConverter.convertTo(StaticMethodTypeConverter.java:59)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:293)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:120)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.core.osgi.OsgiTypeConverter.convertTo(OsgiTypeConverter.java:108)
at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:560)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:490)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:303)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:300)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:483)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:426)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:440)[46:org.apache.camel.camel-jms:2.16.4]
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[154:org.apache.servicemix.bundles.spring-jms:3.2.17.RELEASE_1]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:437)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:413)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:367)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[43:org.apache.camel.camel-core:2.16.4]
2017-10-17 08:46:57,641 | WARN | ://FileUploadBin | GenericFileOnCompletion | 43 - org.apache.camel.camel-core - 2.16.4 | Rollback file strategy: org.apache[email protected]294a3d48 for file: GenericFile[Die.txt]
Związane z: [to pytanie] (https://stackoverflow.com/questions/46422970/streaming-using-camel-jms). Ale bez odpowiedzi, nie używam szerszenia/bezpiecznika, a pytanie nie jest pomocne. – Tezra
Czy próbowałeś wielbłądów? http://camel.apache.org/stream.html. Wystarczy Wymienić komponent pliku na "stream: file? FileName = fileName.in" i wysłać bezpośrednio do kolejki. – ltsallas
@Itsallas Komponent strumienia nie działa lepiej. – Tezra