A transformation graph (hereafter simply "graph") describes how to transform data from one form to another. A graph consists of at least three kinds of elements: Nodes (which perform various simple transformations), Edges (which connect Nodes and pass data between them), and Metadata (which describe the structure of the data at every Node and Edge).
The inputs to the transformation process are Input Nodes (those with no incoming Edges). At the other end are Output Nodes, which store the results of the transformation, for example in data files or a database.
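The definition of Input Nodes given above can be illustrated with a minimal sketch. The classes below are hypothetical and are not part of the CloverETL API; the sketch also assumes that a node with no outgoing Edges can be treated as an Output Node, which the text does not state explicitly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of a graph's topology; not CloverETL classes.
final class GraphTopologySketch {

    /** A directed edge that carries data from one node to another. */
    static final class EdgeSketch {
        final String from;
        final String to;

        EdgeSketch(String from, String to) {
            this.from = from;
            this.to = to;
        }
    }

    /** Nodes that no edge points to: the inputs of the transformation. */
    static List<String> inputNodes(List<String> nodes, List<EdgeSketch> edges) {
        Set<String> targets = new HashSet<>();
        for (EdgeSketch e : edges) {
            targets.add(e.to);
        }
        List<String> result = new ArrayList<>();
        for (String n : nodes) {
            if (!targets.contains(n)) {
                result.add(n);
            }
        }
        return result;
    }

    /** Assumption: nodes with no outgoing edge are treated as outputs. */
    static List<String> outputNodes(List<String> nodes, List<EdgeSketch> edges) {
        Set<String> sources = new HashSet<>();
        for (EdgeSketch e : edges) {
            sources.add(e.from);
        }
        List<String> result = new ArrayList<>();
        for (String n : nodes) {
            if (!sources.contains(n)) {
                result.add(n);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("INPUT", "COPY", "OUTPUT");
        List<EdgeSketch> edges = Arrays.asList(
                new EdgeSketch("INPUT", "COPY"),
                new EdgeSketch("COPY", "OUTPUT"));
        System.out.println(inputNodes(nodes, edges));  // prints [INPUT]
        System.out.println(outputNodes(nodes, edges)); // prints [OUTPUT]
    }
}
```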
Example of transformation graph:
A transformation graph is both an abstraction and a class that performs the operation. The graph keeps track of all Nodes, Edges, and metadata objects. It is accompanied by a class that reads the definition of a graph from an XML file and builds everything dynamically.
TransformationGraph class includes three important methods:
The graph also includes a WatchDog (running as a separate thread), which acts as a dispatcher that oversees all other components of the graph.
Several graph objects can be created and running at the same time.
When a transformation graph (the class) is executed, that is, when its run() method is called, it first creates a WatchDog thread, which in turn creates
as many threads as there are Nodes (also called Components). Each Node gets its own thread, which executes the Node's execute() method.
Individual Nodes are synchronized through Edges. Each Edge carries data between two Nodes, putting one Node into the producer role and the
other into the consumer role. When the producer has no more data, it signals this through the Edge to the consumer, which may in turn pass the
information on to its own consumer, and so on.
When a Node has finished processing data, it exits its execute() method and signals to all of its consumers that it will not send any more data.
It then stops executing. When no Nodes are left running in a particular graph, execution of the graph stops.
This is a very condensed and somewhat simplified explanation of how CloverETL processes data.
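The execution model described above (one thread per Node, Edges as producer/consumer channels, and an end-of-data signal passed downstream) can be sketched with plain Java threads and blocking queues. This is only an illustration under stated assumptions: the class name, the queue-based edges, and the EOF sentinel object are hypothetical and do not reflect how the CloverETL engine implements its Edges.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the threading model; not CloverETL code.
final class PipelineSketch {

    // Sentinel meaning "no more data" on an edge (an assumption of this sketch).
    // A distinct object, so it can be recognized by identity.
    private static final String EOF = new String("<EOF>");

    /** Runs reader -> copy -> writer, each on its own thread, and returns what the writer stored. */
    static List<String> run(List<String> input) {
        BlockingQueue<String> inEdge = new LinkedBlockingQueue<>();
        BlockingQueue<String> outEdge = new LinkedBlockingQueue<>();
        List<String> sink = new ArrayList<>();

        // Input node: produces records, then signals end of data.
        Thread reader = new Thread(() -> {
            for (String record : input) {
                inEdge.add(record);
            }
            inEdge.add(EOF);
        });

        // Middle node: consumer of inEdge and producer for outEdge.
        Thread copy = new Thread(() -> {
            try {
                String record;
                while ((record = inEdge.take()) != EOF) {
                    outEdge.add(record);
                }
                outEdge.add(EOF); // pass the end-of-data signal on to the next consumer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Output node: stores the results.
        Thread writer = new Thread(() -> {
            try {
                String record;
                while ((record = outEdge.take()) != EOF) {
                    sink.add(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Stand-in for the dispatcher role: start one thread per node, then wait for all of them.
        List<Thread> nodes = Arrays.asList(reader, copy, writer);
        for (Thread node : nodes) {
            node.start();
        }
        try {
            for (Thread node : nodes) {
                node.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sink; // safe to read: join() guarantees the writer's updates are visible
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "c"))); // prints [a, b, c]
    }
}
```

The graph is finished once every node thread has exited, which mirrors the rule that execution stops when no Nodes are left running.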
===== Assembling graph from pieces of components =====
The following code illustrates how to build a graph in code:
<code java>
// create Graph + Nodes + connections (edges)
// since version 2.6

// engine initialization - should be called only once
EngineInitializer.initEngine(pluginsRootDirectory, configFileName, logHost);

// runtime customization
GraphRuntimeContext runtimeContext = new GraphRuntimeContext();

// create new instance of transformation graph class
TransformationGraph graph = new TransformationGraph();

// create graph phase
Phase phase = new Phase(1);

// create simple metadata
DataRecordMetadata metadata = new DataRecordMetadata("RecordMedatada0", DataRecordMetadata.DELIMITED_RECORD);
metadata.addField(new DataFieldMetadata("FieldMetadata0", "\n"));

// or load metadata from file
metadata = MetadataFactory.fromFile(graph, fmtMedataFileName);

// create edges
Edge inEdge = new Edge("InEdge", metadata);
Edge outEdge = new Edge("OutEdge", metadata);
Edge middleEdge = new Edge("OutEdge0", metadata);

// create nodes
Node nodeOne = new SimpleCopy("SimpleCopy1");
Node nodeTwo = new SimpleCopy("SimpleCopy2");
Node nodeParser = new DataReader("DataReader1", inputFileName);
Node nodeWriter = new DataWriter("DataWriter1", outputFileName, "UTF-8", true);

// add phase to graph; graph has to have at least one phase
graph.addPhase(phase);

// add nodes to phase - all nodes in one phase are executed concurrently
// phases are executed sequentially - in order defined by their number
phase.addNode(nodeOne);
phase.addNode(nodeTwo);
phase.addNode(nodeParser);
phase.addNode(nodeWriter);

// assign ports/nodes (input & output)
// this links together components - creates data flows
nodeParser.addOutputPort(0, inEdge);
nodeOne.addInputPort(0, inEdge);
nodeOne.addOutputPort(0, middleEdge);
nodeTwo.addInputPort(0, middleEdge);
nodeTwo.addOutputPort(0, outEdge);
nodeWriter.addInputPort(0, outEdge);

// add Edges to graph
graph.addEdge(inEdge);
graph.addEdge(outEdge);
graph.addEdge(middleEdge);

// graph initialization
EngineInitializer.initGraph(graph, runtimeContext);

// graph running
IThreadManager threadManager = new SimpleThreadManager();
WatchDog watchDog = new WatchDog(graph, runtimeContext);
threadManager.executeWatchDog(watchDog);

// if we reach this point, no exception was thrown
</code>
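The comments in the code above state that all nodes in one phase are executed concurrently, while phases are executed sequentially in the order of their numbers. A minimal sketch of that scheduling rule, using only the standard library, might look like the following. The class and method names are hypothetical, and the thread pool is an assumption of this sketch rather than a description of the engine's own thread management.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of phase scheduling; not CloverETL code.
final class PhaseSketch {

    /** Runs phases in ascending number order; node ids within a phase run concurrently. */
    static List<String> runPhases(Map<Integer, List<String>> phases) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        // TreeMap iterates its keys in ascending order, giving the sequential phase order.
        for (Map.Entry<Integer, List<String>> phase : new TreeMap<>(phases).entrySet()) {
            int threads = Math.max(1, phase.getValue().size());
            ExecutorService pool = Executors.newFixedThreadPool(threads);
            try {
                List<Callable<Void>> tasks = new ArrayList<>();
                for (String nodeId : phase.getValue()) {
                    tasks.add(() -> {
                        log.add(phase.getKey() + ":" + nodeId); // stand-in for the node's work
                        return null;
                    });
                }
                pool.invokeAll(tasks); // blocks until every node of this phase has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } finally {
                pool.shutdown();
            }
        }
        return log;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> phases = new HashMap<>();
        phases.put(1, Arrays.asList("WRITER"));
        phases.put(0, Arrays.asList("READER", "COPY"));
        // Entries of phase 0 always precede those of phase 1;
        // the order within phase 0 is not deterministic.
        System.out.println(runPhases(phases));
    }
}
```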
This example shows how to save some work by loading the graph definition from an XML file:
<code java>
// engine customization
GraphRuntimeContext runtimeContext = new GraphRuntimeContext();

// engine initialization - should be called only once
EngineInitializer.initEngine(pluginsRootDirectory, configFileName, logHost);

// graph loading
TransformationGraph graph = TransformationGraphXMLReaderWriter.loadGraph(in, runtimeContext.getAdditionalProperties());

// graph initialization
EngineInitializer.initGraph(graph, runtimeContext);

// graph running
IThreadManager threadManager = new SimpleThreadManager();
WatchDog watchDog = new WatchDog(graph, runtimeContext);
threadManager.executeWatchDog(watchDog);
</code>
For more details about loading a graph definition from XML and initializing the graph before a run, see the org.jetel.main.runGraph class of the CloverETL engine.
This is the content of an XML file describing a graph's topology:
<code xml>
<?xml version="1.0" encoding="UTF-8"?>
<Graph name="Testing">
  <Global>
    <Metadata id="InMetadata" fileURL="c:\projects\jetel\recordFormat.xml"/>
  </Global>
  <Phase number="0">
    <Node id="INPUT" type="DELIMITED_DATA_READER" fileURL="c:\projects\jetel\test2.dat"/>
    <Node id="COPY" type="REFORMAT" transformClass="org.jetel.test.testReformat"/>
    <Node id="OUTPUT" type="DELIMITED_DATA_WRITER" append="false" fileURL="c:\projects\jetel\test2.dat.out"/>
    <Edge id="INEDGE" fromNode="INPUT:0" toNode="COPY:0" metadata="InMetadata"/>
    <Edge id="OUTEDGE" fromNode="COPY:0" toNode="OUTPUT:0" metadata="InMetadata"/>
  </Phase>
</Graph>
</code>
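To make the structure of such a file more concrete, the sketch below reads the Edge elements with the standard Java DOM parser and lists which node port each edge connects. It only illustrates the file layout shown above; the class and method names are hypothetical, and this is not how the engine's own XML reader is implemented.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper for inspecting a graph XML file; not part of CloverETL.
final class GraphXmlSketch {

    /** Returns one line per Edge element, in document order: "id: fromNode -> toNode". */
    static List<String> describeEdges(String xml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            Document doc = builder.parse(
                    new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList edges = doc.getElementsByTagName("Edge");
            List<String> result = new ArrayList<>();
            for (int i = 0; i < edges.getLength(); i++) {
                Element edge = (Element) edges.item(i);
                result.add(edge.getAttribute("id") + ": "
                        + edge.getAttribute("fromNode") + " -> "
                        + edge.getAttribute("toNode"));
            }
            return result;
        } catch (Exception e) {
            // Parsing or configuration problems are rethrown unchecked to keep the sketch short.
            throw new IllegalStateException("Could not parse graph XML", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<Graph name='Testing'><Phase number='0'>"
                + "<Edge id='INEDGE' fromNode='INPUT:0' toNode='COPY:0' metadata='InMetadata'/>"
                + "<Edge id='OUTEDGE' fromNode='COPY:0' toNode='OUTPUT:0' metadata='InMetadata'/>"
                + "</Phase></Graph>";
        for (String line : describeEdges(xml)) {
            System.out.println(line);
        }
    }
}
```

The values of fromNode and toNode follow the pattern nodeId:portNumber seen in the XML above, which matches the port indexes passed to addOutputPort and addInputPort when a graph is assembled in code.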
To test whether an XML file is a valid CloverETL graph, an XSD file was created: Graph XML Schema
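Validation against an XSD can be done with the standard javax.xml.validation API. The sketch below shows one possible approach; the class name is hypothetical, and the tiny schema used in main is a made-up stand-in for demonstration only, not the actual Graph XML Schema.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import org.xml.sax.SAXException;

// Hypothetical helper; not part of CloverETL.
final class GraphXsdSketch {

    /** Returns true if the XML document conforms to the given XSD. */
    static boolean isValid(String xsd, String xml) {
        Schema schema;
        try {
            SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            schema = factory.newSchema(new StreamSource(new StringReader(xsd)));
        } catch (SAXException e) {
            // A broken schema is a programming error, not a validation result.
            throw new IllegalArgumentException("Schema could not be compiled", e);
        }
        try {
            Validator validator = schema.newValidator();
            validator.validate(new StreamSource(new StringReader(xml)));
            return true;
        } catch (SAXException e) {
            return false; // the document is malformed or violates the schema
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in schema for demonstration only: a Graph element with a required name attribute.
        String xsd = "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'>"
                + "<xs:element name='Graph'><xs:complexType>"
                + "<xs:attribute name='name' type='xs:string' use='required'/>"
                + "</xs:complexType></xs:element></xs:schema>";
        System.out.println(isValid(xsd, "<Graph name='Testing'/>")); // prints true
        System.out.println(isValid(xsd, "<Graph/>"));                // prints false
    }
}
```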