
olivepy messaging module

olivepy.messaging.msgutil

Contains data structures that map message_type enum values to protobuf types and vice versa. Should be updated whenever new message types are added.

AllowableErrorFromServer (Exception)

This exception means that the server could not complete a request; however, the reason it could not do so isn't considered an error. This special case most often occurs when requesting analysis of a submission that contains no speech: the analysis could not complete because there was no speech, not because of an error running the plugin. Otherwise, this is identical to Python's plain Exception

AudioTransferType (Enum)

The method used to send audio to the OLIVE server. There are three options for sending audio to the server:

  1. AUDIO_PATH: Send the path of the audio file to the server. NOTE: If using this option, the path must be accessible to the server

  2. AUDIO_DECODED: Send the audio as a buffer of decoded samples (PCM-16). This option is not well supported by this client, since the client does not decode audio itself; the caller must supply already-decoded samples

  3. AUDIO_SERIALIZED: Send the file as a binary buffer

ExceptionFromServer (Exception)

This exception means that an error occurred on the server side, and this error is being passed "up the chain" to the client side. Otherwise, it is identical to Python's plain Exception

InputTransferType (Enum)

The method used to send audio/data to the OLIVE server. There are three options for sending data to the server:

  1. PATH: Send the path of the audio file to the server. NOTE: If using this option, the path must be accessible to the server

  2. DECODED: Send the audio as a buffer of decoded samples (PCM-16). This option is not well supported by this client, since the client does not decode audio itself; the caller must supply already-decoded samples

  3. SERIALIZED: Send the file as a binary buffer
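The choice between PATH and SERIALIZED usually comes down to whether the client and server share a filesystem. A minimal sketch of that decision, using a stand-in enum that mirrors the documented members (the real enum lives in olivepy.messaging.msgutil; only PATH's value of 1 is confirmed by these docs, the other values are assumed for illustration):

```python
from enum import Enum

# Stand-in mirroring the documented members of
# olivepy.messaging.msgutil.InputTransferType.  PATH = 1 per the docs;
# the DECODED and SERIALIZED values here are assumptions.
class InputTransferType(Enum):
    PATH = 1
    DECODED = 2
    SERIALIZED = 3

def choose_mode(server_shares_filesystem: bool) -> InputTransferType:
    # PATH only works when the server can read the client's file path;
    # otherwise ship the raw file bytes with SERIALIZED.  DECODED is
    # reserved for callers that decode PCM-16 samples themselves.
    if server_shares_filesystem:
        return InputTransferType.PATH
    return InputTransferType.SERIALIZED
```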

OliveInputDataType (Enum)

The type of input data sent to the OLIVE server.

package_audio(audio_msg, audio_data, annotations=None, selected_channel=None, mode=InputTransferType.PATH, num_channels=None, sample_rate=None, num_samples=None, validate_local_path=True, label=None)

Parameters:

- audio_msg (Audio): the Olive Audio message to populate. Required.
- audio_data (AnyStr): either a filename or a binary buffer. Required.
- annotations: a list of (start, end) region tuples, in seconds. Default: None.
- selected_channel: if audio_data is multi-channel, select this channel for processing. Default: None.
- mode: the submission mode: pathname, serialized, or decoded samples. Default: InputTransferType.PATH.
- num_channels: the number of channels in the audio. Default: None.
- sample_rate: the sample rate of the audio. Default: None.
- num_samples: the number of samples in the audio. Default: None.
- validate_local_path: if sending audio as a path, throw an exception if the file does not exist locally. This is optional for the case where the client provides a path on the server's filesystem rather than the local filesystem. Default: True.
- label: an optional label to attach to the audio message. Default: None.

Returns:

A valid Audio message.

Source code in olivepy/messaging/msgutil.py
def package_audio(audio_msg: Audio,
                  audio_data: AnyStr,
                  annotations=None,
                  selected_channel=None,
                  mode=InputTransferType.PATH,
                  num_channels=None,
                  sample_rate=None,
                  num_samples=None,
                  validate_local_path=True,
                  label=None):
    """
    :param audio_msg: the Olive Audio message to populate
    :param audio_data:  either a filename or binary buffer
    :param annotations: a list of tuple start/end regions (in seconds)
    :param selected_channel: if audio_data is multi-channel then select this channel for processing
    :param mode: the submission mode: pathname, serialized, samples
    :param num_channels: the number of channels in the audio
    :param sample_rate: the sample rate of the audio
    :param num_samples: the number of samples in the audio.
    :param validate_local_path: if sending audio as a path, throw an exception if the file does not exist. We let this be an option for the possible case where the client may want to provide a path on the server's filesystem, but not the local filesystem.

    :return: a valid Audio message

    :raises Exception: if unable to package the audio for the specified mode.
    """

    if mode not in (InputTransferType.PATH, InputTransferType.DECODED, InputTransferType.SERIALIZED):
        raise Exception(
            'Called package_audio with an unknown mode. Must be PATH, DECODED, or SERIALIZED.')

    # only supporting pathname now
    if mode == InputTransferType.PATH:
        if validate_local_path:
            if not os.path.exists(audio_data):
                raise Exception(
                    "Error creating an OLIVE Audio message, the Audio file '{}' does not exist.".format(audio_data))

        audio_msg.path = audio_data
    else:
        audio_buffer = audio_msg.audioSamples
        if isinstance(audio_data, bytes):
            # audio has already been converted to a buffer... no need to change
            audio_buffer.data = audio_data
        else:
            # Assume we have a filename which we will serialize (decoded samples not supported)
            if mode != InputTransferType.SERIALIZED:
                raise Exception("Converting '{}' into a decoded buffer is not supported.  Client must "
                                "manually decode the file and pass bytes to package_audio()".format(audio_data))
            buffer = serialize_audio(audio_data)
            audio_buffer.data = buffer

        if mode == InputTransferType.SERIALIZED:
            # olive.proto says these are all ignored for serialized buffers:
            # channels, rate, bit_depth
            audio_buffer.serialized_file = True

        if mode == InputTransferType.DECODED:
            # This mode assumes the client has passed in a numpy array of samples, but we don't assume numpy is
            # installed for all clients, so we don't do checks in this code

            # Get the data as shorts:
            # not if audio_data.dtype.kind == np.dtype(np.integer).kind:
            # audio_data = audio_data.astype( np.int16 ).flatten().tolist()
            # raise Exception("Error: Transferring decoded samples not supported")

            problem = ''
            if num_channels is None or num_channels == 0:
                problem += 'channel '
            if sample_rate is None or sample_rate == 0:
                problem += 'sample_rate '
            if num_samples is None or num_samples == 0:
                problem += 'num_samples'
            if problem != '':
                raise Exception('Error: can not create an OLIVE audio message from decoded samples because missing required argument(s): {}'.format(problem))
            audio_buffer.serialized_file = False
            audio_buffer.channels = num_channels
            audio_buffer.rate = sample_rate
            audio_buffer.samples = num_samples
            audio_buffer.bit_depth = BIT_DEPTH_16
            audio_buffer.encoding = PCM16

    if annotations:
        for a in annotations:
            # np.float32(a[0] would be better but can we assume numpy is installed?
            region = audio_msg.regions.add()
            region.start_t = a[0]
            region.end_t = a[1]

    if selected_channel:
        # we can't do much validation, but if they selected a channel and specified the number of channels
        if num_channels:
            if selected_channel > num_channels:
                raise Exception(
                    "Error: can not select channel '{}' if audio only contains '{}' channel(s)".format(selected_channel, num_channels))

        if selected_channel < 1:
            raise Exception(
                "Error: invalid value for selected channel '{}'.  Channel must be 1 or higher ".format(selected_channel))

        audio_msg.selected_channel = selected_channel

    if label:
        audio_msg.label = label

    return audio_msg
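To illustrate the annotation handling in package_audio() above, the loop that copies (start, end) tuples into region messages can be sketched with a hypothetical stand-in in place of the protobuf Audio message (the real class comes from olive_pb2):

```python
# Hypothetical stand-ins for the protobuf Audio message and its repeated
# 'regions' field (the real classes come from olive_pb2).
class FakeRegion:
    start_t = None
    end_t = None

class FakeRegionList(list):
    def add(self):
        region = FakeRegion()
        self.append(region)
        return region

class FakeAudio:
    def __init__(self):
        self.regions = FakeRegionList()

def add_annotations(audio_msg, annotations):
    # Mirrors the annotation loop in package_audio(): each (start, end)
    # tuple, in seconds, becomes one region on the message.
    for start, end in annotations:
        region = audio_msg.regions.add()
        region.start_t = start
        region.end_t = end
    return audio_msg
```

The real protobuf repeated field exposes the same add() pattern, which is why the loop translates directly.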

package_binary_media(binary_media_msg, media_data, annotations=None, mode=InputTransferType.PATH, validate_local_path=True, label=None, selected_channel=None)

Parameters:

- binary_media_msg (BinaryMedia): the Olive BinaryMedia message to populate. Required.
- media_data (AnyStr): either a filename or a binary buffer. Required.
- annotations: a list of (start, end) region tuples, in seconds. Default: None.
- mode: the submission mode: pathname or serialized. Default: InputTransferType.PATH.
- validate_local_path: if sending media as a path, throw an exception if the file does not exist locally. This is optional for the case where the client provides a path on the server's filesystem rather than the local filesystem. Default: True.
- label: an optional label to attach to the media message. Default: None.
- selected_channel: if the media is multi-channel, select this channel for processing. Default: None.

Returns:

A valid BinaryMedia message.

Source code in olivepy/messaging/msgutil.py
def package_binary_media(binary_media_msg: BinaryMedia,
                         media_data: AnyStr,
                         annotations=None,
                         mode=InputTransferType.PATH,
                         validate_local_path=True,
                         label=None,
                         selected_channel=None):
    """
    :param binary_media_msg: the Olive BinaryMedia message to populate
    :param media_data: either a filename or binary buffer
    :param annotations: a list of tuple start/end regions (in seconds)
    :param mode: the submission mode: pathname or serialized
    :param validate_local_path: if sending audio as a path, throw an exception if the file does not exist. We let this be an option for the possible case where the client may want to provide a path on the server's filesystem, but not the local filesystem.

    :return: a valid BinaryMedia message

    :raises Exception if unable to package the audio for the specified mode.
    """

    print("adding binary media")
    # todo support selected channel (if audio is to be handled form this data), label, and annotations
    if mode not in (InputTransferType.PATH, InputTransferType.SERIALIZED):
        raise Exception(
            'Called package_binary_media with an unknown mode. Must be PATH or SERIALIZED.')

    # only supporting pathname now
    if mode == InputTransferType.PATH:
        if validate_local_path:
            if not os.path.exists(media_data):
                raise Exception(
                    "Error creating an OLIVE media message, the Audio file '{}' does not exist.".format(media_data))

        binary_media_msg.path = media_data
    else:
        media_buffer = binary_media_msg.buffer
        if isinstance(media_data, bytes):
            # media has already been converted to a buffer... no need to change
            media_buffer.data = media_data
        else:
            # Assume we have a filename which we will serialize (decoded samples not supported)
            if mode != InputTransferType.SERIALIZED:
                raise Exception("Converting '{}' into a decoded buffer is not supported.".format(media_data))
            buffer = serialize_audio(media_data)
            media_buffer.data = buffer

    if label:
        binary_media_msg.label = label
    if selected_channel:
        binary_media_msg.selected_channel = selected_channel

    if annotations:
        classic_region = binary_media_msg.regions.add()
        for a in annotations:
            # np.float32(a[0] would be better but can we assume numpy is installed?
            region = classic_region.regions.add()
            # print("Adding region: {} to {}".format(a[0], a[1]))
            region.start_t = a[0]
            region.end_t = a[1]

    return binary_media_msg

serialize_audio(filename)

Helper function used to read in an audio file and output a serialized buffer. Can be used with package_audio() when using the AUDIO_SERIALIZED mode and the audio input has not already been serialized

Parameters:

Name Type Description Default
filename str

the local path to the file to serialize

required

Returns:

Type Description
~AnyStr

the contents of the file as a byte buffer; an exception is raised if the file cannot be opened. This buffer contains the raw content of the file, it does NOT contain decoded samples

Source code in olivepy/messaging/msgutil.py
def serialize_audio(filename: str) -> AnyStr:
    """
    Helper function used to read in an audio file and output a serialized buffer.  Can be used with package_audio() \
    when using the AUDIO_SERIALIZED mode and the audio input has not already been serialized

    :param filename: the local path to the file to serialize

    :return: the contents of the file as a byte buffer, otherwise an exception if the file can not be opened.  \
    This buffer contains the raw content of the file, it does NOT contain decoded samples
    """

    if not os.path.exists(os.path.expanduser(filename)):
        raise Exception(
            "Error serializing an audio file, the  file '{}' does not exist.".format(filename))

    with open(os.path.expanduser(filename), 'rb') as f:
        serialized_buffer = f.read()

    # return the buffer
    return serialized_buffer

olivepy.messaging.olive_pb2

olivepy.messaging.response

OliveClassStatusResponse (OliveServerResponse)

The container/wrapper for WorkflowClassStatusResult from an OLIVE server (when using the AsyncOliveClient). This is intended to make it easier for clients to handle the traditional (original) protobuf message results (such as RegionScorerResult) returned from the server.

get_workflow_type(self)

Return the type of workflow done in this response (analysis, enrollment, adaptation)

Returns:

Type Description

A WorkflowType: WORKFLOW_ANALYSIS_TYPE, WORKFLOW_ENROLLMENT_TYPE, or WORKFLOW_ADAPT_TYPE; raises an Exception if a non-workflow response message was wrapped

Source code in olivepy/messaging/response.py
def get_workflow_type(self):
    return WORKFLOW_ANALYSIS_TYPE

parse_from_response(self, request, response, message)

Populate this response from the original client request, the server's response message, and any error message.

Parameters:

- request: the original client request message. Required.
- response: the response message returned by the server. Required.
- message: an error message, if the request failed. Required.
Source code in olivepy/messaging/response.py
def parse_from_response(self, request, response, message):
    OliveServerResponse.parse_from_response(self, request, response, message)

    if self.is_error():
        return

    status_result = []
    for jc in self._response.job_class:
        job_name = jc.job_name
        job_dict = dict()
        self._job_names.add(job_name)

        job_dict[KEY_JOB_NAME] = job_name
        # we have a list of data items:
        job_dict[KEY_TASkS] = {}
        status_result.append(job_dict)

        for task_class in jc.task:
            task_class_dict = json.loads(MessageToJson(task_class, preserving_proto_field_name=True))
            del task_class_dict['task_name']
            if task_class.task_name not in job_dict[KEY_TASkS]:
                job_dict[KEY_TASkS][task_class.task_name] = []
            job_dict[KEY_TASkS][task_class.task_name].append(task_class_dict)
            #job_dict[KEY_TASkS].append({task_class.task_name: task_class_dict})

    self._json_result = status_result

to_json(self, indent=None)

Generate the response as a JSON string

Parameters:

- indent: if a non-negative integer, JSON array elements and object members are pretty-printed with that indent level. An indent level of 0 only inserts newlines. None is the most compact representation. A negative value returns the parsed JSON object rather than a string. Default: None.

Returns:

The Workflow response as a JSON string.

Source code in olivepy/messaging/response.py
def to_json(self, indent=None):
    """
       Generate the response as a JSON string
       :param indent:  if a non-negative integer, then JSON array elements and object members will be pretty-printed with \
       that indent level. An indent level of 0 will only insert newlines. ``None`` is the most compact \
       representation. A negative value will return the parsed JSON object rather than a string

       :return: the Workflow response as a JSON string
    """
    # consider setting preserving_proto_field_name to true
    # return json.dumps(json.loads(MessageToJson(self._response, preserving_proto_field_name=True)), indent=indent)

    if self.is_error():
        return self.get_error()
    if indent and indent < 0:
        return json.loads(MessageToJson(self._response, preserving_proto_field_name=True))
    return json.dumps(json.loads(MessageToJson(self._response, preserving_proto_field_name=True)), indent=indent, ensure_ascii=False)

OliveServerResponse

The default container/wrapper for responses from an OLIVE server (when using the AsyncOliveClient). This is intended to make it easier for clients to handle the traditional (original) protobuf message results (such as RegionScorerResult) returned from the server.

get_response(self)

The Protobuf message returned from the OLIVE server

Returns:

The protobuf response message.
Source code in olivepy/messaging/response.py
def get_response(self):
    """
    The Protobuf message returned from the OLIVE server
    :return:
    """
    # todo exception if none?
    return self._response

get_workflow_type(self)

Return the type of workflow done in this response (analysis, enrollment, adaptation)

Returns:

Type Description

A WorkflowType: WORKFLOW_ANALYSIS_TYPE, WORKFLOW_ENROLLMENT_TYPE, or WORKFLOW_ADAPT_TYPE; raises an Exception if a non-workflow response message was wrapped

Source code in olivepy/messaging/response.py
def get_workflow_type(self):
    """
    Return the type of workflow done in this response (analysis, enrollment, adaptation)

    :return: A WorkflowType: WORKFLOW_ANALYSIS_TYPE, WORKFLOW_ENROLLMENT_TYPE, WORKFLOW_ADAPT_TYPE or an Exception if a non-workflow response message was wrapped
    """

    if not self._response:
        raise Exception("No valid response")

    if isinstance(self._request, WorkflowAnalysisRequest):
        return WORKFLOW_ANALYSIS_TYPE
    elif isinstance(self._request, WorkflowEnrollRequest):
        return WORKFLOW_ENROLLMENT_TYPE
    elif isinstance(self._request, WorkflowAdaptRequest):
        return WORKFLOW_ADAPT_TYPE
    elif isinstance(self._request, WorkflowUnenrollRequest):
        return WORKFLOW_UNENROLLMENT_TYPE

    raise Exception("Unknown Workflow Message: {}".format(type(self._request)))
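The mapping above keys on the type of the original request message. The same dispatch pattern, sketched with hypothetical stand-in classes and string constants in place of the protobuf request types and WORKFLOW_* constants (the real ones come from olivepy.messaging.olive_pb2):

```python
# Hypothetical stand-ins for the protobuf request types and WORKFLOW_*
# constants used by get_workflow_type().
class WorkflowAnalysisRequest: pass
class WorkflowEnrollRequest: pass
class WorkflowAdaptRequest: pass

WORKFLOW_ANALYSIS_TYPE = "analysis"
WORKFLOW_ENROLLMENT_TYPE = "enrollment"
WORKFLOW_ADAPT_TYPE = "adaptation"

def workflow_type_of(request):
    # Mirror get_workflow_type(): map the request's class to its workflow type.
    if isinstance(request, WorkflowAnalysisRequest):
        return WORKFLOW_ANALYSIS_TYPE
    if isinstance(request, WorkflowEnrollRequest):
        return WORKFLOW_ENROLLMENT_TYPE
    if isinstance(request, WorkflowAdaptRequest):
        return WORKFLOW_ADAPT_TYPE
    raise Exception("Unknown Workflow Message: {}".format(type(request)))
```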

parse_from_response(self, request, response, message)

Populate this response from the original client request, the server's response message, and any error message.

Parameters:

- request: the original client request message. Required.
- response: the response message returned by the server. Required.
- message: an error message, if the request failed. Required.
Source code in olivepy/messaging/response.py
def parse_from_response(self, request, response, message):
    """
    Populate this response from the original request, the server response, and any error message
    :param request: the original client request message
    :param response: the response message returned by the server
    :param message: an error message, if the request failed
    :return:
    """
    self._request = request
    if message:
        # No results due to error
        self._iserror = True
        self._message = message
        # self._request = request
    if response is not None:
        try:
            if response.HasField("error"):
                self._iserror = True
                self._message = response.error
            else:
                # we assume no errors
                self._issuccessful = True
        except:
            # Some messages have no error field
            self._issuccessful = True
        self._response = response

to_json(self, indent=None)

Generate the response as a JSON string

Parameters:

- indent: if a non-negative integer, JSON array elements and object members are pretty-printed with that indent level. An indent level of 0 only inserts newlines. None is the most compact representation. A negative value returns the parsed JSON object rather than a string. Default: None.

Returns:

The response as a JSON string.

Source code in olivepy/messaging/response.py
def to_json(self, indent=None):
    """
       Generate the response as a JSON string
       :param indent:  if a non-negative integer, then JSON array elements and object members will be pretty-printed with \
       that indent level. An indent level of 0 will only insert newlines. ``None`` is the most compact \
       representation. A negative value will return the parsed JSON object rather than a string

       :return: the response as a JSON string
    """
    #consider setting preserving_proto_field_name to true
    if indent and indent < 0:
        return json.loads(MessageToJson(self._response, preserving_proto_field_name=True))
    return json.dumps(json.loads(MessageToJson(self._response, preserving_proto_field_name=True)), indent=indent, ensure_ascii=False)
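The indent parameter follows the semantics of Python's json.dumps; a quick stdlib-only illustration of the None, zero, and positive cases (the sample data is made up):

```python
import json

data = {"job_name": "job1", "tasks": ["SAD", "LID"]}

compact = json.dumps(data, indent=None)   # one line, the most compact form
newlines = json.dumps(data, indent=0)     # newlines only, no indentation
pretty = json.dumps(data, indent=2)       # pretty-printed with a 2-space indent
```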

OliveWorkflowActualizedResponse (OliveServerResponse)

Extracts info from an actualized workflow definition

get_analysis_jobs(self)

Return the names of analysis jobs. A workflow typically has just one job with multiple tasks; the most likely reason to have multiple jobs is a workflow that processes multi-channel audio, where there may be a set of job tasks for each channel of audio submitted.

Returns:

Type Description

a list of job names in the analysis

Source code in olivepy/messaging/response.py
def get_analysis_jobs(self):
    """
    Return the names of analysis jobs.  A workflow typically has just one job with multiple tasks; the most likely
    reason to have multiple jobs is a workflow that processes multi-channel audio, where there may be a set of job
    tasks for each channel of audio submitted.

    :return: a list of job names in the analysis
    """
    return [job_dict[KEY_JOB_NAME] for job_dict in self._json_result]


    #todo get analysis job name(s)

get_request_jobs(self, workflow_type)

Return the jobs in the original request for the specified workflow type.

Parameters:

- workflow_type: the type of workflow (e.g. WORKFLOW_ANALYSIS_TYPE). Required.

Returns:

The list of jobs for this type.

Source code in olivepy/messaging/response.py
def get_request_jobs(self, workflow_type):
    """
    return the jobs in the original request for the specified analysis type

    :param workflow_type: the type of workflow (i.e. WORKFLOW_ANALYSIS_TYPE)

    :return: the list of jobs for this type
    """
    if self._request is not None:
        return get_workflow_jobs(self._request.workflow_definition, workflow_type)

    raise Exception("No jobs for the requested workflow type: {}".format(workflow_type))

is_allowable_error(self)

Returns:

Type Description

true if this response failed with an allowable error

Source code in olivepy/messaging/response.py
def is_allowable_error(self):
    """

    :return: true if this response failed with an allowable error
    """
    return self._isallowable_error

parse_from_response(self, request, response, message)

Populate this response from the original client request, the server's response message, and any error message.

Parameters:

- request: the original client request message. Required.
- response: the response message returned by the server. Required.
- message: an error message, if the request failed. Required.
Source code in olivepy/messaging/response.py
def parse_from_response(self, request, response, message):
    OliveServerResponse.parse_from_response(self, request, response, message)

    # Now create a JSON representation from the response
    # we will walk the tree and deserialize any encoded messages

    if self.is_error():
        # todo provide error info in JSON?
        return

    # make a new message for this type...
    if not isinstance(self._request, WorkflowActualizeRequest):
        # we received some other workflow message, so we don't need to convert it to json...
        return

    # we only parse the analyze part now

    analysis_task = []

    workflow_analysis_order_msg = None
    for order in self._response.workflow.order:
        if order.workflow_type == WORKFLOW_ANALYSIS_TYPE:
            workflow_analysis_order_msg = order
            break

    if workflow_analysis_order_msg is None:
        # no analysis results
        return

    # for job in self._response.job_result:
    for job in workflow_analysis_order_msg.job_definition:
        # create a dictionary for each job result
        job_dict = dict()
        job_name = job.job_name
        self._job_names.add(job_name)

        # and a dictionary of tasks:
        # job_dict[KEY_TASkS] = {}
        # add to our results - in most cases we will have just one job
        analysis_task.append(job_dict)

        # get data handling info for this job
        data_prop = job.data_properties
        job_dict['Data Input'] = json.loads(MessageToJson(data_prop, preserving_proto_field_name=True))
        # if data_prop.mode == SPLIT:
        #     # Hack to make split/mulit-channel mode more clear
        #     job_dict['data']['mode'] = 'SPLIT: Process each channel as a job'

        for task in job.tasks:
            task_result_dict = json.loads(MessageToJson(task, preserving_proto_field_name=True))

            # Deserialize message_data, and replace it in the task_result_dict
            task_type_msg = self._extract_serialized_message(task.message_type, task.message_data)
            task_result_dict[KEY_JOB_NAME] = job_name
            task_result_dict['analysis'] = json.loads(MessageToJson(task_type_msg, preserving_proto_field_name=True))
            del task_result_dict['message_data']

            job_dict[task.consumer_result_label] = task_result_dict

    self._json_result = analysis_task

to_json(self, indent=None)

Generate the response as a JSON string

Parameters:

- indent: if a non-negative integer, JSON array elements and object members are pretty-printed with that indent level. An indent level of 0 only inserts newlines. None is the most compact representation. A negative value returns the parsed JSON object rather than a string. Default: None.

Returns:

The Workflow response as a JSON string.

Source code in olivepy/messaging/response.py
def to_json(self, indent=None):
    """
       Generate the response as a JSON string
       :param indent:  if a non-negative integer, then JSON array elements and object members will be pretty-printed with \
       that indent level. An indent level of 0 will only insert newlines. ``None`` is the most compact \
       representation. A negative value will return the parsed JSON object rather than a string

       :return: the Workflow response as a JSON string
    """
    if indent and indent < 0:
        # _json_result is already a plain Python structure; return it directly
        return self._json_result
    return json.dumps(self._json_result, indent=indent, ensure_ascii=False)
    # return json.dumps(json.loads(MessageToJson(self._json_result, preserving_proto_field_name=False)), indent=indent)

OliveWorkflowAnalysisResponse (OliveServerResponse)

The default container/wrapper for responses from an OLIVE server (when using the AsyncOliveClient). This is intended to make it easier for clients to handle the traditional (original) protobuf message results (such as RegionScorerResult) returned from the server.

get_analysis_jobs(self)

Return the names of analysis jobs. A workflow typically has just one job with multiple tasks; the most likely reason to have multiple jobs is a workflow that processes multi-channel audio, where there may be a set of job tasks for each channel of audio submitted.

Returns:

Type Description

a list of job names in the analysis

Source code in olivepy/messaging/response.py
def get_analysis_jobs(self):
    """
    Return the names of analysis jobs.  A workflow typically has just one job with multiple tasks; the most likely
    reason to have multiple jobs is a workflow that processes multi-channel audio, where there may be a set of job
    tasks for each channel of audio submitted.

    :return: a list of job names in the analysis
    """
    return [job_dict[KEY_JOB_NAME] for job_dict in self._json_result]


    #todo get analysis job name(s)

get_analysis_task_result(self, job_name, task_name)

Get the result(s) for the specified job_name and task_name, and also include the data used for this task. If the workflow analyzes each channel in multi-channel data, then there can be multiple jobs with the same name.

Parameters:

- job_name: may be None for convenience, since there is normally only one job; if the workflow has multiple jobs, a valid name must be specified. Required.
- task_name: the name of the task. Required.

Returns:

A list of dictionaries, where each dictionary in the list includes the results for the specified task and a list of the data analyzed by this task, such as [ {task_name: {}, data: [] } ].

Source code in olivepy/messaging/response.py
def get_analysis_task_result(self, job_name, task_name):
    """
    Get the result(s) for the specified job_name and task_name, also include the data used for this task.  If
    the workflow analyzes each channel in multi-channel data then there can be multiple jobs with the
    same name.

    :param job_name: for convenience can be None, since there is normally only one job.  But if the workflow has multiple jobs then a valid name must be specified.
    :param task_name: the name of the task

    :return: a list of dictionaries, where each dictionary in the list includes the results for the specified task and a list of the data analyzed by this task, such as [ {task_name:{}, data:[] }]
    """
    if job_name is None:
        job_name = self._get_default_job_name()

    results = []
    for job_dict in self._json_result:
        if job_name == job_dict[KEY_JOB_NAME]:
            task_dict = dict()
            # there may be one or more result for task_name
            task_dict[task_name] = job_dict[KEY_TASkS][task_name]
            task_dict[KEY_DATA] = job_dict[KEY_DATA]
            results.append(task_dict)

    return results
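The lookup above is a filter over per-job dictionaries keyed by job name. A sketch with plain dicts (the key names and sample data are illustrative stand-ins for the module's KEY_* constants and real results):

```python
# Illustrative stand-ins for the KEY_* constants in response.py.
KEY_JOB_NAME, KEY_TASKS, KEY_DATA = "job_name", "tasks", "data"

# A made-up parsed result with one job containing one SAD task.
json_result = [
    {KEY_JOB_NAME: "job1",
     KEY_TASKS: {"SAD": [{"score": 0.9}]},
     KEY_DATA: ["a.wav"]},
]

def get_task_result(json_result, job_name, task_name):
    # Collect the named task's results (plus the data analyzed) from every
    # job entry whose name matches -- multi-channel workflows can repeat
    # the same job name with different data properties.
    results = []
    for job_dict in json_result:
        if job_dict[KEY_JOB_NAME] == job_name:
            results.append({task_name: job_dict[KEY_TASKS][task_name],
                            KEY_DATA: job_dict[KEY_DATA]})
    return results
```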

get_request_jobs(self, workflow_type)

Return the jobs in the original request for the specified workflow type.

Parameters:

- workflow_type: the type of workflow (e.g. WORKFLOW_ANALYSIS_TYPE). Required.

Returns:

The list of jobs for this type.

Source code in olivepy/messaging/response.py
def get_request_jobs(self, workflow_type):
    """
    return the jobs in the original request for the specified analysis type

    :param workflow_type: the type of workflow (i.e. WORKFLOW_ANALYSIS_TYPE)

    :return: the list of jobs for this type
    """
    if self._request is not None:
        return get_workflow_jobs(self._request.workflow_definition, workflow_type)

    raise Exception("No jobs for the requested workflow type: {}".format(workflow_type))

is_allowable_error(self)

Returns:

Type Description

true if this response failed with an allowable error

Source code in olivepy/messaging/response.py
def is_allowable_error(self):
    """
    :return: true if this response failed with an allowable error
    """
    return self._isallowable_error

parse_from_response(self, request, response, message)

Populate this response from the original client request, the server's response message, and any error message.

Parameters:

- request: the original client request message. Required.
- response: the response message returned by the server. Required.
- message: an error message, if the request failed. Required.
Source code in olivepy/messaging/response.py
def parse_from_response(self, request, response, message):
    OliveServerResponse.parse_from_response(self, request, response, message)
    # Now create a JSON representation from the response
    # we will walk the tree and deserialize any encoded messages

    if self.is_error():
        self._json_result={}
        self._json_result['error'] = self.get_error()
        return

    # make a new message for this type...
    if isinstance(self._request, WorkflowActualizeRequest):
        # we received an actualized workflow, so we don't need to convert it to json...
        return

    # this should only contain an analysis request... but check just in case:
    wk_type = self.get_workflow_type()
    if wk_type == WORKFLOW_ANALYSIS_TYPE or wk_type == WORKFLOW_ENROLLMENT_TYPE or wk_type == WORKFLOW_UNENROLLMENT_TYPE:
        # analysis is a list of dictionary elements, which looks like: [ {job_name: X, data: [], tasks: {}} ]
        # there is a dictionary for each job, but due to the way jobs work in OLIVE for multi-channel data we
        # consider a job to be unique by a combination of job_name plus data, so multiple dictionary elements may
        # have the same job_name, but will have different data properties (channel numbers)

        # get the analysis job order from the original request:

        analysis_result = []  # or enrollment/unenrollment result
        # analysis_result['jobs'] = []
        # analysis_result['data inputs'] = []
        job_requests = get_workflow_jobs(self._request.workflow_definition, wk_type)
        for job in self._response.job_result:
            # create a dictionary for each job result
            job_dict = dict()
            job_name = job.job_name
            self._job_names.add(job_name)


            # job_dict[job_name] = dict()
            job_dict[KEY_JOB_NAME] = job_name

            if job.error:
                job_dict['error'] = job.error

            # we have a list of data items:
            job_dict[KEY_DATA] = []
            # and a dictionary of tasks (note that it is possible to have multiple tasks with the same name, so each task maps to a list of results)
            job_dict[KEY_TASkS] = {}
            # add to our results - in most cases we will have just one job
            analysis_result.append(job_dict)

            # get the tasks for the current (and likely, only) job:
            task_requests = get_workflow_job_tasks(job_requests, job_name)
            # task_result_dict = dict()
            # I don't think this can happen yet:
            # if job.HasField('error'):
            #     Allowable job error
            #

            for task in job.task_results:
                task_result_dict = json.loads(MessageToJson(task, preserving_proto_field_name=True))

                # check if this task failed with an error
                if task.HasField('error'):
                    # Allowable error
                    if KEY_ERROR in job_dict:
                        job_dict[KEY_ERROR] = job_dict[KEY_ERROR] + "," + task.error
                    else:
                        job_dict[KEY_ERROR] = task.error
                    self._isallowable_error = True
                    # should have an empty message data;
                    del task_result_dict['message_data']
                    if job_name not in self._allowable_failed_job_tasks:
                        self._allowable_failed_job_tasks[job_name] = []
                    self._allowable_failed_job_tasks[job_name].append(task.task_name)
                else:
                    # Deserialize message_data, and replace it in the task_result_dict
                    if task.message_type in msgutil.debug_message_map:
                        # Get the pimiento message (debug only - these messages are not guaranteed to be supported)
                        print("CLG special msg type: {}".format(msgutil.MessageType.Name(task.message_type)))
                        pimiento_msg = self._extract_serialized_message(task.message_type, task.message_data)
                        if task.message_type == DATA_OUTPUT_TRANSFORMER_RESULT:
                            if pimiento_msg.data_type == TEXT:
                                # only supported type for now...
                                pie_data_msg = WorkflowTextResult()
                                pie_data_msg.ParseFromString(pimiento_msg.message_data)
                                task_result_dict['analysis'] = json.loads(
                                    MessageToJson(pie_data_msg, preserving_proto_field_name=True))
                            else:
                                print("Unsupported debug message type: {}".format(msgutil.InputDataType.Name(pimiento_msg.data_type)))
                        elif task.message_type == SCORE_OUTPUT_TRANSFORMER_RESULT:
                            # these should be standard trait messages
                            pie_score_msg = self._extract_serialized_message(pimiento_msg.message_type, pimiento_msg.message_data)
                            task_result_dict['analysis'] = json.loads(
                                MessageToJson(pie_score_msg, preserving_proto_field_name=True))
                    else:
                        task_type_msg = self._extract_serialized_message(task.message_type, task.message_data)
                        task_result_dict['analysis'] = json.loads(
                            MessageToJson(task_type_msg, preserving_proto_field_name=True))


                    # Should we create special handlers for other analysis results? We sort global score results,
                    # but what should we do with an AUDIO_MODIFICATION_RESULT?
                    if task.message_type == GLOBAL_SCORER_RESULT:
                        # Sort region scores
                        task_result_dict['analysis']['score'] = sorted(task_result_dict['analysis']['score'], key=sort_global_scores, reverse=True)

                    # messageData has been replaced with the actual task
                    del task_result_dict['message_data']
                    # taskName is the key, so remove it:
                    del task_result_dict['task_name']

                # check if we need to add the plugin/domain name
                if task.task_name in task_requests:
                    orig_task = task_requests[task.task_name]
                    if orig_task.message_type in msgutil.plugin_message_map:
                        # Get the original task
                        task_req_msg = self._extract_serialized_message(orig_task.message_type, orig_task.message_data)
                        task_result_dict['plugin'] = task_req_msg.plugin
                        task_result_dict['domain'] = task_req_msg.domain
                        if orig_task.message_type == CLASS_MODIFICATION_REQUEST or orig_task.message_type == CLASS_REMOVAL_REQUEST:
                            # add class ID
                            task_result_dict['class_id'] = self._request.class_id

                # print("adding {} as {}".format(task_result_dict, task.task_name))
                # fixme: there can be multiple tasks with the same name if conditions in a workflow caused the task to be run twice
                if task.task_name not in job_dict[KEY_TASkS]:
                    job_dict[KEY_TASkS][task.task_name] = []

                job_dict[KEY_TASkS][task.task_name].append(task_result_dict)
                #job_dict[KEY_TASkS].append({task.task_name: task_result_dict})

            # A job usually has one data input/output, but we handle as if there are multiple
            for data_result in job.data_results:
                data_result_dict = json.loads(MessageToJson(data_result, preserving_proto_field_name=True))
                # Deserialize the data portion
                data_type_msg = self._extract_serialized_message(data_result.msg_type, data_result.result_data)
                del data_result_dict['result_data']
                # del data_result_dict['dataId'] # redundant info, used as the key
                # data_result_dict['data'] = json.loads(MessageToJson(data_type_msg))
                data_result_dict.update(json.loads(MessageToJson(data_type_msg, preserving_proto_field_name=True)))
                job_dict[KEY_DATA].append(data_result_dict)
                # analysis_result['data inputs'].append(data_result_dict)

        self._json_result = analysis_result

to_json(self, indent=None)

Generate the workflow as a JSON string. indent: if a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation. A negative value will return the result as a Python object rather than a JSON string

Returns:

Type Description

the Workflow Definition as a JSON string

Source code in olivepy/messaging/response.py
def to_json(self, indent=None):
    """
       Generate the workflow as a JSON string
       :param indent:  if a non-negative integer, then JSON array elements and object members will be pretty-printed with \
       that indent level. An indent level of 0 will only insert newlines. ``None`` is the most compact  \
       representation. A negative value will return the result as a Python object instead of a JSON string

       :return: the Workflow Definition as a JSON string
    """
    if indent is not None and indent < 0:
        # a negative indent returns the parsed result as a Python object
        return self._json_result
    return json.dumps(self._json_result, indent=indent, ensure_ascii=False)

OliveWorkflowEnrollmentResponse (OliveServerResponse)

The container/wrapper for responses from an OLIVE server (when using the AsyncOliveClient) for enrollment. This is intended to make it easier for clients to handle the traditional (original) protobuf message results (such as RegionScorerResult) returned from the server.

is_allowable_error(self)

Returns:

Type Description

true if this message failed with an allowable error

Source code in olivepy/messaging/response.py
def is_allowable_error(self):
    """

    :return: true if this message failed with an allowable error
    """
    return self._isallowable_error

parse_from_response(self, request, response, message)

Create this response from the original request and the server's response message

Parameters:

Name Type Description Default
request required
response required
message required

Returns:

Type Description
Source code in olivepy/messaging/response.py
def parse_from_response(self, request, response, message):
    OliveServerResponse.parse_from_response(self, request, response, message)

    # Now create a JSON representation from the response
    # we will walk the tree and deserialize any encoded messages

    if self.is_error():
        # todo provide error info in JSON
        return

    # make a new message for this type...
    if isinstance(self._request, WorkflowActualizeRequest):
        # we received an actualized workflow, so we don't need to convert it to json...
        return

    # this should only contain an analysis request... but check just in case:
    wk_type = self.get_workflow_type()
    if wk_type == WORKFLOW_ENROLLMENT_TYPE:
        # not much info in an enrollment response...

        enroll_result = dict()
        if self._response.HasField('error'):
            enroll_result['error'] = self._response.error
        else:
            enroll_result['successful'] = True

        self._json_result = enroll_result

to_json(self, indent=None)

Generate the response as a JSON string

Parameters:

Name Type Description Default
indent

if a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation. A negative value will return the result as a Python object rather than a JSON string

None

Returns:

Type Description

the Workflow response as a JSON string

Source code in olivepy/messaging/response.py
def to_json(self, indent=None):
    """
       Generate the response as a JSON string
       :param indent:  if a non-negative integer, then JSON array elements and object members will be pretty-printed with \
       that indent level. An indent level of 0 will only insert newlines. ``None`` is the most compact  \
       representation. A negative value will return the result as a Python object instead of a JSON string

       :return: the Workflow response as a JSON string
    """
    if indent is not None and indent < 0:
        # a negative indent returns the parsed result as a Python object
        return self._json_result
    return json.dumps(self._json_result, indent=indent, ensure_ascii=False)

get_workflow_job_names(workflow_definition, workflow_type)

parse a workflow definition, returning a list of job definition names (job_name)

return [job_name] for the requested workflow_type

Source code in olivepy/messaging/response.py
def get_workflow_job_names(workflow_definition, workflow_type):
    """ parse a workflow definition, returning a list of job definition name  (job_name)

     return [job_name] for the requested workflow_type
    """
    rtn_job_names = list()
    if workflow_definition is not None:
        for order in workflow_definition.order:
            if order.workflow_type == workflow_type:
                for job in order.job_definition:
                    rtn_job_names.append(job.job_name)

    return rtn_job_names

get_workflow_job_tasks(jobs, job_name=None)

Fetch the tasks from a job

Parameters:

Name Type Description Default
jobs

a dictionary of WorkflowTasks, indexed by job name

required
job_name

find tasks that belong to a job having this name. This can be None if there is only one job

None

Returns:

Type Description

a dictionary of WorkflowTask indexed by the task's consumer_result_label for the specified job. An exception is thrown if there are multiple jobs but no job_name was specified

Source code in olivepy/messaging/response.py
def get_workflow_job_tasks(jobs, job_name=None):
    """
    Fetch the tasks from a job

    :param jobs: a dictionary of WorkflowTasks, indexed by job name
    :param job_name: find tasks that belong to a job having this name. This can be None if there is only one job
    :return: a dictionary of WorkflowTask indexed by the task's consumer_result_label for the specified job.  An exception is thrown if there are multiple jobs but no job_name was specified
    """

    if job_name is None:
        if len(jobs) == 1:
            # hack to make it easier to fetch data since most workflows only have one job
            job_name = list(jobs.keys())[0]
        else:
            raise Exception("Must specify a job name when there are multiple JobDefinitions in a Workflow")

    rtn_tasks = dict()
    if job_name in jobs:
        for workflow_task in jobs[job_name]:
            rtn_tasks[workflow_task.consumer_result_label] = workflow_task
    # CLG: I don't think we need to generate a message/warning if a task name isn't in the original jobs since we can
    # have dynamic jobs and with streaming jobs are very dynamic
    # else:
    #     print("Job '{}' not one of the expected job names: {}".format(job_name, list(jobs.keys())))

    return rtn_tasks

get_workflow_jobs(workflow_definition, workflow_type)

parse a workflow definition, returning a dictionary indexed by job (definition) name (job_name) with a list of WorkflowTask elements.

Parameters:

Name Type Description Default
workflow_definition

find jobs in this workflow definition

required
workflow_type

the type of workflow order (analysis, enrollment, unenrollment) return {job_name: [WorkflowTask]} for the requested workflow_type

required
Source code in olivepy/messaging/response.py
def get_workflow_jobs(workflow_definition, workflow_type):
    """ parse a workflow definition, returning a dictionary  indexed job (definition) name (job_name) and a list of \
     WorkflowTask elements.

     :param workflow_definition: find jobs in this workflow definition
     :param workflow_type: the type of workflow order (analysis, enrollment, unenrollment)

     return {job_name: [WorkflowTask]} for the requested workflow_type
    """
    rtn_jobs = dict()
    if workflow_definition is not None:
        for order in workflow_definition.order:
            if order.workflow_type == workflow_type:
                for job in order.job_definition:
                    rtn_jobs[job.job_name] = job.tasks

    return rtn_jobs
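Tying the helpers together: `get_workflow_jobs` builds the `{job_name: [WorkflowTask]}` map that `get_workflow_job_tasks` then consumes. A sketch with the same `SimpleNamespace` stand-ins (the enum value is again an assumption, and the function body is restated from the source above):

```python
from types import SimpleNamespace as NS

WORKFLOW_ANALYSIS_TYPE = 1  # assumed enum value, illustration only

def get_workflow_jobs(workflow_definition, workflow_type):
    """Restated from the source above for a runnable example."""
    rtn_jobs = dict()
    if workflow_definition is not None:
        for order in workflow_definition.order:
            if order.workflow_type == workflow_type:
                for job in order.job_definition:
                    rtn_jobs[job.job_name] = job.tasks
    return rtn_jobs

# One analysis order with one job carrying a single SAD task
wd = NS(order=[
    NS(workflow_type=WORKFLOW_ANALYSIS_TYPE,
       job_definition=[NS(job_name="job_sad",
                          tasks=[NS(consumer_result_label="SAD")])]),
])
```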