// -*- mode: c++; mode: visual-line; mode: flyspell; fill-column: 100000 -*-
/***************************************************************************
 *  doc/tutorial_stream.dox
 *
 *  Usage Tutorial for STXXL
 *
 *  Part of the STXXL. See http://stxxl.sourceforge.net
 *
 *  Copyright (C) 2013 Timo Bingmann <tb@panthema.net>
 *
 *  Distributed under the Boost Software License, Version 1.0.
 *  (See accompanying file LICENSE_1_0.txt or copy at
 *  http://www.boost.org/LICENSE_1_0.txt)
 **************************************************************************/

namespace stxxl {
namespace stream {

/** \page tutorial_stream Tutorial for the Stream Package

\author Timo Bingmann (2012-06-11)

This page gives a short introduction to the stream package. First the main abstractions are discussed, and then some examples of how to use the existing algorithms are developed.

All example code can be found in \ref examples/stream/stream1.cpp

In \ref tutorial_stream_edgesort another example is given, where an existing algorithm is "pipelined".

\section stream1 Abstraction, Interface and a Simple Example

The stream package is built around the abstract notion of an object being able to produce a sequence of output values. Only three simple operations are necessary:
- Retrieval of the current value: prefix \c * operator
- Advance to the next value in the sequence: prefix \c ++ operator
- Indication of the sequence's end: \c empty() function

The most commonplace object that fits easily into this abstraction is the random generator. Actually, a random generator only requires two operations: it can be queried for its current value and be instructed to calculate/advance to a new value. Of course the random sequence should be unbounded, so an \c empty() function would always return false. Nevertheless, this commonplace example illustrates the purpose of the stream interface pretty well.
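
As a rough sketch of this idea, a pseudo-random generator can be wrapped into the three stream operations. The names \c random_stream and \c take_first are hypothetical and not part of STXXL; the sketch assumes a C++11 compiler for \c std::mt19937.

```cpp
#include <cstddef>
#include <random>
#include <vector>

// Hypothetical stream object wrapping a pseudo-random generator (not part of STXXL).
struct random_stream
{
    typedef unsigned int value_type;

private:
    std::mt19937 m_generator;      // the underlying generator
    value_type   m_current_value;  // the value returned by operator*

public:
    // The first value must be available right after construction.
    explicit random_stream(unsigned int seed)
        : m_generator(seed), m_current_value(m_generator())
    { }

    // Query the current value.
    const value_type& operator* () const { return m_current_value; }

    // Advance to a new value.
    random_stream& operator++ ()
    {
        m_current_value = m_generator();
        return *this;
    }

    // The sequence is unbounded, so the stream never becomes empty.
    bool empty() const { return false; }
};

// Hypothetical helper: collect at most n values from any stream object.
template <typename Stream>
std::vector<typename Stream::value_type> take_first(Stream& stream, std::size_t n)
{
    std::vector<typename Stream::value_type> result;
    for (std::size_t i = 0; i < n && !stream.empty(); ++i, ++stream)
        result.push_back(*stream);
    return result;
}
```

Because \c empty() always returns false here, a consumer must bound the number of values itself, which is what the \c n parameter of \c take_first does.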

All stream objects must support the three operations above; together they form the stream algorithm concept. In C++ a class conforms to this concept if it implements the following interface:

\code
struct stream_object
{
    // Type of the values in the output sequence.
    typedef output_type value_type;

    // Retrieval prefix * operator (like dereferencing a pointer or iterator).
    const value_type& operator* () const;

    // Prefix increment ++ operator, which advances the stream to the next value.
    stream_object& operator++ ();

    // Empty indicator. True if the last ++ operation could not fetch a value.
    bool empty() const;
};
\endcode

A very simple stream object that produces the sequence 1,2,3,4,....,1000 is shown in the following snippet:

\code
struct counter_object
{
    // This stream produces a sequence of integers.
    typedef int         value_type;

private:
    // A class attribute to save the current value.
    int                 m_current_value;

public:
    // A constructor to set the initial value to 1.
    counter_object()
        : m_current_value(1)
    {
    }

    // The retrieve operator returning the current value.
    const value_type& operator* () const
    {
        return m_current_value;
    }

    // Increment operator advancing to the next integer.
    counter_object& operator++ ()
    {
        ++m_current_value;
        return *this;
    }

    // Empty indicator, which in this case can check the current value.
    bool empty() const
    {
        return (m_current_value > 1000);
    }
};
\endcode

After this verbose interface definition, the actual iteration over a stream object can be done as follows:

\code
counter_object counter;

while (!counter.empty())
{
    std::cout << *counter << " ";
    ++counter;
}
std::cout << std::endl;
\endcode

For those who like to shorten everything into fewer lines, the above can also be expressed as a for loop:

\code
for (counter_object cnt; !cnt.empty(); ++cnt)
{
    std::cout << *cnt << " ";
}
std::cout << std::endl;
\endcode

Both loops will print the following output:
\verbatim
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 [...] 995 996 997 998 999 1000
\endverbatim

\section stream2 Pipelining: Plugging Stream Objects Together

The stream interface is very useful for external memory algorithms because it represents the concept of sequential access to a stream of individual values. While the simple example above only works with integers, the \c value_type of streams will more often contain complex tuple structs with multiple components.

A stream algorithm can then be constructed from multiple stream objects that pass data from one to another. This notion of "plugging together" stream objects is used in the following example to calculate the square of each value of an integer sequence:

\code
template <typename InputStream>
struct squaring_object
{
    // This stream produces a sequence of integers.
    typedef int         value_type;

private:
    // A reference to another stream of integers, which are our input.
    InputStream&        m_input_stream;

    // A temporary value buffer to hold the current square for retrieval.
    value_type          m_current_value;

public:
    // A constructor taking another stream of integers as input.
    squaring_object(InputStream& input_stream)
        : m_input_stream(input_stream)
    {
        if (!m_input_stream.empty())
        {
            m_current_value = *m_input_stream;
            m_current_value = m_current_value * m_current_value;
        }
    }

    // The retrieve operator returning the square of the input stream.
    const value_type& operator* () const
    {
        return m_current_value;
    }

    // Increment operator: handled by incrementing the input stream.
    squaring_object& operator++ ()
    {
        ++m_input_stream;
        if (!m_input_stream.empty())
        {
            m_current_value = *m_input_stream;
            m_current_value = m_current_value * m_current_value;
        }
        return *this;
    }

    // Empty indicator: this stream is empty when the input stream is.
    bool empty() const
    {
        return m_input_stream.empty();
    }
};
\endcode

For a beginner in stream object programming, the squaring example contains multiple unexpected, verbose complications.

- We wish to allow many different integer sequences as input streams to the squaring class. For this we use a class template and define squaring to take any class as \c InputStream template parameter. Before C++20, the language offers no syntax to state which concepts a template parameter must fulfill; in this case one would require \c InputStream to implement the stream interface.

- After defining the input stream class, one will usually need an instantiated object of that class inside the new stream class. Most common practice is to define references to other streams as class attributes, and have the actual objects be passed to the constructor of the new stream object. <br> In the case of the squaring class, any \c InputStream object is accepted by the constructor and a reference is saved into \c m_input_stream.

- As second attribute, the squaring class contains \c m_current_value. This additional temporary value is required in this case because \c operator*() must return a const-reference, so the square must actually be stored in a variable after it is calculated. Now note that the squaring operation in this version is implemented in two places: in the constructor and in \c operator++(). <br> This is necessary, because the stream concept requires that the first value be <em>immediately available after construction</em>! Therefore it must be calculated in the constructor, and this code is usually a duplicate of the action done in \c operator++(). A real implementation would probably combine the calculation code into a \c process() function and also do additional allocation work in the constructor.
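
The \c process() idea from the last point could look roughly like the following sketch. The class name \c squaring_object2 and the helper \c all_squares are hypothetical; the counter stream is repeated only to keep the snippet self-contained.

```cpp
#include <vector>

// The counter stream from the first example: produces 1,2,...,1000.
struct counter_object
{
    typedef int value_type;
private:
    int m_current_value;
public:
    counter_object() : m_current_value(1) { }
    const value_type& operator* () const { return m_current_value; }
    counter_object& operator++ () { ++m_current_value; return *this; }
    bool empty() const { return (m_current_value > 1000); }
};

// Hypothetical variant of squaring_object with the calculation in one helper.
template <typename InputStream>
struct squaring_object2
{
    typedef int value_type;
private:
    InputStream& m_input_stream;
    value_type   m_current_value;

    // Refresh the buffered square from the current input value, if any.
    void process()
    {
        if (!m_input_stream.empty())
        {
            m_current_value = *m_input_stream;
            m_current_value = m_current_value * m_current_value;
        }
    }
public:
    explicit squaring_object2(InputStream& input_stream)
        : m_input_stream(input_stream), m_current_value(0)
    {
        process();   // the first value is available right after construction
    }
    const value_type& operator* () const { return m_current_value; }
    squaring_object2& operator++ ()
    {
        ++m_input_stream;
        process();
        return *this;
    }
    bool empty() const { return m_input_stream.empty(); }
};

// Collect all squares to show that the behaviour is unchanged.
inline std::vector<int> all_squares()
{
    counter_object counter;
    squaring_object2<counter_object> squares(counter);
    std::vector<int> result;
    for ( ; !squares.empty(); ++squares)
        result.push_back(*squares);
    return result;
}
```

The sketch also initializes \c m_current_value to zero, so the attribute has a defined value even when the input stream is empty from the start.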

An instance of the \c counter_object can be plugged into a \c squaring_object as done in the following example:

\code
counter_object counter;
squaring_object<counter_object> squares(counter);

while (!squares.empty())
{
    std::cout << *squares << " ";
    ++squares;
}
std::cout << std::endl;
\endcode

The example outputs:

\verbatim
1 4 9 16 25 36 49 64 81 100 121 144 169 [...] 986049 988036 990025 992016 994009 996004 998001 1000000
\endverbatim

\section stream3 Miscellaneous Utilities Provided by the Stream Package

The above examples are pure C++ interface manipulations and do not even require STXXL. However, when writing stream algorithms you can take advantage of the utilities provided by the stream package to create complex algorithms. Probably the most useful is the pair of sorting classes, which will be discussed after a few preliminaries.

More complex algorithms will most often use tuples as values passed from one stream to another. These tuples wrap all information fields of a specific piece of data. Simple tuples can be created using \c std::pair; tuples with a larger number of components can use Boost.Tuple or just plain structs with multiple fields. (In the tuple case, the temporary value inside the stream struct can mostly be avoided.)

The stream package contains utilities to plug stream classes together to form complex algorithms. The following few examples are very basic algorithms:

Very often the input to a sequence of stream classes comes from an array or other container. In this case one requires an input stream object, which iterates through the container and outputs each element once. STXXL provides iterator2stream for this common purpose:
\code
std::vector<int> intvector;
// (fill intvector)

// define stream class iterating over an integer vector
typedef stxxl::stream::iterator2stream< std::vector<int>::const_iterator > intstream_type;

// instantiate the stream object, iterate from begin to end of intvector.
intstream_type intstream (intvector.begin(), intvector.end());

// plug in squaring object after vector iterator stream.
squaring_object<intstream_type> squares(intstream);
\endcode

Most important: if the input container is a stxxl::vector, then one should use vector_iterator2stream, because this class will prefetch additional blocks from the vector while processing the stream.
\code
stxxl::vector<int> intvector;
// (fill intvector)

// define stream class iterating over an integer STXXL vector
typedef stxxl::stream::vector_iterator2stream< stxxl::vector<int>::const_iterator > intstream_type;

// instantiate the stream object, iterate from begin to end of intvector using prefetching
intstream_type intstream (intvector.begin(), intvector.end());

// plug in squaring object after vector iterator stream.
squaring_object<intstream_type> squares(intstream);
\endcode
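
To illustrate what such an adapter has to do, the following is a minimal hand-written sketch of a stream over an iterator range. It is not the implementation of iterator2stream; the names \c range_stream and \c sum_stream are hypothetical, and the sketch assumes that dereferencing the iterator yields a reference and performs no prefetching.

```cpp
#include <iterator>
#include <vector>

// Hypothetical minimal adapter from an iterator range to the stream interface.
template <typename InputIterator>
struct range_stream
{
    typedef typename std::iterator_traits<InputIterator>::value_type value_type;

private:
    InputIterator m_current, m_end;

public:
    range_stream(InputIterator begin, InputIterator end)
        : m_current(begin), m_end(end)
    { }

    // Assumes that dereferencing the iterator yields a reference to the element.
    const value_type& operator* () const { return *m_current; }

    range_stream& operator++ ()
    {
        ++m_current;
        return *this;
    }

    // The stream is empty once the whole range has been consumed.
    bool empty() const { return m_current == m_end; }
};

// Hypothetical consumer: add up all remaining values of a stream.
template <typename Stream>
long sum_stream(Stream& stream)
{
    long sum = 0;
    for ( ; !stream.empty(); ++stream)
        sum += *stream;
    return sum;
}
```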

The opposite of iterator2stream is to collect the output of a sequence of stream objects into a container or stxxl::vector. This operation is called \c materialize and also comes in a general version and a special version for the STXXL vector, which uses asynchronous writes.

This example shows how to materialize a stream into a usual STL vector.
\code
// construct the squared counter stream
counter_object counter;
squaring_object<counter_object> squares(counter);

// allocate vector of 100 integers
std::vector<int> intvector (100);

// materialize 100 integers from stream and put into vector
stxxl::stream::materialize(squares, intvector.begin(), intvector.end());
\endcode

And the only modification needed to support larger data sets is to materialize to an STXXL vector:
\code
// construct the squared counter stream
counter_object counter;
squaring_object<counter_object> squares(counter);

// allocate STXXL vector of 100 integers
stxxl::vector<int> intvector (100);

// materialize 100 integers from stream and put into STXXL vector
stxxl::stream::materialize(squares, intvector.begin(), intvector.end());
\endcode
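
Conceptually, materializing into a range is a copy loop that stops when either the stream or the output range is exhausted. The following sketch shows only this idea in plain C++; it is not the STXXL implementation, and the names \c copy_stream and \c small_counter are hypothetical.

```cpp
#include <vector>

// Hypothetical counter stream producing 1,2,...,last.
struct small_counter
{
    typedef int value_type;
    explicit small_counter(int last) : m_value(1), m_last(last) { }
    const value_type& operator* () const { return m_value; }
    small_counter& operator++ () { ++m_value; return *this; }
    bool empty() const { return m_value > m_last; }
private:
    int m_value;
    int m_last;
};

// Hypothetical sketch: copy stream values into [begin, end) until the stream
// is empty or the output range is full, whichever happens first.
template <typename Stream, typename OutputIterator>
OutputIterator copy_stream(Stream& stream, OutputIterator begin, OutputIterator end)
{
    while (!stream.empty() && begin != end)
    {
        *begin = *stream;
        ++begin;
        ++stream;
    }
    return begin;   // one past the last element written
}
```

In this sketch, values that do not fit into the output range remain in the stream, so a consumer can continue reading them afterwards.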

\section stream4 Sorting As Provided by the Stream Package

Maybe the most important set of tools in the stream package is the pair of sorter classes runs_creator and runs_merger. The general way to sort a sequential input stream is to first collect a large number of input items in an internal memory buffer. When the buffer is full, it is sorted in internal memory and subsequently written out to disk. This sorted sequence is then called a run. When the input stream is finished and the sorted output must be produced, these sorted runs can be merged efficiently using a tournament tree or a similar multi-way comparison structure (see \ref design_algo_sorting).

STXXL implements this using two stream classes: runs_creator and runs_merger.
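
The two phases, run formation and multi-way merging, can be sketched entirely in internal memory as follows. This only illustrates the general technique and is not how STXXL implements it: the runs stay in \c std::vector objects instead of being written to disk, a binary heap stands in for the tournament tree, and the names \c create_runs and \c merge_runs are hypothetical.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

typedef std::vector<int> run_type;

// Phase 1: cut the input into runs of at most buffer_size items and sort each.
// buffer_size must be positive. In external memory each run would go to disk.
inline std::vector<run_type> create_runs(const std::vector<int>& input,
                                         std::size_t buffer_size)
{
    std::vector<run_type> runs;
    for (std::size_t i = 0; i < input.size(); i += buffer_size)
    {
        std::size_t end = std::min(input.size(), i + buffer_size);
        run_type run(input.begin() + i, input.begin() + end);
        std::sort(run.begin(), run.end());
        runs.push_back(run);
    }
    return runs;
}

// Phase 2: multi-way merge, repeatedly extracting the smallest head element.
inline std::vector<int> merge_runs(const std::vector<run_type>& runs)
{
    typedef std::pair<int, std::size_t> head_type;   // (value, index of its run)
    std::priority_queue<head_type, std::vector<head_type>,
                        std::greater<head_type> > heads;
    std::vector<std::size_t> position(runs.size(), 0);

    // Start with the first element of every non-empty run.
    for (std::size_t r = 0; r < runs.size(); ++r)
        if (!runs[r].empty())
            heads.push(head_type(runs[r][0], r));

    std::vector<int> output;
    while (!heads.empty())
    {
        head_type smallest = heads.top();
        heads.pop();
        output.push_back(smallest.first);

        // Replace the extracted element by the next one from the same run.
        std::size_t r = smallest.second;
        if (++position[r] < runs[r].size())
            heads.push(head_type(runs[r][position[r]], r));
    }
    return output;
}
```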

The following example shows how to sort the integer sequence 1,2,...,1000 first by the right-most decimal digit, then by its numeric value (yes, a somewhat constructed example, but it serves its purpose well). For all sorters a comparator object is required which tells the sorter which of two objects is the smaller one. This is similar to the requirements of the usual STL; however, the STXXL sorters need two additional functions: \c min_value() and \c max_value(), which are used as padding sentinels. These functions return the smallest and largest possible values with respect to the order defined by the comparator.
\code
// define comparator class: compare right-most decimal and then absolute value
struct CompareMod10
{
    // comparison operator() returning true if (a < b)
    inline bool operator() (int a, int b) const
    {
        if ((a % 10) == (b % 10))
            return a < b;
        else
            return (a % 10) < (b % 10);
    }

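    // smallest value in this order: remainder -9, then the most negative such int
    // (INT_MIN itself has remainder -8 for a 32-bit int)
    int min_value() const { return INT_MIN + 9; }
    // largest value in this order: remainder 9, then the largest such int
    // (INT_MAX itself has remainder 7 for a 32-bit int)
    int max_value() const { return INT_MAX - 8; }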
};
\endcode
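
The order defined by such a comparator can be checked on a small scale in internal memory. The following sketch uses \c std::sort as a stand-in for the external sorter; since \c std::sort does not need the sentinel functions, the hypothetical functor \c mod10_less contains only the comparison. The helper \c sorted_by_mod10 is hypothetical as well.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical functor with the same ordering as CompareMod10, without sentinels.
struct mod10_less
{
    bool operator() (int a, int b) const
    {
        if ((a % 10) == (b % 10))
            return a < b;                  // same last digit: order by value
        else
            return (a % 10) < (b % 10);    // otherwise: order by last digit
    }
};

// Sort 1,2,...,1000 in internal memory using the ordering above.
inline std::vector<int> sorted_by_mod10()
{
    std::vector<int> values;
    for (int i = 1; i <= 1000; ++i)
        values.push_back(i);
    std::sort(values.begin(), values.end(), mod10_less());
    return values;
}
```

The result starts with the 100 multiples of ten (10, 20, ..., 1000), followed by the values ending in 1 (1, 11, ..., 991), and ends with 999.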

All sorter steps require an internal memory buffer. Its size can be fixed using a parameter to runs_creator and runs_merger. The following example code instantiates a counter object and plugs it into a runs_creator, which is followed by a runs_merger.

\code
static const int ram_use = 10*1024*1024;   // amount of memory to use in runs creation

counter_object  counter;        // the counter stream from first examples

// define a runs sorter for the counter stream which orders by CompareMod10 object.
typedef stxxl::stream::runs_creator<counter_object, CompareMod10> rc_counter_type;

// instance of CompareMod10 comparator class
CompareMod10    comparemod10;

// instance of runs_creator which reads the counter stream.
rc_counter_type rc_counter (counter, comparemod10, ram_use);

// define a runs merger for the sorted runs from rc_counter.
typedef stxxl::stream::runs_merger<rc_counter_type::sorted_runs_type, CompareMod10> rm_counter_type;

// instance of runs_merger which merges sorted runs from rc_counter.
rm_counter_type rm_counter (rc_counter.result(), comparemod10, ram_use);

// read sorted stream: runs_merger also conforms to the stream interface.
while (!rm_counter.empty())
{
    std::cout << *rm_counter << " ";
    ++rm_counter;
}
std::cout << std::endl;
\endcode
The output of the code above is:
\verbatim
10 20 30 40 50 60 70 80 [...] 990 1000 1 11 21 31 41 51 61 [...] 909 919 929 939 949 959 969 979 989 999
\endverbatim

Note that in the above example the input of the runs_creator is itself a stream. If however the data is not naturally available as a stream, one can use a variant of runs_creator which accepts input via a \c push() function. This is more useful when using an imperative programming style. Note that the runs_merger does not change.
\code
static const int ram_use = 10*1024*1024;   // amount of memory to use in runs creation

// define a runs sorter which accepts imperative push()s and orders by CompareMod10 object.
typedef stxxl::stream::runs_creator<stxxl::stream::use_push<int>, CompareMod10> rc_counter_type;

// instance of CompareMod10 comparator class.
CompareMod10    comparemod10;

// instance of runs_creator which waits for input.
rc_counter_type rc_counter (comparemod10, ram_use);

// write sequence of integers into runs
for (int i = 1; i <= 1000; ++i)
    rc_counter.push(i);

// define a runs merger for the sorted runs from rc_counter.
typedef stxxl::stream::runs_merger<rc_counter_type::sorted_runs_type, CompareMod10> rm_counter_type;

// instance of runs_merger which merges sorted runs from rc_counter.
rm_counter_type rm_counter (rc_counter.result(), comparemod10, ram_use);

// read sorted stream: runs_merger also conforms to the stream interface.
while (!rm_counter.empty())
{
    std::cout << *rm_counter << " ";
    ++rm_counter;
}
std::cout << std::endl;
\endcode

As the last example in this tutorial we show how to use stxxl::sorter, which combines runs_creator and runs_merger into one object. The sorter has two states: input and output. During input, new elements can be inserted using \c push(). Then, to switch to the output state, the function \c sort() is called, after which the sorter can be queried using the usual stream interface.
\code
static const int ram_use = 10*1024*1024;   // amount of memory to use in runs creation

// define a runs sorter which accepts imperative push()s and orders by CompareMod10 object.
typedef stxxl::sorter<int, CompareMod10> sr_counter_type;

// instance of CompareMod10 comparator class.
CompareMod10    comparemod10;

// instance of sorter which waits for input.
sr_counter_type sr_counter (comparemod10, ram_use);

// write sequence of integers into sorter, which creates sorted runs
for (int i = 1; i <= 1000; ++i)
    sr_counter.push(i);

// signal sorter that the input stream is finished and switch to output mode.
sr_counter.sort();

// read sorted stream: sorter also conforms to the stream interface.
while (!sr_counter.empty())
{
    std::cout << *sr_counter << " ";
    ++sr_counter;
}
std::cout << std::endl;
\endcode

All three examples have the same output.
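
The two-state protocol of the sorter (first \c push(), then \c sort(), then reading via the stream interface) can be mimicked by a small in-memory stand-in. The class \c toy_sorter below is hypothetical and only illustrates the usage pattern; it keeps all items in a \c std::vector and uses assertions to catch calls made in the wrong state.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical in-memory stand-in that mimics the two-state usage of a sorter.
template <typename ValueType, typename Compare>
class toy_sorter
{
    std::vector<ValueType> m_items;
    Compare                m_cmp;
    std::size_t            m_position;   // read position in output state
    bool                   m_output;     // false: input state, true: output state

public:
    typedef ValueType value_type;

    explicit toy_sorter(Compare cmp = Compare())
        : m_cmp(cmp), m_position(0), m_output(false)
    { }

    // Input state: collect one more item.
    void push(const ValueType& value)
    {
        assert(!m_output);
        m_items.push_back(value);
    }

    // Switch from input state to output state.
    void sort()
    {
        assert(!m_output);
        std::sort(m_items.begin(), m_items.end(), m_cmp);
        m_output = true;
    }

    // Output state: the usual stream interface.
    const value_type& operator* () const
    {
        assert(m_output);
        return m_items[m_position];
    }
    toy_sorter& operator++ ()
    {
        assert(m_output);
        ++m_position;
        return *this;
    }
    bool empty() const
    {
        assert(m_output);
        return m_position == m_items.size();
    }
};
```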

\example examples/stream/stream1.cpp
This example code is explained in the \ref tutorial_stream.

*/

/** \page tutorial_stream_edgesort Generating Random Graphs using Streams

\author Roman Dementiev (2007)

This page gives an example of how an application using "traditional" containers and algorithms can be converted to use pipelined streaming.

The purpose of our example is to generate a huge random directed graph in a sorted edge array representation, i.e. the edges in the edge array must be sorted lexicographically. The definitions of the classes \c edge, \c random_edge and \c edge_cmp are in the following code listing.

\code
struct edge { // edge class
    int src, dst; // nodes
    edge() {}
    edge(int src_, int dst_): src(src_), dst(dst_) {}
    bool operator == (const edge & b) const {
        return src == b.src && dst == b.dst;
    }
};
struct random_edge { // random edge generator functor
    edge operator () () const {
        edge Edge(random() - 1, random() - 1);
        while(Edge.dst == Edge.src)
             Edge.dst = random() - 1 ; // no self-loops
        return Edge;
    }
};
struct edge_cmp { // edge comparison functor
    edge min_value() const {
        return edge(std::numeric_limits<int>::min(),0); };
    edge max_value() const {
        return edge(std::numeric_limits<int>::max(),0); };
    bool operator () (const edge & a,
                      const edge & b) const {
        return a.src < b.src || (a.src == b.src && a.dst < b.dst);
    }
};
\endcode

A straightforward procedure to generate the graph is to: 1) generate a sequence of random edges, 2) sort the sequence, 3) remove duplicate edges from it. If we ignore the definitions of helper classes, the STL/STXXL code of the algorithm implementation is only five lines long:

\code
// create vector
stxxl::vector<edge> ExtEdgeVec(10000000000ULL);
// generate random edges
stxxl::generate(ExtEdgeVec.begin(), ExtEdgeVec.end(), random_edge());
// sort edges by source vertex
stxxl::sort(ExtEdgeVec.begin(), ExtEdgeVec.end(), edge_cmp(), 512*1024*1024);
// unify equal edges
stxxl::vector<edge>::iterator NewEnd =  std::unique(ExtEdgeVec.begin(), ExtEdgeVec.end());
ExtEdgeVec.resize(NewEnd - ExtEdgeVec.begin());
\endcode

Line 2 creates an STXXL external memory vector with 10 billion edges. Line 4 fills the vector with random edges (\c stxxl::generate, the STXXL counterpart of the STL's \c std::generate, is used). In line 6 the STXXL external memory sorter sorts the randomly generated edges using 512 megabytes of internal memory. The lexicographical order is defined by the functor \c edge_cmp; stxxl::sort also requires the comparison functor to provide upper and lower bounds for the elements being sorted. Line 8 deletes duplicate edges in the external memory vector with the help of the STL \c std::unique algorithm. The \c NewEnd vector iterator points to the right boundary of the range without duplicates. Finally (in the last line), we chop the vector at the \c NewEnd boundary.

Now we count the number of I/Os performed by this example: external vector construction takes no I/Os; filling with random values requires a scan --- \f$ N/DB \f$ I/Os; sorting will take \f$ 4N/DB \f$ I/Os; duplicate removal needs no more than \f$ 2N/DB \f$ I/Os; chopping a vector is I/O-free. The total number of I/Os is \f$ 7N/DB \f$.

# Pipelined random graph generation

Now we <b>"pipeline"</b> the random graph generation example shown in the previous chapter. The data flow graph of the algorithm is presented in the following figure:

\image html pipeline_randomgraph_small.png "Pipeline of Random Graph Generator"

The following code listing shows the pipelined code of the algorithm. The definitions of \c edge, \c random_edge, and \c edge_cmp are assumed to be available from the listing in the previous section.

\code
class random_edge_stream {
    stxxl::int64 counter;
    edge current;
    random_edge_stream();
public:
    typedef edge value_type;
    random_edge_stream(stxxl::int64 elements)
        : counter(elements), current(random_edge()())
    { }
    const edge & operator * () const { return current; }
    const edge * operator ->() const { return &current; }
    random_edge_stream & operator ++ ()
    {
        --counter;
        current = random_edge()();
        return *this;
    }
    bool empty() const { return counter == 0; }
};

random_edge_stream RandomStream(10000000000ULL);
typedef stxxl::stream::sort<random_edge_stream, edge_cmp> sorted_stream;
sorted_stream SortedStream(RandomStream, edge_cmp(), 512*1024*1024);
typedef stxxl::stream::unique<sorted_stream> unique_stream_type;
unique_stream_type UniqueStream(SortedStream);
stxxl::vector<edge> ExtEdgeVec(10000000000ULL);
stxxl::vector<edge>::iterator NewEnd =
    stxxl::stream::materialize(UniqueStream, ExtEdgeVec.begin());
ExtEdgeVec.resize(NewEnd - ExtEdgeVec.begin());
\endcode

Since the sorter of the streaming layer accepts a \c stream input, we do not need to store the random edges in a vector first. Rather, we generate them on the fly. The \c random_edge_stream object (model of \c stream) supplies the sorter with a stream of random edges. We define the type of the sorter node as \c sorted_stream; it is parameterized by the type of the input stream and the type of the comparison function object. Then a \c SortedStream object is created and its input is attached to the \c RandomStream object's output. The internal memory consumption of the sorter stream object is limited to 512 MB. The \c UniqueStream object filters the duplicates in its input edge stream. The generic \c stream::unique stream class is provided by the STXXL library. The stream::materialize function records the content of the \c UniqueStream into the external memory vector. As in the previous non-pipelined version, we cut the vector at the \c NewEnd boundary.
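
To make the on-the-fly duplicate removal more concrete, the following sketch shows how a filter of this kind can be written against the stream interface. It is not the implementation of \c stream::unique; the names \c unique_filter, \c int_vector_stream and \c unique_values are hypothetical, and the sketch assumes a sorted input whose value type is default-constructible and comparable with \c ==.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical input stream reading from a std::vector<int>.
struct int_vector_stream
{
    typedef int value_type;
    explicit int_vector_stream(const std::vector<int>& values)
        : m_values(values), m_position(0) { }
    const value_type& operator* () const { return m_values[m_position]; }
    int_vector_stream& operator++ () { ++m_position; return *this; }
    bool empty() const { return m_position == m_values.size(); }
private:
    const std::vector<int>& m_values;
    std::size_t             m_position;
};

// Hypothetical filter: passes on each distinct value of a sorted stream once.
template <typename InputStream>
class unique_filter
{
public:
    typedef typename InputStream::value_type value_type;
private:
    InputStream& m_input;
    value_type   m_current;
public:
    explicit unique_filter(InputStream& input)
        : m_input(input), m_current()
    {
        if (!m_input.empty())
            m_current = *m_input;
    }
    const value_type& operator* () const { return m_current; }
    unique_filter& operator++ ()
    {
        // Skip all input values equal to the current one.
        while (!m_input.empty() && *m_input == m_current)
            ++m_input;
        if (!m_input.empty())
            m_current = *m_input;
        return *this;
    }
    bool empty() const { return m_input.empty(); }
};

// Run the filter over a sorted vector and collect the distinct values.
inline std::vector<int> unique_values(const std::vector<int>& sorted_input)
{
    int_vector_stream input(sorted_input);
    unique_filter<int_vector_stream> filtered(input);
    std::vector<int> result;
    for ( ; !filtered.empty(); ++filtered)
        result.push_back(*filtered);
    return result;
}
```

The filter holds a single buffered value and never stores the sequence itself, which is why this step needs no I/O in the pipelined version.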

Let us count the number of I/Os the program performs: random edge generation by \c RandomStream costs no I/O; sorting in \c SortedStream needs to store the sorted runs and read them again to merge --- \f$ 2N/DB \f$ I/Os; \c UniqueStream deletes duplicates on the fly, it does not need any I/O; and materializing the final output can cost up to \f$ N/DB \f$ I/Os. All in all, the program only incurs \f$ 3N/DB \f$ I/Os, compared to \f$ 7N/DB \f$ for the non-pipelined code.
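
The two totals can be reproduced with a few lines of arithmetic. The sketch below assumes that \f$ N \f$ is the number of elements, \f$ B \f$ the block size in elements and \f$ D \f$ the number of disks, which is the usual reading of \f$ N/DB \f$ but is not spelled out on this page; all function names are hypothetical.

```cpp
// I/Os for one scan over N elements with D disks and blocks of B elements.
inline double scan_ios(double N, double D, double B)
{
    return N / (D * B);
}

// Non-pipelined version: fill (1 scan) + sort (4 scans)
// + duplicate removal (at most 2 scans).
inline double nonpipelined_ios(double N, double D, double B)
{
    return (1 + 4 + 2) * scan_ios(N, D, B);
}

// Pipelined version: write and re-read the sorted runs (2 scans)
// + materialize the output (at most 1 scan).
inline double pipelined_ios(double N, double D, double B)
{
    return (2 + 1) * scan_ios(N, D, B);
}
```

Independent of the concrete values of \f$ N \f$, \f$ D \f$ and \f$ B \f$, the pipelined version performs at most 3/7 of the I/Os of the non-pipelined one under this count.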

*/

} // namespace stream
} // namespace stxxl