RabbitSDK - Channel closes on message

I am trying to use RabbitSDK (https://forgebox.io/view/rabbitsdk) on ACF2018.

I have a RabbitMQ server up and running. When I start my consumer I can see the connection and channel in the RabbitMQ console, however when a message is sent to the queue the channel closes.

If I run the code below it will create the connection, start the consumer, consume all 4 messages, and keep the channel open. The only thing my CFC does is run cflog and acknowledge the message.

		rabbitClient
			.queueDeclare( 'testQueue' )
			.queueBind( queue='testQueue', exchange='amq.direct', routingKey='testQueue' );

		rabbitClient.startConsumer(
				queue			= 'testQueue', 
				autoAcknowledge	= false, 
				component		= 'models.rabbitMQ'
			);
				
		rabbitClient
			.publish( "Message 1", 'testQueue' )
			.publish( "Message 2", 'testQueue' )
			.publish( "Message 3", 'testQueue' )
			.publish( "Message 4", 'testQueue' );

However, if I run just this in another request, the messages are queued but the channel closes and no messages are consumed.

		rabbitClient
			.publish( "Message 5", 'testQueue' )
			.publish( "Message 6", 'testQueue' )
			.publish( "Message 7", 'testQueue' )
			.publish( "Message 8", 'testQueue' );

I am not sure if I am missing something. Any help, pointers or suggestions would be appreciated.

Watch the logs in the console. My assumption is that the rabbitclient is shutting down, which it does automatically when the framework reinits. Are you using ?fwreinit=1 in your test URL?

I am not. I can run those two requests right after a fresh reinit.

If I just run the first block of code the channel stays connected. I have tested by also sending a test message to the queue from the RabbitMQ console and it closes the channel as well.

Did you check the logs like I said? Also, re-reading your original message I guess I’m a little confused by what you’re doing. Firstly, there are actually like 7 channels involved in the first code example. One to declare the queue, one for the consumer thread, and one for each of the 4 messages you publish. I would expect all but the consumer thread's channel to close after they were done.

In the second code example, I would expect 4 more channels to be created, and automatically close once they sent the message. Note a single TCP connection can have more than one channel riding on top of it.

When you say “the channel” closes, I’m not sure which one you’re referring to and what made you think it was going to stay open. And when you say “no messages are consumed” I’m not clear if you still have a consumer thread running in the application or not.

If you look in the Rabbit management UI, do you still see a consumer connected? Do you see the messages in the queue? Also, check the logs!

To confirm, the other channels are being opened and closed when messages are being sent. The consumer channel remains open until a new message is put in the queue, either via CF or from the RabbitMQ Console. After reviewing the logs and testing more (you’re right, I should have checked first), below is the exception being thrown. FYI, I verified that "Disable access to internal ColdFusion Java components" in the CF Admin is not checked.

[INFO ] 23:19:33.784 [pool-18-thread-5] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - Consumer [(Component=rabbitsdk.models.Consumer)] (amq.ctag-9JJkCh0YuW23-LV50hpdJA) method handleDelivery for channel AMQChannel(amqp://WT@83.229.112.185:5672/WT,1) threw an exception for channel AMQChannel(amqp://WT@83.229.112.185:5672/WT,1)
java.lang.NullPointerException: null
        at coldfusion.thread.HttpServletRequestWrapper.<init>(HttpServletRequestWrapper.java:74) ~[chf20180014.jar:development]
        at coldfusion.filter.FusionContext.clone(FusionContext.java:1654) ~[chf20180014.jar:?]
        at jdk.internal.reflect.GeneratedMethodAccessor24.invoke(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
        at coldfusion.runtime.StructBean.invoke(StructBean.java:507) ~[cfusion.jar:development]
        at coldfusion.runtime.CfJspPage._invoke(CfJspPage.java:3723) ~[chf20180014.jar:development]
        at coldfusion.runtime.CfJspPage._invoke(CfJspPage.java:3604) ~[chf20180014.jar:development]
        at cfConsumer2ecfc374886366$funcLOADCONTEXT.runFunction(/Users/mike/files/RabbitMQTest/modules/rabbitsdk/models/Consumer.cfc:217) ~[?:?]
        at coldfusion.runtime.UDFMethod.invoke(UDFMethod.java:554) ~[chf20180014.jar:development]
        at coldfusion.runtime.UDFMethod$ArgumentCollectionFilter.invoke(UDFMethod.java:448) ~[chf20180014.jar:development]
        at coldfusion.filter.FunctionAccessFilter.invoke(FunctionAccessFilter.java:95) ~[cfusion.jar:?]
        at coldfusion.runtime.UDFMethod.runFilterChain(UDFMethod.java:399) ~[chf20180014.jar:development]
        at coldfusion.runtime.UDFMethod.runFilterChain(UDFMethod.java:372) ~[chf20180014.jar:development]
        at coldfusion.runtime.UDFMethod.invoke(UDFMethod.java:288) ~[chf20180014.jar:development]
        at coldfusion.runtime.CfJspPage._invokeUDF(CfJspPage.java:4175) ~[chf20180014.jar:development]
        at coldfusion.runtime.CfJspPage._invokeUDF(CfJspPage.java:4155) ~[chf20180014.jar:development]
        at cfConsumer2ecfc374886366$funcHANDLEDELIVERY.runFunction(/Users/mike/files/RabbitMQTest/modules/rabbitsdk/models/Consumer.cfc:103) ~[?:?]
        at coldfusion.runtime.UDFMethod.invoke(UDFMethod.java:554) ~[chf20180014.jar:development]
        at coldfusion.runtime.UDFMethod$ReturnTypeFilter.invoke(UDFMethod.java:485) ~[chf20180014.jar:development]
        at coldfusion.runtime.UDFMethod$ArgumentCollectionFilter.invoke(UDFMethod.java:448) ~[chf20180014.jar:development]
        at coldfusion.filter.FunctionAccessFilter.invoke(FunctionAccessFilter.java:95) ~[cfusion.jar:?]
        at coldfusion.runtime.UDFMethod.runFilterChain(UDFMethod.java:399) ~[chf20180014.jar:development]
        at coldfusion.runtime.UDFMethod.runFilterChain(UDFMethod.java:372) ~[chf20180014.jar:development]
        at coldfusion.runtime.UDFMethod.invoke(UDFMethod.java:288) ~[chf20180014.jar:development]
        at coldfusion.runtime.TemplateProxy.invoke(TemplateProxy.java:830) ~[chf20180014.jar:development]
        at coldfusion.runtime.TemplateProxy.invoke(TemplateProxy.java:613) ~[chf20180014.jar:development]
        at coldfusion.runtime.TemplateProxy.invoke(TemplateProxy.java:438) ~[chf20180014.jar:development]
        at coldfusion.runtime.java.CFCDynamicProxy.invoke(CFCDynamicProxy.java:157) ~[cfusion.jar:development]
        at com.sun.proxy.$Proxy46.handleDelivery(Unknown Source) ~[?:?]
        at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) ~[?:?]
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
        at java.lang.Thread.run(Unknown Source) ~[?:?]

BTW, I am looking forward to your “Message Queues with RabbitMQ” session at the CFSummit in Vegas!

Yes, that is expected. Unlike actual TCP connections, channels are light weight, single threaded, and are not supposed to be shared across threads. If you know you have a bunch of messages to send, look into the batch() method on the client.
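
For reference, here is a rough sketch of what that might look like for the four publishes from the second request. It assumes batch() accepts a closure that is handed a single channel, and that the channel exposes the same chainable publish() as the client. I have not verified either against the RabbitSDK docs, so check the exact signature there before relying on it.

		// Assumption: batch() opens one channel, passes it to the closure,
		// and closes it when the closure returns.
		rabbitClient.batch( function( channel ) {
			// Assumption: channel.publish() takes the same arguments as rabbitClient.publish()
			channel
				.publish( "Message 5", 'testQueue' )
				.publish( "Message 6", 'testQueue' )
				.publish( "Message 7", 'testQueue' )
				.publish( "Message 8", 'testQueue' );
		} );

If that holds, the second request would open one short-lived channel instead of four. It would not affect the consumer's channel, which lives on its own thread.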

Ahh, that would be an issue for sure. Any unhandled exception while processing a message will auto-close the channel.

Hmm, hard to say. That error is happening from deep within Adobe CF’s own code. We do some funky stuff to get access to the CF page context inside the consumer threads. The last time I checked, it worked on CF 2018, but perhaps something changed.

I’ll have to poke at this and see if I can’t find out what the issue is, but I’m flying out to Houston tomorrow for our Into The Box conference. Can you follow up with me in a week if I’ve forgotten about this?

Awesome, see you there!

Absolutely, thank you for all your help!