Pipeline data processing in POLYMAP3

In a nutshell

One of the basic parts of POLYMAP3 is its geospatial data processing model and engine. The main part of the model is a chain/pipeline of independent processors. A processor can contribute to the control flow and the data flow of the system by handling requests and corresponding responses. The actual set of requests/responses that a processor can handle defines its signature. Particular signatures correspond to use cases. WMS is the typical image pipeline use case; WFS is the typical feature pipeline use case.

While the engine is complex, implementing a processor is easy:

  • choose what usecase to support
  • code the corresponding requests/responses
  • optionally: provide a UI class that allows the processor to be configured in the workbench

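As a sketch of what such a processor looks like, here is a minimal pass-through implementation. The interface definitions below are simplified stand-ins for the real POLYMAP3 types, included only to make the sketch self-contained; the actual interfaces are richer.

```java
import java.util.Properties;

// Simplified stand-ins for the POLYMAP3 pipeline interfaces -- just enough
// to show the shape of a processor; not the real declarations.
interface ProcessorRequest {}
interface ProcessorResponse {}
interface ProcessorContext {
    void sendRequest( ProcessorRequest r ) throws Exception;
    void sendResponse( ProcessorResponse r ) throws Exception;
}
interface PipelineProcessor {
    void init( Properties props );
    void processRequest( ProcessorRequest r, ProcessorContext context ) throws Exception;
    void processResponse( ProcessorResponse r, ProcessorContext context ) throws Exception;
}

/** A do-nothing processor: forwards every request upstream and every response downstream. */
class PassThroughProcessor implements PipelineProcessor {
    public void init( Properties props ) {}
    public void processRequest( ProcessorRequest r, ProcessorContext context ) throws Exception {
        context.sendRequest( r );
    }
    public void processResponse( ProcessorResponse r, ProcessorContext context ) throws Exception {
        context.sendResponse( r );
    }
}
```

A real processor replaces the bodies of processRequest()/processResponse() with its own handling, as the ImageEncodeProcessor example further down shows.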
Data processing in POLYMAP3

The core of POLYMAP3 is all about data processing. One of the goals is to integrate as many backend systems and data formats as possible and to provide their data to the user in whatever way best suits the particular requirements (ad-hoc workbench, OGC services, GeoJSON and GeoRSS via a RESTful service, etc.). In almost all cases the data has to be processed/changed/adapted on its way from the source to the sink. POLYMAP3 therefore contains a central data processing model. This model and its engine are meant to do each and every piece of data processing in POLYMAP3, including...

  • format transformation
  • data transformation
  • data aggregation
  • rendering
  • authorisation
  • caching
  • logging
  • raster and vector processing
  • ...

The design and implementation meet the following requirements:

Interceptable control and data flow

The model should allow the control flow as well as the data flow to be intercepted.

In a typical use case POLYMAP provides data from several backend systems via an OGC service. Here the control flows from the sink (OGC) to the source (backend), while the data flows the same way back. The idea is that a processor is able to manipulate, for example, the filter of an OWS request, and is equally able to manipulate the downstream data response.

Pipelined execution

The data processing executes in a chain/pipeline of independent processors. The processing model allows portions ("chunks") of the result to be sent to the client while other chunks are still in the pipeline. The processing model does not force the entire data set to be held in memory.

The processing logic is provided by implementations of the PipelineProcessor interface. A processor implements its logic by handling/intercepting/manipulating upstream ProcessorRequests and downstream ProcessorResponses. A processor is able to send several data chunks in response to a particular request.

For example, consider a minimal render pipeline: [backend WMS] -> [image buffer] -> [display/OpenLayers]. In this pipeline the image byte chunks received from the backend WMS should be sent to the display immediately, "as they arrive" in the pipeline.
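The "as they arrive" behaviour can be sketched as follows. ChunkConsumer and StreamingStage are hypothetical names, not POLYMAP3 classes; the point is simply that a stage hands each chunk downstream the moment it arrives instead of accumulating the whole image first.

```java
// Receives one encoded image chunk at a time, e.g. from a backend WMS.
interface ChunkConsumer {
    void accept( byte[] chunk );
}

/** Forwards every chunk downstream immediately; nothing is buffered. */
class StreamingStage implements ChunkConsumer {
    private final ChunkConsumer downstream;

    StreamingStage( ChunkConsumer downstream ) {
        this.downstream = downstream;
    }

    public void accept( byte[] chunk ) {
        // no accumulation: the display can start drawing while later
        // chunks are still travelling through the pipeline
        downstream.accept( chunk );
    }
}
```

A buffering stage would instead collect all chunks and emit them only at end-of-processing, which is exactly what the model tries to avoid for large results.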

Multi-Threaded

The data processing should support distribution over several threads/CPUs.

Processors are stateless (and thus thread safe). The actual execution of a pipeline is bound to a PipelineExecutor. The executor defines the execution context of each processor. The executor implementation is pluggable, so the execution can be distributed over several threads in two ways:

  • horizontally: several executors can be started in separate threads for one given pipeline; this is used in tiled rendering for example
  • vertically: each processor can run inside its own thread
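The "horizontal" case can be sketched with plain java.util.concurrent primitives. TiledRendering and renderTile() are hypothetical stand-ins for running one pipeline execution per tile; the real work would be done by a PipelineExecutor instance per thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class TiledRendering {

    /** Stand-in for one pipeline execution that renders a single tile. */
    static byte[] renderTile( int x, int y ) {
        return new byte[] { (byte) x, (byte) y };
    }

    /** Renders a cols x rows tile grid, one pipeline execution per task. */
    static List<byte[]> renderAll( int cols, int rows, int threads ) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool( threads );
        List<Future<byte[]>> futures = new ArrayList<Future<byte[]>>();
        for (int x = 0; x < cols; x++) {
            for (int y = 0; y < rows; y++) {
                final int fx = x, fy = y;
                futures.add( pool.submit( () -> renderTile( fx, fy ) ) );
            }
        }
        // collect results; each tile was rendered in its own thread
        List<byte[]> tiles = new ArrayList<byte[]>();
        for (Future<byte[]> f : futures) {
            tiles.add( f.get() );
        }
        pool.shutdown();
        return tiles;
    }
}
```

Because processors are stateless, the same pipeline definition can safely back all of these concurrent executions.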

A processor should be easy and fun to write

All the above considerations should be hidden behind an easy-to-implement processor interface.

Annotations could be used to inject the context/state variables into the processor. Currently this is done via a table of context variables.
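A minimal sketch of what such annotation-based injection could look like, assuming a hypothetical @Context annotation and injector (neither exists in POLYMAP3; today the processor reads the variables from the context table itself):

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Field;
import java.util.Map;

// Hypothetical annotation naming the context variable to inject.
@Retention( RetentionPolicy.RUNTIME )
@interface Context {
    String value();
}

class ContextInjector {
    /** Copies matching context variables into all @Context-annotated fields. */
    static void inject( Object processor, Map<String,Object> vars ) throws Exception {
        for (Field f : processor.getClass().getDeclaredFields()) {
            Context a = f.getAnnotation( Context.class );
            if (a != null) {
                f.setAccessible( true );
                f.set( processor, vars.get( a.value() ) );
            }
        }
    }
}

// Example processor field: filled from the context table before execution.
class MyProcessor {
    @Context("map.srs") String srs;
}
```

This would spare processor code the explicit lookups in the context variable table.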

Processor signature and use cases

A processor is free to support any combination of requests/responses. However, there are special sets of requests/responses that support a particular "use case". Use cases are often defined by a particular front end; examples are WMS and WFS. A WMS is the typical image pipeline, while a WFS is the typical feature pipeline. Other use cases can be defined. A processor can adhere to the interface of a particular use case so that it can be used directly in such a pipeline, or a special processor may provide a transformation between use cases.
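To make the idea of a signature concrete, the following sketch models it as four sets of request/response classes and checks whether two processors can be chained. This Signature class and its chainsTo() method are illustrations only; whether the real ProcessorSignature offers such a compatibility check is an assumption here.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class Signature {
    final Set<Class<?>> requestIn, requestOut, responseIn, responseOut;

    Signature( Class<?>[] reqIn, Class<?>[] reqOut,
               Class<?>[] respIn, Class<?>[] respOut ) {
        requestIn   = new HashSet<Class<?>>( Arrays.asList( reqIn ) );
        requestOut  = new HashSet<Class<?>>( Arrays.asList( reqOut ) );
        responseIn  = new HashSet<Class<?>>( Arrays.asList( respIn ) );
        responseOut = new HashSet<Class<?>>( Arrays.asList( respOut ) );
    }

    /**
     * True if `upstream` can sit directly above this processor: every request
     * we send must be handled up there, and every response coming back down
     * must be one we can handle.
     */
    boolean chainsTo( Signature upstream ) {
        return upstream.requestIn.containsAll( requestOut )
            && responseIn.containsAll( upstream.responseOut );
    }
}
```

A transformation processor between two use cases is then simply one whose incoming side speaks one signature and whose outgoing side speaks the other.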

Example: ImageEncodeProcessor

The following example is a simple processor that reads an upstream AWT image (maybe coming from the renderer) and sends an encoded PNG (for a WMS) down the stream. The complete source can be found at: plugins/org.polymap.core.data/src/org/polymap/core/data/image/ImageEncodeProcessor.java

  /**
   * Get upstream AWT image and send encoded PNG down the stream.
   */
  public class ImageEncodeProcessor
        implements PipelineProcessor {
    
    /* The signature/interface of this processor */
    private static final ProcessorSignature signature = new ProcessorSignature(
            // requests this processor handles (from downstream)
            new Class[] {GetMapRequest.class, GetLegendGraphicRequest.class, GetLayerTypesRequest.class},
            // requests this processor sends (upstream)
            new Class[] {GetMapRequest.class, GetLegendGraphicRequest.class, GetLayerTypesRequest.class},
            // responses this processor handles (from upstream)
            new Class[] {ImageResponse.class, GetLayerTypesResponse.class},
            // responses this processor sends (downstream)
            new Class[] {EncodedImageResponse.class, GetLayerTypesResponse.class}
            );
    
    /* SPI method */
    public static ProcessorSignature signature() {
        return signature;
    }
    
    /* The only format this processor can encode */
    private static final String format = "image/png";
    
    public void init( Properties props ) {
    }
    
    public void processRequest( ProcessorRequest r, ProcessorContext context )
    throws Exception {
        // GetMapRequest
        if (r instanceof GetMapRequest) {
            String requestFormat = ((GetMapRequest)r).getFormat();
            if (! requestFormat.equals( format )) {
                throw new IllegalArgumentException( "This processor supports PNG encoding only. Requested format: " + requestFormat );
            }
            context.sendRequest( r );
        }
        // pass GetLegendGraphicRequest and GetLayerTypesRequest
        else {
            context.sendRequest( r );
        }
    }
        
    public void processResponse( ProcessorResponse r, ProcessorContext context )
    throws Exception {
        // ImageResponse
        if (r instanceof ImageResponse) {
            Image image = ((ImageResponse)r).getImage();
            
            // do the work: encode PNG into a buffer and send it downstream
            // (one chunk keeps the example short; the constructor arguments
            // of EncodedImageResponse are assumed here)
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            PngEncoder pngEncoder = new PngEncoder( image, true, null, 9 );
            pngEncoder.encode( out );
            context.sendResponse( new EncodedImageResponse( out.toByteArray(), out.size() ) );
            context.sendResponse( ProcessorResponse.EOP );
        }
        // pass all other responses
        else {
            context.sendResponse( r );
        }
    }
  }