Axon-tracing-opentelemetry reuses traceid

Yes. This improves the situation:

/**
 * Registers additional interceptors on the RSocket server: Micrometer interceptors for the
 * responder, requester and connection, plus a requester-side interceptor that wraps every
 * {@code RSocket} in an {@link OTelRSocket}.
 */
@Slf4j
@Configuration
public class ExtraRSocketConfig implements RSocketServerCustomizer {
    private final MeterRegistry registry;

    /**
     * @param registry meter registry handed to the Micrometer interceptors
     */
    public ExtraRSocketConfig(MeterRegistry registry) {
        this.registry = registry;
    }

    /**
     * Adds the interceptors to the given server.
     *
     * @param rSocketServer the server being customized
     */
    @Override
    public void customize(io.rsocket.core.RSocketServer rSocketServer) {
        rSocketServer.interceptors(interceptorRegistry -> {
            log.info("Adding RSocket interceptors...");

            // Every Micrometer interceptor registered below is tagged with the same application tag.
            ImmutableTag applicationTag = new ImmutableTag("application", "ce-calendar-timespend-frontend");

            RSocketInterceptor responderMetrics = new MicrometerRSocketInterceptor(registry, applicationTag);
            interceptorRegistry.forResponder(responderMetrics);

            // Requester side: metrics first, then the OTelRSocket wrapper.
            interceptorRegistry.forRequester(requesterInterceptors -> {
                requesterInterceptors.add(new MicrometerRSocketInterceptor(registry, applicationTag));
                requesterInterceptors.add(new OTelTraceResettingRSocketInterceptor());
            });

            interceptorRegistry.forConnection(
                    new MicrometerDuplexConnectionInterceptor(registry, applicationTag));
        });
    }

    /**
     * Interceptor that wraps the given {@code RSocket} in an {@link OTelRSocket}.
     */
    static final class OTelTraceResettingRSocketInterceptor implements RSocketInterceptor {
        @Override
        public RSocket apply(RSocket source) {
            return new OTelRSocket(source);
        }
    }

}

and the RSocket wrapper that the interceptor installs:

/**
 * {@link RSocket} decorator that runs every interaction of the wrapped socket inside a
 * freshly started OpenTelemetry span created with {@code setNoParent()}, i.e. a span that
 * does not inherit a parent from the current context.
 *
 * <p>Each span is created lazily, when the returned publisher is subscribed to, and is ended
 * in {@code doFinally}. Creating it on subscription (instead of when the method is called)
 * means a publisher that is never subscribed does not leave an un-ended span behind, and
 * every (re-)subscription gets its own span.
 *
 * <p>Lifecycle calls ({@code dispose}, {@code isDisposed}, {@code onClose},
 * {@code availability}) are forwarded to the delegate unchanged.
 */
class OTelRSocket implements RSocket {

    private static final Tracer tracer = GlobalOpenTelemetry.getTracer("be.acerta.ce.cqrs.common.gateway.rsocket.OTelRSocket");

    private final RSocket delegate;

    /**
     * @param delegate the socket all calls are forwarded to
     */
    public OTelRSocket(RSocket delegate) {
        this.delegate = delegate;
    }

    @Override
    public void dispose() {
        delegate.dispose();
    }

    /** Reports the delegate's disposal state rather than the interface default. */
    @Override
    public boolean isDisposed() {
        return delegate.isDisposed();
    }

    /** Reports the delegate's availability rather than the interface default. */
    @Override
    public double availability() {
        return delegate.availability();
    }

    @Override
    public Mono<Void> onClose() {
        return delegate.onClose();
    }

    /**
     * Forwards the call inside a new parent-less span; errors are recorded on the span.
     */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        return Mono.defer(
                () -> {
                    // Started per subscription so the span always gets a matching end().
                    Span span = tracer.spanBuilder("OTelRSocket fireAndForget").setNoParent().startSpan();
                    return delegate
                            .fireAndForget(payload)
                            .doOnError(span::recordException)
                            .doFinally(signalType -> span.end());
                });
    }

    /**
     * Forwards the call inside a new parent-less span; errors are recorded on the span.
     */
    @Override
    public Mono<Void> metadataPush(Payload payload) {
        return Mono.defer(
                () -> {
                    Span span = tracer.spanBuilder("OTelRSocket metadataPush").setNoParent().startSpan();
                    return delegate
                            .metadataPush(payload)
                            .doOnError(span::recordException)
                            .doFinally(signalType -> span.end());
                });
    }

    /**
     * Forwards the call inside a new parent-less span that stays open until the returned
     * stream terminates or is cancelled; errors are recorded on the span.
     */
    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        return Flux.defer(
                () -> {
                    Span span = tracer.spanBuilder("OTelRSocket requestChannel").setNoParent().startSpan();
                    return delegate
                            .requestChannel(payloads)
                            .doOnError(span::recordException)
                            .doFinally(signalType -> span.end());
                });
    }

    /**
     * Forwards the call inside a new parent-less span; errors are recorded on the span.
     */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        return Mono.defer(
                () -> {
                    Span span = tracer.spanBuilder("OTelRSocket requestResponse").setNoParent().startSpan();
                    return delegate
                            .requestResponse(payload)
                            .doOnError(span::recordException)
                            .doFinally(signalType -> span.end());
                });
    }

    /**
     * Forwards the call inside a new parent-less span that stays open until the returned
     * stream terminates or is cancelled; errors are recorded on the span.
     */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return Flux.defer(
                () -> {
                    Span span = tracer.spanBuilder("OTelRSocket requestStream").setNoParent().startSpan();
                    return delegate
                            .requestStream(payload)
                            .doOnError(span::recordException)
                            .doFinally(signalType -> span.end());
                });
    }

}
1 Like