Netflix Hystrix Reactive RxJava Execution Command Example
In this tutorial we show you how to execute a reactive Hystrix
command. We use the HystrixObservableCommand
which is used to wrap code that will execute potentially risky functionality. Typically meaning a service call over the network with fault and latency tolerance.
Project Structure
Let’s start by looking at the project structure.
Maven Dependencies
We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the class-path.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.memorynotfound.netflix.hystrix</groupId>
<artifactId>reactive-execution</artifactId>
<version>1.0.0-SNAPSHOT</version>
<url>https://memorynotfound.com</url>
<name>Netflix Hystrix - ${project.artifactId}</name>
<dependencies>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.3</version>
</dependency>
<!-- logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
<!-- testing -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Netflix Hystrix Reactive RxJava Execution Command Example
This example uses the HystrixObservableCommand
which is used to wrap code that will execute potentially risky functionality (typically meaning a service call over the network) with fault and latency tolerance, statistics and performance metics capture, circuit breaker and bulkhead functionality. This command should be used for a purely non-blocking all pattern. The caller of this command will be subscribed to the Observable<R>
returned by the run()
method.
The construct()
method will be executed when the observe()
or toObservable()
methods are invoked. This’ll produce a non blocking method invocation and returns a result in a reactive java programming way.
package com.memorynotfound.netflix.hystrix;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class GreetingCommand extends HystrixObservableCommand<String> {
private final String name;
public GreetingCommand(String name) {
super(HystrixCommandGroupKey.Factory.asKey("GreetingGroup"));
this.name = name;
}
@Override
protected Observable<String> construct() {
return Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> observer) {
try {
if (!observer.isUnsubscribed()){
if (name == null){
throw new IllegalArgumentException("name cannot be null");
}
observer.onNext("Hello " + name + "!");
observer.onCompleted();
}
} catch (Exception e){
observer.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
@Override
protected Observable<String> resumeWithFallback() {
return Observable.create((Observable.OnSubscribe<String>) observer -> {
if (!observer.isUnsubscribed()){
observer.onNext("Hello Guest!");
observer.onCompleted();
}
}).subscribeOn(Schedulers.io());
}
}
Writing a Reactive RxJava Fallback
If observe()
or toObservable()
fails in any way then the resumeWithFallback()
method will be invoked to provide an opportunity to return a fallback response. This should typically not require network transport. In other words, this should be a static, stubbed or cached result that can immediately be returned upon failure. If network traffic is wanted for fallback (such as going to MemCache or Redis) then the fallback implementation should invoke another HystrixObservableCommand
instance that protects against that network access and possibly has another level of fallback that does not involve network access.
Unit Test Netflix Hystrix Reactive RxJava Execution with JUnit
You can execute a HystrixObservableCommand
reactively by using the observe()
or toObservable()
method. Which is used for asynchronous execution of command with a callback by subscribing to the Observable
. This eagerly starts execution of the command the same as HystrixCommand.queue()
and HystrixCommand.execute()
.
In the testBlockingReactiveGreeting
JUnit test, we create a blocking execution using the Observable.toBlocking()
method. This creates a BlockingObservable
which has blocking capabilities. The single()
method is used to retrieve the item after the result is emitted.
The following testNonBlockingReactiveGreeting
and executeNonBlockingReactiveGreeting
we execute non-blocking reactive programming hystrix commands.
The last testNonBlockingReactiveGreetingFallBack
JUnit test demonstrates the resumeWithFallback
method from the HystrixObservableCommand
.
package com.memorynotfound.netflix.hystrix.test;
import com.memorynotfound.netflix.hystrix.GreetingCommand;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
public class GreetingCommandTest {
private static final Logger log = LoggerFactory.getLogger(GreetingCommandTest.class);
// blocking
@Test
public void testBlockingReactiveGreeting() throws Exception {
GreetingCommand command1 = new GreetingCommand("World");
GreetingCommand command2 = new GreetingCommand("John");
assertEquals("Hello World!", command1.observe().toBlocking().single());
assertEquals("Hello John!", command2.observe().toBlocking().single());
assertFalse(command1.isResponseFromFallback());
assertFalse(command2.isResponseFromFallback());
}
// non-blocking
@Test
public void testNonBlockingReactiveGreeting() throws Exception {
GreetingCommand command1 = new GreetingCommand("World");
GreetingCommand command2 = new GreetingCommand("John");
Observable<String> oWorld = command1.observe();
Observable<String> oJohn = command2.observe();
TestScheduler testScheduler = Schedulers.test();
TestSubscriber<String> worldSubscriber = new TestSubscriber<>();
TestSubscriber<String> johnSubscriber = new TestSubscriber<>();
oWorld.subscribe(worldSubscriber);
oJohn.subscribe(johnSubscriber);
testScheduler.advanceTimeBy(100L, TimeUnit.SECONDS);
assertEquals("Hello World!", worldSubscriber.getOnNextEvents().get(0));
assertEquals("Hello John!", johnSubscriber.getOnNextEvents().get(0));
assertFalse(command1.isResponseFromFallback());
assertFalse(command2.isResponseFromFallback());
}
@Test
public void executeNonBlockingReactiveGreeting() {
GreetingCommand command1 = new GreetingCommand("World");
GreetingCommand command2 = new GreetingCommand("John");
Observable<String> oWorld = command1.observe();
Observable<String> oJohn = command2.observe();
// non-blocking
// - this is a verbose anonymous inner-class approach and doesn't do assertions
oWorld.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
// nothing needed here
}
@Override
public void onError(Throwable e) {
log.error("", e);
}
@Override
public void onNext(String greeting) {
log.debug(greeting);
}
});
// non-blocking
// - also verbose anonymous inner-class
// - ignore errors and onCompleted signal
oJohn.subscribe(log::debug);
oJohn.subscribe(greeting -> log.debug(greeting));
oJohn.subscribe(new Action1<String>() {
@Override
public void call(String greeting) {
log.debug(greeting);
}
});
}
@Test
public void testBlockingReactiveGreetingFallBack() throws Exception {
try {
GreetingCommand command = new GreetingCommand(null);
assertEquals("Hello Guest!", command.observe().toBlocking().single());
assertTrue(command.isResponseFromFallback());
} catch (Exception e){
fail("we should not get an exception as we return a fallback");
}
}
@Test
public void testNonBlockingReactiveGreetingFallBack() throws Exception {
try {
GreetingCommand command = new GreetingCommand(null);
Observable<String> oNull = command.observe();
TestScheduler testScheduler = Schedulers.test();
TestSubscriber<String> nullSubscriber = new TestSubscriber<>();
oNull.subscribe(nullSubscriber);
testScheduler.advanceTimeBy(100L, TimeUnit.MILLISECONDS);
assertEquals("Hello Guest!", nullSubscriber.getOnNextEvents().get(0));
assertTrue(command.isResponseFromFallback());
} catch (Exception e){
fail("we should not get an exception as we return a fallback");
}
}
}
JUnit Test Results
When we run all of our JUnit tests, we receive the following output.
References
- Netflix Hystrix Documentation
- Netflix Hystrix JavaDoc API
- HystrixObservableCommand JavaDoc
- rx.Observable JavaDoc
- rx.BlockingObservable JavaDoc