Components (or nodes in graph terminology) are independent units that perform simple (or sometimes complicated) transformations on data.
Each component has 0 to N inputs and 0 to N outputs through which data flows. These inputs and outputs are represented by ports: Input Ports for reading data into the component and Output Ports for writing data out of the component.
Each component runs as one thread (though it can start others if necessary). Even though it shares memory with other components (the threads executing them), there is no direct interaction among them. The only way to pass data or information around is through an Edge.
An Edge is an “interface” which comprises two ports, one Input and one Output. It is a pipe between two components through which data flows, in one direction only.
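The idea of an Edge as a one-way pipe between two component threads can be illustrated with a small standalone sketch. It does not use the CloverETL API: `EdgeSketch`, `write`, `close`, `read` and `transfer` are hypothetical names, and a plain `BlockingQueue` stands in for the real Edge implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical stand-in for an Edge: a bounded one-way pipe between two threads. */
final class EdgeSketch {
    // End-of-data marker; compared by identity so it cannot collide with a real record.
    private static final String EOF = new String("EOF");

    private final BlockingQueue<String> pipe = new ArrayBlockingQueue<>(16);

    /** Output-port side: blocks while the pipe is full. */
    void write(String record) throws InterruptedException {
        pipe.put(record);
    }

    /** Output-port side: signals that no more data will come. */
    void close() throws InterruptedException {
        pipe.put(EOF);
    }

    /** Input-port side: returns the next record, or null once the writer has closed the pipe. */
    String read() throws InterruptedException {
        String record = pipe.take();
        return record == EOF ? null : record;
    }

    /** Runs a producer and a consumer thread that communicate only through the edge. */
    static List<String> transfer(List<String> records) {
        EdgeSketch edge = new EdgeSketch();
        List<String> received = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (String record : records) {
                    edge.write(record);
                }
                edge.close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (String r = edge.read(); r != null; r = edge.read()) {
                    received.add(r);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for components", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(List.of("r1", "r2", "r3"))); // prints [r1, r2, r3]
    }
}
```

The two threads never call each other directly; the queue is their only point of contact, which mirrors the rule that components interact solely through Edges.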
Each Port (and Edge) has metadata associated with it that describes the layout of the data conveyed by the Edge between two components. This information is important for a Component, which can extract it from each Port and use it when handling data delivered through that port.
There is no fixed number of ports a component can handle. Ports can be added or deleted dynamically, but only during the so-called initialization phase. If the component is already running and some thread/task deletes one of its ports, the behaviour is undefined. Because there is no fixed constant specifying the number of ports, a component's behaviour can vary with how many ports (Input or Output) are actually connected.
In this section, we introduce a simple component which copies all data from its input port (it expects only one) to all connected output ports.
It is worth mentioning here that every port has a name associated with it, and this name is used when the component needs to interact with the port. For now, the name is numeric (starting with 0), but this may change to any string value in the future. There are two sets of ports, Input and Output, and the name of a port must be unique within its set. This means that there can be only one Input port with name/ID 1 and only one Output port with name/ID 1.
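The port rules described above (separate Input and Output sets, unique numeric names within each set, changes allowed only during initialization) can be sketched as follows. `PortRegistrySketch` and all of its methods are hypothetical names invented for illustration and are not part of the CloverETL API. The sketch also chooses to throw an exception on a late change, whereas the real engine leaves that case undefined.

```java
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

/** Hypothetical illustration of the port naming rules; not CloverETL code. */
final class PortRegistrySketch {
    // Two independent sets: the same ID may appear once in each of them.
    private final Map<Integer, String> inputPorts = new TreeMap<>();
    private final Map<Integer, String> outputPorts = new TreeMap<>();
    private boolean initialized = false;

    void addInputPort(int id, String edgeName) {
        add(inputPorts, id, edgeName, "input");
    }

    void addOutputPort(int id, String edgeName) {
        add(outputPorts, id, edgeName, "output");
    }

    private void add(Map<Integer, String> ports, int id, String edgeName, String kind) {
        Objects.requireNonNull(edgeName, "edgeName");
        if (initialized) {
            // Assumption of this sketch: fail fast instead of leaving behaviour undefined.
            throw new IllegalStateException("ports can only be changed during initialization");
        }
        if (ports.putIfAbsent(id, edgeName) != null) {
            throw new IllegalArgumentException(kind + " port " + id + " already exists");
        }
    }

    /** Ends the initialization phase; the port layout is fixed from now on. */
    void finishInit() {
        initialized = true;
    }

    int inputCount() {
        return inputPorts.size();
    }

    int outputCount() {
        return outputPorts.size();
    }

    public static void main(String[] args) {
        PortRegistrySketch ports = new PortRegistrySketch();
        ports.addInputPort(0, "edgeA");
        ports.addOutputPort(0, "edgeB"); // same ID, different set: allowed
        ports.addOutputPort(1, "edgeC");
        ports.finishInit();
        System.out.println(ports.inputCount() + " input, " + ports.outputCount() + " output");
    }
}
```

A component can then adapt its behaviour to `inputCount()` and `outputCount()` once initialization is over.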
Every component has to be derived from the org.jetel.graph.Node class. This class defines some standard component operations and several abstract methods which must be implemented before the component can be executed.
Code example with comments
import java.nio.ByteBuffer;

import org.w3c.dom.Element;

/* The CloverETL engine classes used below (org.jetel.*) must be imported as well.
   Their packages depend on the engine version, so they are not listed here. */

public class SimpleCopy extends Node {

    /* Following line defines component type or identification name: */
    public static final String COMPONENT_TYPE = "SIMPLE_COPY";

    /* This component expects all input data to be on port 0: */
    private static final int READ_FROM_PORT = 0;

    /* Everything is written to port 0. This value is actually not used, because
       we use a different method which sends data to all connected output ports. */
    private static final int WRITE_TO_PORT = 0;

    /* Internal buffer which holds one record while it is being copied.
       It is allocated in init(). */
    private ByteBuffer recordBuffer;

    /* Simple constructor - everything is handled by the super class.
       In case we need some additional initialization, it is done in init(). */
    public SimpleCopy(String id) {
        super(id);
    }

    /* Following method is mostly used by ComponentFactory when creating
       an instance of this class. */
    public String getType() {
        return COMPONENT_TYPE;
    }

    /* This method is called prior to starting the component. Any allocation and
       checking should be done here. If anything goes wrong, it should throw
       ComponentNotReadyException. */
    public void init() throws ComponentNotReadyException {
        super.init();
        recordBuffer = ByteBuffer.allocateDirect(Defaults.Record.MAX_RECORD_SIZE);
        if (recordBuffer == null) {
            throw new ComponentNotReadyException(
                    "Can NOT allocate internal record buffer ! Required size:"
                    + Defaults.Record.MAX_RECORD_SIZE);
        }
    }

    /* This is the main processing method of the component. Each Node, and
       therefore this component, runs in its own thread; by implementing this
       method, we define what that thread is going to do. After the graph is
       initialized (by calling the init() methods of all registered components),
       a thread is started for every component in the graph and it executes
       this method. */
    public Result execute() throws Exception {
        /* Bring in the InputPort from which we expect to read data */
        InputPortDirect inPort = (InputPortDirect) getInputPort(READ_FROM_PORT);
        boolean isData = true;

        /* Main processing loop starts here. The variable runIt is true unless
           the stop() method of the Node class is called. This is a
           nondestructive way of stopping the component. The other possibility
           is to call abort(), which kills the thread immediately with all the
           consequences. */
        while (isData && runIt) {
            /* We try to read one data record from the input port. If
               readRecordDirect() returns false, no more data is available and
               we finish the execution loop. Otherwise, we use
               writeRecordBroadcastDirect(), which sends the data record to all
               connected output ports. */
            isData = inPort.readRecordDirect(recordBuffer);
            if (isData) {
                writeRecordBroadcastDirect(recordBuffer);
            }
            SynchronizeUtils.cloverYield();
        }

        /* When the main loop finishes, the whole component is done with
           transforming. We then send an EOF signal to all connected ports,
           which effectively closes them. This effect spreads over the other
           components connected to this one via Edges, which eventually leads
           to the end of processing for the whole graph. */
        broadcastEOF();

        /* We have to determine whether the main loop finished because of an
           external interrupt (a stop() method call) or because we used up all
           available input data. */
        return runIt ? Result.FINISHED_OK : Result.ABORTED;
    }

    /* This method converts the component's configuration into XML, so it can
       be read in later. No need to add anything here, since SimpleCopy has no
       configuration parameters. */
    public void toXML(Element xmlElement) {
        super.toXML(xmlElement);
    }

    /* This method is responsible for getting all parameters needed by the
       component constructor from XML Node/Tag attributes. It is declared
       static because it is another way of creating a component instance,
       besides calling the constructor directly. It is heavily used by the
       ComponentFactory class and by TransformationGraphXMLReaderWriter. */
    public static Node fromXML(TransformationGraph graph, Element xmlElement)
            throws XMLConfigurationException {
        ComponentXMLAttributes xattribs = new ComponentXMLAttributes(xmlElement, graph);
        try {
            return new SimpleCopy(xattribs.getString(XML_ID_ATTRIBUTE));
        } catch (Exception ex) {
            throw new XMLConfigurationException(COMPONENT_TYPE + ":"
                    + xattribs.getString(XML_ID_ATTRIBUTE, " unknown ID ") + ":"
                    + ex.getMessage(), ex);
        }
    }

    /* Following method is used by the TransformationGraph class when the graph
       is initialized. It is responsible for checking input and output ports:
       exactly one input port and at least one output port. */
    public ConfigurationStatus checkConfig(ConfigurationStatus status) {
        super.checkConfig(status);
        checkInputPorts(status, 1, 1);
        checkOutputPorts(status, 1, Integer.MAX_VALUE);
        try {
            init();
            free();
        } catch (ComponentNotReadyException e) {
            ConfigurationProblem problem = new ConfigurationProblem(e.getMessage(),
                    ConfigurationStatus.Severity.ERROR, this,
                    ConfigurationStatus.Priority.NORMAL);
            if (!StringUtils.isEmpty(e.getAttributeName())) {
                problem.setAttributeName(e.getAttributeName());
            }
            status.add(problem);
        }
        return status;
    }
}
When your component is finished, you have to make it known to the ComponentFactory by updating the plugin.xml files in cloveretl.engine.jar and in the component directory. The plugin.xml file handles the translation from COMPONENT_TYPE to the component class name.
If you skip this step, the new component (under its component type name) won't be recognized by TransformationGraphXMLReaderWriter, the class which parses the graph's layout out of XML data.
In such a case, you can still use the component's full class name (e.g. “org.jetel.component.Merge”) when specifying the Node type.
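The lookup behaviour described above, with a registered type name translated to a class name and a full class name accepted as a fallback, can be sketched like this. This is only an illustration of the idea: `TypeLookupSketch`, `register` and `resolveClassName` are hypothetical names, and the real ComponentFactory may work differently.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of type-to-class-name translation; not the real ComponentFactory. */
final class TypeLookupSketch {
    private final Map<String, String> typeToClassName = new HashMap<>();

    /** Mirrors one <extension point-id="component"> entry of plugin.xml. */
    void register(String type, String className) {
        typeToClassName.put(type, className);
    }

    /**
     * Returns the class name registered for the given node type. An unknown
     * type is assumed to be a full class name already and is returned unchanged.
     */
    String resolveClassName(String nodeType) {
        String className = typeToClassName.get(nodeType);
        return className != null ? className : nodeType;
    }

    public static void main(String[] args) {
        TypeLookupSketch lookup = new TypeLookupSketch();
        lookup.register("SIMPLE_COPY", "org.jetel.component.SimpleCopy");
        System.out.println(lookup.resolveClassName("SIMPLE_COPY"));
        System.out.println(lookup.resolveClassName("org.jetel.component.Merge"));
    }
}
```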
The following excerpt shows what needs to be added to plugin.xml:
<plugin id="org.jetel.component" version="2.1.0" provider-name="Javlin Consulting s.r.o.">
    <runtime>
        <library path="cloveretl.component.jar"/>
    </runtime>
    <requires engine-version="2.1.0">
        <import plugin-id="org.jetel.connection"/>
        <import plugin-id="org.jetel.lookup"/>
    </requires>
    <extension point-id="component">
        <parameter id="className" value="org.jetel.component.DataReader"/>
        <parameter id="type" value="DATA_READER"/>
    </extension>
    <extension point-id="component">
        <parameter id="className" value="org.jetel.component.DataWriter"/>
        <parameter id="type" value="DATA_WRITER"/>
    </extension>
    . . .
    <extension point-id="component">
        <parameter id="className" value="org.jetel.component.SimpleCopy"/>
        <parameter id="type" value="SIMPLE_COPY"/>
    </extension>
    . . .
</plugin>
A custom component definition file is a standard XML file whose structure is based on the following DTD. Its purpose is to define the component's input and output ports, the parameters which configure the component, and other necessary information.
Tip
See the CloverGUI “DefaultComponents.xml” file (in the gui plugin), which carries the definitions of all standard components. Base your definition file on one of the components listed there.
Example DTD of custom component definition file
<!ELEMENT ETLComponentList (ETLComponent*)>

<!ELEMENT ETLComponent (shortDescription, description, inputPorts?, outputPorts?, properties)>
<!ATTLIST ETLComponent
    category (readers | writers | transformers | others | deprecated | joiners) #REQUIRED
    className NMTOKEN #REQUIRED
    iconPath CDATA #IMPLIED
    name CDATA #REQUIRED
    smallIconPath CDATA #IMPLIED
    type NMTOKEN #REQUIRED
    passThrough (true|false) #IMPLIED
    defaultVisibility (true|false) #IMPLIED
>

<!ELEMENT shortDescription (#PCDATA)>
<!ELEMENT description (#PCDATA)>
<!ELEMENT inputPorts ((singlePort*,multiplePort?)|multiplePort)>
<!ELEMENT outputPorts (singlePort*|multiplePort)>
<!ELEMENT properties (property*)>

<!ELEMENT singlePort EMPTY>
<!ATTLIST singlePort
    name NMTOKEN #REQUIRED
    required (true|false) #IMPLIED
    label CDATA #IMPLIED
>

<!ELEMENT multiplePort EMPTY>
<!ATTLIST multiplePort
    required (true|false) #IMPLIED
    label NMTOKEN #IMPLIED
>

<!ELEMENT property (singleType | enumType | keyType)>
<!ATTLIST property
    category (clover|basic|advanced|deprecated) #REQUIRED
    displayName CDATA #REQUIRED
    modifiable (true | false) #REQUIRED
    name NMTOKEN #REQUIRED
    nullable (true | false) #IMPLIED
    defaultValue CDATA #IMPLIED
    required CDATA #IMPLIED
    defaultHint CDATA #IMPLIED
>

<!ELEMENT singleType EMPTY>
<!ATTLIST singleType
    name NMTOKEN #REQUIRED
    inputPortName NMTOKEN #IMPLIED
    type NMTOKEN #IMPLIED
    outputPortName NMTOKEN #IMPLIED
    mappingType NMTOKEN #IMPLIED
    master NMTOKEN #IMPLIED
    title CDATA #IMPLIED
    leftLabel CDATA #IMPLIED
    rightLabel CDATA #IMPLIED
    labels CDATA #IMPLIED
    keyType NMTOKEN #IMPLIED
    size NMTOKEN #IMPLIED
    min NMTOKEN #IMPLIED
    max NMTOKEN #IMPLIED
    fields NMTOKEN #IMPLIED
    dictionary NMTOKEN #IMPLIED
>

<!ELEMENT enumType (item+)>

<!ELEMENT keyType EMPTY>
<!ATTLIST keyType
    inputPortName NMTOKEN #REQUIRED
>

<!ELEMENT item EMPTY>
<!ATTLIST item
    value NMTOKEN #REQUIRED
    displayValue NMTOKENS #REQUIRED
>
Example Definition file of Dedup component
<?xml version="1.0" encoding="UTF-8"?>
<ETLComponentList>
    <ETLComponent category="transformers" className="org.jetel.component.Dedup"
            iconPath="icons/dedup32.png" name="Dedup" smallIconPath="icons/dedup16.png"
            type="DEDUP" passThrough="true">
        <shortDescription>Removes duplicate records.</shortDescription>
        <description>Receives sorted data records through connected input port and removes records that are in duplicate with a view to the specified key values. Keeps defined number of records from either the start (First) or the end (Last) of each group with the same key value. If desired, only unique records are kept. Dedup key is name or combination of names of field(s) of incoming records. Rejected records are sent to the optional second output port if connected.</description>
        <inputPorts>
            <singlePort name="0" required="true"/>
        </inputPorts>
        <outputPorts>
            <singlePort name="0" required="true" label="unique"/>
            <singlePort name="1" required="false" label="duplicate"/>
        </outputPorts>
        <properties>
            <property category="basic" displayName="Dedup key" modifiable="true" name="dedupKey" nullable="true">
                <singleType name="key" inputPortName="0"/>
            </property>
            <property category="basic" displayName="Keep" modifiable="true" name="keep" nullable="false" defaultValue="first">
                <singleType name="keep"/>
            </property>
            <property category="basic" displayName="Equal NULL" modifiable="true" name="equalNULL" nullable="true" defaultHint="true">
                <singleType name="bool"/>
            </property>
            <property category="basic" displayName="Number of duplicates" modifiable="true" name="noDupRecord" nullable="true">
                <singleType name="int"/>
            </property>
        </properties>
    </ETLComponent>
</ETLComponentList>
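To check what a definition file declares, it can be read with the standard Java DOM parser. The sketch below lists the output ports of a trimmed-down copy of the Dedup definition. It uses only JDK classes; `DefinitionReaderSketch` and `outputPortLabels` are hypothetical names, and this is not how CloverGUI itself necessarily reads the file.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Hypothetical helper that inspects a component definition file; not CloverGUI code. */
final class DefinitionReaderSketch {

    /** Returns "name=label" for every singlePort found inside outputPorts elements. */
    static List<String> outputPortLabels(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            List<String> result = new ArrayList<>();
            NodeList outputs = doc.getElementsByTagName("outputPorts");
            for (int i = 0; i < outputs.getLength(); i++) {
                NodeList ports = ((Element) outputs.item(i)).getElementsByTagName("singlePort");
                for (int j = 0; j < ports.getLength(); j++) {
                    Element port = (Element) ports.item(j);
                    // getAttribute returns an empty string when the attribute is missing.
                    result.add(port.getAttribute("name") + "=" + port.getAttribute("label"));
                }
            }
            return result;
        } catch (Exception e) {
            throw new IllegalArgumentException("cannot parse component definition", e);
        }
    }

    public static void main(String[] args) {
        String xml =
                "<ETLComponentList>"
                + "<ETLComponent category=\"transformers\" className=\"org.jetel.component.Dedup\""
                + " name=\"Dedup\" type=\"DEDUP\">"
                + "<inputPorts><singlePort name=\"0\" required=\"true\"/></inputPorts>"
                + "<outputPorts>"
                + "<singlePort name=\"0\" required=\"true\" label=\"unique\"/>"
                + "<singlePort name=\"1\" required=\"false\" label=\"duplicate\"/>"
                + "</outputPorts>"
                + "</ETLComponent>"
                + "</ETLComponentList>";
        System.out.println(outputPortLabels(xml)); // prints [0=unique, 1=duplicate]
    }
}
```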
To import your XML definition file, open the Preferences dialog (Window → Preferences) and select CloverETL → Components from the list of preference categories. Click the Browse button on the right side of the dialog to select your definition file, then click the Import button to read in the custom component definition.
Example Import custom component definition
Warning
If you create a transformation graph that uses a custom component, make sure that the CloverETL engine also knows about the custom component. You need to modify the engine's “plugin.xml” file, which is in the component directory.
You can get additional information by studying the source code of other components, available at http://svn.berlios.de/wsvn/cloveretl, and by reading the component documentation.
You can also send an e-mail with questions at any time.