How can I modify the window set-up (in Python) to eliminate the 'incomplete' records that extend beyond the max date? My second question: why does window_max_timestamp differ from date_id by two weeks per window, given that the TimestampedValue is computed from date_id? I want them to match.
import apache_beam as beam
from apache_beam.transforms.window import SlidingWindows, GlobalWindows, TimestampedValue
from datetime import datetime, timedelta, timezone
class AddTimestampDoFn(beam.DoFn):
    """Attach an event-time timestamp (UTC midnight of `date_id`) to each record.

    Input elements are (cust_id, date_id 'YYYY-MM-DD', features); the element
    is re-emitted unchanged but wrapped in a TimestampedValue.
    """

    def process(self, element):
        cust_id, date_id, features = element
        # Parse as UTC explicitly so the resulting Unix timestamp does not
        # depend on the worker's local timezone.
        date_obj_utc = datetime.strptime(date_id, '%Y-%m-%d').replace(tzinfo=timezone.utc)
        timestamp_utc = date_obj_utc.timestamp()
        # Use the directly imported TimestampedValue (the original referenced
        # the same class via beam.window.TimestampedValue, inconsistently
        # with the import at the top of the file).
        yield TimestampedValue((cust_id, date_id, features), timestamp_utc)
class ExtractMaxDateAndValuesDoFn(beam.DoFn):
    """Emit (customer_id, window_end_date, max_data_date, values) per window.

    Windows whose inclusive end lies one full sliding `period` (or more) past
    the newest element they contain are "incomplete" — they extend beyond the
    available data — and are dropped. This removes the trailing partial
    windows (the rows marked '>>' in the sample output).

    NOTE: SlidingWindows(3*30d, 30d) boundaries are aligned to the Unix
    epoch, not to calendar months, which is why window.max_timestamp()
    differs from the element's date_id by roughly two weeks.
    """

    def __init__(self, period=30 * 24 * 60 * 60):
        # `period` must match the `period` passed to SlidingWindows in the
        # pipeline (defaults to the 30-day value used there).
        super().__init__()
        self.period = period

    def process(self, record, window=beam.DoFn.WindowParam):
        customer_id, data = record
        # Newest (date, features) tuple in this window; x[0] is a
        # 'YYYY-MM-DD' string, so lexicographic max == chronological max.
        max_date_record = max(data, key=lambda x: x[0])
        max_date = max_date_record[0]
        max_date_ts = datetime.strptime(max_date, '%Y-%m-%d').replace(
            tzinfo=timezone.utc).timestamp()
        # Inclusive upper bound of the current window (Unix seconds).
        window_max_timestamp = window.max_timestamp()
        # Drop windows that end a full period (or more) past the newest
        # element: these are the trailing, incomplete windows.
        if float(window_max_timestamp) - max_date_ts >= self.period:
            return
        # datetime.utcfromtimestamp is deprecated (Python 3.12+); build an
        # aware UTC datetime instead.
        window_max_timestamp_string = datetime.fromtimestamp(
            float(window_max_timestamp), tz=timezone.utc).strftime('%Y-%m-%d')
        # Keep only the feature tuples, in window order.
        values = [item[1] for item in data]
        yield (customer_id, window_max_timestamp_string, max_date, values)
class ExtractTimestamp(beam.DoFn):
    """Prepend the window's end date (as 'YYYY-MM-DD') to each element tuple."""

    def process(self, element, window=beam.DoFn.WindowParam):
        # Inclusive upper bound of the current window (Unix seconds).
        data_last_timestamp = window.max_timestamp()
        # datetime.utcfromtimestamp is deprecated (Python 3.12+); use an
        # aware UTC datetime to get the same 'YYYY-MM-DD' string.
        data_last_timestamp_string = datetime.fromtimestamp(
            float(data_last_timestamp), tz=timezone.utc).strftime('%Y-%m-%d')
        yield (data_last_timestamp_string,) + element
# Example input data
# Example input data: (customer_id, date_id 'YYYY-MM-DD', feature tuple),
# one record per customer per month-end.
input_data = [
("cust1", "2024-03-31", (1.0, 2.0, 3.0)),
("cust1", "2024-04-30", (1.1, 2.1, 3.1)),
("cust1", "2024-05-31", (1.2, 2.2, 3.2)),
("cust1", "2024-06-30", (1.3, 2.3, 3.3)),
("cust2", "2024-03-31", (4.0, 5.0, 6.0)),
("cust2", "2024-04-30", (4.1, 5.1, 6.1)),
("cust2", "2024-05-31", (4.2, 5.2, 6.2)),
("cust2", "2024-06-30", (4.3, 5.3, 6.3)),
]
def run_pipeline(input_data):
    """Window per-customer monthly records into 3-month sliding windows and print them.

    Args:
        input_data: iterable of (customer_id, 'YYYY-MM-DD', feature tuple).
    """
    with beam.Pipeline() as pipeline:
        original_data = (
            pipeline
            | 'CreateInput' >> beam.Create(input_data)
            | "Add Timestamps" >> beam.ParDo(AddTimestampDoFn())
        )
        # BUG FIX: `TransformData` was never defined (NameError at run time).
        # GroupByKey needs (key, value) pairs, so re-key each element to
        # (customer_id, (date_id, features)) — this matches the shape the
        # downstream DoFn unpacks and the sample output shown.
        original_data_unnested = original_data | 'Unnest orig data' >> beam.Map(
            lambda e: (e[0], (e[1], e[2])))
        windowed_feature_values = (
            original_data_unnested
            | "Window into 3-Month Intervals" >> beam.WindowInto(
                # Three 30-day "months", sliding every 30 days. NOTE: these
                # windows are aligned to the Unix epoch, not calendar months,
                # which is why window.max_timestamp() differs from date_id.
                SlidingWindows(size=3 * 30 * 24 * 60 * 60, period=30 * 24 * 60 * 60)
            )
        )
        grouped_data = windowed_feature_values | "Group by Key" >> beam.GroupByKey()
        transformed_data = grouped_data | "ExtractMaxDateAndValues" >> beam.ParDo(
            ExtractMaxDateAndValuesDoFn())
        transformed_data | 'print4' >> beam.Map(print)


if __name__ == '__main__':
    run_pipeline(input_data)
The output is:
('cust1', '2024-06-15', '2024-05-31', [(1.0, 2.0, 3.0), (1.1, 2.1, 3.1), (1.2, 2.2, 3.2)])
('cust1', '2024-05-16', '2024-04-30', [(1.0, 2.0, 3.0), (1.1, 2.1, 3.1)])
('cust1', '2024-04-16', '2024-03-31', [(1.0, 2.0, 3.0)])
('cust1', '2024-07-15', '2024-06-30', [(1.1, 2.1, 3.1), (1.2, 2.2, 3.2), (1.3, 2.3, 3.3)])
>> ('cust1', '2024-08-14', '2024-06-30', [(1.2, 2.2, 3.2), (1.3, 2.3, 3.3)])
>> ('cust1', '2024-09-13', '2024-06-30', [(1.3, 2.3, 3.3)])
('cust2', '2024-06-15', '2024-05-31', [(4.0, 5.0, 6.0), (4.1, 5.1, 6.1), (4.2, 5.2, 6.2)])
('cust2', '2024-05-16', '2024-04-30', [(4.0, 5.0, 6.0), (4.1, 5.1, 6.1)])
('cust2', '2024-04-16', '2024-03-31', [(4.0, 5.0, 6.0)])
('cust2', '2024-07-15', '2024-06-30', [(4.1, 5.1, 6.1), (4.2, 5.2, 6.2), (4.3, 5.3, 6.3)])
>> ('cust2', '2024-08-14', '2024-06-30', [(4.2, 5.2, 6.2), (4.3, 5.3, 6.3)])
>> ('cust2', '2024-09-13', '2024-06-30', [(4.3, 5.3, 6.3)])
I want to drop the records marked with “>>”. Thanks.