- tl;dr
- I thought you hate Java?
- Will each long-living gRPC server-side streaming task in Java block a thread?
- Let a hundred clients stream
- Caveat: StreamObserver may not be thread-safe
- Problems for the reader: What determines the thread count, now?
tl;dr
You can use server-side streaming in gRPC Java in a blocking and in a (scalable) non-blocking way. Download github.com/JohannesFKnauf/grpc-java-threading-experiment and start playing. It’s also a nice example of how to implement publish-subscribe using the Java 9 Flow API.
I thought you hate Java?
When I recently complained about Java, some colleagues quoted:
“There are only two kinds of languages: the ones people complain about and the ones nobody uses.”
― Bjarne Stroustrup, The C++ Programming Language
I do hate Java as a programming language, yes. But still, I have returned to the agony of writing software in plain Java. The reason: I joined a new customer team. It develops shared platform libraries for the JVM – the core features almost every product team in the company will use. Our noble goal is to enable our users to choose whatever JVM technology they like.
Since the bytecode produced by compiling plain Java is the lowest common denominator for all JVM languages, we settled on this approach.
Will each long-living gRPC server-side streaming task in Java block a thread?
A couple of days ago, a colleague brought up the topic of long-living server-side streaming. He had a use case in mind: a long-living notification channel. A client could subscribe to change notifications by calling a gRPC service with server-side streaming. A service like this example:
As you can see, the TickRequest is of no importance. It’s an empty message and will be ignored. The TickReply is returned as a stream, i.e. a client will receive many TickReply messages.
Now, the question came up in the team: Will each stream block a thread on the server side? This would limit the scalability of such an approach immensely.
Why is it really simple in Go?
Besides the JVM, we support the Go platform. In Go’s gRPC implementation of server-side streaming, concurrent sessions are handled using goroutines and the stream is closed when a handler function returns (as described in this article about gRPC long-lived streaming in Go). The power of goroutines ensures scalability.
How can I learn more about gRPC Java?
Java’s threads are real system threads. Hence, blocking a method – and thereby a thread – is not an option. A scalable API for Java will have to look different.
Instead of scrolling through endless web pages, I decided to perform a little experiment in my spare time. You are reading about it right now.
Let a hundred clients stream
To demonstrate a scalable solution in Java, I created two variants of a simple gRPC service: a blocking and a non-blocking implementation. To test the scaling behaviour, I created a client app that opens 100 separate channels consuming from the server. So, each time I start the client app, I create another 100 consumers.
The blocking server
The blocking server app – similar to a classical web application server – will call a method subscribeTicker in a separate thread for each incoming connection. This thread will be reclaimed only when we return from the method.
In this example, I simulate a long-running operation by sleeping inside of the method.
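A blocking handler of this kind might look roughly like the following sketch. It is not the repository's code: StreamObserver here is a local stand-in that mirrors the three methods of io.grpc.stub.StreamObserver so that the example runs without gRPC on the classpath, and the class name, the tick count, and the String payload are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class BlockingTickerSketch {

    /** Local stand-in mirroring the three methods of io.grpc.stub.StreamObserver (assumption: avoids a gRPC dependency). */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Hypothetical blocking handler: the calling thread stays occupied until the stream ends. */
    static void subscribeTicker(StreamObserver<String> observer, int ticks, long pauseMillis) {
        try {
            for (int i = 0; i < ticks; i++) {
                Thread.sleep(pauseMillis); // simulates waiting for the next notification
                observer.onNext("tick " + (i + 1));
            }
            observer.onCompleted();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            observer.onError(e);
        }
    }

    /** Records what a client would receive. */
    static class RecordingObserver implements StreamObserver<String> {
        final List<String> received = new ArrayList<>();
        boolean completed;

        @Override public void onNext(String value) { received.add(value); }
        @Override public void onError(Throwable t) { t.printStackTrace(); }
        @Override public void onCompleted() { completed = true; }
    }

    public static void main(String[] args) {
        RecordingObserver observer = new RecordingObserver();
        long start = System.nanoTime();
        subscribeTicker(observer, 3, 100);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        // The handler only returns after the whole stream has been delivered.
        System.out.println(observer.received + " delivered, thread was busy for ~" + elapsedMillis + " ms");
    }
}
```

The point to notice is that subscribeTicker does not return before onCompleted has been called, so whichever thread runs it is unavailable for the whole lifetime of the stream.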
Let’s build it and start a server:
$ ./gradlew clean check assemble installDist
BUILD SUCCESSFUL in 8s
13 actionable tasks: 13 executed
$ tar xf build/distributions/grpc-java-threading-experiment.tar
$ grpc-java-threading-experiment/bin/grpc-java-threading-experiment server
Now, in a separate terminal, let’s spawn 10 clients:
$ for i in {1..10}; do grpc-java-threading-experiment/bin/grpc-java-threading-experiment client & done
The client terminal will just deliver all the streamed messages:
14:48:58.632 [Thread-61] INFO d.m.e.grpcticker.TickStreamClient - Blarz
14:48:58.637 [Thread-57] INFO d.m.e.grpcticker.TickStreamClient - Blarz
14:48:58.637 [Thread-66] INFO d.m.e.grpcticker.TickStreamClient - Blarz
...
You can ignore them.
The server terminal will show us how many threads it spawns to deliver those streams:
$ grpc-java-threading-experiment/bin/grpc-java-threading-experiment server
14:48:47.554 [main] INFO d.m.examples.grpcticker.GrpcServer - Server started, listening on 54321
14:48:48.373 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:49.374 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:50.374 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:51.375 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:52.375 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:53.376 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:54.377 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:55.377 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:56.377 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 3
14:48:57.378 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 7
14:48:58.378 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 57
14:48:59.378 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 606
14:49:00.379 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 1007
14:49:01.379 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 1007
14:49:02.379 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 1007
...
14:51:37.334 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 1007
14:51:38.342 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 1000
14:51:39.345 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 487
14:51:40.349 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 7
14:51:41.353 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 7
...
You can clearly see that a separate thread has to be created and maintained for each stream. The 10 client processes open 100 channels each, so 1000 streams result in 1000 additional threads.
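How a thread count like the one in the log can be obtained is easy to try out with the JDK alone. The sketch below is an assumption about how such a reporter could work, not the repository's ThreadCountReporter: it reads the live thread count from ThreadMXBean and parks a number of threads the way blocking handlers would, so you can watch the count rise accordingly. The class and method names are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.CountDownLatch;

public class ThreadCountSketch {

    /** Returns the number of live threads in this JVM (daemon and non-daemon). */
    static int liveThreads() {
        return ManagementFactory.getThreadMXBean().getThreadCount();
    }

    /** Waits on a latch and converts interruption into an unchecked exception. */
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Starts n threads that block until 'release' is counted down; returns once all of them are running. */
    static void parkThreads(int n, CountDownLatch release) {
        CountDownLatch started = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                started.countDown();
                awaitQuietly(release); // stands in for a handler blocked on a long-living stream
            });
            t.setDaemon(true);
            t.start();
        }
        awaitQuietly(started);
    }

    public static void main(String[] args) {
        int before = liveThreads();
        CountDownLatch release = new CountDownLatch(1);
        parkThreads(100, release);
        System.out.println("Thread count before: " + before
                + ", with 100 blocked handlers: " + liveThreads());
        release.countDown(); // let the parked threads finish
    }
}
```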
The non-blocking server – or: leveraging Java 9’s Flow API
The non-blocking server app leverages a feature in the gRPC Java API. The StreamObserver API is asynchronous by its very nature. You can implement it in a way that does not block.
For that purpose, I will implement a publish-subscribe scheme using Java 9’s Flow API. (Some might argue whether it’s really the publish-subscribe pattern or rather the observer pattern or something entirely different; but I don’t want to bore you.)
Here is what we’re aiming for:
A single publisher will produce a continuous stream of messages. It will notify all current subscribers upon submission of a new message.
First, I need a notification publisher.
The TickPublisher is going to simulate the notification stream. You inject a SubmissionPublisher tickPublisher on construction.
The same tickPublisher is going to be used as a communication channel to the subscribers.
The AsyncTickerService implementation of our gRPC service will just create a new subscriber and subscribe it to the tickPublisher when a new gRPC request arrives. The subscriber will store the observer in order to deliver messages later. The handler method returns. The thread is reclaimed, but the observer stays alive.
The only missing link is the TickSubscriber’s behaviour. It will have to react to messages from the publisher. First, it will have to receive and store its subscription in the onSubscribe method.
Then, it’s going to receive message by message through the onNext method.
In addition, I define a termination criterion and close the stream after receiving 100 messages. This enables me to see the behaviour when the number of clients is declining again.
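The pieces described above can be sketched in a few lines of plain Java. This is a minimal illustration under stated assumptions, not the repository's implementation: StreamObserver is again a local stand-in for io.grpc.stub.StreamObserver, ticks are plain Strings instead of TickReply messages, and the runDemo helper exists only to make the sketch runnable on its own. The names TickSubscriber, subscribeTicker, and tickPublisher follow the text.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class AsyncTickerSketch {

    /** Local stand-in mirroring io.grpc.stub.StreamObserver (assumption: avoids a gRPC dependency). */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Forwards published ticks to one observer and closes the stream after maxTicks messages. */
    static class TickSubscriber implements Flow.Subscriber<String> {
        private final StreamObserver<String> observer;
        private final int maxTicks;
        private Flow.Subscription subscription;
        private int delivered;
        private boolean finished;

        TickSubscriber(StreamObserver<String> observer, int maxTicks) {
            this.observer = observer;
            this.maxTicks = maxTicks;
        }

        @Override public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription; // store it to request more items or cancel later
            subscription.request(1);
        }

        @Override public void onNext(String tick) {
            observer.onNext(tick);
            if (++delivered >= maxTicks) {    // termination criterion
                subscription.cancel();
                finish();
            } else {
                subscription.request(1);
            }
        }

        @Override public void onError(Throwable t) { finished = true; observer.onError(t); }
        @Override public void onComplete() { finish(); }

        private void finish() {
            if (!finished) { finished = true; observer.onCompleted(); }
        }
    }

    /** Handler sketch: registers the subscriber and returns immediately, so no thread stays blocked. */
    static void subscribeTicker(SubmissionPublisher<String> tickPublisher,
                                StreamObserver<String> observer, int maxTicks) {
        tickPublisher.subscribe(new TickSubscriber(observer, maxTicks));
    }

    /** Demo helper (hypothetical): publishes 'published' ticks to one subscriber limited to maxTicks. */
    static List<String> runDemo(int published, int maxTicks) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<String> tickPublisher = new SubmissionPublisher<>()) {
            subscribeTicker(tickPublisher, new StreamObserver<String>() {
                @Override public void onNext(String value) { received.add(value); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onCompleted() { done.countDown(); }
            }, maxTicks);
            for (int i = 1; i <= published; i++) {
                tickPublisher.submit("tick " + i);
            }
            done.await(5, TimeUnit.SECONDS); // delivery happens on the publisher's executor threads
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(5, 3));
    }
}
```

Note that subscribeTicker only registers the subscriber. The actual delivery happens later on threads of the publisher's executor, which is why the number of open streams and the number of threads are no longer coupled.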
Test Drive II: The Duel
When you run the async-server and spawn 1000 – or even 2000 or more – clients, the server’s thread count will stay low.
$ grpc-java-threading-experiment/bin/grpc-java-threading-experiment async-server
20:06:22.641 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 1 to 0 subscribers.
20:06:22.776 [main] INFO d.m.examples.grpcticker.GrpcServer - Server started, listening on 54321
20:06:23.588 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 4
20:06:23.644 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 2 to 0 subscribers.
20:06:24.589 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 4
20:06:24.644 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 3 to 0 subscribers.
20:06:25.612 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 4
...
20:06:30.625 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 19
20:06:30.665 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 9 to 0 subscribers.
20:06:31.641 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 38
20:06:31.667 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 10 to 1000 subscribers.
20:06:32.642 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 38
20:06:32.678 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 11 to 1000 subscribers.
...
20:06:40.680 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 38
20:06:40.721 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 19 to 1000 subscribers.
20:06:41.682 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 38
20:06:41.727 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 20 to 1066 subscribers.
20:06:42.688 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 38
20:06:42.728 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 21 to 1991 subscribers.
20:06:43.689 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 38
20:06:43.730 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 22 to 2000 subscribers.
20:06:44.690 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 38
20:06:44.737 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 23 to 2000 subscribers.
...
20:08:19.975 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 109 to 2000 subscribers.
20:08:20.922 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 11
20:08:20.977 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 110 to 2000 subscribers.
20:08:21.923 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 43
20:08:21.995 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 111 to 1000 subscribers.
20:08:22.927 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 43
...
20:08:31.071 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 120 to 1000 subscribers.
20:08:31.947 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 43
20:08:32.072 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 121 to 934 subscribers.
20:08:32.948 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 45
20:08:33.073 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 122 to 9 subscribers.
20:08:33.950 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 45
20:08:34.073 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 123 to 0 subscribers.
20:08:34.951 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 45
20:08:35.074 [Thread-1] INFO d.m.e.grpcticker.TickPublisher - Publishing new tick 124 to 0 subscribers.
20:08:35.951 [Thread-0] INFO d.m.e.grpcticker.ThreadCountReporter - Thread count: 45
...
Mission accomplished!
Caveat: StreamObserver may not be thread-safe
⚠⚠⚠
From the StreamObserver API docs: “Implementations are not required to be thread-safe […] Since individual StreamObservers are not thread-safe, if multiple threads will be writing to a StreamObserver concurrently, the application must synchronize calls.”
⚠⚠⚠
In this example code, I circumvent concurrent access to the same StreamObserver by employing the contractual delivery guarantees for all Publisher implementations: “Publishers ensure that Subscriber method invocations for each subscription are strictly ordered in happens-before order.”
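If a design cannot rely on that ordering guarantee, for example because several threads write to the same observer, the calls have to be serialized explicitly. One possible approach is a small synchronizing wrapper, sketched below. StreamObserver is once more a local stand-in for io.grpc.stub.StreamObserver, and SynchronizedObserver, CollectingObserver, and writeConcurrently are hypothetical names introduced for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SynchronizedObserverSketch {

    /** Local stand-in mirroring io.grpc.stub.StreamObserver (assumption: avoids a gRPC dependency). */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Hypothetical wrapper that serializes all calls to the delegate with a single lock. */
    static final class SynchronizedObserver<T> implements StreamObserver<T> {
        private final StreamObserver<T> delegate;

        SynchronizedObserver(StreamObserver<T> delegate) { this.delegate = delegate; }

        @Override public synchronized void onNext(T value) { delegate.onNext(value); }
        @Override public synchronized void onError(Throwable t) { delegate.onError(t); }
        @Override public synchronized void onCompleted() { delegate.onCompleted(); }
    }

    /** Deliberately not thread-safe: an unsynchronized ArrayList behind the observer. */
    static final class CollectingObserver implements StreamObserver<Integer> {
        final List<Integer> values = new ArrayList<>();

        @Override public void onNext(Integer value) { values.add(value); }
        @Override public void onError(Throwable t) { }
        @Override public void onCompleted() { }
    }

    /** Writes perThread values from each thread through the wrapper and returns how many arrived. */
    static int writeConcurrently(int threads, int perThread) {
        CollectingObserver target = new CollectingObserver();
        StreamObserver<Integer> safe = new SynchronizedObserver<>(target);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    safe.onNext(i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // join also makes the workers' writes visible to this thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return target.values.size();
    }

    public static void main(String[] args) {
        System.out.println("values delivered: " + writeConcurrently(8, 1000));
    }
}
```

The wrapper only prevents overlapping calls. Making sure that nothing is written after onCompleted or onError remains the application's job.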
Problems for the reader: What determines the thread count, now?
Hint 1: Try replacing the Publisher thread pool with sequential execution
- SubmissionPublisher API docs
- Executor is a functional interface and you can use Runnable::run
private SubmissionPublisher<TickReply> tickPublisher = new SubmissionPublisher<>(Runnable::run, 100);
Hint 2: Try to manipulate the gRPC request handler thread pool
this.server = ServerBuilder.forPort(DEFAULT_PORT)
    .addService(service)
    .executor(Executors.newFixedThreadPool(5))
    .build();
Post header background image by Al3xanderD (Alexander Droeger) from Pixabay.