DNA //evolutions

Reactive Events and Stream-Based Processing

Real-time observability and integration with reactive architectures

JOpt.TourOptimizer is built on a reactive, stream-based event architecture. Every signal the optimizer produces during a run (progress updates, warnings, status changes, errors, node filtering events, intermediate results) is emitted through RxJava ReplaySubjects. These subjects can be subscribed to by any number of consumers, enabling real-time observability, structured logging, UI updates, and seamless integration with reactive service frameworks like Spring WebFlux.

This is not an afterthought or a logging add-on. The event system is part of the core architecture. The same reactive pipeline that drives internal optimization phases is exposed to the integrator. You subscribe to precisely the events you need, process them without blocking the optimizer, and compose them into your application's event-driven infrastructure.


Overview


Event subjects

The optimizer exposes its events through getOptimizationEvents(), which returns an OptimizationEvents instance. This object provides access to multiple typed ReplaySubjects and a CompletableFuture for the final result:

| Subject | Method | Type | Buffer | What it emits |
|---|---|---|---|---|
| Progress | progressSubject() | ReplaySubject<IOptimizationProgress> | 10 | Current cost, stage, percentage, winner solution |
| Status | statusSubject() | ReplaySubject<IStatusEvent> | 20 | Lifecycle changes (started, stage transitions, completed) |
| Warning | warningSubject() | ReplaySubject<IWarningEvent> | 20 | Non-fatal issues detected during the run |
| Error | errorSubject() | ReplaySubject<IErrorEvent> | 20 | Errors encountered during optimization |
| Node filtering | nodeFilteringSubject() | ReplaySubject<NodeFilteringEvent> | 100 | Nodes excluded by AutoFilter with reasons |
| Requested result | requestedResultSubject() | ReplaySubject<IOptimizationResult> | 10 | Intermediate results requested during the run |
| Before filtering result | beforeNodeFilteringResultSubject() | ReplaySubject<IOptimizationResult> | 10 | Result snapshot before AutoFilter applies |
| After filtering result | afterNodeFilteringResultSubject() | ReplaySubject<IOptimizationResult> | 10 | Result snapshot after AutoFilter applies |
| Final result | resultFuture() | CompletableFuture<IOptimizationResult> | n/a | The final optimization result (completes once) |

All ReplaySubjects are completed automatically when the result future completes (either normally or exceptionally). This guarantees clean resource cleanup regardless of how the optimization ends.

Each subject is independent. You can subscribe to any combination. Multiple subscribers can attach to the same subject without interference.

Subscribing to events

Subscribing is a single line per event type. The subscription receives a lambda that processes each emitted event:

opti.getOptimizationEvents()
    .progressSubject()
    .subscribe(p -> System.out.println(p.getProgressString()));

opti.getOptimizationEvents()
    .warningSubject()
    .subscribe(w -> System.out.println(w.toString()));

opti.getOptimizationEvents()
    .statusSubject()
    .subscribe(s -> System.out.println(s.getDescription() + " " + s.getCode()));

opti.getOptimizationEvents()
    .errorSubject()
    .subscribe(e -> System.out.println(e.getCause() + " " + e.getCode()));

Subscribe before starting the optimization. If you start synchronously (startRunSync) and subscribe afterwards, you will miss events that occurred during the run. With asynchronous start (startRunAsync), subscribing after the call is safe because events are buffered by the ReplaySubject.

It is possible to create multiple subscriptions to a single event source. For example, one subscription can log to stdout while another pushes to a metrics pipeline. Both receive the same events independently.

ReplaySubject semantics

JOpt uses RxJava ReplaySubjects with size-bounded buffers for its event streams. This design choice has important operational implications:

Late subscription is safe. A subscriber that attaches after the optimization has started will receive the most recent buffered events. A UI component that connects mid-run will immediately see current progress, not an empty state.

Buffering is bounded. The internal buffers are sized per subject type: progress buffers 10 events, error/warning/status buffer 20 each, node filtering buffers 100, and result-related subjects buffer 10. These sizes are defined in the OptimizationEvents constructor using ReplaySubject.createWithSize(n). Events beyond the buffer size are discarded from replay. This prevents memory growth in long-running optimizations.

Not a durable store. ReplaySubjects are in-memory buffers. If you need durable event history (for audit, compliance, or post-run analysis), forward events from your subscription into your logging or telemetry pipeline (OpenTelemetry, ELK, Splunk, or a database).

Thread-safe emission. Events are emitted from optimizer threads and consumed in subscriber threads. RxJava handles the thread safety of the subject itself. Your subscriber lambda should be lightweight and non-blocking: RxJava Observables have no built-in backpressure, so a slow consumer can hold up the thread that delivers the event and, in turn, the optimizer.
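
The late-subscription and bounded-buffer behavior described above can be illustrated with a small JDK-only stand-in. This is a sketch for intuition only: it is neither RxJava's ReplaySubject nor JOpt code, and the names BoundedReplay and lateSubscriberSees are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a size-bounded replay buffer (illustration only). */
final class BoundedReplay<T> {
    private final int capacity;
    private final Deque<T> buffer = new ArrayDeque<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    BoundedReplay(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Stores the event, evicting the oldest one when full, then notifies subscribers. */
    synchronized void emit(T event) {
        if (buffer.size() == capacity) {
            buffer.removeFirst();
        }
        buffer.addLast(event);
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    /** Replays the buffered events to the new subscriber, then registers it for live events. */
    synchronized void subscribe(Consumer<T> subscriber) {
        for (T event : buffer) {
            subscriber.accept(event);
        }
        subscribers.add(subscriber);
    }

    /** Emits 1..before, subscribes late, emits `after` more events, returns what was seen. */
    static List<Integer> lateSubscriberSees(int capacity, int before, int after) {
        BoundedReplay<Integer> subject = new BoundedReplay<>(capacity);
        for (int i = 1; i <= before; i++) {
            subject.emit(i);
        }
        List<Integer> seen = new ArrayList<>();
        subject.subscribe(seen::add);
        for (int i = before + 1; i <= before + after; i++) {
            subject.emit(i);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Buffer of 3, 10 events before subscribing, 2 events afterwards.
        System.out.println(lateSubscriberSees(3, 10, 2)); // prints [8, 9, 10, 11, 12]
    }
}
```

With a buffer of 3, events 1 to 7 are gone by the time the subscriber attaches. This is why a durable history has to be built by forwarding events from a subscription that is attached from the start.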

Progress frequency control

Progress events can fire very frequently during an optimization run. Emitting too often slows the optimizer (logging overhead) and floods downstream consumers. Emitting too rarely makes the UI feel unresponsive.

JOpt provides per-stage frequency control through optimizer properties:

// SA stage: emit progress every 0.1%
props.setProperty("JOpt.Algorithm.PreOptimization.SA.OnProgressOutPercentage", "0.1");

// Genetic stage: emit progress every 0.01%
props.setProperty("JOpt.Algorithm.GE.OnProgressOutPercentage", "0.01");

This allows fine-tuning per algorithm phase. The SA pre-optimization stage is typically iteration-heavy and benefits from a moderate output rate. The genetic stage runs longer and may justify more frequent updates for responsive UIs.

On-demand progress requests

In addition to automatic progress events, JOpt supports imperative progress requests. You can call requestProgress() at any time to force the optimizer to emit its current state, regardless of the configured frequency.

This is useful for:

  • External polling intervals (e.g., a REST endpoint that returns current status every 5 seconds)
  • Manual checkpoints in batch workflows
  • Integration with external monitoring systems that pull rather than subscribe

// Request current progress from an external thread
opti.requestProgress();

The requested progress is emitted through the same progressSubject(). Your existing subscribers receive it without additional wiring.
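
For the external polling case, one option is a small scheduler that triggers the request at a fixed interval. The sketch below uses only the JDK; ProgressPoller and pollsAtLeast are hypothetical names, and the Runnable stands in for a call such as opti::requestProgress.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that triggers a progress request at a fixed interval. */
final class ProgressPoller implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "progress-poller");
            thread.setDaemon(true); // do not keep the JVM alive just for polling
            return thread;
        });

    /** In a real integration the action would be something like opti::requestProgress. */
    void start(Runnable requestProgress, long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                requestProgress.run();
            } catch (RuntimeException e) {
                // An exception escaping the task would cancel all further executions.
                System.err.println("Progress request failed: " + e);
            }
        }, period, period, unit);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    /** Demo helper: true if the action ran at least `times` times within 5 seconds. */
    static boolean pollsAtLeast(int times, long periodMillis) {
        CountDownLatch latch = new CountDownLatch(times);
        try (ProgressPoller poller = new ProgressPoller()) {
            poller.start(latch::countDown, periodMillis, TimeUnit.MILLISECONDS);
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(pollsAtLeast(3, 20));
    }
}
```

Close the poller once the result future completes so that no requests are sent to a finished optimization.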

Structured progress data

Each progress event carries an IOptimizationProgress object with structured data:

  • Current cost and improvement metrics
  • Algorithm phase identifier (SA, Genetic, Construction)
  • Progress percentage within the current phase
  • The current best (winner) solution
  • A human-readable progress string for quick logging

This structured data enables:

  • UI progress bars with phase-aware display
  • Stage-based early stopping ("if genetic stage is at 95% and cost has not improved in 5 minutes, stop")
  • Telemetry dashboards tracking cost convergence over time
  • Automated alerts when optimization quality plateaus
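
The early-stopping and plateau-alert ideas above come down to tracking when the cost last improved. A minimal sketch, assuming the caller extracts the current cost from each progress event and decides what to do on a plateau (the phase check and the actual stop call are left out because their API is not shown here); PlateauDetector is a hypothetical name:

```java
/** Hypothetical helper: reports a plateau when cost has not improved for a time window. */
final class PlateauDetector {
    private final long windowMillis;
    private final double minImprovement;
    private double bestCost = Double.POSITIVE_INFINITY;
    private long lastImprovementAt;

    PlateauDetector(long windowMillis, double minImprovement) {
        this.windowMillis = windowMillis;
        this.minImprovement = minImprovement;
    }

    /**
     * Feeds one cost observation with its timestamp.
     * Returns true once no sufficient improvement has been seen for the whole window.
     * Not thread-safe: call it from a single subscriber.
     */
    boolean observe(double cost, long nowMillis) {
        if (bestCost - cost > minImprovement) {
            bestCost = cost;
            lastImprovementAt = nowMillis;
            return false;
        }
        return nowMillis - lastImprovementAt >= windowMillis;
    }

    public static void main(String[] args) {
        // Five-minute window, improvements below 0.01 are ignored.
        PlateauDetector detector = new PlateauDetector(5 * 60_000L, 0.01);
        System.out.println(detector.observe(100.0, 0L));       // prints false
        System.out.println(detector.observe(99.0, 60_000L));   // prints false
        System.out.println(detector.observe(99.0, 200_000L));  // prints false
        System.out.println(detector.observe(99.0, 360_000L));  // prints true
    }
}
```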

For production dashboards, it is recommended to sample structured progress at a fixed wall-clock interval (e.g., every 10 seconds) rather than on every event. Store snapshots of cost, distance, and time metrics for convergence charts.
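
One way to implement fixed-interval sampling is to let the progress subscription overwrite a single "latest" value and have a scheduler record that value on a wall-clock timer. The sketch below is JDK-only and tracks cost alone; CostSampler is a hypothetical name, and how the cost is read from IOptimizationProgress is left to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sampler: keeps only the latest cost and records it on a fixed schedule. */
final class CostSampler {
    private final AtomicReference<Double> latestCost = new AtomicReference<>();
    private final List<Double> snapshots = new ArrayList<>();

    /** Call from the progress subscription; cheap enough to run on every event. */
    void onProgress(double currentCost) {
        latestCost.set(currentCost);
    }

    /** Records the latest cost, if any event has arrived yet. */
    synchronized void sampleOnce() {
        Double cost = latestCost.get();
        if (cost != null) {
            snapshots.add(cost);
        }
    }

    /** Samples at a fixed wall-clock interval on the given scheduler. */
    ScheduledFuture<?> start(ScheduledExecutorService scheduler, long period, TimeUnit unit) {
        return scheduler.scheduleAtFixedRate(this::sampleOnce, period, period, unit);
    }

    /** Returns a copy of the recorded snapshots. */
    synchronized List<Double> snapshots() {
        return new ArrayList<>(snapshots);
    }

    public static void main(String[] args) {
        CostSampler sampler = new CostSampler();
        sampler.onProgress(120.0);
        sampler.onProgress(118.5);
        sampler.sampleOnce();        // in production this is triggered by start(...)
        sampler.onProgress(117.0);
        sampler.onProgress(115.25);
        sampler.sampleOnce();
        System.out.println(sampler.snapshots()); // prints [118.5, 115.25]
    }
}
```

The subscriber does constant work per event, and the number of stored snapshots depends on run time rather than on the configured progress frequency.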

Error handling with RxJava

RxJava has specific error propagation rules that matter in production. If a subscriber's onNext handler throws an exception, RxJava routes it through the error channel. Without an explicit error consumer, the exception becomes an OnErrorNotImplementedException and may be routed to the global RxJava error handler, producing confusing stack traces.

The solution is to always provide an error consumer when subscribing:

// Create an error consumer based on the uncaught exception handler
MyUncaughtExceptionHandler handler = new MyUncaughtExceptionHandler();
handler.attachOptimization(opti);

Consumer<? super Throwable> errorConsumer =
    t -> handler.uncaughtException(Thread.currentThread(), t);

// Subscribe with the error consumer
opti.getOptimizationEvents()
    .progressSubject()
    .subscribe(
        p -> processProgress(p),  // onNext
        errorConsumer              // onError
    );

This guarantees that subscription failures are bridged into your uncaught exception handling policy. The optimization terminates deterministically with an exceptional result future instead of silently corrupting state.

Uncaught exception handling

JOpt allows attaching a custom UncaughtExceptionHandler to the optimization. This handler is invoked when exceptions occur in optimizer threads that are not caught by normal try/catch blocks.

The recommended pattern:

MyUncaughtExceptionHandler handler = new MyUncaughtExceptionHandler();
handler.attachOptimization(opti);
opti.setSlaveUncaughtExceptionHandler(handler);

When invoked, the handler should:

  1. Log the error with context (optimization ID, thread name, algorithm phase)
  2. Complete the result future exceptionally (opti.getResultFuture().completeExceptionally(t))
  3. Avoid heavy blocking IO inside the handler (keep it short and deterministic)

This provides a clean shutdown path. The calling code can handle the exception through the CompletableFuture error chain (exceptionally(...), handle(...)) instead of dealing with orphaned threads or silent failures.
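
The three handler duties listed above can be sketched with the JDK alone. In this sketch a plain CompletableFuture stands in for the optimizer's result future, and FutureFailingHandler, runFailingWorker, and the run ID are hypothetical; a real handler would be wired to the optimization as shown earlier.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical handler: logs the failure and completes a result future exceptionally. */
final class FutureFailingHandler implements Thread.UncaughtExceptionHandler {
    private final String runId;
    private final CompletableFuture<?> resultFuture;

    FutureFailingHandler(String runId, CompletableFuture<?> resultFuture) {
        this.runId = runId;
        this.resultFuture = resultFuture;
    }

    @Override
    public void uncaughtException(Thread thread, Throwable error) {
        // 1. Log with context. 2. Fail the future. 3. No blocking IO in here.
        System.err.println("[" + runId + "] uncaught in " + thread.getName() + ": " + error);
        resultFuture.completeExceptionally(error);
    }

    /** Demo: a worker thread dies, and the caller sees the failure through the future. */
    static String runFailingWorker() {
        CompletableFuture<String> future = new CompletableFuture<>();
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("boom");
        }, "worker-1");
        worker.setUncaughtExceptionHandler(new FutureFailingHandler("run-42", future));
        worker.start();
        try {
            worker.join(); // the handler runs on the dying thread before join() returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return future
            .handle((value, ex) -> ex == null ? value : "failed: " + ex.getMessage())
            .join();
    }

    public static void main(String[] args) {
        System.out.println(runFailingWorker()); // prints failed: boom
    }
}
```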

Integration with Spring WebFlux

The reactive event architecture maps directly onto Spring WebFlux and other reactive frameworks. Because JOpt emits events through RxJava subjects, they can be bridged to Project Reactor (Flux/Mono) with minimal wiring.

The TourOptimizer REST service is packaged as a Spring WebFlux application, using non-blocking HTTP handling. This means:

  • Progress streaming. Server-Sent Events (SSE) or WebSocket endpoints can forward progress events from the optimizer to the client in real time. The RxJava subject is the source; a thin adapter converts it to a Reactor Flux.
  • Non-blocking job control. Optimization runs are started asynchronously. The CompletableFuture result integrates natively with WebFlux's async response model. No thread starvation under load.
  • Scalable concurrency. Multiple optimization jobs can run concurrently, each emitting its own event stream. WebFlux handles the concurrent HTTP connections without blocking threads.
  • Error propagation. Exceptions in optimizer threads propagate through the result future and can be mapped to HTTP error responses cleanly.

This integration is not theoretical. The production REST endpoint (Docker-REST-TourOptimizer) uses this architecture. The Swagger UI at localhost:8081/swagger-ui/index.html exposes the full optimization API with reactive request/response handling.

For Python, C#, and Angular clients, the REST interface abstracts the reactive internals. These clients interact through standard HTTP requests and receive results as JSON. The reactive architecture is an internal implementation detail that benefits all clients through better throughput and lower latency.

Node filtering events

Node filtering is fully reactive. The nodeFilteringSubject() emits NodeFilteringEvent objects whenever AutoFilter excludes a node. Additionally, snapshot subjects emit the optimization result immediately before and after filtering, giving you a complete picture of what changed:

// Subscribe to individual filtering events
opti.getOptimizationEvents()
    .nodeFilteringSubject()
    .subscribe(ev -> {
        System.out.println("Filtered: " + ev.getCode().getCodeMessage());
        ev.getFilterReasons().forEach(reason ->
            System.out.println("  Reason: " + reason));
    });

// Subscribe to the result snapshot before filtering
opti.getOptimizationEvents()
    .beforeNodeFilteringResultSubject()
    .subscribe(result -> analyzePreFilterState(result));

// Subscribe to the result snapshot after filtering
opti.getOptimizationEvents()
    .afterNodeFilteringResultSubject()
    .subscribe(result -> analyzePostFilterState(result));

The before/after result subjects are particularly useful for comparing what the schedule looked like before AutoFilter intervened. This enables audit-grade reporting: "here is the plan before exclusion, here is the plan after, and here are the nodes that were removed with their reasons."

Legacy callbacks: internally backed by subscriptions

JOpt also supports a traditional callback interface (IOptimizationEventListener) with methods like onProgress(...), onError(...), onWarning(...), onStatus(...), and onNodeFiltering(...). These callbacks are not a separate event system. Internally, they are implemented as subscriptions to the same ReplaySubjects through a LegacyCallbackSubscriber.

When you override onProgress(IOptimizationProgress progress) in your Optimization subclass, you are effectively subscribing a lambda to the progressSubject(). The same applies to all other callback methods. This means:

  • Callbacks and direct subject subscriptions coexist safely. You can use both in the same application.
  • Error handling rules from RxJava apply to callbacks too. If your onProgress(...) override throws an exception, it becomes a subscription error. This is why attaching an error consumer (via the uncaught exception handler) is recommended even when using callbacks.
  • Legacy callbacks receive both the typed event object and the convenience string/code representation. For example, onProgress is called once with the progress string and once with the structured IOptimizationProgress object.

For new integrations, subscribing directly to the subjects is recommended because it gives you full control over error consumers, threading, and composition. The callback interface remains available for simpler use cases and backward compatibility.

Combining and coupling observables

Because all event subjects are standard RxJava Observables, they can be combined using the full RxJava operator library. This is powerful for scenarios where you need to correlate information from multiple event streams.

Combining results with progress

A common pattern is to couple the requested result stream with the progress stream. For example, you may want to run validation checks at specific progress percentages:

Observable<IOptimizationResult> resultsOb =
    opti.getOptimizationEvents().requestedResultSubject();
Observable<IOptimizationProgress> progressOb =
    opti.getOptimizationEvents().progressSubject();

// Combine: for each new result, pair it with the latest progress
Observable<Boolean> validationResults =
    resultsOb.withLatestFrom(progressOb, (result, progress) -> {

        // Compare against thresholds: an exact == on a double progress value may never match.
        if (progress.getProgress() >= 95.0) {
            // From 95% onward, check that all routes are finalized
            return result.getRoutes().stream()
                .allMatch(r -> r.isFinalized());
        }

        if (progress.getProgress() >= 50.0) {
            // From 50% onward, check that no route has overtime
            return result.getRoutes().stream()
                .allMatch(r -> r.getRouteCostAndViolationController()
                    .getViolationSummary()
                    .getNumViolationsRouteTime() == 0);
        }

        return true;
    });

validationResults.subscribe(passed -> {
    if (!passed) {
        System.out.println("Validation failed at current progress");
    }
});

The withLatestFrom operator pairs each emitted result with the most recent progress event. This allows stage-aware validation without polling or manual state tracking.

Filtering subjects with predicates

JOpt provides a utility method createBoundSubject that creates a derived subject emitting only events matching a predicate:

// Create a subject that only emits progress events from the genetic stage
ReplaySubject<IOptimizationProgress> geneticProgress =
    OptimizationEvents.createBoundSubject(
        opti.getOptimizationEvents().progressSubject(),
        p -> p.getCallerId().contains("Genetic"),
        10  // buffer size
    );

geneticProgress.subscribe(p ->
    System.out.println("Genetic stage: " + p.getProgressString()));

This is useful for building stage-specific dashboards, triggering actions only during certain algorithm phases, or routing different event subsets to different downstream consumers.

Testing with TestObserver

For integration tests, RxJava's TestObserver can be attached to any subject or derived observable. This provides assertion capabilities directly on the event stream:

TestObserver<OptimizationEvents.AssertionResult> testObserver =
    OptimizationEvents.subscribeSubjectWithTestObserver(
        opti.getOptimizationEvents().progressSubject(),
        p -> new OptimizationEvents.AssertionResult(
            p.getProgress() >= 0 && p.getProgress() <= 100,
            "Progress must be between 0 and 100"
        )
    );

// After optimization completes:
testObserver.assertNoErrors();
testObserver.values().forEach(ar -> assertTrue(ar.passed(), ar.message()));

These patterns demonstrate that JOpt's event system is not just a notification mechanism. It is a composable reactive pipeline that supports validation, correlation, filtering, and testing at the stream level.

Pattern 1: Synchronous with callbacks (simplest)

Start synchronously and override the onProgress, onError, and onWarning methods directly. Suitable for CLI tools, batch jobs, and quick prototyping.

Pattern 2: Asynchronous with CompletableFuture (service-oriented)

Start with startRunAsync() and chain result processing via thenAccept, handle, or whenComplete. Avoid blocking request threads in service environments.
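
A minimal sketch of such a chain, using only the JDK. The method startRunAsyncStandIn is a hypothetical placeholder for opti.startRunAsync() and returns a String instead of an IOptimizationResult; the chaining calls themselves are standard CompletableFuture API.

```java
import java.util.concurrent.CompletableFuture;

/** Illustrates non-blocking result handling on a CompletableFuture. */
final class AsyncResultChainDemo {

    /** Hypothetical stand-in for opti.startRunAsync(); produces a fake result summary. */
    static CompletableFuture<String> startRunAsyncStandIn(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("optimizer failed");
            }
            return "result: 3 routes";
        });
    }

    /** Maps the result on success, or turns a failure into an error message. */
    static CompletableFuture<String> describe(CompletableFuture<String> resultFuture) {
        return resultFuture
            .thenApply(result -> "published " + result)
            .handle((message, ex) ->
                ex == null ? message : "error: " + rootCause(ex).getMessage());
    }

    /** Dependent stages wrap failures in CompletionException, so unwrap to the root cause. */
    static Throwable rootCause(Throwable t) {
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        // join() is used only to keep the demo alive; a service would return the future.
        System.out.println(describe(startRunAsyncStandIn(false)).join()); // prints published result: 3 routes
        System.out.println(describe(startRunAsyncStandIn(true)).join());  // prints error: optimizer failed
    }
}
```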

Pattern 3: Asynchronous with reactive event subscriptions (recommended)

Start asynchronously, subscribe to event subjects for continuous visibility, and use the CompletableFuture for final result retrieval. This provides the best combination of observability and integration flexibility.

// 1. Build model
addNodes(opti);
addResources(opti);

// 2. Subscribe to events
opti.getOptimizationEvents().progressSubject()
    .subscribe(p -> metricsService.recordProgress(p));
opti.getOptimizationEvents().errorSubject()
    .subscribe(e -> alertService.raiseError(e));

// 3. Start async
CompletableFuture<IOptimizationResult> future = opti.startRunAsync();

// 4. Handle result
future.thenAccept(result -> resultService.publish(result))
      .exceptionally(ex -> { errorService.handle(ex); return null; });

This pattern separates modeling, observability, and lifecycle management into clean, composable concerns.

Examples on GitHub: Recommended implementation examples (synchronous, asynchronous, and reactive patterns).

Closing Words

The reactive event system in JOpt.TourOptimizer is not a convenience layer on top of a batch optimizer. It is the native execution model. All events, including node filtering and intermediate results, are emitted through RxJava ReplaySubjects. Legacy callback methods are internally backed by subscriptions to the same subjects, ensuring a single consistent event source. Observables can be combined, filtered, and tested using the full RxJava operator library, enabling stage-aware validation, stream correlation, and integration testing at the event level.

This architecture maps directly onto modern reactive service frameworks like Spring WebFlux. Progress streams can be forwarded to Server-Sent Events or WebSocket connections. Error propagation is deterministic through custom exception handlers. And the entire system integrates naturally into event-driven microservice architectures where blocking is not an option.


Authors

A product by dna-evolutions ©