I have a BigQuery table created using the following DDL:
CREATE TABLE mytable
(
  id STRING,
  source STRING,
  PRIMARY KEY (id) NOT ENFORCED
);
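The key can be double-checked via INFORMATION_SCHEMA (mydataset below is a placeholder for my actual dataset):

SELECT constraint_name, constraint_type
FROM mydataset.INFORMATION_SCHEMA.TABLE_CONSTRAINTS
WHERE table_name = 'mytable';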
As you can see, id is set as the table's primary key. My Beam pipeline is then defined as follows:
import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions

def process_message(message):
    # Imports live inside the function so the DoFn pickles cleanly to Dataflow workers
    import json
    import struct
    data = json.loads(message.decode("utf-8"))
    if data == {}:
        # An empty payload is a tombstone: mark the row for deletion
        print(f"Running DELETE operation on row {data.get('Key')}")
        data['_CHANGE_TYPE'] = 'DELETE'
    else:
        print(f"Running UPSERT operation on row {data['Key']}")
        data['_CHANGE_TYPE'] = 'UPSERT'
        # Hex-encode the packed update timestamp to use as the CDC sequence number
        data['_CHANGE_SEQUENCE_NUMBER'] = struct.pack(
            'd', int(round(float(data['Value']['updated'])))).hex()
    return [data]
with beam.Pipeline(options=PipelineOptions([
    f"--project={project_id}",
    "--region=europe-west2",
    "--runner=DataflowRunner",
    "--streaming",
    "--temp_location=gs://tmp/cdc",
    "--staging_location=gs://tmp/cdc",
])) as pipeline:
    data = pipeline | 'ReadFromPubSub' >> ReadFromPubSub(
        subscription=f'projects/{project_id}/subscriptions/{bq_table_name}')
    data | 'ProcessMessages' >> beam.ParDo(process_message) | 'WriteToBigQuery' >> WriteToBigQuery(
        f'{project_id}:{bq_dataset}.{bq_table_name}',
        schema=schema,
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
        triggering_frequency=5,
    )
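For reference, this is roughly what I expect process_message to emit in each case (the message shapes below are only guesses at the producer's format, for illustration):

process_message(b'{"Key": "row-1", "Value": {"updated": "1715000000"}}')
# -> [{'Key': 'row-1', 'Value': {'updated': '1715000000'},
#      '_CHANGE_TYPE': 'UPSERT', '_CHANGE_SEQUENCE_NUMBER': '<16 hex chars>'}]

process_message(b'{}')
# -> [{'_CHANGE_TYPE': 'DELETE'}]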
However, when I query my table in BigQuery after consuming just a handful of records, I see hundreds of rows with duplicate id values.
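For example, a quick check along these lines surfaces the duplicates (mydataset again being a placeholder for my dataset):

SELECT id, COUNT(*) AS copies
FROM mydataset.mytable
GROUP BY id
HAVING COUNT(*) > 1
ORDER BY copies DESC;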
How do I get my pipeline to respect the primary key and perform an UPSERT operation, as it should?