I am trying to set up a simple client and server using python-mbedtls, where only the server shall be certificated via one intermediate certificate. I was able to make it work without an intermediate certificate but I do not get why it is not working with it.
I have the following server.py and client.py but unfortunately I always get this error:
mbedtls.exceptions.TLSError: TLSError([0x2700] 'X509 - Certificate verification failed, e.g. CRL, CA or signature check failed')
Does anyone have an idea what could be the problem?
client.py:
#!/usr/bin/env python
# SPDX-License-Identifier: MIT
"""An example DTLS/TLS client.
Run ./programs/client.py --help.
"""
from __future__ import annotations
from mbedtls._tls import _enable_debug_output, _set_debug_level # type: ignore
import socket
import sys
import time
from contextlib import suppress
from typing import Any, Optional, Tuple, Union
from mbedtls.exceptions import TLSError
from mbedtls.tls import (
ClientContext,
DTLSConfiguration,
TLSConfiguration,
TLSWrappedSocket,
TrustStore
)
from mbedtls import x509
from conf import hostname, port
if sys.version_info < (3, 10):
from typing_extensions import TypeAlias
else:
from typing import TypeAlias
from typing import Final
__all__ = ["Client"]
_Address: TypeAlias = Union[Tuple[Any, ...], str]
def _echo_tls(sock: TLSWrappedSocket, buffer: bytes, chunksize: int) -> bytes:
view = memoryview(buffer)
received = bytearray()
for idx in range(0, len(view), chunksize):
part = view[idx : idx + chunksize]
sock.send(part)
received += sock.recv(chunksize)
return received
def _echo_dtls(sock: TLSWrappedSocket, buffer: bytes, chunksize: int) -> bytes:
view = memoryview(buffer)
received = bytearray()
while len(received) != len(buffer):
part = view[len(received) : len(received) + chunksize]
sock.send(part)
data, _addr = sock.recvfrom(chunksize)
received += data
if not data:
# Avoid tight loop.
time.sleep(0.01)
return received
class Client:
def __init__(
self,
cli_conf: Union[TLSConfiguration, DTLSConfiguration],
proto: socket.SocketKind,
srv_address: _Address,
srv_hostname: Optional[str],
) -> None:
super().__init__()
self.cli_conf: Final = cli_conf
self.proto: Final = proto
self.srv_address: Final = srv_address
self.srv_hostname: Final = srv_hostname
self._sock: Optional[TLSWrappedSocket] = None
self._echo: Final = {
socket.SOCK_STREAM: _echo_tls,
socket.SOCK_DGRAM: _echo_dtls,
}[self.proto]
def __enter__(self) -> Client:
self.start()
return self
def __exit__(self, *exc_info: object) -> None:
self.stop()
def __del__(self) -> None:
self.stop()
@property
def context(self) -> Optional[ClientContext]:
if self._sock is None:
return None
assert isinstance(self._sock.context, ClientContext)
return self._sock.context
def do_handshake(self) -> None:
if not self._sock:
return
self._sock.do_handshake()
def echo(self, buffer: bytes, chunksize: int) -> bytes:
if not self._sock:
return b""
return bytes(self._echo(self._sock, buffer, chunksize))
def start(self) -> None:
if self._sock:
self.stop()
self._sock = ClientContext(self.cli_conf).wrap_socket(
socket.socket(socket.AF_INET, self.proto, ),
server_hostname=self.srv_hostname,
)
self._sock.connect(self.srv_address)
def stop(self) -> None:
if not self._sock:
return
with suppress(TLSError, OSError):
self._sock.close()
self._sock = None
def restart(self) -> None:
self.stop()
self.start()
def main() -> None:
root_crt = x509.CRT.from_file("root.crt")
inter_crt = x509.CRT.from_file("inter.crt")
trust_store = TrustStore()
trust_store.add(root_crt)
trust_store.add(inter_crt)
conf = TLSConfiguration(
trust_store=trust_store,
validate_certificates=True,
)
message = "Hello world!"
with Client(
conf, socket.SOCK_STREAM, ("127.0.0.1", port), hostname
) as cli:
_enable_debug_output(cli.context)
_set_debug_level(2)
cli.do_handshake()
received = cli.echo(message.encode("utf-8"), 1024)
print(received.decode("utf-8"))
if __name__ == "__main__":
main()
server.py:
#!/usr/bin/env python
# SPDX-License-Identifier: MIT
"""An example DTLS/TLS server.
Run ./programs/server.py --help.
"""
from __future__ import annotations
from mbedtls._tls import _enable_debug_output, _set_debug_level # type: ignore
import socket
import sys
import time
from contextlib import suppress
from functools import partial
from typing import Any, Callable, NoReturn, Optional, Tuple, Union
import datetime as dt
from mbedtls.tls import (
DTLSConfiguration,
HelloVerifyRequest,
ServerContext,
TLSConfiguration,
TLSWrappedSocket,
)
from mbedtls.x509 import CRT, CSR, BasicConstraints
from mbedtls.pk import RSA, ECC
from mbedtls import hashlib, x509
from mbedtls.tls import TrustStore
from conf import hostname, port
if sys.version_info < (3, 10):
from typing_extensions import TypeAlias
else:
from typing import TypeAlias
from typing import Final
__all__ = ["Server"]
_Address: TypeAlias = Union[Tuple[Any, ...], str]
def _make_tls_connection(sock: TLSWrappedSocket) -> TLSWrappedSocket:
assert sock
conn, _addr = sock.accept()
conn.do_handshake()
return conn
def _make_dtls_connection(sock: TLSWrappedSocket) -> TLSWrappedSocket:
assert sock
conn, addr = sock.accept()
conn.setcookieparam(addr[0].encode("ascii"))
with suppress(HelloVerifyRequest):
conn.do_handshake()
_, (conn, addr) = conn, conn.accept()
_.close()
conn.setcookieparam(addr[0].encode("ascii"))
conn.do_handshake()
return conn
class Server:
def __init__(
self,
srv_conf: Union[TLSConfiguration, DTLSConfiguration],
proto: socket.SocketKind,
address: _Address,
) -> None:
super().__init__()
self.srv_conf: Final = srv_conf
self.proto: Final = proto
self.address: Final = address
self._make_connection: Final = {
socket.SOCK_STREAM: _make_tls_connection,
socket.SOCK_DGRAM: _make_dtls_connection,
}[self.proto]
self._sock: Optional[TLSWrappedSocket] = None
def __enter__(self) -> Server:
self.start()
return self
def __exit__(self, *exc_info: object) -> None:
self.stop()
def __del__(self) -> None:
self.stop()
@property
def context(self) -> Optional[ServerContext]:
if self._sock is None:
return None
assert isinstance(self._sock.context, ServerContext)
return self._sock.context
def start(self) -> None:
if self._sock:
self.stop()
self._sock = ServerContext(self.srv_conf).wrap_socket(
socket.socket(socket.AF_INET, self.proto)
)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock.bind(self.address)
if self.proto is socket.SOCK_STREAM:
self._sock.listen(1)
def stop(self) -> None:
if not self._sock:
return
self._sock.close()
self._sock = None
def run(
self, conn_handler: Callable[[TLSWrappedSocket], None]
) -> NoReturn:
if not self._sock:
raise ConnectionRefusedError("server not started")
while True:
self._run(conn_handler)
def _run(self, conn_handler: Callable[[TLSWrappedSocket], None]) -> None:
assert self._sock is not None
with self._make_connection(self._sock) as conn:
conn_handler(conn)
def echo_handler(conn: TLSWrappedSocket, *, packet_size: int) -> None:
while True:
data = conn.recv(packet_size)
if data:
break
# Avoid tight loop.
time.sleep(0.01)
print(data.decode())
sent = 0
view = memoryview(data)
while sent != len(data):
sent += conn.send(view[sent:])
def make_root_ca(
# pylint: disable=too-many-arguments
subject: Optional[str] = None,
not_before: Optional[dt.datetime] = None,
not_after: Optional[dt.datetime] = None,
serial_number: Optional[int] = None,
basic_constraints: Optional[BasicConstraints] = None,
digestmod: Optional[hashlib.Algorithm] = None,
) -> Tuple[CRT, _Key]:
if subject is None:
subject = "OU=test, CN=Trusted CA"
if not_before is None:
not_before = dt.datetime.utcnow()
if not_after is None:
not_after = not_before + dt.timedelta(days=90)
if serial_number is None:
serial_number = 0x123456
if basic_constraints is None:
basic_constraints = BasicConstraints(True, -1)
if digestmod is None:
digestmod = hashlib.sha256
key = RSA()
key.generate()
crt = CRT.selfsign(
csr=CSR.new(key, subject, digestmod()),
issuer_key=key,
not_before=not_before,
not_after=not_after,
serial_number=serial_number,
basic_constraints=basic_constraints,
)
return crt, key
def make_crt(
# pylint: disable=too-many-arguments
issuer_crt: CRT,
issuer_key: str,
subject: Optional[str] = None,
not_before: Optional[dt.datetime] = None,
not_after: Optional[dt.datetime] = None,
serial_number: Optional[int] = None,
basic_constraints: Optional[BasicConstraints] = None,
digestmod: Optional[hashlib.Algorithm] = None,
) -> Tuple[CRT, str]:
if subject is None:
subject = "OU=test, CN=hostname"
if not_before is None:
not_before = issuer_crt.not_before
if not_after is None:
not_after = issuer_crt.not_after
if serial_number is None:
serial_number = 0x123456
if basic_constraints is None:
basic_constraints = BasicConstraints()
if digestmod is None:
# TODO: issuer_crt.digestmod should work but doesn't.
digestmod = hashlib.sha256
key = RSA()
key.generate()
crt = issuer_crt.sign(
csr=CSR.new(key, subject, digestmod()),
issuer_key=issuer_key,
not_before=not_before,
not_after=not_after,
serial_number=serial_number,
basic_constraints=basic_constraints,
)
return crt, key
def create_and_export_certificates():
root_crt, root_key = make_root_ca()
inter_crt, inter_key = make_crt(
root_crt, root_key, subject="OU=test, CN=Intermediate CA"
)
ee_crt, ee_key = make_crt(
inter_crt, inter_key, subject=f"OU=test, CN={hostname}"
)
with open("root.crt", "wt") as file:
file.write(root_crt.to_PEM())
with open("inter.crt", "wt") as file:
file.write(inter_crt.to_PEM())
print(root_crt.verify(inter_crt))
print(inter_crt.verify(ee_crt))
return (ee_crt, inter_crt, root_crt), ee_key
def main() -> NoReturn:
(ee_crt, inter_crt, root_crt), ee_key = create_and_export_certificates()
conf = TLSConfiguration(
certificate_chain=([ee_crt, inter_crt, root_crt], ee_key),
validate_certificates=False
)
with Server(conf, socket.SOCK_STREAM, ("127.0.0.1", 60_111)) as srv:
_enable_debug_output(srv.context)
_set_debug_level(2)
srv.run(partial(echo_handler, packet_size=4069))
if __name__ == "__main__":
import faulthandler
faulthandler.enable()
with suppress(KeyboardInterrupt):
main()
- tried looking into the test cases of the repository
- was expecting to establish an encrypted communication between client and server
scrapp is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.