Skip to content

Conversation

dmitrysulman
Copy link
Contributor

@dmitrysulman dmitrysulman commented Sep 12, 2025

This PR updates RSocketServiceMethod to allow using the @RSocketExchange annotation with Kotlin suspending functions when used as RSocket requester.

Fixes #34868

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged or decided on label Sep 12, 2025
@dmitrysulman dmitrysulman changed the title Add RSocketServiceMethod support for suspending functions Add RSocketServiceMethod support for Kotlin suspending functions Sep 12, 2025
@doxlik
Copy link

doxlik commented Sep 12, 2025

@dmitrysulman hello, dear Dmitry. Can you please also add test cases for streaming?

At least requestStream that return Flow. Because I also was doing poc of this feature (just for interest) and Flow return type most probably will be covered by ReactiveAdapterRegistry, but still it is good for it to also be covered by test.

And also meanwhile I am not sure if the requestChannel will correctly handle Flow as argument, so also good to be covered with test.

Will be very grateful if you have time to add it and thank you for your contribution.

@dmitrysulman dmitrysulman force-pushed the rsocket-interface-coroutine branch from fee43d6 to df0dde9 Compare September 13, 2025 11:16
@dmitrysulman dmitrysulman force-pushed the rsocket-interface-coroutine branch from 350082c to 255ef56 Compare September 13, 2025 16:24
@dmitrysulman
Copy link
Contributor Author

@doxlik done, Request Stream and Request Channel suspending functions (using Flow as the return type) are now supported.

The Flow type argument for Request Channel is handled correctly: PayloadArgumentResolver delegates to the relevant ReactiveAdapter.

@sdeleuze sdeleuze self-assigned this Sep 13, 2025
@sdeleuze sdeleuze added in: messaging Issues in messaging modules (jms, messaging) theme: kotlin An issue related to Kotlin support labels Sep 13, 2025
@sdeleuze sdeleuze added this to the 7.0.0-RC1 milestone Sep 13, 2025
MethodParameter returnParam = new MethodParameter(method, -1);
Class<?> returnType = returnParam.getParameterType();
boolean isUnwrapped = KotlinDetector.isSuspendingFunction(method) &&
!COROUTINES_FLOW_CLASS_NAME.equals(returnParam.getParameterType().getName());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

returnParam.getParameterType().getName() can be replaced with returnType.getName().
@sdeleuze I can update the branch or it can be fixed in the polishing commit.

suspend fun requestResponse(input: String): String

@RSocketExchange("rs")
suspend fun requestStream(input: String): Flow<String>
Copy link

@doxlik doxlik Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dmitrysulman By the way, here it is tricky part whether to let suspend functions for "Flow returning" methods (requestStream / requestChannel) because the Flow itself does not require to be in suspend function, all asynchronous work happens in flow {} / Flow.collect {} callback.

And despite I personally prefer to keep such functions also suspend, the big open source thinks other way

  1. Spring HTTP Interfaces does not allow (at least in 6.x.x.) suspend for "Flow returning" methods.
  2. GRPC Kotlin also generates non-suspend functions for "Flow returning" methods.

@sdeleuze please correct me if I'm am wrong

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spring HTTP Interfaces does not allow (at least in 6.x.x.) suspend for "Flow returning" methods.

This can be easily fixed after (if) the current PR is merged (the fix of the CoroutinesUtils.asFlow method is required).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
in: messaging Issues in messaging modules (jms, messaging) status: waiting-for-triage An issue we've not yet triaged or decided on theme: kotlin An issue related to Kotlin support
Projects
None yet
Development

Successfully merging this pull request may close these issues.

RSocket Interfaces doesn't work with coroutines.
4 participants