Actor model
Clustering
Persistence (Event Sourcing)
HTTP
Streams
gRPC
class MyService {
public Result performTask(Task task) throws Exception {
// ... do work
}
}
for (Task task: tasks) {
Result result = service.performTask(task);
// Handle result...
}
def post(request: HttpRequest): HttpResponse = {
val userId = request.getQueryParam("userId")
val user: User = getUserFromDatabase(userId)
ExternalService.sendPresentToUser(user)
HttpResponse(200)
}
HttpResponse post(HttpRequest request) {
String userId = request.getQueryParam("userId");
Domain.User user = getUserFromDatabase(userId);
ExternalService.sendPresentToUser(user);
return new HttpResponse(200);
}
def post(request: HttpRequest): HttpResponse = {
// nice and quick
val userId = request.getQueryParam("userId")
// 5 millis to 10 seconds?
val user: User = getUserFromDatabase(userId)
// 5 millis to 10 seconds?
ExternalService.sendPresentToUser(user)
HttpResponse(200)
}
private HttpResponse post(HttpRequest request) {
// nice and quick
String userId = request.getQueryParam("userId");
// 5 millis to 10 seconds?
Domain.User user = getUserFromDatabase(userId);
// 5 millis to 10 seconds?
ExternalService.sendPresentToUser(user);
return new HttpResponse(200);
}
def request(request: HttpRequest): Future[HttpResponse] = ???
CompletableFuture<HttpResponse> post(HttpRequest request)
def request(request: HttpRequest): Future[HttpResponse] =
for {
user <- lookupUser(request.getQueryParam("userId"))
_ <- AsyncExternalService.sendPresentToUser(user)
} yield HttpResponse(200)
CompletableFuture<HttpResponse> request(HttpRequest request) {
return wf.lookupUser(request.getQueryParam("id"))
.thenCompose(user -> wf.sendPresentToUser(user))
.thenApply(v -> new HttpResponse(200));
}
path("user" / Segment) { name =>
get {
withRequestTimeout(500.millis) {
val user: Future[Option[User]] = DataAccess.lookupUser(name)
onComplete(user) {
case Success(None) => complete(StatusCodes.NotFound)
case Success(Some(u)) => complete(u)
case Failure(t) => complete(StatusCodes.InternalServerError, t.getMessage)
}
}
}
}
get(() -> {
CompletableFuture<Optional<User>> user =
dataAccess.lookupUser(id);
return onSuccess(user, (Optional<User> opUser) ->
opUser.map(u -> complete(u.serialise()))
.orElse(complete(StatusCodes.NOT_FOUND)));
}
path("user" / Segment) { name =>
get {
withRequestTimeout(500.millis) {
val user: Future[Option[User]] = DataAccess.lookupUser(name)
onComplete(user) {
case Success(None) => complete(StatusCodes.NotFound)
case Success(Some(u)) => complete(u)
case Failure(t) => complete(StatusCodes.InternalServerError, t.getMessage)
}
}
}
}
def lookupUser(userId: UserId): Future[Option[User]] =
session.executeAsync("select * from users where user_id = ?", userId).asScala
.map((rs: ResultSet) => Option(rs.one())
.map(row => User(
row.getString("user_id"),
row.getString("user_name"),
row.getInt("age"))))
CompletableFuture<Optional<User>> lookupUser(String userId) {
CompletableFuture<ResultSet> rs = toCompletableFuture(
session.executeAsync(CQL, userId));
CompletableFuture<Optional<User>> user =
rs.thenApply((ResultSet rSet) ->
Optional.ofNullable(rSet.one())
.map((Row row) -> new User(
row.getString("user_id"),
row.getString("user_name"),
row.getInt("age")))
);
return user;
}
Source.range(0, 20000000);
Flow.fromFunction(n -> n.toString());
Sink.foreach(str -> System.out.println(str));
Source<Integer, NotUsed> source =
Source.range(0, 20000000);
Flow<Integer, String, NotUsed> flow =
Flow.fromFunction(n -> n.toString());
Sink<String, CompletionStage<Done>> sink =
Sink.foreach(str -> System.out.println(str));
RunnableGraph<NotUsed> runnable =
source.via(flow).to(sink);
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
runnable.run(materializer);
val source = Source(0 to 20000000)
val flow = Flow[Int].map(_.toString())
val sink = Sink.foreach[String](println(_))
val runnable = source.via(flow).to(sink)
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
runnable.run()
Java:
Source.range(0, 20000000)
.map(Object::toString)
.runForeach(System.out::println, materializer);
Scala:
Source(0 to 20000000)
.map(_.toString)
.runForeach(println)
RunnableGraph<NotUsed> graph = Source.range(0, 20000000)
.map(Object::toString)
.to(Sink.foreach(str -> System.out.println(str)));
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
NotUsed n1 = graph.run(materializer);
NotUsed n2 = graph.run(materializer);
val graph = Source(0 to 20000000)
.map(_.toString)
.to(Sink.foreach(println))
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val n1: NotUsed = graph.run()
val n2: NotUsed = graph.run()
Source.range(1, 3)
.map(x -> x + 1)
.map(x -> x * 2)
.to(Sink.reduce((x, y) -> x + y));
Source(1 to 3)
.map(x => x + 1)
.map(x => x * 2)
.to(Sink.reduce[Int](_ + _))
Source.range(1, 3)
.map(x -> x + 1).async()
.map(x -> x * 2)
.to(Sink.reduce((x, y) -> x + y));
Source(1 to 3)
.map(x => x + 1).async
.map(x => x * 2)
.to(Sink.reduce[Int](_ + _))
Source.range(1, 3)
.map(x -> x + 1)
.mapAsync(5, n -> CompletableFuture.completedFuture(n * 2))
.to(Sink.reduce((x, y) -> x + y));
Source(1 to 3)
.map(x => x + 1)
.mapAsync(5)(n => Future.successful(n * 2))
.to(Sink.reduce[Int](_ + _))
Demand is signalled across async boundaries
Thanks to Reactive Streams, across different libraries:
Often also possible across external protocols, i.e. TCP:
Can be seen in e.g. wireshark:
HTTP Client -> TCP -> Server -> HTTP Server -> TCP -> Apache Cassandra
val response: Future[HttpResponse] = Http().singleRequest(
HttpRequest(uri = s"http://localhost:8080/user/tracking/$userId")
)
response.map {
case HttpResponse(StatusCodes.OK, headers, entity, _) =>
val response: Source[ByteString, _] = entity.dataBytes
response.via(Framing.delimiter(
ByteString("\n"), maximumFrameLength = 100))
.map(_.utf8String.parseJson.convertTo[Event])
.mapMaterializedValue(_ => NotUsed)
}
val response: Future[HttpResponse] = Http().singleRequest(
HttpRequest(uri = s"http://localhost:8080/user/tracking/$userId")
)
response.map {
case HttpResponse(StatusCodes.OK, headers, entity, _) =>
val response: Source[ByteString, _] = entity.dataBytes
response.via(Framing.delimiter(
ByteString("\n"), maximumFrameLength = 100))
.map(_.utf8String.parseJson.convertTo[Event])
.mapMaterializedValue(_ => NotUsed)
}
val bound: Future[Http.ServerBinding] =
Http().bindAndHandle(route, "localhost", 8080)
def bindAndHandle(
handler: Flow[HttpRequest, HttpResponse, Any],
interface: String,
port: Int)
val streamingRoute = path("user" / "tracking" / Segment) { name: String =>
val source: Source[Event, NotUsed] =
DataAccess.lookupEvents(name)
val asJson: Source[ByteString, NotUsed] = source.map(e =>
ByteString(s"${e.toJson.toString()}\n",
StandardCharsets.UTF_8))
complete(HttpEntity(ContentTypes.`application/json`, asJson))
}
val streamingRoute = path("user" / "tracking" / Segment) { name: String =>
val source: Source[Event, NotUsed] =
DataAccess.lookupEvents(name)
val asJson: Source[ByteString, NotUsed] = source.map(e =>
ByteString(s"${e.toJson.toString()}\n",
StandardCharsets.UTF_8))
complete(HttpEntity(ContentTypes.`application/json`, asJson))
}
CassandraSource(new SimpleStatement(
"select * from user_tracking where user_id = ?", userId))(session)
.map(row => Event(
row.getString("user_id"),
UUIDs.unixTimestamp(row.getUUID("time")),
row.getString("event")))
Java