My data source emits IoT data with the following structure:
io_id,value,timestamp
232,1223,1718191205
321,671,1718191254
54,2313,1718191275
232,432,1718191315
321,983,1718191394
........
I want to monitor changes in the value of each individual io_id, and I'm using Flink (PyFlink) for this. The code below works for that purpose:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import ValueStateDescriptor


class ValueChangeMonitor(KeyedProcessFunction):
    def __init__(self):
        self.previous_value_state = None

    def open(self, runtime_context: RuntimeContext):
        # Keyed state: holds the last value seen for the current io_id.
        self.previous_value_state = runtime_context.get_state(
            ValueStateDescriptor("previous_value", Types.INT())
        )

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # Each record is (io_id, value, timestamp); the timestamp is unused here.
        io_id, io_value, _timestamp = value
        previous_value = self.previous_value_state.value()
        if previous_value is not None:
            change = abs(io_value - previous_value)
            if change > 100:
                # Emit the alert downstream so that .print() in main() shows it.
                yield f"Significant change detected for IO {io_id}: {change}"
        # Always remember the latest value, including the first one for a key.
        self.previous_value_state.update(io_value)


def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    data_stream = env.from_collection([
        (232, 1223, 1718191205),
        (321, 671, 1718191254),
        (54, 2313, 1718191275),
        (232, 432, 1718191315),
        (321, 983, 1718191394)
    ], type_info=Types.TUPLE([Types.INT(), Types.INT(), Types.LONG()]))
    # Partition by io_id so that each sensor gets its own previous-value state.
    keyed_stream = data_stream.key_by(lambda x: x[0])
    keyed_stream.process(ValueChangeMonitor()).print()
    env.execute("IO Value Change Monitor")


if __name__ == '__main__':
    main()
Now I want to monitor changes in the value of expressions built from several IO sensors. For example, (io_1 == 234 && io_2 == 423) could be one such combination: I want to be notified whenever the result of that expression changes. I want to implement this using PyFlink instead of writing my own monitoring solution in Python.
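To make the requirement concrete, here is a minimal plain-Python sketch (no Flink involved) of the behaviour I'm after. The names `ExpressionMonitor`, `example_rule` and `run`, and the sensor ids 1 and 2, are hypothetical and only for illustration. It assumes the expression is evaluated against the latest known value of every sensor, and that the first evaluation counts as a change.

```python
from typing import Callable, Dict, Iterable, List, Optional, Tuple

Reading = Tuple[int, int, int]  # (io_id, value, timestamp)


class ExpressionMonitor:
    """Keeps the latest value per io_id and reports when a rule's result changes."""

    def __init__(self, rule: Callable[[Dict[int, int]], bool]):
        self.rule = rule
        self.latest: Dict[int, int] = {}         # io_id -> most recent value
        self.last_result: Optional[bool] = None  # None until the first evaluation

    def on_reading(self, reading: Reading) -> Optional[Tuple[int, bool]]:
        io_id, value, timestamp = reading
        self.latest[io_id] = value
        result = self.rule(self.latest)
        if result != self.last_result:
            # The expression's value changed (or was evaluated for the first time).
            self.last_result = result
            return (timestamp, result)
        return None


def example_rule(latest: Dict[int, int]) -> bool:
    # Hypothetical expression: io_1 == 234 && io_2 == 423
    return latest.get(1) == 234 and latest.get(2) == 423


def run(readings: Iterable[Reading]) -> List[Tuple[int, bool]]:
    monitor = ExpressionMonitor(example_rule)
    changes = []
    for reading in readings:
        change = monitor.on_reading(reading)
        if change is not None:
            changes.append(change)
    return changes


if __name__ == "__main__":
    sample = [(1, 234, 10), (2, 400, 20), (2, 423, 30), (1, 0, 40)]
    for timestamp, result in run(sample):
        print(f"t={timestamp}: expression is now {result}")
```

The difficulty I see in Flink is that my current job keys the stream by io_id, so the state for io_1 and io_2 lives under different keys, whereas this sketch needs the latest values of both sensors in one place.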
How can I achieve the above? Is Flink the right tool for the above use case?