Yes. This improves the situation:
/**
 * Registers additional interceptors on the RSocket server: Micrometer metrics
 * interceptors on the responder, requester and connection level, plus an
 * interceptor that wraps every requester-side {@link RSocket} in an
 * {@link OTelRSocket}.
 */
@Slf4j
@Configuration
public class ExtraRSocketConfig implements RSocketServerCustomizer {

    /** Key of the tag attached to every meter created by the Micrometer interceptors. */
    private static final String APPLICATION_TAG_KEY = "application";

    /** Value of the tag attached to every meter created by the Micrometer interceptors. */
    private static final String APPLICATION_TAG_VALUE = "ce-calendar-timespend-frontend";

    private final MeterRegistry registry;

    /**
     * @param registry meter registry handed to the Micrometer interceptors
     */
    public ExtraRSocketConfig(MeterRegistry registry) {
        this.registry = registry;
    }

    /**
     * Adds the metrics and tracing interceptors to the given server.
     *
     * @param rSocketServer the server being customized
     */
    @Override
    public void customize(io.rsocket.core.RSocketServer rSocketServer) {
        rSocketServer.interceptors(interceptorRegistry -> {
            log.info("Adding RSocket interceptors...");

            final ImmutableTag applicationTag = new ImmutableTag(APPLICATION_TAG_KEY, APPLICATION_TAG_VALUE);

            // Responder side: metrics only.
            interceptorRegistry.forResponder(new MicrometerRSocketInterceptor(registry, applicationTag));

            // Requester side: metrics first, then the OTel wrapper.
            interceptorRegistry.forRequester(requesterInterceptors -> {
                requesterInterceptors.add(new MicrometerRSocketInterceptor(registry, applicationTag));
                requesterInterceptors.add(new OTelTraceResettingRSocketInterceptor());
            });

            // Connection level: metrics on the duplex connection.
            interceptorRegistry.forConnection(new MicrometerDuplexConnectionInterceptor(registry, applicationTag));
        });
    }

    /**
     * Interceptor that decorates the intercepted socket with an {@link OTelRSocket}.
     */
    static final class OTelTraceResettingRSocketInterceptor implements RSocketInterceptor {

        /**
         * @param rSocket the socket to decorate
         * @return a new {@link OTelRSocket} delegating to {@code rSocket}
         */
        @Override
        public RSocket apply(RSocket rSocket) {
            return new OTelRSocket(rSocket);
        }
    }
}
and
/**
 * {@link RSocket} decorator that runs each interaction inside its own
 * OpenTelemetry span created with {@code setNoParent()}, i.e. a span that does
 * not inherit whatever trace context happens to be current.
 *
 * <p>The span is started when the returned publisher is <em>subscribed</em>,
 * not when the interaction method is called. This way a publisher that is
 * never subscribed does not leak an unfinished span, and every subscription
 * (for example after a retry) gets its own span. The span records any error
 * signal and is ended when the sequence terminates or is cancelled.
 *
 * <p>Lifecycle methods ({@code dispose}, {@code isDisposed}, {@code onClose},
 * {@code availability}) are passed straight through to the delegate.
 */
class OTelRSocket implements RSocket {

    private static final Tracer tracer = GlobalOpenTelemetry.getTracer("be.acerta.ce.cqrs.common.gateway.rsocket.OTelRSocket");

    private final RSocket delegate;

    /**
     * @param delegate the socket that actually performs the interactions
     */
    public OTelRSocket(RSocket delegate) {
        this.delegate = delegate;
    }

    /** Starts a new span with the given name and no parent. */
    private static Span startRootSpan(String spanName) {
        return tracer.spanBuilder(spanName).setNoParent().startSpan();
    }

    @Override
    public void dispose() {
        delegate.dispose();
    }

    /** Reports the delegate's disposal state instead of the interface default. */
    @Override
    public boolean isDisposed() {
        return delegate.isDisposed();
    }

    /** Reports the delegate's availability instead of the interface default. */
    @Override
    public double availability() {
        return delegate.availability();
    }

    @Override
    public Mono<Void> onClose() {
        return delegate.onClose();
    }

    /**
     * Delegates the fire-and-forget interaction inside a fresh span.
     *
     * @param payload the request payload
     * @return the delegate's result; the span starts on subscription
     */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        return Mono.defer(
                () -> {
                    // Deferred so the span lifetime matches one subscription.
                    Span span = startRootSpan("OTelRSocket fireAndForget");
                    return delegate
                            .fireAndForget(payload)
                            .doOnError(span::recordException)
                            .doFinally(signalType -> span.end());
                });
    }

    /**
     * Delegates the metadata-push interaction inside a fresh span.
     *
     * @param payload the metadata payload
     * @return the delegate's result; the span starts on subscription
     */
    @Override
    public Mono<Void> metadataPush(Payload payload) {
        return Mono.defer(
                () -> {
                    Span span = startRootSpan("OTelRSocket metadataPush");
                    return delegate
                            .metadataPush(payload)
                            .doOnError(span::recordException)
                            .doFinally(signalType -> span.end());
                });
    }

    /**
     * Delegates the request-channel interaction inside a fresh span.
     *
     * @param payloads the outbound payload stream
     * @return the delegate's response stream; the span starts on subscription
     */
    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        return Flux.defer(
                () -> {
                    Span span = startRootSpan("OTelRSocket requestChannel");
                    return delegate
                            .requestChannel(payloads)
                            .doOnError(span::recordException)
                            .doFinally(signalType -> span.end());
                });
    }

    /**
     * Delegates the request-response interaction inside a fresh span.
     *
     * @param payload the request payload
     * @return the delegate's response; the span starts on subscription
     */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        return Mono.defer(
                () -> {
                    Span span = startRootSpan("OTelRSocket requestResponse");
                    return delegate
                            .requestResponse(payload)
                            .doOnError(span::recordException)
                            .doFinally(signalType -> span.end());
                });
    }

    /**
     * Delegates the request-stream interaction inside a fresh span.
     *
     * @param payload the request payload
     * @return the delegate's response stream; the span starts on subscription
     */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return Flux.defer(
                () -> {
                    Span span = startRootSpan("OTelRSocket requestStream");
                    return delegate
                            .requestStream(payload)
                            .doOnError(span::recordException)
                            .doFinally(signalType -> span.end());
                });
    }
}