Hey, folks. I’d love feedback on this proposal for a provider based queue system for ColdBox.
The idea here is that we could standardize the queueing jobs including things like queue names and delays regardless of the backing provider.
This is an example of the interfaces and components including 3 different provider options.
Usage:
getInstance( "ProcessPodcastJob" )
.setPodcast( podcast )
.dispatch();
getInstance( "SendWelcomeEmailJob" )
.setUser( user )
.delay( createTimeSpan( 0, 1, 0, 0 ) ) // 1 hour from now
.dispatch();
ProcessPodcastJob.cfc:
component extends="DispatchableJob" accessors="true" {
property name="podcast";
function handle() {
// do processing work here
}
}
DispatchableJob.cfc:
component {
property name="dispatcher" inject="provider:Dispatcher@cbq";
function handle() {
throw(
type = "EmptyAbstractMethod",
message = "Method is abstract and must be implemented in an concrete class"
);
}
function getDispatchConnection() {}
function getQueue() {}
function dispatch() {
getDispatcher().dispatch( this );
return this;
}
}
Dispatcher.cfc:
component singleton accessors="true" {
property name="settings" inject="coldbox:moduleSettings:cbq";
property name="wirebox" inject="wirebox";
variables.connectionQueueMap = {};
function dispatch( required DispatchableJob job ) {
var queueProvider = getQueueForJob( arguments.job );
var queueName = arguments.job.getQueue();
param queueName = variables.settings.defaultQueue;
provider.setQueue( queueName );
queueProvider.push( arguments.job );
return arguments.job
}
function getQueueForJob( required DispatchableJob job ) {
var connectionName = arguments.job.getDispatchConnection();
param connectionName = "default";
if ( !variables.connectionQueueMap.keyExists( connectionName ) ) {
variables.connectionQueueMap [ connectionName ] = buildQueueProvider(
variables.settings.connections[ connectionName ]
);
}
return variables.connectionQueueMap[ connectionName ];
}
IProvider function buildQueueProvider( required struct connectionInfo ) {
var provider = variables.wirebox.getInstance(
arguments.connectionInfo.provider
);
provider.configure( arguments.connectionInfo.properties );
return provider;
}
}
config/ColdBox.cfc
moduleSettings = {
"cbq": {
"connections": {
"default": {
"provider": "ColdBoxAsyncProvider@cbq",
"properties": {}
},
"rabbit": {
"provider": "RabbitProvider@cbq",
"properties": {
"host": "...",
"port": "...",
"username": "...",
"password": "...",
"queueName": "cbq"
}
},
"db": {
"provider": "DBProvider@cbq",
"properties": {
"tableName": "cbq",
"queryOptions": {}
}
}
}
}
};
IProvider.cfc
interface displayname="IProvider" {
function configure();
function push( required DispatchableJob job );
function onLoad();
function onUnload();
}
ColdBoxAsyncProvider.cfc
component implements="cbq.models.IProvider" {
property name="async" inject="coldbox:asyncManager";
property name="wirebox" inject="wirebox";
function configure() {}
function push( required DispatchableJob job ) {
variables.async.newFuture( function() {
variables.wirebox.autowire( job );
job.handle();
} );
}
function onLoad() {
variables.executor = variables.async.newExecutor( "cbq" );
}
function onUnload() {
if ( !isNull( variables.executor ) ) {
variables.executor.shutdown();
}
}
}
RabbitProvider.cfc
component implements="cbq.models.IProvider" {
property name="client" inject="RabbitClient@rabbitsdk";
property name="wirebox" inject="wirebox";
property name="channel";
function configure( struct properties ) {
variables.queueName = arguments.properties.queueName;
variables.client.connect(
host = arguments.properties.host,
port = arguments.properties.port,
username = arguments.properties.username,
password = arguments.properties.password
)
variables.client.queueDeclare( variables.queueName );
}
function push( required DispatchableJob job ) {
variables.client.publish(
routingKey = variables.queueName,
body = job.getMemento()
);
}
function onLoad() {
variables.channel = variables.client.startConsumer(
queue = variables.queueName,
consumer = function( message, channel, log ) {
var payload = message.getBody();
var cfc = variables.wirebox.getInstance( payload.cfc );
cfc.handle( argumentCollection = payload.body );
message.acknowledge();
},
autoAcknowledge = false
);
}
function onUnload() {
if ( !isNull( variables.channel ) ) {
variables.channel.close();
}
}
}
DBProvider.cfc
component implements="cbq.models.IProvider" {
property name="async" inject="coldbox:asyncManager";
property name="wirebox" inject="wirebox";
property name="qb" inject="provider:QueryBuilder@qb";
function configure( struct properties ) {
variables.properties = arguments.properties;
}
function push( required DispatchableJob job ) {
variables.qb.newQuery()
.table( variables.properties.tableName )
.insert(
values = {
"payload": serializeJSON( job.getMemento() ),
"reservedDate": now(),
"attempts": 0,
},
options = variables.properties.queryOptions
);
}
function onLoad() {
variables.scheduler = variables.async.newScheduler( "cbq" );
variables.scheduler.task( "Work cbq" )
.call( function() {
var jobRecord = variables.qb.newQuery()
.table( variables.properties.tableName )
.where( "queue", variables.properties.queueName )
.where( "available", 1 )
.orderByAsc( "id" )
.first( options = variables.properties.queryOptions );
if ( isNull( job ) ) {
return;
}
variables.qb.newQuery()
.table( variables.properties.tableName )
.where( "id", jobRecord.id )
.update(
values = {
"reservedDate": now(),
"attempts": qb.raw( "attempts + 1" ),
},
options = variables.properties.queryOptions
);
var payload = deserializeJSON( jobRecord.payload );
var cfc = variables.wirebox.getInstance( payload.cfc );
cfc.handle( argumentCollection = payload.body );
} )
.every( 1, "seconds" );
variables.scheduler.startup();
}
function onUnload() {
if ( !isNull( variables.scheduler ) ) {
variables.scheduler.shutdown();
}
}
}
There’s obviously a lot of details missing, especially exception handling and more configuration options, but I want to know if this seems useful and on the right track. Thanks!