I’m running a Python script on PythonAnywhere that connects to my iPhone via SSH using Paramiko and executes some commands. Additionally, I use subprocess.Popen to run another script, proxy.py, which uses mitmproxy to intercept and decrypt some requests. The scripts work perfectly on my local Windows machine, but when I run them on PythonAnywhere, I encounter the following error:
Starting the process…
Traceback (most recent call last):
File “/home/gaetanoeu/.local/lib/python3.10/site-packages/mitmproxy/proxy/server.py”, line 46, in init
super().init(
File “/home/gaetanoeu/.local/lib/python3.10/site-packages/mitmproxy/net/tcp.py”, line 593, in init
self.socket.bind(self.address)
OSError: [Errno 98] Address already in use
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File “/home/gaetanoeu/tokengen/proxy.py”, line 46, in <module>
m.server = ProxyServer(config)
File “/home/gaetanoeu/.local/lib/python3.10/site-packages/mitmproxy/proxy/server.py”, line 54, in init
raise exceptions.ServerException(
mitmproxy.exceptions.ServerException: Error starting proxy server: OSError(98, ‘Address already in use’)
proxy.py
from mitmproxy import http
from mitmproxy.options import Options
from mitmproxy.proxy import ProxyConfig, ProxyServer
from mitmproxy.tools.dump import DumpMaster
import json
import re
class Addon:
def request(self, flow: http.HTTPFlow) -> None:
if re.search(r'.*/auth/sign_in', flow.request.pretty_url):
self.process_packet(flow, 'Request', flow.request.text, flow.request.headers.get("content-type"))
flow.kill() # Kill the request to prevent it from being sent
def response(self, flow: http.HTTPFlow) -> None:
pass # No need to process responses
def process_packet(self, flow: http.HTTPFlow, packet_type: str, data: str, content_type: str):
if content_type == "application/json":
data = json.loads(data)
if 'device_token' in data:
device_token = data['device_token']
#print(f'Device Token: {device_token}')
# Salvare il token in un file JSON
tokens_file = 'tokens.json'
# Controlla se il file esiste e carica i dati esistenti
try:
with open(tokens_file, 'r') as file:
tokens = json.load(file)
except (FileNotFoundError, json.JSONDecodeError):
tokens = []
# Aggiungi il nuovo token
tokens.append(device_token)
# Scrivi di nuovo il file JSON
with open(tokens_file, 'w') as file:
json.dump(tokens, file, indent=4)
if __name__ == "__main__":
options = Options(listen_port=8888, http2=True)
m = DumpMaster(options, with_termlog=False, with_dumper=False)
config = ProxyConfig(options)
m.server = ProxyServer(config)
m.addons.add(Addon())
try:
print('Starting Dokkan mitmproxy')
m.run()
except KeyboardInterrupt:
m.shutdown()
bot.py
import time
import paramiko
import requests
import json
import subprocess
# Configurazione della connessione
SSH_IP = '192.168.1.89'
SSH_PORT = 22
SSH_USER = 'root'
SSH_PSW = 'alpine'
def send_tokens(file_path, url):
"""
Legge i token da un file JSON e li invia tramite una richiesta POST all'URL specificato.
:param file_path: Il percorso del file JSON contenente i token.
:param url: L'URL a cui inviare i token.
:return: La risposta del server alla richiesta POST.
"""
try:
# Apri e leggi il file JSON
with open(file_path, 'r') as file:
tokens = json.load(file) # Carica direttamente la lista di token
if not isinstance(tokens, list):
print("Il file JSON non contiene una lista di token.")
return
if not tokens:
print("Il file non contiene token.")
return
# Invia i token tramite una richiesta POST
response = requests.post(url, json={"tokens": tokens})
# Controlla lo stato della richiesta
if response.status_code == 201:
print("Token inviati con successo!")
else:
print(f"Errore nell'invio dei token: {response.status_code} - {response.text}")
# Svuota il contenuto del file JSON
open(file_path, 'w').close()
return response
except FileNotFoundError:
print(f"Il file {file_path} non è stato trovato.")
except json.JSONDecodeError:
print("Errore nella decodifica del file JSON.")
except Exception as e:
print(f"Si è verificato un errore: {e}")
def get_device_token():
try:
# Leggi i dati dal file JSON
with open('tokens.json', 'r') as f:
tokens = json.load(f)
# Verifica se la lista dei token non è vuota e restituisce il primo token
if tokens:
return tokens[0]
else:
print("No tokens found in the file.")
return None
except FileNotFoundError:
print("Device token file not found.")
return None
except json.JSONDecodeError:
print("Error decoding JSON from the device token file.")
return None
def execute_ssh_command(command):
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(SSH_IP, SSH_PORT, SSH_USER, SSH_PSW)
stdin, stdout, stderr = client.exec_command(command)
output = stdout.read().decode()
error = stderr.read().decode()
client.close()
return output, error
except paramiko.AuthenticationException:
return None, "Autenticazione fallita, controlla username e password."
except paramiko.SSHException as e:
return None, f"Errore SSH: {e}"
except Exception as e:
return None, f"Errore imprevisto: {e}"
def check_token_count(url):
"""
Controlla il numero di token disponibili tramite una richiesta GET all'URL specificato,
utilizzando un custom User-Agent.
:param url: L'URL da cui recuperare il conteggio dei token.
:return: Il valore del conteggio dei token se la richiesta ha successo, altrimenti None.
"""
headers = {
"User-Agent": "MyCustomUserAgent/1.0" # Imposta qui il tuo custom User-Agent
}
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
return data.get("count", None)
else:
print(f"Errore nella richiesta GET: {response.status_code} - {response.text}")
return None
except Exception as e:
print(f"Si è verificato un errore durante il controllo dei token: {e}")
return None
def main():
print("Starting the process...")
# Start the proxy
subprocess.Popen(["python", "proxy.py"])
# Execute SSH commands
commands = [
'nimbus powerbutton',
'nimbus home',
'curl http://192.168.1.89:8080/control/start_playing?path=/j1jtk65q5pwh2494mhawzg25j/DumpDokkanToken_jpn.at'
]
for command in commands:
output, error = execute_ssh_command(command)
if output:
print(f"Output comando '{command}': {output}")
if error:
print(f"Errore comando '{command}': {error}")
# Wait a few seconds to ensure the proxy has captured the token
# Recupera il device_token
device_token = get_device_token() # Chiamata sincrona
if device_token:
pass
else:
print("Unable to retrieve device token.")
print("Process completed.")
time.sleep(30)
send_tokens('tokens.json', 'https://gaetanoeu.eu.pythonanywhere.com/tokens')
if __name__ == "__main__":
while True:
# Controlla il conteggio dei token ogni minuto
token_count = check_token_count('https://gaetanoeu.eu.pythonanywhere.com/tokens/count')
if token_count is not None and token_count <= 3:
main()
else:
print(f"Conteggio attuale dei token: {token_count}. Non è necessario avviare il processo.")
# Attende 1 minuto prima di eseguire nuovamente il controllo
time.sleep(60)
I tried changing the port in the proxy.py
script. This resolved the “Address already in use” error, but now the script hangs on “Starting the process…” and “Starting mitmproxy”, without sending the SSH commands as expected.
jacorella is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.