Transformers

Transformers are graph components that transform data from one or more input ports to one or more output ports. Most transformers use a transform function, a key, or some other method that defines the particular data transformation.

Note: For characters that are impossible to enter directly, there are several escape sequences: \\, \n, \t, \r, \", \f, \b. Be careful when using the “\” character: for example, the parameter filterExpression=“$Field1~="([^\\|]*\\|){3}"” will be interpreted as “$Field1~="([^\|]*\|){3}"”, because “\\” is interpreted as a single “\”. To get “$Field1~="([^\\|]*\\|){3}"”, write filterExpression=“$Field1~="([^\\\\|]*\\\\|){3}"”.
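The effect of this rule can be reproduced with a small Java sketch. The class EscapeDemo and its unescape() method are hypothetical helpers written only for this illustration; they are not part of Clover and merely mimic the rule described in the note. How unknown sequences are treated is an assumption.

```java
// Hypothetical helper: mimics the unescaping rule from the note above.
// It is NOT the Clover parser, only an illustration.
class EscapeDemo {

    /** Replaces the sequences \\, \n, \t, \r, \", \f, \b by the characters they stand for. */
    static String unescape(String raw) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            if (c != '\\' || i + 1 == raw.length()) {
                out.append(c); // ordinary character, or a trailing backslash
                continue;
            }
            char next = raw.charAt(++i);
            switch (next) {
                case '\\': out.append('\\'); break;
                case 'n':  out.append('\n'); break;
                case 't':  out.append('\t'); break;
                case 'r':  out.append('\r'); break;
                case '"':  out.append('"');  break;
                case 'f':  out.append('\f'); break;
                case 'b':  out.append('\b'); break;
                default:   out.append('\\').append(next); // unknown sequence kept as written (assumption)
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Java literals need their own escaping: "\\\\" below is two backslash characters.
        String typedWithTwo = "([^\\\\|]*\\\\|){3}";          // typed in the attribute: ([^\\|]*\\|){3}
        String typedWithFour = "([^\\\\\\\\|]*\\\\\\\\|){3}";  // typed in the attribute: ([^\\\\|]*\\\\|){3}
        System.out.println(unescape(typedWithTwo));
        System.out.println(unescape(typedWithFour));
    }
}
```

Each pass through unescape() halves the number of consecutive backslashes, which is why the attribute needs four of them to deliver two to the regular expression.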



Common attributes



Attribute | Description | Exp.
id | component identification | string
type | component type. This attribute is automatically generated from the GUI. | string


Aggregate

NOTE: this documentation is valid for the Aggregate component in Clover version 2.2 or newer. The documentation for the older Aggregate can be found here.

Applies aggregation functions on groups of records defined by the aggregation key (similarly to GROUP BY in SQL). The key is defined by a name (or several names) of field(s) from the input record. The result of aggregation of each group is stored in a separate record in the output.

Which aggregation functions and operations are used is defined in the aggregation mapping. For each field of the output, a mapping can be specified in the form of $output_field := mapping. The mappings are separated by “;”. The mapping can be one of the following:

  • aggregation function - the result of calling this function on all members of an aggregation group is stored in the field. Some functions require the name of an input field as a parameter (the field name must be prefixed with “$”). Example: $output_field := count(); $output_field := sum($input_field).
  • key field from the input - the value of a field (which must be a part of the aggregation key) is stored in the output field. The name of the key field must be prefixed with “$”. Example: $output_field := $input_field.
  • constant - a constant value is stored in the field. Supported types of constants:
    • string, quoted string in the form "Some string". The string can contain backslash-escaped double quotes (i.e. \"). Example: $output_field := "Some string with a double quote \"".
    • integer, e.g. $output_field := 42
    • double, must contain a period: $output_field := 3.14
    • date in the yyyy-mm-dd or yyyy-mm-dd HH:MM:SS format, e.g. $field_name := 1999-12-31.

Example mappings:

  • $ShipCountry:=$ShipCountry;$AvgFreight:=stddev($Freight);
  • $ShipCountry:=$ShipCountry;$OrderDate:=1901-12-31 23:59:59;$ShipCity:="very nice \\"city\\"; really!";$Count:=42;$AvgFreight:=3.14159265358979323486;
  • $ShipCountry := $ShipCountry; $OrderDate := 1901-12-31 23:59:59; $CustomerID:= "blablabla"; $ShipCity:=FirstNonNull($ShipCity); $AvgFreight := 3.141; $Count:= count()

Available aggregation functions

The following table contains the names of the available aggregation functions. The scope of every aggregation function is the set of records belonging to one aggregation group, so each function produces one result per aggregation group. The aggregation key specifies how the input records are split into aggregation groups. The parameter type describes the required type (i.e. metadata) of the field that is the input parameter of the function (e.g. in avg($weight), weight is the name of the field that is the parameter of the avg function). The output type describes the required type (i.e. metadata) of the field where the result of the function is stored. A parameter type of N/A means that the function does not take a parameter (e.g. count()). If the type is specified as number, any numeric field type can be used (integer, long, …).

Name | Parameter type | Output type | Description
avg | number | number | Average of the parameter values.
count | N/A | number | Number of records in the aggregation group (ignores their values).
countnonnull | any | number | Number of records in the aggregation group with a non-null value of the parameter.
countunique | any | number | Number of unique values of the parameter.
crc32 | any | long, must be nullable if parameter is nullable | For each value of the parameter, it updates the resulting CRC32 value.
first | any | same type as parameter, must be nullable if parameter is nullable | First value of the parameter (can be null).
firstnonnull | any | same type as parameter, must be nullable if parameter is nullable | First non-null value of the parameter. The result can be null if the parameter does not have any non-null value.
last | any | same type as parameter, must be nullable if parameter is nullable | Last value of the parameter (can be null).
lastnonnull | any | same type as parameter, must be nullable if parameter is nullable | Last non-null value of the parameter. The result can be null if the parameter does not have any non-null value.
max | any | same type as parameter, must be nullable if parameter is nullable | Maximum value of the parameter. Works for non-number types too.
md5 | any | string, must be nullable if parameter is nullable | For each value of the parameter, it updates the resulting MD5 hash value.
median | any | same type as parameter, must be nullable if parameter is nullable | Median of the parameter values. Works for non-number types too.
min | any | same type as parameter, must be nullable if parameter is nullable | Minimum value of the parameter. Works for non-number types too.
modus | any | same type as parameter, must be nullable if parameter is nullable | Most frequently occurring value of the parameter. Works for non-number types too.
stddev | number | number, must be nullable if parameter is nullable | Standard deviation of the parameter values.
sum | number | number, must be nullable if parameter is nullable | Sum of the parameter values.

Backward compatibility:

The Aggregate component supports the old format of the aggregation mapping (which was used in Clover version 2.1 or older). The old mapping can be provided in the same “aggregateFunctions” attribute as in the old Aggregate component.

Input ports:

  • one input port defined/connected.

Output ports:

  • at least one output port defined/connected.


Xml attributes:

Attribute | Mandatory | Description | Default | ETL Version since
id | yes | component identification | |
type | yes | component type AGGREGATE | |
aggregateKey | no | aggregation key, defined by field names separated by :;| {colon, semicolon, pipe} | |
mapping | no | aggregation mappings separated by ; {semicolon} | |
aggregateFunctions | no | aggregation mapping in the format of the older aggregate component (from Clover version 2.1 or older) | |
sorted | no | if input data flow is sorted | true |
equalNULL | no | specifies whether two fields containing NULL values are considered equal. | false |
charset | no | character encoding of the input data stream for CRC32 and MD5 functions. | ISO-8859-1 or defaultProperties |

Example:

  <Node id="AGGREGATE"
        type="AGGREGATE" 
        aggregateKey="ShipCountry" 
        mapping="$ShipCountry:=$ShipCountry;$OrderDate:=min($OrderDate);$ShipCity:=firstnonnull($ShipCity);$AvgFreight:=avg($Freight);$Count:=count();"
        sorted="false"
  />
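
What a part of the mapping above computes can be sketched in plain Java. The following is only an illustration of grouping by the aggregation key and evaluating count() and avg($Freight) per group; the class AggregateSketch and the {ShipCountry, Freight} record layout are assumptions made for this example and do not reflect how the component is implemented.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: records are {ShipCountry, Freight} string pairs (hypothetical layout).
class AggregateSketch {

    /** Returns, per ShipCountry, an array {count(), avg($Freight)}. */
    static Map<String, double[]> aggregate(List<String[]> records) {
        // key -> {count, sum}; LinkedHashMap keeps groups in first-seen order
        Map<String, double[]> acc = new LinkedHashMap<>();
        for (String[] rec : records) {
            double[] a = acc.computeIfAbsent(rec[0], k -> new double[2]);
            a[0] += 1;                          // count()
            a[1] += Double.parseDouble(rec[1]); // running sum for avg()
        }
        Map<String, double[]> out = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            double[] a = e.getValue();
            out.put(e.getKey(), new double[] { a[0], a[1] / a[0] });
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
                new String[] { "Germany", "10.0" },
                new String[] { "France", "4.0" },
                new String[] { "Germany", "20.0" });
        for (Map.Entry<String, double[]> e : aggregate(records).entrySet()) {
            System.out.println(e.getKey() + ": count=" + (long) e.getValue()[0]
                    + ", avg=" + e.getValue()[1]);
        }
    }
}
```

The sketch does not require sorted input, which corresponds to sorted="false" in the example; one output entry is produced per distinct key value.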



Concatenate

All records from all input ports are copied to the output port. The component reads the input ports one by one and waits (blocks) if there is currently no data on the port being read. When one port has been read completely (EOF status), it continues with the next one.

Input ports:

  • at least one connected input port.

Output ports:

  • one output port defined/connected.


Xml attributes:

Attribute | Mandatory | Description | Default | ETL Version since
id | yes | component identification | |
type | yes | component type CONCATENATE | |

Example:

  <Node id="CONCATENATE0" type="CONCATENATE"/>



DataIntersection

Finds the intersection of flow A (in-port0) and flow B (in-port1) based on the specified key. Both inputs must be sorted according to that key. DataRecords present only in flow A are sent out through out-port0. DataRecords present in both A and B are sent to the specified transformation function and the result is sent through out-port1. DataRecords present only in flow B are sent through out-port2.

The component implements a RecordTransform interface or inherits from a DataRecordTransform class.

When you work with this component, you must implement the transform() function. In addition to this required function, you can define two other functions: init() and finished(). If you want to declare and initialize variables, or do anything else that should happen at the beginning of data processing, do it within the init() function. If you want to free memory or delete temporary files, do it within the finished() function. Each of these two functions is called only once: init() at the beginning and finished() at the end. The required transform() function, by contrast, is called many times, after init() and before finished().

Input ports:

  • 0 - records from set A - sorted according to specified key
  • 1 - records from set B - sorted according to specified key

Output ports:

  • 0 - records only in set A
  • 1 - records in set A&B
  • 2 - records only in set B
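
The three-way split over two sorted inputs can be sketched as a simple merge. This is a minimal illustration, assuming plain string keys, ascending order and no duplicate keys; the class IntersectionSketch and its Result holder are hypothetical and do not show the component's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: keys stand in for whole records.
class IntersectionSketch {

    /** Buckets corresponding to out-port0, out-port1 and out-port2. */
    static class Result {
        final List<String> onlyA = new ArrayList<>();
        final List<String> both = new ArrayList<>();
        final List<String> onlyB = new ArrayList<>();
    }

    /** Both lists must be sorted ascending and free of duplicate keys (simplification). */
    static Result intersect(List<String> a, List<String> b) {
        Result r = new Result();
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            int cmp = a.get(i).compareTo(b.get(j));
            if (cmp < 0) {
                r.onlyA.add(a.get(i++));   // key exists only in A
            } else if (cmp > 0) {
                r.onlyB.add(b.get(j++));   // key exists only in B
            } else {
                r.both.add(a.get(i));      // the component would call transform() here
                i++;
                j++;
            }
        }
        while (i < a.size()) r.onlyA.add(a.get(i++)); // rest of A has no partner
        while (j < b.size()) r.onlyB.add(b.get(j++)); // rest of B has no partner
        return r;
    }

    public static void main(String[] args) {
        Result r = intersect(Arrays.asList("a", "c", "e"), Arrays.asList("b", "c", "d", "e", "f"));
        System.out.println("only A: " + r.onlyA + ", both: " + r.both + ", only B: " + r.onlyB);
    }
}
```

Because each input is read only once and in order, the approach works only when both inputs are sorted by the same key, which is why the component requires it.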


Xml attributes:

Attribute | Mandatory | Description | Default | ETL Version since
id | yes | component identification | |
type | yes | component type DATA_INTERSECT | |
joinKey | yes | field names separated by :;| {colon, semicolon, pipe}. Since version 2.5, the new join key syntax is recommended: $fieldA1=$fieldB1;$fieldA2=$fieldB2;…. | |
transform | if no transformClass | contains definition of transformation in internal clover format | |
transformClass | if no transform | name of Java library file (.jar,.zip,…) where to search for class to be used for transforming data. | |
transformURL | no | contains path to the file with transformation code. | |
charset | no | encoding of extern source | ISO-8859-1 |
slaveOverrideKey | no | deprecated since version 2.5; can be used to specify different key field names for records on slave input; field names separated by :;| {colon, semicolon, pipe} | |
equalNULL | no | specifies whether two fields containing NULL values are considered equal. | true |
keyDuplicates | no | true/false - allows records to have duplicate keys. False - multiple duplicate records are discarded - only the last one is sent to transformation. This is valid from version 2.5; older versions can't handle duplicate keys and only the first occurrence was sent to the product of A&B. | true | 2.5
errorActions | no | defines whether the graph stops when the transformation returns a negative value. Available actions are STOP and CONTINUE. For CONTINUE, an error message is logged to the console or to a file (if the errorLog attribute is specified); for STOP, a TransformException is thrown and graph execution is stopped. An error action can be set for each negative value (value1=action1;value2=action2;…) or the same action (STOP or CONTINUE) for all values. It is also possible to define error actions for some negative values and one action for all other values (MIN_INT=myAction). | -1=CONTINUE;MIN_INT=STOP | 2.6
errorLog | no | path to the error log file. | | 2.6
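
The errorActions format can be made concrete with a small parsing sketch. The class ErrorActionsSketch below is hypothetical and only follows the textual description of the attribute; in particular, returning null when no action matches is an assumption, since that case is not described here.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: parses strings such as "-1=CONTINUE;MIN_INT=STOP" or "STOP".
class ErrorActionsSketch {

    enum Action { STOP, CONTINUE }

    private final Map<Integer, Action> perValue = new HashMap<>();
    private Action fallback = null; // action for return codes that are not listed explicitly

    ErrorActionsSketch(String spec) {
        String s = spec.trim();
        if (!s.contains("=")) {
            fallback = Action.valueOf(s); // one action for all negative values
            return;
        }
        for (String pair : s.split(";")) {
            if (pair.trim().isEmpty()) {
                continue;
            }
            String[] kv = pair.split("=");
            String key = kv[0].trim();
            Action action = Action.valueOf(kv[1].trim());
            if (key.equals("MIN_INT")) {
                fallback = action;        // MIN_INT covers "all other values"
            } else {
                perValue.put(Integer.parseInt(key), action);
            }
        }
    }

    /** Returns the configured action for a negative return code, or null if none applies (assumption). */
    Action actionFor(int code) {
        Action a = perValue.get(code);
        return a != null ? a : fallback;
    }

    public static void main(String[] args) {
        ErrorActionsSketch actions = new ErrorActionsSketch("-1=CONTINUE;MIN_INT=STOP");
        System.out.println(actions.actionFor(-1)); // explicit entry
        System.out.println(actions.actionFor(-5)); // falls back to MIN_INT
    }
}
```

With the default value -1=CONTINUE;MIN_INT=STOP, a return value of -1 is only logged, while any other negative value stops the graph.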

Example:

  <Node id="INTERSECT" type="DATA_INTERSECT" joinKey="CustomerID" transformClass="org.jetel.test.reformatOrders"/>
  <Node id="INTERSECT" 
        type="DATA_INTERSECT"
        joinKey="EmployeeID">
     <attr name="transform">
       import org.jetel.component.DataRecordTransform;
       import org.jetel.data.*;
 
       public class intersectionTest extends DataRecordTransform{
 
         public boolean transform(DataRecord[] source, DataRecord[] target){
           target[0].getField(0).setValue(source[0].getField(0).getValue());
           target[0].getField(1).setValue(source[0].getField(1).getValue());
           target[0].getField(2).setValue(source[1].getField(2).getValue());
           return true;
         }
       }
     </attr>
 </Node>
 <Node id="INTERSECT" joinKey="$EmployeeID=$EMP_NO;#" type="DATA_INTERSECTION">
   <attr name="transform"><![CDATA[${out.0.EMP_NO} = ${in.0.EmployeeID};
      ${out.0.FIRST_NAME} = ${in.0.FirstName};
      ${out.0.LAST_NAME} = ${in.0.LastName};
      ${out.0.FULL_NAME} = ${in.1.FULL_NAME};
      ${out.0.HIRE_DATE} = ${in.1.HIRE_DATE};
      ${out.0.HIRE_DATE} = ${in.0.HireDate};
   ]]></attr>
 </Node>



Dedup

Removes duplicates from a data flow of sorted records based on the specified key. The key is the name (or a combination of names) of field(s) from the input record. It keeps either the First or the Last record of each group, depending on the 'keep' parameter. All duplicate records are sent to the second, optional port. When no key is defined, Dedup acts like UNIX 'head' (keep = First) or UNIX 'tail' (keep = Last).

Input ports:

  • one input port defined/connected.

Output ports:

  • result of deduplication
  • (optional) all rejected records


Xml attributes:

Attribute | Mandatory | Description | Default | ETL Version since
id | yes | component identification | |
type | yes | component type DEDUP | |
dedupKey | yes | field names separated by :;| {colon, semicolon, pipe}; can also be empty, in which case all records are considered one group | |
keep | no | one of “First|Last|Unique” {the first letter is sufficient} | First |
equalNULL | no | specifies whether two fields containing NULL values are considered equal. | true |
noDupRecord | no | number of duplicate records to be written to out port | 1 |

Example:

  <Node id="DISTINCT" type="DEDUP" dedupKey="Name" />
 
  <Node id="DISTINCT"
        type="DEDUP"
        dedupKey="Name"
        keep="Last"
        equalNULL="false"
  />
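
The First/Last behaviour on sorted input can be sketched as follows. This is a minimal illustration, assuming records are {key, payload} string pairs with non-null keys; the class DedupSketch is hypothetical, and the Unique mode, equalNULL and noDupRecord are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: records are {key, payload} pairs already sorted by key.
class DedupSketch {

    /** Keeps the first (keepFirst = true) or the last record of each run of equal keys. */
    static List<String[]> dedup(List<String[]> sorted, boolean keepFirst) {
        List<String[]> kept = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            String key = sorted.get(i)[0];
            boolean firstOfGroup = i == 0 || !sorted.get(i - 1)[0].equals(key);
            boolean lastOfGroup = i == sorted.size() - 1 || !sorted.get(i + 1)[0].equals(key);
            if (keepFirst ? firstOfGroup : lastOfGroup) {
                kept.add(sorted.get(i));
            }
            // any other record is a duplicate and would go to the optional second port
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(
                new String[] { "Ann", "1" },
                new String[] { "Ann", "2" },
                new String[] { "Bob", "3" });
        for (String[] rec : dedup(input, false)) {
            System.out.println(rec[0] + " " + rec[1]);
        }
    }
}
```

Only neighbouring records are compared, which is why the input has to be sorted by the dedup key.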



Denormalizer

Denormalizes input records - i.e. composes one output record from several input records with identical key using user-specified transformation. The component implements a RecordDenormalize interface or inherits from a DataRecordDenormalize class.

Input ports:

  • one input port to receive the records to be denormalized.

Output ports:

  • one output port to send out the results of denormalization.

As an example of denormalization, imagine that average air temperatures and pressures were measured during the twelve months of a year. This information is available for many years as records consisting of four fields: the first two are the year and the month, the third and fourth are the average temperature and average pressure. Thus, you have 12 records for one year. If you compose these 12 records into a single new one, the new record consists of 25 fields. The first field contains the year. In the other 24 fields, the temperatures and pressures alternate for all twelve months of that year. The information about the month is now carried only by the position of the temperature or pressure within the 24 fields. The counterpart of this process is normalization.

You must declare some variables and use two functions: append() and transform(). Within the first function, you assign the field values to the declared variables; within the second, you map these variables to the outgoing fields. See below how you can define the denormalization. (It is illustrated briefly for two months of a year, January and February. The records are assumed to be sorted in ascending order by the year field, which is the key.)

In addition to these required functions, you can define three other functions: init(), finished() and clean(). If you want to declare and initialize variables, or do anything else that should happen at the beginning of data processing, do it within the init() function. (Thus it would be even better if the variables below were declared and initialized within the init() function.) If you want to free memory or delete temporary files, do it within the finished() function. Each of these two functions is called only once: init() at the beginning and finished() at the end. The required append() and transform() functions, by contrast, are called many times, after init() and before finished(). If you want to reset the values of some variables and/or delete temporary files between groups of records with different key values, you can do it within the clean() function. It is called many times: once after each group of records has been parsed and the resulting outgoing record has been sent to the output port.


Xml attributes:

Attribute | Mandatory | Description | Default | ETL Version since
id | yes | component identification | |
type | yes | component type DENORMALIZER | |
key | yes | list of key fields used to identify input record groups. Each group is a sequence of input records with identical key field values. | |
denormalize | if no denormalizeClass | contains definition of transformation in Java or TransformLang. | |
denormalizeClass | if no denormalize | name of the class to be used for denormalizing data. | |
denormalizeURL | no | contains path to the file with transformation code. | |
charset | no | encoding of extern source | ISO-8859-1 |
order | no | describes the expected order of input records. “asc” for ascending, “desc” for descending, “auto” for auto-detection, “ignore” for processing input records without order checking (this may produce unexpected results when input is not ordered). | auto |
equalNULL | no | specifies whether two fields containing NULL values are considered equal. | true | 2.8
errorActions | no | defines whether the graph stops when the denormalize function returns a negative value. Available actions are STOP and CONTINUE. For CONTINUE, an error message is logged to the console or to a file (if the errorLog attribute is specified); for STOP, a TransformException is thrown and graph execution is stopped. An error action can be set for each negative value (value1=action1;value2=action2;…) or the same action (STOP or CONTINUE) for all values. It is also possible to define error actions for some negative values and one action for all other values (MIN_INT=myAction). | -1=CONTINUE;MIN_INT=STOP | 2.6
errorLog | no | path to the error log file. | | 2.6

Example:

  <Node id="DENORMALIZE"
        type="DENORMALIZER">
        <attr name="order">asc</attr>
        <attr name="key">year</attr>
        <attr name="denormalize">//#TL
              date Year;
              string Month;
              decimal (3,1) temperaturejan; 
              decimal (3,1) temperaturefeb;
              integer pressurejan; 
              integer pressurefeb;
              function addInputRecord() {
                  Year = $year;
                  if ($month == "January") {
                       temperaturejan = $temperature;
                       pressurejan = $pressure;
                  } 
                  if ($month == "February") {
                       temperaturefeb = $temperature;
                       pressurefeb = $pressure;
                  }
              }
 
              function getOutputRecord() {
                     $Year := Year;
                     $TemperatureJan := temperaturejan;
                     $PressureJan := pressurejan;
                     $TemperatureFeb := temperaturefeb;
                     $PressureFeb := pressurefeb;
              }
        </attr>
  </Node>
 
  <Node id="DENORMALIZE"
        type="DENORMALIZER">
        <attr name="order">ignore</attr>
 
 
 
        <attr name="key">EmployeeID</attr>
        <attr name="denormalize">//#TL
 
              int order;
              string customer;
              string employee;
 
              function init() {
                order = 0;
                customer = "";
                employee = "";
              }
 
              function addInputRecord() {
                order = $OrderID;
                customer = $CustomerID;
                employee = $EmployeeID;
              }
 
              function getOutputRecord() {
                $OrderID := order;
                $CustomerID := customer;
                $EmployeeID := employee;
              }
 
        </attr>
  </Node>
 
  <Node id="DENORMALIZE"
        type="DENORMALIZER"
        key="EmployeeID" 
        order="ignore">
        <attr name="denormalize">
          import org.jetel.component.denormalize.DataRecordDenormalize;
          import org.jetel.data.*;
 
          public class DenormalizeTest extends DataRecordDenormalize {
            private DataRecord lastRecord;
            public DenormalizeTest() {
              lastRecord = null;
            }
 
            public int append(DataRecord inRecord) {
              lastRecord = inRecord;
              return OK;
            }
 
            public int transform(DataRecord outRecord) {
              outRecord.copyFrom(lastRecord);
              return OK;
            }
          }
        </attr>
  </Node>



EmailFilter

COMMERCIAL, since version 2.7

This component reads records on input port 0, checks that the specified fields contain valid e-mail addresses and either accepts each record on output port 0 or rejects it on port 1.

The method used for e-mail address verification is specified as a level of inspection. Each level extends its predecessor with more detailed verification.


Level of inspection

Level | Description
SYNTAX | Checks the string for valid syntax using a regular expression
DOMAIN | Queries DNS and checks whether the domain exists and has (some) MX records.
SMTP | Communicates with SMTP servers associated with the domain and verifies the address to user level
MAIL | Works the same as SMTP and additionally sends a dummy message to the address


Input ports:

  • port 0

Output ports:

  • port 0 – accepted records (optional)
  • port 1 – rejected records (optional)

Both output ports are optional; connect them according to your needs. If port 0 is connected, it MUST share the same metadata as the input port. Output port 1 can have arbitrary metadata: input record data is copied into the port 1 record by field names, so only matching fields contain the original data. Additionally, port 1 can contain the fields specified in errorField and statusField, which are automatically filled with the error message and status code.


Xml attributes:

Attribute | Mandatory | Description | Default | ETL Version since
id | yes | component identification | |
type | yes | component type EMAIL_FILTER | |
field | yes | list of field names containing e-mail addresses to be verified. Separated by :;| {colon, semicolon, pipe} | |
level | yes | level of inspection. Must be SYNTAX, DOMAIN, SMTP or MAIL | DOMAIN |
errorField | no | field where error message will be put (if any and field set). | |
statusField | no | field where status code will be put (if field set). | |
acceptEmpty | no | whether empty string qualifies as valid address | true |
acceptCondition | no | how records with multiple addresses are handled. See notes for more details. Must be either STRICT or LENIENT | LENIENT |
multiDelimiter | no | specifies delimiter(s) used to split input in each field to email addresses. Specify as a regex. When empty, each field is treated as single address | [,;] |
emailBufferSize | no | maximum number of e-mail addresses read into memory before they are bulk processed. Must be at least 1. | 2000 |
emailCacheSize | no | maximum number of cached e-mail address verification results. Zero value turns caching off. | 2000 |
domainRetryCount | no | number of retries for failed DNS queries | 2 |
domainRetryTimeout | no | timeout (in milliseconds) for each DNS query attempt. Maximum time spent resolving is thus domainRetryCount * domainRetryTimeout (ms) | 800 |
domainQueryARec | no | by SMTP standard one should try to resolve domain A record when no MX record could be found. Setting this option to false doubles DNS query speed but breaks this (unnecessary) standard. | true |
domainCacheSize | no | specifies max. cache size for DNS query results. Zero value turns caching off. Ignored at levels < DOMAIN | 3000 |
smtpConnectAttempts | no | attempts for connection and HELO. Number of attempts and delays between them as a comma-separated list (in milliseconds) | 1000,2000 |
smtpRequestTimeout | no | specify TCP timeout after which a SMTP request fails (in seconds!) | 300 |
smtpGreylistAttempts | no | anti-greylisting feature. Specifies number of attempts and delays between them as a comma-separated list. Zero value (“0”) turns anti-greylisting off. (values in seconds) | 30,120,240 |
smtpConcurrentLimit | no | max. number of parallel tasks working when anti-greylisting is in effect | 10 |
mailFrom | no | “From:” field of a dummy message sent on MAIL level | Clover.ETL clover@cloveretl.org |
mailSubject | no | “Subject:” field of a dummy message sent on MAIL level | Hello, this is a test message |
mailBody | no | Body of a dummy message sent on MAIL level | Hello,\nThis is Clover.ETL test message.\n\nPlease ignore a don't respond. Thank you, have a nice day! |

Notes:

  • acceptCondition – applies when multiple fields with email addresses are specified via the “field” property or when “multiDelimiter” splits one or more fields
    • STRICT – all addresses in the list must verify (or be empty – see acceptEmpty) in order to accept the record
    • LENIENT – at least one address in the list must verify. Be careful with acceptEmpty.
  • field – when multiple fields are specified, acceptCondition holds for all fields at once, not for each field separately
  • SYNTAX level allows for non-strict conventions and international characters in the domain name (except for the TLD)
  • multiDelimiter is always treated as a regular expression, so multiple delimiters and other rules can be defined easily. Typically [;,]
  • Be careful with acceptEmpty combined with the LENIENT accept condition: for example, a record holding the value “bad@address, ,another@bad” gets accepted
  • emailBufferSize - must be large enough for the buffer to hold all addresses parsed from a single record
  • emailCacheSize - set it to the maximum value you can afford, especially if addresses are likely to repeat often
  • domainRetryCount, domainRetryTimeout and domainQueryARec can be used to fine-tune the speed of the DOMAIN check. Setting them too low can be dangerous, since a fairly large number of addresses might get rejected just because of high DNS latency
  • domainCacheSize - the larger the better; it holds the results of DNS queries, so subsequent queries cost nothing
  • smtpConnectAttempts is a list of comma-separated delays in milliseconds. Each delay represents a retry after the specified time. A connect attempt may fail because of a network problem, the server being down or the server rejecting the connection
  • Anti-greylisting is an advanced feature that greatly improves accuracy but takes a lot of time. See the “SMTP level and above” section for more info.
  • All email addresses are internally converted to lower case
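
How acceptCondition, acceptEmpty and multiDelimiter interact can be sketched for a single field value. The class AcceptConditionSketch is hypothetical; its regular expression is a deliberately simplified stand-in for the real SYNTAX check (which is not reproduced here), and trimming each part before testing it is an assumption.

```java
import java.util.regex.Pattern;

// Illustration only: decides accept/reject for one field value.
class AcceptConditionSketch {

    // Simplified stand-in for the SYNTAX check: something@something.something
    private static final Pattern SIMPLE_EMAIL =
            Pattern.compile("^[^@\\s]+@[^@\\s]+\\.[^@\\s]+$");

    static boolean accept(String fieldValue, String multiDelimiter,
                          boolean strict, boolean acceptEmpty) {
        boolean any = false;
        boolean all = true;
        for (String part : fieldValue.split(multiDelimiter)) {
            String address = part.trim().toLowerCase(); // trimming is an assumption
            boolean ok = address.isEmpty()
                    ? acceptEmpty
                    : SIMPLE_EMAIL.matcher(address).matches();
            any |= ok;
            all &= ok;
        }
        return strict ? all : any; // STRICT: every address; LENIENT: at least one
    }

    public static void main(String[] args) {
        String value = "bad@address, ,another@bad";
        System.out.println("LENIENT, acceptEmpty=true:  " + accept(value, "[,;]", false, true));
        System.out.println("STRICT,  acceptEmpty=true:  " + accept(value, "[,;]", true, true));
        System.out.println("LENIENT, acceptEmpty=false: " + accept(value, "[,;]", false, false));
    }
}
```

In the first call the empty middle part counts as valid, so the record is accepted although neither real address passes the check; this is the pitfall the note above warns about.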

DOMAIN level

At DOMAIN level, multiple DNS requests are issued against the system default DNS servers. The number of retries and their timeouts can be specified for performance tuning. In addition, the following two optimization techniques can be used. Firstly, DNS query results can be cached in an in-memory cache of maximum size domainCacheSize. Setting domainCacheSize to zero disables local caching and uses only native DNS caching; setting it to a large value improves performance. Secondly, it is good practice to sort records by domain name before feeding them to EmailFilter. In that case only a single DNS query, or often merely a cache lookup, needs to be performed for a large number of records, which rapidly increases throughput.

SMTP level and above

At SMTP level several techniques are used to determine whether an email address exists or not. This requires significantly more time than the previous levels but provides a great increase in accuracy. Internally, at DOMAIN level, records and associated e-mail addresses are organized in the input email buffer by their domain names. An SMTP level check is performed on each group of addresses at once, so the more addresses there are in a group, the fewer repeated SMTP connections are needed. Please see the DOMAIN level chapter for more details. When a group of addresses is processed for a single domain, the right server is chosen (see SMTP server choice). Addresses are then checked using VRFY, RCPT TO and MAIL according to the specified level. Once the checks are done, addresses and records are accepted or rejected.

If anti-greylisting is in effect (see the “smtpGreylistAttempts” property), failed addresses are not immediately rejected; instead, a retry is scheduled according to “smtpGreylistAttempts”. In each iteration the remaining list of rejected addresses is re-checked until the final attempt, when all addresses are either accepted or rejected. By default, anti-greylisting is turned off since it significantly slows down the process. It can be turned on by setting the smtpGreylistAttempts property to a comma-separated list (with at least one item) of delays between retries. The delays are specified in seconds, and reasonable values range from one up to 15 minutes. Default value would be “30, 120, 240” (i.e. first retry after half a minute, if it fails wait another two minutes and then finally another four minutes, total 6.5 minutes and 4 retries).

Since anti-greylisting involves a lot of waiting, it is done in the background. By setting “smtpConcurrentLimit” you can specify the maximum number of concurrent anti-greylisting threads that run in parallel with the normal run of the component. Each thread performs the anti-greylisting cycle for a single domain; when it finishes, it starts on another domain. The main component run is never blocked by an anti-greylisting thread until the very end, when the main thread waits for all anti-greylisting cycles to finish.

When MAIL level is specified, all the same rules apply. The only difference is that at this level a test message is sent to the verified address. You should therefore use this level with care, as it is intrusive and reveals your attempt to verify the address. You can modify the message via the mailFrom, mailSubject and mailBody properties.

SMTP server choice

Generally a domain can have multiple MX records, which means we have to decide which one to connect to. According to the specification, the server with the highest priority is tried first. If it is unreachable or HELO fails after all smtpConnectAttempts attempts, the server with the next highest priority is chosen. If a server can be connected to and talked to then, by specification, we will NOT try other servers even if the address validation fails (after any optional anti-spam measures like anti-greylisting). This conforms to the SMTP protocol and also works well against the nolisting anti-spam technique. When a domain exists but doesn't have any MX record, we use its A record as a single MX server. This feature complies with the SMTP recommendation but it is turned off by default - see domainQueryARec.
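
The selection rule can be sketched as follows. The class MxChoiceSketch, its Mx holder and the reachable predicate (standing in for "connect and HELO succeeded within smtpConnectAttempts") are hypothetical names for this illustration; the A record fallback is shown unconditionally, whereas the component makes it depend on domainQueryARec. In DNS, a lower MX preference number means a higher priority.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

// Illustration only: picks the SMTP server to talk to for one domain.
class MxChoiceSketch {

    static class Mx {
        final String host;
        final int preference; // lower number = higher priority
        Mx(String host, int preference) {
            this.host = host;
            this.preference = preference;
        }
    }

    /** Returns the first reachable host in priority order, or null if none responds. */
    static String choose(List<Mx> records, String domain, Predicate<String> reachable) {
        List<Mx> candidates = new ArrayList<>(records);
        if (candidates.isEmpty()) {
            candidates.add(new Mx(domain, 0)); // no MX record: use the domain's A record
        }
        candidates.sort(Comparator.comparingInt((Mx mx) -> mx.preference));
        for (Mx mx : candidates) {
            if (reachable.test(mx.host)) {
                return mx.host; // once a server answers, no other server is tried
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Mx> records = Arrays.asList(
                new Mx("mx2.example.com", 20),
                new Mx("mx1.example.com", 10));
        // Pretend the highest-priority server is down.
        System.out.println(choose(records, "example.com", h -> !h.equals("mx1.example.com")));
    }
}
```

Address validation would then run against the returned host only, matching the rule that other servers are not consulted once one has answered.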

Verification algorithm

main component loop

  1. read a record from input, extract all email addresses from it
  2. do email cache lookup and mark all found addresses
  3. do SYNTAX check and mark bad addresses
  4. if a record accept/reject condition can be met (i.e. all addresses are either in cache, or all are bad, etc.) immediately accept/reject the record and redo
  5. if there are unresolved addresses, continue
  6. check how full the email buffer is. Email buffer keeps addresses organized by domain names and maintains association with the record. Each time an address gets marked in the buffer, the accept condition for the associated record is evaluated and if it resolves into accept/reject, then the whole record is accepted/rejected and (later) removed from the buffer.
  7. if buffer is full then
    1. pick any domain group in email buffer
    2. remove the group from email buffer
    3. perform DOMAIN check, if it fails mark all addresses in the group as bad and discard the group. On success, continue
    4. SMTP/MAIL check
      1. pick a SMTP server
      2. perform VRFY, RCPT TO and MAIL checks
      3. mark good addresses
      4. if anti-greylisting is off, mark bad addresses too
      5. if anti-greylisting is on, schedule a new task for this group which will be responsible for repeating SMTP/MAIL check for remaining bad addresses
  8. repeat previous step until there is enough room in the buffer (this can be assured because each run removes one group from the buffer - either resolving it or scheduling it for further resolution in anti-greylisting thread)
  9. put remaining addresses into email buffer
  10. repeat until there are no more records
  11. process all remaining groups in email buffer
  12. wait for all anti-greylisting tasks to finish
  13. terminate

Example:

 
  <Node id="EMAILFILTER" type="EMAIL_FILTER" field="customer_email"/>
 
  <Node id="EMAILFILTER" type="EMAIL_FILTER" field="customer_email" level="SMTP" acceptEmpty="false" acceptCondition="STRICT"/>

Filter (Ext)

All records for which the filterExpression evaluates to TRUE are copied from input port 0 to output port 0. Rejected records are copied to output port 1 (if it is connected). The filter can operate on text, date, integer and numeric fields using the comparison operators [>, <, ==, <=, >=, !=]. Text fields/expressions can also be matched against a Java regular expression using the ~= (tilde, equal sign) operator. A filter can consist of several parts joined by the logical operators AND and OR. You can also use parentheses to set precedence.

Note: The date format used for specifying date values is yyyy-MM-dd for dates only and yyyy-MM-dd HH:mm:ss for date and time. These patterns correspond to the values specified in the “defaultProperties” file.

When referencing a particular field, you have to precede the field's name with a dollar [$] sign - e.g. $FirstName. To ease the burden of converting comparison operators to an XML-compatible form, each operator has a textual abbreviation - [.eq. .ne. .lt. .le. .gt. .ge.]

Built-in functions you can use in expressions:

  • today()
  • uppercase( ..str expression.. )
  • lowercase( ..str expression.. )
  • substring( ..str expression.. , from, length)
  • trim( .. str expression.. )
  • length( ..str expression.. )
  • isnull( <field reference> )
  • concat( ..str expression.., ..str expression.. , …… )
  • dateadd( ..date expression.., ..amount.. , year|month|day|hour|minute|sec )
  • datediff( ..date expression.., ..date expression.. , year|month|day|hour|minute|sec )
  • nvl(<field reference>, ..expression.. )
  • replace(..str expression.., ..regex_pattern.., ..str expression.. )
  • num2str(..num expression.. )
  • str2num(..str expression.. )
  • iif ( ..condition expression .. , ..expression.. , ..expression.. )
  • print_err ( ..str expression.. )
  • date2str(..date expression..,..format str expression..)
  • str2date(..str expression.., .. format str expression..)

Input ports:

  • 0 - input records

Output ports:

  • 0 - accepted records
  • 1 - (optional) rejected records


Xml attributes:

^ Attribute ^ Mandatory ^ Description ^ Default ^ ETL Version since ^
| id | yes | component identification | | |
| type | yes | component type EXT_FILTER | | |
| filterExpression | yes | Expression used for filtering records. See above. | | |

Example:

  Want to filter on HireDate field. HireDate must not be later than the 31st of December 2003.
  <Node id="FILTEREMPL1" type="EXT_FILTER" filterExpression="$HireDate .le. '2003-12-31'"/>
 
  Want to filter on Name and Age fields. Name must start with 'A' char and Age must be greater than 25.
  <Node id="FILTEREMPL1" type="EXT_FILTER" filterExpression="$Name~=&quot;^A.*&quot; and $Age &gt;25"/>
  <Node id="FILTEREMPL1" type="EXT_FILTER"> $Name~="^A.*" and $Age.gt.25 </Node>
 
  More complex example showing how to use various built-in functions.
  <Node id="FILTEREMPL1" type="EXT_FILTER">
        ( trim($CustomerName)~=".*Peter.*" or $DateSale==dateadd(today(),-1,month) ) and 
          $Age*2.lt.$Weight-10
  </Node>
 
  Evaluating data fields with NULL values (have isNull() set) results in a runtime error. 
  To avoid this, use the 'isnull' and 'nvl' functions.
  <Node id="FILTEREMPL1" type="EXT_FILTER"> 
        !isnull($CustomerName) and $CustomerName~=".*Peter.*" or nvl($DateSale,2001-1-1)>=$DateInvoice
  </Node>


Note: If you want to use special characters like ", you have to write a double backslash '\\' before the special character. For example: \\".



Merge

Merges data records from two or more input ports onto one output port. It preserves the sorted order (as specified by the merge key). The structure of records in all merged data flows must be the same, which means that all input ports share the same metadata.

Input ports:

  • 2..n input ports defined/connected.

Output ports:

  • one output port defined/connected.


Xml attributes:

^ Attribute ^ Mandatory ^ Description ^ Default ^ ETL Version since ^
| id | yes | component identification | | |
| type | yes | component type MERGE | | |
| mergeKey | yes | key which specifies the sort order to be preserved while merging | | |
| equalNULL | no | specifies whether two fields containing NULL values are considered equal. | false (will be changed in a future release) | 2.8.1 |

Example:

  <Node id="MERGE" type="MERGE" mergeKey="name"/>
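
The order-preserving behaviour can be pictured as a k-way merge of inputs that are each already sorted by the merge key. The following Java sketch is an illustration only: the MergeSketch class is a hypothetical name, records are reduced to plain strings that serve as their own key, and the real component works on data records and ports rather than lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of an order-preserving merge; not the component's actual code.
class MergeSketch {

    // Merges inputs that are each sorted ascending into one sorted output.
    static List<String> merge(List<List<String>> inputs) {
        int[] pos = new int[inputs.size()];   // read position per input
        List<String> out = new ArrayList<>();
        while (true) {
            int best = -1;                    // input holding the smallest current key
            for (int i = 0; i < inputs.size(); i++) {
                if (pos[i] >= inputs.get(i).size()) {
                    continue;                 // this input is exhausted
                }
                if (best < 0 || inputs.get(i).get(pos[i])
                        .compareTo(inputs.get(best).get(pos[best])) < 0) {
                    best = i;
                }
            }
            if (best < 0) {
                return out;                   // all inputs exhausted
            }
            out.add(inputs.get(best).get(pos[best]++));
        }
    }

    public static void main(String[] args) {
        List<List<String>> inputs = Arrays.asList(
                Arrays.asList("Adam", "Eve"),
                Arrays.asList("Bob", "Zed"));
        System.out.println(merge(inputs));
    }
}
```

If any input were not sorted by the key, the output order would no longer be guaranteed, which is why the merge key has to describe the existing sort order.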



Normalizer

Normalizes input records, i.e. decomposes each input record into several output records using a user-specified transformation. The transformation implements the RecordNormalize interface or inherits from the DataRecordNormalize class.

Input ports:

  • one input port defined/connected to read the records to be normalized.

Output ports:

  • one output port defined/connected to write results of normalization.

As an example of normalization, imagine that average air temperatures and pressures were measured during the twelve months of a year. This information can be stored for many years as records consisting of 25 fields: the first is the year and the others are the temperatures and pressures for all twelve months of that year. Thus, there is one record per year. If you split these records into one record per month, the new records consist of 4 fields: the year, the month, the average temperature and the average pressure. The month is now stated explicitly in the second field. The counterpart of this process is denormalization.

You must define the following functions: count(), transform() and some mappings map(). The count() function defines the number of records into which each incoming record should be split. The transform() function defines how to decompose the records, and each mapping defines how inputs are mapped to outputs. The transform() function accepts one integer that goes from 0 to N-1 (if you want to decompose each incoming record into N new records). The first example below illustrates the normalization briefly, for two months of a year - January and February.

In addition to these required functions, you can define three other functions: init(), finished() and clean(). Anything that should be done at the beginning of data processing by the component, such as declaring and initializing variables, belongs in the init() function. Freeing memory or deleting temporary files belongs in the finished() function. Each of these two functions is called only once: init() at the beginning and finished() at the end. In contrast, the required count() and transform() functions are called many times, after init() and before finished(). If you want to reset the values of some variables and/or delete some temporary files between parsing individual incoming records, you can do it within the clean() function. It is called many times, once after parsing each incoming record and sending the resulting outgoing records to the output port.
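
The call order described above can be sketched in plain Java. This is an illustration under assumptions, not the Clover API: the NormalizeSketch class and its record layout (an array holding the year followed by temperature/pressure pairs for January and February) are hypothetical, and the positions of init(), clean() and finished() are only marked by comments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the normalizer call sequence; not the Clover API.
class NormalizeSketch {

    static final String[] MONTHS = {"January", "February"};

    // Number of output records produced from one input record.
    static int count(double[] source) {
        return MONTHS.length;
    }

    // Builds output record number idx (0 .. count-1) from the input record.
    // Assumed input layout: {year, tempJan, pressJan, tempFeb, pressFeb}.
    static String transform(double[] source, int idx) {
        int year = (int) source[0];
        double temperature = source[1 + 2 * idx];
        double pressure = source[2 + 2 * idx];
        return year + ";" + MONTHS[idx] + ";" + temperature + ";" + pressure;
    }

    static List<String> normalize(List<double[]> input) {
        List<String> output = new ArrayList<>();
        // init() would run once here
        for (double[] record : input) {
            int n = count(record);
            for (int idx = 0; idx < n; idx++) {
                output.add(transform(record, idx));
            }
            // clean() would run here, after each input record
        }
        // finished() would run once here
        return output;
    }

    public static void main(String[] args) {
        List<double[]> input = new ArrayList<>();
        input.add(new double[] {2008, -2.5, 1013.0, 0.5, 1009.0});
        for (String line : normalize(input)) {
            System.out.println(line);
        }
    }
}
```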


Xml attributes:

^ Attribute ^ Mandatory ^ Description ^ Default ^ ETL Version since ^
| id | yes | component identification | | |
| type | yes | component type NORMALIZER | | |
| normalizeClass | if no normalize | name of the class to be used for normalizing data. | | |
| normalize | if no normalizeClass | contains definition of transformation in Java or TransformLang. | | |
| normalizeURL | no | contains path to the file with normalize code. | | |
| charset | no | encoding of external source | ISO-8859-1 | |
| errorActions | no | defines whether the graph is to stop when the normalize function returns a negative value. Available actions are STOP and CONTINUE. For CONTINUE, an error message is logged to the console or to a file (if the errorLog attribute is specified); for STOP, a TransformException is thrown and graph execution is stopped. An error action can be set for each negative value (value1=action1;value2=action2;…) or the same action for all values (STOP or CONTINUE). It is also possible to define error actions for some negative values and one action for all other values (MIN_INT=myAction). | -1=CONTINUE;MIN_INT=STOP | 2.6 |
| errorLog | no | path to the error log file. | | 2.6 |

Example:

  <Node id="NORMALIZE"  type="NORMALIZER">
        <attr name="normalize">//#TL
                function map1() {
                  $year := $Year;
                  $month := "January";
                  $temperature := $TemperatureJan;
                  $pressure := $PressureJan;
                }
 
                function map2() {
                  $year := $Year;
                  $month := "February";
                  $temperature := $TemperatureFeb;
                  $pressure := $PressureFeb;
                }
 
                function transform(idx) {
                  switch (idx) {
                    case 0:map1();
                    case 1:map2();
                  }
                }
 
                function count() {
                  return 2;
                }
    </attr>
  </Node>
 
  <Node id="NORMALIZE"  type="NORMALIZER">
        <attr name="normalize">
          import org.jetel.component.normalize.DataRecordNormalize;
          import org.jetel.data.*;
 
          public class normalizeTest extends DataRecordNormalize {
            public int count(DataRecord source) {
              return 2;
            }
 
            public int transform(DataRecord source, DataRecord target, int idx) {
              if (idx == 0) {
                target.copyFrom(source);
              } else {
                target.reset();
              }
              return OK;
            }
          }
    </attr>
  </Node>
 
  <Node id="NORMALIZE" type="NORMALIZER">
        <attr name="normalize">//#TL
          function transform(idx) {
            if (idx == 1) {
              return;
            }
            $OrderID := $OrderID;
            $CustomerID := $CustomerID;
            $EmployeeID := $EmployeeID;
          }
 
          function count() {
            return 2;
          }
        </attr>
  </Node>



Partition

Partitions data into distinct flows. Each connected output port becomes one flow. For each record read, the chosen partition function determines the output flow (port) to which the record is sent. A custom partition function implements the PartitionFunction interface.

Which partition algorithm is used depends on the following:

  • If partitionClass or partitionSource is defined, the specified function is used for partitioning.
  • If neither partitionClass, partitionSource nor partitionKey is defined, the RoundRobin algorithm is used.
  • If partitionKey is specified and neither ranges, partitionClass nor partitionSource is specified, then partitioning by calculated hash value is used. The formula used is: (hashValue / MAX_HASH_VALUE * #connected_output_ports) MOD #connected_output_ports
  • If both partitionKey and ranges are specified (but neither partitionClass nor partitionSource), then partitioning by range is used - the partitionKey value is sequentially compared with the defined range boundaries. If the value is less than or equal to a boundary, the record is sent out through the port corresponding to that boundary.
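
The three built-in strategies can be sketched in Java as follows. This is a rough illustration, not the component's code: the PartitionSketch class is hypothetical, MAX_HASH_VALUE is assumed to be Integer.MAX_VALUE, the hash is assumed to come from hashCode() with the sign bit cleared, and each range is reduced to a single integer upper boundary.

```java
// Hypothetical sketch of the built-in partition strategies; not the component's actual code.
class PartitionSketch {

    // Assumption: the largest possible hash value is Integer.MAX_VALUE.
    static final int MAX_HASH_VALUE = Integer.MAX_VALUE;

    private int next = 0;

    // RoundRobin: ports are used one after another, then the cycle restarts.
    int roundRobin(int ports) {
        int port = next;
        next = (next + 1) % ports;
        return port;
    }

    // Hash: (hashValue / MAX_HASH_VALUE * #ports) MOD #ports.
    static int byHash(Object key, int ports) {
        int hashValue = key.hashCode() & 0x7fffffff; // clear the sign bit (assumption)
        return (int) ((double) hashValue / MAX_HASH_VALUE * ports) % ports;
    }

    // Range: the value is compared with the boundaries in order; the first
    // boundary that is greater than or equal to the value selects the port.
    // Values above all boundaries go to the last connected port.
    static int byRange(int value, int[] boundaries, int ports) {
        for (int i = 0; i < boundaries.length; i++) {
            if (value <= boundaries[i]) {
                return i;
            }
        }
        return ports - 1;
    }

    public static void main(String[] args) {
        PartitionSketch p = new PartitionSketch();
        System.out.println(p.roundRobin(3) + " " + p.roundRobin(3));
        System.out.println(byHash("age", 4));
        System.out.println(byRange(15, new int[] {9, 20}, 3));
    }
}
```

The trailing MOD in the hash formula only matters when the hash equals the maximum value, where the scaled result would otherwise equal the number of ports.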

Input ports:

  • one input port defined/connected.

Output ports:

  • at least one output port defined/connected.

This component evaluates all incoming records according to a specified criterion, splits the data flow and distributes the records among different output ports. To split the data flow, you can define data field ranges, exact values, etc.

As an example of partitioning, imagine that average air temperatures and pressures were measured during the twelve months of a year. This information can be stored for many years as records consisting of four fields: the year, the month, the average temperature and the average pressure. Thus, there are 12 records per year. You may want to split this data flow into several data flows, sending different records out through different output ports. Suppose you want to send the records in which the average temperature was above 0 to the first port and those with temperatures below 0 to the second port. To do that, you must define the getOutputPort() function. Within this function, you assign individual records to different (or the same) output port(s). See the last example below.

A partitioning transformation always defines the getOutputPort() function. In addition, you can define another function: init(). Anything that should be done at the beginning of data processing by the component, such as declaring and initializing variables, belongs in the init() function. The init() function is called only once, at the beginning. In contrast, the getOutputPort() function is called many times, after init().


Xml attributes:

^ Attribute ^ Mandatory ^ Description ^ Default ^ ETL Version since ^
| id | yes | component identification | | |
| type | yes | component type PARTITION | | |
| partitionClass | no | name of the class to be used for partitioning data | | |
| partitionSource | no | partition function used for partitioning the dataset. Can be Java code or code in the CloverETL transform language. | | |
| charset | no | encoding of external source | | |
| partitionKey | no | key which specifies which fields (one or more) will be used for calculating the hash value used for determining the output port. If the ranges attribute is not defined, partitioning by hash is used. | | |
| ranges | no | definition of intervals against which partitionKey will be checked. If the key's value belongs to an interval, the interval's order number is converted to the output port number. A RangeLookupTable is created based on this attribute. Each interval consists of parts which specify the start and end values of the given key parts; each part is a pair of two values separated by ”,” and enclosed in brackets: ”(” or ”)” if the value is to be excluded from the interval and ”<” or ”>” if the value is to be included in the interval. E.g. for partitionKey EmployeeID;BirthDate and ranges <1,9)(,31/12/1959>;<9,)(,31/12/1959>;<1,9)(31/12/1959,);<9,)(31/12/1959,) four output flows are specified: 0 - EmployeeID between 1 included and 9 excluded, and BirthDate at most 31/12/1959; 1 - EmployeeID greater than or equal to 9, and BirthDate at most 31/12/1959; 2 - EmployeeID between 1 included and 9 excluded, and BirthDate greater than 31/12/1959; 3 - EmployeeID greater than or equal to 9, and BirthDate greater than 31/12/1959. When more output ports are connected than ranges are defined, records which don't have a corresponding interval are sent to the last connected output port. | | |
| useI18N | no | true/false - perform sorting according to national rules, e.g. Czech or German handling of characters like “i”,”í”. Use it only if you are sorting data according to a key which can contain accented characters or you want the sorter to follow certain locale-specific rules. | false | |
| locale | no | locale to be used when sorting using I18N rules. | system default | |

Example:

  <Node id="PARTITION_BY_KEY" type="PARTITION" partitionKey="age"/>
 
  <Node id="PARTITION_BY_RANGE" type="PARTITION" partitionKey="EmployeeID;BirthDate">
        <attr name="ranges"><1,9)(,31/12/1959>;<9,)(,31/12/1959>;<1,9)(31/12/1959,>;<9,)(31/12/1959,></attr>
  </Node>
 
  <Node id="PARTITION" type="PARTITION">
        <attr name="partitionSource">//#TL
            function getOutputPort(){
                if ($EmployeeID .lt. 3) return 0
                else if ($EmployeeID .lt. 5) return 1
                else return 2
            }
         </attr>
  </Node>
 
  <Node id="PARTITION" type="PARTITION">
        <attr name="partitionSource">//#TL
            function getOutputPort(){
                if ($temperature > 0) return 0
                else return 1
            }
         </attr>
  </Node>



Reformat

Changes / reformats data records between one input port and several output ports. This component is only a wrapper around a transformation class that implements the org.jetel.component.RecordTransform interface or inherits from the DataRecordTransform class. The transform method is called for every record passing through this component.

Input ports:

  • one input port defined/connected.

Output ports:

  • at least one output port defined/connected.

This component creates new data records based on the incoming data records and on the specified transformation, and sends them to the output ports as specified in the transform() function. The transformation can be written in Java or in the Clover Transform Language. The component can change data records in many ways: change the format of date values, concatenate, reorder or split fields, cut off parts of data, change letter case, convert values from one data type to another, replace field values with other identifiers, etc. The transform() function defines the transformation. See the firstname/lastname example below: the incoming records contain, among others, the firstname and lastname fields, which are concatenated with a space between them. Thus, if the incoming records consist of N fields, the outgoing records consist of N-1 fields.

In addition to this required function, you can define two other functions: init() and finished(). Anything that should be done at the beginning of data processing by the component, such as declaring and initializing variables, belongs in the init() function. Freeing memory or deleting temporary files belongs in the finished() function. Each of these functions is called only once: init() at the beginning and finished() at the end. In contrast, the required transform() function is called many times, after init() and before finished().


Xml attributes:

^ Attribute ^ Mandatory ^ Description ^ Default ^ ETL Version since ^
| id | yes | component identification | | |
| type | yes | component type REFORMAT | | |
| transform | if no transformClass | contains definition of transformation | | |
| transformClass | if no transform | name of the class to be used for transforming data | | |
| transformURL | no | contains path to the file with transformation code. | | |
| charset | no | encoding of external source | ISO-8859-1 | |
| errorActions | no | defines whether the graph is to stop when the transformation returns a negative value. Available actions are STOP and CONTINUE. For CONTINUE, an error message is logged to the console or to a file (if the errorLog attribute is specified); for STOP, a TransformException is thrown and graph execution is stopped. An error action can be set for each negative value (value1=action1;value2=action2;…) or the same action for all values (STOP or CONTINUE). It is also possible to define error actions for some negative values and one action for all other values (MIN_INT=myAction). | -1=CONTINUE;MIN_INT=STOP | 2.6 |
| errorLog | no | path to the error log file. Each error (after which the graph continues) is logged in the following way: recordNumber;errorCode;errorMessage;semiResult - fields are delimited by Defaults.Component.KEY_FIELDS_DELIMITER. | | 2.6 |
| ..optional attribute.. | no | any additional attribute is passed to the transformation class in a Properties object as a key→value pair. There is no limit to how many optional attributes can be used. | | |
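
How an errorActions value such as -1=CONTINUE;MIN_INT=STOP might be interpreted can be sketched in Java. This is an assumption-laden illustration, not the engine's parser: the ErrorActionsSketch class and its actionFor method are hypothetical, and the fallback to STOP when nothing matches is a guess made only for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of errorActions handling; not the engine's actual parser.
class ErrorActionsSketch {

    private final Map<Integer, String> actions = new HashMap<>();
    // Assumption for this sketch: STOP applies when no rule matches.
    private String defaultAction = "STOP";

    ErrorActionsSketch(String spec) {
        for (String part : spec.split(";")) {
            part = part.trim();
            if (part.isEmpty()) {
                continue;
            }
            int eq = part.indexOf('=');
            if (eq < 0) {
                // A bare STOP or CONTINUE applies to all values.
                defaultAction = part;
                continue;
            }
            String code = part.substring(0, eq).trim();
            String action = part.substring(eq + 1).trim();
            if (code.equals("MIN_INT")) {
                // MIN_INT covers all values without their own entry.
                defaultAction = action;
            } else {
                actions.put(Integer.parseInt(code), action);
            }
        }
    }

    // Returns the action configured for a negative return code.
    String actionFor(int returnCode) {
        return actions.getOrDefault(returnCode, defaultAction);
    }

    public static void main(String[] args) {
        ErrorActionsSketch sketch = new ErrorActionsSketch("-1=CONTINUE;MIN_INT=STOP");
        System.out.println(sketch.actionFor(-1));
        System.out.println(sketch.actionFor(-7));
    }
}
```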

Example:

  <Node id="REF" type="REFORMAT" transformClass="org.jetel.test.reformatOrders" param1="123" param2="XYZ"/>
 
  <Node id="REF" type="REFORMAT">
        <attr name="transform">
import org.jetel.component.DataRecordTransform;
import org.jetel.data.*;
 
 
public class ReformatOrdersInline extends DataRecordTransform{
 
	int counter=0;
	int field=0;
 
	public int transform(DataRecord[] source, DataRecord[] target){
		StringBuffer strBuf=new StringBuffer(80);
		if (source[0]==null){
		   System.err.println("NULL source[0]");
		}
		try{
			// let's concatenate shipping address into one long string
			strBuf.append(GetVal.getString(source[0],"ShipName")).append(';');
			strBuf.append(GetVal.getString(source[0],"ShipAddress")).append(';');
			strBuf.append(GetVal.getString(source[0],"ShipCity")).append(';');
			strBuf.append(GetVal.getString(source[0],"ShipCountry"));
			// mapping between source and target fields
			// some fields get assigned directly from source fields, some
			// are assigned from internal variables
			SetVal.setInt(target[0],"PRODUCTID",counter);
			SetVal.setInt(target[0],"ORDERID",GetVal.getInt(source[0],"OrderID"));
			SetVal.setString(target[0],"CUSTOMERID",GetVal.getString(source[0],"CustomerID"));
			SetVal.setString(target[0],"CUSTOMER",strBuf.toString());
			SetVal.setInt(target[0], "SHIPTIME", (int)( (GetVal.getDate(
					source[0], "RequiredDate").getTime() - GetVal.getDate(
					source[0], "ShippedDate").getTime())
					/ 1000 / 60 / 60 / 24));
		}catch(Exception ex){
			ex.printStackTrace();
			errorMessage=ex.getMessage()+" ->occurred with record :"+counter;
			return SKIP;
		}
		counter++;
		return ALL;
	}
}
        </attr>
  </Node>
 
  <Node id="REF" type="REFORMAT">
        <attr name="transform">//#TL
           function getMemseqno(){
               if (!isnull($RecordName0.SSN)) {
                   return $RecordName0.Memseqno+1;
               }
               return $RecordName0.Memseqno;
           }
 
           function transform(){
               $0.Id:=$RecordName0.Id;
               $1.Id:=$RecordName0.Id;
               $0.SrcCode:=$RecordName0.SrcCode;
               $1.SrcCode:=$RecordName0.SrcCode;
               $0.Memseqno:=getMemseqno();
               $1.Memseqno:=getMemseqno();
            }
        </attr>
  </Node>
 
  <Node id="REF" type="REFORMAT">
        <attr name="transform">//#TL
           function transform(){
               $Name := $firstname+" "+$lastname;
               $field2 := $field2;
                      ... 
               $fieldN := $fieldN;
            }
        </attr>
  </Node>
 
  <Node errorActions="MIN_INT=CONTINUE;" errorLog="${DATATMP_DIR}/log.txt" id="REF" transformURL="${TRANS_DIR}/reformatOrders.java" type="REFORMAT"/>



Rollup

Since version 2.8

Rollup is a general transformation component that serves as an executor of rollup transforms written in Java or CTL. Rollup transforms written in Java have to implement the org.jetel.component.rollup.RecordRollup interface. Basically, rollup transforms are used to process groups of data records. Each group consisting of M data records may be used to output N different data records. The number of output data records is determined at runtime within the rollup transform. If no group key is defined, all input data records belong to a single group.

Each group of data records may share a data record referred to as a group “accumulator”. It may be used to store intermediate results. The group “accumulator” is created and initialized when the first data record of a group is encountered and updated for each data record in this group (including the first and the last data record). When the last data record of a group is encountered, processing of this group is finished and the group “accumulator” is disposed of. If the input data records are not sorted, each group is finished as soon as all input data records are read and processed.

Because Rollup is a general transformation component, it can be used instead of Aggregate, Dedup, Normalizer and Denormalizer components. Rollup does not require sorted input data records and therefore can save overhead required to sort them. It can also be used to output intermediate results related to each input data record within a group.

The life cycle of a rollup transform is as follows:

  • The init(Properties, DataRecordMetadata, DataRecordMetadata, DataRecordMetadata[]) method is called to initialize the transform.
  • For each input data record as a current data record:
    • If the current data record belongs to a new group:
      • If requested, a group “accumulator” is created.
      • The initGroup(DataRecord, DataRecord) method is called to initialize processing of the group and to initialize the “accumulator” (if it exists).
    • The updateGroup(DataRecord, DataRecord) method is called for the current data record and the corresponding group “accumulator” (if it was requested).
    • If the method returned true, the updateTransform(int, DataRecord, DataRecord, DataRecord[]) method is called repeatedly to generate output data record(s) until it returns RecordRollup.SKIP. If it returns a value < RecordRollup.SKIP, the getMessage() method is called.
    • If the current data record is the last one in its group:
      • The finishGroup(DataRecord, DataRecord) method is called to finish the group processing.
      • If the method returned true, the transform(int, DataRecord, DataRecord, DataRecord[]) method is called repeatedly to generate output data record(s) until it returns RecordRollup.SKIP. If it returns a value < RecordRollup.SKIP, the getMessage() method is called.
      • If the group “accumulator” was requested, its contents are disposed of.
  • The finished() method is called to notify that the rollup transform finished.
  • The reset() method may optionally be called to reset the transformation so that the previous two steps may be executed again.
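
The life cycle above can be mimicked in plain Java for the simple case of sorted input and one output record per group. This sketch does not use the RecordRollup interface: the RollupSketch class is hypothetical, records are reduced to {key, value} string pairs, and a long variable stands in for the group “accumulator”.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the rollup life cycle on sorted input; not the RecordRollup API.
class RollupSketch {

    // Sums the value column per group key. Each record is {key, value}.
    static List<String> rollup(List<String[]> sortedInput) {
        List<String> output = new ArrayList<>();
        String currentKey = null;
        long accumulator = 0; // stands in for the group "accumulator" record
        for (int i = 0; i < sortedInput.size(); i++) {
            String[] record = sortedInput.get(i);
            if (!record[0].equals(currentKey)) {
                // initGroup step: a new group starts, the accumulator is initialized
                currentKey = record[0];
                accumulator = 0;
            }
            // updateGroup step: called for every record of the group
            accumulator += Long.parseLong(record[1]);

            boolean lastInGroup = i + 1 == sortedInput.size()
                    || !sortedInput.get(i + 1)[0].equals(currentKey);
            if (lastInGroup) {
                // finishGroup + transform steps: emit one output record for the group
                output.add(currentKey + "=" + accumulator);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(
                new String[] {"a", "1"},
                new String[] {"a", "2"},
                new String[] {"b", "5"});
        System.out.println(rollup(input));
    }
}
```

Emitting output only in the finishGroup step reproduces what Aggregate would do. Emitting in the updateGroup step as well would give the intermediate results mentioned above.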


Input ports:

  • one input port defined/connected


Output ports:

  • at least one output port defined/connected


XML attributes:

^ Attribute ^ Mandatory ^ Description ^ Default ^
| id | yes | Identification of the component. | N/A |
| type | yes | The type of the component (has to be ROLLUP). | N/A |
| groupKeyFields | no | Field names that form the group key, separated by a colon, semicolon or pipe character. | null |
| groupAccumulatorMetadataId | no | ID of the data record metadata that should be used to create group “accumulators” (if required). | null |
| transform | yes/no | Rollup transform as Java or CTL source code. | null |
| transformUrl | yes/no | URL of external Java/CTL source code of the rollup transform. | null |
| transformUrlCharset | no | Character set used in the source code of the rollup transform specified via a URL. | ISO-8859-1 |
| transformClassName | yes/no | Class name of a Java class implementing the rollup transform. | null |
| inputSorted | no | Flag specifying whether the input data records are sorted or not. If set to false, the order of output data records is not specified. | true |
| equalNULL | no | Flag specifying whether null values are considered equal or not. | true |


Example:

<Node id="MY_ROLLUP" type="ROLLUP" groupKeyFields="id" groupAccumulatorMetadataId="accumulator"
      transformUrl="MyRollupTransform.java" transformUrlCharset="UTF-8" inputSorted="false"/>



SimpleCopy

All records from input port are copied onto all connected output ports.

Input ports:

  • one input port defined/connected.

Output ports:

  • at least one output port defined/connected.


Xml attributes:

^ Attribute ^ Mandatory ^ Description ^ Default ^ ETL Version since ^
| id | yes | component identification | | |
| type | yes | component type SIMPLE_COPY | | |

Example:

  <Node id="SIMPLE_COPY0" type="SIMPLE_COPY"/>



SimpleGather

All records from all input ports are gathered and copied onto the output port. The component goes port by port, waiting (blocked) if there is currently no data on a port. It implements an inverse RoundRobin.

Input ports:

  • at least one input port defined/connected.

Output ports:

  • one output port defined/connected.


Xml attributes:

^ Attribute ^ Mandatory ^ Description ^ Default ^ ETL Version since ^
| id | yes | component identification | | |
| type | yes | component type SIMPLE_GATHER | | |

Example:

  <Node id="SIMPLE_GATHER0" type="SIMPLE_GATHER"/>



Sort (Ext)

Sorts the incoming records based on the specified key. The key is the name (or a combination of names) of field(s) from the input record. Each field of the sort key can be sorted independently of the others. The sort order for each individual field is either ascending (default) or descending, expressed by the letter a or d in parentheses following the field name. For example: Surname(a);FirstName(a);Salary(d). If there is not enough room in the internal sort buffer, the component performs external sorting, so any number of input records can be sorted.
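
To make the key syntax concrete, the following Java sketch turns a sort key string into a comparator. It is an illustration only, not how the component is implemented: the SortKeySketch class is hypothetical, records are modelled as maps from field name to string value, and all values are compared as plain strings (so a numeric field would need extra handling).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of sort key parsing; not the component's actual code.
class SortKeySketch {

    // Builds a comparator from a key such as "Surname(a);FirstName(a);Salary(d)".
    // Fields without a suffix are treated as ascending.
    static Comparator<Map<String, String>> parse(String sortKey) {
        Comparator<Map<String, String>> result = (a, b) -> 0;
        for (String part : sortKey.split("[:;|]")) {
            part = part.trim();
            if (part.isEmpty()) {
                continue;
            }
            boolean descending = part.endsWith("(d)");
            final String field = part.replaceAll("\\((a|d)\\)$", "");
            Comparator<Map<String, String>> c = Comparator.comparing(r -> r.get(field));
            if (descending) {
                c = c.reversed();
            }
            result = result.thenComparing(c);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map<String, String>> records = new ArrayList<>();
        String[][] data = {{"Novak", "200"}, {"Novak", "300"}, {"Adams", "100"}};
        for (String[] row : data) {
            Map<String, String> record = new HashMap<>();
            record.put("Surname", row[0]);
            record.put("Salary", row[1]);
            records.add(record);
        }
        records.sort(parse("Surname(a);Salary(d)"));
        for (Map<String, String> record : records) {
            System.out.println(record.get("Surname") + " " + record.get("Salary"));
        }
    }
}
```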

Input ports:

  • one input port defined/connected.

Output ports:

  • at least one output port defined/connected.


Xml attributes:

^ Attribute ^ Mandatory ^ Description ^ Default ^ ETL Version since ^
| id | yes | component identification | | |
| type | yes | component type EXT_SORT | | |
| sortKey | yes | sequence of field names, each followed by (a) or (d), meaning that the field is sorted in ascending or descending order, respectively. The individual expressions are separated by a colon, semicolon or pipe character. Example: Surname(a);FirstName(a);Salary(d) | | |
| locale | no | Locale used for correct sorting order | | |
| caseSensitive | no | Case sensitivity of ordering. Locale must be set if this attribute is in use. | | |
| sortOrder | no | Either Ascending or Descending (the first letter is sufficient). Deprecated attribute. In older versions of Clover, sortOrder was common to all fields of the sortKey attribute. Now it can differ between fields and is expressed as (a) or (d) immediately following each field name in the sortKey attribute. | Ascending | |
| numberOfTapes | no | even number greater than 2 - denotes how many tapes (temporary files) will be used for external sorting. | 6 | |
| sorterInitialCapacity | no | the initial capacity of the internal sorter used for in-memory sorting of records. | | |
| bufferCapacity | no | the maximum number of records which are sorted in memory. If the number of records exceeds this size, external sorting is performed. | | |
| tmpDirs | no | Semicolon (;) delimited list of directories which should be used for creating tape files - used when external sorting is performed. Default value is equal to Java's java.io.tmpdir system property. | | |
  • sorterInitialCapacity - If the system has plenty of memory, specify a high number here (5000 or more). If the system is short on memory, use a low number (100).

    The final capacity is based on the following formula:
    sorter_initial_capacity * (1 - grow_factor^max_num_collections)/(1 - grow_factor)
    where: grow_factor=1.6
    max_num_collections=8
    sorterInitialCapacity=2000

With the parameters above, the default total capacity is roughly 140000 records. The total capacity is approximately 69.9 * sorterInitialCapacity. The following table shows the total capacity of the internal buffer for various initial capacity values:

^ Initial Capacity ^ Total Capacity ^
| 10 | 1000 |
| 100 | 7000 |
| 1000 | 70000 |
| 2000 | 140000 |
| 5000 | 350000 |
| 10000 | 700000 |
| 20000 | 1399000 |
| 50000 | 3496000 |
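
The capacity formula is a geometric series and can be evaluated directly. The Java sketch below only reproduces the arithmetic of the formula with the stated parameters (grow_factor=1.6, max_num_collections=8); the SorterCapacitySketch class and method names are hypothetical and do not exist in Clover.

```java
// Hypothetical helper that evaluates the capacity formula from the text.
class SorterCapacitySketch {

    static final double GROW_FACTOR = 1.6;
    static final int MAX_NUM_COLLECTIONS = 8;

    // initial * (1 - grow_factor^max_num_collections) / (1 - grow_factor)
    static long totalCapacity(int sorterInitialCapacity) {
        double multiplier =
                (1 - Math.pow(GROW_FACTOR, MAX_NUM_COLLECTIONS)) / (1 - GROW_FACTOR);
        return Math.round(sorterInitialCapacity * multiplier);
    }

    public static void main(String[] args) {
        int[] initialCapacities = {100, 1000, 2000, 5000};
        for (int initial : initialCapacities) {
            System.out.println(initial + " -> " + totalCapacity(initial));
        }
    }
}
```

The multiplier evaluates to about 69.9, so an initial capacity of 2000 gives roughly 139,800 records, which the table rounds to 140000.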

Example:

  <Node id="SORT_CUSTOMER" type="EXT_SORT" sortKey="Name:Address" sortOrder="A"/>



FastSort

COMMERCIAL, since version 2.7

FastSort is an improved commercial alternative to the regular ExtSort component. It can handle both internal (in-memory) and external (on-disk) sorting.

The modified multiway external sort algorithm used in FastSort sorts up to 2.5 times faster than ExtSort. To achieve maximum performance, FastSort needs to know the estimated number of records in the sorted set. The number does not need to be exact, but more accurate values produce better results.

Input ports:

  • port 0

Output ports:

  • at least one output port - sorted records


Basic XML attributes:

^ Attribute ^ Mandatory ^ Description ^ Default ^ ETL Version since ^
| id | yes | component identification | | |
| type | yes | component type FAST_SORT | | |
| sortKey | yes | a list of fields which act as keys for sorting | | |
| locale | no | Locale used for correct sorting order | | |
| estRecCount | yes unless runSize | Estimated number of input records. An approximate value (order of magnitude) is sufficient | | |
| sortInMemory | no | Whether only in-memory sorting is used. Setting to “true” disables external sorting | false | |
| tmpDirs | no | A list of temporary directories where temporary files will be stored. Use directories on separate physical volumes to increase performance. | system TMP dir | |

Advanced XML attributes:

Attribute Mandatory Description Default ETL Version since
runSize yes unless estRecCount Number of records in each run. This setting overrides the automatically guessed value. Typically 1/100th to 1/200th of the total number of records is a good choice; values ranging from 5,000 to 200,000 are most common. 20000
avgRecSize no Average record size in bytes. If omitted, it is determined automatically from a sample of records.
maxMemory no A rough estimate of the maximum memory FastSort can use. The value is specified with units, e.g. “300MB”, “1gb”, etc. (auto)
concurrencyLimit no Number of threads performing sorting and writing runs. The default value works best in all common cases. On heavily concurrent systems, increasing this value might produce a good performance boost.
readBuffers no Number of read buffers in the first stage. Corresponds tightly with the number of threads (concurrencyLimit). Must be equal to or larger than concurrencyLimit. The more read buffers, the less chance that the threads will block each other.
compress no Enables compression of temporary files. While this is a good option if disk space is an issue, it has a negative impact on performance, so use it only in cases where speed is irrelevant.
tmpCharset no Temporary files are stored on disk in their serialized form, which means that all string fields use Unicode (2-byte) encoding. Setting tmpCharset to “utf-8” or a similar variable-length or single-byte encoding can save up to almost half the space needed for temporary files, at the cost of slightly reduced performance.
tapeBuffer no Size in bytes of the read buffer associated with each sorted run. Decreasing it from the default value might avoid memory exhaustion for large numbers of runs. 8192
tempFilesLimit no Maximum number of open files. Lower values produce a significant performance drop. 0 for unlimited. unlimited (0)

Note that the estRecCount and runSize parameters are essential for performance tuning. If the estimated record count is specified, an internal formula is used to determine runSize, which is the fundamental runtime parameter. Small run sizes (2,000-20,000) are effective for smaller sets, while big run sizes (hundreds of thousands of records) are best for large datasets. Typical run sizes range from 5,000 to 200,000 records. The first stage of the external sort can benefit from a multi-core or multi-processor environment, so on systems with more than two processing units it may be useful to set the concurrencyLimit property to a reasonable number of concurrent threads. If this parameter is omitted, it is determined automatically based on the system environment.

FastSort uses a relatively large number of temporary files during its operation. The following table shows approximate numbers of temporary files for different record set sizes.

Dataset size Number of temp. files Auto run size Note
1 000 000 ~100 ~10 000
10 000 000 ~220 ~45 000
1 000 000 000 20 000 to 2 000 50 000 to 500 000 Depends on available memory
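The figures in the table are consistent with roughly one temporary file per sorted run, i.e. the dataset size divided by the run size. The following Python sketch reproduces that arithmetic. The one-file-per-run relation is an assumption inferred from the table, not a documented guarantee, and the function name is hypothetical.

```python
def estimated_temp_files(record_count, run_size):
    """Estimate the number of temporary files for a sort.

    Assumption (inferred from the table above, not documented behaviour):
    every sorted run of run_size records ends up in one temporary file.
    """
    if run_size < 1:
        raise ValueError("run_size must be at least 1")
    # Integer ceiling division: a partially filled last run still needs a file.
    return (record_count + run_size - 1) // run_size


if __name__ == "__main__":
    for records, run_size in ((1_000_000, 10_000), (10_000_000, 45_000)):
        print(records, run_size, estimated_temp_files(records, run_size))
```

For 1 000 000 records and a run size of 10 000 the estimate is 100 files, and for 1 000 000 000 records the run sizes 50 000 and 500 000 give 20 000 and 2 000 files, matching the ranges in the table.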

Note that the numbers in the table above are not exact and may differ on your system. Such large numbers of files can nevertheless cause problems by hitting user quotas or other runtime limits. This typically happens in complex graphs where several sorts run in parallel, possibly alongside other graph components that also keep a large number of files open. The result is a “Too many open files” error and failure of the graph execution. There are two possible solutions: either increase the limit (quota) or force FastSort to keep the number of temporary files below some limit. The first option is recommended for production systems because it does not sacrifice speed; on Unix systems it typically means setting “ulimit” to a higher number. If increasing the quota is not an option (e.g. for regular users on large servers), set “Max open files” (the tempFilesLimit attribute) in FastSort to a reasonable value, taking into account the number of parallel FastSorts, other graph components, etc. FastSort will then perform intermediate merges of the temporary files to keep their number below the limit. However, a “Max open files” value so low that intermediate merges are inevitable and frequent causes a significant performance drop, so keep it as high as possible. If you are forced to limit FastSort to fewer than a hundred temporary files, even for large datasets, consider using ExtSort instead, which is designed to perform well with a limited number of tapes.
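One simple way to pick a per-component limit is to divide the open-file budget evenly among the sorts that run in parallel. The Python sketch below illustrates this under stated assumptions: the even split, the function names and the reserved_for_others parameter are illustrative choices, not something FastSort does or prescribes. Only the threshold of one hundred files comes from the text above.

```python
def temp_files_limit_per_sort(open_file_limit, parallel_sorts, reserved_for_others=0):
    """Split an open-file budget evenly among parallel sorts (illustrative only).

    reserved_for_others is a hypothetical allowance for files held open by
    other graph components. A result of 0 must not be used as a limit,
    because 0 means "unlimited" for the tempFilesLimit attribute.
    """
    if parallel_sorts < 1:
        raise ValueError("parallel_sorts must be at least 1")
    available = max(open_file_limit - reserved_for_others, 0)
    return available // parallel_sorts


def suggested_sorter(limit_per_sort, threshold=100):
    """Apply the rule of thumb: below about a hundred files, consider ExtSort."""
    return "FastSort" if limit_per_sort >= threshold else "ExtSort"


if __name__ == "__main__":
    limit = temp_files_limit_per_sort(1024, parallel_sorts=4, reserved_for_others=224)
    print(limit, suggested_sorter(limit))
```

On Unix systems the current soft limit can be read from Python with `resource.getrlimit(resource.RLIMIT_NOFILE)[0]` and passed in as `open_file_limit`.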

Example:

   <Node id="FASTSORT" type="FAST_SORT" sortKey="price(a),date(d)"/>
   <Node id="FASTSORT" type="FAST_SORT" sortKey="key_field(a)" runSize="10000" concurrencyLimit="3" readBuffers="6" tmpDirs="/tmp; /mnt/volume1/temp/" tmpCharset="utf-8"/>

SortWithinGroups

Since version 2.6

Receives data records through the connected input port and sorts them according to the specified sort key within a group specified by the group key. Sorted data records are sent to all the connected output ports. The sort key (as well as the group key) is a name or a combination of names of field(s) of incoming data records. The sort order of each data field can be either Ascending (default) or Descending. Any number of records can be sorted. If the internal buffer is full, external sorting is performed.

This component is actually an ExtSort optimized for sorting within groups.


Input ports:

  • one input port defined/connected


Output ports:

  • at least one output port defined/connected


XML attributes:

Attribute Mandatory Description Default
id yes The identification of the component. N/A
type yes The type of the component (has to be SORT_WITHIN_GROUPS). N/A
groupKey yes Field names separated by ':', ';' or '|'. N/A
sortKey yes Field names followed by ”(a)” (ascending order – default) or ”(d)” (descending order) and separated by ':', ';' or '|'. N/A
bufferCapacity no Defines the maximum number of records that will be sorted in memory. If the number of records exceeds this value, external sorting is performed. -1 for external sorting
numberOfTapes no Denotes how many tapes (temporary files) will be used for external data sorting. This value must be an even number greater than 2. 8
tempDirectories no A semicolon-delimited list of directories which should be used for creating tape files. These are used when external sorting is performed. java.io.tmpdir system property


Example:

<Node id="SORT_CUSTOMER" type="SORT_WITHIN_GROUPS" groupKey="id" sortKey="name(a);address(a)"/>
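The effect of sorting within groups can be illustrated with a small Python sketch. It is not the component's implementation: it sorts each group in memory and assumes that records with the same group key arrive next to each other in the input, which the description above does not state explicitly. The function name and the record layout (dictionaries) are chosen for illustration.

```python
from itertools import groupby
from operator import itemgetter


def sort_within_groups(records, group_key, sort_key, descending=False):
    """Yield records group by group, each group sorted by sort_key.

    Assumption: records sharing the same group_key value are adjacent in
    the input. The order of the groups themselves is left unchanged.
    """
    for _, group in groupby(records, key=itemgetter(group_key)):
        yield from sorted(group, key=itemgetter(sort_key), reverse=descending)


if __name__ == "__main__":
    customers = [
        {"id": 1, "name": "Smith"},
        {"id": 1, "name": "Jones"},
        {"id": 2, "name": "Young"},
        {"id": 2, "name": "Adams"},
    ]
    for record in sort_within_groups(customers, "id", "name"):
        print(record)
```

Only the order inside each group changes; the groups are emitted in the order in which they arrive.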




XSL Transformation

The component transforms input data according to an XSLT file or an XSLT string and sends the result to the output port or an output file. The transformation is applied to each field that is wrapped in the transform function in the mapping attribute; the remaining fields in the mapping are only copied. It is also possible to use the xmlInputFile and xmlOutputFile attributes, which define the input and output data file, dictionary or field.

Input ports:

  • one input port defined/connected.

Output ports:

  • one output port defined/connected.


XML attributes:

Attribute Mandatory Description Default
id yes component identification
type yes component type XSL_TRANSFORMER
xsltFile if no xslt XSLT file needed for transformation
xslt if no xsltFile XSLT needed for transformation
mapping if no xmlInputFile,xmlOutputFile Transformation mapping; individual assignments are separated by ; (semicolon). This attribute can be used instead of xmlInputFile and xmlOutputFile.
xmlInputFile if no mapping input data file
xmlOutputFile if no mapping output data file
charset no Charset used for the XSLT and for input or output fields that are not of byte or cbyte type. Defaults.Encoding (UTF-8)


Example:

  <Node id="XSL_TRANSFORMER0"
        type="XSL_TRANSFORMER"
        mapping="$0.OutputField1:=transform($0.InputField1);$0.OutputField2:=$0.InputField2;">
        <attr name="xslt">
           <![CDATA[
              <xsl:stylesheet version="1.0"
               xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
 
              <xsl:template match="/">
                <html>
                  <body>
                    <h2>My CD Collection</h2>
                    <table border="1">
                      <tr bgcolor="#9acd32">
                        <th>Title</th>
                        <th>Artist</th>
                      </tr>
                      <xsl:for-each select="catalog/cd">
                      <tr>
                        <td><xsl:value-of select="title"/></td>
                        <td><xsl:value-of select="artist"/></td>
                      </tr>
                      </xsl:for-each>
                    </table>
                  </body>
                </html>
              </xsl:template>
              </xsl:stylesheet>
           ]]>
        </attr>
  </Node>
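To make the structure of the mapping attribute more concrete, the following Python sketch splits a mapping string into its assignments and marks which output fields go through transform and which are plain copies. It is a simplified reading of the syntax used in the example above, not the component's own parser. The regular expression and the function name are assumptions made for illustration.

```python
import re

# One assignment: $out := transform($in)   or   $out := $in
_ASSIGNMENT = re.compile(
    r"^\s*(\$[\w.]+)\s*:=\s*(?:transform\(\s*(\$[\w.]+)\s*\)|(\$[\w.]+))\s*$"
)


def parse_mapping(mapping):
    """Return a list of (output_field, input_field, is_transformed) tuples."""
    assignments = []
    for part in mapping.split(";"):
        if not part.strip():
            continue  # ignore the empty piece after a trailing semicolon
        match = _ASSIGNMENT.match(part)
        if match is None:
            raise ValueError("unrecognized mapping part: %r" % part)
        output_field, transformed_input, copied_input = match.groups()
        assignments.append(
            (output_field, transformed_input or copied_input, transformed_input is not None)
        )
    return assignments


if __name__ == "__main__":
    text = "$0.OutputField1:=transform($0.InputField1);$0.OutputField2:=$0.InputField2;"
    for assignment in parse_mapping(text):
        print(assignment)
```

For the mapping in the example, the first field is marked as transformed by the XSLT and the second as copied unchanged.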



components/transformers.txt · Last modified: 2010/07/12 21:47 by dpavlis