RFC: cbq - a Provider-based Queueing System for ColdBox

Hey, folks. I’d love feedback on this proposal for a provider based queue system for ColdBox.

The idea here is that we could standardize the queueing of jobs, including things like queue names and delays, regardless of the backing provider.

This is an example of the interfaces and components including 3 different provider options.

Usage:

// Resolve a job, set its data through the generated setter, and dispatch it.
getInstance( "ProcessPodcastJob" )
	.setPodcast( podcast )
	.dispatch();

// Same flow, but with a delay applied before dispatching.
// NOTE(review): delay() is not defined on DispatchableJob in this proposal yet.
getInstance( "SendWelcomeEmailJob" )
	.setUser( user )
	.delay( createTimeSpan( 0, 1, 0, 0 ) ) // 1 hour from now
	.dispatch();

ProcessPodcastJob.cfc:

/**
 * Example job: holds a podcast and does its work in handle().
 */
component accessors="true" extends="DispatchableJob" {

	property name="podcast";

	/**
	 * Runs the job. Replaces the inherited handle(), which only throws.
	 */
	function handle() {
		// do processing work here
	}

}

DispatchableJob.cfc:

/**
 * Base component for anything that can be pushed onto a queue.
 *
 * Subclasses implement handle() and may override getDispatchConnection() and
 * getQueue() to pick a connection and queue; both return nothing here, which
 * lets the Dispatcher apply its defaults.
 *
 * accessors="true" is declared so that getDispatcher(), used by dispatch(),
 * is generated on this component itself.
 */
component accessors="true" {

	// Resolved through WireBox's `provider:` injection DSL.
	property name="dispatcher" inject="provider:Dispatcher@cbq";

	/**
	 * Performs the job's work. Must be overridden.
	 *
	 * @throws EmptyAbstractMethod Always, when not overridden.
	 */
	function handle() {
		throw(
			type = "EmptyAbstractMethod",
			message = "Method is abstract and must be implemented in a concrete class"
		);
	}

	/**
	 * Name of the connection to dispatch on. Returns nothing by default.
	 */
	function getDispatchConnection() {}

	/**
	 * Name of the queue to dispatch to. Returns nothing by default.
	 */
	function getQueue() {}

	/**
	 * Hands this instance to the Dispatcher.
	 *
	 * @return This instance, to allow chaining.
	 */
	function dispatch() {
		getDispatcher().dispatch( this );
		return this;
	}

}

Dispatcher.cfc:

/**
 * Routes dispatched jobs to the queue provider configured for their connection.
 *
 * Providers are built on first use from the `connections` module settings and
 * cached by connection name for the life of this singleton.
 */
component singleton accessors="true" {

	property name="settings" inject="coldbox:moduleSettings:cbq";
	property name="wirebox" inject="wirebox";

	// Configured provider instances, keyed by connection name.
	variables.connectionQueueMap = {};

	/**
	 * Pushes a job onto the provider for its connection.
	 *
	 * @job The job to enqueue.
	 *
	 * @return The job that was passed in.
	 */
	function dispatch( required DispatchableJob job ) {
		var queueProvider = getQueueForJob( arguments.job );
		// Use the configured default when the job does not name a queue.
		var queueName = arguments.job.getQueue() ?: variables.settings.defaultQueue;
		// NOTE(review): setQueue() is not declared on IProvider, and the provider
		// instance is cached and shared between calls - confirm this is intended.
		queueProvider.setQueue( queueName );
		queueProvider.push( arguments.job );
		return arguments.job;
	}

	/**
	 * Returns the provider for the job's connection, building and caching it
	 * the first time that connection is used.
	 *
	 * @job The job whose connection should be resolved.
	 *
	 * @throws cbq.ConnectionNotFound When the connection name is not configured.
	 */
	function getQueueForJob( required DispatchableJob job ) {
		// Jobs that do not name a connection use the "default" connection.
		var connectionName = arguments.job.getDispatchConnection() ?: "default";
		if ( !variables.connectionQueueMap.keyExists( connectionName ) ) {
			if ( !variables.settings.connections.keyExists( connectionName ) ) {
				throw(
					type = "cbq.ConnectionNotFound",
					message = "No cbq connection named [#connectionName#] is configured."
				);
			}
			variables.connectionQueueMap[ connectionName ] = buildQueueProvider(
				variables.settings.connections[ connectionName ]
			);
		}
		return variables.connectionQueueMap[ connectionName ];
	}

	/**
	 * Creates a provider from a connection definition and configures it.
	 *
	 * @connectionInfo Struct with a `provider` WireBox mapping and a `properties` struct.
	 *
	 * @return The configured provider.
	 */
	IProvider function buildQueueProvider( required struct connectionInfo ) {
		var provider = variables.wirebox.getInstance(
			arguments.connectionInfo.provider
		);
		provider.configure( arguments.connectionInfo.properties );
		return provider;
	}

}

config/ColdBox.cfc

// cbq settings: one entry per named connection. Each connection gives the WireBox
// mapping of its provider and the properties passed to that provider's configure().
// NOTE(review): Dispatcher also reads settings.defaultQueue, which is not set here.
moduleSettings = {
	"cbq": {
		"connections": {
			// Used when a job does not name a dispatch connection.
			"default": {
				"provider": "ColdBoxAsyncProvider@cbq",
				"properties": {}
			},
			"rabbit": {
				"provider": "RabbitProvider@cbq",
				"properties": {
					"host": "...",
					"port": "...",
					"username": "...",
					"password": "...",
					"queueName": "cbq"
				}
			},
			// NOTE(review): DBProvider.onLoad() reads properties.queueName, which is not set here.
			"db": {
				"provider": "DBProvider@cbq",
				"properties": {
					"tableName": "cbq",
					"queryOptions": {}
				}
			}
		}
	}
};

IProvider.cfc

// Contract implemented by every cbq queue provider.
interface displayname="IProvider" {

	// Called by the Dispatcher right after the provider is created.
	// NOTE(review): declared without arguments, but the Dispatcher passes the
	// connection's properties and RabbitProvider/DBProvider declare `struct properties`.
	function configure();

	// Adds a job to the queue.
	function push( required DispatchableJob job );
	
	// Lifecycle hooks; the providers shown use them to start and stop their workers.
	function onLoad();
	function onUnload();

}

ColdBoxAsyncProvider.cfc

/**
 * Queue provider that runs each job in-process through the ColdBox async manager.
 */
component implements="cbq.models.IProvider" {

	property name="async" inject="coldbox:asyncManager";
	property name="wirebox" inject="wirebox";

	/**
	 * This provider has nothing to configure.
	 */
	function configure() {}

	/**
	 * Autowires the job and calls its handle() inside a new future.
	 *
	 * @job The job to run.
	 */
	function push( required DispatchableJob job ) {
		var queuedJob = arguments.job;
		var runJob = function() {
			variables.wirebox.autowire( queuedJob );
			queuedJob.handle();
		};
		variables.async.newFuture( runJob );
	}

	/**
	 * Creates the "cbq" executor and keeps a reference for onUnload().
	 */
	function onLoad() {
		variables.executor = variables.async.newExecutor( "cbq" );
	}

	/**
	 * Shuts the executor down if onLoad() created one.
	 */
	function onUnload() {
		if ( isNull( variables.executor ) ) {
			return;
		}
		variables.executor.shutdown();
	}

}

RabbitProvider.cfc

/**
 * Queue provider that publishes jobs to RabbitMQ through the rabbitsdk client
 * and consumes them from the same queue.
 */
component implements="cbq.models.IProvider" {

	property name="client" inject="RabbitClient@rabbitsdk";
	property name="wirebox" inject="wirebox";
	property name="channel";

	/**
	 * Connects the client and declares the queue.
	 *
	 * @properties Connection settings: host, port, username, password, queueName.
	 */
	function configure( struct properties ) {
		variables.queueName = arguments.properties.queueName;

		variables.client.connect(
			host = arguments.properties.host,
			port = arguments.properties.port,
			username = arguments.properties.username,
			password = arguments.properties.password
		);

		variables.client.queueDeclare( variables.queueName );
	}

	/**
	 * Publishes the job's memento to the configured queue.
	 *
	 * NOTE(review): getMemento() is not defined on DispatchableJob in this proposal.
	 *
	 * @job The job to publish.
	 */
	function push( required DispatchableJob job ) {
		variables.client.publish(
			routingKey = variables.queueName,
			body = job.getMemento()
		);
	}

	/**
	 * Starts a consumer on the configured queue. Relies on configure() having
	 * run first, since it reads variables.queueName.
	 */
	function onLoad() {
		variables.channel = variables.client.startConsumer(
			queue = variables.queueName,
			consumer = function( message,  channel, log ) {
				// Assumes the body is a struct with `cfc` and `body` keys - TODO confirm
				// against whatever getMemento() ends up producing.
				var payload = message.getBody();
				var cfc = variables.wirebox.getInstance( payload.cfc );
				cfc.handle( argumentCollection = payload.body );
				// NOTE(review): if handle() throws, the message is never acknowledged
				// or rejected because autoAcknowledge is off.
				message.acknowledge();
			},
			autoAcknowledge = false
		);
	}

	/**
	 * Closes the consumer channel if onLoad() opened one.
	 */
	function onUnload() {
		if ( !isNull( variables.channel ) ) {
			variables.channel.close();
		}
	}

}

DBProvider.cfc

/**
 * Queue provider that stores jobs in a database table via qb and polls that
 * table from a scheduled task.
 */
component implements="cbq.models.IProvider" {

	property name="async" inject="coldbox:asyncManager";
	property name="wirebox" inject="wirebox";
	property name="qb" inject="provider:QueryBuilder@qb";

	/**
	 * Stores the connection properties for later use.
	 *
	 * @properties Connection settings; tableName, queryOptions and queueName are read below.
	 */
	function configure( struct properties ) {
		variables.properties = arguments.properties;
	}

	/**
	 * Inserts the job's JSON-serialized memento as a new row.
	 *
	 * NOTE(review): onLoad() filters on `queue` and `available` columns that this
	 * insert does not set - confirm the table supplies them.
	 *
	 * @job The job to store.
	 */
	function push( required DispatchableJob job ) {
		variables.qb.newQuery()
			.table( variables.properties.tableName )
			.insert(
				values = {
					"payload": serializeJSON( job.getMemento() ),
					"reservedDate": now(),
					"attempts": 0
				},
				options = variables.properties.queryOptions
			);
	}

	/**
	 * Creates the "cbq" scheduler and starts a task that, every second, takes
	 * the lowest-id available row for the queue, records the attempt, and runs
	 * the job it describes.
	 */
	function onLoad() {
		variables.scheduler = variables.async.newScheduler( "cbq" );

		variables.scheduler.task( "Work cbq" )
			.call( function() {
				var jobRecord = variables.qb.newQuery()
					.table( variables.properties.tableName )
					.where( "queue", variables.properties.queueName )
					.where( "available", 1 )
					.orderByAsc( "id" )
					.first( options = variables.properties.queryOptions );

				// Nothing to do this tick. qb's first() yields an empty struct
				// when no row matches, so check for that as well as null.
				if ( isNull( jobRecord ) || jobRecord.isEmpty() ) {
					return;
				}

				// NOTE(review): nothing here stops two workers from picking the same row.
				variables.qb.newQuery()
					.table( variables.properties.tableName )
					.where( "id", jobRecord.id )
					.update(
						values = {
							"reservedDate": now(),
							"attempts": variables.qb.raw( "attempts + 1" )
						},
						options = variables.properties.queryOptions
					);

				// Assumes the stored payload has `cfc` and `body` keys - TODO confirm
				// against whatever getMemento() ends up producing.
				var payload = deserializeJSON( jobRecord.payload );
				var cfc = variables.wirebox.getInstance( payload.cfc );
				cfc.handle( argumentCollection = payload.body );
			} )
			.every( 1, "seconds" );

		variables.scheduler.startup();
	}

	/**
	 * Shuts the scheduler down if onLoad() created one.
	 */
	function onUnload() {
		if ( !isNull( variables.scheduler ) ) {
			variables.scheduler.shutdown();
		}
	}

}

There’s obviously a lot of details missing, especially exception handling and more configuration options, but I want to know if this seems useful and on the right track. Thanks!

@elpete this looks great! What we currently have is how to queue jobs but I am curious on the worker side which pulls the queued job and performs the work. Would this be a CommandBox task runner?

I love this idea and I know other language/frameworks have similar abstractions. Luis had a project years ago called ColdBox Messaging which aimed to do similar things, but he mostly focused on JMS if I recall (which is actually a Java-based MQ abstraction).

The trick here, as with any abstraction, is how to reconcile the disparate approaches each MQ takes, and more specifically-- each MQ’s SDK (where applicable) into a single API that is both simple but also accounts for all the edge cases. The basic messaging semantics are pretty simple, but Rabbit especially has several layers of their own abstraction such as

  • exchanges and explicit exchange management (create,update,delete)
  • explicit queue management (create,update,delete)
  • additional message properties (string message, map message, binary message, message headers, etc)
  • additional queue arguments for creation (durable, type, auto-delete, etc)
  • additional exchange arguments for creation (durable, type, auto-delete, internal, etc)

And then on the other side of the push, how and where do we configure the consumers? Is that in the remit of this library, or is it assumed that another application elsewhere is listening? Creation of consumer threads varies wildly from, say, Rabbit MQ to Active MQ and comes with its own bevy of configuration nuances. Then there’s message patterns such as topics, fanout, etc-- not all of which are implemented by every MQ.

So basically, I love the idea, and I have no clue how to do it in a manner that can provide full functionality from any back-end queue to the point of being able to swap the back-end out and have the same functionality :slight_smile:

Another, somewhat unrelated concern I have is the approach above is pretty OO heavy to the point of seeming quite a bit more complicated than just using some of these native queue libraries directly. For example, I can publish a message with RabbitMQ and this is the only code I need to write:

// Get the Rabbit client and publish a message in a single chained call.
getInstance( 'RabbitClient@rabbitsdk' )
	.publish( 'My message', 'myQueue' );

Unless I’m misinterpreting the examples above, I wouldn’t necessarily want to need to create a CFC for every message I want to send. I love the very lightweight semantics. Rabbit even allows me to define a consumer using only a closure and no CFC as well.

And finally, some thoughts on registering providers. One thing I’ve grown to like about Java’s Service Loader architecture is the ability to drop in a jar that “advertises” a particular service in its manifest, which is automatically found by convention in the class loader and then can be used without any particular configuration needed by the using library. In our case, what if I’m using the Rabbit MQ module already in my application, which means I’ve already set up the configuration of the module in the standard moduleSettings location? If we added explicit support into the Rabbit MQ module (and any other module we wanted) to provide a CFC that implemented the cbq interface, then cbq could find all eligible providers at runtime and simply use them automatically. Or perhaps it’s the other way around, and RabbitMQ (and friends) look for cbq and register/deregister themselves.

Either way, I’d like to see the individual 3rd party implementations be self contained in their configuration and cbq simply detects and uses them without the need for additional explicit moduleSettings boilerplate that duplicates that of the original module. cbq can bake in a few of its own out-of-the box queues and then dropping in a RabbitMQ or ActiveMQ module (the latter doesn’t actually exist yet FWIW) would make those available as providers as well.

And for some more fun reading, I took a look at what some other frameworks are doing for this.

Here are the docs for Laravel’s queue system-- I can tell you seem to have copied a lot from here :slight_smile:

Here’s the Ruby on Rails docs for Active Jobs:

Here’s the JMS (Java Messaging Service) spec used in Java EE apps

There’s a bunch of node libraries for this, but here’s one:

The code above tries to handle the worker situation using the onLoad method where needed.

(@bdw429s - you asked this question as well.)

The ColdBoxAsyncProvider doesn’t actually need any set up.
The RabbitProvider starts up a consumer in the onLoad.
The DBProvider starts up a scheduled task to poll the database in onLoad.

The idea was a Provider is responsible for both push and pop operations.

Unless I’m misinterpreting the examples above, I wouldn’t necessarily want to need to create a CFC for every message I want to send. I love the very lightweight semantics. Rabbit even allows me to define a consumer using only a closure and no CFC as well.

It was my intent to create a CFC for each. It doesn’t necessarily have to be a Job, though; it could be a Notification or anything else that can be Dispatchable.

So basically, I love the idea, and I have no clue how to do it in a manner that can provide full functionality from any back-end queue to the point of being able to swap the back-end out and have the same functionality :slight_smile:

I’d like to think of this as the 80-90% use case for queues. It would cover basic pub/sub, delay, queue names, attempts/retries, etc.

Either way, I’d like to see the individual 3rd party implementations be self contained in their configuration and cbq simply detects and uses them without the need for additional explicit moduleSettings boilerplate that duplicates that of the original module. cbq can bake in a few of its own out-of-the box queues and then dropping in a RabbitMQ or ActiveMQ module (the latter doesn’t actually exist yet FWIW) would make those available as providers as well.

I think one way to handle this is to use custom WireBox mappings for providers whenever possible. So perhaps the RabbitProvider accepts a property of mapping that points to a RabbitClient@rabbitsdk instance?

And for some more fun reading, I took a look at what some other frameworks are doing for this.

Anything specific you liked from these other frameworks? (You’re right that a lot was taken from Laravel. :stuck_out_tongue_winking_eye:)

Yep. One thing to keep in mind however is one of our clients where I’m using Rabbit is currently listening to 8 different queues. I’m not sure if that should mean

  • 8 different providers need to be instantiated
  • The provider should be more dynamic (to loop and create multiple consumers)
  • The user should just interact with the Rabbit module directly to create the listeners.

This makes me wonder if the consumer should be an implicit behavior of the provider loading, or if cbq should instead expose an API method for starting up the consumer locally, if desired. Which could be used to start as many consumers listening to as many queues as you wanted.

I haven’t really looked into multiple named queues yet. The idea is that queue names are used for priority or something like that (high, default) and the CFC instance is used to signify what job is being executed.

No, the idea is just that an app of any significant complexity isn’t going to have a single queue. It’s going to have a bunch of them, each with their own messages, their own semantics (worker queue, fan out, topic, etc), and their own consumers who operate independently (a backlog of sync-to-S3 messages doesn’t delay the order-created messages from processing).

  • order-created
  • incoming-lead
  • message-received
  • generate-PDF
  • sync-to-S3
  • system-broadcast-topic
  • etc

So while the API of cbq may allow me to say, “Here’s a message that needs to be sent to the generate-PDF worker queue”, or “Here’s a message to broadcast to all consumers of the system-broadcast-topic”, I may also want to have a consumer thread for each queue/topic running in the same application. In my mind, I’d only have a single Rabbit MQ provider registered with cbq which provides the bridge for these messages to be pushed to and popped from Rabbit, but there may be a number of actual message endpoints we’re sending to.


On a completely unrelated note-- it’s also worth noting that CF engines Lucee and Adobe already have an abstraction layer in place for queues (and other things) in the Event Gateway feature. Even though each engine did theirs a little differently, they both provide:

There’s a lot of reasons not to use them, but it’s worth pointing out they exist, and have been around for years.

On phone. Will answer more when I get to my desktop.

Thanks for taking the initiative with this Eric. It is very well appreciated.

I gave Grant a rundown of ColdBox Messaging a few weeks ago, which had the concepts of abstracted messaging queues, commands, and processing. Things have changed tremendously since then. It basically supported only Apache MQ and publish/subscribe paradigms. With that said, the one-CFC-per-job approach sounds more like a job/command queue than an abstraction over messaging queues. So this feels more like a distributed job interface than a messaging interface.

Especially since the AMQP protocol is extensive and it will be like fitting a square peg in a round hole. Like Brad mentioned. Enterprise messaging would be a key feature set to support.

So I think we have to identify what we will support: jobs or messaging.

The Redis extension we have for lucee also has messaging now and it works very simply as well.

I think this is a good start. Will comment more later.

This was an excellent and quick overview of how queues work in Laravel. It helped me understand Brad’s point about being able to configure workers per connection and queue.

1 Like

I’m interested in using this as a job queue, specifically being able to create jobs that could be picked up by containers to be processed at scale.

Two issues come to mind. We use sequences => jobs => tasks. In general, tasks don’t have to be processed in any specific order within a job, but jobs within sequences do. Tasks are a way of breaking up jobs into smaller chunks that can be parallelized and/or to improve isolation in terms of error handling. After all tasks are finalized, the job should be marked as complete. After all jobs are complete, a sequence should be marked as complete. It would be great to keep this in mind and provide mechanisms to make concepts like these work efficiently.

To Luis’s point about jobs vs messaging, it would be ideal to find a way to encapsulate job instructions into a generic message so both uses could be supported. I would want to use this same system to log error messages or other data to make that more async.

Two approaches come to mind:

  1. A cbq job should be able to dispatch another cbq job inside of it. This can allow chaining as you described.

  2. I really like the concept of batching described here in Laravel’s queues: Queues - Laravel - The PHP Framework For Web Artisans