I’m trying to create an Apache Beam source/sink using google.cloud.sql.connector. I would pass minimal parameters so code can be used in different condition. Here is my code:
class _PostgresReadFn(DoFn):
def __init__(self, instance_name: str, driver: str, query: str, **kwargs):
self._instance_name = instance_name
self._driver = driver
self._query = query
self._conn_args = kwargs
def process(self, element):
with Connector().connect(self._instance_name, self._driver , self._conn_args) as conn:
cur = conn.cursor()
cur.execute(self._query )
for record in cur.fetchall():
yield record
class ReadAllFromPostgres(PTransform):
def __init__(self, instance_name: str, driver: str, query: str, **kwargs):
self._instance_name = instance_name
self._driver = driver
self._query = query
self._conn_args = kwargs
def expand(self, input_or_inputs):
postgres_read_fn = _PostgresReadFn(
self._instance_name, self._driver, self._query, self._conn_args
)
return (
input_or_inputs
| Create([1])
| "ReadAllFromPostgres" >> ParDo(postgres_read_fn)
)
and here is my pipeline:
pipeline
| "ReadFromPostgres"
>> ReadAllFromPostgres(
"instance",
"pg8000",
"SELECT * FROM table limit 10",
db="db",
user="user",
password="pwd",
ip_type="public",
)
| "WriteToStdout" >> beam.Map(print)
When I run I get the error:
TypeError: _PostgresReadFn.init() takes 4 positional arguments but 5 were given