I create a python that will retrieve message from AWS IOT, the problem here is that the message that send by the gateway to aws iot is every 5 min, but my code keep reconnecting/subscribing making it retrieve the message every few seconds instead of 5 min. How do I fix this, so that I can retrieve the message from aws iot following the timestamp that the message arrive from the gateway.
def on_connection_interrupted(connection, error, **kwargs):
print(f"Connection interrupted. Error: {error}")
def on_connection_resumed(connection, return_code, session_present, **kwargs):
print(f"Connection resumed. Return code: {return_code}, session present: {session_present}")
if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
print("Session did not persist. Resubscribing to existing topics...")
resubscribe_future, _ = connection.resubscribe_existing_topics()
resubscribe_future.add_done_callback(on_resubscribe_complete)
def on_resubscribe_complete(resubscribe_future):
resubscribe_results = resubscribe_future.result()
print(f"Resubscribe results: {resubscribe_results}")
for topic, qos in resubscribe_results['topics']:
if qos is None:
sys.exit(f"Server rejected resubscribe to topic: {topic}")
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
global received_count
received_count += 1
print(f"Received message from topic '{topic}': {payload.decode()}")
print(f"Count:{received_count}")
timestamp = kwargs.get('timestamp', None)
try:
payload_str = payload.decode('utf-8')
data = json.loads(payload_str)
timestamp = data.get('timestamp')
if timestamp is not None:
timestamp_seconds = timestamp / 1000
dt_object = datetime.datetime.fromtimestamp(timestamp_seconds)
else:
dt_object = datetime.datetime.now()
insert_into_mysql(data.get('AHUList'), dt_object, mysql_config)
if received_count == cmdData.input_count:
received_all_event.set()
except UnicodeDecodeError as decode_error:
print(f"Error decoding payload: {decode_error}")
except json.JSONDecodeError as json_error:
print(f"Error decoding JSON payload: {json_error}")
except Exception as ex:
print(f"Error occurred: {ex}")
def on_connection_success(connection, callback_data):
assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
print(f"Connection Successful with return code: {callback_data.return_code}, session present: {callback_data.session_present}")
def on_connection_failure(connection, callback_data):
assert isinstance(callback_data, mqtt.OnConnectionFailureData)
print(f"Connection failed with error code: {callback_data.error}")
def on_connection_closed(connection, callback_data):
print("Connection closed")
if __name__ == '__main__':
cmdData = CommandLineUtils.parse_sample_input_pubsub()
proxy_options = None
if cmdData.input_proxy_host and cmdData.input_proxy_port:
proxy_options = http.HttpProxyOptions(
host_name=cmdData.input_proxy_host,
port=cmdData.input_proxy_port
)
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=cmdData.input_endpoint,
port=cmdData.input_port,
cert_filepath=cmdData.input_cert,
pri_key_filepath=cmdData.input_key,
ca_filepath=cmdData.input_ca,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
client_id=cmdData.input_clientId,
clean_session=False,
keep_alive_secs=30,
http_proxy_options=proxy_options,
on_connection_success=on_connection_success,
on_connection_failure=on_connection_failure,
on_connection_closed=on_connection_closed
)
print(f"Connecting to {cmdData.input_endpoint} with client ID '{cmdData.input_clientId}'...")
connect_future = mqtt_connection.connect()
connect_future.result()
print("Connected!")
message_topic = cmdData.input_topic
print(f"Subscribing to topic '{message_topic}'...")
subscribe_future, packet_id = mqtt_connection.subscribe(
topic=message_topic,
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_message_received
)
subscribe_result = subscribe_future.result()
print(f"Subscribed with {str(subscribe_result['qos'])}")
print("Disconnecting...")
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()
print("Disconnected!")
I already try make the code wait for 5 min before execution, but its not the solution that I want.
KACS – Mohammad Ridzwan Syah is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.