For some reason when using grpc.io with UnaryUnaryClientInterceptor and StreamStreamClientInterceptor, only the UnaryUnaryClientInterceptor gets called, heres the code in async:
import asyncio
import grpc
from grpc.aio import ClientCallDetails, AioRpcError
import geyser_pb2
import geyser_pb2_grpc
class WithHeaders(
grpc.aio.UnaryUnaryClientInterceptor,
grpc.aio.StreamStreamClientInterceptor,
):
def __init__(self, *headers: tuple[str, str]):
self.headers = headers
def _insert_headers(self, new_metadata, client_call_details) -> ClientCallDetails:
metadata = []
if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)
metadata.extend(new_metadata)
return ClientCallDetails(
method=client_call_details.method,
timeout=client_call_details.timeout,
metadata=metadata,
credentials=client_call_details.credentials,
wait_for_ready=client_call_details.wait_for_ready,
)
async def intercept_unary_unary(self, continuation, client_call_details, request):
print("intercept unary_unary")
new_client_call_details = self._insert_headers(
self.headers, client_call_details
)
return await continuation(new_client_call_details, request)
async def intercept_stream_stream(
self, continuation, client_call_details, request_iterator
):
print("intercept stream_stream")
new_client_call_details = self._insert_headers(
self.headers, client_call_details
)
return await continuation(new_client_call_details, request_iterator)
class GeyserClient:
def __init__(self, channel):
self.stub = geyser_pb2_grpc.GeyserStub(channel)
async def get_version(self):
request = geyser_pb2.GetVersionRequest()
return await self.stub.GetVersion(request)
async def subscribe_with_request(self, *requests: geyser_pb2.SubscribeRequest):
async def request_generator():
for req in requests:
yield req
async for response in self.stub.Subscribe(request_generator()):
yield response
async def main():
inter_auth = WithHeaders(("something", "12345"))
async with grpc.aio.secure_channel(
"...",
grpc.ssl_channel_credentials(),
interceptors=[inter_auth],
) as channel:
client = GeyserClient(channel)
ver = await client.get_version()
print(f"{ver=}")
req = geyser_pb2.SubscribeRequest(
...
)
async for data in client.subscribe_with_request(req):
print(data)
if __name__ == "__main__":
asyncio.run(main())
and working code use non-async:
import itertools
import grpc
from grpc import ssl_channel_credentials
from grpc.aio import ClientCallDetails
import geyser_pb2
import geyser_pb2_grpc
class WithHeaders(
grpc.UnaryUnaryClientInterceptor,
grpc.StreamStreamClientInterceptor,
):
def __init__(self, *headers: tuple[str, str]):
self.headers = headers
def _insert_headers(self, new_metadata, client_call_details) -> ClientCallDetails:
metadata = []
if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)
metadata.extend(new_metadata)
return ClientCallDetails(
client_call_details.method,
client_call_details.timeout,
metadata,
client_call_details.credentials,
False,
)
def intercept_unary_unary(self, continuation, client_call_details, request):
print("intercept unary_unary")
new_client_call_details = self._insert_headers(
self.headers, client_call_details
)
return continuation(new_client_call_details, request)
def intercept_stream_stream(
self,
continuation,
client_call_details,
request_iterator,
):
print("intercept stream_stream")
new_client_call_details = self._insert_headers(
self.headers, client_call_details
)
return continuation(new_client_call_details, request_iterator)
class GeyserClient:
def __init__(self, channel):
self.stub = geyser_pb2_grpc.GeyserStub(channel)
def get_version(self):
request = geyser_pb2.GetVersionRequest()
return self.stub.GetVersion(request)
def subscribe_with_request(
self,
*requests: geyser_pb2.SubscribeRequest,
):
it = iter(requests)
stream = self.stub.Subscribe(it)
return stream
def main():
inter_auth = WithHeaders(("something", "12345"))
with grpc.secure_channel(
"...",
ssl_channel_credentials(),
options=[("grpc.enable_interceptor", 1)],
) as channel:
channel = grpc.intercept_channel(channel, inter_auth)
client = GeyserClient(channel)
ver = client.get_version()
print(f"{ver=}")
req = geyser_pb2.SubscribeRequest(
...
)
for data in client.subscribe_with_request(req):
print(data)
if __name__ == "__main__":
main()
GetVersion -> unary_unary
Subscribe -> stream_stream
I tried the code in async (grpc.aio) and sync (grpc): expected intercept_stream_stream to work in both implementations, but didn’t only in sync
New contributor
Martim Martins is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.