I tried scaling my service horizontally to increase the number of command handlers. This seems to remedy the "cannot dispatch new commands" issue.
However, my console is now spammed with errors like these:
org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException: Unable to extend the claim on token for processor 'processorName[5]'. It is either claimed by another process, or there is no such token.
o.a.eventhandling.pooled.WorkPackage : Error while processing batch in Work Package [4]-[removeDeletedShiftsFromPlanningProcess]. Aborting Work Package...
This one is triggered by org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore.fetchToken:
org.hibernate.exception.LockAcquisitionException: could not extract ResultSet
and
com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
Eventually, all events are handled successfully.
I configured my event processors as shown below. I copied and adapted this from a blog post as a solution to my previous performance-related question. Perhaps I did something wrong here, as there is no real documentation for it.
public void configure(EventProcessingConfigurer configurer, ScheduledExecutorService threadPool) {
    configurer.usingPooledStreamingEventProcessors();
    configurer.registerPooledStreamingEventProcessorConfiguration(
            "processor1",
            createProcessorConfiguration("processor_coordinator", threadPool, 10, 200)
    );
    configurer.registerPooledStreamingEventProcessorConfiguration(
            "processor2",
            createProcessorConfiguration("processor_coordinator", threadPool, 10, 200)
    );
    configurer.registerTrackingEventProcessorConfiguration("processor3", configuration ->
            TrackingEventProcessorConfiguration
                    .forSingleThreadedProcessing()
                    .andBatchSize(200));
    configurer.registerListenerInvocationErrorHandler("processor3", configuration -> PropagatingErrorHandler.instance());
}

private EventProcessingConfigurer.PooledStreamingProcessorConfiguration createProcessorConfiguration(
        String coordinatorName, ScheduledExecutorService threadPool, int initialSegmentCount, int batchSize) {
    return (configuration, builder) -> builder
            .coordinatorExecutor(Executors.newScheduledThreadPool(10, new AxonThreadFactory(coordinatorName)))
            .workerExecutor(threadPool)
            .batchSize(batchSize)
            .initialSegmentCount(initialSegmentCount);
}