My question is the following: are there any design patterns for representing chainable functions suited to the problem described below?
High-Level Description of the Process
I’m currently building an image-processing server, whose functionality is exposed via a web API.
Client Behavior
Clients first authenticate, then request that one or more analyses be performed by the server. Once a client has authenticated and requested his analysis stack, he streams his video data to the analysis server.
Server Behavior
On the server side, the `Stream` class is used to represent the flow of data. It's the first-class object around which the pipeline is built, and it can be thought of as an ordered FIFO queue of sorts. Frames are pushed, in order of arrival, onto the `Stream` instance, and each stream may optionally map a function onto its data. Results from a mapped function are projected onto a new `Stream` instance.
For example:
# input_stream is asynchronously updated with incoming frames
stream = input_stream.map(do_sobel_filter) # apply Sobel filter to each incoming frame
stream2 = stream.window(fn, 10) # apply function to a sliding window of 10 frames
An important side note is that our use of `Stream` objects allows us to apply functions either to individual frames or to groups of frames. This is useful because some analyses require context.
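To make the `map`/`window` example above concrete, here is a minimal sketch of such a `Stream`. It assumes a simple pull-based generator model (the real stream is fed asynchronously), and all details beyond `map` and `window` are illustrative:

```python
class Stream:
    """Minimal pull-based sketch; the real Stream is updated asynchronously."""

    def __init__(self, frames):
        self._frames = frames  # any iterable of frames

    def __iter__(self):
        return iter(self._frames)

    def map(self, fn):
        # Project fn's per-frame results onto a new Stream.
        return Stream(fn(frame) for frame in self._frames)

    def window(self, fn, size):
        # Apply fn to a sliding window of `size` consecutive frames.
        def windows():
            buf = []
            for frame in self._frames:
                buf.append(frame)
                if len(buf) == size:
                    yield fn(tuple(buf))
                    buf.pop(0)
        return Stream(windows())
```

With this sketch, `input_stream.map(do_sobel_filter)` yields a new stream of filtered frames, and `stream.window(fn, 10)` yields one result per 10-frame window.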
Requirements & Constraints
The point emphasized above is particularly important: a given client will be authorized to perform only a subset of the image analyses offered by our system. This in turn makes it critical that we be able to compose the image-processing pipeline dynamically; we don't want to perform expensive computations only to throw away the result.
Further complicating the matter is the fact that almost all of the analyses we offer have prerequisites: operation Oi may depend on the result of an earlier operation Oi-n. Our system needs to resolve these dependencies and run each operation exactly once.
Finally, the chaining/composition of operations must be associative, because we'd like to be able to compose reusable higher-level computations from lower-level atomic computations. For instance, consider the sequence of operations `x -> y -> z`. We would like to be able to assign this ordered chain to a variable, thus `CMP1 = x -> y -> z`. Similarly, `CMP2` is the result of chaining operations `a -> b -> c`. Ultimately, we would like to be able to do the equivalent of `CMP3 = CMP1 -> CMP2` and have this be strictly equivalent to `CMP3 = x -> y -> z -> a -> b -> c` (associativity).
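The associativity requirement can be sketched with plain function composition. This is only an illustration of the desired algebra, not the stream machinery itself; `compose` and the operation names are made up:

```python
def compose(*ops):
    """Chain unary operations left to right: compose(f, g)(v) == g(f(v))."""
    def chained(value):
        for op in ops:
            value = op(value)
        return value
    return chained

# Illustrative atomic operations on a single value.
x = lambda v: v + 1
y = lambda v: v * 2
z = lambda v: v - 3
a = lambda v: v * v
b = lambda v: v + 10
c = lambda v: v // 2

CMP1 = compose(x, y, z)
CMP2 = compose(a, b, c)
CMP3 = compose(CMP1, CMP2)        # composing composites...
flat = compose(x, y, z, a, b, c)  # ...behaves like the flat chain
```

Because each composite is itself a unary function, nesting composites is indistinguishable from writing out the flat chain.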
To summarize, here are the requirements for my image analysis function objects:
- Functions must be composable, meaning that it should be possible to manipulate a first-class object that represents a series of sub-computations.
- Ideally, functions should resolve dependencies, meaning that doing `A -> C` should implicitly evaluate to `A -> X -> C` if `C` depends on both `A` and `X`. This may be worthy of its own question, and so should be treated as a soft requirement.
- Chaining of operations should be associative, and composed functions should also be chainable. The idea is to have a reusable pattern for representing increasingly complex pipelines.
Are there any well-tested patterns for achieving such a result?
Edits:
1) In response to @Giorgio’s questions, my (admittedly ambiguous) notation serves to distinguish between the temporal order of frames and the order of operations. Below is an example of how an operation can depend on its preceding counterparts:
FindFace -> FindEyes -> SegmentEye -> MeasurePupilDiameter
Here we can see that each function after `FindFace` depends on the cumulative result of the preceding functions. Now consider:
FindFace -> FindEyes -> SegmentEye -> GetIrisColorHistogram
This second example demonstrates how there is often a common core of operations that needs to be performed across analyses. When composing operations, I'd like to do steps 1-3 only once, and then apply `MeasurePupilDiameter` and `GetIrisColorHistogram` from there.
Intuitively, it seems like the right way to do this would be to assign a dictionary to my `Frame` object and fill it with features as they're computed (a memoize pattern, if you will). From there, however, it's unclear to me how I should go about abstracting all the boilerplate code pertaining to (a) checking whether a prerequisite feature has already been computed and (b) automatically calling the appropriate function if it has not.
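One way to factor out exactly that boilerplate is a small feature registry plus a recursive lookup; this is only a sketch of the memoize idea, and every name in it (`feature`, `get_feature`, `FEATURES`) is illustrative:

```python
# feature name -> (function, prerequisite feature names)
FEATURES = {}

def feature(name, requires=()):
    """Register a feature function together with its prerequisites."""
    def register(fn):
        FEATURES[name] = (fn, requires)
        return fn
    return register

def get_feature(frame, name):
    if name not in frame.features:              # (a) already computed?
        fn, requires = FEATURES[name]
        # (b) compute prerequisites first, recursively and memoized
        deps = {r: get_feature(frame, r) for r in requires}
        frame.features[name] = fn(frame, deps)
    return frame.features[name]

class Frame:
    def __init__(self, image):
        self.image = image
        self.features = {}  # memoized feature results
```

A concrete feature would then be registered as, e.g., `@feature("eyes", requires=("face",))`, and asking for `"eyes"` would compute `"face"` exactly once per frame.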
Your problem has striking parallels to functional programming: monads, functors, and function composition. Your composability requirement essentially states that each operation must be a function that takes a stream and returns a stream:
operation : Stream -> Stream
Most functions will not be expressed in terms of whole streams but rather in terms of single frames or windows of frames. You have correctly noted that you can lift these frame-wise operations to stream operations with functions like `map`.
class Operation:
    def __init__(self, function, dependencies):
        self.dependencies = dependencies
        self.function = function

    def __call__(self, stream):
        return self.function(stream)

def lift_frame_to_stream(frame_operation, dependencies):
    def stream_operation(stream):
        for frame in stream:
            yield frame_operation(frame)
    return Operation(stream_operation, dependencies)
This composability is at odds with the requirement that operations can have dependencies. We can solve this by handling the dependencies outside of the type system. Given a flat list of operations `op1, op2, ..., opn` that each have dependencies, we can iterate through the list to determine whether those dependencies are fulfilled. In the simplest case, this is done by keeping a set of all operations encountered up to that point:
# unsatisfied_dependencies: Iterable[Operation] -> List[(Operation, List[Operation])]
def unsatisfied_dependencies(operations):
    seen = set()
    needed = []
    for op in operations:
        op_needs = []
        for dep in op.dependencies:
            if dep not in seen:
                op_needs.append(dep)
        if op_needs:
            needed.append((op, op_needs))
        seen.add(op)
    return needed
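As a usage sketch, with a minimal stand-in for `Operation` (the operation names are made up and the algorithm is restated so the snippet is self-contained):

```python
class Op:
    # Stand-in for Operation: only name and dependencies matter here.
    def __init__(self, name, dependencies=()):
        self.name = name
        self.dependencies = list(dependencies)

def unsatisfied_dependencies(operations):
    # Same algorithm as above: track what has already run.
    seen = set()
    needed = []
    for op in operations:
        op_needs = [dep for dep in op.dependencies if dep not in seen]
        if op_needs:
            needed.append((op, op_needs))
        seen.add(op)
    return needed

find_face = Op("FindFace")
find_eyes = Op("FindEyes", dependencies=[find_face])
segment_eye = Op("SegmentEye", dependencies=[find_eyes])

# Complete pipeline: nothing is missing.
assert unsatisfied_dependencies([find_face, find_eyes, segment_eye]) == []

# FindFace omitted: FindEyes reports it as unmet.
missing = unsatisfied_dependencies([find_eyes, segment_eye])
```

A non-empty result tells you exactly which operation is missing which prerequisites, which is the information you would surface to the user.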
I would be wary of automatically fulfilling dependencies. If you did that, you'd have to iterate over the operations by index: when an unmet dependency is discovered, rather than adding it to the list of needed operations, you'd insert the dependency before the current operation, back up one element, and continue checking dependencies starting with the newly inserted operation.
The problem with automatic dependency insertion is that the order of operations could matter, and this resolution strategy does not account for that. I think it would be better to make users aware of dependencies and require these to be met before processing starts.
Processing the stream is a fairly simple job of taking a pipeline with all dependencies met and applying the operations. Assuming that all operations are lazy (i.e. don’t immediately consume the stream but only calculate each frame that is requested; easily implemented using Python’s generators), then the composed stream would have to be force-evaluated.
stream = input_stream
for op in verified_pipeline:
    stream = op(stream)

# force the stream, e.g. by exhausting the final generator
for frame in stream:
    ...  # deliver or store each processed frame
The pipeline cannot be composed any other way; "associativity" is a misnomer, since stream operations are unary and not binary operators. However, your requirement for larger "building blocks" of operations that contain child operations is understandable and easy to implement: the object-oriented Composite pattern makes sense here. The dependencies of the composite are all dependencies of the child operations that are not met inside that composite.
Actually, a pipeline of operations is equivalent to a single operation, so we can use a pipeline as the composite.
class Pipeline:
    def __init__(self, childs, extra_dependencies=()):  # avoid a mutable default argument
        self.childs = childs
        deps = []
        for (op, dependencies) in unsatisfied_dependencies(childs):
            deps.extend(dependencies)
        deps.extend(extra_dependencies)
        self.dependencies = deps

    def __call__(self, stream):
        for op in self.childs:
            stream = op(stream)
        return stream
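To see the composite behave like a flat chain, here is a self-contained sketch. `Pipeline` and `unsatisfied_dependencies` restate the definitions above, and `Op` is an illustrative dependency-free stand-in for a lifted frame operation:

```python
def unsatisfied_dependencies(operations):
    seen, needed = set(), []
    for op in operations:
        missing = [dep for dep in op.dependencies if dep not in seen]
        if missing:
            needed.append((op, missing))
        seen.add(op)
    return needed

class Pipeline:
    def __init__(self, childs, extra_dependencies=()):
        self.childs = childs
        deps = []
        for (op, dependencies) in unsatisfied_dependencies(childs):
            deps.extend(dependencies)
        deps.extend(extra_dependencies)
        self.dependencies = deps

    def __call__(self, stream):
        for op in self.childs:
            stream = op(stream)
        return stream

class Op:
    # Stand-in for a lifted frame operation, with no dependencies.
    def __init__(self, fn, dependencies=()):
        self.fn = fn
        self.dependencies = list(dependencies)

    def __call__(self, stream):
        return (self.fn(frame) for frame in stream)

x = Op(lambda f: f + 1)
y = Op(lambda f: f * 2)
z = Op(lambda f: f - 3)

flat = Pipeline([x, y, z])
nested = Pipeline([Pipeline([x, y]), z])  # a composite used as a child
```

Because a `Pipeline` is itself callable on a stream and exposes `dependencies`, it can be used anywhere a single operation can, which is the Composite pattern at work.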
More important than the specifics of composing operations is how you'll represent each frame. Presumably, each frame will consist primarily of image data. However, some operations might want to add metadata (e.g. one operation might calculate the colour distribution of a frame). It would therefore make a lot of sense to have each frame contain a dictionary that can be filled with fairly unstructured data. Since we want the operations to be freely composable, it would not be viable to define a separate frame class as the output of each operation, since that way metadata from previous operations would be discarded.
But this permanent metadata has an important problem: it can be invalidated by subsequent operations. E.g. a colour profile would be invalidated by an operation that performs a gamma correction on the image data. One possibility to reconcile this would be to track, for each operation, which operations it invalidates. When checking for unmet dependencies, such invalidated operations would then be removed from the `seen` state. However, this requires you to do extensive bookkeeping for each operation: adding one new operation involves looking through all existing operations to discover potential incompatibilities.
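That bookkeeping could be sketched as follows. The `invalidates` attribute is an assumption layered on top of the code above, and `Op` is again an illustrative stand-in:

```python
class Op:
    # Stand-in operation; `invalidates` lists operations whose results
    # this operation clobbers (an assumed extension, not shown above).
    def __init__(self, name, dependencies=(), invalidates=()):
        self.name = name
        self.dependencies = list(dependencies)
        self.invalidates = set(invalidates)

def unsatisfied_dependencies(operations):
    seen = set()
    needed = []
    for op in operations:
        missing = [dep for dep in op.dependencies if dep not in seen]
        if missing:
            needed.append((op, missing))
        seen.add(op)
        seen -= op.invalidates  # drop results invalidated by this operation
    return needed

colour_profile = Op("ColourProfile")
gamma_correct = Op("GammaCorrect", invalidates=[colour_profile])
needs_profile = Op("NeedsProfile", dependencies=[colour_profile])

# ColourProfile runs first, but GammaCorrect invalidates its result
# before NeedsProfile gets to use it.
missing = unsatisfied_dependencies([colour_profile, gamma_correct, needs_profile])
```

The dependency check then reports `ColourProfile` as unmet for `NeedsProfile` even though it appeared earlier in the list, which is exactly the invalidation behaviour described above.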
Regarding first-class representation of the operations, I think that representing each operation as a callable object with some metadata such as dependencies tacked on to it makes most sense. As shown above, it’s easy to verify dependencies and build composable blocks out of this. It is also fairly easy to turn user input into a pipeline. Assuming you get a list of strings that refer to some operation, you can use a dictionary to map these strings to operations:
operations_map = { ... }

chosen_operations = []
for name in input_list:
    chosen_operations.append(operations_map[name])  # TODO error handling
pipeline = Pipeline(chosen_operations)

if pipeline.dependencies:
    ...  # oops, unmet dependencies
result_stream = pipeline(input_stream)
However, it might be good to have each operation contain a `name` field to ease the creation of meaningful error messages etc.