I have a file containing a list of JSON objects; see an example below:
{"user_id": "xyz", "num_bytes": 500, "ip": "xxx.xxx.xxx.xxx", "http_request": ""GET ehello.html HTTP/1.0"", "http_response": 200}
I want to group by user_id and perform various aggregations, so that an example result looks like below (all sample code is in Python):
{"user_id": "xyz", "count": 1000, "total_bytes": 1000, "max_bytes": 500, "min_bytes": 100}
Using schemas, the code looks like below (only the aggregation part is shown here):
import typing

from apache_beam.transforms.combiners import CountCombineFn

class PerUserAggregation(typing.NamedTuple):
    user_id: str
    count: int
    total_bytes: int
    max_bytes: int
    min_bytes: int

...
| 'PerUserAggregations' >> beam.GroupBy('user_id')
    .aggregate_field('user_id', CountCombineFn(), 'count')
    .aggregate_field('num_bytes', sum, 'total_bytes')
    .aggregate_field('num_bytes', max, 'max_bytes')
    .aggregate_field('num_bytes', min, 'min_bytes')
    .with_output_types(PerUserAggregation)
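One thing I had to figure out: as far as I understand, for beam.GroupBy('user_id') to accept a field name, the input PCollection needs a schema, so before the aggregation I convert the parsed dicts to rows of a NamedTuple type. A sketch of that step (UserRecord is just a name I made up; I only keep the fields the aggregation needs):

class UserRecord(typing.NamedTuple):
    user_id: str
    num_bytes: int

...
# Attach a schema by mapping each dict to a typed NamedTuple.
| 'to rows' >> beam.Map(
      lambda d: UserRecord(user_id=d['user_id'], num_bytes=d['num_bytes'])
  ).with_output_types(UserRecord)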
Without using schemas, I can think of first grouping by user_id and then iterating over the values per user in a ParDo, like below:
class MyAggregation(beam.DoFn):
    def process(self, element):
        # GroupBy produces (key, iterable-of-values) pairs.
        user_id, vals = element
        count = 0
        total_bytes = 0
        max_bytes = 0
        min_bytes = float('inf')
        for val in vals:
            count += 1
            total_bytes += val["num_bytes"]
            max_bytes = max(max_bytes, val["num_bytes"])
            min_bytes = min(min_bytes, val["num_bytes"])
        yield {"user_id": user_id, "count": count, "total_bytes": total_bytes,
               "max_bytes": max_bytes, "min_bytes": min_bytes}

...
| 'group by user id' >> beam.GroupBy(lambda x: x["user_id"])
| 'aggregate using par do' >> beam.ParDo(MyAggregation())
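For the non-schema route I also considered a custom CombineFn with CombinePerKey, which (if I understand combiner lifting correctly) should avoid materializing each user's full list of values. A sketch, with BytesStatsFn being my own name:

class BytesStatsFn(beam.CombineFn):
    # Accumulator is (count, total, max, min) over num_bytes.
    def create_accumulator(self):
        return 0, 0, 0, float('inf')

    def add_input(self, accumulator, num_bytes):
        count, total, mx, mn = accumulator
        return count + 1, total + num_bytes, max(mx, num_bytes), min(mn, num_bytes)

    def merge_accumulators(self, accumulators):
        counts, totals, maxes, mins = zip(*accumulators)
        return sum(counts), sum(totals), max(maxes), min(mins)

    def extract_output(self, accumulator):
        count, total, mx, mn = accumulator
        return {"count": count, "total_bytes": total,
                "max_bytes": mx, "min_bytes": mn}

...
| 'key by user' >> beam.Map(lambda d: (d["user_id"], d["num_bytes"]))
| 'stats per user' >> beam.CombinePerKey(BytesStatsFn())
| 'to output dict' >> beam.Map(lambda kv: {"user_id": kv[0], **kv[1]})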
My questions are below:
- For the option without schemas, is there a better way than the one I have shown?
- For the schema-based option, what is the actual logic being used? Is it just syntactic sugar, or is the processing fundamentally different from the version without schemas?