Skip to content

olivepy api module

olivepy.api.oliveclient

ClientBrokerWorker (Thread)

Performs async interactions with Olive

run(self)

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

Source code in olivepy/api/oliveclient.py
def run(self):
    """
    Thread body: listen on the OLIVE status socket and log each heartbeat received.

    Subscribes to every message on ``self.status_socket`` and polls it, waiting at most
    BLOCK_TIMEOUT_MS per poll, for as long as ``self.working`` stays true.  Each message is parsed
    as a Heartbeat; when it carries a ``stats`` field the system and job statistics are logged.

    The status socket is always closed on exit, including when receiving or parsing a message raises.
    """
    logging.debug("Starting Olive Status Monitor Worker for id: {}".format(self.client_id))

    self.working = True
    try:
        # an empty subscription prefix matches every status message
        self.status_socket.subscribe("")

        poller = zmq.Poller()
        poller.register(self.status_socket, zmq.POLLIN)

        while self.working:
            # poll with a timeout so a change to self.working is noticed between messages
            socks = dict(poller.poll(BLOCK_TIMEOUT_MS))
            if self.status_socket not in socks:
                continue

            logging.debug("Received status message from OLIVE...")
            heartbeat = Heartbeat()
            heartbeat.ParseFromString(self.status_socket.recv())

            if not heartbeat.HasField("stats"):
                continue

            stats = heartbeat.stats
            logging.info("System CPU Used:    %02.01f%%", stats.cpu_percent)
            logging.info("System CPU Average: %02.01f%%", stats.cpu_average)
            logging.info("System MEM Used:    %02.01f%%", stats.mem_percent)
            logging.info("System MEM Max:     %02.01f%%", stats.max_mem_percent)
            logging.info("System SWAP Used:   %02.01f%%", stats.swap_percent)
            logging.info("System SWAP Max:    %02.01f%%", stats.max_swap_percent)
            logging.debug("Number active jobs: %s", stats.pool_busy)
            logging.debug("Number pending jobs: %s", stats.pool_pending)
            logging.debug("Number finished jobs: %s", stats.pool_finished)
            logging.debug("Max number jobs: %s", stats.max_num_jobs)
            logging.debug("Server version: %s", stats.server_version)
    finally:
        # release the socket even if polling, receiving or parsing failed
        self.status_socket.close()

OliveClient

This is a simplified version of the network library used to contact the OLIVE server from Python code. All OLIVE calls below are synchronous: each one blocks until a response is received from the OLIVE server. These example API calls are intended to make working with the OLIVE API clearer, since all calls are blocking. To make asynchronous requests to the OLIVE server, use olivepy.api.olive_async_client.AsyncOliveClient in your enterprise application.

__init__(self, client_id, address='localhost', request_port=5588, timeout_second=10) special

Parameters:

Name Type Description Default
client_id

The unique name of this client. Due to a ZMQ bug on some platforms this ID can not end in '1'

required
address

the address of the olive server, such as localhost

'localhost'
request_port

default olive port is 5588

5588
timeout_second

time in seconds, to wait for a response from the server

10
Source code in olivepy/api/oliveclient.py
def __init__(self, client_id, address='localhost', request_port=5588, timeout_second=10):
    """
    Record the connection settings for this client; no connection is made here.

    :param client_id: The unique name of this client.  Due to a ZMQ bug on some platforms this ID can not end
            in '1', so any trailing '1' characters are removed (with a warning)
    :param address: the address of the olive server, such as localhost
    :param request_port: default olive port is 5588; the status port is taken to be request_port + 1
    :param timeout_second:  time in seconds, to wait for a response from the server
    """

    # due to a ZMQ bug the last character of the client ID can not be 1.  Strip every trailing '1' so
    # that the stored ID can never end in '1' (e.g. 'abc11'); endswith() is also safe for an empty ID.
    if client_id.endswith("1"):
        self.client_id = client_id.rstrip("1")
        logging.warning("Last character of the client ID can not be '1', removing to avoid a ZMQ bug")
    else:
        self.client_id = client_id

    self.server_address = address
    self.server_request_port = request_port
    self.server_status_port = request_port + 1

    self.timeout_seconds = timeout_second

    # nothing is connected and no server response has been recorded yet
    self.olive_connected = False
    self.info = self.fullobj = None

    OliveClient.setup_multithreading()

adapt_supervised(self, plugin, domain, annotations_file_name, new_domain_name)

Parameters:

Name Type Description Default
plugin

the plugin for adaptation

required
domain

the domain for adaptation

required
adapt_workspace

a unique label for this client's adaptation

required
annotations_file_name

the name of a file containing annotations. Each line of this file has four tokens: filename, class, start, and end. start and end are in milliseconds, but that should change to seconds.

required

Returns:

Type Description

the full path name of the new domain.

Source code in olivepy/api/oliveclient.py
def adapt_supervised(self, plugin, domain, annotations_file_name, new_domain_name):
    """
    Run a supervised adaptation driven by an annotation file.

    The annotation file is parsed, every audio file it names is submitted for preprocessing under a freshly
    generated adaptation workspace label, and the adaptation is then finalized.

    :param plugin: the plugin for adaptation
    :param domain: the domain for adaptation
    :param annotations_file_name: the name of a file containing annotations.
            Each line has four tokens: filename, class, start, and end.
            start and end are in milliseconds, but that should change to seconds.
    :param new_domain_name: the name passed on to the finalize request as the new domain

    :return: the value returned by finalize_supervised_adaptation (the new domain).
    :raises Exception: if preprocessing produced no audio ID for any of the files
    """
    # label shared by all preprocessing requests and the finalize request
    workspace = 'adapt-' + msgutil.get_uuid()

    annotations_by_file = self.parse_annotation_file(annotations_file_name)

    preprocessed = []
    for audio_path, regions in annotations_by_file.items():
        audio_id = self.preprocess_supervised_audio(plugin, domain, audio_path, workspace)
        if not audio_id:
            # this file could not be preprocessed; carry on with the others
            continue
        preprocessed.append([audio_id, regions])

    if not preprocessed:
        raise Exception("All audio requests failed")

    # regroup the per-file annotations by class, then finish with a finalize request
    class_annotations = self.convert_preprocessed_annotations(preprocessed)
    return self.finalize_supervised_adaptation(plugin, domain, new_domain_name, class_annotations, workspace)

adapt_supervised_old(self, plugin, domain, file_annotations, new_domain_name)

Parameters:

Name Type Description Default
plugin

the plugin for adaptation

required
domain

the domain for adaptation

required
adapt_workspace

a unique label for this client's adaptation

required
file_annotations

a dictionary of files to preprocess, each file has one or more annotated regions for processing {filename: [(start_ms, end_ms, class)]}, for example {test.wav: [(2618, 6200, 'S'), (7200, 9500, 'NS')]}

required

Returns:

Type Description

the full path name of the new domain.

Source code in olivepy/api/oliveclient.py
def adapt_supervised_old(self, plugin, domain, file_annotations, new_domain_name):
    """
    Run a supervised adaptation from annotations that are already held in memory.

    :param plugin: the plugin for adaptation
    :param domain: the domain for adaptation
    :param file_annotations: a dictionary of files to preprocess, each file has one or more annotated regions for
            processing {filename: [(start_ms, end_ms, class)]}, for example {test.wav: [(2618, 6200, 'S'), (7200, 9500, 'NS')]}
    :param new_domain_name: the name passed on to the finalize request as the new domain
    :return: the value returned by finalize_supervised_adaptation (the new domain).
    :raises Exception: if preprocessing produced no audio ID for any of the files
    """
    # label shared by all preprocessing requests and the finalize request
    workspace = 'adapt-' + msgutil.get_uuid()

    preprocessed = []
    for audio_path, regions in file_annotations.items():
        audio_id = self.preprocess_supervised_audio(plugin, domain, audio_path, workspace)
        if not audio_id:
            # this file could not be preprocessed; carry on with the others
            continue
        preprocessed.append([audio_id, regions])

    if not preprocessed:
        raise Exception("All audio requests failed")

    # regroup the per-file annotations by class, then finish with a finalize request
    class_annotations = self.convert_preprocessed_annotations(preprocessed)
    return self.finalize_supervised_adaptation(plugin, domain, new_domain_name, class_annotations, workspace)

analyze_bounding_box(self, plugin, domain, filename, data_msg=None, mode=<AudioTransferType.AUDIO_PATH: 1>, opts=None, classes=None)

Request an analysis of 'filename', returning bounding box scores

Parameters:

Name Type Description Default
plugin

the name of the plugin

required
domain

the name of the plugin domain

required
filename

the name of the audio file to score

required
mode

the way audio is submitted to the server

<AudioTransferType.AUDIO_PATH: 1>
opts

a dictionary of name/value pair options

None
classes

optionally, a list of classes classes to be scored

None

Returns:

Type Description

a list of (start, end) regions in seconds, each region indicates a speech region found in the submitted file.

Source code in olivepy/api/oliveclient.py
def analyze_bounding_box(self, plugin, domain, filename, data_msg=None, mode=AudioTransferType.AUDIO_PATH, opts=None, classes=None):
    """
    Request an analysis of 'filename' that returns bounding box scores.

    :param plugin: the name of the plugin
    :param domain: the name of the plugin domain
    :param filename: the name of the file to score
    :param data_msg: optional pre-built data message, forwarded unchanged to the request helper
    :param mode: the way audio is submitted to the server
    :param opts: a dictionary of name/value pair options
    :param classes: optionally, a list of classes to be scored

    :return: the result of the bounding box request; the same object is afterwards available from get_fullobj()
    """
    # forget whatever the previous request recorded
    self.info = None
    self.fullobj = None

    scores = self._request_bounding_box_scores(
        plugin, domain, filename, data_msg=data_msg, mode=mode, opts=opts, classes=classes)

    self.fullobj = scores
    return scores

analyze_frames(self, plugin, domain, filename, data_msg=None, opts=None, classes=None, mode=<AudioTransferType.AUDIO_PATH: 1>)

Request a analysis of 'filename' returning frame scores.

Parameters:

Name Type Description Default
plugin

the name of the plugin

required
domain

the name of the plugin domain

required
filename

the name of the audio file to score. If None, provide the (audio) input via data_msg instead

required
data_msg

Optionally specify the data input as a fully formed Audio or BinaryMedia message instead of creating from filename

None
opts

a dictionary of name/value pair options

None
classes

optionally, a list of classes classes to be scored

None

Returns:

Type Description

the analysis as a list of (frame) scores

Source code in olivepy/api/oliveclient.py
def analyze_frames(self, plugin, domain, filename, data_msg=None,  opts=None, classes=None,  mode=AudioTransferType.AUDIO_PATH):
    """
    Request an analysis of 'filename' that returns frame scores.

    :param plugin: the name of the plugin
    :param domain: the name of the plugin domain
    :param filename: the name of the audio file to score
    :param data_msg: Optionally specify the data input as a fully formed Audio or BinaryMedia message instead of creating from filename
    :param opts: a dictionary of name/value pair options
    :param classes: optionally, a list of classes to be scored
    :param mode: the way audio is submitted to the server

    :return: the ``score`` field of the frame score result, or an empty list when no result came back
    """
    # forget whatever the previous request recorded
    self.info = None
    self.fullobj = None

    result = self._request_frame_scores(
        plugin, domain, filename, data_msg=data_msg, opts=opts, classes=classes, mode=mode)

    return [] if result is None else result.score

analyze_global(self, plugin, domain, filename, data_msg=None, mode=<AudioTransferType.AUDIO_PATH: 1>, opts=None, classes=None)

Request a LID analysis of 'filename'

Parameters:

Name Type Description Default
plugin

the name of the LID plugin

required
domain

the name of the plugin domain

required
filename

the name of the audio file to score

required
mode

the audio transfer mode

<AudioTransferType.AUDIO_PATH: 1>
opts

a dictionary of name/value pair options

None
classes

optionally, a list of classes classes to be scored

None

Returns:

Type Description

the analysis result as a list of (global) scores

Source code in olivepy/api/oliveclient.py
def analyze_global(self, plugin, domain, filename, data_msg=None , mode=AudioTransferType.AUDIO_PATH, opts=None, classes=None):
    """
    Send a global scoring request (for example LID) for 'filename' and wait for the answer.

    :param plugin: the name of the plugin
    :param domain: the name of the plugin domain
    :param filename: the name of the audio file to score; only used when data_msg is not given
    :param data_msg: optional pre-built audio message that is copied into the request instead of packaging filename
    :param mode: the audio transfer mode
    :param opts: a dictionary of name/value pair options
    :param classes: optionally, a list of classes to be scored

    :return: the ``score`` field of the server's result
    """
    # forget whatever the previous request recorded
    self.info = None
    self.fullobj = None

    scoring_request = GlobalScorerRequest()
    scoring_request.plugin = plugin
    scoring_request.domain = domain

    if not data_msg:
        # no ready-made message: package the named file straight into the request's audio field
        package_audio(scoring_request.audio, filename, mode=mode)
    else:
        scoring_request.audio.CopyFrom(data_msg)

    self._add_options(scoring_request, opts)
    self._add_classes(scoring_request, classes)

    _msg_id, envelope = _wrap_message(self.client_id, scoring_request)

    logging.debug("Sending a global score request message")
    response = self._sync_request(envelope)
    return response.score

analyze_regions(self, plugin, domain, filename, data_msg=None, mode=<AudioTransferType.AUDIO_PATH: 1>, opts=None, classes=None)

Request a analysis of 'filename' returning regions

Parameters:

Name Type Description Default
plugin

the name of the plugin

required
domain

the name of the plugin domain

required
filename

the name of the audio file to score

required
mode

the way audio is submitted to the server

<AudioTransferType.AUDIO_PATH: 1>
opts

a dictionary of name/value pair options

None
classes

optionally, a list of classes classes to be scored

None

Returns:

Type Description

a list of (start, end) regions in seconds, each region indicates a speech region found in the submitted file.

Source code in olivepy/api/oliveclient.py
def analyze_regions(self, plugin, domain, filename, data_msg=None, mode=AudioTransferType.AUDIO_PATH, opts=None, classes=None):
    """
    Request an analysis of 'filename' that returns regions.

    :param plugin: the name of the plugin
    :param domain: the name of the plugin domain
    :param filename: the name of the audio file to score
    :param data_msg: optional pre-built data message, forwarded unchanged to the request helper
    :param mode: the way audio is submitted to the server
    :param opts: a dictionary of name/value pair options
    :param classes: optionally, a list of classes to be scored

    :return: the result of the region scoring request; the same object is afterwards available from get_fullobj()
    """
    # forget whatever the previous request recorded
    self.info = None
    self.fullobj = None

    regions = self._request_region_scores(
        plugin, domain, filename, data_msg=data_msg, mode=mode, opts=opts, classes=classes)

    self.fullobj = regions
    return regions

apply_threshold(self, scores, threshold, rate)

Very simple method to convert frame scores to regions. If speech regions are desired we can provide a SAD plugin that returns regions instead of frame scores

Parameters:

Name Type Description Default
scores required
threshold required
rate required

Returns:

Type Description

the frame scores converted to regions

Source code in olivepy/api/oliveclient.py
def apply_threshold(self, scores, threshold, rate):
    """
    Very simple method to convert frame scores to regions.  If speech regions are desired
    we can provide a SAD plugin that returns regions instead of frame scores

    A region opens at the first frame whose score is >= threshold and closes at the first following frame
    whose score is < threshold.  A region still open when the scores run out is closed at the end of the
    last frame, so a region covering only the final frame is reported as well.

    :param scores: the sequence of frame scores
    :param threshold: a frame counts as inside a region when its score is >= this value
    :param rate: divisor converting a frame index into a time (frame index / rate)

    :return: a list of (start, end) tuples, each being a frame index divided by rate
    """
    segments = []
    start = None  # index of the first frame of the open region, or None when outside a region

    for i, score in enumerate(scores):
        if start is None:
            if score >= threshold:
                start = i
        elif score < threshold:
            # frame i is the first frame outside the region, so it marks the (exclusive) end
            segments.append((1.0 * start / rate, 1.0 * i / rate))
            start = None

    if start is not None:
        # scores ended while still inside a region: it runs to the end of the final frame
        segments.append((1.0 * start / rate, 1.0 * len(scores) / rate))

    return segments

audio_modification(self, plugin, domain, filename, data_msg=None, mode=<AudioTransferType.AUDIO_PATH: 1>)

Do an audio modification (such as an enhancement). This function only accepts one audio input and returns one modified audio.

Parameters:

Name Type Description Default
plugin

the name of the plugin

required
domain

the name of the plugin domain

required
filename

the name of the audio file to score

required

Returns:

Type Description

a tuple of the request's success flag and the first modification result

Source code in olivepy/api/oliveclient.py
def audio_modification(self, plugin, domain, filename, data_msg=None, mode=AudioTransferType.AUDIO_PATH):
    """
    Do an audio modification (such as an enhancement).  Exactly one audio input is submitted and one
    modification result is returned.

    :param plugin: the name of the plugin
    :param domain: the name of the plugin domain
    :param filename: the name of the audio file to modify; only used when data_msg is not given
    :param data_msg: optional pre-built audio message submitted instead of packaging filename
    :param mode: must be AudioTransferType.AUDIO_PATH; any other mode is rejected

    :return: a tuple (successful, first modification result) taken from the server's result
    :raises Exception: if mode is not AudioTransferType.AUDIO_PATH
    """

    if mode != AudioTransferType.AUDIO_PATH:
        raise Exception('oliveclient.audio_modification requires an filename path and will not work with binary audio data.')

    modification_request = AudioModificationRequest()
    modification_request.plugin = plugin
    modification_request.domain = domain
    # the request always asks for 1 channel at a rate of 8000
    modification_request.requested_channels = 1
    modification_request.requested_rate = 8000

    if not data_msg:
        packaged_audio = Audio()
        package_audio(packaged_audio, filename, mode=mode)
        modification_request.modifications.append(packaged_audio)
    else:
        modification_request.modifications.append(data_msg)

    _msg_id, envelope = _wrap_message(self.client_id, modification_request)

    logging.debug("Sending a audio modification/enhancement request message")
    response = self._sync_request(envelope)
    return response.successful, response.modification_result[0]

connect(self, monitor_server=False)

Connect this client to the server

Parameters:

Name Type Description Default
monitor_server

if true, start a thread to monitor the server connection (helpful if debugging connection issues)

False
Source code in olivepy/api/oliveclient.py
def connect(self, monitor_server=False):
    """
    Connect this client to the server

    Creates a DEALER socket for requests and a SUB socket for status messages and connects both to the
    configured server address.

    :param monitor_server: if true, start a thread to monitor the server connection (helpful if debugging connection issues)
    """
    # both endpoints live on the same host and differ only in the port
    endpoint_prefix = "tcp://" + self.server_address + ":"
    request_addr = endpoint_prefix + str(self.server_request_port)
    status_addr = endpoint_prefix + str(self.server_status_port)

    zmq_context = zmq.Context()
    self.request_socket = zmq_context.socket(zmq.DEALER)
    self.status_socket = zmq_context.socket(zmq.SUB)

    self.request_socket.connect(request_addr)
    self.status_socket.connect(status_addr)

    # the worker reports server status (helpful to confirm the server is connected and up)
    self.worker = ClientBrokerWorker(self.status_socket,  self.client_id) if monitor_server else None
    if self.worker is not None:
        self.worker.start()

    self.olive_connected = True

    logging.debug("Olive client ready")

convert_preprocessed_annotations(self, processed_audio_list)

Convert the file annotations (a dictionary grouped by file ID, where annotations are grouped by file ID, which has one or more regions/classes) into class annotations (where annotations are grouped by class ID, with each class having one or more files, then each file having one or more regions).

Parameters:

Name Type Description Default
processed_audio_list

the list of files (indexed by an OLIVE generated ID) and the regions/classes annotated in that file

required

Returns:

Type Description

a dictionary of ClassAnnotation objects, indexed by class ID

Source code in olivepy/api/oliveclient.py
def convert_preprocessed_annotations(self, processed_audio_list):
    """
    Regroup per-file annotations into per-class annotations.

    The input groups regions by audio ID; the output groups them by class ID, where each class holds one or more
    audio IDs and each audio ID holds one or more regions.

    :param processed_audio_list: a list of (audio_id, regions) pairs, where audio_id is the OLIVE generated ID and
    each region is indexable as (start, end, class_id)
    :return: a dictionary of ClassAnnotation objects, indexed by class ID
    """
    # Pass 1: build the plain mapping class_id -> audio_id -> [(start, end), ...]
    grouped = {}
    for audio_id, regions in processed_audio_list:
        for region in regions:
            span = (region[0], region[1])
            class_id = region[2]
            grouped.setdefault(class_id, {}).setdefault(audio_id, []).append(span)

    # Pass 2: turn that mapping into one ClassAnnotation (protobuf) message per class
    protobuf_class_annots = {}
    for class_id, spans_by_audio in grouped.items():
        class_annotation = ClassAnnotation()
        protobuf_class_annots[class_id] = class_annotation
        class_annotation.class_id = class_id

        for audio_id, spans in spans_by_audio.items():
            audio_annotation = AudioAnnotation()
            audio_annotation.audio_id = audio_id
            for start, end in spans:
                # times are in milliseconds
                annotated_region = AnnotationRegion()
                annotated_region.start_t = start
                annotated_region.end_t = end
                audio_annotation.regions.append(annotated_region)
            class_annotation.annotations.append(audio_annotation)

    return protobuf_class_annots

enroll(self, plugin, domain, class_id, filename, data_msg=None)

Request a enrollment of 'audio'

Parameters:

Name Type Description Default
plugin

the name of the plugin

required
domain

the name of the plugin domain

required
class_id

the name of the class (speaker) to enroll

required
filename

the filename to add as an audio only enrollment addition

required
data_msg

an BinaryMedia message to add as an enrollment addition

None

Returns:

Type Description

the result of the enrollment request (a ClassModificationResult)

Source code in olivepy/api/oliveclient.py
def enroll(self, plugin, domain, class_id, filename, data_msg=None):
    """
    Request an enrollment for a class, finalized in the same request.

    :param plugin: the name of the plugin
    :param domain: the name of the plugin domain
    :param class_id: the name of the class (speaker) to enroll
    :param filename: the filename to add as an audio only enrollment addition; only used when data_msg is not given
    :param data_msg: an Audio message (added as an audio addition) or another message such as BinaryMedia
            (added as a media addition)

    :return: the result returned by the server for the request (ClassModificationResult)
    """

    # forget whatever the previous request recorded
    self.info = None
    self.fullobj = None

    enrollment = ClassModificationRequest()
    enrollment.plugin = plugin
    enrollment.domain = domain
    enrollment.class_id = class_id
    enrollment.finalize = True

    if not data_msg:
        # no ready-made message: package the named file as an audio addition
        packaged_audio = Audio()
        package_audio(packaged_audio, filename)
        enrollment.addition.append(packaged_audio)
    elif isinstance(data_msg, Audio):
        enrollment.addition.append(data_msg)
    else:
        enrollment.addition_media.append(data_msg)

    # wrap the request in an Envelope and send it, blocking until the server answers
    _msg_id, envelope = _wrap_message(self.client_id, enrollment)
    logging.debug("Sending an enrollment message")
    return self._sync_request(envelope)

finalize_supervised_adaptation(self, plugin, domain, new_domain_name, class_annotations, adapt_workspace)

Complete the adaptation

Parameters:

Name Type Description Default
plugin

the name of the plugin to adapt

required
domain

the name of the plugin domain to adapt

required
new_domain_name

the name of the new domain that is created within the plugin

required
class_annotations

the audio annotations, grouped by class ID

required

Returns:

Type Description

the name of the new domain

Source code in olivepy/api/oliveclient.py
def finalize_supervised_adaptation(self, plugin, domain, new_domain_name, class_annotations, adapt_workspace):
    """
    Complete the adaptation by sending the finalize request and waiting for the server's answer.

    :param plugin: the name of the plugin to adapt
    :param domain: the name of the plugin domain to adapt
    :param new_domain_name: the name of the new domain that is created within the plugin
    :param class_annotations: the audio annotations, as a dictionary of annotation messages indexed by class ID
    :param adapt_workspace: the adaptation workspace label placed in the request

    :return: the new domain reported in the first response message, or None if the response held no messages
    :raises ExceptionFromServer: if the first response message carries an error
    """

    # forget whatever the previous request recorded
    self.info = None
    self.fullobj = None

    adaptation = SupervisedAdaptationRequest()
    adaptation.plugin = plugin
    adaptation.domain = domain
    adaptation.adapt_space = adapt_workspace
    adaptation.new_domain = new_domain_name

    # attach every class's annotations to the request
    for annotation in class_annotations.values():
        adaptation.class_annotations.append(annotation)

    _msg_id, outgoing = _wrap_message(self.client_id, adaptation)

    # send the request, then block until the server replies
    logging.debug("Sending a finalize adatation message")
    self.request_socket.send(outgoing.SerializeToString())
    reply_bytes = self.request_socket.recv()
    logging.info("Received message from server...")

    reply = Envelope()
    reply.ParseFromString(reply_bytes)

    # the first message decides the outcome: it either raises or returns
    for olive_msg in reply.message:
        if olive_msg.HasField("info"):
            self.info = olive_msg.info
        if olive_msg.HasField("error"):
            raise ExceptionFromServer('Got an error from the server: ' + olive_msg.error)

        adaptation_result = SupervisedAdaptationResult()
        adaptation_result.ParseFromString(olive_msg.message_data[0])
        self.fullobj = adaptation_result
        return adaptation_result.new_domain

    # adapt failed... TODO: thrown exception instead?
    return None

get_fullobj(self)

This object should be used for debugging only. Example use: call success = client.enroll('sid-embed-v5-py3', 'multilang-v1', 'joshua', 'file'); then, if troubleshooting, call fullobj = client.get_fullobj() and print('Whole object returned from server: ' + str(fullobj)).

Returns:

Type Description

the full object returned from the last call to the server.

Source code in olivepy/api/oliveclient.py
def get_fullobj(self):
    """
    Debugging aid: hand back the object stored in ``self.fullobj``.

    Example use::

        success = client.enroll('sid-embed-v5-py3', 'multilang-v1', 'joshua', 'file')
        if troubleshooting:
            fullobj = client.get_fullobj()
            print('Whole object returned from server: '+str(fullobj))

    :return: the full object recorded by the last call to the server (None when nothing was recorded).
    """
    full_object = self.fullobj
    return full_object

get_info(self)

Returns:

Type Description

the info data from the last call to the server. Will return None if the last call did not return any info.

Source code in olivepy/api/oliveclient.py
def get_info(self):
    """
    Hand back the value stored in ``self.info``.

    :return: the info data from the last call to the server, or None if that call did not return any info.
    """
    last_info = self.info
    return last_info

parse_annotation_file(self, filename)

Parse a file listing audio files and the regions of each to use for adaptation.

Parameters:

Name Type Description Default
filename

the path and name of the file that contains the input. This file must have one or more lines having 4 columns: # filename, class, start_region_ms, end_region_ms

required

Returns:

Type Description

the parsed output, in a dictionary indexed by the filename, each element having one or more regions, for example {test.wav: [(2618, 6200, 'S'), (7200, 9500, 'NS')]}

Source code in olivepy/api/oliveclient.py
def parse_annotation_file(self, filename):
    """
    Parse a file that lists audio files and the regions within them to use for adaptation.

    Every non-blank line must have exactly 4 whitespace-separated columns:
    filename, class, start_region_ms, end_region_ms

    An audio path is kept as given when it starts with '/' or has ':' as its second
    character (a drive letter); any other path is joined to the current working
    directory, because the path is sent to the server.

    :param filename: the path and name of the file that contains the input
    :return: the parsed output, in a dictionary indexed by the audio filename, each element being a list of \
    (start, end, class) tuples with start and end converted to float, \
    for example {test.wav: [(2618.0, 6200.0, 'S'), (7200.0, 9500.0, 'NS')]}
    :raises Exception: if the annotation file does not exist, or a non-blank line does not have 4 columns
    :raises ValueError: if a start or end column can not be converted to float
    """
    if not os.path.exists(filename):
        raise Exception("The annotation file '{}' does not exist".format(filename))

    file_annotations = {}

    with open(filename) as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                # tolerate blank lines (for example trailing empty lines at the end of the list)
                continue

            pieces = line.split()
            if len(pieces) != 4:
                raise Exception("The annotation file does not contain data in the correct format, found line '{}'".format(line))

            adapt_audio_path, class_id = pieces[0], pieces[1]
            start = float(pieces[2])
            end = float(pieces[3])

            # This is being sent to server.  If full path is given, do nothing.  Otherwise make absolute.
            # The slice [1:2] (instead of index [1]) keeps one-character file names from raising IndexError.
            # TODO: this will not work from UNIX to Windows or other way around.
            is_full_path = adapt_audio_path.startswith('/') or adapt_audio_path[1:2] == ':'
            if not is_full_path:
                adapt_audio_path = os.path.join(os.getcwd(), adapt_audio_path)

            # todo validate file is valid...

            file_annotations.setdefault(adapt_audio_path, []).append((start, end, class_id))

    return file_annotations

preprocess_supervised_audio(self, plugin, domain, filename, adapt_workspace)

Submit audio for pre-processing phase of adaptation.

Parameters:

Name Type Description Default
plugin

the name of the plugin to adapt

required
domain

the name of the plugin domain to adapt

required
filename

the name of the audio file to submit to the server/plugin/domain for preprocessing

required

Returns:

Type Description

the unique id generated by the server for the preprocess audio, which must be used

Source code in olivepy/api/oliveclient.py
def preprocess_supervised_audio(self, plugin, domain, filename, adapt_workspace):
    """
    Submit one audio file for the pre-processing phase of adaptation and wait for the reply.

    :param plugin: the name of the plugin to adapt
    :param domain: the name of the plugin domain to adapt
    :param filename: the name of the audio file to submit to the server/plugin/domain for preprocessing
    :param adapt_workspace: the value stored in the request's adapt_space field
    :return: the audio id reported by the server in its PreprocessAudioAdaptResult, or None when the reply \
    envelope contains no messages
    :raises ExceptionFromServer: if the reply from the server carries an error
    """

    # forget whatever was remembered from the previous server call
    self.info = None
    self.fullobj = None

    preprocess_request = PreprocessAudioAdaptRequest()
    preprocess_request.plugin = plugin
    preprocess_request.domain = domain
    preprocess_request.adapt_space = adapt_workspace
    # HACK: for supervised validation in the backend - we will fix this in a future release so not needed
    preprocess_request.class_id = "supervised"
    # annotations (start_t, end_t) are currently not needed when doing pre-processing

    # The audio is referenced by file name.  Alternatively an audio buffer could be sent:
    # from scipy.io import wavfile
    # sample_rate, data = wavfile.read(filename)
    # package_buffer_audio(audio, data, data.shape[0], sample_rate)
    # TODO SERIALIZE EXAMPLE...
    preprocess_request.audio.path = filename

    # wrap the request, then send it
    _, outgoing = _wrap_message(self.client_id, preprocess_request)
    logging.debug("Sending a preprocess audio (for adaptation) message")
    self.request_socket.send(outgoing.SerializeToString())

    # block until the server replies
    reply_bytes = self.request_socket.recv()
    logging.info("Received message from server...")

    reply = Envelope()
    reply.ParseFromString(reply_bytes)

    # for this use case the server will only have one response in the envelope,
    # so the first message decides the outcome
    for olive_msg in reply.message:
        if olive_msg.HasField("info"):
            self.info = olive_msg.info
        if olive_msg.HasField("error"):
            raise ExceptionFromServer('Got an error from the server: ' + olive_msg.error)

        result_msg = PreprocessAudioAdaptResult()
        result_msg.ParseFromString(olive_msg.message_data[0])

        # keep the whole result for get_fullobj(); the audio id is what the caller gets back
        self.fullobj = result_msg
        return result_msg.audio_id

    # preprocessing failed... TODO: thrown exception instead?
    return None

requst_sad_adaptation(self)

Example of performing SAD adaptation

Returns:

Type Description
Source code in olivepy/api/oliveclient.py
def requst_sad_adaptation(self):
    """
    Example of performing SAD adaptation with a hard-coded plugin, domain and annotation list.

    The annotation list "lists/adapt_ms.lst" is read through parse_annotation_file and handed to
    adapt_supervised_old together with the plugin, domain and new domain name.

    :return: whatever adapt_supervised_old returns
    """

    # todo move to client example (i.e. olivelearn)

    # using Julie's sadRegression dataset...
    # Assume the working directory is root directory for the SAD regression tests

    # Build the list of files plus the regions in those files to adapt by parsing the input file:
    annotations_by_file = self.parse_annotation_file("lists/adapt_ms.lst")

    # Processing variables (todo: get these from config or the command line)
    return self.adapt_supervised_old(
        "sad-dnn-v6a",
        "multi-v1",
        annotations_by_file,
        "python_adapted_multi-v2",
    )

setup_multithreading() classmethod

This function is only needed for multithreaded programs. For those programs, you must call this function from the main thread, so it can properly set up your signals so that control-C will work properly to exit your program.

Source code in olivepy/api/oliveclient.py
@classmethod
def setup_multithreading(cls):
    """Restore the default SIGINT handler when called from the main thread.

       This function is only needed for multithreaded programs.  For those programs,
       you must call this function from the main thread, so it can properly set up
       your signals so that control-C will work properly to exit your program.
       Calling it from any other thread does nothing.
    """
    # https://stackoverflow.com/questions/17174001/stop-pyzmq-receiver-by-keyboardinterrupt
    # https://stackoverflow.com/questions/23206787/check-if-current-thread-is-main-thread-in-python
    on_main_thread = threading.current_thread() is threading.main_thread()
    if not on_main_thread:
        return
    signal.signal(signal.SIGINT, signal.SIG_DFL)

unenroll(self, plugin, domain, class_id)

Unenroll the class_id

Parameters:

Name Type Description Default
plugin

the name of the plugin

required
domain

the name of the plugin domain

required
class_id

the name of the class (speaker) to unenroll

required

Returns:

Type Description

True if enrollment successful

Source code in olivepy/api/oliveclient.py
def unenroll(self, plugin, domain, class_id):
    """
     Unenroll the class_id by sending a ClassRemovalRequest to the server.

    :param plugin: the name of the plugin
    :param domain: the name of the plugin domain
    :param class_id: the name of the class (speaker) to remove

    :return: always True; the reply to the request is not inspected here
    """

    # forget whatever was remembered from the previous server call
    self.info = None
    self.fullobj = None

    removal_request = ClassRemovalRequest()
    removal_request.plugin = plugin
    removal_request.domain = domain
    removal_request.class_id = class_id

    # Wrap message in an Envelope
    _, wrapped = _wrap_message(self.client_id, removal_request)

    logging.debug("Sending a class modification request (removal) message")
    # the value returned by _sync_request is currently ignored
    self._sync_request(wrapped)
    return True

get_bit_depth(audio)

Not using since not assuming numpy is available...

Source code in olivepy/api/oliveclient.py
def get_bit_depth(audio):
    """Return the BIT_DEPTH_* constant matching the dtype of ``audio``.

    Not using since not assuming numpy is available...

    :param audio: an object with a numpy ``dtype`` attribute
    :return: BIT_DEPTH_8, BIT_DEPTH_16 or BIT_DEPTH_24 for int8, int16 or int32 data; BIT_DEPTH_32 for any other dtype
    """
    # Numpy is needed to support this...
    sample_dtype = audio.dtype
    # NOTE(review): int32 is reported as BIT_DEPTH_24 - presumably 24-bit samples carried in
    # 32-bit containers; confirm this mapping is intended
    known_depths = (
        (np.int8, BIT_DEPTH_8),
        (np.int16, BIT_DEPTH_16),
        (np.int32, BIT_DEPTH_24),
    )
    for numpy_type, depth in known_depths:
        if sample_dtype == numpy_type:
            return depth
    return BIT_DEPTH_32

package_buffer_audio(audio, data, num_samples, sample_rate=8000, num_channels=1)

Helper function to wrap audio data (decoded samples) into an AudioBuffer message that can be submitted to the server instead of a file name.

Parameters:

Name Type Description Default
data

the data as a numpy ndarray

required
num_samples

the number of samples

required
sample_rate

the audio sample rate

8000
num_channels

the number of channels in the audio

1

Returns:

Type Description
Source code in olivepy/api/oliveclient.py
def package_buffer_audio(audio, data, num_samples, sample_rate=8000, num_channels=1):
    """
    Helper function to wrap audio data (decoded samples) into an AudioBuffer message that can be submitted to the
    server instead of a file name.

    :param audio: the message whose ``audioSamples`` field is filled in
    :param data:  the data as a numpy ndarray
    :param num_samples:  the number of samples
    :param sample_rate: the audio sample rate
    :param num_channels: the number of channels in the audio
    :return: the ``audio`` message that was passed in, with its ``audioSamples`` populated
    """

    # from scipy.io import wavfile
    # sample_rate, data = wavfile.read('somefilename.wav')

    buffer = audio.audioSamples
    buffer.channels = num_channels
    buffer.samples = num_samples  #data.shape[0]
    buffer.rate = sample_rate
    buffer.bit_depth = get_bit_depth(data)
    # ndarray.tostring() is deprecated; tobytes() is its replacement and yields the same raw bytes
    buffer.data = data.tobytes()

    return audio

olivepy.api.olive_async_client

AsyncOliveClient (Thread)

This class is used to make asynchronous requests to the OLIVE server

__init__(self, client_id, address='localhost', request_port=5588, timeout_second=10) special

Parameters:

Name Type Description Default
client_id str

The unique name of this client. Due to a ZMQ bug this ID can not end in '1' on some systems

required
address

the address of the olive server, such as localhost

'localhost'
request_port

default olive port is 5588

5588
timeout_second

time in seconds, to wait for a response from the server

10
Source code in olivepy/api/olive_async_client.py
def __init__(self, client_id : str, address='localhost', request_port=5588, timeout_second=10):
    """
    Set up the client state; no connection is made until connect() is called.

    :param client_id: The unique name of this client.  Due to a ZMQ bug this ID can not end in '1' on some \
    systems, so every trailing '1' is stripped from the ID (with a warning)
    :param address: the address of the olive server, such as localhost
    :param request_port: default olive port is 5588; the status port is taken to be request_port + 1
    :param timeout_second:  time in seconds, to wait for a response from the server
    """
    threading.Thread.__init__(self)

    # due to a ZMQ bug the last character of the client ID can not be 1, so remove it.
    # rstrip() removes the whole run of trailing '1's: dropping a single character would leave an
    # ID such as 'client11' still ending in '1'.  endswith() is also safe for an empty ID.
    if client_id.endswith("1"):
        client_id = client_id.rstrip("1")
        logging.warning("Last character of the client ID can not be '1', removing to avoid a ZMQ bug")
    self.client_id = client_id

    self.server_address = address
    self.server_request_port = request_port
    self.server_status_port = request_port+1

    self.timeout_seconds = timeout_second

    # outbound requests waiting to be sent
    self.request_queue = queue.Queue()
    # special queue used to emulate blocking requests
    # self.completed_sync_request_queue = queue.Queue()
    self.sync_message = {}
    self.response_queue = {}
    self.working = False
    self.request_socket = None
    self.status_socket = None
    # thread to monitor OLIVE server heartbeats
    self.worker = None

    # self.status_socket = context.socket(zmq.SUB)

    self.olive_connected = False
    self.monitor_status = False

    # restores default control-C handling when constructed from the main thread
    oc.OliveClient.setup_multithreading()

add_heartbeat_listener(self, heartbeat_callback)

Register a callback function to be notified when a heartbeat is received from the OLIVE server

Parameters:

Name Type Description Default
heartbeat_callback Callable[[olive_pb2.Heartbeat], NoneType]

The callback method that is notified each time a heartbeat message is received from the OLIVE server

required
Source code in olivepy/api/olive_async_client.py
def add_heartbeat_listener(self, heartbeat_callback: Callable[[Heartbeat], None]):
    """
    Register a callback function to be notified when a heartbeat is received from the OLIVE server

    The callback is handed to the heartbeat worker.  When this client has no worker, nothing is
    registered and a message is printed instead.

    :param heartbeat_callback: The callback method that is notified each time a heartbeat message is received \
    from the OLIVE server
    """
    monitor = self.worker
    if not monitor:
        print("Unable to add a heartbeat listener because this client was not started with the status  "
              " heartbeat monitor enabled")
        return
    monitor.add_event_callback(heartbeat_callback)

analyze_frames(self, plugin, domain, audio_input, callback, mode=<AudioTransferType.AUDIO_SERIALIZED: 3>, opts=None)

Request an analysis of the supplied audio, returning frame scores.

Parameters:

Name Type Description Default
plugin

the name of the plugin

required
domain

the name of the plugin domain

required
audio_input

the audio to score

required
callback Callable[[olivepy.messaging.response.OliveServerResponse], NoneType]

optional method called when the OLIVE server returns a response to this request. If a callback is not provided, this call blocks until a response is received from the OLIVE server. The callback method accepts one argument: OliveServerResponse

required
mode

the audio transfer mode

<AudioTransferType.AUDIO_SERIALIZED: 3>
opts

a dictionary of name/value pair options for this plugin request

None

Returns:

Type Description

a OliveServerResponse containing the status of the request (FrameScorerResult)

Source code in olivepy/api/olive_async_client.py
def analyze_frames(self, plugin, domain, audio_input, callback: Callable[[response.OliveServerResponse], None], mode=olivepy.messaging.msgutil.AudioTransferType.AUDIO_SERIALIZED, opts=None):
    """
     Request an analysis of the supplied audio, returning frame scores.

    :param plugin: the name of the plugin
    :param domain: the name of the plugin domain
    :param audio_input: the audio to score
    :param callback: optional method called when the OLIVE server returns a response to this request. \
    If a callback is not provided, the request goes through sync_request and its result is returned.  \
    The callback method accepts one argument: OliveServerResponse
    :param mode: the audio transfer mode
    :param opts: a dictionary of name/value pair options for this plugin request

    :return: without a callback, a OliveServerResponse containing the status of the request  (FrameScorerResult); \
    with a callback, None
    """
    # NOTE(review): opts is accepted but never added to the request - confirm whether the
    # options should be attached to the FrameScorerRequest

    frame_request = FrameScorerRequest()
    frame_request.plugin = plugin
    frame_request.domain = domain
    olivepy.messaging.msgutil.package_audio(frame_request.audio, audio_input, mode=mode)

    if not callback:
        return self.sync_request(frame_request)
    self.enqueue_request(frame_request, callback)

analyze_global(self, plugin, domain, audio_input, callback, mode=<AudioTransferType.AUDIO_SERIALIZED: 3>)

Request a global score analysis of 'filename'

Parameters:

Name Type Description Default
plugin

the name of the plugin

required
domain

the name of the plugin domain

required
audio_input

the name of the audio file to score

required
callback Callable[[olivepy.messaging.response.OliveServerResponse], NoneType]

optional method called when the OLIVE server returns a response to the request. If a callback is not provided, this call blocks until a response is received from the OLIVE server. The callback method accepts one argument: OliveServerResponse

required
mode

the audio transfer mode

<AudioTransferType.AUDIO_SERIALIZED: 3>

Returns:

Type Description

a OliveServerResponse containing the status of the request (GlobalScorerResult)

Source code in olivepy/api/olive_async_client.py
def analyze_global(self, plugin, domain, audio_input, callback: Callable[[response.OliveServerResponse], None], mode=olivepy.messaging.msgutil.AudioTransferType.AUDIO_SERIALIZED):
    """
     Request a global score analysis of the supplied audio

    :param plugin: the name of the plugin
    :param domain: the name of the plugin domain
    :param audio_input: the name of the audio file to score
    :param callback: optional method called when the OLIVE server returns a response to the request.  If a \
    callback is not provided, the request goes through sync_request and its result is returned.  The callback \
    method accepts one argument: OliveServerResponse
    :param mode: the audio transfer mode

    :return: without a callback, a OliveServerResponse containing the status of the request  (GlobalScorerResult); \
    with a callback, None
    """

    # NOTE(review): none of the other request methods shown for this client reset these attributes -
    # looks like a carry-over from the synchronous client; kept so existing readers do not break
    self.info = self.fullobj = None
    request = GlobalScorerRequest()
    request.plugin = plugin
    request.domain = domain
    audio = request.audio
    olivepy.messaging.msgutil.package_audio(audio, audio_input, mode=mode)

    # Submit the request exactly once.  An unconditional enqueue_request() call used to precede this
    # branch, which sent every request to the server twice (and queued it with no callback in the
    # blocking case).
    if callback:
        self.enqueue_request(request, callback)
    else:
        return self.sync_request(request)

analyze_regions(self, plugin, domain, filename, callback, mode=<AudioTransferType.AUDIO_SERIALIZED: 3>)

Request an analysis of 'filename', returning regions

Parameters:

Name Type Description Default
plugin

the name of the plugin

required
domain

the name of the plugin domain

required
filename

the name of the audio file to score

required
callback Callable[[olivepy.messaging.response.OliveServerResponse], NoneType]

optional method called when the OLIVE server returns a response to the request. If a callback is not provided, this call blocks until a response is received from the OLIVE server. The callback method accepts one argument: OliveServerResponse

required
mode

the audio transfer mode

<AudioTransferType.AUDIO_SERIALIZED: 3>

Returns:

Type Description

a OliveServerResponse containing the status of the request (RegionScorerResult)

Source code in olivepy/api/olive_async_client.py
def analyze_regions(self, plugin, domain, filename, callback: Callable[[response.OliveServerResponse], None], mode=olivepy.messaging.msgutil.AudioTransferType.AUDIO_SERIALIZED):
    """
     Request an analysis of 'filename', returning regions

    :param plugin: the name of the plugin
    :param domain: the name of the plugin domain
    :param filename: the name of the audio file to score
    :param callback: optional method called when the OLIVE server returns a response to the request.  If a \
    callback is not provided, the request goes through sync_request and its result is returned.  The callback \
    method accepts one argument: OliveServerResponse
    :param mode: the audio transfer mode

    :return: without a callback, a OliveServerResponse containing the status of the request  (RegionScorerResult); \
    with a callback, None
    """

    region_request = RegionScorerRequest()
    region_request.plugin = plugin
    region_request.domain = domain
    olivepy.messaging.msgutil.package_audio(region_request.audio, filename, mode=mode)

    if not callback:
        return self.sync_request(region_request)
    self.enqueue_request(region_request, callback)

audio_modification(self, plugin, domain, audio_input, callback, mode=<AudioTransferType.AUDIO_SERIALIZED: 3>)

Used to make a AudioModificationRequest (enhancement).

Parameters:

Name Type Description Default
plugin

the name of the plugin

required
domain

the name of the plugin domain

required
audio_input

the audio path or buffer to submit for modification

required
callback Callable[[olivepy.messaging.response.OliveServerResponse], NoneType]

optional method called when the OLIVE server returns a response to the request. If a callback is not provided, this call blocks until a response is received from the OLIVE server. The callback method accepts one argument: OliveServerResponse

required
mode

the audio transfer mode

<AudioTransferType.AUDIO_SERIALIZED: 3>

Returns:

Type Description

a OliveServerResponse containing the status of the request (AudioModificationResult)

Source code in olivepy/api/olive_async_client.py
def audio_modification(self, plugin, domain, audio_input, callback: Callable[[response.OliveServerResponse], None], mode=olivepy.messaging.msgutil.AudioTransferType.AUDIO_SERIALIZED):
    """
    Used to make a AudioModificationRequest (enhancement).

    The request asks for 1 channel at a rate of 8000 and only accepts audio given as a path.

    :param plugin: the name of the plugin
    :param domain: the name of the plugin domain
    :param audio_input: the audio path to submit for modification
    :param callback: optional method called when the OLIVE server returns a response to the request.  If a \
    callback is not provided, the request goes through sync_request and its result is returned.  The callback \
    method accepts one argument: OliveServerResponse
    :param mode: the audio transfer mode; must be AudioTransferType.AUDIO_PATH
    :return: without a callback, a OliveServerResponse containing the status of the request  (AudioModificationResult); \
    with a callback, None
    :raises Exception: if mode is anything other than AudioTransferType.AUDIO_PATH
    """
    # NOTE(review): the default mode (AUDIO_SERIALIZED) always fails this check, so callers
    # have to pass mode=AUDIO_PATH explicitly - confirm the default is intended
    path_mode = olivepy.messaging.msgutil.AudioTransferType.AUDIO_PATH
    if mode != path_mode:
        raise Exception('oliveclient.audio_modification requires an filename path and will not work with binary audio data.')

    modification_request = AudioModificationRequest()
    modification_request.plugin = plugin
    modification_request.domain = domain
    modification_request.requested_channels = 1
    modification_request.requested_rate = 8000

    # fill in the audio message before adding it to the request
    packaged_audio = Audio()
    olivepy.messaging.msgutil.package_audio(packaged_audio, audio_input, mode=mode)
    modification_request.modifications.append(packaged_audio)

    if not callback:
        return self.sync_request(modification_request)
    self.enqueue_request(modification_request, callback)

clear_heartbeat_listeners(self)

Remove all heartbeat listeners

Source code in olivepy/api/olive_async_client.py
def clear_heartbeat_listeners(self):
    """
    Remove all heartbeat listeners

    Does nothing when this client has no heartbeat worker.
    """
    monitor = self.worker
    if not monitor:
        return
    monitor.clear_callback()

connect(self, monitor_status=False)

Connect this client to the server

Parameters:

Name Type Description Default
monitor_status

if true, starts a thread to monitor the server status connection for heartbeat messages

required
Source code in olivepy/api/olive_async_client.py
def connect(self, monitor_status=False):
    """
    Connect this client to the server

    Starts this client's thread and then blocks until the ``connection_done`` event is set.

    :param monitor_status: if true, starts a thread to monitor the server status connection for heartbeat messages
    """

    # logging.debug("Starting Olive async monitor...")
    # remember the choice; this method itself only stores the flag
    self.monitor_status = monitor_status
    # presumably set by run() once the connection is established - confirm against run()
    self.connection_done = threading.Event()
    self.start()
    # block until connected
    # NOTE(review): olive_connected is set before the wait below returns, so is_connected() can
    # report True while the connection is still being made - confirm this ordering is intended
    self.olive_connected = True
    # no timeout is passed, so this waits indefinitely for the event
    self.connection_done.wait()


    logging.debug("Olive async client ready")

disconnect(self)

Closes the connection to the OLIVE server

Source code in olivepy/api/olive_async_client.py
def disconnect(self):
    """
    Closes the connection to the  OLIVE server

    Stops the heartbeat worker (if there is one), clears the working and connected flags,
    waits for this client's thread to finish and then closes the request socket.
    """
    monitor = self.worker
    if monitor:
        monitor.stopWorker()

    self.working = self.olive_connected = False

    # wait for this client's thread to finish before closing the socket
    self.join()
    self.request_socket.close()

enqueue_request(self, message, callback, wrapper=None)

Add a message request to the outbound queue

Parameters:

Name Type Description Default
message

the request message to send

required
callback

this is called when response message is received from the server

required
wrapper

the message wrapper

None
Source code in olivepy/api/olive_async_client.py
def enqueue_request(self, message, callback, wrapper=None):
    """
    Add a message request to the outbound queue

    The queue entry is the tuple (message, callback, wrapper).

    :param message:  the request message to send
    :param callback: this is called when response message is received from the server
    :param wrapper: the message wrapper; a new OliveServerResponse is created when None
    """

    response_wrapper = wrapper if wrapper is not None else response.OliveServerResponse()
    queue_entry = (message, callback, response_wrapper)
    self.request_queue.put(queue_entry)

enroll(self, plugin, domain, class_id, audio_input, callback, mode=<AudioTransferType.AUDIO_SERIALIZED: 3>)

Request an enrollment of 'audio'

Parameters:

Name Type Description Default
plugin

the name of the plugin

required
domain

the name of the plugin domain

required
class_id

the name of the class (i.e. speaker) to enroll

required
audio_input

the Audio message to add as an enrollment addition

required
callback Callable[[olivepy.messaging.response.OliveServerResponse], NoneType]

optional method called when the OLIVE server returns a response to the request. If a callback is not provided, this call blocks until a response is received from the OLIVE server. The callback method accepts one argument: OliveServerResponse

required
mode

the audio transfer mode

<AudioTransferType.AUDIO_SERIALIZED: 3>

Returns:

Type Description

a OliveServerResponse containing the status of the request (ClassModificationResult)

Source code in olivepy/api/olive_async_client.py
def enroll(self, plugin, domain, class_id, audio_input, callback: Callable[[response.OliveServerResponse], None], mode=olivepy.messaging.msgutil.AudioTransferType.AUDIO_SERIALIZED):
    """
     Request an enrollment of 'audio'

    :param plugin: the name of the plugin
    :param domain: the name of the plugin domain
    :param class_id: the name of the class (i.e. speaker) to enroll
    :param audio_input: the audio to add as an enrollment addition
    :param callback: optional method called when the OLIVE server returns a response to the request.  If a \
    callback is not provided, the request goes through sync_request and its result is returned.  The callback \
    method accepts one argument: OliveServerResponse
    :param mode: the audio transfer mode

    :return: without a callback, a OliveServerResponse containing the status of the request  (ClassModificationResult); \
    with a callback, None
    """

    enroll_request = ClassModificationRequest()
    enroll_request.plugin = plugin
    enroll_request.domain = domain
    enroll_request.class_id = class_id
    enroll_request.finalize = True

    # fill in the audio message before adding it to the request
    packaged_audio = Audio()
    olivepy.messaging.msgutil.package_audio(packaged_audio, audio_input, mode=mode)
    enroll_request.addition.append(packaged_audio)

    if not callback:
        return self.sync_request(enroll_request)
    self.enqueue_request(enroll_request, callback)

get_active(self, callback)

Used to make a GetActiveRequest

Parameters:

Name Type Description Default
callback Callable[[olivepy.messaging.response.OliveServerResponse], NoneType]

optional method called when the OLIVE server returns a response to this request. If a callback is not provided, this call blocks until a response is received from the OLIVE server. The callback method accepts one argument: OliveServerResponse

required

Returns:

Type Description

a OliveServerResponse containing the status of the request (GetActiveResult)

Source code in olivepy/api/olive_async_client.py
def get_active(self, callback: Callable[[response.OliveServerResponse], None]):
    """
    Used to make a GetActiveRequest

    :param callback: optional method called when the OLIVE server returns a response to this request. \
    If a callback is not provided, the request goes through sync_request and its result is returned.  \
    The callback method accepts one argument: OliveServerResponse

    :return: without a callback, a OliveServerResponse containing the status of the request  (GetActiveResult); \
    with a callback, None
    """
    active_request = GetActiveRequest()

    if not callback:
        return self.sync_request(active_request)
    self.enqueue_request(active_request, callback)

get_status(self, callback=None)

Used to make a GetStatusRequest and receive a GetStatusResult

Parameters:

Name Type Description Default
callback Callable[[olivepy.messaging.response.OliveServerResponse], NoneType]

optional method called when the OLIVE server returns a response to the request. If a callback is not provided, this call blocks until a response is received from the OLIVE server. The callback method accepts one argument: OliveServerResponse

None

Returns:

Type Description

a OliveServerResponse that contains the most recent server status (GetStatusResult)

Source code in olivepy/api/olive_async_client.py
def get_status(self, callback: Callable[[response.OliveServerResponse], None] = None):
    """
    Used to make a GetStatusRequest and receive a GetStatusResult

    :param callback: optional method called when the OLIVE server returns a response to the request.  If a callback is not provided, the request goes through sync_request and its result is returned.  The callback method accepts one argument: OliveServerResponse

    :return: without a callback, a OliveServerResponse that contains the most recent server status (GetStatusResult); with a callback, None
    """
    status_request = GetStatusRequest()

    if not callback:
        return self.sync_request(status_request)
    self.enqueue_request(status_request, callback)

get_update_status(self, plugin, domain, callback=None)

Used to make a GetUpdateStatusRequest

Parameters:

Name Type Description Default
plugin

the name of the plugin to query

required
domain

the name of the domain to query

required
callback Callable[[olivepy.messaging.response.OliveServerResponse], NoneType]

optional method called when the OLIVE server returns a response to this request. If a callback is not provided, this call blocks until a response is received from the OLIVE server. The callback method accepts one argument: OliveServerResponse

None

Returns:

Type Description

a OliveServerResponse containing the update status of the requested plugin/domain (GetUpdateStatusResult)

Source code in olivepy/api/olive_async_client.py
def get_update_status(self, plugin, domain, callback: Callable[[response.OliveServerResponse], None] = None):
    """
    Used to make a GetUpdateStatusRequest

    :param plugin: the name of the plugin to query
    :param domain: the name of the domain to query
    :param callback: optional method called when the OLIVE server returns a response to this request. \
    If a callback is not provided, the request goes through sync_request and its result is returned.  \
    The callback method accepts one argument: OliveServerResponse

    :return: without a callback, a OliveServerResponse containing the update status of the requested \
    plugin/domain  (GetUpdateStatusResult); with a callback, None
    """
    update_request = GetUpdateStatusRequest()
    update_request.plugin = plugin
    update_request.domain = domain

    if not callback:
        return self.sync_request(update_request)
    self.enqueue_request(update_request, callback)

is_connected(self)

Status of the connection to the OLIVE server

Returns:

Type Description

True if connected

Source code in olivepy/api/olive_async_client.py
def is_connected(self):
    """
    Status of the connection to the OLIVE server

    Reports the ``olive_connected`` flag kept by this client.

    :return: True if connected
    """
    connected = self.olive_connected
    return connected

load_plugin_domain(self, plugin, domain, callback)

Used to make a request to pre-load a plugin/domain (via a LoadPluginDomainRequest message)

Parameters:

Name Type Description Default
plugin

the name of the plugin to pre-load

required
domain

the name of the domain to pre-load

required
callback Callable[[olivepy.messaging.response.OliveServerResponse], NoneType]

optional method called when the OLIVE server returns a response to this request. If a callback is not provided, this call blocks until a response is received from the OLIVE server. The callback method accepts one argument: OliveServerResponse

required

Returns:

Type Description

a OliveServerResponse containing the update status of the request (LoadPluginDomainResult)

Source code in olivepy/api/olive_async_client.py
def load_plugin_domain(self, plugin, domain, callback: Callable[[response.OliveServerResponse], None] = None):
    """
    Used to make a request to pre-load a plugin/domain (via a LoadPluginDomainRequest message)

    :param plugin: the name of the plugin to pre-load
    :param domain: the name of the domain to pre-load
    :param callback: optional method called when the OLIVE server returns a response to this request. \
    If a callback is not provided (the default), this call blocks until a response is received from the \
    OLIVE server.  The callback method accepts one argument: OliveServerResponse

    :return: a OliveServerResponse containing the update status of the request (LoadPluginDomainResult) when \
    no callback is given; None when the request is enqueued with a callback
    """
    request = LoadPluginDomainRequest()
    request.plugin = plugin
    request.domain = domain

    if callback:
        # Asynchronous: the callback receives the response later
        self.enqueue_request(request, callback)
    else:
        # Synchronous: wait for the server and hand the response back to the caller
        return self.sync_request(request)

request_plugins(self, callback=None)

Used to make a PluginDirectoryRequest

Parameters:

Name Type Description Default
callback Callable[[olivepy.messaging.response.OliveServerResponse], NoneType]

optional method called when the OLIVE server returns a response to this request. If a callback is not provided, this call blocks until a response is received from the OLIVE server. The callback method accepts one argument: OliveServerResponse

None

Returns:

Type Description

a OliveServerResponse containing information about available plugin/domains (PluginDirectoryResult)

Source code in olivepy/api/olive_async_client.py
def request_plugins(self, callback: Callable[[response.OliveServerResponse], None] = None):
    """
    Submit a PluginDirectoryRequest to the OLIVE server

    :param callback: optional method called when the OLIVE server returns a response to this request. \
    When a callback is given the request is enqueued and this method returns None; when it is omitted the \
    request goes through sync_request() and this call blocks until that returns. \
    The callback method accepts one argument: OliveServerResponse

    :return: the result of sync_request() when no callback is given, an OliveServerResponse containing \
    information about available plugin/domains (PluginDirectoryResult); otherwise None
    """
    directory_request = PluginDirectoryRequest()

    # Blocking path when there is nobody to call back
    if not callback:
        return self.sync_request(directory_request)

    self.enqueue_request(directory_request, callback)

run(self)

Starts the thread to handle async messages

Source code in olivepy/api/olive_async_client.py
def run(self):
    """
    Starts the thread to handle async messages

    Setup creates the DEALER request socket, connects it to the server request port and, when
    monitor_status is set, creates a SUB status socket that is handed to a ClientMonitorThread.
    connection_done is always set once setup has been attempted, successful or not.

    If setup fails, the error is logged, olive_connected and working are set to False, any request
    socket created by this call is closed, and the method returns without entering the message loop
    (the loop and its cleanup depend on objects that only exist after a successful setup).

    While self.working is true each pass:
      1. drains request_queue: wraps each message, records it in response_queue under its message id,
         and sends it on the request socket;
      2. polls the request socket for up to BLOCK_TIMEOUT_MS and passes every message of a received
         Envelope to _process_response().
    """
    request_socket = None
    poller = None
    setup_ok = False
    try:
        logging.debug("Starting OLIVE Async Message Worker for id: {}".format(self.client_id))

        context = zmq.Context()
        request_socket = context.socket(zmq.DEALER)
        self.request_socket = request_socket

        # init the request and status socket
        request_addr = "tcp://" + self.server_address + ":" + str(self.server_request_port)
        status_addr = "tcp://" + self.server_address + ":" + str(self.server_status_port)
        self.request_socket.connect(request_addr)

        if self.monitor_status:
            logging.debug("connecting to status socket...")
            self.status_socket = context.socket(zmq.SUB)
            self.status_socket.connect(status_addr)
            self.worker = ClientMonitorThread(self.status_socket, self.client_id)
            self.worker.start()
        else:
            self.worker = None

        self.working = True

        poller = zmq.Poller()
        poller.register(self.request_socket, zmq.POLLIN)
        setup_ok = True
    except Exception as e:
        logging.error("Error connecting to the OLIVE server: {}".format(e))
        self.olive_connected = False
    finally:
        # Always release anyone waiting on the connection attempt, even on failure
        self.connection_done.set()

    if not setup_ok:
        # Without a registered poller the loop below cannot run; bail out instead of failing later
        # on an unbound/unregistered poller.
        self.working = False
        if request_socket is not None:
            request_socket.close()
        return

    while self.working:
        # First, send any client requests
        while not self.request_queue.empty():
            request_msg, cb, wrapper = self.request_queue.get()
            msg_id, env = msgutil._wrap_message(self.client_id, request_msg)
            # Remember the request so the response can be matched to its callback by message id
            self.response_queue[msg_id] = (request_msg, cb, wrapper)
            # Now send the message
            logging.debug("Sending client request msg type: {}".format(env.message[0].message_type))
            self.request_socket.send(env.SerializeToString())

        # Now check for any results from the server
        socks = dict(poller.poll(BLOCK_TIMEOUT_MS))
        if self.request_socket in socks:
            protobuf_data = self.request_socket.recv()
            envelope = Envelope()
            envelope.ParseFromString(protobuf_data)

            for message in envelope.message:
                self._process_response(message)

    poller.unregister(self.request_socket)
    self.request_socket.close()
    # todo what do we do with the status socket?
    # NOTE(review): the status socket is not closed here; ClientMonitorThread.run() appears to own it - confirm

setup_multithreading() classmethod

This function is only needed for multithreaded programs. For those programs, you must call this function from the main thread, so it can properly set up your signals so that control-C will work properly to exit your program.

Source code in olivepy/api/olive_async_client.py
@classmethod
def setup_multithreading(cls):
    '''Restore the default SIGINT handling so control-C can exit a multithreaded program.

       Only multithreaded programs need this, and it must be called from the main thread:
       when invoked from any other thread the call does nothing.
    '''
    # https://stackoverflow.com/questions/17174001/stop-pyzmq-receiver-by-keyboardinterrupt
    # https://stackoverflow.com/questions/23206787/check-if-current-thread-is-main-thread-in-python
    if threading.current_thread() is not threading.main_thread():
        return
    signal.signal(signal.SIGINT, signal.SIG_DFL)

sync_request(self, message, wrapper=None)

Send a request to the OLIVE server, but wait for a response from the server

Parameters:

Name Type Description Default
message

the request message to send to the OLIVE server

required

Returns:

Type Description

the response from the server

Source code in olivepy/api/olive_async_client.py
def sync_request(self, message, wrapper=None):
    """
    Send a request to the OLIVE server and block until its response has been delivered

    :param message: the request message to send to the OLIVE server
    :param wrapper: optional response object passed along to enqueue_request(); when None a new \
    response.OliveServerResponse() is used

    :return: the response stored for this request in sync_message
    :raises Exception: if the completion event fires but no result was stored for this request
    """

    if wrapper is None:
        wrapper = response.OliveServerResponse()

    # Unique key under which the result of this particular request is looked up in sync_message
    sync_id = msgutil.get_uuid()
    done = threading.Event()

    def _on_response(response):
        # Parameter name kept from the original callback; it shadows the `response` module only in here
        return self._sync_callback(response, sync_id, done)

    self.enqueue_request(message, _on_response, wrapper)

    # Block until the callback signals completion
    done.wait()

    if sync_id not in self.sync_message:
        # unexpected.... callback event completed with no result
        raise Exception("Error waiting for a response from the server")

    return self.sync_message.pop(sync_id)

unenroll(self, plugin, domain, class_id, callback)

Unenroll class_id

Parameters:

Name Type Description Default
plugin

the name of the plugin

required
domain

the name of the plugin domain

required
class_id

the name of the class (i.e. speaker) to remove

required
callback Callable[[olivepy.messaging.response.OliveServerResponse], NoneType]

optional method called when the OLIVE server returns a response to the request. If a callback is not provided, this call blocks until a response is received from the OLIVE server. The callback method accepts one argument: OliveServerResponse

required

Returns:

Type Description

a OliveServerResponse containing the status of the request (ClassRemovalResult)

Source code in olivepy/api/olive_async_client.py
def unenroll(self, plugin, domain, class_id, callback: Callable[[response.OliveServerResponse], None] = None):
    """
    Unenroll class_id (via a ClassRemovalRequest message)

    :param plugin: the name of the plugin
    :param domain: the name of the plugin domain
    :param class_id: the name of the class (i.e. speaker) to remove
    :param callback: optional method called when the OLIVE server returns a response to the request.  If a \
    callback is not provided (the default), this call blocks until a response is received from the OLIVE \
    server.  The callback method accepts one argument: OliveServerResponse

    :return: a OliveServerResponse containing the status of the request (ClassRemovalResult) when no \
    callback is given; None when the request is enqueued with a callback
    """

    removal = ClassRemovalRequest()
    removal.plugin = plugin
    removal.domain = domain
    removal.class_id = class_id

    if callback:
        # Asynchronous: the callback receives the response later
        self.enqueue_request(removal, callback)
    else:
        # Synchronous: wait for the server and hand the response back to the caller
        return self.sync_request(removal)

unload_plugin_domain(self, plugin, domain, callback)

Used to make an unload plugin/domain request (RemovePluginDomainRequest). This request will unload a loaded plugin from server memory.

Parameters:

Name Type Description Default
plugin

the name of the plugin to unload

required
domain

the name of the domain to unload

required
callback Callable[[olivepy.messaging.response.OliveServerResponse], NoneType]

optional method called when the OLIVE server returns a response to this request. If a callback is not provided, this call blocks until a response is received from the OLIVE server. The callback method accepts one argument: OliveServerResponse

required

Returns:

Type Description

a OliveServerResponse containing the status of the request (RemovePluginDomainResult)

Source code in olivepy/api/olive_async_client.py
def unload_plugin_domain(self, plugin, domain, callback: Callable[[response.OliveServerResponse], None] = None):
    """
    Used to make an unload plugin/domain request (RemovePluginDomainRequest).  This request will unload a \
    loaded plugin from server memory.

    :param plugin: the name of the plugin to unload (surrounding whitespace is stripped)
    :param domain: the name of the domain to unload (surrounding whitespace is stripped)
    :param callback: optional method called when the OLIVE server returns a response to this request. \
    If a callback is not provided (the default), this call blocks until a response is received from the \
    OLIVE server.  The callback method accepts one argument: OliveServerResponse

    :return: a OliveServerResponse containing the status of the request (RemovePluginDomainResult) when no \
    callback is given; None when the request is enqueued with a callback
    """
    request = RemovePluginDomainRequest()
    request.plugin = plugin.strip()
    request.domain = domain.strip()

    if callback:
        # Asynchronous: the callback receives the response later
        self.enqueue_request(request, callback)
    else:
        # Synchronous: wait for the server and hand the response back to the caller
        return self.sync_request(request)

update_plugin_domain(self, plugin, domain, metadata, callback)

Used to make a ApplyUpdateRequest

Parameters:

Name Type Description Default
plugin

the name of the plugin to update

required
domain

the name of the domain to update

required
callback Callable[[olivepy.messaging.response.OliveServerResponse], NoneType]

optional method called when the OLIVE server returns a response to this request. If a callback is not provided, this call blocks until a response is received from the OLIVE server. The callback method accepts one argument: OliveServerResponse

required

Returns:

Type Description

a OliveServerResponse containing the status of the request (ApplyUpdateResult)

Source code in olivepy/api/olive_async_client.py
def update_plugin_domain(self, plugin, domain, metadata, callback: Callable[[response.OliveServerResponse], None] = None):
    """
    Used to make a ApplyUpdateRequest

    :param plugin: the name of the plugin to update
    :param domain: the name of the domain to update
    :param metadata: the update parameters, either an iterable of (name, value) pairs or a dict mapping \
    name to value.  Each value must be a str, int, float, bool, or list.
    :param callback: optional method called when the OLIVE server returns a response to this request. \
    If a callback is not provided (the default), this call blocks until a response is received from the \
    OLIVE server.  The callback method accepts one argument: OliveServerResponse

    :return: a OliveServerResponse containing the status of the request (ApplyUpdateResult) when no \
    callback is given; None when the request is enqueued with a callback
    :raises Exception: if a metadata value is not a str, int, float, bool, or list
    """
    request = ApplyUpdateRequest()
    request.plugin = plugin
    request.domain = domain

    # Accept a mapping as well as the original iterable-of-pairs form
    pairs = metadata.items() if isinstance(metadata, dict) else metadata

    mds = request.params
    for key, item in pairs:
        md = Metadata()
        md.name = key
        # The numeric codes are the type tags stored on the Metadata message
        # NOTE(review): presumably these mirror the protobuf metadata type enum - confirm against olive.proto
        if isinstance(item, bool):
            # bool is a subclass of int, so it has to be tested before int to ever be tagged as bool
            md.type = 4
        elif isinstance(item, str):
            md.type = 1
        elif isinstance(item, int):
            md.type = 2
        elif isinstance(item, float):
            md.type = 3
        elif isinstance(item, list):
            md.type = 5
        else:
            raise Exception('Metadata {} had a {} type that was not str, int, float, bool, or list.'
                            .format(key, str(type(item))))
        md.value = item
        mds.append(md)

    if callback:
        # Asynchronous: the callback receives the response later
        self.enqueue_request(request, callback)
    else:
        # Synchronous: wait for the server and hand the response back to the caller
        return self.sync_request(request)

ClientMonitorThread (Thread)

Helper used to monitor the status of the OLIVE server

add_event_callback(self, callback)

Callback function that is notified of a heartbeat

Parameters:

Name Type Description Default
callback Callable[[olive_pb2.Heartbeat], NoneType]

the function that is called with a Heartbeat object

required
Source code in olivepy/api/olive_async_client.py
def add_event_callback(self, callback: Callable[[Heartbeat], None]):
    """
    Register a function to be notified of heartbeats

    :param callback: the function that is called with a Heartbeat object
    """
    listeners = self.event_callback
    listeners.append(callback)

run(self)

Method representing the thread's activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

Source code in olivepy/api/olive_async_client.py
def run(self):
    """
    Thread body: subscribe to the status socket and relay heartbeats to the registered callbacks

    While self.working is true the status socket is polled for up to BLOCK_TIMEOUT_MS.  A received
    message is parsed into a Heartbeat and passed to every callback in event_callback.  When a poll
    returns nothing and more than HEARTBEAT_TIMEOUT_SECONDS have passed since the last heartbeat,
    every callback is invoked with None instead.  The status socket is closed when the loop ends.
    """
    self.working = True
    # An empty subscription string is used so no messages are filtered out by topic
    self.status_socket.subscribe("")

    status_poller = zmq.Poller()
    status_poller.register(self.status_socket, zmq.POLLIN)
    last_seen = time.time()

    while self.working:
        ready = dict(status_poller.poll(BLOCK_TIMEOUT_MS))

        if self.status_socket not in ready:
            # Nothing arrived in this poll window; check how long the server has been silent
            # Consider using the same timeout for messages?
            if time.time() - last_seen > HEARTBEAT_TIMEOUT_SECONDS:
                print("heartbeat timeout")
                # Too long since the last heartbeat: assume the server is down and tell the listeners
                for listener in self.event_callback:
                    listener(None)
            continue

        last_seen = time.time()
        raw_heartbeat = self.status_socket.recv()
        heartbeat = Heartbeat()
        heartbeat.ParseFromString(raw_heartbeat)

        for listener in self.event_callback:
            listener(heartbeat)

    self.status_socket.close()

olivepy.api.workflow

OliveWorkflow

An OliveWorkflow instance represents a Workflow Definition actualized by an OLIVE server. Once actualized, an OliveWorkflow instance is used to make analysis, or enrollment/unenrollment requests. An OliveWorkflow should be created using an OliveWorkflowDefinition's create_workflow() method. All calls to the server include an optional callback. When the callback is provided, the call does not block and the callback method is invoked when a response is received from the server. A callback method has 3 arguments: the original request, the response, and an error message if the request failed.

Exceptions:

Type Description
WorkflowException

If the workflow was not actualized

__init__(self, olive_async_client, actualized_workflow) special

Parameters:

Name Type Description Default
olive_async_client AsyncOliveClient

the client connection to the OLIVE server

required
actualized_workflow OliveWorkflowActualizedResponse

An OliveWorkflowDefinition actualized by the server

required
Source code in olivepy/api/workflow.py
def __init__(self, olive_async_client: AsyncOliveClient,
             actualized_workflow: response.OliveWorkflowActualizedResponse):
    """
    :param olive_async_client:  the client connection to the OLIVE server
    :param actualized_workflow: An OliveWorkflowDefinition actualized by the server

    :raises WorkflowException: if the workflow definition held by actualized_workflow is not actualized
    """

    self.client = olive_async_client
    self.workflow_response = actualized_workflow

    definition = actualized_workflow.get_workflow()
    if definition.actualized:
        # Only a definition actualized by an OLIVE server may back this workflow
        self.workflow_def = definition
    else:
        raise WorkflowException("Error: Can not create an OliveWorkflow using  a Workflow Definition that has not "
                                "been actualized by an OLIVE server")

    # note: enrollment and adapt should only have one task/job, but several plugins/tasks could support
    # enrollment or adaptation, so the focus here is on analysis

adapt(self, data_input, callback, options=None, finalize=True)

NOT YET SUPPORTED -- and not sure it will ever be supported via workflow

Parameters:

Name Type Description Default
data_input required
callback required
options None
finalize True

Returns:

Type Description

not supported

Source code in olivepy/api/workflow.py
def adapt(self, data_input, callback, options=None, finalize=True):
    """
    Placeholder for workflow adaptation, which is NOT YET SUPPORTED (and may never be supported via workflow)

    :param data_input: unused
    :param callback: unused
    :param options: unused
    :param finalize: unused

    :return: never returns; this method always raises
    :raises Exception: always, because workflow adaptation is not supported
    """
    unsupported = Exception("Workflow adaption not supported")
    raise unsupported

analyze(self, data_inputs, callback=None, options=None)

Perform a workflow analysis

Parameters:

Name Type Description Default
data_inputs List[olive_pb2.WorkflowDataRequest]

a list of data inputs created using the package_audio(), package_text(), package_image(), or package_video() method.

required
callback

an optional callback that is invoked when the workflow completes. If not specified this method blocks, returning OliveWorkflowAnalysisResponse when done. Otherwise this method immediately returns and the callback method is invoked when the response is received. The callback method signature requires 3 arguments: request, result, error_message.

None
options str

a JSON string of name/value options to include with the analysis request such as '{"filter_length":99, "interpolate":1.0, "test_name":"midge"}'

None

Returns:

Type Description
OliveWorkflowAnalysisResponse

an OliveWorkflowAnalysisResponse (if no callback provided)

Source code in olivepy/api/workflow.py
def analyze(self, data_inputs: List[WorkflowDataRequest],
            callback=None,
            options: str = None) -> response.OliveWorkflowAnalysisResponse:
    """
    Submit a workflow analysis request built from this workflow's definition

    :param data_inputs:  a list of data inputs created using the package_audio(), package_text(), package_image(), or package_video() method.
    :param callback: an optional callback that is invoked when the workflow completes.  If not specified this method blocks, returning OliveWorkflowAnalysisResponse when done. Otherwise this method immediately returns and the callback method is invoked when the response is received.
    :param options: a JSON string of name/value options to include with the analysis request such as '{"filter_length":99, "interpolate":1.0, "test_name":"midge"}'

    :return: an OliveWorkflowAnalysisResponse (if no callback provided)
    """

    request = WorkflowAnalysisRequest()
    for data_input in data_inputs:
        request.workflow_data_input.append(data_input)
    request.workflow_definition.CopyFrom(self.workflow_def)

    # Options are only parsed and attached when some were supplied
    if options:
        request.option.extend(utils.parse_json_options(options))

    # The response wrapper is the same type for both the blocking and the callback path
    wrapper = response.OliveWorkflowAnalysisResponse()
    if not callback:
        return self.client.sync_request(request, wrapper)

    self.client.enqueue_request(request, callback, wrapper)

enroll(self, data_inputs, class_id, job_names, callback=None, options=None)

Submit data for enrollment.

Parameters:

Name Type Description Default
data_inputs List[olive_pb2.WorkflowDataRequest]

a list of data inputs created using the package_audio(), package_text(), package_image(), or package_video() method.

required
class_id str

the name of the enrollment

required
job_names List[str]

a list of the names of jobs that support enrollment; the audio is enrolled with each of these jobs. This value can be None, in which case the data input(s) is enrolled for each job.

required
callback

an optional callback that is invoked when the workflow completes. If not specified this method blocks, returning an OliveWorkflowAnalysisResponse when the enrollment completes on the server. Otherwise this method immediately returns and the callback method is invoked when the response is received.

None
options

a dictionary of name/value option pairs to include with the enrollment request

None

Returns:

Type Description

server enrollment response if no callback provided

Source code in olivepy/api/workflow.py
def enroll(self, data_inputs: List[WorkflowDataRequest], class_id: str, job_names: List[str], callback=None,
           options=None):
    """
    Submit data for enrollment.

    :param data_inputs:  a list of data inputs created using the package_audio(), package_text(), package_image(), or package_video() method.
    :param class_id:  the name of the enrollment
    :param job_names: a list of the names of jobs that support enrollment.  This value can be None (or empty), in which case no job names are added to the request and the data input(s) is enrolled for each job.
    :param callback: an optional callback that is invoked when the workflow completes.  If not specified this method blocks, returning an OliveWorkflowAnalysisResponse when the enrollment completes on the server.  Otherwise this method immediately returns and the callback method is invoked when the response is received.
    :param options: name/value options to include with the enrollment request; the value is handed to utils.parse_json_options().  NOTE(review): previously documented as a dictionary - confirm whether a JSON string is expected instead

    :return: server enrollment response if no callback provided
    """
    enroll_request = WorkflowEnrollRequest()
    for di in data_inputs:
        enroll_request.workflow_data_input.append(di)
    enroll_request.workflow_definition.CopyFrom(self.workflow_def)
    enroll_request.class_id = class_id

    # job_names is documented as optional: None means "no explicit job selection", so nothing is added
    for job_task in job_names or ():
        enroll_request.job_names.append(job_task)

    if options:
        jopts = utils.parse_json_options(options)
        enroll_request.option.extend(jopts)

    # Enrollment responses are delivered in the analysis response wrapper
    if callback:
        self.client.enqueue_request(enroll_request, callback, response.OliveWorkflowAnalysisResponse())
    else:
        return self.client.sync_request(enroll_request, response.OliveWorkflowAnalysisResponse())

get_analysis_class_ids(self, type=1, callback=None)

Query OLIVE for the current class IDs (i.e. speaker names for SID, keywords for QbE, etc). For tasks that support enrollment, their class IDs can change over time.

Parameters:

Name Type Description Default
callback

an optional callback method that accepts a OliveClassStatusResponse object. Such as: my_callback(result : response.OliveClassStatusResponse)

None

Returns:

Type Description
OliveClassStatusResponse

an OliveClassStatusResponse object if no callback specified, otherwise the callback receives the OliveClassStatusResponse object when a response is received from the OLIVE server

Source code in olivepy/api/workflow.py
def get_analysis_class_ids(self, type=WORKFLOW_ANALYSIS_TYPE, callback=None) -> response.OliveClassStatusResponse:
    """
    Ask OLIVE for the current class IDs (i.e. speaker names for SID, keywords for QbE, etc).  For tasks that support enrollment, their class IDs can change over time.

    :param type: the WorkflowOrder type (WORKFLOW_ANALYSIS_TYPE, WORKFLOW_ENROLLMENT_TYPE, or WORKFLOW_UNENROLLMENT_TYPE); only set on the request when truthy
    :param callback: an optional callback method that accepts a OliveClassStatusResponse object.  Such as: my_callback(result : response.OliveClassStatusResponse)

    :return: an OliveClassStatusResponse object if no callback specified, otherwise the callback receives the OliveClassStatusResponse object when a response is received from the OLIVE server
    """

    status_request = WorkflowClassStatusRequest()
    status_request.workflow_definition.CopyFrom(self.workflow_def)
    if type:
        status_request.type = type

    # The same wrapper type is used whether the call blocks or not
    wrapper = response.OliveClassStatusResponse()
    if not callback:
        return self.client.sync_request(status_request, wrapper)

    self.client.enqueue_request(status_request, callback, wrapper)

get_analysis_job_names(self)

The names of analysis jobs in this workflow (usually only one analysis job)

Returns:

Type Description
List[str]

A list of analysis job names in this workflow

Source code in olivepy/api/workflow.py
def get_analysis_job_names(self) -> List[str]:
    """
    Look up the analysis job names of this workflow (usually only one analysis job)

    :return: A list of analysis job names in this workflow
    """
    job_names = response.get_workflow_job_names(self.workflow_def, WORKFLOW_ANALYSIS_TYPE)
    return job_names

get_analysis_task_info(self)

A JSON like report of the tasks used for analysis from the actualized workflow. When possible, this report includes the plugins used in the workflow (although there can be cases when the final plugin/domain used is not known until runtime)

Returns:

Type Description
List[Dict[str, Dict]]

JSON structured detailed information of analysis tasks used in this workflow

Source code in olivepy/api/workflow.py
def get_analysis_task_info(self) -> List[Dict[str, Dict]]:
    """
    Produce a JSON like report of the tasks used for analysis from the actualized workflow.  When possible, \
    this report includes the plugins used in the workflow (although there can be cases when the final \
    plugin/domain used is not known until runtime)

    :return: the result of the workflow response's to_json(indent=1): JSON structured detailed information \
    of analysis tasks used in this workflow
    """
    report = self.workflow_response.to_json(indent=1)
    return report

get_analysis_tasks(self, job_name=None)

Return a list of tasks supported by this workflow. These names are unique and can generally be assumed they are named after the task type (SAD, LID, SID, etc) they support but they could use alternate names if there are multiple tasks with the same task type in a workflow (for example a workflow could have a SAD task that does frame scoring and a SAD task that does regions scoring)

Parameters:

Name Type Description Default
job_name str

filter the returned task names to those belonging to this job name. Optional since most workflows only support one analysis job.

None

Returns:

Type Description
List[str]

a list of task names

Source code in olivepy/api/workflow.py
def get_analysis_tasks(self, job_name: str = None) -> List[str]:
    """
    List the tasks supported by this workflow. These names are unique and can generally be assumed they are named after the task type (SAD, LID, SID, etc) they support but they could use alternate names if there are multiple tasks with the same task type in a workflow (for example a workflow could have a SAD task that does frame scoring and a SAD task that does regions scoring)

    :param job_name: filter the returned task names to those belonging to this job name.  Optional since most workflows only support one analysis job; when omitted the first analysis job is used.

    :return: a list of task names, or None when the workflow has no analysis jobs or job_name is not one of them
    """
    jobs = response.get_workflow_jobs(self.workflow_def, WORKFLOW_ANALYSIS_TYPE)

    # better to exception or empty dict????
    if not jobs:
        return None

    if job_name is None:
        # fall back to the default (first) job name
        job_name = next(iter(jobs.keys()))
    elif job_name not in jobs:
        return None

    return [task.consumer_result_label for task in jobs[job_name]]

get_enrollment_job_names(self)

The names of enrollment jobs in this workflow. There should be one enrollment job for each analysis tasks that supports class enrollment

Returns:

Type Description
List[str]

A list of enrollment job names in this workflow

Source code in olivepy/api/workflow.py
def get_enrollment_job_names(self) -> List[str]:
    """
    Look up the enrollment job names of this workflow.  There should be one enrollment job for each analysis task that supports class enrollment

    :return: A list of enrollment job names in this workflow
    """
    job_names = response.get_workflow_job_names(self.workflow_def, WORKFLOW_ENROLLMENT_TYPE)
    return job_names

get_enrollment_tasks(self, job_name=None, type=2)

Return a list of tasks that support enrollment in this workflow.

Parameters:

Name Type Description Default
job_name str

optionally the name of the enrollment job. Optional since most workflows only support one job

None

Returns:

Type Description
List[str]

a list of task names

Source code in olivepy/api/workflow.py
def get_enrollment_tasks(self, job_name: str = None, type=WORKFLOW_ENROLLMENT_TYPE) -> List[str]:
    """
    List the tasks that support enrollment in this workflow.

    :param job_name: optionally the name of the enrollment job.  Optional since most workflows only support one job
    :param type: the workflow type whose jobs are inspected; defaults to WORKFLOW_ENROLLMENT_TYPE

    :return: a list of task names, or None when there are no jobs of the requested type or job_name is not one of them
    """
    jobs = response.get_workflow_jobs(self.workflow_def, type)
    if not jobs:
        return None

    if job_name is not None and job_name not in jobs:
        return None

    # normally (and currently the only supported option) should be just one enrollment_job...
    tasks_by_name = response.get_workflow_job_tasks(jobs, job_name)
    return list(tasks_by_name.keys())

get_unenrollment_job_names(self)

The names of un-enrollment jobs in this workflow. There should be one un-enrollment job for each analysis task that supports class un-enrollment

Returns:

Type Description
List[str]

A list of un-enrollment job names in this workflow

Source code in olivepy/api/workflow.py
def get_unenrollment_job_names(self) -> List[str]:
    """
    Look up the un-enrollment job names of this workflow.  There should be one un-enrollment job for each analysis task that supports class un-enrollment

    :return: A list of un-enrollment job names in this workflow
    """
    job_names = response.get_workflow_job_names(self.workflow_def, WORKFLOW_UNENROLLMENT_TYPE)
    return job_names

get_unenrollment_tasks(self, job_name=None)

Return a list of tasks that support UNenrollment in this workflow.

Parameters:

Name Type Description Default
job_name str

optionally the name of the enrollment job. Optional since most workflows only support one job

None

Returns:

Type Description
List[str]

a list of task names

Source code in olivepy/api/workflow.py
def get_unenrollment_tasks(self, job_name: str = None) -> List[str]:
    """
    List the tasks that support un-enrollment in this workflow.

    This is get_enrollment_tasks() applied to the un-enrollment workflow type.

    :param job_name: optional name of the un-enrollment job.  Optional since most workflows only support one job

    :return: a list of task names (see get_enrollment_tasks() for the cases where None is returned)
    """
    unenrollment_tasks = self.get_enrollment_tasks(job_name, type=WORKFLOW_UNENROLLMENT_TYPE)
    return unenrollment_tasks

package_audio(self, audio_data, mode=<InputTransferType.SERIALIZED: 3>, annotations=None, task_annotations=None, selected_channel=None, num_channels=None, sample_rate=None, num_samples=None, validate_local_path=True, label=None)

Creates an Audio object that can be submitted with a Workflow analysis, enrollment, or adapt request.

Parameters:

Name Type Description Default
audio_data ~AnyStr

the input data is a string (file path) if mode is 'AUDIO_PATH', otherwise the input data is a binary buffer. Use serialize_audio() to serialize a file into a buffer, or pass in a list of PCM_16 encoded samples

required
mode

specifies how the audio is sent to the server: either as (string) file path or as a binary buffer. NOTE: if sending a path, the path must be valid for the server.

<InputTransferType.SERIALIZED: 3>
annotations List[Tuple[float, float]]

optional regions (start/end regions in seconds) as a list of tuples (start_seconds, end_seconds)

None
task_annotations Dict[str, Dict[str, List[Tuple[float, float]]]]

optional additional regions (start/end regions in seconds) targeted for a task and classified by a label (such as speech, non-speech, speaker). For example: {'SHL': {'speaker': [(0.5, 4.5), (6.8, 9.2)]}} are annotations for the 'SHL' task, which are labeled as class 'speaker' having regions 0.5 to 4.5, and 6.8 to 9.2. Use get_analysis_tasks() to get the names of workflow tasks.

None
selected_channel int

optional - the channel to process if using multi-channel audio

None
num_channels int

The number of channels if the audio input is a list of decoded (PCM-16) samples; if not using a buffer of PCM-16 samples this value is ignored

None
sample_rate int

The sample rate if the audio input is a list of decoded (PCM-16) samples; if not using a buffer of PCM-16 samples this value is ignored

None
num_samples int

The number of samples if the audio input is a list of decoded (PCM-16) samples; if not using a buffer of PCM-16 samples this value is ignored

None
validate_local_path bool

If sending audio as a string path name, then check that the path exists on the local filesystem. In some cases you may want to pass a path which is valid on the server but not on this client, so validation is not desired

True
label

an optional name to use with the audio

None

Returns:

Type Description
WorkflowDataRequest

A populated WorkflowDataRequest to use in a workflow activity

Source code in olivepy/api/workflow.py
def package_audio(self, audio_data: AnyStr,
                  mode=olivepy.messaging.msgutil.InputTransferType.SERIALIZED,
                  annotations: List[Tuple[float, float]] = None,
                  task_annotations: Dict[str, Dict[str, List[Tuple[float, float]]]] = None,
                  selected_channel: int = None,
                  num_channels: int = None,
                  sample_rate: int = None,
                  num_samples: int = None,
                  validate_local_path: bool = True,
                  label=None) -> WorkflowDataRequest:
    """
    Creates an Audio object that can be submitted with a Workflow analysis, enrollment, or adapt request.

    :param audio_data: the input data is a string (file path) if mode is 'AUDIO_PATH', otherwise the input data is a binary buffer.  Use serialize_audio() to serialize a file into a buffer, or pass in a list of PCM_16 encoded samples
    :param mode: specifies how the audio is sent to the server: either as (string) file path or as a binary buffer.  NOTE: if sending a path, the path must be valid for the server.
    :param annotations: optional regions (start/end regions in seconds) as a list of tuples (start_seconds, end_seconds)
    :param task_annotations: optional additional regions (start/end regions in seconds) targeted for a task and classified by a label (such as speech, non-speech, speaker).  For example: {'SHL': {'speaker': [(0.5, 4.5), (6.8, 9.2)]}} are annotations for the 'SHL' task, which are labeled as class 'speaker' having regions 0.5 to 4.5, and 6.8 to 9.2. Use get_analysis_tasks() to get the names of workflow tasks.
    :param selected_channel: optional - the channel to process if using multi-channel audio
    :param num_channels: The number of channels if audio input is a list of decoded (PCM-16) samples, if not using a buffer of PCM-16 samples this value is ignored
    :param sample_rate: The sample rate if audio input is a list of decoded (PCM-16) samples, if not using a buffer of PCM-16 samples this value is ignored
    :param num_samples: The number of samples if audio input is a list of decoded (PCM-16) samples, if not using a buffer of PCM-16 samples this value is ignored
    :param validate_local_path: If sending audio as a string path name, then check that the path exists on the local filesystem.  In some cases you may want to pass a path which is valid on the server but not this client so validation is not desired
    :param label: an optional name to use with the audio; used as the data_id of the request, otherwise a generated UUID is used

    :return: A populated WorkflowDataRequest to use in a workflow activity
    """
    audio = Audio()
    msgutil.package_audio(audio, audio_data, annotations, selected_channel, mode, num_channels, sample_rate,
                          num_samples, validate_local_path)

    # Add any task specific regions:
    if task_annotations:
        for task_label, labeled_regions in task_annotations.items():
            ta = audio.task_annotations.add()
            ta.task_label = task_label
            # we only expect to have one set of annotations, so just one region_label
            # (if several are supplied, region_label ends up as the last one iterated)
            for region_label, regions in labeled_regions.items():
                ta.region_label = region_label
                for annots in regions:
                    region = ta.regions.add()
                    # builtin float: the np.float alias was removed from NumPy (1.24+)
                    region.start_t = float(annots[0])
                    region.end_t = float(annots[1])

    wkf_data_request = WorkflowDataRequest()
    # fixme: this should be set based on the audio.label (filename) or given a unique name here...
    wkf_data_request.data_id = label if label else msgutil.get_uuid()
    wkf_data_request.data_type = AUDIO
    wkf_data_request.workflow_data = audio.SerializeToString()
    # consumer_data_label doesn't need to be set... use default
    # set job name?  Currently we assume one job per workflow so punting on this for now

    return wkf_data_request

package_binary(self, binary_input, mode=<InputTransferType.SERIALIZED: 3>, annotations=None, validate_local_path=True, label=None)

Parameters:

Name Type Description Default
binary_input

the binary media (for example video) input to package

required

Returns:

Type Description
WorkflowDataRequest

TBD

Source code in olivepy/api/workflow.py
def package_binary(self,
                   binary_input,
                   mode=olivepy.messaging.msgutil.InputTransferType.SERIALIZED,
                   annotations: List[Tuple[float, float]] = None,
                   validate_local_path: bool = True,
                   label=None) -> WorkflowDataRequest:
    """
    Package a binary media input (sent with the VIDEO data type) for use in a workflow request.

    :param binary_input: the binary media input handed to msgutil.package_binary_media()
    :param mode: how the input is transferred to the server (forwarded to msgutil.package_binary_media())
    :param annotations: accepted for interface compatibility; not used by this method
    :param validate_local_path: forwarded to msgutil.package_binary_media()
    :param label: an optional name for the media; also used as the request's data_id, otherwise a generated UUID is used

    :return: a WorkflowDataRequest holding the serialized BinaryMedia message
    """
    binary_media = BinaryMedia()
    msgutil.package_binary_media(binary_media, binary_input, mode=mode, validate_local_path=validate_local_path)

    request = WorkflowDataRequest()
    if label:
        # a caller supplied label names both the media and the request
        binary_media.label = label
        request.data_id = label
    else:
        request.data_id = msgutil.get_uuid()
    request.data_type = VIDEO
    request.workflow_data = binary_media.SerializeToString()

    return request

package_image(self, image_input, mode=<InputTransferType.SERIALIZED: 3>, validate_local_path=True, label=None)

Not yet supported

Parameters:

Name Type Description Default
image_input

An image input

required

Returns:

Type Description
WorkflowDataRequest

TBD

Source code in olivepy/api/workflow.py
def package_image(self, image_input,
                  mode=olivepy.messaging.msgutil.InputTransferType.SERIALIZED,
                  validate_local_path: bool = True,
                  label=None)-> WorkflowDataRequest:
    """
    Package an image input for use in a workflow request.

    NOTE: image input was documented as "Not yet supported" when this method was written.

    :param image_input: An image input, handed to msgutil.package_binary_media()
    :param mode: how the input is transferred to the server (forwarded to msgutil.package_binary_media())
    :param validate_local_path: forwarded to msgutil.package_binary_media()
    :param label: an optional name for the image; also used as the request's data_id, otherwise a generated UUID is used

    :return: a WorkflowDataRequest of data type IMAGE holding the serialized BinaryMedia message
    """
    image_media = BinaryMedia()
    msgutil.package_binary_media(image_media, image_input, mode=mode, validate_local_path=validate_local_path)

    request = WorkflowDataRequest()
    if label:
        # a caller supplied label names both the media and the request
        image_media.label = label
        request.data_id = label
    else:
        request.data_id = msgutil.get_uuid()
    # flag the media as non-motion
    image_media.motion = False

    # todo if annotations...

    request.data_type = IMAGE
    request.workflow_data = image_media.SerializeToString()

    return request

package_text(self, text_input, optional_label=None, text_workflow_key=None)

Used to package data for a workflow that accepts string (text) input

Parameters:

Name Type Description Default
text_input str

a text input

required
optional_label str

an optional label, name or comment associated with this input

None
text_workflow_key str

the keyword used to identify this data in the workflow. By default a value of 'text' is assumed and recommended

None

Returns:

Type Description
WorkflowDataRequest

a WorkflowDataRequest populated with the text input

Source code in olivepy/api/workflow.py
def package_text(self, text_input: str, optional_label:str =None, text_workflow_key: str = None) -> WorkflowDataRequest:
    """
    Package a string so it can be submitted to a workflow that accepts text input.

    :param text_input: a text input
    :param optional_label: an optional label, name or comment associated with this input
    :param text_workflow_key: the keyword used to identify this data in the workflow.  By default a value of 'text' is assumed and recommended

    :return: a WorkflowDataRequest populated with the text input
    """
    text_payload = Text()
    # only a single text input per request is supported (for now?)
    text_payload.text.append(text_input)
    if optional_label:
        text_payload.label = optional_label

    request = WorkflowDataRequest()
    request.data_id = text_workflow_key or 'text'
    request.data_type = TEXT
    request.workflow_data = text_payload.SerializeToString()

    return request

package_workflow_input(self, input_msg, expected_data_type=<OliveInputDataType.AUDIO_DATA_TYPE: 2>)

Parameters:

Name Type Description Default
input_msg

the OLIVE data message to package

required
expected_data_type

the data type of the message (audio by default)

<OliveInputDataType.AUDIO_DATA_TYPE: 2>

Returns:

Type Description
WorkflowDataRequest

TBD

Source code in olivepy/api/workflow.py
def package_workflow_input(self, input_msg,
                   expected_data_type = msgutil.OliveInputDataType.AUDIO_DATA_TYPE) -> WorkflowDataRequest:
    """
    Wrap an already built OLIVE data message in a WorkflowDataRequest.

    :param input_msg: the OLIVE data message to package; its label (when set) becomes the request's data_id, otherwise a generated UUID is used
    :param expected_data_type: the data type of the message (audio by default), translated through msgutil.data_type_class_map

    :return: a WorkflowDataRequest holding the serialized input_msg
    """
    request = WorkflowDataRequest()
    request.data_id = input_msg.label or msgutil.get_uuid()
    request.data_type = msgutil.data_type_class_map[expected_data_type]
    request.workflow_data = input_msg.SerializeToString()

    return request

serialize_audio(self, filename)

Helper function used to read in an audio file and output a serialized buffer. Can be used with package_audio() when using the AUDIO_SERIALIZED mode and the audio input has not already been serialized

Parameters:

Name Type Description Default
filename str

the local path to the file to serialize

required

Returns:

Type Description
~AnyStr

the contents of the file as a byte buffer, otherwise an exception if the file can not be opened. This buffer contains the raw content of the file, it does NOT contain encoded samples

Source code in olivepy/api/workflow.py
def serialize_audio(self, filename: str) -> AnyStr:
    """
    Read a local audio file and hand back its bytes, ready to be passed to package_audio() in the
    AUDIO_SERIALIZED mode when the audio input has not already been serialized.

    :param filename: the local path to the file to serialize (a leading '~' is expanded)

    :return: the raw content of the file as a byte buffer (NOT decoded samples).  An exception is raised if the file can not be opened
    """
    expanded_path = os.path.expanduser(filename)
    with open(expanded_path, 'rb') as audio_file:
        return audio_file.read()

to_json(self, indent=None)

Generate the workflow as a JSON string

Parameters:

Name Type Description Default
indent

if a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation. A negative value will return the JSON document

None

Returns:

Type Description

the Workflow Definition as a JSON string

Source code in olivepy/api/workflow.py
def to_json(self, indent=None):
    """
       Generate the workflow as a JSON string by delegating to the workflow response's to_json().

       :param indent:  if a non-negative integer, then JSON array elements and object members will be pretty-printed with \
       that indent level. An indent level of 0 will only insert newlines. ``None`` is the most compact  \
       representation. A negative value will return the JSON document

       :return: the Workflow Definition as a JSON string
    """
    workflow_json = self.workflow_response.to_json(indent=indent)
    return workflow_json

unenroll(self, class_id, job_names, callback=None, options=None)

Submit a class id (speaker name, language name, etc) for un-enrollment.

Parameters:

Name Type Description Default
class_id str

the name of the enrollment class to remove

required
job_names List[str]

a list of job names where the class is to be unenrolled. Jobs must support class modification. This value can be None, in which case the class is unenrolled for each job (which is likely dangerous).

required
callback

an optional callback that is invoked when this workflow action completes. If not specified this method blocks, returning an OliveWorkflowAnalysisResponse when the enrollment completes on the server. Otherwise this method immediately returns and the callback method is invoked when the response is received.

None
options

a dictionary of name/value option pairs to include with the enrollment request

None

Returns:

Type Description

server unenrollment response if no callback provided

Source code in olivepy/api/workflow.py
def unenroll(self, class_id: str, job_names: List[str], callback=None, options=None):
    """
    Submit a class id (speaker name, language name, etc) for un-enrollment.

    :param class_id:  the name of the enrollment class to remove
    :param job_names: a list of job names, where the class is to be unenrolled.  Jobs must support class modification.  This value can be None (or empty), in which case no job names are added to the request and the class is unenrolled for each job (which is likely dangerous).
    :param callback: an optional callback that is invoked when this workflow action completes.  If not specified this method blocks, returning an OliveWorkflowAnalysisResponse when the un-enrollment completes on the server.  Otherwise this method immediately returns and the callback method is invoked when the response is received.
    :param options: a dictionary of name/value option pairs to include with the un-enrollment request

    :return: server unenrollment response if no callback provided, otherwise None
    """

    unenroll_request = WorkflowUnenrollRequest()
    unenroll_request.workflow_definition.CopyFrom(self.workflow_def)
    unenroll_request.class_id = class_id

    # job_names is documented as optional: skip it when None instead of failing on iteration
    if job_names:
        unenroll_request.job_names.extend(job_names)

    if options:
        jopts = utils.parse_json_options(options)
        unenroll_request.option.extend(jopts)

    if callback:
        # asynchronous: the response is delivered to the callback, nothing is returned here
        self.client.enqueue_request(unenroll_request, callback, response.OliveWorkflowAnalysisResponse())
        return None

    # synchronous: block until the server answers
    return self.client.sync_request(unenroll_request, response.OliveWorkflowAnalysisResponse())

OliveWorkflowDefinition

Used to load a Workflow Definition from a file.

__init__(self, filename) special

Create an OliveWorkflowDefinition to access a workflow definition file

Parameters:

Name Type Description Default
filename str

the path/filename of a workflow definition file to load

required
Source code in olivepy/api/workflow.py
def __init__(self, filename: str):
    """
    Create an OliveWorkflowDefinition to access a workflow definition file

    The file is first parsed as a serialized WorkflowDefinition protobuf.  If that fails with a
    DecodeError it is read as JSON text, in which each task's 'message_data' object is converted
    to the matching protobuf message and stored base64 encoded before the whole document is
    parsed into a WorkflowDefinition.

    :param filename: the path/filename of a workflow definition file to load (a leading '~' is expanded)

    :raises IOError: if the file does not exist or can not be read
    """
    # First, make sure the workflow definition (WD) file exists
    filename = os.path.expanduser(filename)
    if not os.path.exists(filename):
        raise IOError("Workflow definition file '{}' does not exists".format(filename))

    # Read the workflow - either a serialized workflow or a (JSON) text file
    try:
        with open(filename, 'rb') as f:
            self.wd = WorkflowDefinition()
            self.wd.ParseFromString(f.read())
    except IOError as e:
        # keep the underlying I/O error attached as the explicit cause
        raise IOError("Workflow definition file '{}' does not exist".format(filename)) from e
    except DecodeError:
        # Not a serialized protobuf: start over with a fresh message and treat the file as JSON text
        self.wd = WorkflowDefinition()
        with open(filename, 'r') as f:
            # First load as json
            json_input = json.load(f)

        # Next, we need to convert message data in task(s) to byte strings
        for element in json_input:
            if element == 'order':
                for job in json_input[element]:
                    for job_def in job['job_definition']:
                        for task in job_def['tasks']:
                            task_type = task['message_type']
                            # Convert 'message_data' into a protobuf and save the byte string in the json
                            # so it can be correctly deserialized
                            tmp_json = task['message_data']
                            msg = msgutil.type_class_map[MessageType.Value(task_type)]()
                            Parse(json.dumps(tmp_json), msg)
                            # now store the serialized msg (base64 text) as message_data
                            data = base64.b64encode(msg.SerializeToString()).decode('utf-8')
                            task['message_data'] = data

        # Now we should be able to create a WorkflowDefinition from the json data
        Parse(json.dumps(json_input), self.wd)

create_workflow(self, client)

Create a new, executable (actualized), Workflow, which can be used to make OLIVE analysis, or enrollment requests

Parameters:

Name Type Description Default
client AsyncOliveClient

an open client connection to an OLIVE server

required

Returns:

Type Description

a new OliveWorkflow object, which has been actualized (activated) by the olive server

Source code in olivepy/api/workflow.py
def create_workflow(self, client: olivepy.api.olive_async_client.AsyncOliveClient):
    """
    Ask the OLIVE server to actualize (activate) this workflow definition and wrap the result in an
    executable OliveWorkflow, which can be used to make OLIVE analysis, or enrollment requests

    :param client: an open client connection to an OLIVE server

    :return: a new OliveWorkflow object, which has been actualized (activated) by the olive server

    :raises IOError: if the client is not connected
    :raises msgutil.ExceptionFromServer: if the server reports an error for the actualize request
    """
    if not client.is_connected():
        raise IOError("No connection to the Olive server")

    # Build the actualize request around a copy of our workflow definition
    actualize_request = WorkflowActualizeRequest()
    actualize_request.workflow_definition.CopyFrom(self.wd)

    # Blocking call: wait for the server's answer
    actualized = client.sync_request(actualize_request, response.OliveWorkflowActualizedResponse())
    if not actualized.is_error():
        return OliveWorkflow(client, actualized)

    raise msgutil.ExceptionFromServer(actualized.get_error())

get_json(self, indent=1)

Create a JSON structure of the Workflow

Parameters:

Name Type Description Default
indent

if a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation. A negative value will return the JSON document

1

Returns:

Type Description

A JSON (dictionary) representation of the Workflow Definition

Source code in olivepy/api/workflow.py
def get_json(self, indent=1):
    """
    Create a JSON structure describing the analysis jobs (and their tasks) of the Workflow

    :param indent: the indent value handed to json.dumps() when formatting the output

    :return: a JSON string holding one object per analysis job.  NOTE: when the definition has no analysis order an empty list (not a string) is returned
    """
    # Use the first order in the definition that is an analysis order
    analysis_order = next(
        (order for order in self.wd.order if order.workflow_type == WORKFLOW_ANALYSIS_TYPE), None)
    if analysis_order is None:
        # no analysis results
        return []

    jobs_out = []
    for job in analysis_order.job_definition:
        job_name = job.job_name

        # each job is reported as a dictionary, starting with its data handling info
        job_entry = {
            'Data Input': json.loads(MessageToJson(job.data_properties, preserving_proto_field_name=True)),
        }
        # in most cases we will have just one job
        jobs_out.append(job_entry)

        for task in job.tasks:
            task_entry = json.loads(MessageToJson(task, preserving_proto_field_name=True))

            # Replace the serialized message_data with its deserialized form, stored under 'analysis'
            decoded_msg = self._extract_serialized_message(task.message_type, task.message_data)
            task_entry['job_name'] = job_name
            task_entry['analysis'] = json.loads(
                MessageToJson(decoded_msg, preserving_proto_field_name=True))
            del task_entry['message_data']

            job_entry[task.consumer_result_label] = task_entry

    return json.dumps(jobs_out, indent=indent)

to_json(self, indent=None)

Generate the workflow as a JSON string

Parameters:

Name Type Description Default
indent

if a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation. A negative value returns the JSON document as a parsed dictionary instead of a string

None

Returns:

Type Description

the Workflow Definition as a JSON string

Source code in olivepy/api/workflow.py
def to_json(self, indent=None):
    """
    Generate the workflow as a JSON string, with each task's serialized message_data expanded
    into its JSON form

    :param indent:  if a non-negative integer, then JSON array elements and object members will be pretty-printed with \
    that indent level. An indent level of 0 will only insert newlines. ``None`` is the most compact  \
    representation. A negative value returns the JSON document as a dictionary instead of a string

    :return: the Workflow Definition as a JSON string (or a dictionary when indent is negative)
    """
    workflow_dict = json.loads(MessageToJson(self.wd, preserving_proto_field_name=True))

    # Walk order -> job_definition -> tasks and swap the base64 message_data for readable JSON
    for order in workflow_dict.get('order', []):
        for job_def in order['job_definition']:
            for task in job_def['tasks']:
                task_type = task['message_type']
                raw_message = base64.b64decode(task['message_data'])
                decoded_msg = self._extract_serialized_message(MessageType.Value(task_type), raw_message)
                task['message_data'] = json.loads(MessageToJson(decoded_msg, preserving_proto_field_name=True))

    if indent and indent < 0:
        # negative indent: hand back the dictionary itself
        return workflow_dict
    return json.dumps(workflow_dict, indent=indent)

WorkflowException (Exception)

This exception means that an error occurred handling a Workflow