"""
The :ref:`Workflow<WorkflowClass>` represents a black box for an SLU microservice, and its terminals, :ref:`input <Input>` and :ref:`output <Output>`
represent request response structure of the SLU API.
.. note::
The :ref:`Input<Input>` has some extra properties that aren't part of the SLU API.
We have kept reserves for intermediates, and flattened as most other nested inputs
aren't required and add to bloat.
.. attention::
:ref:`Input<Input>` and :ref:`Output<Output>` instances are immutable.
What does it do?
----------------
Produce the response for a single turn of a conversation. It receives (not an exhaustive list):
- Utterance.
- Timezone
- Language code
- Locale
- Current State of the conversation
- Slot tracker
as :ref:`inputs<Input>`, parses these, produces a few intermediates and finally returns the :ref:`intent <Intent>`
and optionally :ref:`entities <BaseEntity>`. Details will be documented within the specific plugins.
.. mermaid::
classDiagram
direction TB
Workflow --> "1" Input: has
Workflow --> "1" Output: has
Workflow --> "many" Plugin: has
class Workflow {
Workflow: +Input input
Workflow: +Output output
Workflow: +set(path: str, value: Any)
Workflow: +run(input: Input)
}
class Input {
+List~Utterance~ utterance
+int reference_time
+str current_state
+str lang
+str locale
+dict slot_tracker
+str timezone
+dict json()
}
class Output {
+List~Intent~ intents
+List~BaseEntity~ entities
+dict json()
}
<<abstract>> Plugin
class Plugin {
+str dest
*any utility(input: Input, output: Output)
}
How does it do it?
------------------
- A workflow contains a sequence of :ref:`plugins <AbstractPlugin>`. The sequence is important.
- The sequence describes the order in which :ref:`plugins <AbstractPlugin>` are run.
- The plugins can save their output within the workflow's :ref:`input<Input>` or :ref:`output<Output>`.
- After execution of all plugins, the :ref:`workflow <WorkflowClass>` returns a pair of serialized :ref:`input<Input>` and :ref:`output<Output>`.
.. mermaid::
flowchart TB
subgraph Workflow
subgraph input
utterance
reference_time
end
subgraph output
intents
entities
end
input --> plugin
output --> plugin
plugin -->|update| input
plugin -->|update| output
plugin -->|foreach| plugin
end
output --> output_json
input --> input_json
output_json --> workflow_output
input_json --> workflow_output
subgraph Response
output_json
end
"""
from __future__ import annotations
import copy
import time
from threading import Lock
from typing import Any, Dict, List, Optional, Tuple
import attr
import pandas as pd
from dialogy import constants as const
from dialogy.base.input import Input
from dialogy.base.output import Output
from dialogy.base.plugin import Plugin
from dialogy.utils.logger import logger
@attr.s
class Workflow:
    """
    SLU API blackbox.

    .. _WorkflowClass:
    """

    plugins = attr.ib(
        factory=list,
        type=List[Plugin],
        validator=attr.validators.instance_of(list),
    )
    """
    List of :ref:`plugins <AbstractPlugin>`.
    """

    input: Optional[Input] = attr.ib(
        default=None,
        kw_only=True,
        validator=attr.validators.optional(attr.validators.instance_of(Input)),
    )
    """
    Represents the SLU API request object. A few nested properties and intermediates are flattened out
    for readability.
    """

    output: Optional[Output] = attr.ib(
        default=None,
        kw_only=True,
        validator=attr.validators.optional(attr.validators.instance_of(Output)),
    )
    """
    Represents the SLU API response object.
    """

    debug = attr.ib(
        type=bool, default=False, validator=attr.validators.instance_of(bool)
    )
    """
    Switch on/off the debug logs for the workflow.
    """

    lock: Lock
    """
    A :ref:`workflow <WorkflowClass>` isn't thread-safe by itself. We need locks
    to prevent race conditions.
    """

    NON_SERIALIZABLE_FIELDS = [const.PLUGINS, const.DEBUG, const.LOCK]

    def __attrs_post_init__(self) -> None:
        """
        Post init hook.

        Resets workflow IO, creates the lock and propagates the debug flag
        to the plugins.
        """
        self.__reset()
        self.lock = Lock()
        for plugin in self.plugins:
            if isinstance(plugin, Plugin):
                # A plugin emits debug logs only if BOTH the workflow and the
                # plugin itself were created with debug=True.
                plugin.debug = self.debug and plugin.debug

    def __reset(self) -> None:
        """
        Use this method to keep workflow-io in the same format as expected.
        """
        self.input = None
        self.output = Output()

    def set(
        self,
        path: str,
        value: Any,
        replace: bool = False,
        sort_output_attributes: bool = True,
    ) -> Workflow:
        """
        Set attribute path with value.

        This method (re)-sets the input or output object without losing information
        from previous instances.

        :param path: A '.' separated attribute path, e.g. ``"output.intents"``.
        :type path: str
        :param value: A value to set.
        :type value: Any
        :param replace: If True, overwrite the previous output values instead of merging.
        :type replace: bool
        :param sort_output_attributes: If True, merged output values are sorted by score, descending.
        :type sort_output_attributes: bool
        :raises ValueError: If the path is invalid or the value has an unsupported type.
        :return: This instance
        :rtype: Workflow
        """
        dest, attribute = path.split(".")
        if dest == const.INPUT:
            self.input = Input.from_dict({attribute: value}, reference=self.input)
        elif (
            dest == const.OUTPUT
            and attribute in const.OUTPUT_ATTRIBUTES
            and isinstance(value, (list, dict))
        ):
            if not replace and isinstance(value, list):
                # Merge with previously accumulated parses instead of clobbering them.
                previous_value = (
                    self.output.intents  # type: ignore
                    if attribute == const.INTENTS
                    else self.output.entities  # type: ignore
                )
                if sort_output_attributes:
                    value = sorted(
                        previous_value + value,
                        key=lambda parse: parse.score or 0,
                        reverse=True,
                    )
                else:
                    value = previous_value + value
            self.output = Output.from_dict({attribute: value}, reference=self.output)
        elif dest == const.OUTPUT:
            raise ValueError(f"{value=} should be a List[Intent] or List[BaseEntity].")
        else:
            raise ValueError(f"{path} is not a valid path.")
        return self

    def execute(self) -> Workflow:
        """
        Update input, output attributes.

        We iterate through pre/post processing functions and update the input and
        output attributes of the class. It is expected that pre-processing functions
        would modify the input, and post-processing functions would modify the output.

        :raises TypeError: If any configured plugin is not callable.
        :return: This instance
        :rtype: Workflow
        """
        history = {}
        for plugin in self.plugins:
            if not callable(plugin):
                raise TypeError(f"{plugin=} is not a callable")

            # Logs are available only when debug=True during class initialization.
            if self.debug:
                history = {
                    "plugin": plugin,
                    "before": {
                        "input": self.input,
                        "output": self.output,
                    },
                }

            start = time.perf_counter()
            plugin(self)
            end = time.perf_counter()

            # Logs are available only when debug=True during class initialization.
            if self.debug:
                history["after"] = {
                    "input": self.input,
                    "output": self.output,
                }
                history["perf"] = round(end - start, 4)
            if history:
                logger.debug(history)
        return self

    def run(self, input_: Input) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        .. _workflow_run:

        Get final results from the workflow.

        The current workflow exhibits the following simple procedure:
        pre-processing -> inference -> post-processing.

        :param input_: The SLU API request object for a single conversation turn.
        :type input_: Input
        :return: Serialized input and output for this turn.
        :rtype: Tuple[Dict[str, Any], Dict[str, Any]]
        """
        # The lock serializes concurrent turns; the finally clause guarantees the
        # workflow IO is reset even if a plugin raises mid-execution.
        with self.lock:
            self.input = input_
            try:
                return self.execute().flush()
            finally:
                self.__reset()

    def flush(self) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Serialize, then reset :code:`workflow.input` and :code:`workflow.output`.

        :return: Deep copies of the serialized input and output; a pair of empty
            dicts if either is missing.
        :rtype: Tuple[Dict[str, Any], Dict[str, Any]]
        """
        if self.input is None or self.output is None:
            return {}, {}
        # Deep-copy so the caller's view survives the reset below.
        input_ = copy.deepcopy(self.input.json())
        output = copy.deepcopy(self.output.json())
        self.__reset()
        return input_, output

    def json(self) -> Dict[str, Any]:
        """
        Represent the workflow as a python dict.

        Plugins, the debug flag and the lock are excluded as they aren't serializable.

        :rtype: Dict[str, Any]
        """
        return attr.asdict(
            self,
            filter=lambda attribute, _: attribute.name
            not in self.NON_SERIALIZABLE_FIELDS,
        )

    def train(self, training_data: pd.DataFrame) -> Workflow:
        """
        Train all the plugins in the workflow.

        Plugins have a no-op train method by default. The ones that do require
        training should override it. All trainable plugins manage their data
        validation (in case the data isn't suitable for them) and saving/loading
        artifacts.

        :param training_data: Labelled dataset shared by every plugin in the sequence.
        :type training_data: pd.DataFrame
        :return: This instance
        :rtype: Workflow
        """
        for plugin in self.plugins:
            plugin.train(training_data)
            # A plugin may transform the dataset for the plugins downstream.
            transformed_data = plugin.transform(training_data)
            if transformed_data is not None:
                training_data = transformed_data
        return self