I am trying to create a custom Dataflow template for a JDBC connection, but when I import the template (the Python pipeline staged to a JSON template file) the console shows the following error/warning:
<code>Error/Warning:Fail to process as Flex Template and Legacy Template. Flex Template Process result:No SDK language information is provided., Legacy Template Process result:Unable to parse template file
</code>
Custom template metadata (JSON):
<code>{
  "name": "Oracle_to_gcs_metadata",
  "description": "A pipeline which loads data from Oracle to GCS as Parquet files",
  "streaming": false,
  "parameters": [
    {
      "name": "jdbcConnectionUrl",
      "label": "JDBC Connection URL String",
      "helpText": "The JDBC connection URL to connect to the Oracle database (e.g., jdbc:oracle:thin:@//hostname:port/service_name)",
      "regexes": [
        "^jdbc:oracle:thin:@\/\/[^\n\r]+$"
      ],
      "paramType": "TEXT"
    },
    {
      "name": "jdbcUsername",
      "label": "JDBC Connection User Name",
      "helpText": "The username for the JDBC connection",
      "paramType": "TEXT"
    },
    {
      "name": "jdbcPassword",
      "label": "JDBC Connection Password",
      "helpText": "The password for the JDBC connection",
      "paramType": "PASSWORD"
    },
    {
      "name": "jdbcSourceQuery",
      "label": "JDBC Source SQL Query",
      "helpText": "The SQL query to execute on the Oracle database (e.g., SELECT * FROM table_name)",
      "paramType": "TEXT"
    },
    {
      "name": "outputGcsLocation",
      "label": "Output GCS Location",
      "helpText": "The GCS location where the output Parquet files will be saved (e.g., gs://bucket-name/output-directory/)",
      "regexes": [
        "^gs:\/\/[^\n\r]+$"
      ],
      "paramType": "TEXT"
    }
  ]
}
</code>
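For reference, my understanding is that for a classic (legacy) template this JSON is not the template itself but a metadata file, and that it is expected to sit next to the staged template with a _metadata suffix. Something like the following (the bucket path is taken from my --template_location below; the exact layout is my assumption):
<code># Assumption: the classic template itself is staged at
#   gs://$DEVSHELL_PROJECT_ID-templates/templates/oracle_to_gcs
# and the metadata file is expected in the same folder, named <template-name>_metadata.
gsutil cp oracle_to_gcs_metadata gs://$DEVSHELL_PROJECT_ID-templates/templates/oracle_to_gcs_metadata
</code>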
Python Code:
<code>import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.io.parquetio import WriteToParquet


class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--jdbcConnectionUrl', type=str, help='JDBC Connection URL String')
        parser.add_value_provider_argument('--jdbcUsername', type=str, help='JDBC Connection User Name')
        parser.add_value_provider_argument('--jdbcPassword', type=str, help='JDBC Connection Password')
        parser.add_value_provider_argument('--jdbcSourceQuery', type=str, help='JDBC Source SQL Query')
        parser.add_value_provider_argument('--outputGcsLocation', type=str, help='GCS Location for Output Parquet Files')


class ConstructJdbcUrl(beam.DoFn):
    def process(self, element, jdbcConnectionUrl):
        yield f"jdbc:oracle:thin:@{jdbcConnectionUrl.get()}"


class ReadFromJdbcAndWriteToGCS(beam.DoFn):
    def process(self, element, custom_options):
        jdbc_url = f"jdbc:oracle:thin:@{custom_options.jdbcConnectionUrl.get()}"
        query = custom_options.jdbcSourceQuery.get()
        username = custom_options.jdbcUsername.get()
        password = custom_options.jdbcPassword.get()
        output_gcs_location = custom_options.outputGcsLocation.get()

        # Read from JDBC
        with beam.Pipeline() as p:
            jdbc_data = (p
                         | 'Read from JDBC' >> ReadFromJdbc(
                             fetch_size=None,
                             table_name="ACCOUNTS",
                             driver_class_name='oracle.jdbc.driver.OracleDriver',
                             jdbc_url=jdbc_url,
                             username=username,
                             password=password,
                             query=query
                         ))

            # Write to GCS as Parquet files
            (jdbc_data
             | 'Write to GCS as Parquet' >> WriteToParquet(
                 file_path_prefix=output_gcs_location,
                 schema=None
             ))


def run(argv=None):
    # Define your pipeline options
    options = PipelineOptions(argv)
    custom_options = options.view_as(CustomOptions)

    # Enable saving of the main session state for any global imports.
    options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=options)

    # Create a PCollection with a single element to pass around
    jdbc_url_pcoll = (p
                      | 'Create Empty' >> beam.Create([None]))

    # Read from JDBC and write to GCS
    _ = (jdbc_url_pcoll
         | 'ReadFromJdbcAndWriteToGCS' >> beam.ParDo(ReadFromJdbcAndWriteToGCS(), custom_options))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    run()
</code>
Deployment Command:
<code>python oracle-to-gcs.py --runner DataflowRunner --no_use_public_ips --subnetwork <> --staging_location <> --temp_location <> --project $DEVSHELL_PROJECT_ID --job_name oracle-to-gcs --region us-east4 --template_location gs://$DEVSHELL_PROJECT_ID-templates/templates/oracle_to_gcs --experiment=use_beam_bq_sink
</code>
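For completeness, once the template imports correctly I would expect to be able to launch it roughly like this (the job name, host, credentials, query, and output bucket are placeholders, not a working configuration; the parameter names come from the metadata above):
<code># Sketch only: run the staged classic template with the parameters declared in the metadata file.
gcloud dataflow jobs run oracle-to-gcs-run \
  --gcs-location gs://$DEVSHELL_PROJECT_ID-templates/templates/oracle_to_gcs \
  --region us-east4 \
  --parameters "jdbcConnectionUrl=jdbc:oracle:thin:@//hostname:1521/service_name,jdbcUsername=scott,jdbcPassword=secret,jdbcSourceQuery=SELECT 1 FROM DUAL,outputGcsLocation=gs://my-bucket/output/"
</code>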
Normally, when a custom template is imported, the console should render input fields for these parameters, but that is not happening here.