
Custom Pipeline

This tutorial gives an overview of how to extend the Codex pipeline with custom components. The examples are deliberately simple: they illustrate the mechanics rather than solve real problems.
To create custom producers, consumers and filters, inherit from cx::Producer, cx::Consumer and cx::Filter, respectively.

Producer example

Imagine your data is split across two files. Instead of catching an exception about production exhaustion when the first file runs out, you can create a producer that reads from the first file as long as possible and then switches to the second. The cx::Producer class has a pure virtual function produce, which receives a chunk to be filled with fresh data and returns true if the producer can produce more data on a future call (false if the producer is done). That is the function we'll implement. A cx::File is itself a cx::Producer (as well as a cx::Consumer), so we can delegate the produce call to the file currently being read:

class DoubleSource: public cx::Producer{
private:
    cx::File f1, f2;
    bool usingFirstFile;
public:

    DoubleSource():f1("firstFile.txt"), f2("secondFile.txt"), usingFirstFile(true){}

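    bool produce(cx::Chunk& chunk){
        if (usingFirstFile){
            // Read from the first file; once it reports that it is done,
            // the next call switches to the second file.
            usingFirstFile = f1.produce(chunk);
            return true; // the second file is still pending
        }else
            return f2.produce(chunk);
    }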

};
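The same delegation idea extends to any number of sources. The sketch below is self-contained and does not use Codex: ProducerLike, StringSource and ChainSource are hypothetical names, and std::string stands in for cx::Chunk. It assumes only the contract described above (fill the chunk, return false once the source is done).

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the producer contract (not cx::Producer):
// fill `chunk` and return true if more data can follow, false when done.
struct ProducerLike {
    virtual ~ProducerLike() = default;
    virtual bool produce(std::string& chunk) = 0;
};

// Hypothetical in-memory source that emits `text` in pieces of `pieceSize`
// bytes (pieceSize must be greater than zero).
class StringSource : public ProducerLike {
public:
    StringSource(std::string text, std::size_t pieceSize)
        : text_(std::move(text)), pieceSize_(pieceSize) {}

    bool produce(std::string& chunk) override {
        chunk = text_.substr(pos_, pieceSize_);
        pos_ += chunk.size();
        return pos_ < text_.size();  // false once the last piece is emitted
    }

private:
    std::string text_;
    std::size_t pieceSize_;
    std::size_t pos_ = 0;
};

// Generalization of DoubleSource: reads the sources in order and switches
// to the next one whenever the current one reports that it is done.
class ChainSource : public ProducerLike {
public:
    explicit ChainSource(std::vector<std::unique_ptr<ProducerLike>> sources)
        : sources_(std::move(sources)) {}

    bool produce(std::string& chunk) override {
        if (current_ >= sources_.size()) {  // no sources, or all exhausted
            chunk.clear();
            return false;
        }
        if (!sources_[current_]->produce(chunk)) {
            ++current_;  // this source is done; use the next one next time
        }
        return current_ < sources_.size();
    }

private:
    std::vector<std::unique_ptr<ProducerLike>> sources_;
    std::size_t current_ = 0;
};
```

ChainSource returns true after a non-final source finishes, just as DoubleSource does after the first file, so the caller keeps calling produce until the last source reports that it is done.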

Consumer example

The cx::Consumer class has a pure virtual function consume, which receives a chunk of data to be consumed and returns true if the consumer can receive more data in future calls to consume (false if the consumer is done). This example simply writes the consumed data to std::cout (which may not make sense for structured data).

#include <iostream>

class Printer: public cx::Consumer{
public:

    bool consume(const cx::Chunk& chunk){
        const char* data = chunk.data();
        for(unsigned int i = 0 ; i < chunk.size() ; ++i){
            std::cout << data[i];
        }
        return true;
    }

};
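The return value of consume lets a consumer stop the flow early. As a minimal, Codex-independent sketch (ConsumerLike and HeadConsumer are hypothetical names, and std::string replaces cx::Chunk), here is a consumer that keeps only the first N bytes it receives and then reports that it is done:

```cpp
#include <cstddef>
#include <string>

// Hypothetical stand-in for the consumer contract (not cx::Consumer):
// return true if more data can be accepted, false when done.
struct ConsumerLike {
    virtual ~ConsumerLike() = default;
    virtual bool consume(const std::string& chunk) = 0;
};

// Keeps only the first `limit` bytes, then asks the caller to stop.
class HeadConsumer : public ConsumerLike {
public:
    explicit HeadConsumer(std::size_t limit) : limit_(limit) {}

    bool consume(const std::string& chunk) override {
        std::size_t room = limit_ - kept_.size();  // kept_ never exceeds limit_
        kept_.append(chunk, 0, room);              // append at most `room` bytes
        return kept_.size() < limit_;
    }

    const std::string& kept() const { return kept_; }

private:
    std::size_t limit_;
    std::string kept_;
};
```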

Filter example

The cx::Filter class has a pure virtual function named pass, returning void, which receives a chunk to be transformed. Additionally, there is a virtual void function finish that can be overridden to perform finalization steps. These steps may write additional data (e.g. a footer) into the chunk received as argument.
This example converts lowercase ASCII characters to uppercase and writes "finish" at the end.

class UpperCaseFilter: public cx::Filter{
public:

    void pass(cx::Chunk& chunk){
        char dif = 'A' - 'a';
        char* data = &chunk[0];
        for(unsigned int i = 0 ; i < chunk.size() ; ++i){
            if (data[i] >= 'a' && data[i] <= 'z'){
                data[i] += dif;
            }
        }
    }

    void finish(cx::Chunk& out){
        out.reset("finish\n", 7*sizeof(char));
    }

};
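Because finish runs after the last chunk, it is a natural place to emit data that depends on the whole input. The following sketch is not Codex code: FilterLike and ByteCountFilter are hypothetical names, std::string stands in for cx::Chunk, and the default finish that produces nothing is an assumption. The filter passes data through unchanged and writes a byte-count footer at the end.

```cpp
#include <cstddef>
#include <string>

// Hypothetical stand-in for the filter contract (not cx::Filter).
struct FilterLike {
    virtual ~FilterLike() = default;
    virtual void pass(std::string& chunk) = 0;
    // Assumed default: no trailing data.
    virtual void finish(std::string& out) { out.clear(); }
};

// Counts the bytes that flow through and reports the total as a footer.
class ByteCountFilter : public FilterLike {
public:
    void pass(std::string& chunk) override {
        count_ += chunk.size();  // the data itself is left unchanged
    }

    void finish(std::string& out) override {
        out = "bytes: " + std::to_string(count_) + "\n";
    }

private:
    std::size_t count_ = 0;
};
```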

Using custom components

Use them with streams just as you would the default components:

DoubleSource    producer;
UpperCaseFilter filter;
Printer         consumer;
try{
    // print the text in uppercase
    cx::Stream stream(producer, filter, consumer);
}catch(std::exception& e){
    ...
}
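To make the roles of the three components concrete, the sketch below drives them in the order a linear pipeline implies: produce, pass and consume for each chunk, then finish. It is a single-threaded illustration built on assumed stand-in interfaces (all names are hypothetical, and std::string replaces cx::Chunk); it does not describe how cx::Stream is implemented, which may run components on different threads.

```cpp
#include <cctype>
#include <cstddef>
#include <string>
#include <utility>

// Hypothetical stand-ins for the three roles (not the cx classes).
struct ProducerLike {
    virtual ~ProducerLike() = default;
    virtual bool produce(std::string& chunk) = 0;
};
struct FilterLike {
    virtual ~FilterLike() = default;
    virtual void pass(std::string& chunk) = 0;
    virtual void finish(std::string& out) { out.clear(); }  // assumed default
};
struct ConsumerLike {
    virtual ~ConsumerLike() = default;
    virtual bool consume(const std::string& chunk) = 0;
};

// Each chunk goes producer -> filter -> consumer, in order, until either
// end is done; the filter's trailing data (if any) is delivered last.
void runPipeline(ProducerLike& producer, FilterLike& filter, ConsumerLike& consumer) {
    std::string chunk;
    bool more = true;
    bool accepting = true;
    while (more && accepting) {
        more = producer.produce(chunk);
        filter.pass(chunk);
        accepting = consumer.consume(chunk);
    }
    if (accepting) {
        filter.finish(chunk);
        if (!chunk.empty()) consumer.consume(chunk);
    }
}

// Hypothetical components mirroring the tutorial's examples.
class PieceSource : public ProducerLike {  // emits `text` in pieces of `n` > 0
public:
    PieceSource(std::string text, std::size_t n) : text_(std::move(text)), n_(n) {}
    bool produce(std::string& chunk) override {
        chunk = text_.substr(pos_, n_);
        pos_ += chunk.size();
        return pos_ < text_.size();
    }
private:
    std::string text_;
    std::size_t n_;
    std::size_t pos_ = 0;
};

class UpperFilter : public FilterLike {
public:
    void pass(std::string& chunk) override {
        for (char& c : chunk)
            c = static_cast<char>(std::toupper(static_cast<unsigned char>(c)));
    }
    void finish(std::string& out) override { out = "finish\n"; }
};

class Collector : public ConsumerLike {
public:
    bool consume(const std::string& chunk) override { text += chunk; return true; }
    std::string text;
};
```

With PieceSource("hello world", 4), runPipeline leaves "HELLO WORLDfinish\n" in the collector; as with UpperCaseFilter, the output of finish is not itself passed through the filter.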

Single pass

You can also apply a filter, produce data or consume data in a single step, without using a stream. This is useful for processing raw data that is already in memory. For example, you can apply an image processing filter directly to the image data; no producer or consumer is needed. Example:

cx::Chunk imageData = ...    // chunk with the image bytes
MyBlurFilter blurFilter;     // inherits from cx::Filter
blurFilter.apply(imageData); // apply the filter to the image bytes
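MyBlurFilter is not defined in this tutorial. As a hypothetical illustration of the kind of work such a filter might do in one pass, here is a 3-tap box blur over a single row of 8-bit grayscale pixels, applied directly to an in-memory buffer with no producer or consumer involved. It is plain C++ (boxBlurRow is a hypothetical name), not a cx::Filter subclass.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical single-pass filter: 3-tap box blur over one row of 8-bit
// grayscale pixels, written back in place. Edge pixels average only the
// neighbours that exist.
void boxBlurRow(std::vector<unsigned char>& row) {
    const std::vector<unsigned char> src(row);  // read from an unmodified copy
    const std::size_t n = src.size();
    for (std::size_t i = 0; i < n; ++i) {
        unsigned sum = src[i];
        unsigned count = 1;
        if (i > 0)     { sum += src[i - 1]; ++count; }
        if (i + 1 < n) { sum += src[i + 1]; ++count; }
        row[i] = static_cast<unsigned char>(sum / count);  // truncating average
    }
}
```

The function reads from a copy because each output pixel depends on neighbours that would otherwise already have been overwritten.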

Parallel processing

Codex provides multi-threaded pipeline processing, but processing is always linear along the pipeline: different components may execute at the same time, but a single component never runs concurrently with itself. This guarantees that chunks are always processed in the correct order and never race against each other.
If you need parallelism inside a single stage, you must add the synchronization yourself. This can be achieved by creating a custom component that is properly synchronized with the rest of the pipeline.
For example, a filter may split its input into several smaller chunks, create a thread for each of them, process them in parallel and, at the end, assemble all results back into the original chunk. Such a filter may internally use other filters, or even complete streams, over each chunk.
To synchronize your code properly, see the concurrency package in the documentation.
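The split-process-join strategy described above can be sketched with std::thread. This is a minimal, Codex-independent illustration: parallelUpper and upperRange are hypothetical names, and std::string stands in for cx::Chunk. Each worker touches a disjoint range of the buffer, so no locks are needed; joining all threads before returning is the only synchronization required.

```cpp
#include <algorithm>
#include <cctype>
#include <cstddef>
#include <string>
#include <thread>
#include <vector>

// Uppercases ASCII letters in data[begin, end).
void upperRange(char* data, std::size_t begin, std::size_t end) {
    for (std::size_t i = begin; i < end; ++i)
        data[i] = static_cast<char>(std::toupper(static_cast<unsigned char>(data[i])));
}

// Hypothetical parallel pass: split the chunk into at most `parts` contiguous,
// non-overlapping ranges, process each on its own thread, then join. The
// ranges are disjoint, so no locking is needed; join() makes every worker's
// writes visible before the function returns.
void parallelUpper(std::string& chunk, std::size_t parts) {
    if (parts == 0) parts = 1;
    const std::size_t n = chunk.size();
    const std::size_t step = (n + parts - 1) / parts;  // ceiling division
    char* base = &chunk[0];
    std::vector<std::thread> workers;
    for (std::size_t begin = 0; begin < n; begin += step) {
        const std::size_t end = std::min(begin + step, n);
        workers.emplace_back(upperRange, base, begin, end);
    }
    for (std::thread& t : workers) t.join();
}
```

In a real filter this logic would live in pass; whether spawning threads per chunk pays off depends on the chunk size and the cost of the per-byte work.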

Conclusion

This tutorial showed some of the possibilities the Codex framework offers. It concludes the tutorial; now it's time to explore the framework on your own, according to your needs. Always check the documentation for complete and detailed information about the Codex classes.

