0

Fetch results from BigQueryOperator in airflow

I followed the suggestion in the question linked above. The solution works when my SQL is a single inline statement, but if the SQL is large and I put it in a file and reference that file inside the function, it fails.

def MyChequer(**kwargs):
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql='/dags/sqls/invalidTable.sql'
    )

Then I get this error: BigQuery job failed. Final error was: {'reason': 'invalidQuery', 'location': 'query', 'message': 'Syntax error: Unexpected identifier "dags" at [1:1]'}

Normally I use the operator as a DAG task in the following way, and this works:

# template_searchpath is the base directory for '.sql' template references
dag = DAG('invalidXXX',
    default_args=default_args,
    description='',
    schedule_interval="0 5 * * *",
    catchup=False,
    template_searchpath=['/home/airflow/stgAirflow/']
)

BigQueryOperator(
    task_id='invalidXXX',
    use_legacy_sql=False,
    sql='/dags/sqls/invalid_v1.sql',
    destination_dataset_table=targetTable,
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    dag=dag
)

2 Answers

1

OK, I got this fixed. With the changes below, when the DAG runs, the SQL stored in the file is loaded and executed. I'm not sure whether this is an optimal solution, so any further suggestions are welcome.

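# define
# Airflow 1.x import path (an assumption, based on the provide_context=True usage below)
from airflow.operators.python_operator import PythonOperator

class SQLTemplatedPythonOperator(PythonOperator):
    # templated values ending in .sql (here: templates_dict entries) are loaded as template files
    template_ext = ('.sql',)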

# modify function
def loadCSV(**kwargs):
    print("inside loadCSV")
    # by now this holds the rendered contents of the .sql file, not its path
    query = kwargs['templates_dict']['query']
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql=query,
    )

# dag - task
SQLTemplatedPythonOperator(
    task_id='invalidBBDToCSV',
    templates_dict={'query': 'invalidBBD.sql'},
    provide_context=True,
    python_callable=loadCSV,
    dag=dag,
)

# dag
dag = DAG('invalidBBDLoad', 
    default_args=default_args, 
    description='DAG data', 
    schedule_interval="0 11 * * *",
    catchup=False,
    template_searchpath=['/home/stgairflow/dags/sqls'], 
    user_defined_macros={'myProjectId': myProjectId,}
)

0

It looks like the error comes from trying to execute the string '/dags/sqls/invalid_v1.sql' as SQL, which isn't valid.

Can you read in the file contents there if you want to keep the SQL in a separate file? It seems the sql argument is expecting an actual SQL statement.
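For what it's worth, here is a minimal sketch of reading the file yourself inside the callable, using only the standard library. The helper name read_sql and its search_paths argument are made up for illustration and are not part of Airflow. It strips the leading slash so that a name like '/dags/sqls/invalidTable.sql' is looked up relative to each search directory, which is an assumption about how you want the path resolved. Note that any Jinja placeholders in the file would NOT be rendered this way.

```python
from pathlib import Path


def read_sql(name, search_paths):
    """Return the text of the first file called `name` found under `search_paths`.

    Hypothetical helper, not an Airflow API.
    """
    # Treat '/dags/sqls/x.sql' as relative to each search directory;
    # without this, pathlib would discard the base for an absolute name.
    relative = name.lstrip("/")
    for base in search_paths:
        candidate = Path(base) / relative
        if candidate.is_file():
            return candidate.read_text(encoding="utf-8")
    raise FileNotFoundError(f"{name!r} not found under {list(search_paths)}")


# Usage inside the callable (paths taken from the question):
#   query = read_sql('/dags/sqls/invalidTable.sql', ['/home/airflow/stgAirflow/'])
#   ...then pass sql=query to the operator instead of the path string.
```

The string returned is the actual SQL statement, so the operator never sees the path.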

2 Comments

Yes, I agree: it is interpreting the string as SQL and failing, which is to be expected. Normally, in a DAG, I set 'template_searchpath', but in this case the operator is not a DAG task; it is created inside a function (def MyChequer). I don't know how to make the function/operator resolve the SQL search path, so I am looking for help from anybody who has come across this, and for the logic they used to fix it.
Yeah, kind of a bummer, because the docs say they support parsing template files: ":param sql: the sql code to be executed (templated) :type sql: Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in '.sql'." But reading the code here, the template location is just going to get run here: `if isinstance(self.sql, str):` ... there's no template handling.
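To make the docstring rule quoted above concrete, here is a small illustration of the "str ending in '.sql'" check applied to a str or a list of str. This is only a sketch of the rule as the docstring states it, not Airflow's code, and both function names are made up.

```python
def is_template_reference(value, template_ext=(".sql",)):
    """True when a string ends in one of the template extensions."""
    return isinstance(value, str) and value.endswith(tuple(template_ext))


def describe_sql_arg(sql):
    """Label each entry of a str or list-of-str `sql` argument."""
    items = [sql] if isinstance(sql, str) else list(sql)
    return [
        (item, "template file" if is_template_reference(item) else "inline statement")
        for item in items
    ]


# The path from the question is recognised as a template reference,
# but something still has to load the file before the query is sent.
print(describe_sql_arg("/dags/sqls/invalidTable.sql"))
```

Going by the error in the question, that loading step does not seem to happen when the operator is created inside a Python callable, so the path itself is sent as the query.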
