Transformers are graph components that transform data from one or more input ports to an output port. Most transformers use a transform function, a key, or some other method that defines the particular data transformation.
Note: For characters that cannot be entered directly, several escape sequences are available: \\, \n, \t, \r, \", \f, \b. Be careful when using the "\" character: because "\\" is interpreted as a single "\", the parameter filterExpression="$Field1~="([^\\|]*\\|){3}"" is interpreted as "$Field1~="([^\|]*\|){3}"". To get "$Field1~="([^\\|]*\\|){3}"", write filterExpression="$Field1~="([^\\\\|]*\\\\|){3}"".
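The doubling rule in the note can be checked with a small sketch. The `unescape` helper below is hypothetical and is not CloverETL's parser; it only assumes that the listed escape sequences are resolved in a single left-to-right pass and that unknown sequences are kept unchanged.

```java
// Hypothetical helper, not part of CloverETL: resolves the escape
// sequences listed in the note in one left-to-right pass.
class EscapeSketch {
    static String unescape(String s) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c != '\\' || i + 1 == s.length()) {
                out.append(c); // ordinary character or a trailing lone backslash
                continue;
            }
            char next = s.charAt(++i);
            switch (next) {
                case 'n': out.append('\n'); break;
                case 't': out.append('\t'); break;
                case 'r': out.append('\r'); break;
                case 'f': out.append('\f'); break;
                case 'b': out.append('\b'); break;
                case '"': out.append('"'); break;
                case '\\': out.append('\\'); break;
                default: out.append('\\').append(next); // unknown sequence kept as-is (assumption)
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Java source needs its own doubling: "\\\\" below is the two characters \\
        String typed = "([^\\\\|]*\\\\|){3}";
        System.out.println(typed);           // text as written in the attribute
        System.out.println(unescape(typed)); // text after escape processing
    }
}
```

Each pair of backslashes in the written text collapses to one, which is why a pattern that needs two backslashes has to be written with four.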
Common attributes
| Attribute | Description | Exp. |
|---|---|---|
| id | component identification | string |
| type | component type. This attribute is generated automatically by the GUI. | string |
NOTE: this documentation is valid for the Aggregate component in Clover version 2.2 or newer. The documentation for the older Aggregate can be found here.
Applies aggregation functions on groups of records defined by the aggregation key (similarly to GROUP BY in SQL). The key is defined by a name (or several names) of field(s) from the input record. The result of aggregation of each group is stored in a separate record in the output.
Which aggregation functions and operations are used is defined in the aggregation mapping. For each field of the output, a mapping can be specified in the form $output_field := mapping. The mappings are separated by ";". The mapping can be one of the following:
* an aggregation function: $output_field := count(); $output_field := sum($input_field).
* an input field: $output_field := $input_field.
* a string constant (a double quote inside the string is escaped as \"). Example: $output_field := "Some string with a double quote \"".
* an integer constant: $output_field := 42
* a decimal constant: $output_field := 3.14
* a date constant: $field_name := 1999-12-31.

Example mappings:
$ShipCountry:=$ShipCountry;$AvgFreight:=stddev($Freight);
$ShipCountry:=$ShipCountry;$OrderDate:=1901-12-31 23:59:59;$ShipCity:="very nice \\"city\\"; really!";$Count:=42;$AvgFreight:=3.14159265358979323486;
$ShipCountry := $ShipCountry; $OrderDate := 1901-12-31 23:59:59; $CustomerID:= "blablabla"; $ShipCity:=FirstNonNull($ShipCity); $AvgFreight := 3.141; $Count:= count()

Available aggregation functions
The following table lists the available aggregation functions. The scope of every aggregation function is the set of records that belong to one aggregation group, so each function produces one result per aggregation group. The aggregation key specifies how the input records are split into aggregation groups. The parameter type describes the required type (i.e. metadata) of the field that is the input parameter of the function (e.g. in avg($weight), weight is the name of the field that is the parameter of the avg function). The output type describes the required type (i.e. metadata) of the field where the result of the function is stored. A parameter type of N/A means that the function does not take a parameter (e.g. count()). If the type is specified as number, any numeric field type can be used (integer, long, …).
| Name | Parameter type | Output type | Description |
|---|---|---|---|
| avg | number | number | Average of the parameter values. |
| count | N/A | number | Number of records in the aggregation group (ignores their values). |
| countnonnull | any | number | Number of records in the aggregation group with a non-null value of the parameter. |
| countunique | any | number | Number of unique values of the parameter. |
| crc32 | any | long, must be nullable if parameter is nullable | For each value of the parameter, it updates the resulting CRC32 value. |
| first | any | same type as parameter, must be nullable if parameter is nullable | First value of the parameter (can be null). |
| firstnonnull | any | same type as parameter, must be nullable if parameter is nullable | First non-null value of the parameter. The result can be null if the parameter does not have any non-null value. |
| last | any | same type as parameter, must be nullable if parameter is nullable | Last value of the parameter (can be null). |
| lastnonnull | any | same type as parameter, must be nullable if parameter is nullable | Last non-null value of the parameter. The result can be null if the parameter does not have any non-null value. |
| max | any | same type as parameter, must be nullable if parameter is nullable | Maximum value of the parameter. Works for non-number types too. |
| md5 | any | string, must be nullable if parameter is nullable | For each value of the parameter, it updates the resulting MD5 hash value. |
| median | any | same type as parameter, must be nullable if parameter is nullable | Median of the parameter values. Works for non-number types too. |
| min | any | same type as parameter, must be nullable if parameter is nullable | Minimum value of the parameter. Works for non-number types too. |
| modus | any | same type as parameter, must be nullable if parameter is nullable | Most frequently occurring value of the parameter. Works for non-number types too. |
| stddev | number | number, must be nullable if parameter is nullable | Standard deviation of the parameter values. |
| sum | number | number, must be nullable if parameter is nullable | Sum of the parameter values. |
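As a rough illustration of how a key splits records into groups and how count() and avg() then produce one result per group, here is a plain-Java sketch. It does not use any CloverETL classes; the class name, the row layout and the sample data are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration of grouping by a key and applying count() and avg().
// It is not the component's implementation; field names follow the examples above.
class AggregateSketch {
    // Each row is {ShipCountry, Freight}. Returns country -> {count, avgFreight}.
    static Map<String, double[]> aggregate(Object[][] rows) {
        Map<String, double[]> acc = new LinkedHashMap<>(); // country -> {count, sum}
        for (Object[] row : rows) {
            double[] a = acc.computeIfAbsent((String) row[0], k -> new double[2]);
            a[0] += 1;               // count()
            a[1] += (Double) row[1]; // running sum needed for avg($Freight)
        }
        Map<String, double[]> result = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            double[] a = e.getValue();
            result.put(e.getKey(), new double[] {a[0], a[1] / a[0]});
        }
        return result;
    }

    public static void main(String[] args) {
        Object[][] rows = {
            {"France", 10.0}, {"Spain", 4.0}, {"France", 30.0}
        };
        aggregate(rows).forEach((country, r) ->
            System.out.println(country + " count=" + (int) r[0] + " avg=" + r[1]));
    }
}
```

The sketch uses a map, so it also works on unsorted input; this roughly corresponds to running the component with sorted="false".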
Backward compatibility:
The Aggregate component supports the old format of the aggregation mapping (which was used in Clover version 2.1 or older). The old mapping can be provided in the same “aggregateFunctions” attribute as in the old Aggregate component.
Input ports: * one input port defined/connected.
Output ports:
Xml attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
| id | yes | component identification | | |
| type | yes | component type | AGGREGATE | |
| aggregateKey | no | aggregation key, defined by field names separated by :;\| {colon, semicolon, pipe} | | |
| mapping | no | aggregation mappings separated by ; {semicolon} | | |
| aggregateFunctions | no | aggregation mapping in the format of the older Aggregate component (from Clover version 2.1 or older) | | |
| sorted | no | whether the input data flow is sorted | true | |
| equalNULL | no | specifies whether two fields containing NULL values are considered equal. | false | |
| charset | no | character encoding of the input data stream for CRC32 and MD5 functions. | ISO-8859-1 or defaultProperties | |
Example:
<Node id="AGGREGATE" type="AGGREGATE" aggregateKey="ShipCountry" mapping="$ShipCountry:=$ShipCountry;$OrderDate:=min($OrderDate);$ShipCity:=firstnonnull($ShipCity);$AvgFreight:=avg($Freight);$Count:=count();" sorted="false" />
All records from all input ports are copied to the output port. The component reads port by port and blocks while the current port has no data. When reading from one port is done (EOF status), it continues with the next port.
Input ports: * at least one connected input port.
Output ports:
Xml attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
| id | yes | component identification | | |
| type | yes | component type | CONCATENATE | |
Example:
<Node id="CONCATENATE0" type="CONCATENATE"/>
Finds intersection of flow A (in-port0) and B (in-port1) based on specified key. Both inputs must be sorted according to specified key. DataRecords only in flow A are sent out through out-port0. DataRecords in both A & B are sent to specified transformation function and the result is sent through out-port1. DataRecords present only in flow B are sent through out-port2.
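The three-way split described above can be sketched in plain Java as a single pass over two key-sorted inputs. This is only an illustration under simplifying assumptions: keys are unique integers, duplicate-key handling (keyDuplicates) is left out, and the class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the three-way split on two key-sorted inputs.
// Keys are unique integers here; this is not the component's implementation.
class IntersectSketch {
    // Returns {onlyA, both, onlyB}, mirroring out-port0, out-port1 and out-port2.
    static List<List<Integer>> split(int[] a, int[] b) {
        List<Integer> onlyA = new ArrayList<>();
        List<Integer> both = new ArrayList<>();
        List<Integer> onlyB = new ArrayList<>();
        int i = 0, j = 0;
        while (i < a.length && j < b.length) {
            if (a[i] < b[j]) {
                onlyA.add(a[i++]);      // key present only in flow A
            } else if (a[i] > b[j]) {
                onlyB.add(b[j++]);      // key present only in flow B
            } else {
                both.add(a[i]);         // a real component would call transform() here
                i++;
                j++;
            }
        }
        while (i < a.length) onlyA.add(a[i++]); // rest of A has no partner in B
        while (j < b.length) onlyB.add(b[j++]); // rest of B has no partner in A
        return Arrays.asList(onlyA, both, onlyB);
    }

    public static void main(String[] args) {
        System.out.println(split(new int[] {1, 2, 4}, new int[] {2, 3, 4, 5}));
    }
}
```

The single pass only works because both inputs are sorted by the same key, which is why the component requires sorted inputs.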
The component implements a RecordTransform interface or inherits from a DataRecordTransform class.
When you work with this component, you must implement the transform() function. In addition to this required function, you can define two other functions: init() and finished(). If you want to declare and initialize variables, or do anything else that should happen at the beginning of data processing, do it within the init() function. If you want to free memory or delete temporary files, do it within the finished() function. Each of these functions is called only once: init() at the beginning and finished() at the end. By contrast, the required transform() function is called many times, after init() and before finished().
Input ports: * 0 - records from set A - sorted according to specified key
Output ports:
Xml attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
| id | yes | component identification | | |
| type | yes | component type | DATA_INTERSECT | |
| joinKey | yes | field names separated by :;\| {colon, semicolon, pipe}. Since version 2.5, the new, stricter join key syntax is recommended: $fieldA1=$fieldB1;$fieldA2=$fieldB2;…. | | |
| transform | if no transformClass | contains definition of transformation in internal clover format | | |
| transformClass | if no transform | name of Java library file (.jar,.zip,…) where to search for class to be used for transforming data. | | |
| transformURL | no | contains path to the file with transformation code. | | |
| charset | no | encoding of the external source | ISO-8859-1 | |
| slaveOverrideKey | no (deprecated since version 2.5) | can be used to specify different key field names for records on slave input; field names separated by :;\| {colon, semicolon, pipe} | | |
| equalNULL | no | specifies whether two fields containing NULL values are considered equal. | true | |
| keyDuplicates | no | true/false - allows records to have duplicate keys. If false, multiple duplicate records are discarded and only the last one is sent to the transformation. This applies from version 2.5; older versions cannot handle duplicate keys and send only the first occurrence to the product of A&B. | true | 2.5 |
| errorActions | no | defines whether the graph stops when the transformation returns a negative value. Available actions are STOP and CONTINUE. For CONTINUE, an error message is logged to the console or to a file (if the errorLog attribute is specified); for STOP, a TransformException is thrown and graph execution is stopped. An error action can be set for each negative value (value1=action1;value2=action2;…) or the same action for all values (STOP or CONTINUE). It is also possible to define error actions for some negative values and a default for all other values (MIN_INT=myAction). | -1=CONTINUE;MIN_INT=STOP | 2.6 |
| errorLog | no | path to the error log file. | | 2.6 |
Example:
<Node id="INTERSECT" type="DATA_INTERSECT" joinKey="CustomerID" transformClass="org.jetel.test.reformatOrders"/>

<Node id="INTERSECT" type="DATA_INTERSECT" joinKey="EmployeeID">
  <attr name="transform">
  import org.jetel.component.DataRecordTransform;
  import org.jetel.data.*;
  public class intersectionTest extends DataRecordTransform{
    public boolean transform(DataRecord[] source, DataRecord[] target){
      target[0].getField(0).setValue(source[0].getField(0).getValue());
      target[0].getField(1).setValue(source[0].getField(1).getValue());
      target[0].getField(2).setValue(source[1].getField(2).getValue());
      return true;
    }
  }
  </attr>
</Node>

<Node id="INTERSECT" joinKey="$EmployeeID=$EMP_NO;#" type="DATA_INTERSECT">
  <attr name="transform"><![CDATA[
  ${out.0.EMP_NO} = ${in.0.EmployeeID};
  ${out.0.FIRST_NAME} = ${in.0.FirstName};
  ${out.0.LAST_NAME} = ${in.0.LastName};
  ${out.0.FULL_NAME} = ${in.1.FULL_NAME};
  ${out.0.HIRE_DATE} = ${in.1.HIRE_DATE};
  ${out.0.HIRE_DATE} = ${in.0.HireDate};
  ]]></attr>
</Node>
Removes duplicates from a data flow of sorted records based on the specified key. The key is the name (or a combination of names) of field(s) from the input record. The component keeps either the first or the last record from each group, depending on the 'keep' parameter. All duplicate records are rejected to the second, optional port. When no key is defined, Dedup acts like UNIX 'head' (keep = First) or UNIX 'tail' (keep = Last).
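The effect of the keep setting can be sketched in plain Java. The code below is an illustration only and does not use CloverETL classes. Its handling of "Unique" (pass only groups that contain a single record) is an assumption, since the text above describes First and Last only; the noDupRecord attribute is ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of deduplication on key-sorted records {key, value}.
class DedupSketch {
    // keep is "First", "Last" or "Unique"; the input must be sorted by key (field 0).
    static List<String[]> dedup(List<String[]> sorted, String keep) {
        List<String[]> out = new ArrayList<>();
        int start = 0;
        while (start < sorted.size()) {
            int end = start; // advance to the last record with the same key
            while (end + 1 < sorted.size()
                    && sorted.get(end + 1)[0].equals(sorted.get(start)[0])) {
                end++;
            }
            if (keep.equals("First")) {
                out.add(sorted.get(start));
            } else if (keep.equals("Last")) {
                out.add(sorted.get(end));
            } else if (start == end) {
                out.add(sorted.get(start)); // "Unique": groups of size one only (assumption)
            }
            start = end + 1;
        }
        return out;
    }

    // Runs dedup on a fixed sample and concatenates the values that were kept.
    static String keptValues(String keep) {
        List<String[]> in = Arrays.asList(
            new String[] {"Adams", "1"},
            new String[] {"Adams", "2"},
            new String[] {"Brown", "3"});
        StringBuilder sb = new StringBuilder();
        for (String[] r : dedup(in, keep)) sb.append(r[1]);
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(keptValues("First"));
        System.out.println(keptValues("Last"));
    }
}
```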
Input ports: * one input port defined/connected.
Output ports:
Xml attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
| id | yes | component identification | | |
| type | yes | component type | DEDUP | |
| dedupKey | yes | field names separated by :;\| {colon, semicolon, pipe}; can also be empty, in which case all records are considered one group | | |
| keep | no | one of First, Last or Unique {the first letter is sufficient} | First | |
| equalNULL | no | specifies whether two fields containing NULL values are considered equal. | true | |
| noDupRecord | no | number of duplicate records to be written to out port | 1 | |
Example:
<Node id="DISTINCT" type="DEDUP" dedupKey="Name" />
<Node id="DISTINCT" type="DEDUP" dedupKey="Name" keep="Last" equalNULL="false" />
Denormalizes input records - i.e. composes one output record from several input records with identical key using user-specified transformation.
The component implements a RecordDenormalize interface or inherits from a DataRecordDenormalize class.
Input ports: * one input port to receive the records to be denormalized.
Output ports:
As an example of denormalization, you can imagine a case when average air temperatures and pressures were measured during the twelve months of a year. You can have this information about many years represented by records consisting of four fields. The first two are the year and the month, the third and fourth are the average temperature and average pressure. Thus, you have 12 records for one year. If you want to compose these 12 records together and convert them into a new one, such new record will consist of 25 fields. The first field contains the year. In the other 24 fields, the temperatures and pressures alternate for all the twelve months of the specified year. Now the information about the month is only in the order of the temperature or the pressure within all the 24 fields. The counterpart of this process is normalization. You must declare some variables and use two functions: append() and transform(). Within the first function, you assign the field values to the defined variables, within the second you map these variables to the outgoing fields. See below how you can define the denormalization. (We briefly illustrate it, for two months of a year - January and February. We suppose the records are sorted in ascending order according to the year field. This field is the Key.)
In addition to these required functions, you can define three other functions: init(), finished() and clean(). If you want to declare and initialize variables, or do anything else that should happen at the beginning of data processing, do it within the init() function. (Thus it would be even better if the variables below were declared and initialized within the init() function.) If you want to free memory or delete temporary files, do it within the finished() function. Each of these two functions is called only once: init() at the beginning and finished() at the end. By contrast, the required append() and transform() functions are called many times, after init() and before finished(). If you want to reset the values of some variables and/or delete temporary files between groups of records with different key values, do it within the clean() function. It is called many times, once after each group of records has been parsed and the resulting outgoing record has been sent to the output port.
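The call order of append(), transform() and clean() can be sketched in plain Java. The method names follow the text; the signatures, the driver loop and the record layout {year, month, temperature} are assumptions made for illustration and are not the RecordDenormalize interface.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the append()/transform()/clean() cycle for one key (year).
class DenormalizeSketch {
    private String year;
    private Double january;
    private Double february;

    void append(String[] in) { // in = {year, month, temperature}
        year = in[0];
        if (in[1].equals("January")) january = Double.valueOf(in[2]);
        if (in[1].equals("February")) february = Double.valueOf(in[2]);
    }

    String transform() { // builds one output record per group
        return year + ";" + january + ";" + february;
    }

    void clean() { // resets the state between groups
        year = null;
        january = null;
        february = null;
    }

    // Driver loop: the input must be sorted by year (the key).
    static List<String> run(String[][] input) {
        DenormalizeSketch d = new DenormalizeSketch();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < input.length; i++) {
            d.append(input[i]);
            boolean lastOfGroup =
                i + 1 == input.length || !input[i + 1][0].equals(input[i][0]);
            if (lastOfGroup) {
                out.add(d.transform());
                d.clean();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] input = {
            {"2007", "January", "-1.5"},
            {"2007", "February", "0.5"},
            {"2008", "January", "2.0"}
        };
        run(input).forEach(System.out::println);
    }
}
```

Without clean(), the February value of 2007 would leak into the 2008 record, which has no February input in this sample.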
Xml attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
| id | yes | component identification | | |
| type | yes | component type | DENORMALIZER | |
| key | yes | list of key fields used to identify input record groups. Each group is a sequence of input records with identical key field values. | | |
| denormalize | if no denormalizeClass | contains definition of transformation in Java or TransformLang. | | |
| denormalizeClass | if no denormalize | name of the class to be used for denormalizing data. | | |
| denormalizeURL | no | contains path to the file with transformation code. | | |
| charset | no | encoding of the external source | ISO-8859-1 | |
| order | no | describes the expected order of input records: "asc" for ascending, "desc" for descending, "auto" for auto-detection, "ignore" for processing input records without order checking (this may produce unexpected results when the input is not ordered). | auto | |
| equalNULL | no | specifies whether two fields containing NULL values are considered equal. | true | 2.8 |
| errorActions | no | defines whether the graph stops when the denormalize function returns a negative value. Available actions are STOP and CONTINUE. For CONTINUE, an error message is logged to the console or to a file (if the errorLog attribute is specified); for STOP, a TransformException is thrown and graph execution is stopped. An error action can be set for each negative value (value1=action1;value2=action2;…) or the same action for all values (STOP or CONTINUE). It is also possible to define error actions for some negative values and a default for all other values (MIN_INT=myAction). | -1=CONTINUE;MIN_INT=STOP | 2.6 |
| errorLog | no | path to the error log file. | | 2.6 |
Example:
<Node id="DENORMALIZE" type="DENORMALIZER">
  <attr name="order">asc</attr>
  <attr name="key">year</attr>
  <attr name="denormalize">//#TL
  date Year;
  string Month;
  decimal (3,1) temperaturejan;
  decimal (3,1) temperaturefeb;
  integer pressurejan;
  integer pressurefeb;
  function addInputRecord() {
    Year = $year;
    if ($month == "January") { temperaturejan = $temperature; pressurejan = $pressure; }
    if ($month == "February") { temperaturefeb = $temperature; pressurefeb = $pressure; }
  }
  function getOutputRecord() {
    $Year := Year;
    $TemperatureJan := temperaturejan;
    $PressureJan := pressurejan;
    $TemperatureFeb := temperaturefeb;
    $PressureFeb := pressurefeb;
  }
  </attr>
</Node>

<Node id="DENORMALIZE" type="DENORMALIZER">
  <attr name="order">ignore</attr>
  <attr name="key">EmployeeID</attr>
  <attr name="denormalize">//#TL
  int order;
  string customer;
  string employee;
  function init() { order = 0; customer = ""; employee = ""; }
  function addInputRecord() { order = $OrderID; customer = $CustomerID; employee = $EmployeeID; }
  function getOutputRecord() { $OrderID := order; $CustomerID := customer; $EmployeeID := employee; }
  </attr>
</Node>

<Node id="DENORMALIZE" type="DENORMALIZER" key="EmployeeID" order="ignore">
  <attr name="denormalize">
  import org.jetel.component.denormalize.DataRecordDenormalize;
  import org.jetel.data.*;
  public class DenormalizeTest extends DataRecordDenormalize {
    private DataRecord lastRecord;
    public DenormalizeTest() { lastRecord = null; }
    public int append(DataRecord inRecord) { lastRecord = inRecord; return OK; }
    public int transform(DataRecord outRecord) { outRecord.copyFrom(lastRecord); return OK; }
  }
  </attr>
</Node>
COMMERCIAL, since version 2.7
This component reads records on input port 0, checks that the specified fields contain valid e-mail addresses, and either accepts each record on output port 0 or rejects it on port 1.
The method used for e-mail address verification is specified as a level of inspection. Each level extends its predecessor with a more detailed verification.
Level of inspection
| Level | Description |
|---|---|
| SYNTAX | Checks the string for valid syntax using a regular expression |
| DOMAIN | Queries DNS and checks whether the domain exists and has (some) MX records. |
| SMTP | Communicates with SMTP servers associated with the domain and verifies the address to user level |
| MAIL | Works the same as SMTP and additionally sends a dummy message to the address |
Input ports: * port 0
Output ports:
Both output ports are optional; connect them according to your needs. If port 0 is connected, it MUST share the same metadata as the input port. Output port 1 can have arbitrary metadata: input record data is copied into the output port 1 record by field names, so only matching fields contain original data. Additionally, port 1 can contain the fields specified in errorField and statusField, which are automatically filled with the error message and status code.
Xml attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
| id | yes | component identification | | |
| type | yes | component type | EMAIL_FILTER | |
| field | yes | list of field names containing e-mail addresses to be verified. Separated by :;\| {colon, semicolon, pipe} | | |
| level | yes | level of inspection. Must be SYNTAX, DOMAIN, SMTP or MAIL | DOMAIN | |
| errorField | no | field where error message will be put (if any and field set). | | |
| statusField | no | field where status code will be put (if field set). | | |
| acceptEmpty | no | whether empty string qualifies as valid address | true | |
| acceptCondition | no | how records with multiple addresses are handled. See notes for more details. Must be either STRICT or LENIENT | LENIENT | |
| multiDelimiter | no | specifies delimiter(s) used to split input in each field to email addresses. Specify as a regex. When empty, each field is treated as single address | [,;] | |
| emailBufferSize | no | maximum number of e-mail addresses read into memory before they are bulk processed. Must be at least 1. | 2000 | |
| emailCacheSize | no | maximum number of cached e-mail address verification results. Zero value turns caching off. | 2000 | |
| domainRetryCount | no | number of retries for failed DNS queries | 2 | |
| domainRetryTimeout | no | timeout (in milliseconds) for each DNS query attempt. Maximum time spent resolving is thus retryCount * retryTimeout (ms) | 800 | |
| domainQueryARec | no | by SMTP standard one should try to resolve domain A record when no MX record could be found. Setting this option to false doubles DNS query speed but breaks this (unnecessary) standard. | true | |
| domainCacheSize | no | specifies max. cache size for DNS query results. Zero value turns caching off. Ignored at levels < DOMAIN | 3000 | |
| smtpConnectAttempts | no | attempts for connection and HELO. Number of attempts and delays between them as a comma-separated list (in milliseconds) | 1000,2000 | |
| smtpRequestTimeout | no | specify TCP timeout after which a SMTP request fails (in seconds!) | 300 | |
| smtpGreylistAttempts | no | anti-greylisting feature. Specifies number of attempts and delays between them as a comma-separated list. Zero value ("0") turns anti-greylisting off. (values in seconds) | 30,120,240 | |
| smtpConcurrentLimit | no | max. number of parallel tasks working when anti-greylisting is in effect | 10 | |
| mailFrom | no | "From:" field of a dummy message sent on MAIL level | Clover.ETL clover@cloveretl.org | |
| mailSubject | no | "Subject:" field of a dummy message sent on MAIL level | Hello, this is a test message | |
| mailBody | no | Body of a dummy message sent on MAIL level | Hello,\nThis is Clover.ETL test message.\n\nPlease ignore a don't respond. Thank you, have a nice day! | |
Notes:
* acceptCondition - applies when multiple fields with email addresses are specified via the "field" property or when "multiDelimiter" splits one or more fields
  * … accept_empty) in order to accept the record
  * … accept_empty.
* field - when multiple fields are specified, acceptCondition holds for all fields at once, not for each field separately
* multiDelimiter - is always treated as a regular expression, so multiple delimiters and other rules can be easily defined. Typically [;,]
* emailBufferSize - must be set to at least such a value that the buffer can hold all addresses parsed from a single record
* emailCacheSize - set it to the maximum value you can afford, especially if addresses are likely to repeat often
* domainRetryCount, domainRetryTimeout and domainQueryARec - can be used to fine-tune the speed of the DOMAIN check. Setting them too low can be dangerous, since a fairly large number of addresses might get rejected just because of high DNS latency
* domainCacheSize - the more the better; it holds the results of DNS queries, so subsequent queries cost nothing
* smtpConnectAttempts - a list of comma-separated delays in milliseconds. Each delay represents a retry after the specified time. A connect attempt may fail because of a network problem, a server being down, or a server rejecting the connection

DOMAIN level
At DOMAIN level, multiple DNS requests are issued against the system default DNS servers. The number of retries and their timeouts can be specified for performance tuning. In addition, two optimization techniques can be used. Firstly, DNS query results can be cached in an in-memory cache of maximum size domainCacheSize. Setting domainCacheSize to zero disables local caching and uses only native DNS caching; setting it to a large value improves performance. Secondly, it is good practice to sort records by domain name before feeding them to EmailFilter. In that case only a single DNS query, or often merely a cache lookup, needs to be performed for a large number of records, which greatly increases throughput.
SMTP level and above
At SMTP level, several techniques are used to determine whether an email address exists. This requires significantly more time than the previous levels but provides a great increase in accuracy. Internally, at DOMAIN level, records and their associated e-mail addresses are organized in the input email buffer by domain name. An SMTP level check is performed on each group of addresses at once, so the more addresses in a group, the fewer repeated SMTP connections are needed. See the DOMAIN level chapter for more details.
When a group of addresses is processed for a single domain, a suitable server is chosen (see SMTP server choice). Addresses are then checked using VRFY, RCPT TO and MAIL according to the specified level. When the checks complete, addresses and records are accepted or rejected.
If anti-greylisting is in effect (see the "smtpGreylistAttempts" property), failed addresses are not rejected immediately; instead, a retry is scheduled according to "smtpGreylistAttempts". In each iteration the remaining list of rejected addresses is re-checked, until the final attempt, when all addresses are either accepted or rejected. By default, anti-greylisting is turned off since it significantly slows down the process. It can be turned on by setting the smtpGreylistAttempts property to a comma-separated list (with at least one item) of delays between retries. The delays are specified in seconds, and reasonable values range from one to 15 minutes. The default value would be "30, 120, 240" (i.e. first retry after half a minute, if that fails wait another two minutes, and then finally another four minutes; total 6.5 minutes and 4 retries). Since anti-greylisting involves a lot of waiting, it is done in the background. By setting "smtpConcurrentLimit" you can specify the maximum number of concurrent anti-greylisting threads that run in parallel with the normal run of the component. Each thread performs the anti-greylisting cycle for a single domain; when it finishes, it starts on another domain. The main component run is never blocked by an anti-greylisting thread until the very end, when the main thread waits for all anti-greylisting cycles to finish.
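The total waiting time quoted above can be checked by summing the configured delays. The helper below is a hypothetical illustration of how a comma-separated delay list in seconds adds up; it is not the component's own parsing code.

```java
// Hypothetical helper: sums a comma-separated list of delays given in seconds.
class GreylistSchedule {
    static int totalSeconds(String attempts) {
        int total = 0;
        for (String part : attempts.split(",")) {
            total += Integer.parseInt(part.trim()); // tolerate spaces after commas
        }
        return total;
    }

    public static void main(String[] args) {
        int total = totalSeconds("30, 120, 240");
        System.out.println(total + " s = " + (total / 60.0) + " min");
    }
}
```

For the list "30, 120, 240" the delays add up to 390 seconds, which is the 6.5 minutes mentioned in the text.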
When MAIL level is specified, all the same rules apply. The only difference is that at this level a test message is sent to the verified address. Use this level with care: it is intrusive and reveals your attempt to verify the address. You can modify the message via the mailFrom, mailSubject and mailBody properties.
SMTP server choice
Generally, a domain can have multiple MX records, which means we have to decide which one to connect to. According to the specification, the server with the highest priority is tried first. If it is unreachable or HELO fails after all smtpConnectAttempts attempts, the server with the next highest priority is chosen. If a server can be connected to and talked to then, by specification, we will NOT try other servers even if the address validation fails (after any optional anti-spam measures like anti-greylisting). This conforms to the SMTP protocol and also works well against the nolisting anti-spam technique. When a domain exists but does not have any MX record, we use its A record as a single MX server. This feature complies with the SMTP recommendation but it is turned off by default - see domainQueryARec.
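The server choice can be sketched in plain Java as follows. The class, the `Mx` type and the `reachable` predicate are assumptions made for illustration; a real check would involve DNS lookups and SMTP connections. The sketch relies on the DNS convention that a lower MX preference number means a higher priority.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java sketch of choosing an SMTP server from a domain's MX records.
class MxChoiceSketch {
    static final class Mx {
        final String host;
        final int preference; // lower number = higher priority in DNS MX records

        Mx(String host, int preference) {
            this.host = host;
            this.preference = preference;
        }
    }

    // Returns the first reachable host in priority order, or null if none answers.
    static String choose(List<Mx> records, Predicate<String> reachable) {
        List<Mx> byPriority = new ArrayList<>(records);
        byPriority.sort(Comparator.comparingInt(mx -> mx.preference));
        for (Mx mx : byPriority) {
            if (reachable.test(mx.host)) {
                return mx.host; // stop here: other servers are not tried afterwards
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Mx> records = new ArrayList<>();
        records.add(new Mx("backup.example.org", 20));
        records.add(new Mx("primary.example.org", 10));
        System.out.println(choose(records, host -> true));
        System.out.println(choose(records, host -> !host.startsWith("primary")));
    }
}
```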
Verification algorithm
main component loop
Example:
<Node id="EMAILFILTER" type="EMAIL_FILTER" field="customer_email"/>
<Node id="EMAILFILTER" type="EMAIL_FILTER" field="customer_email" level="SMTP" acceptEmpty="false" acceptCondition="STRICT"/>
All records for which the filterExpression evaluates to TRUE are copied from input port 0 to output port 0. Rejected records are copied to output port 1 (if it is connected). The component can filter on text, date, integer and numeric fields using the comparison operators [>, <, ==, <=, >=, !=]. Text fields/expressions can also be compared to a Java regular expression using the ~= (tilde, equal sign) characters. A filter can be made of several parts joined by the logical operators AND and OR. You can also use parentheses to set precedence.
Note: Date format used for specifying date value is yyyy-MM-dd for dates only and yyyy-MM-dd HH:mm:ss for date&time. These patterns correspond to values specified in “defaultProperties” file.
When referencing particular field, you have to precede field's name with dollar [$] sign - e.g. $FirstName. To ease the burden of converting comparison operators to XML-compatible form, each operator has its textual abbreviation - [.eq. .ne. .lt. .le. .gt. .ge.]
Built-in functions you can use in expressions:
Input ports: * 0 - input records
Output ports:
Xml attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
| id | yes | component identification | | |
| type | yes | component type | EXT_FILTER | |
| filterExpression | yes | Expression used for filtering records. See above. | | |
Example:
Filter on the HireDate field. HireDate must not be later than 31 December 2003.
<Node id="FILTEREMPL1" type="EXT_FILTER" filterExpression="$HireDate &lt;= '2003-12-31'"/>
Filter on the Name and Age fields. Name must start with the 'A' character and Age must be greater than 25.
<Node id="FILTEREMPL1" type="EXT_FILTER" filterExpression="$Name~=&quot;^A.*&quot; and $Age &gt;25"/>
<Node id="FILTEREMPL1" type="EXT_FILTER"> $Name~="^A.*" and $Age.gt.25 </Node>
A more complex example showing how to use various built-in functions.
<Node id="FILTEREMPL1" type="EXT_FILTER"> ( trim($CustomerName)~=".*Peter.*" or $DateSale==dateadd(today(),-1,month) ) and $Age*2.lt.$Weight-10 </Node>
Evaluating data fields with NULL values (fields that have isNull() set) results in a runtime error. To avoid this, use the 'isnull' and 'nvl' functions.
<Node id="FILTEREMPL1" type="EXT_FILTER"> !isnull($CustomerName) and $CustomerName~=".*Peter.*" or nvl($DateSale,2001-1-1)>=$DateInvoice </Node>
Note: If you want to use special characters like ", you have to write a double backslash '\\' before the special character. For example: \\".
Merges data records from two or more input ports onto one output. It preserves the sorted order (as specified by the merge key). The structure of records in all merged data flows must be the same, which implies that all input ports share the same metadata.
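Merging while preserving sort order can be sketched in plain Java: at each step, the smallest key among the current heads of all inputs is written out. The sketch treats each record as just its string key and is an illustration, not the component's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of merging several key-sorted inputs into one sorted output.
class MergeSketch {
    static List<String> merge(String[][] inputs) {
        int[] pos = new int[inputs.length]; // read position on each input port
        List<String> out = new ArrayList<>();
        while (true) {
            int best = -1;
            for (int p = 0; p < inputs.length; p++) {
                if (pos[p] == inputs[p].length) continue; // this port is exhausted
                if (best < 0
                        || inputs[p][pos[p]].compareTo(inputs[best][pos[best]]) < 0) {
                    best = p; // smallest head seen so far
                }
            }
            if (best < 0) return out; // all ports reached EOF
            out.add(inputs[best][pos[best]++]);
        }
    }

    public static void main(String[] args) {
        String[][] inputs = {{"adam", "eve"}, {"bob", "zoe"}, {"carl"}};
        System.out.println(merge(inputs));
    }
}
```

If one of the inputs is not sorted by the merge key, the output order is no longer guaranteed, because only the heads are compared.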
Input ports: * 2..n input ports defined/connected.
Output ports:
Xml attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
| id | yes | component identification | | |
| type | yes | component type | MERGE | |
| mergeKey | yes | key which specifies the sort order to be preserved while merging | | |
| equalNULL | no | specifies whether two fields containing NULL values are considered equal. | false (in a future release will be changed) | 2.8.1 |
Example:
<Node id="MERGE" type="MERGE" mergeKey="name"/>
Normalizes input records - i.e. decomposes each input record into several output records using a user-specified transformation. The component implements a RecordNormalize interface or inherits from a DataRecordNormalize class.
Input ports: * one input port defined/connected to read the records to be normalized.
Output ports:
As an example of normalization, imagine that average air temperatures and pressures were measured during the twelve months of a year. This information can be stored for many years as records consisting of 25 fields: the first is the year and the others are the temperatures and pressures for all twelve months of that year. Thus, you have one record per year. If you want to split these records into more records (one for each month of a year), the new records will consist of 4 fields: the year, the month, the average temperature and the average pressure. The month is now stated explicitly in the second field. The counterpart of this process is denormalization.
You must define the following functions: count(), transform() and some mappings map(). The count() function defines the number of records into which each incoming record should be split. The transform() function defines how to decompose the records, and each mapping defines how inputs should be mapped to outputs. The transform() function accepts one integer that goes from 0 to N-1 (if you want to decompose each incoming record into N new records). The example below briefly illustrates this for two months of a year, January and February.
In addition to these required functions, you can define three other functions: init(), finished() and clean(). If you want to declare and initialize some variables, or do anything else that should be done at the beginning of data processing by the component, do it within the init() function. If you want to free memory or delete some temporary files, do it within the finished() function. Each of these functions is called only once: init() at the beginning and finished() at the end. Unlike them, the required count() and transform() functions are called many times, after init() and before finished(). If you want to reset the values of some variables and/or delete some temporary files between parsing individual incoming records, you can do it within the clean() function. It is also called many times, once after each incoming record has been processed and the resulting records have been sent to the output port.
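The count()/transform(idx) contract described above can be sketched in plain Java. This is only an illustration under stated assumptions: records are modelled as maps instead of DataRecord objects, the class name NormalizeSketch and the helper normalize() are hypothetical, and the field names are taken from the two-month example (Year, TemperatureJan, PressureJan, TemperatureFeb, PressureFeb).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a normalize transformation; it does not use the CloverETL API.
class NormalizeSketch {
    static final String[] MONTHS = {"January", "February"};

    // Plays the role of count(): how many output records one input record yields.
    static int count(Map<String, Object> source) {
        return MONTHS.length;
    }

    // Plays the role of transform(idx): builds output record number idx (0 .. count-1).
    static Map<String, Object> transform(Map<String, Object> source, int idx) {
        String suffix = MONTHS[idx].substring(0, 3); // "Jan" or "Feb"
        Map<String, Object> target = new LinkedHashMap<>();
        target.put("year", source.get("Year"));
        target.put("month", MONTHS[idx]);
        target.put("temperature", source.get("Temperature" + suffix));
        target.put("pressure", source.get("Pressure" + suffix));
        return target;
    }

    // Drives the two functions the way the text describes: count() once, then transform() N times.
    static List<Map<String, Object>> normalize(Map<String, Object> source) {
        List<Map<String, Object>> out = new ArrayList<>();
        int n = count(source);
        for (int idx = 0; idx < n; idx++) {
            out.add(transform(source, idx));
        }
        return out;
    }
}
```

Calling normalize() on one yearly record returns one four-field record per month, in month order.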
Xml attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
id | yes | component identification | ||
type | yes | component type | NORMALIZER | |
normalizeClass | if no normalize | name of the class to be used for normalizing data. | ||
normalize | if no normalizeClass | contains definition of transformation in Java or TransformLang. | ||
normalizeURL | no | contains path to the file with normalize code. | ||
charset | no | encoding of the external source | ISO-8859-1 | |
errorActions | no | defines whether the graph stops when the normalize function returns a negative value. Available actions are STOP and CONTINUE. For CONTINUE, an error message is logged to the console or to a file (if the errorLog attribute is specified); for STOP, a TransformException is thrown and graph execution is stopped. An error action can be set for each negative value (value1=action1;value2=action2;…), or the same action (STOP or CONTINUE) can be set for all values. It is also possible to define error actions for some negative values and one action for all other values (MIN_INT=myAction). | -1=CONTINUE;MIN_INT=STOP | 2.6 |
errorLog | no | path to the error log file. | | 2.6 |
Example:

    <Node id="NORMALIZE" type="NORMALIZER">
      <attr name="normalize">//#TL
        function map1() {
          $year := $Year;
          $month := "January";
          $temperature := $TemperatureJan;
          $pressure := $PressureJan;
        }
        function map2() {
          $year := $Year;
          $month := "February";
          $temperature := $TemperatureFeb;
          $pressure := $PressureFeb;
        }
        function transform(idx) {
          switch (idx) {
            case 0:map1();
            case 1:map2();
          }
        }
        function count() {
          return 2;
        }
      </attr>
    </Node>

    <Node id="NORMALIZE" type="NORMALIZER">
      <attr name="normalize">
        import org.jetel.component.normalize.DataRecordNormalize;
        import org.jetel.data.*;
        public class normalizeTest extends DataRecordNormalize {
          public int count(DataRecord source) {
            return 2;
          }
          public int transform(DataRecord source, DataRecord target, int idx) {
            if (idx == 0) {
              target.copyFrom(source);
            } else {
              target.reset();
            }
            return OK;
          }
        }
      </attr>
    </Node>

    <Node id="NORMALIZE" type="NORMALIZER">
      <attr name="normalize">//#TL
        function transform(idx) {
          if (idx == 1) {
            return;
          }
          $OrderID := $OrderID;
          $CustomerID := $CustomerID;
          $EmployeeID := $EmployeeID;
        }
        function count() {
          return 2;
        }
      </attr>
    </Node>
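The errorActions syntax from the attribute table (value=action pairs separated by semicolons, MIN_INT standing for all other values, or a single bare action for all values) can be read as a small lookup. The sketch below is one possible interpretation in plain Java, not the component's actual parser; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating how an errorActions string could be interpreted.
class ErrorActionsSketch {
    // Returns "STOP" or "CONTINUE" for a negative return code, or null if nothing matches.
    static String actionFor(String errorActions, int returnCode) {
        Map<String, String> actions = new HashMap<>();
        for (String pair : errorActions.split(";")) {
            String p = pair.trim();
            if (p.isEmpty()) {
                continue; // tolerate a trailing semicolon
            }
            int eq = p.indexOf('=');
            if (eq < 0) {
                return p; // a bare action (STOP or CONTINUE) applies to all values
            }
            actions.put(p.substring(0, eq).trim(), p.substring(eq + 1).trim());
        }
        String specific = actions.get(Integer.toString(returnCode));
        // Assumption: MIN_INT acts as the fallback for values without their own entry.
        return specific != null ? specific : actions.get("MIN_INT");
    }
}
```

With the default value -1=CONTINUE;MIN_INT=STOP, a return code of -1 maps to CONTINUE and every other negative code maps to STOP.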
Partitions data into distinct flows. Each connected port becomes one flow. For each record read, the chosen partition function determines which output flow (port) the record is sent to.
The component implements a PartitionFunction interface.
Which partition algorithm becomes active depends on the following:
* If partitionClass or partitionSource is defined, the specified function is used for partitioning.
* If neither partitionClass, partitionSource nor partitionKey is specified, the RoundRobin algorithm is used.
* If partitionKey is specified and neither ranges, partitionClass nor partitionSource is specified, partitioning by calculated hash value is used. The formula used is: (hashValue / MAX_HASH_VALUE * #connected_output_ports) MOD #connected_output_ports
* If partitionKey and ranges are specified (but not partitionClass nor partitionSource), partitioning by range is used: the partitionKey's value is sequentially compared with the defined range boundaries. If the value is less than or equal to a specified boundary, the record is sent out through the port corresponding to that boundary.
Input ports:
* one input port defined/connected.
Output ports:
This component evaluates all incoming records according to a specified criterion, splits the data flow and distributes the records among different output ports. To split the data flow, you can define data field ranges, exact values, etc.
As an example of partition, you can imagine a case when average air temperatures and pressures were measured during the twelve months of a year. You can have this information about many years represented by records consisting of four fields. The first two are the year and the month, the third and fourth are the average temperature and average pressure. Thus, you have 12 records for one year. You may want to split the data flow into more data flows. Different records can be sent out through different output ports. Suppose you want to send out the records in which the average temperature was above 0 to the first port and those with the temperatures below 0 to the second port. To do that, you must use the following function: getOutputPort(). Within this function, you assign individual records to different (or the same) output port(s). See the last example below.
If you define a partitioning transformation, you define a getOutputPort() function. In addition to getOutputPort(), you can define another function: init(). If you want to declare and initialize some variables, or do anything else that should be done at the beginning of data processing by the component, do it within the init() function. The init() function is called only once, at the beginning. Unlike init(), the getOutputPort() function is called many times, always after init().
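The partitioning rules described above can be sketched in plain Java. This is an illustration, not the component's code: the class PartitionSketch and its method names are hypothetical, byHash() is one literal reading of the documented formula applied to a non-negative int hash, and byRange() handles a single integer key with inclusive upper boundaries only.

```java
// Hypothetical sketch of the three ways an output port can be chosen.
class PartitionSketch {
    // Custom function, mirroring the temperature example: above 0 goes to port 0, the rest to port 1.
    static int getOutputPort(double temperature) {
        return temperature > 0 ? 0 : 1;
    }

    // Partition by hash: (hashValue / MAX_HASH_VALUE * ports) MOD ports.
    // Assumption: MAX_HASH_VALUE is Integer.MAX_VALUE and the hash is made non-negative first.
    static int byHash(int hashValue, int connectedPorts) {
        int h = hashValue & Integer.MAX_VALUE;
        int scaled = (int) ((double) h / Integer.MAX_VALUE * connectedPorts);
        return scaled % connectedPorts; // folds the single top value back to port 0
    }

    // Partition by range: the value is compared with the boundaries in order;
    // the first boundary that is >= value selects the port.
    // Values without a matching interval go to the last connected port.
    static int byRange(int value, int[] upperBounds, int connectedPorts) {
        for (int i = 0; i < upperBounds.length; i++) {
            if (value <= upperBounds[i]) {
                return i;
            }
        }
        return connectedPorts - 1;
    }
}
```

For example, with boundaries {9, 99} and three connected ports, the value 9 goes to port 0, 10 goes to port 1, and 100 falls outside both intervals and goes to the last port.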
Xml attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
id | yes | component identification | ||
type | yes | component type | PARTITION | |
partitionClass | no | name of the class to be used for partitioning data | ||
partitionSource | no | partition function for partitioning the dataset. Can be Java code or code in the CloverETL transform language. | ||
charset | no | encoding of the external source | ||
partitionKey | no | key which specifies which fields (one or more) will be used for calculating the hash value used for determining the output port. If the ranges attribute is not defined, partitioning by hash is used. | ||
ranges | no | definition of intervals against which partitionKey will be checked. If the key's value belongs to an interval, the interval's order number is converted to the output port number. A RangeLookupTable is created based on this attribute. Each interval consists of parts which specify the start and end values of the given key parts; each part is a pair of values separated by ”,” and enclosed in brackets: ”(” or ”)” if the value is to be excluded from the interval and ”<” or ”>” if the value is to be included in the interval. E.g. for partitionKey: EmployeeID;BirthDate and ranges: <1,9)(,31/12/1959>;<9,)(,31/12/1959>;<1,9)(31/12/1959,);<9,)(31/12/1959,) four output flows are specified: 0 - EmployeeID between 1 included and 9 excluded, and BirthDate at most 31/12/1959; 1 - EmployeeID greater than or equal to 9, and BirthDate at most 31/12/1959; 2 - EmployeeID between 1 included and 9 excluded, and BirthDate greater than 31/12/1959; 3 - EmployeeID greater than or equal to 9, and BirthDate greater than 31/12/1959. When more output ports are connected than ranges defined, records which have no corresponding interval are sent to the last connected output port. | ||
useI18N | no | true/false perform sorting according to national rules - e.g. Czech or German handling of characters like “i”,”í”. Use it only if you are sorting data according to key which can contain accented characters or you want sorter to follow certain locale specific rules. | false | |
locale | no | locale to be used when sorting using I18N rules. | system default |
Example:

    <Node id="PARTITION_BY_KEY" type="PARTITION" partitionKey="age"/>

    <Node id="PARTITION_BY_RANGE" type="PARTITION" partitionKey="EmployeeID;BirthDate">
      <attr name="ranges"><![CDATA[<1,9)(,31/12/1959>;<9,)(,31/12/1959>;<1,9)(31/12/1959,>;<9,)(31/12/1959,>]]></attr>
    </Node>

    <Node id="PARTITION" type="PARTITION">
      <attr name="partitionSource">//#TL
        function getOutputPort(){
          if ($EmployeeID .lt. 3) return 0
          else if ($EmployeeID .lt. 5) return 1
          else return 2
        }
      </attr>
    </Node>

    <Node id="PARTITION" type="PARTITION">
      <attr name="partitionSource">//#TL
        function getOutputPort(){
          if ($temperature > 0) return 0
          else return 1
        }
      </attr>
    </Node>
Changes / reformats data record between one INPUT and several OUTPUT ports. This component is only a wrapper around transformation class implementing org.jetel.component.RecordTransform interface. The method transform is called for every record passing through this component.
The component implements a RecordTransform interface or inherits from a DataRecordTransform class.
Input ports:
* one input port defined/connected.
Output ports:
This component creates new data records based on the incoming data records and on the specified transformation, and sends them to the output ports as specified in the transform() function. The transformation can be written in Java or in the Clover Transform Language. The component can change data records in many ways: it can change the format of date fields, concatenate fields, reorder them, split fields, cut off parts of the data, change letter case, convert values from one data type to another, replace field values by some other identification, etc. The transform() function serves to define the transformation. See the last example below. (There, the incoming records also contain the firstname and lastname fields, which are concatenated with a space between them. Thus, if the incoming records consist of N fields, the outgoing records consist of N-1 fields.)
In addition to this required function, you can define two other functions: init() and finished(). If you want to declare and initialize some variables, or do anything else that should be done at the beginning of data processing by the component, do it within the init() function. If you want to free memory or delete some temporary files, do it within the finished() function. Each of these functions is called only once: init() at the beginning and finished() at the end. Unlike them, the required transform() function is called many times, after init() and before finished().
Xml attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
id | yes | component identification | ||
type | yes | component type | REFORMAT | |
transform | if no transformClass | contains definition of transformation | ||
transformClass | if no transform | name of the class to be used for transforming data | ||
transformURL | no | contains path to the file with transformation code. | ||
charset | no | encoding of the external source | ISO-8859-1 | |
errorActions | no | defines whether the graph stops when the transformation returns a negative value. Available actions are STOP and CONTINUE. For CONTINUE, an error message is logged to the console or to a file (if the errorLog attribute is specified); for STOP, a TransformException is thrown and graph execution is stopped. An error action can be set for each negative value (value1=action1;value2=action2;…), or the same action (STOP or CONTINUE) can be set for all values. It is also possible to define error actions for some negative values and one action for all other values (MIN_INT=myAction). | -1=CONTINUE;MIN_INT=STOP | 2.6 |
errorLog | no | path to the error log file. Each error (after which the graph continues) is logged in the following way: recordNumber;errorCode;errorMessage;semiResult - fields are delimited by Defaults.Component.KEY_FIELDS_DELIMITER. | | 2.6 |
..optional attribute.. | no | any additional attribute is passed to transformation class in Properties object - as a key→value pair. There is no limit to how many optional attributes can be used. |
Example:

    <Node id="REF" type="REFORMAT" transformClass="org.jetel.test.reformatOrders" param1="123" param2="XYZ"/>

    <Node id="REF" type="REFORMAT">
      <attr name="transform">
        import org.jetel.component.DataRecordTransform;
        import org.jetel.data.*;
        public class ReformatOrdersInline extends DataRecordTransform{
          int counter=0;
          int field=0;
          public int transform(DataRecord[] source, DataRecord[] target){
            StringBuffer strBuf=new StringBuffer(80);
            if (source[0]==null){
              System.err.println("NULL source[0]");
            }
            try{
              // let's concatenate shipping address into one long string
              strBuf.append(GetVal.getString(source[0],"ShipName")).append(';');
              strBuf.append(GetVal.getString(source[0],"ShipAddress")).append(';');
              strBuf.append(GetVal.getString(source[0],"ShipCity")).append(';');
              strBuf.append(GetVal.getString(source[0],"ShipCountry"));
              // mapping among source & target fields
              // some fields get assigned directly from source fields, some
              // are assigned from internal variables
              SetVal.setInt(target[0],"PRODUCTID",counter);
              SetVal.setInt(target[0],"ORDERID",GetVal.getInt(source[0],"OrderID"));
              SetVal.setString(target[0],"CUSTOMERID",GetVal.getString(source[0],"CustomerID"));
              SetVal.setString(target[0],"CUSTOMER",strBuf.toString());
              SetVal.setInt(target[0], "SHIPTIME",
                (int)( (GetVal.getDate( source[0], "RequiredDate").getTime()
                  - GetVal.getDate( source[0], "ShippedDate").getTime()) / 1000 / 60 / 60 / 24));
            }catch(Exception ex){
              ex.printStackTrace();
              errorMessage=ex.getMessage()+" ->occurred with record :"+counter;
              return SKIP;
            }
            counter++;
            return ALL;
          }
        }
      </attr>
    </Node>

    <Node id="REF" type="REFORMAT">
      <attr name="transform">//#TL
        function getMemseqno(){
          if (!isnull($RecordName0.SSN)) {
            return $RecordName0.Memseqno+1;
          }
          return $RecordName0.Memseqno;
        }
        function transform(){
          $0.Id:=$RecordName0.Id;
          $1.Id:=$RecordName0.Id;
          $0.SrcCode:=$RecordName0.SrcCode;
          $1.SrcCode:=$RecordName0.SrcCode;
          $0.Memseqno:=getMemseqno();
          $1.Memseqno:=getMemseqno();
        }
      </attr>
    </Node>

    <Node id="REF" type="REFORMAT">
      <attr name="transform">//#TL
        function transform(){
          $Name := $firstname+" "+$lastname;
          $field2 := $field2;
          ...
          $fieldN := $fieldN;
        }
      </attr>
    </Node>

    <Node errorActions="MIN_INT=CONTINUE;" errorLog="${DATATMP_DIR}/log.txt" id="REF" transformURL="${TRANS_DIR}/reformatOrders.java" type="REFORMAT"/>
Since version 2.8
Rollup is a general transformation component that serves as an executor of rollup transforms written in Java or CTL. Rollup transforms written in Java have to implement the org.jetel.component.rollup.RecordRollup interface. Basically, rollup transforms are used to process groups of data records. Each group consisting of M data records may be used to output N different data records. The number of output data records is determined at runtime within the rollup transform. If no group key is defined, all input data records belong to a single group.
Each group of data records may share a data record referred to as a group “accumulator”. It may be used to store intermediate results. The group “accumulator” is created and initialized when the first data record of a group is encountered and updated for each data record in this group (including the first and the last data record). When the last data record of a group is encountered, processing of this group is finished and the group “accumulator” is disposed. If the input data records are not sorted, each group is finished as soon as all input data records are read and processed.
Because Rollup is a general transformation component, it can be used instead of Aggregate, Dedup, Normalizer and Denormalizer components. Rollup does not require sorted input data records and therefore can save overhead required to sort them. It can also be used to output intermediate results related to each input data record within a group.
The life cycle of a rollup transform is as follows:
* The init(Properties, DataRecordMetadata, DataRecordMetadata, DataRecordMetadata[]) method is called to initialize the transform.
* When the first data record of a group is encountered, the initGroup(DataRecord, DataRecord) method is called to initialize processing of the group and to initialize the “accumulator” (if it exists).
* The updateGroup(DataRecord, DataRecord) method is called for the current data record and the corresponding group “accumulator” (if it was requested).
* If updateGroup() returned true, the updateTransform(int, DataRecord, DataRecord, DataRecord[]) method is called repeatedly to generate output data record(s) until it returns RecordRollup.SKIP. If it returns a value < RecordRollup.SKIP, the getMessage() method is called.
* When the last data record of a group is encountered, the finishGroup(DataRecord, DataRecord) method is called to finish the group processing.
* If finishGroup() returned true, the transform(int, DataRecord, DataRecord, DataRecord[]) method is called repeatedly to generate output data record(s) until it returns RecordRollup.SKIP. If it returns a value < RecordRollup.SKIP, the getMessage() method is called.
* The finished() method is called to notify that the rollup transform finished.
* The reset() method may optionally be called to reset the transformation so that record processing and finished() may be executed again.
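The life cycle above can be illustrated with a plain Java loop over sorted input. This sketch does not implement org.jetel.component.rollup.RecordRollup; the class RollupSketch, the two-element String[] record layout (group key, amount) and the "key=sum" output format are assumptions made for illustration. Comments mark which life-cycle step each statement stands in for.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: sums the second field per group key over sorted input.
class RollupSketch {
    static List<String> rollup(List<String[]> sortedInput) {
        List<String> out = new ArrayList<>();   // init(): prepare the transform
        String currentKey = null;
        long accumulator = 0;                   // stands in for the group "accumulator" record
        for (String[] rec : sortedInput) {
            if (!rec[0].equals(currentKey)) {
                if (currentKey != null) {
                    // finishGroup() returned true, so transform() emits one output record
                    out.add(currentKey + "=" + accumulator);
                }
                currentKey = rec[0];            // initGroup(): first record of a new group
                accumulator = 0;
            }
            accumulator += Long.parseLong(rec[1]); // updateGroup(): called for every record
        }
        if (currentKey != null) {
            out.add(currentKey + "=" + accumulator); // last group is finished at end of input
        }
        return out;                             // finished()
    }
}
```

In this sketch updateGroup() never triggers updateTransform(), so only one record per group is produced; emitting a running total per input record would correspond to returning true from updateGroup().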
Input ports:
* one input port defined/connected
Output ports:
XML attributes:
| Attribute | Mandatory | Description | Default |
|---|---|---|---|
id | yes | Identification of the component. | N/A |
type | yes | The type of the component (has to be ROLLUP). | N/A |
groupKeyFields | no | Field names that form the group key separated by ':', ';' or '|'. | null |
groupAccumulatorMetadataId | no | ID of data record meta data that should be used to create group “accumulators” (if required). | null |
transform | yes/no | Rollup transform as a Java or CTL source code. | null |
transformUrl | yes/no | URL of an external Java/CTL source code of the rollup transform. | null |
transformUrlCharset | no | Character set used in the source code of the rollup transform specified via an URL. | ISO-8859-1 |
transformClassName | yes/no | Class name of a Java class implementing the rollup transform. | null |
inputSorted | no | Flag specifying whether the input data records are sorted or not. If set to false, the order of output data records is not specified. | true |
equalNULL | no | Flag specifying whether the null values are considered equal or not. | true |
Example:
<Node id="MY_ROLLUP" type="ROLLUP" groupKeyFields="id" groupAccumulatorMetadataId="accumulator" transformUrl="MyRollupTransform.java" transformUrlCharset="UTF-8" inputSorted="false"/>
All records from input port are copied onto all connected output ports.
Input ports:
* one input port defined/connected.
Output ports:
Xml attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
id | yes | component identification | ||
type | yes | component type | SIMPLE_COPY |
Example:
<Node id="SIMPLE_COPY0" type="SIMPLE_COPY"/>
All records from all input ports are gathered and copied to the output port. The component goes through the input ports one by one and waits (blocks) if there is currently no data on a port. It implements an inverse RoundRobin.
Input ports:
* at least one input port defined/connected.
Output ports:
Xml attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
id | yes | component identification | ||
type | yes | component type | SIMPLE_GATHER |
Example:
<Node id="SIMPLE_GATHER0" type="SIMPLE_GATHER"/>
Sorts the incoming records based on the specified key. The key is the name (or a combination of names) of field(s) from the input record. Each field of the sort key can be sorted independently of the others. The sort order for each individual field is either Ascending (default) or Descending, expressed by the letter a or d in parentheses following the field name. For example: Surname(a);FirstName(a);Salary(d). If there is not enough room in the internal sort buffer, the component performs external sorting, so any number of records can be sorted.
Input ports:
* one input port defined/connected.
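To show how a key such as Surname(a);FirstName(a);Salary(d) translates into an ordering, here is a minimal plain-Java sketch that builds a comparator from the key string. It is not the component's implementation: the class SortKeySketch is hypothetical, records are modelled as maps, and every key field is assumed to be present and to hold mutually comparable values.

```java
import java.util.Comparator;
import java.util.Map;

// Hypothetical sketch: turns a sort key string into a chained comparator.
class SortKeySketch {
    @SuppressWarnings("unchecked")
    static Comparator<Map<String, Object>> parse(String sortKey) {
        Comparator<Map<String, Object>> result = null;
        // The individual expressions may be separated by colon, semicolon or pipe.
        for (String part : sortKey.split("[:;|]")) {
            String spec = part.trim();
            boolean descending = spec.endsWith("(d)");
            // Strip an optional "(a)" or "(d)" suffix; ascending is the default.
            String field = spec.replaceAll("\\((a|d)\\)$", "");
            Comparator<Map<String, Object>> c =
                (x, y) -> ((Comparable<Object>) x.get(field)).compareTo(y.get(field));
            if (descending) {
                c = c.reversed();
            }
            result = (result == null) ? c : result.thenComparing(c);
        }
        return result;
    }
}
```

Passing the resulting comparator to List.sort() orders records by surname and first name ascending, and among equal names by salary descending.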
Output ports:
Xml attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
id | yes | component identification | ||
type | yes | component type | EXT_SORT | |
sortKey | yes | sequence of field names followed by (a) or (d) meaning that the field is sorted in ascending or descending order, respectively. The individual expressions are separated by :;| {colon, semicolon, pipe}. Example: Surname(a);FirstName(a);Salary(d) | ||
locale | no | Locale used for correct sorting order | ||
caseSensitive | no | Case sensitivity of ordering. Locale must be set if this attribute is in use. | ||
sortOrder | no | One of “Ascending|Descending” {the first letter is sufficient}. Deprecated attribute. In older versions of Clover, sortOrder was the same for all fields from the sortKey attribute. Now it can be different for different fields and is expressed as (a) or (d) following immediately after each field name from the sortKey attribute. | Ascending | |
numberOfTapes | no | even number greater than 2 - denotes how many tapes (temporary files) will be used for external sorting. | 6 | |
sorterInitialCapacity | no | the initial capacity of the internal sorter used for sorting records in memory. | ||
bufferCapacity | no | the maximum number of records which are sorted in memory. If the number of records exceeds this size, external sorting is performed. | ||
tmpDirs | no | Semicolon (;) delimited list of directories which should be used for creating tape files - used when external sorting is performed. Default value is equal to Java's java.io.tmpdir system property. |
With the parameters above, the default total capacity is roughly 140000 records. The total capacity is approximately 69.91 * sorterInitialCapacity. The following table shows the total capacities of the internal buffer for various initial capacity values:
| Initial Capacity | Total Capacity |
|---|---|
| 10 | 1000 |
| 100 | 7000 |
| 1000 | 70000 |
| 2000 | 140000 |
| 5000 | 350000 |
| 10000 | 700000 |
| 20000 | 1399000 |
| 50000 | 3496000 |
Example:
<Node id="SORT_CUSTOMER" type="EXT_SORT" sortKey="Name:Address" sortOrder="A"/>
COMMERCIAL, since version 2.7
FastSort is an improved commercial alternative to the regular ExtSort component. It can handle both internal (in-memory) and external (on-disk) sorting.
The modified multiway external sort algorithm used in FastSort sorts up to 2.5 times faster than ExtSort. To achieve maximum performance, FastSort needs to know the estimated number of records in the sorted sets. The number does not need to be exact, but accurate values produce the best results.
Input ports:
* port 0
Output ports:
Basic XML attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
id | yes | component identification | ||
type | yes | component type | FAST_SORT | |
sortKey | yes | a list of fields which act as keys for sorting | ||
locale | no | Locale used for correct sorting order | ||
estRecCount | yes unless runSize | Estimated number of input records. Approximate value in order of magnitudes is sufficient | ||
sortInMemory | no | Whether only in-memory sorting is used. Setting to “true” disables external sorting | false | |
tmpDirs | no | A list of temporary directories where temporary files will be stored. Use directories on separate physical volumes to increase performance. | system TMP dir |
Advanced XML attributes:
| Attribute | Mandatory | Description | Default | ETL Version since |
|---|---|---|---|---|
runSize | yes unless estRecCount | Number of records in each run. This setting overrides the automatically guessed value. Typically 1/100th to 1/200th of the total number of records is a good choice; values ranging from 5000 to 200,000 are most common. | 20000 | |
avgRecSize | no | Average record size in bytes. If omitted, it is determined automatically from a sample of records. | ||
maxMemory | no | A rough estimate of the maximum memory FastSort can use. Must be specified with units, e.g. “300MB”, “1gb”, etc. | (auto) | |
concurrencyLimit | no | Number of threads performing sorting and writing runs. The default value works best in all common cases. On heavily concurrent systems, increasing this value might produce a good performance boost. | ||
readBuffers | no | Number of read buffers in the first stage. Corresponds tightly with the number of threads (concurrencyLimit). Must be equal to or larger than concurrencyLimit. The more read buffers, the smaller the chance that the threads will block each other. | ||
compress | no | Enables compression of temporary files. While this is a good option if disk space is an issue it has a negative impact on performance so use it only in cases where speed is irrelevant. | ||
tmpCharset | no | Temporary files are stored on disk in their serialized form which means that all string fields use unicode (2-byte) encoding. Setting tmpCharset to “utf-8” or similar variable length or single byte encoding can save up to almost half the space needed for temporaries for the cost of slightly reduced performance. | ||
tapeBuffer | no | Size in bytes for read buffer associated with each sorted run. Decreasing it from default value might avoid memory exhaustion for large numbers of runs | 8192 | |
tempFilesLimit | no | Number of maximum open files. Lower values produce significant performance drop. 0 for unlimited | unlimited (0) |
Note that the estRecCount and runSize parameters are essential for performance tuning. If the estimated record count is specified, an internal formula is used to determine runSize, which is the fundamental runtime parameter. Small run sizes (2,000-20,000) are effective for smaller sets, while big run sizes (hundreds of thousands of records) are best for large datasets. Typical run sizes range from 5,000 to 200,000 records.
The first stage of external sort can benefit from multi-core or multi-processor environment so on systems with more than two processing units it might be useful to set concurrencyLimit property to a reasonable number of concurrent threads. Note that if this parameter is omitted it is determined automatically based on system environment.
FastSort uses a relatively large number of temporary files during its operation. The following table shows approximate numbers of temporary files for different record set sizes.
| Dataset size | Number of temp. files | Auto run size | Note |
|---|---|---|---|
| 1 000 000 | ~100 | ~10 000 | |
| 10 000 000 | ~220 | ~45 000 | |
| 1 000 000 000 | 20 000 to 2 000 | 50 000 to 500 000 | Depends on available memory |
Note that the numbers in the table above are not exact and might be different on your system. Such large numbers of files can nevertheless cause problems by hitting user quotas or other runtime limitations. This typically happens in complex graphs where parallel sorts take place, possibly together with other graph components that also keep a huge number of files open. The result is a “Too many open files” error and failure of the graph execution. There are two possible solutions to this issue: either increase the limit (quota) or force FastSort to keep the number of temporary files below some limit. The first option is recommended for production systems since there is no speed sacrifice; on Unix systems it typically means setting “ulimit” to a higher number. If increasing the quota is not an option (e.g. for regular users on large servers), set “Max open files” in FastSort to a reasonable value (taking into account the number of parallel FastSorts, other graph components, etc.). FastSort will then perform intermediate merges of the temporary files to keep their number below the limit. However, if “Max open files” is set so low that intermediate merges are inevitable and occur often, performance drops significantly, so keep it at the highest value possible. If you are forced to limit FastSort to fewer than a hundred temporary files, even for large datasets, consider using ExtSort instead, which is designed to perform well with a limited number of tapes.
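The relationship between dataset size, run size and the number of temporary files in the table above can be approximated with simple arithmetic. The sketch below assumes one temporary file per sorted run, which agrees with the first two table rows but is not stated by the source; the class and method names are hypothetical.

```java
// Hypothetical estimate of how many sorted runs (and thus temporary files) a sort produces.
class FastSortRunsSketch {
    // Number of runs = record count divided by run size, rounded up.
    static long estimatedRuns(long recordCount, long runSize) {
        return (recordCount + runSize - 1) / runSize;
    }

    // A limit of 0 means "unlimited", as in the tempFilesLimit attribute.
    static boolean needsIntermediateMerges(long recordCount, long runSize, long tempFilesLimit) {
        return tempFilesLimit > 0 && estimatedRuns(recordCount, runSize) > tempFilesLimit;
    }
}
```

For 1 000 000 records and a run size of 10 000 this gives 100 runs; for 10 000 000 records and a run size of 45 000 it gives 223, close to the ~220 shown in the table.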
Example:

    <Node id="FASTSORT" type="FAST_SORT" sortKey="price(a),date(d)"/>

    <Node id="FASTSORT" type="FAST_SORT" sortKey="key_field(a)" runSize="10000" concurrencyLimit="3" readBuffers="6" tmpDirs="/tmp; /mnt/volume1/temp/" tmpCharset="utf-8"/>
Since version 2.6
Receives data records through connected input port and sorts them according to the specified sort key within a group specified by the group key. Sorted data records are sent to all the connected output ports. The sort key (as well as the group key) is a name or a combination of names of field(s) of incoming data records. Sort order of each data field can be either Ascending (default) or Descending. Any number of records can be sorted. If the internal buffer is full, external sorting is performed.
This component is actually an ExtSort optimized for sorting within groups.
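Sorting within groups can be sketched in plain Java as follows. This is an illustration only: the class SortWithinGroupsSketch is hypothetical, records are two-element String arrays (group key, sort value), and the sketch assumes that records of one group arrive next to each other, which the text above does not state explicitly.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: sorts records by their second field inside each run of equal group keys.
class SortWithinGroupsSketch {
    static List<String[]> sortWithinGroups(List<String[]> input) {
        List<String[]> out = new ArrayList<>();
        List<String[]> group = new ArrayList<>();
        for (String[] rec : input) {
            // A change of the group key closes the current group.
            if (!group.isEmpty() && !group.get(0)[0].equals(rec[0])) {
                flush(group, out);
            }
            group.add(rec);
        }
        flush(group, out);
        return out;
    }

    // Sorts the buffered group ascending by the sort value and appends it to the output.
    private static void flush(List<String[]> group, List<String[]> out) {
        group.sort(Comparator.comparing((String[] r) -> r[1]));
        out.addAll(group);
        group.clear();
    }
}
```

The order of the groups themselves is left untouched; only the records inside each group are reordered.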
Input ports:
* one input port defined/connected
Output ports:
XML attributes:
| Attribute | Mandatory | Description | Default |
|---|---|---|---|
id | yes | The identification of the component. | N/A |
type | yes | The type of the component (has to be SORT_WITHIN_GROUPS). | N/A |
groupKey | yes | Field names separated by ':', ';' or '|'. | N/A |
sortKey | yes | Field names followed by ”(a)” (ascending order – default) or ”(d)” (descending order) and separated by ':', ';' or '|'. | N/A |
bufferCapacity | no | Defines the maximum number of records that will be sorted in memory. If the number of records exceeds this value, external sorting is performed. | -1 for external sorting |
numberOfTapes | no | Denotes how many tapes (temporary files) will be used for external data sorting. This value must be an even number greater than 2. | 8 |
tempDirectories | no | A semicolon-delimited list of directories which should be used for creating tape files. These are used when external sorting is performed. | java.io.tmpdir system property |
Example:
<Node id="SORT_CUSTOMER" type="SORT_WITHIN_GROUPS" groupKey="id" sortKey="name(a);address(a)"/>
The component transforms input data according to an XSLT file or an XSLT string and sends the result to the output port or to an output file. The transformation is applied to each field that is passed to the transform function in the mapping attribute; the rest of the fields in the mapping attribute are only copied. It is also possible to use the xmlInputFile and xmlOutputFile attributes, which define the input and output data file, dictionary or field.
Input ports:
* one input port defined/connected.
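What applying an XSLT string to the content of a single field amounts to can be sketched with the standard javax.xml.transform API from the JDK. How the component itself invokes the XSLT processor is not described here, so this is only an approximation; the class name XsltFieldSketch is hypothetical.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

// Hypothetical sketch: applies an XSLT stylesheet (as a string) to an XML document (as a string).
class XsltFieldSketch {
    static String transform(String xslt, String xml) {
        try {
            Transformer transformer = TransformerFactory.newInstance()
                    .newTransformer(new StreamSource(new StringReader(xslt)));
            StringWriter out = new StringWriter();
            transformer.transform(new StreamSource(new StringReader(xml)), new StreamResult(out));
            return out.toString();
        } catch (TransformerException e) {
            // Wrapped so that callers do not have to handle a checked exception.
            throw new IllegalStateException("XSLT transformation failed", e);
        }
    }
}
```

In terms of the mapping attribute, a mapping such as $0.OutputField1:=transform($0.InputField1) would correspond to calling transform() with the stylesheet and the value of InputField1.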
Output ports:
Xml attributes:
| Attribute | Mandatory | Description | Default |
|---|---|---|---|
id | yes | component identification | |
type | yes | component type | XSL_TRANSFORMER |
xsltFile | if no xslt | XSLT file needed for transformation | |
xslt | if no xsltFile | XSLT needed for transformation | |
mapping | if no xmlInputFile,xmlOutputFile | transformation mapping separated by ; {semicolon}. This attribute can be used instead of xmlInputFile and xmlOutputFile. | |
xmlInputFile | if no mapping | input data file | |
xmlOutputFile | if no mapping | output data file | |
charset | no | charset used for xslt and also if an input or output field is not byte or cbyte type. | Defaults.Encoding (UTF-8) |
Example:

    <Node id="XSL_TRANSFORMER0" type="XSL_TRANSFORMER" mapping="$0.OutputField1:=transform($0.InputField1);$0.OutputField2:=$0.InputField2;">
      <attr name="xslt">
        <![CDATA[
        <xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
          <xsl:template match="/">
            <html>
              <body>
                <h2>My CD Collection</h2>
                <table border="1">
                  <tr bgcolor="#9acd32">
                    <th>Title</th>
                    <th>Artist</th>
                  </tr>
                  <xsl:for-each select="catalog/cd">
                    <tr>
                      <td><xsl:value-of select="title"/></td>
                      <td><xsl:value-of select="artist"/></td>
                    </tr>
                  </xsl:for-each>
                </table>
              </body>
            </html>
          </xsl:template>
        </xsl:stylesheet>
        ]]>
      </attr>
    </Node>