Concepts of CQRS

In the previous blog, we learnt what Event Sourcing is. Moving forward, we will dive into the concepts of CQRS, understanding what it means, why it is required, and what its advantages are.

CQRS Pattern

CQRS stands for Command Query Responsibility Segregation. As the name suggests, we split the application into two parts: a command side and a query side. Now, what do command and query mean here? Okay, so –

  • Commands are the ones that change the state of an object or entity; they are also called modifiers or mutators.
  • Queries are the ones that return the state of an entity and do not change anything; another term for them is accessors. (A small sketch of both follows below.)
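
To make the distinction concrete, here is a minimal Scala sketch of the two roles; the User type and method names are illustrative, not part of any framework:

case class User(id: String, name: String, email: String)

// Commands: mutate state, return no data
trait UserCommands {
  def createUser(user: User): Unit                    // mutator
  def updateEmail(id: String, email: String): Unit    // mutator
}

// Queries: return state, cause no side effects
trait UserQueries {
  def getUser(id: String): Option[User]               // accessor
}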

Why is it required?

In traditional data management systems, both commands and queries are executed against the same set of entities, which have a single representation or view. The CRUD operations are applied to a single datastore, and the same entity or object is accessed to handle both read and write operations.

Issues with having a single view for both the read and the write side –

  • It introduces the risk of data contention.
  • Managing permissions and security becomes complex, as the same objects are exposed to both read and write operations.

How does CQRS solve this problem?

The CQRS pattern holds that a method should either change the state of an entity or return a result, but not both. Segregating the read-side and write-side models reduces the complexity that comes with having a single view for both of them.

Benefits of CQRS –

  • Separate command and query models result in a simplified design and implementation of the system and an overall reduction of complexity.
  • You can easily optimise the read side of the system separately from the write side, which allows scaling each differently as per its load. For example, read datastores often encounter greater load and can therefore be scaled without affecting the write datastores.
  • You can provide multiple views of your data for querying purposes, depending on the use cases.

How does CQRS work?

CQRS is mainly used in conjunction with Event Sourcing. The write-side model of a CQRS-based system handles event persistence, acting as the source of information for the read side. The read-side model of the system provides materialized views of the data, typically as highly denormalized views.

[Diagram: a CQRS-based system – commands are handled and persisted as events on the write side, and the events are projected into materialized views on the read side]

How to implement Event Sourcing and CQRS?

If you are using a microservices architecture to build your applications and want to utilise the benefits of Event Sourcing and CQRS, various frameworks and platforms are available for developing asynchronous microservices.

1. Eventuate is one such example. It is an application platform consisting of two products, one of which is Eventuate ES – a microservices framework that implements an event sourcing-based programming and persistence model.

2. Lagom is an open-source microservices framework built on top of Akka and the Play framework. Lagom Persistence makes use of Event Sourcing and CQRS to help achieve a decoupled architecture. You can find real-life examples of how Lagom handles Event Sourcing and CQRS in the Lagom persistence blogs below.

3. Axon is a lightweight framework that helps developers build scalable and extensible applications based on the CQRS principles. It does so by providing implementations of the most important building blocks, such as aggregates, repositories and event buses.


Happy Blogging! 🙂

Introduction to Event Sourcing

Have you ever come across the terms Event Sourcing and CQRS? This blog is a good start to gain basic knowledge about the two. Here, we will learn what Event Sourcing is and what the benefits of having an event-sourced application are. CQRS will be covered in the next blog.

What is Event Sourcing?

Event sourcing is the practice of capturing all changes as domain events, which are immutable facts of things that have happened.

Okay, this definition may sound a bit vague. Let’s consider a simple example to understand what it actually is.

Suppose you have an application which offers CRUD facilities for user accounts. How are you going to manage the state of a user entity at any instant?
In a general scenario, to handle the user state, you will have a database that is updated for each request that comes to the service.
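
With event sourcing, instead of overwriting that database row, every change is appended as an immutable event, and the current state is derived by replaying the events. A minimal Scala sketch of the idea (the event and state types here are illustrative):

sealed trait UserEvent
case class UserCreated(id: String, name: String)     extends UserEvent
case class EmailChanged(id: String, email: String)   extends UserEvent
case class UserDeactivated(id: String)               extends UserEvent

case class UserState(name: String = "", email: String = "", active: Boolean = false)

// Replaying the event log from the beginning recreates the current state
def replay(events: Seq[UserEvent]): UserState =
  events.foldLeft(UserState()) {
    case (state, UserCreated(_, name))   => state.copy(name = name, active = true)
    case (state, EmailChanged(_, email)) => state.copy(email = email)
    case (state, UserDeactivated(_))     => state.copy(active = false)
  }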


Limit which classes a Trait can be mixed into!

A Scala trait is just like a Java interface. A trait can be extended by different classes, just the way we do with Java interfaces, and a class can also inherit multiple traits; these are called mixins.

But what if we want to restrict mixing of traits to a limited number of classes? Do we have a way out for the same?

The answer is YES. We can limit a trait so that it can be mixed into classes that show some specific behavior. There are three ways to do that. Let’s look at each of them one by one.

1. Limiting which class can use a Trait by Inheritance

You can limit a trait so that it will be mixed into classes only if they inherit a particular superclass.

The syntax for the same is:

trait [TraitName] extends [SuperThing]

where TraitName can only be mixed into classes that have SuperThing as their parent, and SuperThing may be a class or an abstract class. In other words, the trait and the class that wants to mix in that trait should have the same parent.
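
A small illustrative example (the Animal/Dog names are made up for demonstration):

abstract class Animal {
  def name: String
}

// Dog can only be mixed into classes whose superclass is Animal
trait Dog extends Animal {
  def bark: String = s"$name says Woof"
}

// Compiles: Labrador has Animal as its parent
class Labrador(val name: String) extends Animal with Dog

class Furniture

// Does not compile: Furniture is not a subclass of Animal
// class Table(val name: String) extends Furniture with Dog
// error: illegal inheritance; superclass Furniture
//   is not a subclass of the superclass Animal of the mixin trait Dog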


Amazon ES – Secure your cluster from anonymous users. #2

In the previous blog, we learned how to create a domain on Amazon ES and how to create an index on the cluster using curl. Now, let’s look at how we can control access to an Amazon ES domain.

One of the key benefits of using Amazon ES is that you can utilize AWS Identity and Access Management (IAM) to control access to your search domains. However, if you were to run an unmanaged Elasticsearch cluster on AWS, utilizing IAM to authorize access to your domains would require more effort.

Amazon ES helps us add an authorization layer by integrating with IAM. You can configure your domains so that only trusted users and applications can access them. You write an IAM policy to control access to the cluster’s endpoint and attach that policy to specific IAM entities.

Types of IAM Policy

  1.  Resource-based Policy – A resource-based policy is attached to the Amazon ES domain (accessible through the domain’s console) and enables you to specify which AWS account and which AWS users or roles can access your Amazon ES endpoint. In addition, a resource-based policy lets you specify an IP condition for restricting access based on source IP addresses. This type of policy is attached to an AWS resource, such as an Amazon S3 bucket.
  2.  Identity-based Policy – You can specify which actions an IAM identity can perform against one or more AWS resources, such as an Amazon ES domain or an S3 bucket. This type of policy is attached to an identity, such as an IAM user, group, or role.

Strategies to authenticate Amazon ES requests

  1. On the basis of the originating IP address – You can specify an IP condition; any call from that IP address will either be allowed or denied access to the resource.
  2. On the basis of the originating principal – You are required to include information in every request to your Amazon ES endpoint that AWS can use to authenticate the requesting user or application. You can accomplish this by signing the request using Signature Version 4.

Note: These authentication strategies apply to both types of policies.

The second approach is what we are going to discuss, with the example described in this blog. We will be using a resource-based policy to write our IAM policy.

Steps to configure resource-based policy

Step 1: Modify the access policy
Click on the ‘Modify access policy’ button to change access to your Amazon ES domain.


Step 2: Select a policy template
To allow or block access to the domain, select a policy template from the template selector or add one or more Identity and Access Management (IAM) policy statements.


Since we are focusing on resource-based policies, we need to specify which AWS account and which AWS users or roles can access our Amazon ES endpoint, so we will select the ‘Allow or deny access to one or more AWS accounts or IAM users’ template.

Step 3: Enter the AWS  account details
A pop-up box will appear, asking you to enter a comma-separated list of valid AWS account IDs, AWS account ARNs, or IAM user ARNs to allow or deny access to your Amazon ES endpoint, setting the Effect to ALLOW or DENY.

Alternatively, you can directly edit the access policy in the section below.


In the resulting policy, the Amazon ES domain – here called my-es – is defined in the Resource section. In the Principal section, you specify the AWS account you want to allow or deny access to, setting the ‘Allow’ or ‘Deny’ option in Effect.
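
For reference, a minimal sketch of such a resource-based policy; the account ID (123456789012), user name (my-user) and region (us-east-1) are placeholders you would replace with your own:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::123456789012:user/my-user"
      },
      "Action": "es:*",
      "Resource": "arn:aws:es:us-east-1:123456789012:domain/my-es/*"
    }
  ]
}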

At last, click on the Submit button below to activate the policy.

Now, if you try to hit your cluster endpoint or Kibana endpoint from your browser, the request will be rejected with an authorization error, since anonymous access is no longer allowed.

Signing requests to the Amazon ES endpoint

Now that you are done setting up the IAM policy for your Amazon ES domain, the question arises: how are we going to connect to the Amazon ES cluster from our application? How can we send requests to our cluster endpoint to perform actions like creating an index, updating an index, etc.?

The answer is pretty simple. All you need to do is issue a Signature Version 4 signed request to your Amazon ES endpoint.

We have built our application using Scala and sbt. So, first add the following library dependencies to build.sbt:

"com.amazonaws" % "aws-java-sdk-core" % "1.11.256"
"vc.inreach.aws" % "aws-signing-request-interceptor" % "0.0.20"

These are added to use classes such as AWSSigner to perform the signing of requests to the Amazon ES endpoint.

The following piece of code is all that you have to write to create a signed request –

import java.time.{LocalDateTime, ZoneOffset}

import com.amazonaws.auth.{AWSCredentialsProvider, AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.google.common.base.Supplier
import org.apache.http.HttpRequestInterceptor
import vc.inreach.aws.request.{AWSSigner, AWSSigningRequestInterceptor}

private def performSigningSteps: HttpRequestInterceptor = {

  // Static credentials for the AWS account that owns the domain
  val awsCredentialsProvider: AWSCredentialsProvider = new AWSStaticCredentialsProvider(
    new BasicAWSCredentials(AWS_ACCESS_ID, AWS_ACCESS_KEY))

  // Signer that adds the Signature Version 4 headers to each request
  val awsSigner = new AWSSigner(awsCredentialsProvider, REGION, SERVICE_NAME, clock)

  new AWSSigningRequestInterceptor(awsSigner)
}

// Clock used by the signer; AWS requires UTC timestamps
final val clock = new Supplier[LocalDateTime] {
  override def get(): LocalDateTime = LocalDateTime.now(ZoneOffset.UTC)
}

where AWS_ACCESS_ID and AWS_ACCESS_KEY are the credentials of the AWS account on which you have set up the Amazon ES cluster.

SERVICE_NAME is set to ‘es’, and REGION to the region where your cluster is set up, such as us-east-1.

To add the AWSSigningRequestInterceptor to the Java High-Level REST Client (you can find complete details about it here), and thus be able to sign requests to the Amazon Elasticsearch Service, you need to configure it using the setHttpClientConfigCallback method of the client builder.

lazy val client: RestHighLevelClient = new RestHighLevelClient(
  RestClient.builder(
    new HttpHost(AWS_HOST, 80))
    .setHttpClientConfigCallback(
      (httpClientBuilder: HttpAsyncClientBuilder) =>
        httpClientBuilder.addInterceptorLast(performSigningSteps))
    .build())

Here, AWS_HOST is the endpoint of your Amazon ES cluster, which looks like “search-my-es-xxxxxxxxxx.us-east-1.es.amazonaws.com”. The Amazon ES service listens on port 80.

It’s done – you are good to go. You can now create an index and perform operations like update, delete, and search very easily.
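
For example, a quick sketch of indexing a document through the signed client (the index, type, and document values are illustrative):

import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.common.xcontent.XContentType

val request = new IndexRequest("user-index", "user", "1")
  .source("""{"id": "1", "name": "user1", "age": 20}""", XContentType.JSON)

// The interceptor signs the request before it reaches the cluster
client.index(request)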

To view the data inserted into Elasticsearch, you can either check the document count under the ‘Indices’ tab of your AWS ES cluster or use the aws-es-kibana proxy. To run the proxy, use the following commands:

Install the npm module

sudo apt install npm
sudo npm install -g aws-es-kibana

Set AWS credentials – Use the AWS Access ID and Key.

export AWS_ACCESS_KEY_ID=XXXXXXXXXXXXXX
export AWS_SECRET_ACCESS_KEY=XXXXXXXXXXXXXX

Run the proxy

aws-es-kibana search-my-es-xxxxxxxxxxxxxxx.us-east-1.es.amazonaws.com

You can find the complete demo code here.


Hope you enjoyed reading the blog! 🙂

Amazon ES – setting up the cluster! #1

Amazon Web Services (AWS) is a cloud services platform, providing compute power, database storage, content delivery, security options and other functionality to allow businesses to build sophisticated applications with increased flexibility, scalability and reliability. Amazon Elasticsearch is one of the services provided by AWS.

Amazon Elasticsearch Service, also called Amazon ES, is a managed service that makes it easy to create a domain and to deploy, operate, and scale Elasticsearch clusters in the AWS Cloud. Elasticsearch is an open-source, highly scalable full-text search and analytics engine. In the previous blog, we discussed how to access its API-specific methods using the High-Level REST Client.

With Amazon ES, you get direct access to the Elasticsearch APIs so that existing code and applications work seamlessly with the service.

Amazon ES offers a number of benefits:

  • Cluster scaling options
  • Supports Open-Source APIs and Tools
  • Self-healing clusters
  • Replication for high availability
  • Data durability
  • Enhanced security
  • Node monitoring
  • Tightly Integrated with Other AWS Services

Getting Started with Amazon Elasticsearch Service

Here are the steps to get started with Amazon ES and create your own Elasticsearch cluster.

Step 1: Sign up for an AWS account.
If you are not an AWS user, your first step is to create an AWS account. If you already have one, you are automatically signed up for Amazon ES.
Your AWS account enables you to access Amazon ES and other AWS services. As with other AWS services, you pay only for the resources that you use.
Look for the Elasticsearch service in the search box.


Step 2: Accessing Amazon Elasticsearch Service.
You can access Amazon ES through the Amazon ES console, the AWS SDKs, or the AWS CLI. The Amazon ES console lets you create, configure, and monitor your domains. It is the easiest way to get started with Amazon ES.

Step 3: Create an Amazon ES domain.
To get started using the service, you need to create an Amazon ES domain.


An Amazon ES domain is an Elasticsearch cluster in the AWS Cloud that has the compute and storage resources that you specify. For example, you can specify the number of instances, instance types, and storage options.

Step 4: Define domain.
A domain is a collection of all the resources needed to run your Elasticsearch cluster.


Step 5: Configure cluster.
You need to configure the instance and storage settings for your cluster based on the traffic, data, and availability requirements of your application. A cluster is a collection of one or more data nodes, optionally dedicated master nodes, and storage required to run Elasticsearch and operate your domain.


Step 6: Set up access.
The next step is to configure your network and attach policies that let you control access to your domain.


Step 7: Done.
Just review, and you are ready to go.


It takes up to ten minutes to initialize new domains. After your domain is initialized, you can upload data and make changes to the domain.

Because Elasticsearch exposes a REST API to perform operations like index, update, search, get, delete, and many more, you can use standard clients like curl or any programming language that can send HTTP requests to your ES cluster.

A simple example of creating an index using a curl command on your Elasticsearch cluster:

curl -XPOST elasticsearch_domain_endpoint/user-index/user -d '{"id" : "1", "name": "user1", "age" : 20}' -H 'Content-Type: application/json'

where elasticsearch_domain_endpoint is the endpoint of your cluster.

To check whether the document exists, you can perform the following curl operation:

curl -XGET elasticsearch_domain_endpoint/user-index/_search?pretty
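
And when you are done experimenting, removing the index is a single request:

curl -XDELETE elasticsearch_domain_endpoint/user-index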

You can also click on the Indices tab to view all the indices created on your cluster, their document counts, and their mapping details.


Apart from that, you can use the Kibana endpoint. Kibana enables visual exploration and real-time analysis of your data in Elasticsearch. You can learn about it here.

In the next blog, we will look at how to authenticate access to an Amazon ES cluster and how to interact with it programmatically.

For more details on Amazon Elasticsearch Service, check its documentation here.

Happy Blogging! 🙂

Java High-Level REST Client – Elasticsearch

Elasticsearch is an open-source, highly scalable full-text search and analytics engine. Using it, you can easily store, search, and analyze large amounts of data in real time. The Java REST client is the official client for Elasticsearch, and it comes in two flavours:

  1.  Java Low-Level REST client – It allows you to communicate with an Elasticsearch cluster over HTTP, leaving request marshalling and response un-marshalling to the user.
  2.  Java High-Level REST client – It is based on the low-level client and exposes API-specific methods, taking care of request marshalling and response un-marshalling.

Our focus here will be to learn about the High-Level REST client. I hope you are clear on the basics of Elasticsearch; if not, you can go through its documentation here.

Introduction

The Java High-Level REST client works on top of the Java Low-Level REST client and is forward compatible.
It lets you use API-specific methods that accept request objects as arguments and return response objects; serialization and deserialization of request and response objects is handled by the client itself.
Each API can be called either synchronously or asynchronously.

Let’s discuss how we can use the High-Level REST client in an application built using Scala and sbt.

The following is the dependency you need to add to build.sbt to use the client:

"org.elasticsearch.client" % "elasticsearch-rest-high-level-client" % "6.1.2"

Since the High-Level REST client depends on the Elasticsearch core, don’t forget to add the Elasticsearch core dependency as well.

"org.elasticsearch" % "elasticsearch" % "6.1.2"

Initialization

A REST High-Level client instance can be built as follows:

val client = new RestHighLevelClient(
  RestClient.builder(new HttpHost(HOST, PORT, "http")))

Here, you can replace HOST with the IP address of the machine on which Elasticsearch is running, and 9200 is the port to send REST requests to that node.

The Java High-Level REST Client supports various APIs – Index, Update, Search, Get, Delete, and Bulk are some of them, and there are many more.

CRUD & Search Operations

With the help of the REST client, we can perform CRUD (Create, Read, Update, and Delete) and search operations against our indexes. Let’s have a quick discussion of these features.

How to index a document in Elasticsearch?

To insert a document, first we need to create an IndexRequest, which requires the index, type, and document ID as arguments. After that, the document source should be provided with the request, in JSON or another supported format. An example is given here:

val request = new IndexRequest(index_name, type_name, id)
request.source(jsonString, XContentType.JSON)

jsonString refers to the data you want to insert into Elasticsearch.
You can then execute the request with the help of the client you created before.

client.index(request)
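
As noted above, each API can also be called asynchronously; a minimal sketch of the same index request with a listener:

import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.index.IndexResponse

client.indexAsync(request, new ActionListener[IndexResponse] {
  override def onResponse(response: IndexResponse): Unit =
    println(s"Indexed document with result: ${response.getResult}")

  override def onFailure(e: Exception): Unit =
    e.printStackTrace()
})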

Update an existing document

To update a document, you need to prepare an UpdateRequest, passing the index, type, and ID as arguments, and then use a script or a partial document for the update. Then execute the update request through the client.

val updateRequest = new UpdateRequest(index_name, type_name, id)

val builder = XContentFactory.jsonBuilder
builder.startObject
builder.field(fieldName, value)
builder.endObject

updateRequest.doc(builder)
client.update(updateRequest)

Delete operation

Deleting a document just requires two lines of code. First, create a DeleteRequest and then execute it via the REST client.

val deleteRequest = new DeleteRequest(index_name, type_name, id)
client.delete(deleteRequest)

Deleting an index is also a simple task. The following is an example of that:

val request = new DeleteIndexRequest(index_name)
client.indices().deleteIndex(request)

Search documents

The SearchRequest is used for any operation that has to do with searching documents, aggregations, and suggestions, and it also offers ways of requesting highlighting on the resulting documents.
First, create a SearchRequest, passing the index name as the argument.

val searchRequest = new SearchRequest(index_name)

After that, a SearchSourceBuilder needs to be created, adding to it the query you want to execute.

val searchSourceBuilder = new SearchSourceBuilder
searchSourceBuilder.query(QueryBuilders.matchAllQuery())
searchRequest.source(searchSourceBuilder)

Lastly, execute the SearchRequest through the REST client.

client.search(searchRequest)
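
The call returns a SearchResponse, from which the matching documents can be read; a small sketch:

import org.elasticsearch.action.search.SearchResponse

val response: SearchResponse = client.search(searchRequest)

// Iterate over the hits and print each document's id and JSON source
response.getHits.getHits.foreach { hit =>
  println(s"${hit.getId} -> ${hit.getSourceAsString}")
}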

There are several other operations you can execute via the High-Level REST client. You can also use Kibana to search, view, and interact with data stored in Elasticsearch indices. To understand Kibana, you can go through the following documentation.

The complete demo code is available here. You can check the README.md file for instructions on running the application.


Hope you enjoyed reading this blog!

Persistent Read Side in Lagom

In this blog, we will discuss how we can query Lagom’s microservices to retrieve data. I hope you are clear on persistent entity concepts in Lagom; if not, you can take a quick overview by going through this blog.

Lagom handles data persistence with ‘Persistent Entities’, which hold the state of individual entities; but to interact with them, one must know the identifier of the entity. Hence, Lagom provides support for building a read-side view of the persistent data, which can be used for querying purposes.
This separation of the write side and the read side of the persistent data is often referred to as the CQRS (Command Query Responsibility Segregation) pattern.

Read-Side Processor

The read side can be implemented with any database. For now, we will use Cassandra to understand the concepts.

One thing to keep in mind is that the read side should only be updated in response to events received from persistent entities.
This is done by building a ReadSideProcessor, which transforms the events generated by the persistent entities into database tables that can be queried, and at the same time keeps track of which events it has handled by using offsets.

Each event produced by a persistent entity has an offset. When a read-side processor first starts, it loads the offset of the last event that was processed, and whenever an event is processed, it stores its offset.
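
As a rough sketch of what this looks like – assuming the UserEvent, UserCreated, and User types from the user service described in the next blog, and Lagom’s Cassandra read-side support – a processor might be written as:

import akka.Done
import com.datastax.driver.core.BoundStatement
import com.lightbend.lagom.scaladsl.persistence.cassandra.{CassandraReadSide, CassandraSession}
import com.lightbend.lagom.scaladsl.persistence.{AggregateEventTag, EventStreamElement, ReadSideProcessor}
import scala.concurrent.{ExecutionContext, Future}

class UserReadSideProcessor(readSide: CassandraReadSide, session: CassandraSession)
                           (implicit ec: ExecutionContext) extends ReadSideProcessor[UserEvent] {

  override def buildHandler(): ReadSideProcessor.ReadSideHandler[UserEvent] =
    readSide.builder[UserEvent]("userOffset")          // identifier for the offset store
      .setGlobalPrepare(() => createTable())           // runs once, before processing starts
      .setEventHandler[UserCreated](insertUser)        // statements to execute for each event
      .build()

  private def createTable(): Future[Done] =
    session.executeCreateTable(
      "CREATE TABLE IF NOT EXISTS users (id text PRIMARY KEY, name text)")

  private def insertUser(element: EventStreamElement[UserCreated]): Future[List[BoundStatement]] =
    session.prepare("INSERT INTO users (id, name) VALUES (?, ?)").map { ps =>
      List(ps.bind(element.event.user.id, element.event.user.name))
    }

  override def aggregateTags: Set[AggregateEventTag[UserEvent]] = Set(UserEvent.Tag)
}

The users table can then be queried freely – for example, via the CassandraSession – without going through entity identifiers.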

Data Persistence in Lagom

Are you finding it difficult to understand Lagom persistence? Don’t worry, because help is right here.
In this blog, we will learn about Lagom’s persistence with the help of a simple application and also discuss its theoretical aspects.
Before we begin, make sure you know about Event Sourcing and CQRS. You can read about them in detail from this link.

Choosing a database

When we create any microservice, or in general any service, one of the biggest tasks is to manage data persistence. Lagom supports various databases for this task. By default, Lagom uses Cassandra to persist data, and the tables required to store it are saved in Cassandra keyspaces.
So, for now, we will be using Cassandra to store our data. Our service basically creates a user on request and stores the corresponding details in the database.

To use Cassandra, you need to add the following in your project’s build.sbt:

libraryDependencies += lagomScaladslPersistenceCassandra

Lagom requires keyspace configuration for three internal components – the journal, the snapshot store, and the offset store.
The journal stores serialized events, snapshots are automatically saved after a configured number of persisted events for faster recovery, and the offset store provides read-side support.

Each microservice should have a unique keyspace name so that the tables of different services do not conflict with each other. However, you can use the same keyspace for all of these components within one service.
To configure the keyspace names, you need to add the following to your service implementation’s application.conf file:

play.application.loader = com.knoldus.user.impl.service.UserServiceLoader

user.cassandra.keyspace = userdatabase

cassandra-journal.keyspace = ${user.cassandra.keyspace}
cassandra-snapshot-store.keyspace = ${user.cassandra.keyspace}
lagom.persistence.read-side.cassandra.keyspace = ${user.cassandra.keyspace}

To check whether the events are actually being persisted in Cassandra, you can disable the embedded Cassandra server by adding the following to your build.sbt and use an external Cassandra instance running on your localhost.

lagomCassandraEnabled in ThisBuild := false
lagomUnmanagedServices in ThisBuild := Map("cas_native" -> "http://localhost:9042")

Persistent Entity

Lagom’s persistence is handled by defining a ‘PersistentEntity’. Each instance of the entity has a stable entity identifier through which it can be accessed from the service implementation or anywhere else in the cluster. It is run by an actor, and its state is made durable using event sourcing.
To use Lagom persistence, you need to define an entity class that extends the PersistentEntity abstract class and overrides its abstract type members and methods.

class UserEntity extends PersistentEntity

Three abstract type members – Command, Event and State – must be defined by the subclass.

override type Command = UserCommand[_]
override type Event = UserEvent
override type State = UserState

1. Command

You interact with a PersistentEntity by sending command messages to it. Commands are instructions to do something, like creating a user account or fetching user details. Each command must implement the PersistentEntity.ReplyType interface to define its reply type. Here is an example of how you can define the commands for your application.

trait UserCommand[R] extends ReplyType[R]

case class CreateUserCommand(user: User) extends UserCommand[Done]

2. Event

A command may cause changes to the entity state, and those changes are stored as events. Events are the immutable facts of things that have happened, like an account being created or updated. An example of defining an event is given below.

sealed trait UserEvent extends AggregateEvent[UserEvent] {
  override def aggregateTag: AggregateEventTagger[UserEvent] = UserEvent.Tag
}
object UserEvent {
  val Tag: AggregateEventTag[UserEvent] = AggregateEventTag[UserEvent]
}
case class UserCreated(user: User) extends UserEvent

3. State

And State is the condition the entity is in at a specific instant. Events are replayed to recreate the current state of an entity. Below is an example of defining the state; you can modify it according to your requirements.

case class UserState(user: Option[User], timeStamp: String)

4. InitialState

Your entity class should also implement the abstract method ‘initialState’, which defines the state of the entity when it is created for the first time.

override def initialState = UserState(None, LocalDateTime.now().toString)

5. Behavior

Another method that your concrete subclass should implement is ‘behavior’. The behavior is defined as a set of actions or functions.

override def behavior: (UserState) => Actions

Command Handlers

To process commands, command handlers are registered using ‘onCommand’ of the Actions. A command handler is a partial function with three parameters – the Command, the CommandContext (ctx), and the current State. A command handler returns a Persist directive that defines what event or events, if any, to persist. The thenPersist, thenPersistAll, and done methods are used to create the Persist directive.

.onCommand[CreateUserCommand, Done] {
  case (CreateUserCommand(user), ctx, _) ⇒
    ctx.thenPersist(UserCreated(user))(_ ⇒ ctx.reply(Done))
}

A PersistentEntity may also process commands that do not change the application state, such as query commands. Such command handlers are registered using ‘onReadOnlyCommand’.

.onReadOnlyCommand[GetUserCommand, User] {
   case (GetUserCommand(id), ctx, state) =>
    ctx.reply(state.user.getOrElse(User(id, "not found")))
}

Event Handlers

Event handlers are used both for persisting and replaying events. These are registered with the ‘onEvent’ method of the Actions. When an event has been persisted successfully, the current state is updated.

.onEvent {
  case (UserCreated(user), _) ⇒
    UserState(Some(user), LocalDateTime.now().toString)
}

A reply is sent with the ctx.reply method and reply message type must match the ReplyType defined by the command. It will be an acknowledgment that the entity has processed the command successfully.
You can use ctx.invalidCommand to reject an invalid command, which will fail the Future with PersistentEntity.InvalidCommandException on the sender side.

Here is the complete snapshot of UserEntity class:

class UserEntity extends PersistentEntity {

  override type Command = UserCommand[_]
  override type Event = UserEvent
  override type State = UserState

  override def initialState = UserState(None, LocalDateTime.now().toString)

  override def behavior: (UserState) => Actions = {
    case UserState(_, _) =>
      Actions()
        .onCommand[CreateUserCommand, Done] {
          case (CreateUserCommand(user), ctx, _) ⇒
            ctx.thenPersist(UserCreated(user))(_ ⇒ ctx.reply(Done))
        }
        .onReadOnlyCommand[GetUserCommand, User] {
          case (GetUserCommand(id), ctx, state) =>
            ctx.reply(state.user.getOrElse(User(id, "not found")))
        }
        .onEvent {
          case (UserCreated(user), _) ⇒
            UserState(Some(user), LocalDateTime.now().toString)
        }
  }
}

Finally, to access the entity, you need to inject the PersistentEntityRegistry in your service implementation class.

class UserServiceImpl(persistentEntityRegistry: PersistentEntityRegistry)

You also need to register the persistent entity and the JSON serializer registry in your application loader.

//Register the JSON serializer registry
override lazy val jsonSerializerRegistry = UserSerializerRegistry

// Register the lagom-persistent-entity-demo persistent entity
persistentEntityRegistry.register(wire[UserEntity])

In a service method, you can retrieve a PersistentEntityRef for a given entity identifier from the registry. In the user application, a separate method is created for retrieving the ref.

def ref(id: String): PersistentEntityRef[UserCommand[_]] = {
  persistentEntityRegistry.refFor[UserEntity](id)
}

Then you can send a command to the entity using the ask method of the PersistentEntityRef. It returns a Future with the reply message – for example, sending a CreateUserCommand to the user entity.

ref(user.id).ask(CreateUserCommand(user))
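
Putting it together, a service method handling a create-user request might look like this (a sketch; the method name and ServiceCall wiring follow the usual Lagom Scala API):

override def createUser: ServiceCall[User, Done] = ServiceCall { user =>
  // Ask the entity to persist the user; the Future completes with Done on success
  ref(user.id).ask(CreateUserCommand(user))
}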

And this is how Lagom’s persistence helps in managing data for your service.

Now, when you run the application, you can see in the Cassandra Query Language shell (cqlsh) that a keyspace named ‘userdatabase’ gets created with four tables – messages, config, snapshots, and metadata. Events are actually persisted in the messages table.
The complete demo code is available here. You can check the README.md file for instructions on running the application.


Hope you enjoyed reading this blog!