olivepy
messaging
module
olivepy.messaging.msgutil
Contains data structures that map message_type enum values to protobuf types and vice versa. Should be updated whenever new message types are added.
AllowableErrorFromServer (Exception)
This exception means that the server could not complete a request, but the reason it could not is not considered an error. This special case most often occurs when requesting analysis of a submission that contains no speech: the analysis could not complete because there was no speech, not because of an error running the plugin. Otherwise, this is identical to Python's plain old Exception
AudioTransferType (Enum)
The method used to send audio to the OLIVE server. There are three options for sending audio to the server:

- `AUDIO_PATH`: Send the path of the audio file to the server. NOTE: if using this option, the path must be accessible to the server
- `AUDIO_DECODED`: Send the audio as a buffer of decoded samples (PCM-16). This option is not well supported by this client, since the client does not decode audio itself; the caller must decode the file and pass the raw samples
- `AUDIO_SERIALIZED`: Send the file as a binary buffer
ExceptionFromServer (Exception)
This exception means that an error occurred on the server side, and this error is being sent "up the chain" to the client side. Otherwise, it is identical to Python's plain old Exception
InputTransferType (Enum)
The method used to send audio/data to the OLIVE server. There are three options for sending data to the server:

- `PATH`: Send the path of the audio file to the server. NOTE: if using this option, the path must be accessible to the server
- `DECODED`: Send the audio as a buffer of decoded samples (PCM-16). This option is not well supported by this client, since the client does not decode audio itself; the caller must decode the file and pass the raw samples
- `SERIALIZED`: Send the file as a binary buffer
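The practical difference between the `PATH` and `SERIALIZED` options can be sketched with a stdlib-only example. `build_payload` is a hypothetical helper (not part of olivepy) that shows what each mode actually ships to the server: a path string versus the raw file bytes.

```python
import os
import tempfile

def build_payload(path: str, mode: str):
    """Hypothetical helper showing what each transfer mode sends:
    PATH ships only the string; SERIALIZED ships the raw file bytes."""
    if mode == "PATH":
        # the server must be able to resolve this path on ITS filesystem
        return path
    if mode == "SERIALIZED":
        # ship the file contents; works when client and server share no filesystem
        with open(path, "rb") as f:
            return f.read()
    raise ValueError("unsupported mode: " + mode)

# demo with a throwaway file standing in for an audio file
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
    f.write(b"RIFF....fake-audio")
    name = f.name

p_payload = build_payload(name, "PATH")        # the path string itself
s_payload = build_payload(name, "SERIALIZED")  # the file's raw bytes
os.remove(name)
```

`DECODED` is omitted here because, as noted above, the client does not decode audio; a caller would have to produce the PCM-16 sample buffer itself.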
OliveInputDataType (Enum)
The type of input data sent to the OLIVE server.
package_audio(audio_msg, audio_data, annotations=None, selected_channel=None, mode=InputTransferType.PATH, num_channels=None, sample_rate=None, num_samples=None, validate_local_path=True, label=None)
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`audio_msg` | `Audio` | the Olive Audio message to populate | required |
`audio_data` | `AnyStr` | either a filename or a binary buffer | required |
`annotations` | | a list of (start, end) tuple regions, in seconds | `None` |
`selected_channel` | | if `audio_data` is multi-channel, the channel to select for processing | `None` |
`mode` | | the submission mode: pathname, serialized, or samples | `InputTransferType.PATH` |
`num_channels` | | the number of channels in the audio | `None` |
`sample_rate` | | the sample rate of the audio | `None` |
`num_samples` | | the number of samples in the audio | `None` |
`validate_local_path` | | if sending audio as a path, throw an exception if the file does not exist. This is left as an option for the case where the client provides a path on the server's filesystem rather than the local filesystem. | `True` |
`label` | | an optional label to attach to the Audio message | `None` |

Returns:

Type | Description |
---|---|
 | a valid Audio message |
Source code in olivepy/messaging/msgutil.py
def package_audio(audio_msg: Audio,
                  audio_data: AnyStr,
                  annotations=None,
                  selected_channel=None,
                  mode=InputTransferType.PATH,
                  num_channels=None,
                  sample_rate=None,
                  num_samples=None,
                  validate_local_path=True,
                  label=None):
    """
    :param audio_msg: the Olive Audio message to populate
    :param audio_data: either a filename or binary buffer
    :param annotations: a list of tuple start/end regions (in seconds)
    :param selected_channel: if audio_data is multi-channel then select this channel for processing
    :param mode: the submission mode: pathname, serialized, samples
    :param num_channels: the number of channels in the audio
    :param sample_rate: the sample rate of the audio
    :param num_samples: the number of samples in the audio
    :param validate_local_path: if sending audio as a path, throw an exception if the file does not exist. We let this be an option for the possible case where the client may want to provide a path on the server's filesystem, but not the local filesystem.
    :return: a valid Audio message
    :raises Exception: if unable to package the audio for the specified mode
    """
    if mode != InputTransferType.PATH and mode != InputTransferType.DECODED and mode != InputTransferType.SERIALIZED:
        raise Exception(
            'Called package_audio with an unknown mode. Must be PATH, DECODED, or SERIALIZED.')

    if mode == InputTransferType.PATH:
        if validate_local_path:
            if not os.path.exists(audio_data):
                raise Exception(
                    "Error creating an OLIVE Audio message, the audio file '{}' does not exist.".format(audio_data))
        audio_msg.path = audio_data
    else:
        audio_buffer = audio_msg.audioSamples
        if isinstance(audio_data, bytes):
            # audio has already been converted to a buffer... no need to change
            audio_buffer.data = audio_data
        else:
            # Assume we have a filename which we will serialize (decoded samples not supported)
            if mode != InputTransferType.SERIALIZED:
                raise Exception("Converting '{}' into a decoded buffer is not supported. Client must "
                                "manually decode the file and pass bytes to package_audio()".format(audio_data))
            buffer = serialize_audio(audio_data)
            audio_buffer.data = buffer

        if mode == InputTransferType.SERIALIZED:
            # olive.proto says these are all ignored for serialized buffers:
            # channels, rate, bit_depth
            audio_buffer.serialized_file = True

        if mode == InputTransferType.DECODED:
            # This mode assumes the client has passed in a numpy array of samples, but we don't assume numpy is
            # installed for all clients, so we don't do numpy checks in this code
            problem = ''
            if num_channels is None or num_channels == 0:
                problem += 'num_channels '
            if sample_rate is None or sample_rate == 0:
                problem += 'sample_rate '
            if num_samples is None or num_samples == 0:
                problem += 'num_samples'
            if problem != '':
                raise Exception('Error: can not create an OLIVE audio message from decoded samples because of missing required argument(s): {}'.format(problem))

            audio_buffer.serialized_file = False
            audio_buffer.channels = num_channels
            audio_buffer.rate = sample_rate
            audio_buffer.samples = num_samples
            audio_buffer.bit_depth = BIT_DEPTH_16
            audio_buffer.encoding = PCM16

    if annotations:
        for a in annotations:
            region = audio_msg.regions.add()
            region.start_t = a[0]
            region.end_t = a[1]

    if selected_channel:
        # we can't do much validation, but if the caller selected a channel and specified the number of channels:
        if num_channels:
            if selected_channel > num_channels:
                raise Exception(
                    "Error: can not select channel '{}' if audio only contains '{}' channel(s)".format(selected_channel, num_channels))
        if selected_channel < 1:
            raise Exception(
                "Error: invalid value for selected channel '{}'. Channel must be 1 or higher".format(selected_channel))
        audio_msg.selected_channel = selected_channel

    if label:
        audio_msg.label = label

    return audio_msg
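The channel-selection checks near the end of `package_audio()` can be illustrated standalone. `validate_channel_selection` is a hypothetical helper (not part of olivepy) that reproduces just those two rules: a selected channel must not exceed the declared channel count, and channels are 1-based.

```python
def validate_channel_selection(selected_channel, num_channels):
    """Standalone sketch of the channel checks package_audio() performs
    before setting selected_channel on the Audio message."""
    if num_channels is not None and selected_channel > num_channels:
        raise ValueError(
            "can not select channel '{}' if audio only contains '{}' channel(s)".format(
                selected_channel, num_channels))
    if selected_channel < 1:
        raise ValueError(
            "invalid selected channel '{}'; channels are 1-based".format(selected_channel))
    return selected_channel

ok = validate_channel_selection(2, 2)      # valid: channel 2 of 2
try:
    validate_channel_selection(3, 2)       # invalid: only 2 channels exist
    failed = False
except ValueError:
    failed = True
```

Note that when `num_channels` is not supplied, `package_audio()` cannot validate the upper bound at all, so an out-of-range channel would only be caught server-side.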
package_binary_media(binary_media_msg, media_data, annotations=None, mode=InputTransferType.PATH, validate_local_path=True, label=None, selected_channel=None)
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`binary_media_msg` | `BinaryMedia` | the Olive BinaryMedia message to populate | required |
`media_data` | `AnyStr` | either a filename or a binary buffer | required |
`annotations` | | a list of (start, end) tuple regions, in seconds | `None` |
`mode` | | the submission mode: pathname or serialized | `InputTransferType.PATH` |
`validate_local_path` | | if sending media as a path, throw an exception if the file does not exist. This is left as an option for the case where the client provides a path on the server's filesystem rather than the local filesystem. | `True` |
`label` | | an optional label to attach to the BinaryMedia message | `None` |
`selected_channel` | | if the media contains multi-channel audio, the channel to select for processing | `None` |

Returns:

Type | Description |
---|---|
 | a valid BinaryMedia message |
Source code in olivepy/messaging/msgutil.py
def package_binary_media(binary_media_msg: BinaryMedia,
                         media_data: AnyStr,
                         annotations=None,
                         mode=InputTransferType.PATH,
                         validate_local_path=True,
                         label=None,
                         selected_channel=None):
    """
    :param binary_media_msg: the Olive BinaryMedia message to populate
    :param media_data: either a filename or binary buffer
    :param annotations: a list of tuple start/end regions (in seconds)
    :param mode: the submission mode: pathname or serialized
    :param validate_local_path: if sending media as a path, throw an exception if the file does not exist. We let this be an option for the possible case where the client may want to provide a path on the server's filesystem, but not the local filesystem.
    :return: a valid BinaryMedia message
    :raises Exception: if unable to package the media for the specified mode
    """
    # todo support selected channel (if audio is to be handled from this data), label, and annotations
    if mode != InputTransferType.PATH and mode != InputTransferType.SERIALIZED:
        raise Exception(
            'Called package_binary_media with an unknown mode. Must be PATH or SERIALIZED.')

    if mode == InputTransferType.PATH:
        if validate_local_path:
            if not os.path.exists(media_data):
                raise Exception(
                    "Error creating an OLIVE media message, the media file '{}' does not exist.".format(media_data))
        binary_media_msg.path = media_data
    else:
        media_buffer = binary_media_msg.buffer
        if isinstance(media_data, bytes):
            # media has already been converted to a buffer... no need to change
            media_buffer.data = media_data
        else:
            # Assume we have a filename which we will serialize (decoded samples not supported)
            if mode != InputTransferType.SERIALIZED:
                raise Exception("Converting '{}' into a decoded buffer is not supported.".format(media_data))
            buffer = serialize_audio(media_data)
            media_buffer.data = buffer

    if label:
        binary_media_msg.label = label
    if selected_channel:
        binary_media_msg.selected_channel = selected_channel

    if annotations:
        classic_region = binary_media_msg.regions.add()
        for a in annotations:
            region = classic_region.regions.add()
            region.start_t = a[0]
            region.end_t = a[1]

    return binary_media_msg
serialize_audio(filename)
Helper function used to read in an audio file and output a serialized buffer. Can be used with package_audio() when using the AUDIO_SERIALIZED mode and the audio input has not already been serialized
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`filename` | `str` | the local path to the file to serialize | required |

Returns:

Type | Description |
---|---|
`AnyStr` | the contents of the file as a byte buffer; an exception is raised if the file cannot be opened. This buffer contains the raw content of the file; it does NOT contain decoded samples |
Source code in olivepy/messaging/msgutil.py
def serialize_audio(filename: str) -> AnyStr:
    """
    Helper function used to read in an audio file and output a serialized buffer. Can be used with package_audio() \
    when using the AUDIO_SERIALIZED mode and the audio input has not already been serialized
    :param filename: the local path to the file to serialize
    :return: the contents of the file as a byte buffer, otherwise an exception if the file can not be opened. \
    This buffer contains the raw content of the file; it does NOT contain decoded samples
    """
    if not os.path.exists(os.path.expanduser(filename)):
        raise Exception(
            "Error serializing an audio file, the file '{}' does not exist.".format(filename))

    with open(os.path.expanduser(filename), 'rb') as f:
        serialized_buffer = f.read()

    # return the buffer
    return serialized_buffer
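The behavior above is plain file I/O, so it can be demonstrated with a stdlib-only sketch. `serialize_file` below is a hypothetical mirror of `serialize_audio` (same expanduser/exists/read-bytes steps) that makes the key point explicit: the buffer is the raw file content, headers and all, not decoded audio samples.

```python
import os
import tempfile

def serialize_file(filename: str) -> bytes:
    """Stdlib-only mirror of serialize_audio(): returns the file's raw
    bytes (container headers included), NOT decoded PCM samples."""
    expanded = os.path.expanduser(filename)
    if not os.path.exists(expanded):
        raise FileNotFoundError("file '{}' does not exist".format(filename))
    with open(expanded, 'rb') as f:
        return f.read()

# demo with a throwaway file standing in for a WAV file
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
    f.write(b"RIFF\x00\x00fake")
    path = f.name

buf = serialize_file(path)   # raw bytes, starts with the RIFF header we wrote
os.remove(path)
```

A buffer produced this way is what `package_audio(..., mode=InputTransferType.SERIALIZED)` expects when it is handed bytes rather than a filename.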
olivepy.messaging.olive_pb2
olivepy.messaging.response
OliveClassStatusResponse (OliveServerResponse)
The container/wrapper for WorkflowClassStatusResult from an OLIVE server (when using the AsyncOliveClient). This is intended to make it easier for clients to handle the traditional (original) protobuf message results (such as RegionScorerResult) returned from the server.
get_workflow_type(self)
Return the type of workflow done in this response (analysis, enrollment, adaptation)
Returns:

Type | Description |
---|---|
 | A WorkflowType: WORKFLOW_ANALYSIS_TYPE, WORKFLOW_ENROLLMENT_TYPE, or WORKFLOW_ADAPT_TYPE; raises an Exception if a non-workflow response message was wrapped |
Source code in olivepy/messaging/response.py
def get_workflow_type(self):
    return WORKFLOW_ANALYSIS_TYPE
parse_from_response(self, request, response, message)
Create this response from the original request, the server's response message, and an optional error message.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
`request` | | the original request message | required |
`response` | | the response message returned by the server | required |
`message` | | an error message, if the request failed | required |
Source code in olivepy/messaging/response.py
def parse_from_response(self, request, response, message):
    OliveServerResponse.parse_from_response(self, request, response, message)
    if self.is_error():
        return

    status_result = []
    for jc in self._response.job_class:
        job_name = jc.job_name
        job_dict = dict()
        self._job_names.add(job_name)
        job_dict[KEY_JOB_NAME] = job_name
        # a dictionary of task results, keyed by task name:
        job_dict[KEY_TASkS] = {}
        status_result.append(job_dict)

        for task_class in jc.task:
            task_class_dict = json.loads(MessageToJson(task_class, preserving_proto_field_name=True))
            del task_class_dict['task_name']
            if task_class.task_name not in job_dict[KEY_TASkS]:
                job_dict[KEY_TASkS][task_class.task_name] = []
            job_dict[KEY_TASkS][task_class.task_name].append(task_class_dict)

    self._json_result = status_result
to_json(self, indent=None)
Generate the response as a JSON string
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`indent` | | if a non-negative integer, JSON array elements and object members are pretty-printed with that indent level; an indent level of 0 only inserts newlines; `None` (the default) is the most compact representation; a negative value returns the parsed JSON object instead of a string | `None` |

Returns:

Type | Description |
---|---|
 | the Workflow response as a JSON string |
Source code in olivepy/messaging/response.py
def to_json(self, indent=None):
    """
    Generate the response as a JSON string
    :param indent: if a non-negative integer, then JSON array elements and object members will be pretty-printed with \
    that indent level. An indent level of 0 will only insert newlines. ``None`` is the most compact \
    representation. A negative value will return the parsed JSON object instead of a string.
    :return: the Workflow response as a JSON string
    """
    # consider setting preserving_proto_field_name to true
    if self.is_error():
        return self.get_error()
    if indent and indent < 0:
        return json.loads(MessageToJson(self._response, preserving_proto_field_name=True))
    return json.dumps(json.loads(MessageToJson(self._response, preserving_proto_field_name=True)), indent=indent, ensure_ascii=False)
OliveServerResponse
The default container/wrapper for responses from an OLIVE server (when using the AsyncOliveClient). This is intended to make it easier for clients to handle the traditional (original) protobuf message results (such as RegionScorerResult) returned from the server.
get_response(self)
The Protobuf message returned from the OLIVE server
Returns:

Type | Description |
---|---|
 | the Protobuf message returned by the OLIVE server |
Source code in olivepy/messaging/response.py
def get_response(self):
    """
    The Protobuf message returned from the OLIVE server
    :return:
    """
    # todo exception if none?
    return self._response
get_workflow_type(self)
Return the type of workflow done in this response (analysis, enrollment, adaptation)
Returns:

Type | Description |
---|---|
 | A WorkflowType: WORKFLOW_ANALYSIS_TYPE, WORKFLOW_ENROLLMENT_TYPE, or WORKFLOW_ADAPT_TYPE; raises an Exception if a non-workflow response message was wrapped |
Source code in olivepy/messaging/response.py
def get_workflow_type(self):
    """
    Return the type of workflow done in this response (analysis, enrollment, adaptation)
    :return: a WorkflowType: WORKFLOW_ANALYSIS_TYPE, WORKFLOW_ENROLLMENT_TYPE, or WORKFLOW_ADAPT_TYPE; raises an Exception if a non-workflow response message was wrapped
    """
    if not self._response:
        raise Exception("No valid response")

    if isinstance(self._request, WorkflowAnalysisRequest):
        return WORKFLOW_ANALYSIS_TYPE
    elif isinstance(self._request, WorkflowEnrollRequest):
        return WORKFLOW_ENROLLMENT_TYPE
    elif isinstance(self._request, WorkflowAdaptRequest):
        return WORKFLOW_ADAPT_TYPE
    elif isinstance(self._request, WorkflowUnenrollRequest):
        return WORKFLOW_UNENROLLMENT_TYPE

    raise Exception("Unknown Workflow Message: {}".format(type(self._request)))
parse_from_response(self, request, response, message)
Create this response from the original request, the server's response message, and an optional error message.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
`request` | | the original request message | required |
`response` | | the response message returned by the server | required |
`message` | | an error message, if the request failed | required |
Source code in olivepy/messaging/response.py
def parse_from_response(self, request, response, message):
    """
    Create this response from the original request, the server response, and an optional error message
    :param request:
    :param response:
    :param message:
    :return:
    """
    self._request = request
    if message:
        # No results due to error
        self._iserror = True
        self._message = message

    if response is not None:
        try:
            if response.HasField("error"):
                self._iserror = True
                self._message = response.error
            else:
                # we assume no errors
                self._issuccessful = True
        except:
            # Some messages have no error field
            self._issuccessful = True

    self._response = response
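The error/success bookkeeping above can be exercised without a server. `FakeResponse` and `parse_state` below are hypothetical stand-ins (not olivepy classes): the fake mimics protobuf's `HasField`, which raises for fields a message does not define, and the sketch reproduces the three outcomes of the real method.

```python
class FakeResponse:
    """Hypothetical protobuf stand-in: HasField raises for fields the
    message does not define, as real protobuf messages do."""
    def __init__(self, error=None, has_error_field=True):
        self._error = error
        self._has_error_field = has_error_field

    def HasField(self, name):
        if name == "error" and self._has_error_field:
            return self._error is not None
        raise ValueError("unknown field: " + name)

    @property
    def error(self):
        return self._error

def parse_state(response):
    """Sketch of the success/error logic in parse_from_response()."""
    iserror, issuccessful, msg = False, False, None
    try:
        if response.HasField("error"):
            iserror, msg = True, response.error
        else:
            issuccessful = True
    except Exception:
        # some messages have no error field at all
        issuccessful = True
    return iserror, issuccessful, msg

r1 = parse_state(FakeResponse(error="boom"))            # error set -> (True, False, 'boom')
r2 = parse_state(FakeResponse())                        # error field empty -> success
r3 = parse_state(FakeResponse(has_error_field=False))   # no error field -> success
```

The bare `except:` in the real code is what makes the third case work; a message type without an `error` field is treated as successful rather than failing to parse.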
to_json(self, indent=None)
Generate the response as a JSON string
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`indent` | | if a non-negative integer, JSON array elements and object members are pretty-printed with that indent level; an indent level of 0 only inserts newlines; `None` (the default) is the most compact representation; a negative value returns the parsed JSON object instead of a string | `None` |

Returns:

Type | Description |
---|---|
 | the response as a JSON string |
Source code in olivepy/messaging/response.py
def to_json(self, indent=None):
    """
    Generate the response as a JSON string
    :param indent: if a non-negative integer, then JSON array elements and object members will be pretty-printed with \
    that indent level. An indent level of 0 will only insert newlines. ``None`` is the most compact \
    representation. A negative value will return the parsed JSON object instead of a string.
    :return: the response as a JSON string
    """
    # consider setting preserving_proto_field_name to true
    if indent and indent < 0:
        return json.loads(MessageToJson(self._response, preserving_proto_field_name=True))
    return json.dumps(json.loads(MessageToJson(self._response, preserving_proto_field_name=True)), indent=indent, ensure_ascii=False)
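The `indent` contract can be shown with a stdlib-only sketch. The `to_json` helper below is a hypothetical stand-in for the method (operating on an already-parsed dict rather than a protobuf message), reproducing the three documented behaviors: `None` gives the most compact string, a non-negative integer pretty-prints, and a negative value returns the parsed object itself.

```python
import json

def to_json(parsed_response, indent=None):
    """Stdlib sketch of the indent contract: None -> compact string,
    >= 0 -> pretty-printed string, negative -> the parsed object."""
    if indent and indent < 0:
        return parsed_response
    return json.dumps(parsed_response, indent=indent, ensure_ascii=False)

doc = {"workflow": "analysis", "scores": [0.9, 0.1]}

compact = to_json(doc)           # single-line, most compact form
pretty = to_json(doc, indent=2)  # multi-line, 2-space indentation
obj = to_json(doc, indent=-1)    # the dict itself, not a string
```

Note the `if indent and indent < 0` guard: because `0` is falsy in Python, `indent=0` falls through to `json.dumps`, which is exactly the "newlines only" behavior the docstring describes.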
OliveWorkflowActualizedResponse (OliveServerResponse)
Extracts info from an actualized workflow definition
get_analysis_jobs(self)
Return the names of the analysis jobs. A workflow typically has just one job with multiple tasks; the most likely reason to have multiple jobs is a workflow that uses multi-channel audio, where there may be a set of job tasks for each channel of audio submitted.

Returns:

Type | Description |
---|---|
 | a list of job names in the analysis |
Source code in olivepy/messaging/response.py
def get_analysis_jobs(self):
    """
    Return the names of analysis jobs. Typically a workflow has just one job with multiple tasks, the most likely
    reason to have multiple jobs is for workflows using multi-channel audio so there may be a set of job tasks for
    each channel of audio submitted.
    :return: a list of job names in the analysis
    """
    return [job_dict[KEY_JOB_NAME] for job_dict in self._json_result]
    # todo get analysis job name(s)
get_request_jobs(self, workflow_type)
Return the jobs in the original request for the specified analysis type.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
`workflow_type` | | the type of workflow (e.g. WORKFLOW_ANALYSIS_TYPE) | required |

Returns:

Type | Description |
---|---|
 | the list of jobs for this type |
Source code in olivepy/messaging/response.py
def get_request_jobs(self, workflow_type):
    """
    return the jobs in the original request for the specified analysis type
    :param workflow_type: the type of workflow (i.e. WORKFLOW_ANALYSIS_TYPE)
    :return: the list of jobs for this type
    """
    if self._request is not None:
        return get_workflow_jobs(self._request.workflow_definition, workflow_type)

    raise Exception("No jobs for the requested workflow type: {}".format(workflow_type))
is_allowable_error(self)
Returns:

Type | Description |
---|---|
 | True if this response failed with an allowable error |
Source code in olivepy/messaging/response.py
def is_allowable_error(self):
    """
    :return: True if this response failed with an allowable error
    """
    return self._isallowable_error
parse_from_response(self, request, response, message)
Create this response from the original request, the server's response message, and an optional error message.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
`request` | | the original request message | required |
`response` | | the response message returned by the server | required |
`message` | | an error message, if the request failed | required |
Source code in olivepy/messaging/response.py
def parse_from_response(self, request, response, message):
    OliveServerResponse.parse_from_response(self, request, response, message)
    # Now create a JSON representation from the response;
    # we will walk the tree and deserialize any encoded messages
    if self.is_error():
        # todo provide error info in JSON?
        return

    if not isinstance(self._request, WorkflowActualizeRequest):
        # we received some other workflow message, so we don't need to convert it to json...
        return

    # we only parse the analysis part for now
    analysis_task = []
    workflow_analysis_order_msg = None
    for order in self._response.workflow.order:
        if order.workflow_type == WORKFLOW_ANALYSIS_TYPE:
            workflow_analysis_order_msg = order
            break
    if workflow_analysis_order_msg is None:
        # no analysis results
        return

    for job in workflow_analysis_order_msg.job_definition:
        # create a dictionary for each job result
        job_dict = dict()
        job_name = job.job_name
        self._job_names.add(job_name)
        # add to our results - in most cases we will have just one job
        analysis_task.append(job_dict)
        # get data handling info for this job
        data_prop = job.data_properties
        job_dict['Data Input'] = json.loads(MessageToJson(data_prop, preserving_proto_field_name=True))

        for task in job.tasks:
            task_result_dict = json.loads(MessageToJson(task, preserving_proto_field_name=True))
            # Deserialize message_data, and replace it in the task_result_dict
            task_type_msg = self._extract_serialized_message(task.message_type, task.message_data)
            task_result_dict[KEY_JOB_NAME] = job_name
            task_result_dict['analysis'] = json.loads(MessageToJson(task_type_msg, preserving_proto_field_name=True))
            del task_result_dict['message_data']
            job_dict[task.consumer_result_label] = task_result_dict

    self._json_result = analysis_task
to_json(self, indent=None)
Generate the response as a JSON string
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`indent` | | if a non-negative integer, JSON array elements and object members are pretty-printed with that indent level; an indent level of 0 only inserts newlines; `None` (the default) is the most compact representation; a negative value returns the parsed JSON object instead of a string | `None` |

Returns:

Type | Description |
---|---|
 | the Workflow response as a JSON string |
Source code in olivepy/messaging/response.py
def to_json(self, indent=None):
    """
    Generate the response as a JSON string
    :param indent: if a non-negative integer, then JSON array elements and object members will be pretty-printed with \
    that indent level. An indent level of 0 will only insert newlines. ``None`` is the most compact \
    representation. A negative value will return the parsed JSON object instead of a string.
    :return: the Workflow response as a JSON string
    """
    if indent and indent < 0:
        # _json_result is already a parsed (dict/list) representation
        return self._json_result
    return json.dumps(self._json_result, indent=indent, ensure_ascii=False)
OliveWorkflowAnalysisResponse (OliveServerResponse)
The default container/wrapper for responses from an OLIVE server (when using the AsyncOliveClient). This is intended to make it easier for clients to handle the traditional (original) protobuf message results (such as RegionScorerResult) returned from the server.
get_analysis_jobs(self)
Return the names of the analysis jobs. A workflow typically has just one job with multiple tasks; the most likely reason to have multiple jobs is a workflow that uses multi-channel audio, where there may be a set of job tasks for each channel of audio submitted.

Returns:

Type | Description |
---|---|
 | a list of job names in the analysis |
Source code in olivepy/messaging/response.py
def get_analysis_jobs(self):
    """
    Return the names of analysis jobs. Typically a workflow has just one job with multiple tasks, the most likely
    reason to have multiple jobs is for workflows using multi-channel audio so there may be a set of job tasks for
    each channel of audio submitted.
    :return: a list of job names in the analysis
    """
    return [job_dict[KEY_JOB_NAME] for job_dict in self._json_result]
    # todo get analysis job name(s)
get_analysis_task_result(self, job_name, task_name)
Get the result(s) for the specified job_name and task_name, including the data used for this task. If the workflow analyzes each channel of multi-channel data, there can be multiple jobs with the same name.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
`job_name` | | may be None for convenience, since there is normally only one job; if the workflow has multiple jobs, a valid name must be specified | required |
`task_name` | | the name of the task | required |

Returns:

Type | Description |
---|---|
 | a list of dictionaries, where each dictionary in the list includes the results for the specified task and a list of the data analyzed by that task, such as [ {task_name:{}, data:[] } ] |
Source code in olivepy/messaging/response.py
def get_analysis_task_result(self, job_name, task_name):
    """
    Get the result(s) for the specified job_name and task_name, also include the data used for this task. If
    the workflow analyzes each channel in multi-channel data then there can be multiple jobs with the
    same name.
    :param job_name: for convenience can be None, since there is normally only one job. But if the workflow has multiple jobs then a valid name must be specified.
    :param task_name: the name of the task
    :return: a list of dictionaries, where each dictionary in the list includes the results for the specified task and a list of the data analyzed by this task, such as [ {task_name:{}, data:[] }]
    """
    if job_name is None:
        job_name = self._get_default_job_name()

    results = []
    for job_dict in self._json_result:
        if job_name == job_dict[KEY_JOB_NAME]:
            task_dict = dict()
            # there may be one or more results for task_name
            task_dict[task_name] = job_dict[KEY_TASkS][task_name]
            task_dict[KEY_DATA] = job_dict[KEY_DATA]
            results.append(task_dict)
    return results
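The lookup above can be demonstrated against a hand-built result structure. The `json_result` list below is hypothetical data shaped like what the parsing code builds (one dict per job, with job-name, data, and tasks entries; literal string keys stand in for the `KEY_JOB_NAME`/`KEY_DATA`/`KEY_TASkS` constants), and `get_analysis_task_result` is a standalone sketch of the method.

```python
# Hypothetical parsed results: two entries share a job name because the
# workflow processed two channels of the same multi-channel file.
json_result = [
    {"job_name": "job1", "data": [{"id": "test.wav", "channel": 1}],
     "tasks": {"SAD": [{"analysis": {"region": []}}]}},
    {"job_name": "job1", "data": [{"id": "test.wav", "channel": 2}],
     "tasks": {"SAD": [{"analysis": {"region": []}}]}},
]

def get_analysis_task_result(json_result, job_name, task_name):
    """Sketch of the method above: collect the named task (plus the data
    it analyzed) from every job entry matching job_name."""
    results = []
    for job_dict in json_result:
        if job_name == job_dict["job_name"]:
            results.append({task_name: job_dict["tasks"][task_name],
                            "data": job_dict["data"]})
    return results

matches = get_analysis_task_result(json_result, "job1", "SAD")
```

This is why the method returns a list rather than a single dict: a multi-channel workflow yields one entry per channel even though all entries carry the same job name.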
get_request_jobs(self, workflow_type)
Return the jobs in the original request for the specified analysis type.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
`workflow_type` | | the type of workflow (e.g. WORKFLOW_ANALYSIS_TYPE) | required |

Returns:

Type | Description |
---|---|
 | the list of jobs for this type |
Source code in olivepy/messaging/response.py
def get_request_jobs(self, workflow_type):
    """
    return the jobs in the original request for the specified analysis type
    :param workflow_type: the type of workflow (i.e. WORKFLOW_ANALYSIS_TYPE)
    :return: the list of jobs for this type
    """
    if self._request is not None:
        return get_workflow_jobs(self._request.workflow_definition, workflow_type)

    raise Exception("No jobs for the requested workflow type: {}".format(workflow_type))
is_allowable_error(self)
Returns:

Type | Description |
---|---|
 | True if this response failed with an allowable error |
Source code in olivepy/messaging/response.py
def is_allowable_error(self):
    """
    :return: True if this response failed with an allowable error
    """
    return self._isallowable_error
parse_from_response(self, request, response, message)
Create this response from the original request, the server's response message, and an optional error message.

Parameters:

Name | Type | Description | Default |
---|---|---|---|
`request` | | the original request message | required |
`response` | | the response message returned by the server | required |
`message` | | an error message, if the request failed | required |
Source code in olivepy/messaging/response.py
def parse_from_response(self, request, response, message):
OliveServerResponse.parse_from_response(self, request, response, message)
# Now create a JSON representation from the response
# we will walk the tree and deserialize any encoded messages
if self.is_error():
self._json_result={}
self._json_result['error'] = self.get_error()
return
# make a new message for this type...
if isinstance(self._request, WorkflowActualizeRequest):
# we received an actualized workflow, so se don't need to convert it to json...
return
# this should only contain an analysis request... but check just in case:
wk_type = self.get_workflow_type()
if wk_type == WORKFLOW_ANALYSIS_TYPE or wk_type == WORKFLOW_ENROLLMENT_TYPE or wk_type == WORKFLOW_UNENROLLMENT_TYPE:
# analysis is a list of dictionary elements, which looks like: [ {job_name: X, data: [], tasks: {}} ]
# there is a dictionary for each job, but due to the way jobs work in OLIVE for mulit-channel data we
# consider a jobs to be unique by a combination of job_name plus data, so multiples dictionary elements may
# have the same job_name, but will have different data properties (channel numbers)
# get the analysis job order from the original request:
analysis_result = [] # or enrollment/unenrollment result
# analysis_result['jobs'] = []
# analysis_result['data inputs'] = []
job_requests = get_workflow_jobs(self._request.workflow_definition, wk_type)
for job in self._response.job_result:
# create a dictionary for each job result
job_dict = dict()
job_name = job.job_name
self._job_names.add(job_name)
# job_dict[job_name] = dict()
job_dict[KEY_JOB_NAME] = job_name
if job.error:
job_dict['error'] = job.error
# we have a list of data items:
job_dict[KEY_DATA] = []
# and a dictionary of tasks: (although note it is possible to have multiple tasks with the same name, so a task has a list of results)
job_dict[KEY_TASkS] = {}
# add to our results - in most cases we will have just one job
analysis_result.append(job_dict)
# get the tasks for the current (and likely, only) job:
task_requests = get_workflow_job_tasks(job_requests, job_name)
# task_result_dict = dict()
# I don't think this can happen yet:
# if job.HasField('error'):
# Allowable job error
#
for task in job.task_results:
task_result_dict = json.loads(MessageToJson(task, preserving_proto_field_name=True))
# check if this task failed with an error
if task.HasField('error'):
# Allowable error
if KEY_ERROR in job_dict:
job_dict[KEY_ERROR] = job_dict[KEY_ERROR] + "," + task.error
else:
job_dict[KEY_ERROR] = task.error
self._isallowable_error = True
# should have an empty message data;
del task_result_dict['message_data']
if job_name not in self._allowable_failed_job_tasks:
self._allowable_failed_job_tasks[job_name] = []
self._allowable_failed_job_tasks[job_name].append(task.task_name)
else:
# Deserialize message_data, and replace it in the task_result_dict
if task.message_type in msgutil.debug_message_map:
# Get the pimiento message (debug only - these messages are not guaranteed to be supported
print("CLG special msg type: {}".format(msgutil.MessageType.Name(task.message_type)))
pimiento_msg = self._extract_serialized_message(task.message_type, task.message_data)
if task.message_type == DATA_OUTPUT_TRANSFORMER_RESULT:
if pimiento_msg.data_type == TEXT:
# only supported type for now...
pie_data_msg = WorkflowTextResult()
pie_data_msg.ParseFromString(pimiento_msg.message_data)
task_result_dict['analysis'] = json.loads(
MessageToJson(pie_data_msg, preserving_proto_field_name=True))
else:
print("Unsupported debug message type: {}".format(msgutil.InputDataType.Name(pimiento_msg.data_type)))
elif task.message_type == SCORE_OUTPUT_TRANSFORMER_RESULT:
# these should be standard trait message
pie_score_msg = self._extract_serialized_message(pimiento_msg.message_type, pimiento_msg.message_data)
task_result_dict['analysis'] = json.loads(
MessageToJson(pie_score_msg, preserving_proto_field_name=True))
else:
task_type_msg = self._extract_serialized_message(task.message_type, task.message_data)
task_result_dict['analysis'] = json.loads(
MessageToJson(task_type_msg, preserving_proto_field_name=True))
# Should we create special handlers for analysis results? We sort global score results,
# but what should we do with an AUDIO_MODIFICATION_RESULT?
if task.message_type == GLOBAL_SCORER_RESULT:
# Sort region scores
task_result_dict['analysis']['score'] = sorted(task_result_dict['analysis']['score'], key=sort_global_scores, reverse=True)
# messageData has been replaced with the actual task
del task_result_dict['message_data']
# taskName is the key, so remove it:
del task_result_dict['task_name']
# check if we need to add the plugin/domain name
if task.task_name in task_requests:
orig_task = task_requests[task.task_name]
if orig_task.message_type in msgutil.plugin_message_map:
# Get the original task
task_req_msg = self._extract_serialized_message(orig_task.message_type, orig_task.message_data)
task_result_dict['plugin'] = task_req_msg.plugin
task_result_dict['domain'] = task_req_msg.domain
if orig_task.message_type == CLASS_MODIFICATION_REQUEST or orig_task.message_type == CLASS_REMOVAL_REQUEST:
# add class ID
task_result_dict['class_id'] = self._request.class_id
# print("adding {} as {}".format(task_result_dict, task.task_name))
# fixme: there can be multiple tasks with the same name if conditions in a workflow caused the task to be run twice
if task.task_name not in job_dict[KEY_TASkS]:
job_dict[KEY_TASkS][task.task_name] = []
job_dict[KEY_TASkS][task.task_name].append(task_result_dict)
#job_dict[KEY_TASkS].append({task.task_name: task_result_dict})
# A job usually has one data input/output, but we handle as if there are multiple
for data_result in job.data_results:
data_result_dict = json.loads(MessageToJson(data_result, preserving_proto_field_name=True))
# Deserialize the data portion
data_type_msg = self._extract_serialized_message(data_result.msg_type, data_result.result_data)
del data_result_dict['result_data']
# del data_result_dict['dataId'] # redundant info, used as the key
# data_result_dict['data'] = json.loads(MessageToJson(data_type_msg))
data_result_dict.update(json.loads(MessageToJson(data_type_msg, preserving_proto_field_name=True)))
job_dict[KEY_DATA].append(data_result_dict)
# analysis_result['data inputs'].append(data_result_dict)
self._json_result = analysis_result
to_json(self, indent=None)
Generate the workflow as a JSON string
:indent: if a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None
is the most compact representation. A negative value will return the JSON document as a Python object
Returns:
Type | Description |
---|---|
the Workflow Definition as a JSON string |
Source code in olivepy/messaging/response.py
def to_json(self, indent=None):
"""
Generate the workflow as a JSON string
:indent: if a non-negative integer, then JSON array elements and object members will be pretty-printed with \
that indent level. An indent level of 0 will only insert newlines. ``None`` is the most compact \
representation. A negative value will return the JSON document as a Python object
:return: the Workflow Definition as a JSON string
"""
# return json.dumps(self._json_result, indent=indent, ensure_ascii=False)
if indent and indent < 0:
# a negative indent returns the document itself (a Python object), not a string
return self._json_result
# return json.dumps(json.loads(MessageToJson(self._json_result, preserving_proto_field_name=True)), indent=indent, ensure_ascii=False)
return json.dumps(self._json_result, indent=indent, ensure_ascii=False)
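The `indent` parameter follows the semantics of the standard library's `json.dumps`, which `to_json` ultimately delegates to for non-negative values. A standalone sketch of the three cases (the sample dict below is illustrative only, not an actual workflow result):

```python
import json

# a stand-in for the kind of dict/list structure held in _json_result
doc = {"job_name": "job1", "tasks": {"SAD": []}}

compact = json.dumps(doc, indent=None, ensure_ascii=False)     # one line, most compact
newlines_only = json.dumps(doc, indent=0, ensure_ascii=False)  # newlines, but no indentation
pretty = json.dumps(doc, indent=2, ensure_ascii=False)         # two-space pretty-printing

print(compact)
print(pretty)
```

A negative `indent` is the one case that does not reach `json.dumps`: the response object is handed back unserialized.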
OliveWorkflowEnrollmentResponse (OliveServerResponse)
The container/wrapper for responses from an OLIVE server (when using the AsyncOliveClient) for enrollment. This is intended to make it easier for clients to handle the traditional (original) protobuf message results (such as RegionScorerResult) returned from the server.
is_allowable_error(self)
Returns:
Type | Description |
---|---|
true if this message failed with an allowable error |
Source code in olivepy/messaging/response.py
def is_allowable_error(self):
"""
:return: true if this message failed with an allowable error
"""
return self._isallowable_error
parse_from_response(self, request, response, message)
Create this response from the server's response message
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request |
required | ||
response |
required | ||
message |
required |
Returns:
Type | Description |
---|---|
Source code in olivepy/messaging/response.py
def parse_from_response(self, request, response, message):
OliveServerResponse.parse_from_response(self, request, response, message)
# Now create a JSON representation from the response
# we will walk the tree and deserialize any encoded messages
if self.is_error():
# todo provide error info in JSON
return
# make a new message for this type...
if isinstance(self._request, WorkflowActualizeRequest):
# we received an actualized workflow, so we don't need to convert it to json...
return
# this should only contain an analysis request... but check just in case:
type = self.get_workflow_type()
if type == WORKFLOW_ENROLLMENT_TYPE:
# not much info in an enrollment response...
enroll_result = dict()
if self._response.HasField('error'):
enroll_result['error'] = self._response.error
else:
enroll_result['successful'] = True
self._json_result = enroll_result
to_json(self, indent=None)
Generate the response as a JSON string
Parameters:
Name | Type | Description | Default |
---|---|---|---|
indent |
if a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. |
None |
Returns:
Type | Description |
---|---|
the Workflow response as a JSON string |
Source code in olivepy/messaging/response.py
def to_json(self, indent=None):
"""
Generate the response as a JSON string
:param indent: if a non-negative integer, then JSON array elements and object members will be pretty-printed with \
that indent level. An indent level of 0 will only insert newlines. ``None`` is the most compact \
representation. A negative value will return the JSON document as a Python object
:return: the Workflow response as a JSON string
"""
#consider setting preserving_proto_field_name to true
# return json.dumps(json.loads(MessageToJson(self._json_result, preserving_proto_field_name=True)), indent=indent)
if indent and indent < 0:
# a negative indent returns the document itself (a Python object), not a string
return self._json_result
return json.dumps(self._json_result, indent=indent, ensure_ascii=False)
get_workflow_job_names(workflow_definition, workflow_type)
parse a workflow definition, returning a list of job definition names (job_name)
return [job_name] for the requested workflow_type
Source code in olivepy/messaging/response.py
def get_workflow_job_names(workflow_definition, workflow_type):
""" parse a workflow definition, returning a list of job definition name (job_name)
return [job_name] for the requested workflow_type
"""
rtn_job_names = list()
if workflow_definition is not None:
for order in workflow_definition.order:
if order.workflow_type == workflow_type:
for job in order.job_definition:
rtn_job_names.append(job.job_name)
return rtn_job_names
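Because the function only reads `order`, `workflow_type`, `job_definition`, and `job_name` attributes, it can be exercised without a real protobuf `WorkflowDefinition` by using duck-typed stand-ins. The function body below is copied from the source above; the `WORKFLOW_ANALYSIS_TYPE` value and job names are hypothetical:

```python
from types import SimpleNamespace

def get_workflow_job_names(workflow_definition, workflow_type):
    rtn_job_names = list()
    if workflow_definition is not None:
        for order in workflow_definition.order:
            if order.workflow_type == workflow_type:
                for job in order.job_definition:
                    rtn_job_names.append(job.job_name)
    return rtn_job_names

WORKFLOW_ANALYSIS_TYPE = 1  # hypothetical stand-in for the real enum value
wd = SimpleNamespace(order=[
    SimpleNamespace(workflow_type=WORKFLOW_ANALYSIS_TYPE,
                    job_definition=[SimpleNamespace(job_name="job_sad"),
                                    SimpleNamespace(job_name="job_lid")]),
])

print(get_workflow_job_names(wd, WORKFLOW_ANALYSIS_TYPE))  # -> ['job_sad', 'job_lid']
print(get_workflow_job_names(None, WORKFLOW_ANALYSIS_TYPE))  # -> []
```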
get_workflow_job_tasks(jobs, job_name=None)
Fetch the tasks from a job
Parameters:
Name | Type | Description | Default |
---|---|---|---|
jobs |
a dictionary of WorkflowTasks, indexed by job name |
required | |
job_name |
find tasks that belong to a job having this name. This can be None if there is only one job |
None |
Returns:
Type | Description |
---|---|
a dictionary of WorkflowTask indexed by the task's consumer_result_label for the specified job. An exception is thrown if there are multiple jobs but no job_name was specified |
Source code in olivepy/messaging/response.py
def get_workflow_job_tasks(jobs, job_name=None):
"""
Fetch the tasks from a job
:param jobs: a dictionary of WorkflowTasks, indexed by job name
:param job_name: find tasks that belong to a job having this name. This can be None if there is only one job
:return: a dictionary of WorkflowTask indexed by the task's consumer_result_label for the specified job. An exception is thrown if there are multiple jobs but no job_name was specified
"""
if job_name is None:
if len(jobs) == 1:
# hack to make it easier to fetch data since most workflows only have one job
job_name = list(jobs.keys())[0]
else:
raise Exception("Must specify a job name when there are multiple JobDefinitions in a Workflow")
rtn_tasks = dict()
# and job_name is None:
if job_name in jobs:
for workflow_task in jobs[job_name]:
rtn_tasks[workflow_task.consumer_result_label] = workflow_task
# CLG: I don't think we need to generate a message/warning if a task name isn't in the original jobs since we can
# have dynamic jobs and with streaming jobs are very dynamic
# else:
# print("Job '{}' not one of the expected job names: {}".format(job_name, list(jobs.keys())))
return rtn_tasks
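The single-job shortcut and the multi-job exception can be demonstrated with lightweight stand-ins (the function body is copied from the source above; the task labels and `SimpleNamespace` objects are illustrative substitutes for real `WorkflowTask` messages):

```python
from types import SimpleNamespace

def get_workflow_job_tasks(jobs, job_name=None):
    if job_name is None:
        if len(jobs) == 1:
            # hack to make it easier to fetch data since most workflows only have one job
            job_name = list(jobs.keys())[0]
        else:
            raise Exception("Must specify a job name when there are multiple JobDefinitions in a Workflow")
    rtn_tasks = dict()
    if job_name in jobs:
        for workflow_task in jobs[job_name]:
            rtn_tasks[workflow_task.consumer_result_label] = workflow_task
    return rtn_tasks

sad = SimpleNamespace(consumer_result_label="SAD")
lid = SimpleNamespace(consumer_result_label="LID")
jobs = {"job1": [sad, lid]}

tasks = get_workflow_job_tasks(jobs)  # one job, so no job_name needed
print(sorted(tasks))  # -> ['LID', 'SAD']

try:
    get_workflow_job_tasks({"job_a": [], "job_b": []})  # ambiguous: two jobs, no name
except Exception as err:
    print(err)
```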
get_workflow_jobs(workflow_definition, workflow_type)
parse a workflow definition, returning a dictionary indexed by job (definition) name (job_name), each mapping to a list of WorkflowTask elements.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow_definition |
find jobs in this workflow definition |
required | |
workflow_type |
the type of workflow order (analysis, enrollment, unenrollment) return {job_name: [WorkflowTask]} for the requested workflow_type |
required |
Source code in olivepy/messaging/response.py
def get_workflow_jobs(workflow_definition, workflow_type):
""" parse a workflow definition, returning a dictionary indexed job (definition) name (job_name) and a list of \
WorkflowTask elements.
:param workflow_definition: find jobs in this workflow definition
:param workflow_type: the type of workflow order (analysis, enrollment, unenrollment)
return {job_name: [WorkflowTask]} for the requested workflow_type
"""
rtn_jobs = dict()
if workflow_definition is not None:
for order in workflow_definition.order:
if order.workflow_type == workflow_type:
for job in order.job_definition:
rtn_jobs[job.job_name] = job.tasks
return rtn_jobs
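Like `get_workflow_job_names`, this walks only `order`, `workflow_type`, `job_definition`, `job_name`, and `tasks`, so duck-typed stand-ins suffice for a quick check. The function body is copied from the source above; the enum value and labels are hypothetical, and the result is exactly the `jobs` dictionary that `get_workflow_job_tasks` expects:

```python
from types import SimpleNamespace

def get_workflow_jobs(workflow_definition, workflow_type):
    rtn_jobs = dict()
    if workflow_definition is not None:
        for order in workflow_definition.order:
            if order.workflow_type == workflow_type:
                for job in order.job_definition:
                    rtn_jobs[job.job_name] = job.tasks
    return rtn_jobs

WORKFLOW_ANALYSIS_TYPE = 1  # hypothetical stand-in for the real enum value
task = SimpleNamespace(consumer_result_label="SAD")
wd = SimpleNamespace(order=[
    SimpleNamespace(workflow_type=WORKFLOW_ANALYSIS_TYPE,
                    job_definition=[SimpleNamespace(job_name="job1", tasks=[task])]),
])

jobs = get_workflow_jobs(wd, WORKFLOW_ANALYSIS_TYPE)
print(list(jobs))  # -> ['job1']
```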