I am working on a http proxy in Python with blocking websites functionality and logging. It works fine but it works only with http and not with https. I try it to make it work with the ssl library and creating a self signed key with openssl but no luck. I don’t know how to go about it.
Here is the code
import socket
import signal
import threading
import logging
from datetime import datetime
class ProxyServer:
def __init__(self, config):
# Shutdown on Ctrl+C
signal.signal(signal.SIGINT, self.shutdown)
# Create a TCP socket
self.serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Re-use the socket
self.serverSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Bind the socket to a public host, and a port
self.serverSocket.bind((config['HOST_NAME'], config['BIND_PORT']))
self.serverSocket.listen(10) # Become a server socket
self.__clients = {}
self.config = config
# Setup logging
logging.basicConfig(filename=config['LOG_FILE'], level=logging.INFO,
format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
# Blocklist
self.blocklist = config.get('BLOCKLIST',[])
def shutdown(self, signum, frame):
print("Shutting down server...")
self.serverSocket.close()
exit(0)
def _getClientName(self, client_address):
return f"{client_address[0]}:{client_address[1]}"
def proxy_thread(self, clientSocket, client_address):
# Get the request from the browser
request = clientSocket.recv(self.config['MAX_REQUEST_LEN'])
# Log the request
self.log_request(request, client_address)
# Parse the first line
first_line = request.split(b'n')[0]
# Get URL
url = first_line.split(b' ')[1].decode('utf-8')
http_pos = url.find("://") # Find pos of ://
if http_pos == -1:
temp = url
else:
temp = url[(http_pos+3):] # Get the rest of URL
port_pos = temp.find(":") # Find the port pos (if any)
# Find end of web server
webserver_pos = temp.find("/")
if webserver_pos == -1:
webserver_pos = len(temp)
webserver = ""
port = -1
if port_pos == -1 or webserver_pos < port_pos:
# Default port
port = 80
webserver = temp[:webserver_pos]
else: # Specific port
port = int((temp[(port_pos+1):])[:webserver_pos-port_pos-1])
webserver = temp[:port_pos]
# Check if the domain is in the blocklist
if self.is_blocked(webserver):
self.deny_request(clientSocket, webserver)
return
try:
# Create a socket to connect to the web server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(self.config['CONNECTION_TIMEOUT'])
s.connect((webserver, port))
s.sendall(request)
while True:
data = s.recv(self.config['MAX_REQUEST_LEN'])
if len(data) > 0:
clientSocket.send(data)
else:
break
except socket.error as e:
print(f"Socket error: {e}")
finally:
clientSocket.close()
s.close()
def is_blocked(self, domain):
return any(blocked_domain in domain for blocked_domain in self.blocklist)
def deny_request(self, clientSocket, domain):
response = (
"HTTP/1.1 403 Forbiddenrn"
"Content-Type: text/htmlrn"
"Connection: closern"
"rn"
"<html>"
"<head>"
"<style>"
"body {"
" font-family: Arial, sans-serif;"
" background-color: #FF0000;"
" display: flex;"
" justify-content: center;"
" align-items: center;"
" height: 100vh;"
" margin: 0;"
"}"
"h1 {"
" color: #333;"
" font-size: 36px;"
" margin-bottom: 20px;"
"}"
"p {"
" color: #666;"
" font-size: 18px;"
"}"
".container {"
" background-color: #fff;"
" padding: 40px;"
" border-radius: 8px;"
" box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1);"
" text-align: center;"
"}"
"</style>"
"</head>"
"<body>"
"<div class='container'>"
"<h1>403 Forbidden</h1>"
f"<p>Access to {domain} is blocked.</p>"
"</div>"
"</body>"
"</html>"
)
clientSocket.sendall(response.encode('utf-8'))
clientSocket.close()
logging.info(f"Blocked access to {domain}")
def log_request(self, request, client_address):
request_lines = request.decode('utf-8', errors='replace').split('n')
log_message = f"Source IP: {client_address[0]}n"
for line in request_lines:
log_message += line + "n"
logging.info(log_message)
def start(self):
while True:
try:
# Establish the connection
clientSocket, client_address = self.serverSocket.accept()
d = threading.Thread(name=self._getClientName(client_address),
target=self.proxy_thread,
args=(clientSocket, client_address))
d.setDaemon(True)
d.start()
except Exception as e:
print(f"Error:{e}")
if __name__ == "__main__":
blocklist_file=open("blocklist.txt","r")
blocklist=blocklist_file.read().splitlines()
blocklist_file.close()
config = {
'HOST_NAME': '127.0.0.1',
'BIND_PORT': 8888,
'MAX_REQUEST_LEN': 8192,
'CONNECTION_TIMEOUT': 5,
'LOG_FILE': 'proxy_server.log',
'BLOCKLIST': blocklist
}
server = ProxyServer(config)
server.start()
I tried to import ssl library and generate a self signed certificate imported it to Firefox but it still can t I still can t visit https website(when I try to visit one it don t load). Here is the modified code side
def proxy_thread(self, clientSocket, client_address):
request = clientSocket.recv(self.config['MAX_REQUEST_LEN'])
self.log_request(request, client_address)
first_line = request.split(b'n')[0]
url = first_line.split(b' ')[1].decode('utf-8')
http_pos = url.find("://")
if http_pos == -1:
temp = url
else:
temp = url[(http_pos+3):]
port_pos = temp.find(":")
webserver_pos = temp.find("/")
if webserver_pos == -1:
webserver_pos = len(temp)
webserver = ""
port = -1
if port_pos == -1 or webserver_pos < port_pos:
port = 80
webserver = temp[:webserver_pos]
else:
port = int((temp[(port_pos+1):])[:webserver_pos-port_pos-1])
webserver = temp[:port_pos]
if self.is_blocked(webserver):
self.deny_request(clientSocket, webserver)
return
try:
if port == 443: # HTTPS
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
context.load_cert_chain(certfile='server.crt', keyfile='server.key')
s = context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=webserver)
else: # HTTP
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(self.config['CONNECTION_TIMEOUT'])
s.connect((webserver, port))
s.sendall(request)
while True:
data = s.recv(self.config['MAX_REQUEST_LEN'])
if len(data) > 0:
clientSocket.send(data)
else:
break
except socket.error as e:
print(f"Socket error: {e}")
except ssl.SSLError as e:
print(f"SSL error: {e}")
finally:
clientSocket.close()
if 's' in locals():
s.close()
so as i said what i want is to be able to intercept https traffic too and also block https websites too if possible, Thank you in advance
exodia is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.