Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions reactive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>${reactor.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-okhttp</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
*/
package feign.reactive;

import feign.FeignException;
import feign.InvocationHandlerFactory.MethodHandler;
import feign.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

public abstract class ReactiveInvocationHandler implements InvocationHandler {

Expand Down Expand Up @@ -90,22 +90,50 @@ protected abstract Publisher invoke(Method method,
Object[] arguments);

/**
* Invoke the Method Handler as a Callable.
* Invoke the Method Handler as a Publisher.
*
* @param methodHandler to invoke
* @param arguments for the method
* @return a Callable wrapper for the invocation.
* @return a Publisher wrapper for the invocation.
*/
Callable<?> invokeMethod(MethodHandler methodHandler, Object[] arguments) {
return () -> {
try {
return methodHandler.invoke(arguments);
} catch (Throwable th) {
if (th instanceof FeignException) {
throw (FeignException) th;
Publisher<?> invokeMethod(MethodHandler methodHandler, Object[] arguments) {
return subscriber -> subscriber.onSubscribe(new Subscription() {
private final AtomicBoolean isTerminated = new AtomicBoolean(false);

@Override
public void request(long n) {
if (n <= 0 && !terminated()) {
subscriber.onError(new IllegalArgumentException("negative subscription request"));
}
throw new RuntimeException(th);
if (!isTerminated()) {
try {
Object result = methodHandler.invoke(arguments);
if (null != result) {
subscriber.onNext(result);
}
} catch (Throwable th) {
if (!terminated()) {
subscriber.onError(th);
}
}
}
if (!terminated()) {
subscriber.onComplete();
}
}

@Override
public void cancel() {
isTerminated.set(true);
}

private boolean isTerminated() {
return isTerminated.get();
}

private boolean terminated() {
return isTerminated.getAndSet(true);
}
};
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import feign.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.Callable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -32,11 +31,11 @@ public class ReactorInvocationHandler extends ReactiveInvocationHandler {

@Override
protected Publisher invoke(Method method, MethodHandler methodHandler, Object[] arguments) {
Callable<?> invocation = this.invokeMethod(methodHandler, arguments);
Publisher<?> invocation = this.invokeMethod(methodHandler, arguments);
if (Flux.class.isAssignableFrom(method.getReturnType())) {
return Flux.from(Mono.fromCallable(invocation)).subscribeOn(Schedulers.elastic());
return Flux.from(invocation).subscribeOn(Schedulers.elastic());
} else if (Mono.class.isAssignableFrom(method.getReturnType())) {
return Mono.fromCallable(invocation).subscribeOn(Schedulers.elastic());
return Mono.from(invocation).subscribeOn(Schedulers.elastic());
}
throw new IllegalArgumentException(
"Return type " + method.getReturnType().getName() + " is not supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class RxJavaInvocationHandler extends ReactiveInvocationHandler {

@Override
protected Publisher invoke(Method method, MethodHandler methodHandler, Object[] arguments) {
return Flowable.fromCallable(this.invokeMethod(methodHandler, arguments))
return Flowable.fromPublisher(this.invokeMethod(methodHandler, arguments))
.observeOn(Schedulers.trampoline());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import feign.Client;
import feign.InvocationHandlerFactory;
import feign.Logger;
import feign.Logger.Level;
import feign.Param;
Expand All @@ -38,20 +37,16 @@
import feign.ResponseMapper;
import feign.RetryableException;
import feign.Retryer;
import feign.Target;
import feign.codec.Decoder;
import feign.codec.ErrorDecoder;
import feign.jackson.JacksonDecoder;
import feign.jackson.JacksonEncoder;
import feign.jaxrs.JAXRSContract;
import io.reactivex.Flowable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import okhttp3.mockwebserver.MockResponse;
Expand All @@ -63,6 +58,7 @@
import org.mockito.stubbing.Answer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

public class ReactiveFeignIntegrationTest {

Expand Down Expand Up @@ -100,16 +96,18 @@ public void testReactorTargetFull() throws Exception {
.target(TestReactorService.class, this.getServerUrl());
assertThat(service).isNotNull();

String version = service.version()
.block();
assertThat(version).isNotNull();
StepVerifier.create(service.version())
.expectNext("1.0")
.expectComplete()
.verify();
assertThat(webServer.takeRequest().getPath()).isEqualToIgnoringCase("/version");


/* test encoding and decoding */
User user = service.user("test")
.blockFirst();
assertThat(user).isNotNull().hasFieldOrPropertyWithValue("username", "test");
StepVerifier.create(service.user("test"))
.assertNext(user -> assertThat(user).hasFieldOrPropertyWithValue("username", "test"))
.expectComplete()
.verify();
assertThat(webServer.takeRequest().getPath()).isEqualToIgnoringCase("/users/test");

}
Expand All @@ -127,15 +125,17 @@ public void testRxJavaTarget() throws Exception {
.target(TestReactiveXService.class, this.getServerUrl());
assertThat(service).isNotNull();

String version = service.version()
.firstElement().blockingGet();
assertThat(version).isNotNull();
StepVerifier.create(service.version())
.expectNext("1.0")
.expectComplete()
.verify();
assertThat(webServer.takeRequest().getPath()).isEqualToIgnoringCase("/version");

/* test encoding and decoding */
User user = service.user("test")
.firstElement().blockingGet();
assertThat(user).isNotNull().hasFieldOrPropertyWithValue("username", "test");
StepVerifier.create(service.user("test"))
.assertNext(user -> assertThat(user).hasFieldOrPropertyWithValue("username", "test"))
.expectComplete()
.verify();
assertThat(webServer.takeRequest().getPath()).isEqualToIgnoringCase("/users/test");
}

Expand Down Expand Up @@ -164,7 +164,10 @@ public void testRequestInterceptor() {
TestReactorService service = ReactorFeign.builder()
.requestInterceptor(mockInterceptor)
.target(TestReactorService.class, this.getServerUrl());
service.version().block();
StepVerifier.create(service.version())
.expectNext("1.0")
.expectComplete()
.verify();
verify(mockInterceptor, times(1)).apply(any(RequestTemplate.class));
}

Expand All @@ -176,7 +179,10 @@ public void testRequestInterceptors() {
TestReactorService service = ReactorFeign.builder()
.requestInterceptors(Arrays.asList(mockInterceptor, mockInterceptor))
.target(TestReactorService.class, this.getServerUrl());
service.version().block();
StepVerifier.create(service.version())
.expectNext("1.0")
.expectComplete()
.verify();
verify(mockInterceptor, times(2)).apply(any(RequestTemplate.class));
}

Expand All @@ -193,7 +199,10 @@ public void testResponseMappers() throws Exception {
TestReactorService service = ReactorFeign.builder()
.mapAndDecode(responseMapper, decoder)
.target(TestReactorService.class, this.getServerUrl());
service.version().block();
StepVerifier.create(service.version())
.expectNext("1.0")
.expectComplete()
.verify();
verify(responseMapper, times(1))
.map(any(Response.class), any(Type.class));
verify(decoder, times(1)).decode(any(Response.class), any(Type.class));
Expand All @@ -208,16 +217,16 @@ public void testQueryMapEncoders() {
TestReactiveXService service = RxJavaFeign.builder()
.queryMapEncoder(encoder)
.target(TestReactiveXService.class, this.getServerUrl());
String results = service.search(new SearchQuery())
.blockingSingle();
assertThat(results).isNotEmpty();
StepVerifier.create(service.search(new SearchQuery()))
.expectNext("No Results Found")
.expectComplete()
.verify();
verify(encoder, times(1)).encode(any(Object.class));
}

@SuppressWarnings({"ResultOfMethodCallIgnored", "ThrowableNotThrown"})
@SuppressWarnings({"ThrowableNotThrown"})
@Test
public void testErrorDecoder() {
this.thrown.expect(RuntimeException.class);
this.webServer.enqueue(new MockResponse().setBody("Bad Request").setResponseCode(400));

ErrorDecoder errorDecoder = mock(ErrorDecoder.class);
Expand All @@ -227,8 +236,11 @@ public void testErrorDecoder() {
TestReactiveXService service = RxJavaFeign.builder()
.errorDecoder(errorDecoder)
.target(TestReactiveXService.class, this.getServerUrl());
service.search(new SearchQuery())
.blockingSingle();
StepVerifier.create(service.search(new SearchQuery()))
.expectErrorSatisfies(ex -> assertThat(ex)
.isInstanceOf(IllegalStateException.class)
.hasMessage("bad request"))
.verify();
verify(errorDecoder, times(1)).decode(anyString(), any(Response.class));
}

Expand All @@ -243,7 +255,10 @@ public void testRetryer() {
TestReactorService service = ReactorFeign.builder()
.retryer(spy)
.target(TestReactorService.class, this.getServerUrl());
service.version().log().block();
StepVerifier.create(service.version())
.expectNext("1.0")
.expectComplete()
.verify();
verify(spy, times(1)).continueOrPropagate(any(RetryableException.class));
}

Expand All @@ -261,7 +276,10 @@ public void testClient() throws Exception {
TestReactorService service = ReactorFeign.builder()
.client(client)
.target(TestReactorService.class, this.getServerUrl());
service.version().block();
StepVerifier.create(service.version())
.expectNext("1.0")
.expectComplete()
.verify();
verify(client, times(1)).execute(any(Request.class), any(Options.class));
}

Expand All @@ -272,8 +290,10 @@ public void testDifferentContract() throws Exception {
TestJaxRSReactorService service = ReactorFeign.builder()
.contract(new JAXRSContract())
.target(TestJaxRSReactorService.class, this.getServerUrl());
String version = service.version().block();
assertThat(version).isNotNull();
StepVerifier.create(service.version())
.expectNext("1.0")
.expectComplete()
.verify();
assertThat(webServer.takeRequest().getPath()).isEqualToIgnoringCase("/version");
}

Expand Down
Loading