I have a Java Azure Function which has an input EventHub binding and an output EventHub binding as follows:
@FunctionName("test-output-function-request")
@EventHubOutput(
name = "outputEventHubMessage",
eventHubName = "my-output",
connection = "AzureEventHubConnection"
)
public OutputEvent eventHubProcessor(
@EventHubTrigger(
name = "msg",
eventHubName = "my-input",
connection = "AzureEventHubConnection",
cardinality = Cardinality.ONE
)
String input,
@BindingName("Properties") Map<String, String> metadata,
final ExecutionContext context
) {
OutputEvent outputEvent = new OutputEvent(metadata);
try {
FunctionResponse functionResponse = runFunction();
outputEvent.setFunctionResponse(functionResponse);
if (!functionResponse.isResponseSuccessful()) {
throw new FunctionResponseException(functionResponse.getResponseMessage());
}
} catch (Exception e) {
String errorMessage = String.format("Error handling request: %s", e.getMessage());
outputEvent.setFunctionResponse(FunctionResponse.error(errorMessage));
throw new Exception(errorMessage);
} finally {
return outputEvent;
}
}
The goal is to return the Function's output and send it to the output Event Hub, so that I can use it to determine whether the Function succeeded or failed. The finally block is there so that the OutputEvent is always returned, even if an exception is thrown (which is what we want).
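One thing worth checking about this pattern: in Java, a `return` inside `finally` discards any exception that was about to propagate, so the rethrow in the catch block never reaches the caller. The following is a minimal plain-Java sketch of that behavior, with no Azure dependencies; the class and method names are made up for illustration.

```java
// Plain-Java demo (no Azure dependencies): a return inside finally
// discards any exception that was about to propagate.
final class FinallyReturnDemo {

    // Mirrors the shape of the handler: throw in catch, return in finally.
    static String withFinallyReturn() {
        String event = "pending";
        try {
            throw new IllegalStateException("processing failed");
        } catch (IllegalStateException e) {
            event = "error: " + e.getMessage();
            throw new RuntimeException(event, e); // never reaches the caller
        } finally {
            return event; // completes the method normally, dropping the exception
        }
    }

    // Same logic without the return in finally: the exception propagates.
    static String withRethrow() {
        String event = "pending";
        try {
            throw new IllegalStateException("processing failed");
        } catch (IllegalStateException e) {
            event = "error: " + e.getMessage();
            throw new RuntimeException(event, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(withFinallyReturn()); // prints "error: processing failed"
        try {
            withRethrow();
        } catch (RuntimeException e) {
            System.out.println("caller saw: " + e.getMessage());
        }
    }
}
```

If the handler behaves the same way, the event is returned with the error recorded on it, but the caller sees a normal return rather than an exception.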
I'd like to know whether this is the best way to do it. I know I could also just export the AppRequests data from the Function itself, but I want the output to include the properties of the request, because I'm sending some custom properties that need to be used downstream.
I tried exporting the AppRequests data as mentioned, and it worked really well, but I need the event properties from the input binding.
I used the code below in a Java Azure Function to achieve your requirement:
Function.java:
public class Function {
@FunctionName("test-output-function-request")
@EventHubOutput(
name = "outputEventHubMessage",
eventHubName = "kpevnhub-feXXXc",
connection = "EventHubConnection"
)
public OutputEvent eventHubProcessor(
@EventHubTrigger(
name = "msg",
eventHubName = "my-input",
connection = "EventHubConnection",
cardinality = Cardinality.ONE
)
String input,
@BindingName("Properties") Map<String, String> metadata,
final ExecutionContext context
) {
OutputEvent outputEvent = new OutputEvent(metadata);
try {
FunctionResponse functionResponse = runFunction();
outputEvent.setFunctionResponse(functionResponse);
if (!functionResponse.isResponseSuccessful()) {
throw new FunctionResponseException(functionResponse.getResponseMessage());
}
}
catch (Exception e) {
context.getLogger().severe(String.format("Error handling request: %s", e.getMessage()));
// Rethrow the exception to ensure the function invocation status is captured
throw new RuntimeException("Exception occurred during function processing", e);
}
// Return the OutputEvent
return outputEvent;
}
// Placeholder implementation of runFunction; always reports success
FunctionResponse runFunction() {
return new FunctionResponse(true, "Operation successful");
}
public HttpResponseMessage run(HttpRequestMessage<Optional<String>> req, ExecutionContext context) {
throw new UnsupportedOperationException("Unimplemented method 'run'");
}
}
FunctionResponse.java:
public class FunctionResponse {
private boolean successful;
private String message;
public FunctionResponse(boolean successful, String message) {
this.successful = successful;
this.message = message;
}
public boolean isResponseSuccessful() {
return successful;
}
public String getResponseMessage() {
return message;
}
public static FunctionResponse error(String message) {
return new FunctionResponse(false, message);
}
public static FunctionResponse success(String message) {
return new FunctionResponse(true, message);
}
}
OutputEvent.java:
public class OutputEvent {
private Map<String, String> metadata;
private FunctionResponse functionResponse;
public OutputEvent(Map<String, String> metadata) {
this.metadata = metadata;
}
public Map<String, String> getMetadata() {
return metadata;
}
public void setMetadata(Map<String, String> metadata) {
this.metadata = metadata;
}
public FunctionResponse getFunctionResponse() {
return functionResponse;
}
public void setFunctionResponse(FunctionResponse functionResponse) {
this.functionResponse = functionResponse;
}
}
FunctionResponseException.java:
public class FunctionResponseException extends RuntimeException {
public FunctionResponseException(String message) {
super(message);
}
}
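With the rethrow in the catch block, a failed invocation has no return value to hand to the output binding. If the downstream consumer also needs an event for failures, one option is to record the error on the event and return normally. The sketch below shows only that control flow in plain Java; `FailureEventSketch`, `Event`, and `handle` are hypothetical stand-ins, not part of the Azure Functions API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java sketch (no Azure dependencies). Event is a hypothetical stand-in
// for the OutputEvent/FunctionResponse pair above.
final class FailureEventSketch {

    static final class Event {
        final Map<String, String> metadata;
        final boolean successful;
        final String message;

        Event(Map<String, String> metadata, boolean successful, String message) {
            this.metadata = metadata;
            this.successful = successful;
            this.message = message;
        }
    }

    // Runs the work; on failure, records the error on the event and returns normally
    // so that the caller always receives an event carrying the input metadata.
    static Event handle(Map<String, String> metadata, Supplier<String> work) {
        try {
            return new Event(metadata, true, work.get());
        } catch (RuntimeException e) {
            return new Event(metadata, false, "Error handling request: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = Collections.singletonMap("tenant", "demo");
        Event ok = handle(props, () -> "Operation successful");
        Event failed = handle(props, () -> { throw new IllegalStateException("boom"); });
        System.out.println(ok.successful + " " + ok.message);
        System.out.println(failed.successful + " " + failed.message + " " + failed.metadata);
    }
}
```

The trade-off is that a handler written this way returns normally even when the work failed, so the invocation itself would be reported as succeeded and consumers have to inspect the success flag on the event.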
Sent events from the my-input event hub:
Local response:
Functions:
test-output-function-request: eventHubTrigger
For detailed output, run func with --verbose flag.
[2024-09-12T11:23:32.802Z] Worker process started and initialized.
[2024-09-12T11:23:36.662Z] Host lock lease acquired by instance ID '000000000000000000000000F72731CC'.
[2024-09-12T11:24:24.870Z] Executing 'Functions.test-output-function-request' (Reason='(null)', Id=808c7a12-56b4-4388-8857-280ee0d2099c)
[2024-09-12T11:24:24.874Z] Trigger Details: PartionId: 0, Offset: 464, EnqueueTimeUtc: 2024-09-12T11:24:25.3630000+00:00, SequenceNumber: 2, Count: 1
[2024-09-12T11:24:25.005Z] Function "test-output-function-request" (Id: 808c7a12-56b4-4388-8857-280ee0d2099c) invoked by Java Worker
[2024-09-12T11:24:26.375Z] Executed 'Functions.test-output-function-request' (Succeeded, Id=808c7a12-56b4-XXX-280ee0d2099c, Duration=1537ms)
Portal:
The function response data was exported to the output event hub: