명령어에 escaping 문자가 포함이 되어있다면, dag_run conf 파일을 사용합니다.
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "here is the message: \'$message\'"',
env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
)
env 키워드를 사용하여 메세지를 전달하는 것이 올바른 방법입니다.
아래처럼 bash_command에 포함하는 것은 올바르지 않습니다.
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"',
)
PythonOperator
파이썬으로 작성한 코드를 실행합니다. 자세한 내용은 PythonOperator에서 확인할 수 있습니다.
기본적으로 PythonOperator 인스턴스로 print_context 함수를 두번째 Argument로 전달합니다.
from pprint import pprint
def print_context(ds, **kwargs):
"""Print the Airflow context and ds variable from the context."""
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print_the_context',
python_callable=print_context,
)
Python callable에 전달할 추가 Argument는 op_args와 op_kwargs 키워드를 사용합니다.
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
task = PythonOperator(
task_id='sleep_for_' + str(i),
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(i) / 10},
)
run_this >> task