DynamoDB Async Client in Spring Cloud Functions using WebFlux to create reactive Lambda functions on AWS.
Introduction.
In my previous tutorial, I wrote about using Spring WebFlux in a Lambda function. I used a project that contained 2 Lambda functions, one that read data from a DynamoDB table and the other that updates data in the same table. In that article, I migrated only the updating function to use WebFlux, but I maintained the DynamoDB sync client to interact with the table. On this occasion, let’s try to migrate the sync client to an async DynamoDB client to exploit the benefits of the reactive programming, and let’s do it for the 2 Lambda functions. Let’s get started.
Tooling.
To complete this guide, you’ll need the following tools:
- Git.
- AWS CLI (version 2.17.+)
- AWS SAM CLI (version 1.97.+)
- OpenJDK (version 21.+). You can use SDKMAN to install it.
- Apache Maven (version 3.9.+). You can use SDKMAN to install it.
- Docker engine. You can use Docker Desktop or Colima.
- Java IDE. You can use IntelliJ.
Note: You can download the project’s source code from my GitHub repository, which contains all the configurations I made in this tutorial.
DynamoDB Async Client.
One of the problems I faced was using the async client without a configuration class. As we were using the sync client without a class configuration, I wanted to do the same for the async client. Wrong. So, for the async client, we must implement custom client configuration:
@Configuration(proxyBeanMethods = false)
public class DynamoDbClientConfig {
private final Environment environment;
public DynamoDbClientConfig(Environment environment) {
this.environment = environment;
}
@Bean
public DynamoDbAsyncClient dynamoDbAsyncClient() {
var builder = DynamoDbAsyncClient.builder()
.region(DefaultAwsRegionProviderChain.builder().build().getRegion())
.credentialsProvider(DefaultCredentialsProvider.builder().build());
String endpointOverrideURL = this.environment.getProperty("spring.cloud.aws.endpoint");
if (Objects.nonNull(endpointOverrideURL) && !endpointOverrideURL.isBlank()) {
builder.endpointOverride(URI.create(endpointOverrideURL));
}
return builder.build();
}
}
I still use the “spring.cloud.aws.endpoint” property to maintain the same Spring Cloud AWS module naming standard. So now, we can use our new async client to interact reactively with the DynamoDB service.
Repository Classes.
We now have a DynamoDB async client configured. So we can update our repository class for the read function to have the following structure:
private final DynamoDbAsyncClient dynamoDbAsyncClient;
public CompletableFuture<Map<String, AttributeValue>> findByIdAsync(DeviceReadRequest deviceReadRequest) {
HashMap<String, AttributeValue> keyMap = new HashMap<>();
keyMap.put("deviceId", AttributeValue.builder().s(deviceReadRequest.deviceId()).build());
keyMap.put("cityId", AttributeValue.builder().s(deviceReadRequest.cityId()).build());
GetItemRequest itemRequest = GetItemRequest.builder()
.key(keyMap)
.tableName("Devices")
.build();
return this.dynamoDbAsyncClient.getItem(itemRequest)
.thenApply(GetItemResponse::item)
.exceptionally(exception -> {
throw new CityException("Error when trying to find a Device by ID.");
});
}
For the update function, the update is as follows:
private final DynamoDbAsyncClient dynamoDbAsyncClient;
public CompletableFuture<Void> updateDeviceStatusAsync(final Device device, final DeviceOperation deviceOperation) {
HashMap<String, AttributeValue> keyMap = new HashMap<>();
keyMap.put("deviceId", AttributeValue.builder().s(device.id()).build());
keyMap.put("cityId", AttributeValue.builder().s(device.cityId()).build());
DeviceStatus newDeviceStatus = deviceOperation.equals(DeviceOperation.ACTIVATE) ?
DeviceStatus.ON : DeviceStatus.OFF;
UpdateItemRequest itemRequest = UpdateItemRequest.builder()
.tableName("Devices")
.key(keyMap)
.updateExpression("SET #deviceStatus = :new_status")
.expressionAttributeNames(Map.of("#deviceStatus", "status"))
.expressionAttributeValues(Map.of(
":new_status", AttributeValue.builder().s(newDeviceStatus.name()).build()))
.build();
return this.dynamoDbAsyncClient.updateItem(itemRequest)
.thenRun(() -> LOGGER.info("Successfully updated device status for Device ID: " + device.id()))
.exceptionally(exception -> {
throw new CityException("Couldn't update device status.");
});
}
So now, our repository classes have implemented the DynamoDB async client. Let’s implement other improvements to our functions.
Service Classes.
As we added a new layer of complexity, it is a good practice to create a service class to control the async responses from the repository classes. Furthermore, I will add some business validations in the following articles, so taking these service classes will help me greatly with this goal. So, for the read function, the service class is as follows:
private final DevicesRepository devicesRepository;
public Mono<Device> findById(final DeviceReadRequest deviceReadRequest) {
return Mono.fromCompletionStage(() -> this.devicesRepository.findByIdAsync(deviceReadRequest))
.handle((returnedItem, sink) -> {
if (Objects.isNull(returnedItem) || returnedItem.isEmpty()) {
sink.error(new ResourceNotFoundException("No device found with the provided ID."));
return;
}
sink.next(this.deviceMapper.mapToDevice(returnedItem));
});
}
For the update function, the update is as follows:
private final DevicesRepository devicesRepository;
public Mono<Void> updateDeviceStatus(final EventBridgeRequest eventBridgeRequest) {
return Mono.fromCompletionStage(() -> this.devicesRepository.findByIdAsync(eventBridgeRequest.detail()))
.flatMap(returnedItem -> {
if (Objects.isNull(returnedItem) || returnedItem.isEmpty()) {
return Mono.error(new ResourceNotFoundException("No device found with the provided ID."));
}
Device device = this.deviceMapper.mapToDevice(returnedItem);
return Mono.just(device);
})
.flatMap(device -> Mono.fromCompletionStage(
() -> this.devicesRepository.updateDeviceStatusAsync(device, eventBridgeRequest.detail().deviceOperation())
));
}
We have configured our repository and service classes for our 2 Lambda functions with these changes.
The function classes don’t need to be updated with significant changes, so we can continue with our tutorial.
Running GraalVM Tracing Agent.
As we updated our Lambda functions dependencies and logic, it is a good idea to update the native image files generated in our previous tutorial for the Trancing Agent to avoid errors at runtime concerning dynamic references. So, let’s execute the following command from the project’s root folder, one function at a time, to deploy the function with the agent:
NOTE: Please review my previous tutorial for the latest updates concerning this topic from the “Tracing Agent in POM files” section onwards.
mvn clean process-classes \
-f functions/device-read-function/pom.xml \
-P tracing-agent
mvn clean process-classes \
-f functions/device-update-function/pom.xml \
-P tracing-agent
Navigate to the “functions/city-read-function/src/test/http/local.http” file for the read Lambda function, and execute all requests as follows:
Copy the generated files from the Tracing Agent to the function’s resources folder with the following command:
cp -rf functions/device-read-function/target/native-image/* \
functions/device-read-function/src/main/resources/META-INF/native-image
Next, go to the “functions/city-update-function/src/test/http/local.http” file for the read Lambda function, and execute all requests as follows:
As we did with the read function, copy the generated files from the Tracing Agent to the update function’s resources folder:
cp -rf functions/device-update-function/target/native-image/* \
functions/device-update-function/src/main/resources/META-INF/native-image
We have generated and copied the Tracing Agent files into each function’s source code to build our native images safely.
Deploying to AWS using SAM CLI.
As we did in our previous tutorial, you must execute the following commands inside the <functions> directory, where is the SAM config files:
sam build
sam deploy \
--parameter-overrides SpringProfile='dev' \
--no-confirm-changeset \
--disable-rollback \
--profile 'your-aws-profile'
After a successful deployment, you can run the following command to invoke the read function on AWS from the project’s root directory:
aws lambda invoke \
--function-name 'city-data-function' \
--payload file://functions/city-data-function/src/test/resources/requests/valid/lambda-valid-id-request.json \
--cli-binary-format raw-in-base64-out \
--profile 'your-aws-profile' \
~/Downloads/response.json
You can open the ~/Downloads/response.json to see the response.
Then, execute the following command to invoke the update function on AWS from the project’s root directory:
aws lambda invoke \
--function-name 'device-update-function' \
--payload file://functions/device-update-function/src/test/resources/requests/valid/lambda-valid-id-request.json \
--cli-binary-format raw-in-base64-out \
--profile 'your-aws-profile' \
~/Downloads/response.json
Also, you can open the ~/Downloads/response.json to see the response.
Finally, go to the CloudWatch console to see the logs in action. You can see 2 different CWL groups, one for each Lambda function:
The first CWL group is for the City Read function we deployed in our previous tutorial.
For the read function, you must see something:
You can see the same output for the update function. We called the update function directly, but in a real scenario, this function must be called from the EventBridge service. So, to invoke our service bus, we must execute the following command:
aws events put-events \
--cli-input-json file://functions/device-update-function/src/test/resources/requests/valid/eventbridge-valid-event-request.json \
--profile 'your-aws-profile'
With this new invocation, you must see the following output in the CWL console:
Notice that this time, the payload message has a JSON structure owned by the EventBridge standard. So, our update function is called correctly.
Excellent!! You have deployed 2 Lambda functions in AWS that use Spring WebFlux with the async DynamoDB client. We can also create a native Linux executable with the help of GraalVM and the Tracing Agent, so our Lambda functions are also native.
I hope this article has been helpful to you, and I’ll see you in the next one.
Thanks.