Java Infinite Streams

Edwin Dalorzo

The following examples make extensive use of the new features in Java 8 to implement the concept of a stream: a classical data structure, similar to a list, that uses lazy evaluation. This is not exactly how the new streams API in Java 8 is defined (you can find other articles I have written about that at the end), but for me it was an excellent exercise for understanding important concepts such as lazy evaluation, the creation of closures through lambda expressions, and the value of new features like default methods and static methods in interfaces. The concept is based on the idea of a stream presented by Harold Abelson and Gerald Jay Sussman, with Julie Sussman, in Scheme (a dialect of Lisp) in their book Structure and Interpretation of Computer Programs.

In this article I will develop several examples of infinite streams in Java: an infinite stream of natural numbers, an infinite stream of prime numbers built on top of it using the sieve of Eratosthenes, and finally an infinite stream of Fibonacci numbers.

The Stream Type

Let’s start by defining an interface for our streams. I will base my design on a classical definition of an algebraic data type for lists. A list can be conceived as a simple node composed of two items: the first one contains a value, and the second one contains a reference to the next node. When a list has no more nodes, the second item can point to null or to an object that represents the absence of a value (i.e. an object representing an empty list). This simple definition works fine for lists, but not for streams, because a stream relies on the lazy evaluation of its nodes: a node is not evaluated until the program actually requires it. So we introduce a variant in which the second element of the node is a parameterless closure (a.k.a. a thunk) that, when evaluated, provides a reference to the next node. In other words, the next node is not created until it is required. Let’s start with something simple and make it more complex as we go.

public interface Stream<T> {
    public T head();
    public Stream<T> tail();
    public boolean isEmpty();
}

By invoking the method head(), we gain access to the value held within a given node, and by invoking tail() we trigger the evaluation of the next node, which is itself another Stream node. Realizing that a stream is nothing but a simple cell with two items is essential to understanding how this concept evolves in the coming paragraphs.

Stream Implementation

Let’s now delve into the implementation of the stream nodes. There are basically two types of stream nodes: the ones that contain nothing, and the ones that contain something. It is better to start with the simpler of the two. The following is the definition of what we call an empty stream node (Null or Nil are also common names for this type of node):

public class Empty<T> implements Stream<T> {
   public T head() {
      throw new UnsupportedOperationException("Empty stream");
   }
   public Stream<T> tail() {
      throw new UnsupportedOperationException("Empty stream");
   }
   public boolean isEmpty() { return true; }
}

This node contains nothing; the empty stream node is just a placeholder that indicates the absence of a value. As we’ll see later, it is useful for marking the end of a finite stream.

Let’s now proceed to the definition of a non-empty stream node, which is where the magic really happens:

import java.util.function.Supplier;

public class Cons<T> implements Stream<T> {

    private final T head;
    //stream thunk: evaluated only when tail() is called
    private final Supplier<Stream<T>> tail;

    public Cons(T head, Supplier<Stream<T>> tail) {
        this.head = head;
        this.tail = tail;
    }

    public T head() {
        return this.head;
    }

    public Stream<T> tail() {
        //triggers closure evaluation
        return this.tail.get();
    }

    public boolean isEmpty() {
        return false;
    }
}

The important thing to notice here is that we do not pass a stream to be assigned directly to the tail of our Cons cell; instead, we pass a Supplier. Supplier is one of the new functional interfaces in Java 8, and it is well suited to our definition because its single abstract method, get(), takes no arguments and returns a value of the parameterized type. This means we can implement it on the fly with a lambda expression, which gives us a closure that generates the next stream node. This is where the lazy evaluation magic happens: the next stream cell is defined by a parameterless closure. Notice as well how the method tail() triggers the evaluation of the closure and produces a new stream cell.
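One consequence of this design is that tail() re-runs the supplier on every call, so calling tail() twice on the same cell builds two separate next cells. The delay construct in Structure and Interpretation of Computer Programs memoizes its result instead. The following is a minimal sketch of a memoizing wrapper, assuming a hypothetical helper class Memo with a method memoize (neither is part of the JDK nor of the original code):

```java
import java.util.function.Supplier;

final class Memo {
    private Memo() {}

    // Hypothetical helper: wraps a thunk so that its body runs at most once.
    static <T> Supplier<T> memoize(Supplier<T> thunk) {
        return new Supplier<T>() {
            private boolean evaluated = false;
            private T value;

            @Override
            public synchronized T get() {
                if (!evaluated) {
                    value = thunk.get(); // evaluate the delayed expression once
                    evaluated = true;
                }
                return value;            // later calls reuse the cached result
            }
        };
    }
}
```

With such a helper, a cell could be built as new Cons<>(n, Memo.memoize(() -> from(n + 1))), so that repeated tail() calls return the same next cell. The synchronized modifier only matters if a stream is shared between threads; single-threaded code could drop it.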

An Infinite Stream of Numbers

Let’s see how this can be used by writing a small piece of code that creates an infinite stream of numbers starting from n:

public static Stream<Integer> from(int n) {
   return new Cons<>(n, () -> from(n+1));
}

As promised, the stream node is composed of two values. The first one is the value of n, and the second one is a thunk (a parameterless closure) which, interestingly, when evaluated invokes the method again to generate yet another stream node, this time composed of the value n+1 and another thunk for the next node, and so on. Here my definition deviates a bit from Abelson and Sussman, who would have returned a supplier closure encapsulating the initial stream node. For simplicity, I return the initial stream cell instead.

The important realization here is that the thunk is not evaluated when we invoke the from method, but only when we eventually call the tail method of a stream cell returned by from. That is why I left a comment in the tail method above saying “triggers closure evaluation”. The closure helps us implement the lazy evaluation of the stream by delaying the moment when the next stream cell is created. Conceptually speaking, the stream created by the from method above is infinite. Therefore, if we try to evaluate it completely, we could loop forever (leaving aside memory limits and the fact that a Java int is a 32-bit two's-complement value, with a maximum of 2^31 - 1, that silently wraps around when an overflow occurs).
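The laziness and the overflow behavior described above can be observed directly. The following self-contained sketch repeats the Stream and Cons definitions as nested types and adds a hypothetical counter, cellsBuilt, that records how many cells from() has created. The class name LazinessDemo and the counter are illustrative assumptions, not part of the original code.

```java
import java.util.function.Supplier;

final class LazinessDemo {
    interface Stream<T> {
        T head();
        Stream<T> tail();
        boolean isEmpty();
    }

    static final class Cons<T> implements Stream<T> {
        private final T head;
        private final Supplier<Stream<T>> tail;

        Cons(T head, Supplier<Stream<T>> tail) {
            this.head = head;
            this.tail = tail;
        }

        public T head() { return head; }
        public Stream<T> tail() { return tail.get(); } // forces the thunk
        public boolean isEmpty() { return false; }
    }

    // Hypothetical counter: how many cells from() has built so far.
    static int cellsBuilt = 0;

    static Stream<Integer> from(int n) {
        cellsBuilt++;
        return new Cons<>(n, () -> from(n + 1)); // n + 1 wraps on overflow
    }
}
```

Calling from(0) builds exactly one cell; each subsequent tail() builds one more, so reading the third element leaves cellsBuilt at 3. Starting at Integer.MAX_VALUE, the next element is Integer.MIN_VALUE rather than an error.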

A Finite Stream Based on a Predicate

Now the question is: how can we possibly extract values from this stream? If we try to extract everything from an infinite stream in any kind of loop, we could go on forever. What we need is a way to limit the number of elements to a finite amount. One way to do this is to create another stream, this time a finite one, based on the first, that contains only a finite number of elements from the infinite stream. We could specify which elements we want by means of a predicate. This is quite appropriate, because one of the new functional interfaces in Java 8 is precisely Predicate. Even better, now that Java has default methods, and interfaces support the definition of static methods, we can define the following methods in the stream interface itself:

import java.util.function.Predicate;

public interface Stream<T> {
   //…
   public default Stream<T> takeWhile(Predicate<? super T> predicate) {
       return takeWhile(this, predicate);
   }

   public static <T> Stream<T> takeWhile(Stream<? extends T> source,
                                         Predicate<? super T> predicate) {
       if(source.isEmpty() || !predicate.test(source.head())) {
           return new Empty<>();
       }
       //creates new cons cell and a closure for the rest
       return new Cons<>(source.head(),
                         () -> takeWhile(source.tail(), predicate));
   }
}

Now, I have to admit that I defined these methods here because I thought it would be a good way to demonstrate these new features of Java 8, and I have not fully pondered the implications of this design. That being said, I also believe that defining these methods directly in the interface avoids the need to implement them in the Empty and Cons classes themselves, since both classes inherit the default implementation.

Notice how the method takeWhile creates a new stream, and as soon as one of the stream nodes does not satisfy the predicate, it returns an empty stream node. This marks the end of the new stream and stops the evaluation of the infinite one. In this way we can generate a finite stream out of our infinite stream, using a predicate to restrict the number of items we take from it. We could do something like the following to obtain a new stream containing only the first 10 natural numbers:

Stream<Integer> tenNats = from(0).takeWhile(n -> n < 10);
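To make the cut-off behavior of takeWhile concrete, here is a self-contained sketch that assembles the pieces so far into one class. The class name TakeWhileDemo and the toList helper are hypothetical and exist only for inspection. The sketch also shows that takeWhile stops at the first element that fails the predicate, even if later elements would pass.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class TakeWhileDemo {
    interface Stream<T> {
        T head();
        Stream<T> tail();
        boolean isEmpty();

        default Stream<T> takeWhile(Predicate<? super T> predicate) {
            return takeWhile(this, predicate);
        }

        static <T> Stream<T> takeWhile(Stream<? extends T> source,
                                       Predicate<? super T> predicate) {
            if (source.isEmpty() || !predicate.test(source.head())) {
                return new Empty<>(); // the first failing element ends the stream
            }
            return new Cons<>(source.head(),
                              () -> takeWhile(source.tail(), predicate));
        }
    }

    static final class Empty<T> implements Stream<T> {
        public T head() { throw new UnsupportedOperationException("Empty stream"); }
        public Stream<T> tail() { throw new UnsupportedOperationException("Empty stream"); }
        public boolean isEmpty() { return true; }
    }

    static final class Cons<T> implements Stream<T> {
        private final T head;
        private final Supplier<Stream<T>> tail;

        Cons(T head, Supplier<Stream<T>> tail) {
            this.head = head;
            this.tail = tail;
        }

        public T head() { return head; }
        public Stream<T> tail() { return tail.get(); }
        public boolean isEmpty() { return false; }
    }

    static Stream<Integer> from(int n) {
        return new Cons<>(n, () -> from(n + 1));
    }

    // Hypothetical helper: drains a finite stream into a List.
    static <T> List<T> toList(Stream<T> s) {
        List<T> out = new ArrayList<>();
        while (!s.isEmpty()) {
            out.add(s.head());
            s = s.tail();
        }
        return out;
    }
}
```

For example, toList(from(0).takeWhile(n -> n < 10)) yields the numbers 0 through 9, while toList(from(0).takeWhile(n -> n % 4 != 3)) yields only 0, 1 and 2, because 3 fails the predicate and ends the stream.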

Consuming the Stream

The next step is to provide a way to print the contents of a finite stream or to do any other kind of task with its values (e.g. serialize them, send them over the network, etc.). Once again, using Java 8 default methods, we can add a forEach method to the stream interface that accepts a Consumer to do something with each value in the stream. Consumer is also one of the new functional interfaces in JDK 8.

import java.util.function.Consumer;

public interface Stream<T> {
   //…
   public default void forEach(Consumer<? super T> consumer) {
      forEach(this, consumer);
   }

   public static <T> void forEach(Stream<? extends T> source,
                                  Consumer<? super T> consumer) {
      while(!source.isEmpty()) {
         consumer.accept(source.head());
         //triggers closure evaluation
         source = source.tail();
      }
   }
}

Here you can see how we take advantage of the fact that empty stream cells return true from their isEmpty method. Now, to print those first 10 natural numbers, we could do the following:

from(0).takeWhile(n -> n < 10)
       .forEach(System.out::println);

That is: from an infinite stream of natural numbers, take the first 10 natural numbers as a finite stream and consume them by printing them to standard output.
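Printing is only one possible Consumer. The following self-contained sketch (the class name ForEachDemo is hypothetical) repeats the definitions so far and feeds the values into a list and into a running total instead. Because forEach is a plain while loop and each tail() call forces exactly one thunk, the stack depth stays constant no matter how many elements are consumed.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class ForEachDemo {
    interface Stream<T> {
        T head();
        Stream<T> tail();
        boolean isEmpty();

        default Stream<T> takeWhile(Predicate<? super T> predicate) {
            return takeWhile(this, predicate);
        }

        static <T> Stream<T> takeWhile(Stream<? extends T> source,
                                       Predicate<? super T> predicate) {
            if (source.isEmpty() || !predicate.test(source.head())) {
                return new Empty<>();
            }
            return new Cons<>(source.head(),
                              () -> takeWhile(source.tail(), predicate));
        }

        default void forEach(Consumer<? super T> consumer) {
            forEach(this, consumer);
        }

        static <T> void forEach(Stream<? extends T> source,
                                Consumer<? super T> consumer) {
            while (!source.isEmpty()) {          // iterative: no recursion
                consumer.accept(source.head());
                source = source.tail();          // forces one thunk per step
            }
        }
    }

    static final class Empty<T> implements Stream<T> {
        public T head() { throw new UnsupportedOperationException("Empty stream"); }
        public Stream<T> tail() { throw new UnsupportedOperationException("Empty stream"); }
        public boolean isEmpty() { return true; }
    }

    static final class Cons<T> implements Stream<T> {
        private final T head;
        private final Supplier<Stream<T>> tail;

        Cons(T head, Supplier<Stream<T>> tail) {
            this.head = head;
            this.tail = tail;
        }

        public T head() { return head; }
        public Stream<T> tail() { return tail.get(); }
        public boolean isEmpty() { return false; }
    }

    static Stream<Integer> from(int n) {
        return new Cons<>(n, () -> from(n + 1));
    }
}
```

For instance, from(0).takeWhile(n -> n < 10).forEach(list::add) collects the same ten numbers that the println example prints, and summing from(0).takeWhile(n -> n < 1_000_000) into a long accumulator completes without deep recursion.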

Filtering Elements

Another useful method would be one that filters elements out of a stream based on a predicate, so let’s now add a filter method. It differs from takeWhile in that takeWhile takes elements from a stream only as long as the predicate holds and stops at the first element that fails it, whereas filter keeps every element of the stream that satisfies the predicate and skips the rest (which does not mean it does so eagerly, as you can infer from our use of closures here again).

import java.util.function.Predicate;

public interface Stream<T> {
   //…
   public default Stream<T> filter(Predicate<? super T> predicate) {
      return filter(this, predicate);
   }

   public static <T> Stream<T> filter(Stream<? extends T> source,
                                      Predicate<? super T> predicate) {
      if(source.isEmpty()) {
         return new Empty<>();
      }
      //keep a matching head and delay the search for the next match
      if(predicate.test(source.head())) {
         return new Cons<>(source.head(),
                           () -> filter(source.tail(), predicate));
      }
      //skip a rejected head with a recursive call
      return filter(source.tail(), predicate);
   }
}

A weakness of this approach is the possibility of hitting a StackOverflowError in the last line, since every rejected element adds another recursive call. Leaving that aside, you can see that we iterate over the original stream until we find an element that satisfies the predicate, then extract it and generate a closure capable of looking for the next one when it is needed. So, once again, we are taking advantage of lazy evaluation. Now we could do cool things like printing the odd numbers in the range 0-9 from our infinite stream, like so:

from(0).takeWhile(n -> n < 10)
       .filter(n -> n % 2 != 0)
       .forEach(System.out::println);

Conceptually speaking, we could work with any number of elements here, not just 10, because our stream is infinite and limited only by the condition in our predicate. We could just as easily define a predicate to extract the first 1000 numbers and define a consumer that sends them over the network.
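One way to avoid the StackOverflowError risk of the recursive filter is to skip rejected elements in a loop instead of a recursive call. The following is a minimal sketch of that variant; the class name IterativeFilterDemo is hypothetical, and only the way rejected elements are skipped differs from the article's version, so accepted elements are still produced lazily.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

final class IterativeFilterDemo {
    interface Stream<T> {
        T head();
        Stream<T> tail();
        boolean isEmpty();

        default Stream<T> filter(Predicate<? super T> predicate) {
            return filter(this, predicate);
        }

        // Loop-based filter: rejected elements are skipped in a while loop,
        // so long runs of rejected elements do not grow the call stack.
        static <T> Stream<T> filter(Stream<? extends T> source,
                                    Predicate<? super T> predicate) {
            Stream<? extends T> s = source;
            while (!s.isEmpty() && !predicate.test(s.head())) {
                s = s.tail();
            }
            if (s.isEmpty()) {
                return new Empty<>();
            }
            Stream<? extends T> match = s; // effectively final copy for the lambda
            return new Cons<>(match.head(),
                              () -> filter(match.tail(), predicate));
        }
    }

    static final class Empty<T> implements Stream<T> {
        public T head() { throw new UnsupportedOperationException("Empty stream"); }
        public Stream<T> tail() { throw new UnsupportedOperationException("Empty stream"); }
        public boolean isEmpty() { return true; }
    }

    static final class Cons<T> implements Stream<T> {
        private final T head;
        private final Supplier<Stream<T>> tail;

        Cons(T head, Supplier<Stream<T>> tail) {
            this.head = head;
            this.tail = tail;
        }

        public T head() { return head; }
        public Stream<T> tail() { return tail.get(); }
        public boolean isEmpty() { return false; }
    }

    static Stream<Integer> from(int n) {
        return new Cons<>(n, () -> from(n + 1));
    }
}
```

With this variant, from(0).filter(n -> n > 200_000).head() skips two hundred thousand elements without deep recursion. One caveat remains: on an infinite stream with no further matching element, the loop never terminates, whereas the recursive version would eventually overflow the stack.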

The Sieve of Eratosthenes

We are now ready to implement something really cool: the sieve of Eratosthenes, to create an infinite stream of prime numbers. The sieve of Eratosthenes works as follows: imagine that you have an infinite stream of natural numbers starting with 2. Take the first element (2 in this case) and remove all multiples of 2 from the rest of the stream (i.e. the tail), thereby generating a new stream that contains no multiples of 2. Now take the next element in this new stream (3 in this case) and remove all multiples of 3 from the rest of the stream, again generating a new stream. Then take the next element in this new stream (5 in this case, since 4 is a multiple of 2 and was therefore removed before), and remove all multiples of 5 from the rest of the stream. And so on. If we do this repeatedly, every item we take from the head of each stream is a prime number, because we make sure to delete all multiples of that number from the rest of the stream when we generate the next one. Evidently, if the original stream is infinite, the only way to implement the sieve of Eratosthenes is by means of lazy evaluation, and our stream implementation is perfect for this. Let’s define the sieve stream:

public static Stream<Integer> sieve(Stream<Integer> s) {
    return new Cons<>(s.head(),
                 ()-> sieve(s.tail().filter(n -> n % s.head() != 0)));
}

Notice that we receive an initial stream s, which we can assume to be a stream of natural numbers starting at 2. We create a new stream (a.k.a. the sieve) containing the first element of the argument stream, and for the tail we define a closure that, when evaluated, creates a new sieve by filtering out of the tail of the argument stream all multiples of that first element. Now we can use this to do cool things like printing all the primes smaller than 100, like so:

sieve(from(2)).takeWhile(n -> n < 100)
              .forEach(System.out::println);

Here we reuse our from method to generate the initial infinite stream of natural numbers starting with the first prime (i.e. 2), and sieve uses that argument to generate the infinite stream of primes.
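To check the sieve end to end, the following sketch assembles the article's Stream, Empty, Cons, from and sieve definitions into one compilable class. The class name SieveDemo and the toList helper are hypothetical additions for inspection; the filter is the recursive version from above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class SieveDemo {
    interface Stream<T> {
        T head();
        Stream<T> tail();
        boolean isEmpty();

        default Stream<T> takeWhile(Predicate<? super T> predicate) {
            return takeWhile(this, predicate);
        }

        default Stream<T> filter(Predicate<? super T> predicate) {
            return filter(this, predicate);
        }

        static <T> Stream<T> takeWhile(Stream<? extends T> source,
                                       Predicate<? super T> predicate) {
            if (source.isEmpty() || !predicate.test(source.head())) {
                return new Empty<>();
            }
            return new Cons<>(source.head(),
                              () -> takeWhile(source.tail(), predicate));
        }

        // Recursive filter, as in the article.
        static <T> Stream<T> filter(Stream<? extends T> source,
                                    Predicate<? super T> predicate) {
            if (source.isEmpty()) {
                return new Empty<>();
            }
            if (predicate.test(source.head())) {
                return new Cons<>(source.head(),
                                  () -> filter(source.tail(), predicate));
            }
            return filter(source.tail(), predicate);
        }
    }

    static final class Empty<T> implements Stream<T> {
        public T head() { throw new UnsupportedOperationException("Empty stream"); }
        public Stream<T> tail() { throw new UnsupportedOperationException("Empty stream"); }
        public boolean isEmpty() { return true; }
    }

    static final class Cons<T> implements Stream<T> {
        private final T head;
        private final Supplier<Stream<T>> tail;
        Cons(T head, Supplier<Stream<T>> tail) { this.head = head; this.tail = tail; }
        public T head() { return head; }
        public Stream<T> tail() { return tail.get(); }
        public boolean isEmpty() { return false; }
    }

    static Stream<Integer> from(int n) {
        return new Cons<>(n, () -> from(n + 1));
    }

    // Each prime found wraps the rest of the stream in one more filter
    // that removes that prime's multiples.
    static Stream<Integer> sieve(Stream<Integer> s) {
        return new Cons<>(s.head(),
                          () -> sieve(s.tail().filter(n -> n % s.head() != 0)));
    }

    // Hypothetical helper: drains a finite stream into a List.
    static <T> List<T> toList(Stream<T> s) {
        List<T> out = new ArrayList<>();
        while (!s.isEmpty()) {
            out.add(s.head());
            s = s.tail();
        }
        return out;
    }
}
```

Here toList(sieve(from(2)).takeWhile(n -> n < 30)) yields 2, 3, 5, 7, 11, 13, 17, 19, 23 and 29. Since every prime found adds one more filter layer, each later candidate is tested against all earlier primes. This is fine for small bounds but gets slower as the bound grows.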

An Infinite Stream of Fibonacci Numbers

With the same simplicity we could define an infinite stream of Fibonacci numbers, like so:

public static Stream<Integer> fibonacci() {
   //first two fibonacci and a closure to get the rest.
   return new Cons<>(0,() -> new Cons<>(1, () -> nextFibPair(0,1)));
}

private static Stream<Integer> nextFibPair(int a, int b) {
   int fib = a + b, prev = b;
   return new Cons<>(fib, () -> nextFibPair(prev, fib));
}

We can see that our Fibonacci stream starts with 0 and 1, which are the first two Fibonacci numbers, and from there on we define a closure capable of generating the next Fibonacci number on the fly whenever it is required.
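The Fibonacci stream can be checked the same way. This sketch repeats the article's fibonacci and nextFibPair methods and adds a hypothetical firstN helper that collects a fixed number of leading elements without forcing more thunks than needed. The class name FibonacciDemo is also an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class FibonacciDemo {
    interface Stream<T> {
        T head();
        Stream<T> tail();
        boolean isEmpty();
    }

    static final class Cons<T> implements Stream<T> {
        private final T head;
        private final Supplier<Stream<T>> tail;

        Cons(T head, Supplier<Stream<T>> tail) {
            this.head = head;
            this.tail = tail;
        }

        public T head() { return head; }
        public Stream<T> tail() { return tail.get(); }
        public boolean isEmpty() { return false; }
    }

    static Stream<Integer> fibonacci() {
        // first two Fibonacci numbers and a closure to get the rest
        return new Cons<>(0, () -> new Cons<>(1, () -> nextFibPair(0, 1)));
    }

    private static Stream<Integer> nextFibPair(int a, int b) {
        int fib = a + b, prev = b; // int arithmetic wraps silently on overflow
        return new Cons<>(fib, () -> nextFibPair(prev, fib));
    }

    // Hypothetical helper: collects at most `count` leading elements.
    static <T> List<T> firstN(Stream<T> s, int count) {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < count && !s.isEmpty(); i++) {
            out.add(s.head());
            if (i + 1 < count) {
                s = s.tail(); // force the next thunk only if it will be read
            }
        }
        return out;
    }
}
```

The first ten elements are 0, 1, 1, 2, 3, 5, 8, 13, 21 and 34. Because the stream uses int, the element at index 46 (1836311903) is the last one that fits; the next value, 2971215073, wraps to a negative number. Switching the element type to Long or BigInteger would postpone or remove that limit.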

Clearly, the addition of lambda expressions and default methods in Java 8 greatly simplifies the definition of this kind of programming idiom. It was also possible in previous versions of Java; the only difference is that it now requires much less boilerplate code than before.
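For comparison, here is a sketch of how from(n) might look without lambdas, using an anonymous class for the thunk. Since java.util.function.Supplier only arrived in Java 8, the sketch defines a hypothetical Thunk interface in its place; the class name AnonymousClassDemo is likewise an assumption. The code avoids lambdas and diamonds on anonymous classes, and declares the captured parameter final, as Java 7 requires.

```java
final class AnonymousClassDemo {
    // Hypothetical stand-in for Supplier, which only arrived in Java 8.
    interface Thunk<T> {
        T get();
    }

    interface Stream<T> {
        T head();
        Stream<T> tail();
        boolean isEmpty();
    }

    static final class Cons<T> implements Stream<T> {
        private final T head;
        private final Thunk<Stream<T>> tail;

        Cons(T head, Thunk<Stream<T>> tail) {
            this.head = head;
            this.tail = tail;
        }

        public T head() { return head; }
        public Stream<T> tail() { return tail.get(); } // forces the thunk
        public boolean isEmpty() { return false; }
    }

    // Pre-lambda equivalent of from(n): the closure is an anonymous class,
    // and the captured parameter must be declared final explicitly.
    static Stream<Integer> from(final int n) {
        return new Cons<Integer>(n, new Thunk<Stream<Integer>>() {
            @Override
            public Stream<Integer> get() {
                return from(n + 1);
            }
        });
    }
}
```

The behavior is identical to the lambda version; the anonymous class simply takes five lines where the lambda () -> from(n + 1) takes one.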

You can download the full definition of this code for your own experimentation from a public gist.

Further Reading

