Redis Streams for Java

Streaming data is among the most valuable data sources, and also among the most challenging to use. Processing and analyzing continuous data streams efficiently requires a data structure built for the job, such as the Redis Streams data type.

Redis is an open-source, in-memory data structure store used to implement NoSQL key-value databases, with many powerful data types and constructs. But what if you want to use Redis Streams from a programming language such as Java? In this article, we'll discuss how you can use Redis Streams for Java thanks to Redisson, a Redis Java client.

What is Redis Streams?

Redis Streams is a data structure that was first released in Redis 5.0. The Redis Streams data type has been optimized for working with large quantities of streaming data. The benefits of Redis Streams include:

  • Managing the amount of data consumed.
  • Sending asynchronous messages between producers and consumers.
  • Persisting data even when consumers are offline.

Essentially, a Redis stream is an append-only log: new entries can only be added at the end of the structure. Each entry consists of a unique ID containing a timestamp, as well as the corresponding data. Although streams superficially resemble the publish/subscribe messaging pattern, which is also implemented in Redis, Redis Streams have a distinct syntax and incorporate additional functionality.
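To make the ID format concrete: Redis writes each entry ID as a millisecond timestamp and a sequence number joined by a hyphen, for example 1526919030474-55. The sketch below uses only the Java standard library to split such an ID into its two parts. The class name StreamEntryId and its methods are hypothetical helpers for illustration, not part of Redis or Redisson, and the sketch assumes both parts fit in a signed long.

```java
import java.time.Instant;

// Hypothetical helper: splits a Redis stream entry ID into its two parts.
class StreamEntryId {
    final long timestampMillis; // milliseconds since the Unix epoch
    final long sequence;        // distinguishes entries created in the same millisecond

    StreamEntryId(long timestampMillis, long sequence) {
        this.timestampMillis = timestampMillis;
        this.sequence = sequence;
    }

    // Parses an ID of the form "<millis>-<sequence>".
    static StreamEntryId parse(String id) {
        int dash = id.indexOf('-');
        if (dash <= 0 || dash == id.length() - 1) {
            throw new IllegalArgumentException("Expected <millis>-<sequence>, got: " + id);
        }
        long millis = Long.parseLong(id.substring(0, dash));
        long seq = Long.parseLong(id.substring(dash + 1));
        return new StreamEntryId(millis, seq);
    }

    // Converts the timestamp part into a java.time.Instant.
    Instant createdAt() {
        return Instant.ofEpochMilli(timestampMillis);
    }

    public static void main(String[] args) {
        StreamEntryId id = StreamEntryId.parse("1526919030474-55");
        System.out.println("created at " + id.createdAt() + ", sequence " + id.sequence);
    }
}
```

Because the timestamp comes first, comparing IDs by timestamp and then by sequence number gives the order in which entries were appended.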

In particular, one very important feature of Redis Streams is the consumer group. A consumer group in Redis Streams is a data structure with a separate list for each consumer, helping to parallelize and scale your consumption of messages.
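The idea can be illustrated with a toy in-memory model. This is only a sketch of the concept under simplifying assumptions, not how Redis implements consumer groups: the class ToyConsumerGroup and all of its methods are hypothetical, entries are plain strings, and list indexes stand in for entry IDs. Each entry is handed to exactly one consumer and stays pending until it is acknowledged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of a single consumer group. NOT the Redis implementation.
class ToyConsumerGroup {
    private final List<String> entries = new ArrayList<>();            // append-only stream
    private int nextUndelivered = 0;                                    // first entry not yet handed out
    private final Map<Integer, String> pendingOwner = new TreeMap<>(); // entry index -> consumer

    // Appends an entry to the end of the stream.
    void append(String entry) {
        entries.add(entry);
    }

    // Hands up to `count` never-delivered entries to one consumer and marks them pending.
    List<String> read(String consumer, int count) {
        List<String> delivered = new ArrayList<>();
        while (delivered.size() < count && nextUndelivered < entries.size()) {
            pendingOwner.put(nextUndelivered, consumer);
            delivered.add(entries.get(nextUndelivered));
            nextUndelivered++;
        }
        return delivered;
    }

    // Acknowledges an entry; returns false if it was not pending.
    boolean ack(int index) {
        return pendingOwner.remove(index) != null;
    }

    // Lists the indexes of entries delivered to a consumer but not yet acknowledged.
    List<Integer> pendingOf(String consumer) {
        List<Integer> result = new ArrayList<>();
        for (Map.Entry<Integer, String> e : pendingOwner.entrySet()) {
            if (e.getValue().equals(consumer)) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ToyConsumerGroup group = new ToyConsumerGroup();
        group.append("a");
        group.append("b");
        group.append("c");
        System.out.println(group.read("consumer_1", 2));   // [a, b]
        System.out.println(group.read("consumer_2", 2));   // [c]
        group.ack(0);                                      // consumer_1 finished "a"
        System.out.println(group.pendingOf("consumer_1")); // [1]
    }
}
```

Since no entry is delivered to two consumers, adding consumers to the group spreads the work across them, which is what makes the pattern useful for scaling.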

Redis Streams for Java with Redisson

Unfortunately, Redis has no built-in support for Java, which is why many Java developers use clients such as Redisson. Redisson is a third-party Redis Java client that includes many implementations of familiar Java distributed objects, collections, and constructs. So how do you go about using Redis Streams in Java with Redisson?

Installing Redisson

If you use the Maven build automation tool, installing Redisson is easy. Add the following dependency to your pom.xml file:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.0</version>
</dependency>

To make sure that Redisson is up to date, check the latest version of the dependency on Maven Central.

Creating a Redis client

The next step is to create a Redis client with Redisson. The following line of code is the simplest way to do so:

RedissonClient client = Redisson.create();

By default, this creates a client that connects to a Redis server at 127.0.0.1 on port 6379. You can also build a configuration object and pass it to the create() method. The code below connects to the same address, while demonstrating the use of a configuration object:

Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");

RedissonClient client = Redisson.create(config);

Adding data to Redis Streams

Once you've created a Redis client, you're free to add data to, and consume data from, Redis Streams. Below is an example of how to add data to Redis Streams:

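import org.redisson.Redisson;
import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

Config config = new Config();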
config.useSingleServer().setAddress("redis://127.0.0.1:6379");

RedissonClient redisson = Redisson.create(config);

RStream<String, String> stream = redisson.getStream("sensor#4921");

stream.add("speed", "19");
stream.add("velocity", "39%");
stream.add("temperature", "10C");

redisson.shutdown();

The getStream() method retrieves a stream instance by name. In this case, the stream that we will be adding to is called "sensor#4921".

The RStream<String, String> type parameters indicate that both the keys and the values stored in the stream are strings. We can confirm this by examining the three calls to the add() method. The first argument to the add() method specifies the entry's key, and the second argument specifies its value. Each call appends a separate entry to the stream, so this example produces three entries, each holding one key-value pair. In this case, both arguments are strings corresponding to readings from a weather instrument.

Consuming data from Redis Streams

In addition to adding data to Redis Streams, we also want to consume data from them. Below is an example of how to consume data from Redis Streams:

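import java.util.Map;

import org.redisson.Redisson;
import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.api.StreamMessageId;
import org.redisson.config.Config;

Config config = new Config();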
config.useSingleServer().setAddress("redis://127.0.0.1:6379");

RedissonClient redisson = Redisson.create(config);

RStream<String, String> stream = redisson.getStream("sensor#4921");

stream.createGroup("sensors_data", StreamMessageId.ALL);

Map<StreamMessageId, Map<String, String>> messages = stream.readGroup("sensors_data", "consumer_1");
for (Map.Entry<StreamMessageId, Map<String, String>> entry : messages.entrySet()) {
  Map<String, String> msg = entry.getValue();
  System.out.println(msg);

  stream.ack("sensors_data", entry.getKey());
}

redisson.shutdown();

As before, we specify the given stream using the getStream() method. The createGroup() method creates a new consumer group called "sensors_data". The StreamMessageId.ALL argument specifies that we want to consume all messages, whether they were sent before or after the group was created. Note that Redis rejects an attempt to create a group that already exists, so this call should run only once per group.

Next, we retrieve the messages using the readGroup() method, specifying the group name and the consumer name. The result is a map from each message's ID to the message itself, which is in turn a map of keys to values. We then use a for loop to print each message. Finally, the ack() method (short for "acknowledgement") marks each message as correctly processed.
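The loop above acknowledges every message unconditionally. When processing can fail, a common variation is to acknowledge only the messages that were handled successfully, so that the rest remain pending in the group and can be retried. Below is a minimal sketch of that pattern using only the Java standard library. The class AckAfterProcessing and the method idsToAck() are hypothetical, and plain String IDs stand in for StreamMessageId so that the example runs without a Redis server.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical helper: decides which messages of a batch are safe to acknowledge.
class AckAfterProcessing {

    // Runs the handler on each message and returns the IDs it handled successfully.
    // A message counts as failed if the handler returns false or throws.
    static <I, M> List<I> idsToAck(Map<I, M> messages, Predicate<M> handler) {
        List<I> toAck = new ArrayList<>();
        for (Map.Entry<I, M> entry : messages.entrySet()) {
            try {
                if (handler.test(entry.getValue())) {
                    toAck.add(entry.getKey());
                }
            } catch (RuntimeException e) {
                // Do not acknowledge: the message stays pending for a later retry.
            }
        }
        return toAck;
    }

    public static void main(String[] args) {
        // Stand-in for the map returned by readGroup(); the IDs are made up.
        Map<String, Map<String, String>> batch = new LinkedHashMap<>();
        batch.put("1-0", Collections.singletonMap("speed", "19"));
        batch.put("2-0", Collections.singletonMap("speed", "n/a"));

        // The handler fails on the second message because "n/a" is not a number.
        List<String> toAck = idsToAck(batch, msg -> Integer.parseInt(msg.get("speed")) >= 0);
        System.out.println(toAck); // [1-0]
    }
}
```

In the Redisson example, the returned IDs would then be passed to stream.ack("sensors_data", ...) one by one, in place of the unconditional call inside the loop.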
