The cluster XSD specification is available here. A cluster defines the interfaces that Falcon uses, such as readonly, write, workflow and messaging. Feeds and processes that are on-boarded to Falcon reference a cluster by its name.
Following are the tags defined in a cluster.xml:
<cluster colo="gs" description="" name="corp" xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
The colo attribute specifies the colo to which this cluster belongs, and name is the name of the cluster, which must be unique.
A cluster has various interfaces as described below:
<interface type="readonly" endpoint="hftp://localhost:50010" version="0.20.2" />
A readonly interface specifies the endpoint for Hadoop's HFTP protocol. It is used in the context of feed replication.
<interface type="write" endpoint="hdfs://localhost:8020" version="0.20.2" />
A write interface specifies the interface for writing to HDFS; its endpoint is the value of fs.defaultFS. Falcon uses this interface to write system data to HDFS, and feeds referencing this cluster are written to HDFS using the same write interface.
<interface type="execute" endpoint="localhost:8021" version="0.20.2" />
An execute interface specifies the interface for the JobTracker; its endpoint is the value of mapreduce.jobtracker.address. Falcon uses this interface to submit processes as jobs to the JobTracker defined here.
<interface type="workflow" endpoint="http://localhost:11000/oozie/" version="4.0" />
A workflow interface specifies the interface for the workflow engine; an example of its endpoint is the value of OOZIE_URL. Falcon uses this interface to schedule the processes referencing this cluster on the workflow engine defined here.
<interface type="registry" endpoint="thrift://localhost:9083" version="0.11.0" />
A registry interface specifies the interface for the metadata catalog, such as Hive Metastore (or HCatalog). Falcon uses this interface to register and de-register partitions for a given database and table. It also uses this information to schedule data availability events based on partitions in the workflow engine. Although Hive Metastore supports both RPC and HTTP, Falcon comes with an implementation for RPC over Thrift. For Hive HA mode, separate the URIs with commas and add the protocol "thrift://" only once, at the beginning. See below for an example of Hive HA mode:
<interface type="registry" endpoint="thrift://c6402.ambari.apache.org:9083,c6403.ambari.apache.org:9083" version="0.11.0" />
<interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.4.6" />
A messaging interface specifies the interface for sending feed availability messages; its endpoint is the broker URL with a tcp address.
A cluster has a list of locations defined:
<location name="staging" path="/projects/falcon/staging" /> <location name="working" path="/projects/falcon/working" /> <!--optional-->
A location has a name and a path. The name is the type of the location; allowed values are staging, temp and working. The path is the HDFS path for each location. Falcon uses these locations for intermediate processing of entities in HDFS, and hence Falcon should have read/write/execute permission on them. These locations MUST be created prior to submitting a cluster entity to Falcon.
staging is a mandatory location and should have 777 permissions. Its parent directories must have execute permissions so that multiple users can write to this location.
working is an optional location and must have 755 permissions. If working is not specified, Falcon creates a subdirectory in the staging location with 755 permissions. The parent directory of working must have execute permissions so that multiple users can read from this location.
A cluster has an ACL (Access Control List), which is useful for implementing permission requirements and provides a way to set different permissions for specific users or named groups.
<ACL owner="test-user" group="test-group" permission="*"/>
ACL indicates the access control list for this cluster. owner is the owner of this entity, group is the group that has read access, and permission indicates the permission.
A cluster has a list of properties. These are key-value pairs that are propagated to the workflow engine.
<property name="brokerImplClass" value="org.apache.activemq.ActiveMQConnectionFactory" />
Ideally, the JMS implementation class name of the messaging engine (brokerImplClass) should be defined here.
The datasource entity contains the connection information required to connect to a data source such as a MySQL database. The datasource XSD specification is available here. A datasource contains read and write interfaces, which Falcon uses to import data from and export data to the datasource, respectively. Feeds that are on-boarded to Falcon reference a datasource by its name.
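For orientation, the fragments above could be assembled into a single cluster.xml roughly as follows. This is a sketch that reuses only the values shown above. The wrapper elements (interfaces, locations, properties) and the order of the top-level elements are assumptions and should be checked against the cluster XSD.

```xml
<cluster colo="gs" description="" name="corp" xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <!-- Assumption: interfaces are grouped under an <interfaces> wrapper -->
    <interfaces>
        <interface type="readonly" endpoint="hftp://localhost:50010" version="0.20.2"/>
        <interface type="write" endpoint="hdfs://localhost:8020" version="0.20.2"/>
        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
        <interface type="workflow" endpoint="http://localhost:11000/oozie/" version="4.0"/>
        <interface type="registry" endpoint="thrift://localhost:9083" version="0.11.0"/>
        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.4.6"/>
    </interfaces>
    <!-- staging is mandatory, working is optional -->
    <locations>
        <location name="staging" path="/projects/falcon/staging"/>
        <location name="working" path="/projects/falcon/working"/>
    </locations>
    <ACL owner="test-user" group="test-group" permission="*"/>
    <properties>
        <property name="brokerImplClass" value="org.apache.activemq.ActiveMQConnectionFactory"/>
    </properties>
</cluster>
```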
Following are the tags defined in a datasource.xml:
<datasource colo="west-coast" description="Customer database on west coast" type="mysql" name="test-hsql-db" xmlns="uri:falcon:datasource:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
The colo attribute specifies the colo to which the datasource belongs, and name is the name of the datasource, which must be unique.
A datasource has two interfaces as described below:
<interface type="readonly" endpoint="jdbc:hsqldb:localhost/db"/>
A readonly interface specifies the endpoint and protocol to connect to a datasource. This would be used in the context of import from datasource into HDFS.
<interface type="write" endpoint="jdbc:hsqldb:localhost/db1">
A write interface specifies the endpoint and protocol used to write to the datasource. Falcon uses this interface to export data from HDFS to the datasource.
<credential type="password-text"> <userName>SA</userName> <passwordText></passwordText> </credential>
A credential is associated with an interface (read or write) providing user name and password to authenticate to the datasource.
<credential type="password-file"> <userName>SA</userName> <passwordFile>hdfs-file-path</passwordFile> </credential>
The credential can also be specified via a password file stored in HDFS. This file should be accessible only by the user.
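Since a credential is associated with an interface, one plausible layout nests each credential inside the interface it belongs to. The fragment below is a sketch under that assumption. The interfaces wrapper element and the nesting are not confirmed by the text above, and the other elements of the datasource entity are omitted.

```xml
<!-- Assumption: credentials are nested inside the interface they authenticate -->
<interfaces>
    <interface type="readonly" endpoint="jdbc:hsqldb:localhost/db">
        <credential type="password-text">
            <userName>SA</userName>
            <passwordText></passwordText>
        </credential>
    </interface>
    <interface type="write" endpoint="jdbc:hsqldb:localhost/db1">
        <credential type="password-file">
            <userName>SA</userName>
            <!-- placeholder from the text above, not a real path -->
            <passwordFile>hdfs-file-path</passwordFile>
        </credential>
    </interface>
</interfaces>
```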
The feed XSD specification is available here. A feed defines various attributes of a feed, such as its location, frequency, late-arrival handling and retention policies. A feed can be scheduled on a cluster. Once a feed is scheduled, its retention and replication processes are triggered on the given cluster.
<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
A feed should have a unique name and this name is referenced by processes as input or output feed.
Falcon introduces a new abstraction to encapsulate the storage for a given feed, which can be expressed either as a path on the file system (File System Storage) or as a table in a catalog such as Hive (Catalog Storage).
<xs:choice minOccurs="1" maxOccurs="1"> <xs:element type="locations" name="locations"/> <xs:element type="catalog-table" name="table"/> </xs:choice>
A feed should contain exactly one of the two storage options: locations on the file system or a table in a catalog.
<clusters>
    <cluster name="test-cluster">
        <validity start="2012-07-20T03:00Z" end="2099-07-16T00:00Z"/>
        <retention limit="days(10)" action="delete"/>
        <sla slaLow="hours(3)" slaHigh="hours(4)"/>
        <locations>
            <location type="data" path="/hdfsDataLocation/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"/>
            <location type="stats" path="/projects/falcon/clicksStats" />
            <location type="meta" path="/projects/falcon/clicksMetaData" />
        </locations>
    </cluster>
    ..... more clusters
</clusters>
A feed references a cluster by its name. Before a feed is submitted, all the referenced clusters should be submitted to Falcon.
type: specifies whether the referenced cluster should be treated as a source or a target for the feed. A feed can have multiple source and target clusters. If the type of a cluster is not specified, the cluster is not considered for replication.
Validity of a feed on a cluster specifies the duration for which this feed is valid on this cluster.
Retention specifies how long the feed is retained on this cluster and the action to be taken on the feed after the retention period expires. The retention limit is specified by the expression frequency(times), e.g. if the feed should be retained for at least 6 hours, then retention's limit="hours(6)".
The field partitionExp contains partition tags. The number of partition tags has to be equal to the number of partitions specified in the feed schema. A partition tag can be a wildcard (*), a static string or an expression. At least one of the strings has to be an expression.
sla specifies the SLA for the feed on this cluster. This is an optional parameter, and the SLA can be the same as or different from the global sla tag (defined outside the clusters tag). This tag gives the user the flexibility to have a different SLA on different clusters, e.g. in case of replication. If this attribute is missing, the default global SLA is picked from the feed definition.
Location specifies where the feed is available on this cluster. This is an optional parameter, and the path can be the same as or different from the global locations tag value (defined outside the clusters tag). This tag gives the user the flexibility to have the feed at different locations on different clusters. If this attribute is missing, the default global location is picked from the feed definition. The individual location tags data, stats and meta are also optional.
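The cluster type described above does not appear in the preceding example. A replication setup might look like the sketch below, where the cluster names primary-cluster and backup-cluster and the target path are hypothetical and only the type values source and target come from the text.

```xml
<clusters>
    <!-- hypothetical source cluster: short retention, global feed location -->
    <cluster name="primary-cluster" type="source">
        <validity start="2012-07-20T03:00Z" end="2099-07-16T00:00Z"/>
        <retention limit="hours(6)" action="delete"/>
    </cluster>
    <!-- hypothetical target cluster: longer retention, cluster-specific location -->
    <cluster name="backup-cluster" type="target">
        <validity start="2012-07-20T03:00Z" end="2099-07-16T00:00Z"/>
        <retention limit="days(10)" action="delete"/>
        <locations>
            <location type="data" path="/backup/clicks/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}"/>
        </locations>
    </cluster>
</clusters>
```

With this layout the feed would be replicated from primary-cluster to backup-cluster, and each cluster would apply its own retention limit.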
<location type="data" path="/projects/falcon/clicks" /> <location type="stats" path="/projects/falcon/clicksStats" /> <location type="meta" path="/projects/falcon/clicksMetaData" />
A location tag specifies the type of location (data, meta or stats) and the corresponding path. A feed should define at least the location of type data, which specifies the HDFS path pattern where the feed is generated periodically, e.g. type="data" path="/projects/TrafficHourly/${YEAR}-${MONTH}-${DAY}/traffic". The granularity of the date pattern in the path should be at least that of the feed's frequency. The other supported location types are stats and meta. If a process references a feed, the meta and stats paths are available as properties in the process.
A table tag specifies the table URI in the catalog registry as:
catalog:$database-name:$table-name#(partition-key=partition-value);(partition-key=partition-value);*
This is modeled as a URI (similar to an ISBN URI). It does not have any reference to Hive or HCatalog. It is quite generic, so it can be tied to other implementations of a catalog registry. The catalog implementation specified in the startup config provides the implementation for the catalog URI.
The top-level partition has to be a dated pattern, and the granularity of the date pattern should be at least that of the feed's frequency.
<xs:complexType name="catalog-table"> <xs:annotation> <xs:documentation> catalog specifies the uri of a Hive table along with the partition spec. uri="catalog:$database:$table#(partition-key=partition-value);+" Example: catalog:logs-db:clicks#ds=${YEAR}-${MONTH}-${DAY} </xs:documentation> </xs:annotation> <xs:attribute type="xs:string" name="uri" use="required"/> </xs:complexType>
Examples:
<table uri="catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR};region=${region}" /> <table uri="catalog:src_demo_db:customer_raw#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" /> <table uri="catalog:tgt_demo_db:customer_bcp#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
<partitions> <partition name="country" /> <partition name="cluster" /> </partitions>
A feed can define multiple partitions. If a referenced cluster defines partitions, then the number of partitions in the feed has to be equal to or more than the number of cluster partitions.
Note: This will only apply for FileSystem storage but not Table storage as partitions are defined and maintained in Hive (HCatalog) registry.
<groups>online,bi</groups>
A feed specifies a comma-separated list of groups. A group is a logical grouping of feeds, and a group is said to be available if all the feeds belonging to it are available. All feeds that belong to the same group must have the same frequency.
<availabilityFlag>_SUCCESS</availabilityFlag>
An availabilityFlag specifies the name of a file whose presence in a feed's data directory marks the feed as available, e.g. _SUCCESS. If this element is omitted, Falcon considers the presence of the feed's data directory as feed availability.
<frequency>minutes(20)</frequency>
A feed has a frequency, which specifies how often the feed is generated, e.g. every hour, every 5 minutes, daily or weekly. Valid frequency types for a feed are minutes, hours, days and months. The values can be negative, zero or positive.
<sla slaLow="hours(40)" slaHigh="hours(44)" />
A feed can have SLA and each SLA has two properties - slaLow and slaHigh. Both slaLow and slaHigh are written using expressions like frequency. slaLow is intended to serve for alerting for feed instances which are in danger of missing their availability SLAs. slaHigh is intended to serve for reporting the feeds which missed their SLAs. SLAs are relative to feed instance time.
<import>
    <source name="test-hsql-db" tableName="customer">
        <extract type="full">
            <mergepolicy>snapshot</mergepolicy>
        </extract>
        <fields>
            <includes>
                <field>id</field>
                <field>name</field>
            </includes>
        </fields>
    </source>
    <arguments>
        <argument name="--split-by" value="id"/>
        <argument name="--num-mappers" value="2"/>
    </arguments>
</import>

A feed can have an import policy associated with it. The source name specifies the datasource entity from which the data will be imported to HDFS. The tableName specifies the table or topic to be imported from the datasource. The extract type specifies the pull mechanism (full or incremental extract). The full extract method extracts all the data from the datasource. The incremental extraction method is still being implemented. The mergepolicy determines how the data is laid out on HDFS. The snapshot layout creates a snapshot of the data on HDFS using the feed's location specification. Fields is used to specify the projection columns. Under the hood, feed import from a database uses Sqoop. Any advanced Sqoop options can be specified via the arguments.

---+++ Late Arrival

<late-arrival cut-off="hours(6)" />
A late-arrival specifies the cut-off period until which the feed is expected to arrive late. It should be honored by processes that refer to the feed as an input, by rerunning the instances if the data arrives late within the cut-off period. The cut-off period is specified by the expression frequency(times), e.g. if the feed can arrive up to 8 hours late, then late-arrival's cut-off="hours(8)".
Note: This will only apply for FileSystem storage but not Table storage until a future time.
<notification type="email" to="bob@xyz.com"/>
Specifying the notification element with "type" property allows users to receive email notification when a scheduled feed instance completes. Multiple recipients of an email can be provided as comma separated addresses with "to" property. To send email notification ensure that SMTP parameters are defined in Falcon startup.properties. Refer to Falcon Email Notification for more details.
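For instance, a notification with several recipients could be written as in the sketch below. The second address is made up for illustration.

```xml
<!-- tom@xyz.com is a hypothetical second recipient -->
<notification type="email" to="bob@xyz.com,tom@xyz.com"/>
```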
A feed has an ACL (Access Control List), which is useful for implementing permission requirements and provides a way to set different permissions for specific users or named groups.
<ACL owner="test-user" group="test-group" permission="*"/>
ACL indicates the access control list for this feed. owner is the owner of this entity, group is the group that has read access, and permission indicates the permission.
<properties>
    <property name="tmpFeedPath" value="tmpFeedPathValue" />
    <property name="field2" value="value2" />
    <property name="queueName" value="hadoopQueue"/>
    <property name="jobPriority" value="VERY_HIGH"/>
    <property name="timeout" value="hours(1)"/>
    <property name="parallel" value="3"/>
    <property name="maxMaps" value="8"/>
    <property name="mapBandwidth" value="1"/>
    <property name="overwrite" value="true"/>
    <property name="ignoreErrors" value="false"/>
    <property name="skipChecksum" value="false"/>
    <property name="removeDeletedFiles" value="true"/>
    <property name="preserveBlockSize" value="true"/>
    <property name="preserveReplicationNumber" value="true"/>
    <property name="preservePermission" value="true"/>
    <property name="order" value="LIFO"/>
</properties>
Properties are key-value pairs that are propagated to the workflow engine. "queueName" and "jobPriority" are special properties with which the user can specify the Hadoop job queue and priority; the same values are used by Falcon's launcher job. "timeout", "parallel" and "order" are other special properties: timeout sets how long a replication instance waits for the feed instance, parallel sets the number of replication instances that can run concurrently, and order sets the execution order of replication instances (FIFO, LIFO or LAST_ONLY).
DistCp options can be passed as custom properties, which are propagated to the DistCp tool:
"maxMaps" is the maximum number of maps used during replication.
"mapBandwidth" is the bandwidth in MB/s used by each mapper during replication.
"overwrite" overwrites the destination during replication.
"ignoreErrors" ignores failures so that they do not cause the job to fail during replication.
"skipChecksum" bypasses checksum verification during replication.
"removeDeletedFiles" deletes files that exist in the destination but not in the source during replication.
"preserveBlockSize" preserves the block size during replication.
"preserveReplicationNumber" preserves the replication number during replication.
"preservePermission" preserves permissions during replication.
<lifecycle> <retention-stage> <frequency>hours(10)</frequency> <queue>reports</queue> <priority>NORMAL</priority> <properties> <property name="retention.policy.agebaseddelete.limit" value="hours(9)"></property> </properties> </retention-stage> </lifecycle>
The lifecycle tag is the new way to define the various stages of a feed's lifecycle. In the example above, a retention-stage is defined using the lifecycle tag. You may define lifecycle at the global level, at the cluster level, or both. The cluster-level configuration takes precedence, and Falcon falls back to the global definition if the cluster-level specification is missing.

---++++ Retention Stage

As of now there are two ways to specify retention. One is through the <retention> tag in the cluster, and the other is the new way, through the <retention-stage> tag in the <lifecycle> tag. If both are defined for a feed, the lifecycle tag is considered effective and Falcon ignores the <retention> tag in the cluster. If the retention-stage configuration in the lifecycle tag is invalid, Falcon will NOT fall back to the retention tag, even if it is defined, and will throw a validation error.
In this new method of defining retention you can specify the frequency at which retention should occur, and you can also define the queue and priority parameters for retention jobs. The default behavior of retention-stage is the same as the existing one: it deletes all instances whose instance time is earlier than the duration provided in "retention.policy.agebaseddelete.limit".
Property "retention.policy.agebaseddelete.limit" is mandatory and must contain a valid duration, e.g. "hours(1)". Retention frequency is not a mandatory parameter. If the user doesn't specify the frequency in the retention stage, it does not fall back to the old retention policy frequency. Its default value is 6 hours if the feed frequency is less than 6 hours; otherwise it is set to the feed frequency, because retention shouldn't be more frequent than data availability, to avoid wasting compute resources.
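A feed that defines lifecycle at both levels might be laid out as in the following sketch. The exact position of the lifecycle element inside the cluster and feed elements is an assumption to be checked against the feed XSD, and the days(1) and days(7) values are illustrative.

```xml
<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1">
    ...
    <clusters>
        <cluster name="test-cluster">
            <validity start="2012-07-20T03:00Z" end="2099-07-16T00:00Z"/>
            ...
            <!-- cluster-level definition: takes precedence on test-cluster -->
            <lifecycle>
                <retention-stage>
                    <frequency>hours(10)</frequency>
                    <properties>
                        <property name="retention.policy.agebaseddelete.limit" value="hours(9)"/>
                    </properties>
                </retention-stage>
            </lifecycle>
        </cluster>
    </clusters>
    ...
    <!-- global definition: used by clusters that define no lifecycle of their own -->
    <lifecycle>
        <retention-stage>
            <frequency>days(1)</frequency>
            <properties>
                <property name="retention.policy.agebaseddelete.limit" value="days(7)"/>
            </properties>
        </retention-stage>
    </lifecycle>
</feed>
```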
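As an illustration of the defaulting rule, a retention stage can carry only the mandatory property. The comments work through the rule as stated above for two assumed feed frequencies.

```xml
<lifecycle>
    <retention-stage>
        <!-- No <frequency> given, so the default applies:
             feed frequency minutes(20) -> less than 6 hours -> retention runs every 6 hours
             feed frequency days(1)     -> not less than 6 hours -> retention runs every day -->
        <properties>
            <property name="retention.policy.agebaseddelete.limit" value="hours(1)"/>
        </properties>
    </retention-stage>
</lifecycle>
```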
In future, we will allow more customisation like customising how to choose instances to be deleted through this method.
A process defines configuration for a workflow. A workflow is a directed acyclic graph(DAG) which defines the job for the workflow engine. A process definition defines the configurations required to run the workflow job. For example, process defines the frequency at which the workflow should run, the clusters on which the workflow should run, the inputs and outputs for the workflow, how the workflow failures should be handled, how the late inputs should be handled and so on.
The different details of process are:
Each process is identified with a unique name. Syntax:
<process name="[process name]"> ... </process>
An optional list of comma separated tags which are used for classification of processes. Syntax:
... <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
An optional list of comma separated word strings, specifies the data processing pipeline(s) to which this process belongs. Only letters, numbers and underscore are allowed for pipeline string. Syntax:
... <pipelines>test_Pipeline, dataReplication, clickStream_pipeline</pipelines>
The cluster on which the workflow should run. A process should contain one or more clusters. The cluster definition for the cluster name gives the endpoints for workflow execution, name node, job tracker, messaging and so on. Each cluster in turn has a validity, which gives the times between which the job should run on that cluster. Syntax:
<process name="[process name]"> ... <clusters> <cluster name="test-cluster1"> <validity start="2012-12-21T08:15Z" end="2100-01-01T00:00Z"/> </cluster> <cluster name="test-cluster2"> <validity start="2012-12-21T08:15Z" end="2100-01-01T00:00Z"/> </cluster> .... .... </clusters> ... </process>
Parallel defines how many instances of the workflow can run concurrently. It should be a positive integer > 0. For example, parallel of 1 ensures that only one instance of the workflow can run at a time. The next instance will start only after the running instance completes. Syntax:
<process name="[process name]"> ... <parallel>[parallel]</parallel> ... </process>
Order defines the order in which the ready instances are picked up. The possible values are FIFO (First In First Out), LIFO (Last In First Out) and LAST_ONLY (Last Only). Syntax:
<process name="[process name]"> ... <order>[order]</order> ... </process>
An optional timeout specifies the maximum time an instance waits for a dataset before being killed by the workflow engine. A timeout is specified like a frequency. If timeout is not specified, Falcon computes a default timeout for the process based on its frequency: six times the frequency of the process, or 30 minutes if the computed timeout is less than 30 minutes.
<process name="[process name]"> ... <timeout>[timeunit]([frequency])</timeout> ... </process>
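To make the default timeout rule concrete, the sketch below shows an explicit timeout, followed by comments that apply the stated rule to two assumed frequencies. The hours(3) value is illustrative.

```xml
<process name="sample-process">
    ...
    <!-- explicit timeout: an instance waits at most 3 hours for its inputs -->
    <timeout>hours(3)</timeout>
    <frequency>hours(1)</frequency>
    ...
</process>
<!-- If <timeout> were omitted, the rule above would give:
     frequency hours(1)   -> 6 x 1 hour    = 6 hours
     frequency minutes(2) -> 6 x 2 minutes = 12 minutes, below the floor, so 30 minutes -->
```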
Frequency defines how frequently the workflow job should run. For example, hours(1) defines the frequency as hourly, days(7) defines weekly frequency. The values for timeunit can be minutes/hours/days/months and the frequency number should be a positive integer > 0. Syntax:
<process name="[process name]"> ... <frequency>[timeunit]([frequency])</frequency> ... </process>
<sla shouldStartIn="hours(2)" shouldEndIn="hours(4)"/>
A process can have SLA which is defined by 2 optional attributes - shouldStartIn and shouldEndIn. All the attributes are written using expressions like frequency. shouldStartIn is the time by which the process should have started. shouldEndIn is the time by which the process should have finished.
Validity defines how long the workflow should run. It has 3 components - start time, end time and timezone. Start time and end time are timestamps defined in yyyy-MM-dd'T'HH:mm'Z' format and should always be in UTC. Timezone is used to compute the next instances starting from start time. The workflow will start at start time and end before end time specified on a given cluster. So, there will not be a workflow instance at end time. Syntax:
<process name="[process name]"> ... <validity start=[start time] end=[end time] timezone=[timezone]/> ... </process>
Examples:
<process name="sample-process"> ... <frequency>days(1)</frequency> <validity start="2012-01-01T00:40Z" end="2012-04-01T00:00Z" timezone="UTC"/> ... </process>
The daily workflow will start on Jan 1st 2012 at 00:40 UTC and will run at 00:40 UTC every day. The last instance will be on March 31st 2012 at 00:40 UTC.
<process name="sample-process"> ... <frequency>hours(1)</frequency> <validity start="2012-03-11T08:40Z" end="2012-03-12T08:00Z" timezone="PST8PDT"/> ... </process>
The hourly workflow will start on March 11th 2012 at 00:40 PST. The next instances will be at 01:40 PST, 03:40 PDT, 04:40 PDT and so on until 23:40 PDT. So there will be just 23 instances of the workflow for March 11th 2012 because of the DST switch.
Inputs define the input data for the workflow. The workflow job will start executing only after the schedule time and when all the inputs are available. There can be 0 or more inputs and each of the input maps to a feed. The path and frequency of input data is picked up from feed definition. Each input should also define start and end instances in terms of EL expressions and can optionally specify specific partition of input that the workflow requires. The components in partition should be subset of partitions defined in the feed.
For each input, Falcon will create a property with the input name that contains the comma separated list of input paths. This property can be used in workflow actions like pig scripts and so on.
Syntax:
<process name="[process name]"> ... <inputs> <input name=[input name] feed=[feed name] start=[start el] end=[end el] partition=[partition]/> ... </inputs> ... </process>
Example:
<feed name="feed1"> ... <partition name="isFraud"/> <partition name="country"/> <frequency>hours(1)</frequency> <locations> <location type="data" path="/projects/bootcamp/feed1/${YEAR}-${MONTH}-${DAY}-${HOUR}"/> ... </locations> ... </feed> <process name="sample-process"> ... <inputs> <input name="input1" feed="feed1" start="today(0,0)" end="today(1,0)" partition="*/US"/> ... </inputs> ... </process>
The input for the workflow is an hourly feed and takes the 0th and 1st hour data of today (the day when the workflow runs). If the workflow is running for 2012-03-01T06:40Z, the inputs are /projects/bootcamp/feed1/2012-03-01-00/*/US and /projects/bootcamp/feed1/2012-03-01-01/*/US. The property for this input is input1=/projects/bootcamp/feed1/2012-03-01-00/*/US,/projects/bootcamp/feed1/2012-03-01-01/*/US
Also, feeds with Hive table storage can be used as inputs to a process. Several parameters from inputs are passed as params to the user workflow or pig script.
${wf:conf('falcon_input_database')} - database name associated with the feed for a given input
${wf:conf('falcon_input_table')} - table name associated with the feed for a given input
${wf:conf('falcon_input_catalog_url')} - Hive metastore URI for this input feed
${wf:conf('falcon_input_partition_filter_pig')} - value of ${coord:dataInPartitionFilter('$input', 'pig')}
${wf:conf('falcon_input_partition_filter_hive')} - value of ${coord:dataInPartitionFilter('$input', 'hive')}
${wf:conf('falcon_input_partition_filter_java')} - value of ${coord:dataInPartitionFilter('$input', 'java')}
NOTE: input is the name of the input configured in the process, which is input.getName().
<input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
Example workflow configuration:
<configuration>
    <property> <name>falcon_input_database</name> <value>falcon_db</value> </property>
    <property> <name>falcon_input_table</name> <value>input_table</value> </property>
    <property> <name>falcon_input_catalog_url</name> <value>thrift://localhost:29083</value> </property>
    <property> <name>falcon_input_storage_type</name> <value>TABLE</value> </property>
    <property> <name>feedInstancePaths</name> <value>hcat://localhost:29083/falcon_db/output_table/ds=2012-04-21-00</value> </property>
    <property> <name>falcon_input_partition_filter_java</name> <value>(ds='2012-04-21-00')</value> </property>
    <property> <name>falcon_input_partition_filter_hive</name> <value>(ds='2012-04-21-00')</value> </property>
    <property> <name>falcon_input_partition_filter_pig</name> <value>(ds=='2012-04-21-00')</value> </property>
    ...
</configuration>
Users can mark one or more inputs as optional. In that case the job does not wait on the optional inputs: if they are present it uses them, otherwise it continues with the mandatory ones. If some instances of the optional feed are present for the given data window, those are considered and passed on to the process. While checking for the presence of a feed instance, Falcon looks for the availabilityFlag in the directory, if one is specified in the feed definition. If no availabilityFlag is specified, the presence of the instance directory is treated as an indication that the data is available. Example:
<feed name="feed1"> ... <partition name="isFraud"/> <partition name="country"/> <frequency>hours(1)</frequency> <locations> <location type="data" path="/projects/bootcamp/feed1/${YEAR}-${MONTH}-${DAY}-${HOUR}"/> ... </locations> ... </feed> <process name="sample-process"> ... <inputs> <input name="input1" feed="feed1" start="today(0,0)" end="today(1,0)" partition="*/US"/> <input name="input2" feed="feed2" start="today(0,0)" end="today(1,0)" partition="*/UK" optional="true" /> ... </inputs> ... </process>
Note: This is only supported for FileSystem storage but not Table storage at this point.
Outputs define the output data that is generated by the workflow. A process can define 0 or more outputs. Each output is mapped to a feed and the output path is picked up from feed definition. The output instance that should be generated is specified in terms of EL expression.
For each output, Falcon creates a property with output name that contains the path of output data. This can be used in workflows to store in the path. Syntax:
<process name="[process name]"> ... <outputs> <output name=[output name] feed=[feed name] instance=[instance el]/> ... </outputs> ... </process>
Example:
<feed name="feed2"> ... <frequency>days(1)</frequency> <locations> <location type="data" path="/projects/bootcamp/feed2/${YEAR}-${MONTH}-${DAY}"/> ... </locations> ... </feed> <process name="sample-process"> ... <outputs> <output name="output1" feed="feed2" instance="today(0,0)"/> ... </outputs> ... </process>
The output of the workflow is feed instance for today. If the workflow is running for 2012-03-01T06:40Z, the workflow generates output /projects/bootcamp/feed2/2012-03-01. The property for this output that is available for workflow is: output1=/projects/bootcamp/feed2/2012-03-01
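The input and output properties from the examples above could be consumed in an Oozie workflow roughly as follows. This is a sketch, not taken from the Falcon documentation. The workflow name, the node names, the script name aggregate.pig and the Pig parameter names are hypothetical. input1 and output1 are the properties from the examples, and nameNode and jobTracker are cluster properties that this document says are available to the workflow.

```xml
<workflow-app xmlns="uri:oozie:workflow:0.4" name="sample-wf">
    <start to="pig-node"/>
    <action name="pig-node">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <!-- hypothetical script stored next to workflow.xml -->
            <script>aggregate.pig</script>
            <!-- input1 is a comma separated list of paths; output1 is a single path -->
            <param>input=${input1}</param>
            <param>output=${output1}</param>
        </pig>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Pig action failed: ${wf:errorMessage(wf:lastErrorNode())}</message>
    </kill>
    <end name="end"/>
</workflow-app>
```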
Also, feeds with Hive table storage can be used as outputs to a process. Several parameters from outputs are passed as params to the user workflow or pig script.
${wf:conf('falcon_output_database')} - database name associated with the feed for a given output
${wf:conf('falcon_output_table')} - table name associated with the feed for a given output
${wf:conf('falcon_output_catalog_url')} - Hive metastore URI for the given output feed
${wf:conf('falcon_output_dataout_partitions')} - value of ${coord:dataOutPartitions('$output')}
NOTE: output is the name of the output configured in the process, which is output.getName().
<output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
Example workflow configuration:
<configuration>
    <property> <name>falcon_output_database</name> <value>falcon_db</value> </property>
    <property> <name>falcon_output_table</name> <value>output_table</value> </property>
    <property> <name>falcon_output_catalog_url</name> <value>thrift://localhost:29083</value> </property>
    <property> <name>falcon_output_storage_type</name> <value>TABLE</value> </property>
    <property> <name>feedInstancePaths</name> <value>hcat://localhost:29083/falcon_db/output_table/ds=2012-04-21-00</value> </property>
    <property> <name>falcon_output_dataout_partitions</name> <value>'ds=2012-04-21-00'</value> </property>
    ....
</configuration>
The properties are key value pairs that are passed to the workflow. These properties are optional and can be used in workflow to parameterize the workflow. Syntax:
<process name="[process name]"> ... <properties> <property name=[key] value=[value]/> ... </properties> ... </process>
The following special properties, when present, are used by Falcon's launcher job. The same properties are also available in the workflow, where they can be propagated to the Pig or M/R job.
<property name="queueName" value="hadoopQueue"/>
<property name="jobPriority" value="VERY_HIGH"/>
<!-- This property is used to turn off JMS notifications for this process. JMS notifications are enabled by default. -->
<property name="userJMSNotificationEnabled" value="false"/>
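One way such a property could be propagated is sketched below: a configuration block, assumed to sit inside a map-reduce action of the user's Oozie workflow, that passes queueName on to the job. The Hadoop property name mapred.job.queue.name is an assumption that depends on the Hadoop version in use.

```xml
<!-- Sketch only: configuration block inside a map-reduce action of the user workflow. -->
<configuration>
    <property>
        <!-- Hadoop queue property; the exact name depends on the Hadoop version -->
        <name>mapred.job.queue.name</name>
        <!-- queueName is the process property made available to the workflow -->
        <value>${queueName}</value>
    </property>
</configuration>
```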
The workflow element defines the workflow engine to use and the path to the workflow on HDFS. Required libraries can be specified with the lib attribute of the workflow element as comma-separated HDFS paths. The workflow definition on HDFS contains the actual job to run and should conform to the workflow specification of the chosen engine. Unless overridden with the lib attribute, the libraries required by the workflow should be in the lib folder inside the workflow path.
The properties defined in the cluster, as well as the cluster properties nameNode and jobTracker, are also available to the workflow.
Four engines are supported today: oozie, pig, hive and spark.
As part of Oozie workflow engine support, users can embed an Oozie workflow. Refer to the Oozie workflow overview and workflow specification for details.
Syntax:
<process name="[process name]"> ... <workflow engine=[workflow engine] path=[workflow path] lib=[comma separated lib paths]/> ... </process>
Example:
<process name="sample-process"> ... <workflow engine="oozie" path="/projects/bootcamp/workflow"/> ... </process>
This defines the workflow engine to be oozie, with the workflow XML defined at /projects/bootcamp/workflow/workflow.xml. The libraries are at /projects/bootcamp/workflow/lib. The libraries path can be overridden using the lib attribute of the workflow element, e.g. lib="/projects/bootcamp/wf/libs,/projects/bootcamp/oozie/libs".
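Put together, a process that overrides the default library location might look like the following sketch, which simply combines the example above with the lib value quoted in the text:

```xml
<process name="sample-process">
    ...
    <!-- lib replaces the default /projects/bootcamp/workflow/lib location -->
    <workflow engine="oozie" path="/projects/bootcamp/workflow"
              lib="/projects/bootcamp/wf/libs,/projects/bootcamp/oozie/libs"/>
    ...
</process>
```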
Falcon also adds the Pig engine which enables users to embed a Pig script as a process.
Example:
<process name="sample-process"> ... <workflow engine="pig" path="/projects/bootcamp/pig.script" lib="/projects/bootcamp/wf/libs,/projects/bootcamp/pig/libs"/> ... </process>
This defines the workflow engine to be pig and the pig script is defined at /projects/bootcamp/pig.script.
Feeds with Hive table storage will send one more parameter apart from the general ones:
$input_filter
Falcon also adds the Hive engine as part of Hive Integration which enables users to embed a Hive script as a process. This would enable users to create materialized queries in a declarative way.
Example:
<process name="sample-process"> ... <workflow engine="hive" path="/projects/bootcamp/hive-script.hql"/> ... </process>
This defines the workflow engine to be hive and the hive script is defined at /projects/bootcamp/hive-script.hql.
Feeds with Hive table storage will send one more parameter apart from the general ones:
$input_filter
Falcon also adds the Spark engine as part of Spark Integration, which enables users to run a Java/Python Spark application as a process. When the "spark" workflow engine is specified, Spark-related parameters must be provided through <spark-attributes>. Example:
<process name="spark-process">
    ...
    <workflow engine="spark" path="/resources/action"/>
    <spark-attributes>
        <master>local</master>
        <name>Spark WordCount</name>
        <class>org.examples.WordCount</class>
        <jar>/resources/action/lib/spark-application.jar</jar>
        <spark-opts>--num-executors 1 --driver-memory 512m</spark-opts>
    </spark-attributes>
    ...
</process>
This defines the workflow engine to be spark. The Java/Python Spark application to execute must be specified with the "jar" option. If a spark interface is already defined in the cluster entity, the Spark master can be overridden through the process entity to either "yarn-client" or "yarn-cluster". If input and output feed entities are defined in the process entity, the input and output data are set as arguments when the Spark workflow is generated. In the argument list, the first argument always corresponds to the input feed and the second to the output feed, followed by the user-provided arguments.
To run a Spark SQL process entity that reads and writes data stored in Hive, the datanucleus jars under the $HIVE_HOME/lib directory and hive-site.xml under the $SPARK_HOME/conf/ directory need to be available on the driver and on all executors launched by the YARN cluster. A convenient way to do this is to add them through the --jars and --files options of the spark-opts attribute. Example:
<process name="spark-process">
    ...
    <workflow engine="spark" path="/resources/action"/>
    <spark-attributes>
        <master>local</master>
        <name>Spark SQL</name>
        <class>org.examples.SparkSQLProcessTable</class>
        <jar>/resources/action/lib/spark-application.jar</jar>
        <spark-opts>--num-executors 1 --driver-memory 512m --jars /usr/local/hive/lib/datanucleus-rdbms.jar,/usr/local/hive/lib/datanucleus-core.jar,/usr/local/hive/lib/datanucleus-api-jdo.jar --files /usr/local/spark/conf/hive-site.xml</spark-opts>
    </spark-attributes>
    ...
</process>
If input and output feed entities are defined in the process entity, the input and output of the Spark SQL application are set as arguments when the Spark workflow is generated. If the input feed is of table type, the input table partition, table name and database name are set as input arguments. If the output feed is of table type, the output table partition, table name and database name are set as output arguments. Once the input and output arguments are set, the user-provided arguments follow.
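The sketch below illustrates the master override and the argument ordering described above. The feed names and instance expressions are hypothetical placeholders, and the sketch assumes that a spark interface is defined in the cluster entity.

```xml
<process name="spark-process">
    ...
    <inputs>
        <!-- hypothetical input feed: handed to the application as the first argument -->
        <input name="input1" feed="feed1" start="today(0,0)" end="today(1,0)"/>
    </inputs>
    <outputs>
        <!-- hypothetical output feed: handed to the application as the second argument -->
        <output name="output1" feed="feed2" instance="today(0,0)"/>
    </outputs>
    ...
    <workflow engine="spark" path="/resources/action"/>
    <spark-attributes>
        <!-- overrides the Spark master taken from the cluster entity -->
        <master>yarn-cluster</master>
        <name>Spark WordCount</name>
        <class>org.examples.WordCount</class>
        <jar>/resources/action/lib/spark-application.jar</jar>
    </spark-attributes>
    ...
</process>
```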
The retry policy defines how workflow failures should be handled. Three retry policies are defined: periodic, exp-backoff (exponential backoff) and final. Depending on the delay and the number of attempts, the workflow is re-tried after specific intervals. If the user sets the onTimeout attribute to "true", retries also happen for TIMED_OUT instances. Syntax:
<process name="[process name]"> ... <retry policy=[retry policy] delay=[retry delay] attempts=[retry attempts] onTimeout=[retry onTimeout]/> ... </process>
Examples:
<process name="sample-process"> ... <retry policy="periodic" delay="minutes(10)" attempts="3" onTimeout="true"/> ... </process>
The workflow is re-tried after 10 mins, 20 mins and 30 mins. With exponential backoff, the workflow will be re-tried after 10 mins, 20 mins and 40 mins.
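For comparison, the same retry element with the exp-backoff policy would look like the sketch below. Per the description above, retries would then occur after 10, 20 and 40 minutes.

```xml
<process name="sample-process">
    ...
    <!-- exponential backoff: the delay doubles with each attempt -->
    <retry policy="exp-backoff" delay="minutes(10)" attempts="3" onTimeout="true"/>
    ...
</process>
```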
NOTE: If the user does a manual rerun with the -force option (using the instance rerun API), the runId is reset and the user might see more Falcon system retries than configured in the process definition.
To enable retries for feed instances, set the following properties in runtime.properties:
falcon.retry.policy=periodic
falcon.retry.delay=minutes(30)
falcon.retry.attempts=3
falcon.retry.onTimeout=false
Late data
Late data handling defines how late data should be handled. Each feed is defined with a late cut-off value, which specifies the time until which late data is valid. For example, a late cut-off of hours(6) means that data for the nth hour can be delayed by up to 6 hours. The late data specification in the process defines how this late data is handled. The late data policy defines how frequently a check is done to detect late data. The supported policies are: backoff, exp-backoff (exponential backoff) and final (at the feed's late cut-off). The policy, along with the delay, defines the interval at which the late data check is done. The late input specification for each input defines the workflow that should run when late data is detected for that input. Syntax:
<process name="[process name]"> ... <late-process policy=[late handling policy] delay=[delay]> <late-input input=[input name] workflow-path=[workflow path]/> ... </late-process> ... </process>
Example:
<feed name="feed1">
    ...
    <frequency>hours(1)</frequency>
    <late-arrival cut-off="hours(6)"/>
    ...
</feed>
<process name="sample-process">
    ...
    <inputs>
        <input name="input1" feed="feed1" start="today(0,0)" end="today(1,0)"/>
        ...
    </inputs>
    <late-process policy="final">
        <late-input input="input1" workflow-path="/projects/bootcamp/workflow/lateinput1" />
        ...
    </late-process>
    ...
</process>
This late handling specifies that late data detection should run at the feed's late cut-off, which is 6 hours in this case. If there is late data, Falcon runs the workflow specified at /projects/bootcamp/workflow/lateinput1/workflow.xml.
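A variant that checks for late data repeatedly, rather than once at the cut-off, could combine a policy with a delay as in the sketch below. The hours(1) delay is an arbitrary illustrative value, and the exact times at which checks run are not spelled out here.

```xml
<process name="sample-process">
    ...
    <!-- exp-backoff with a delay: late data checks run at growing intervals -->
    <late-process policy="exp-backoff" delay="hours(1)">
        <late-input input="input1" workflow-path="/projects/bootcamp/workflow/lateinput1"/>
    </late-process>
    ...
</process>
```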
Note: This is only supported for FileSystem storage but not Table storage at this point.
<notification type="email" to="bob@xyz.com"/>
Specifying the notification element with the "type" attribute allows users to receive an email notification when a scheduled process instance completes. Multiple email recipients can be provided as comma-separated addresses in the "to" attribute. To send email notifications, ensure that the SMTP parameters are defined in Falcon's startup.properties. Refer to Falcon Email Notification for more details.
A process has an ACL (Access Control List), which is useful for implementing permission requirements and provides a way to set different permissions for specific users or named groups.
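For instance, a notification sent to two recipients might be declared as in this sketch, where the second address is a made-up placeholder:

```xml
<!-- Two recipients, separated by a comma in the "to" value -->
<notification type="email" to="bob@xyz.com,alice@xyz.com"/>
```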
<ACL owner="test-user" group="test-group" permission="*"/>
ACL indicates the access control list for this process. owner is the owner of this entity. group is the group that has read access. permission indicates the permission.