#StackBounty: #airflow How to retrieve default args in python callable

Bounty: 200

I need to be able to access default_args defined as part of DAG definition in a Python Operator, python_callable. Maybe it’s my unfamiliartiy with python or airflow in general, but could someone guide on how to achieve this.

Following is a code sample of what am trying to achieve

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': 'xyz@xyz.com',
    'email_on_failure': 'xyz@xyz.com',
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2017, 5, 15, 23, 20),
    'end_date': datetime(2017, 5, 16, 23, 45),
    'touchfile_path': '/user/myname/touchfiles/',
}

dag = DAG(
    'test',
    default_args=default_args,
    template_searchpath=['/Users/myname/Desktop/utils/airflow/resources'],
    user_defined_macros=dict(SCHEMA_NAME='abc'),
    #schedule_interval='*/2 * * * * ')
    schedule_interval='@once')

def webhdfs_touchfile_create(ds, *args, **kwargs):
    web_hdfs_hook = WebHDFSHook('webhdfs_default')
    client = web_hdfs_hook.get_conn()
    client.write("/user/myname/airflow_hdfs","stringToWrite")
    pp.pprint(kwargs)

task1 = PythonOperator(
    task_id='task1',
    provide_context=True, #enabling this would allow to pass arguments automatically to your callable function
    python_callable=webhdfs_touchfile_create,
    templates_dict={'attr1': {{ default_args['touchfile_path'] }}},
    dag=dag)

Since the template_dict for PythonOperator is the only attribute which jinja templating works, how can i retrieve the ‘touchfile_path‘ paramter in there?


Get this bounty!!!

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.