Question

I'm trying to create an Oozie coordinator. The problem is that I already have staging data waiting to be processed by Oozie.

Imagine the following situation.

  1. The current date is 01.03.2013 (1 March 2013).

  2. I have these input directories:

    /staging/landing/source/xvlr/2013/02/01/00 (1 February 2013, the first hour of the day)

    /staging/landing/source/xvlr/2013/02/01/01

    /staging/landing/source/xvlr/2013/02/01/02

    /staging/landing/source/xvlr/2013/02/01/03

    /staging/landing/source/xvlr/2013/02/01/04

    ....

    /staging/landing/source/xvlr/2013/02/28/00

    ...

    /staging/landing/source/xvlr/2013/02/28/23

I want my Oozie coordinator to consume ALL of the previously created landing data and produce this output:

/masterdata/source/xvlr/2013/02/01/00 
/masterdata/source/xvlr/2013/02/01/01
/masterdata/source/xvlr/2013/02/01/02
/masterdata/source/xvlr/2013/02/01/03
/masterdata/source/xvlr/2013/02/01/04
....
/masterdata/source/xvlr/2013/02/28/00
...
/masterdata/source/xvlr/2013/02/28/23

Then I want the coordinator to run every hour and produce new output in masterdata.

How can I do this with the coordinator spec? Here is my coordinator. It does nothing: it reaches the start time I need and then just waits, without ever starting the job.

Please advise.

<coordinator-app name="Xvlr-parser-coordinator" frequency="60"
                 start="2013-03-07T05:35Z" end="2113-01-01T00:35Z" timezone="Europe/Moscow"  xmlns="uri:oozie:coordinator:0.3">
    <controls>
        <timeout>5</timeout>
        <concurrency>4</concurrency>
    </controls>

    <datasets>
        <dataset name="xvlrInputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T05:35Z" timezone="Europe/Moscow">
            <uri-template>${nameNode}/staging/landing/source/xvlr/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            <done-flag></done-flag>
        </dataset>
        <dataset name="xvlrOutputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T05:35Z" timezone="Europe/Moscow">
            <uri-template>${nameNode}/masterdata/source/xvlr/archive/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            <done-flag></done-flag>
        </dataset>

    </datasets>

    <input-events>
        <data-in name="xvlrInputEvent" dataset="xvlrInputDataset">
            <instance>${coord:current(0)}</instance>
        </data-in>
    </input-events>

    <output-events>
        <data-out name="xvlrOutputEvent" dataset="xvlrOutputDataset">
            <instance>${coord:current(0)}</instance>
        </data-out>
    </output-events>
    <action>
        <workflow>
            <app-path>${oozieAppHomeCatalog}/sub-workflows/Xvlr-parser-subworkflow.xml</app-path>
            <configuration>
                <property>
                    <name>inputDir</name>
                    <value>${coord:dataIn('xvlrInputEvent')}</value>
                </property>
                <property>
                    <name>outputDir</name>
                    <value>${coord:dataOut('xvlrOutputEvent')}</value>
                </property>

            </configuration>

        </workflow>
    </action>
</coordinator-app>

Solution

Here is the correct solution (it has been working for several days now):

<coordinator-app name="Xvlr-parser-coordinator" frequency="60"
                 start="2013-03-07T16:35Z" end="2113-01-01T00:35Z" timezone="Europe/Moscow"  xmlns="uri:oozie:coordinator:0.3">
    <controls>
        <timeout>3</timeout>
        <concurrency>1</concurrency>
    </controls>

    <datasets>
        <dataset name="xvlrInputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T16:35Z" timezone="Europe/Moscow">
            <uri-template>${nameNode}/staging/landing/source/xvlr/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            <done-flag></done-flag>
        </dataset>
        <dataset name="xvlrOutputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T16:35Z" timezone="Europe/Moscow">
            <uri-template>${nameNode}/masterdata/source/xvlr/archive/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            <done-flag></done-flag>
        </dataset>

    </datasets>

    <input-events>
        <data-in name="xvlrInputEvent" dataset="xvlrInputDataset">
            <instance>${coord:current(0)}</instance>
        </data-in>
    </input-events>

    <output-events>
        <data-out name="xvlrOutputEvent" dataset="xvlrOutputDataset">
            <instance>${coord:current(0)}</instance>
        </data-out>
    </output-events>
    <action>
        <workflow>
            <app-path>${oozieAppHomeCatalog}/sub-workflows/Xvlr-parser-subworkflow.xml</app-path>
            <configuration>
                <property>
                    <name>inputDir</name>
                    <value>${coord:dataIn('xvlrInputEvent')}</value>
                </property>
                <property>
                    <name>outputDir</name>
                    <value>${coord:dataOut('xvlrOutputEvent')}</value>
                </property>
            </configuration>

        </workflow>
    </action>
</coordinator-app>
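
A note on the `<done-flag></done-flag>` elements above, since they decide when an input instance counts as available. According to the Oozie coordinator documentation, an empty done-flag makes Oozie check only that the directory exists, while omitting the element makes it wait for a `_SUCCESS` file inside the directory. The fragment below is only a sketch of the three variants; the dataset names, the shortened paths, and the `_READY` file name are hypothetical and not part of the original coordinator.

```xml
<datasets>
    <!-- Empty done-flag: the instance is ready as soon as the directory exists
         (this is the variant used in the coordinator above). -->
    <dataset name="byDirectory" frequency="${coord:hours(1)}"
             initial-instance="2013-03-07T16:35Z" timezone="Europe/Moscow">
        <uri-template>${nameNode}/example/a/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
        <done-flag></done-flag>
    </dataset>

    <!-- No done-flag element: Oozie waits for a _SUCCESS file in the directory. -->
    <dataset name="bySuccessFile" frequency="${coord:hours(1)}"
             initial-instance="2013-03-07T16:35Z" timezone="Europe/Moscow">
        <uri-template>${nameNode}/example/b/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
    </dataset>

    <!-- Named done-flag: Oozie waits for this file (hypothetical name) in the directory. -->
    <dataset name="byCustomFlag" frequency="${coord:hours(1)}"
             initial-instance="2013-03-07T16:35Z" timezone="Europe/Moscow">
        <uri-template>${nameNode}/example/c/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
        <done-flag>_READY</done-flag>
    </dataset>
</datasets>
```

If the landing directories are written by a job that does not produce `_SUCCESS`, the empty form is the one that lets the coordinator action start.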

What does it do?

  • It started at 2013-03-07T16:35Z, so all previously collected data was passed
    through the underlying workflow (an MR job invocation that does the parsing).
    • While processing past datasets (dataset time earlier than the current time), the workflow runs were executed one after another: one run consumed /pastdate/hour_00, then the next immediately started consuming /pastdate/hour_01, etc.
    • Once the coordinator reached the present time, it began invoking the workflow every hour (as designed: 05:35, 06:35, ... 23:35).
    • Note the timeout declaration: some datasets were missing, for example there was no data for the 10th hour of 1 March. The action for that hour waited for 3 minutes and then timed out.
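
The catch-up behaviour described in the list above could be pointed at the February directories from the question roughly as follows. This is a minimal sketch, not the author's configuration: the back-dated `start` and `initial-instance` values are assumptions chosen to match the first February path, and the explicit `<execution>` element is added only to make the default ordering visible. Oozie nominal times are UTC, so if the directories are named by local time the dates would need a matching offset.

```xml
<!-- Sketch only: start and initial-instance are assumed values covering the
     February 2013 data from the question, not the values used in the answer. -->
<coordinator-app name="Xvlr-parser-coordinator" frequency="${coord:hours(1)}"
                 start="2013-02-01T00:00Z" end="2113-01-01T00:00Z" timezone="Europe/Moscow"
                 xmlns="uri:oozie:coordinator:0.3">
    <controls>
        <!-- Minutes an action waits for its input before it times out -->
        <timeout>3</timeout>
        <!-- One running action at a time, so past hours are processed one by one -->
        <concurrency>1</concurrency>
        <!-- Oldest nominal time first; FIFO is the default, shown for clarity -->
        <execution>FIFO</execution>
    </controls>

    <datasets>
        <!-- initial-instance must not be later than the first hour to be processed -->
        <dataset name="xvlrInputDataset" frequency="${coord:hours(1)}"
                 initial-instance="2013-02-01T00:00Z" timezone="Europe/Moscow">
            <uri-template>${nameNode}/staging/landing/source/xvlr/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
            <done-flag></done-flag>
        </dataset>
        <!-- xvlrOutputDataset would be back-dated the same way -->
    </datasets>

    <!-- input-events, output-events and action stay as in the coordinator above -->
</coordinator-app>
```

Because the start time lies in the past, Oozie materializes one action per elapsed hour as soon as the coordinator is submitted; with `concurrency` set to 1 they run sequentially until the coordinator catches up with the present.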

The problem is solved.

Licensed under: CC-BY-SA with attribution