Skip to content

olivepy client module

olivepy.client.analyze_client

olivepy.client.client_common

extract_input_data(args, expected_data_type=<OliveInputDataType.AUDIO_DATA_TYPE: 2>, fail_if_no_data=True, has_class_ids=False)

Helper util to use the standard CLI arguments to package input (audio, video, image, text) for processing by the OLIVE server

Parameters:

Name Type Description Default
args required
fail_if_no_data True
has_class_ids False

Returns:

Type Description
Source code in olivepy/client/client_common.py
def extract_input_data(args, expected_data_type=OliveInputDataType.AUDIO_DATA_TYPE, fail_if_no_data=True, has_class_ids=False):
    """
    Helper util to use the standard CLI arguments to package input (audio, video, image, text) for
    processing by the OLIVE server.

    :param args: parsed CLI arguments; reads args.path, args.input_list, args.input and (when
        has_class_ids is True) args.enroll
    :param expected_data_type: the OliveInputDataType the packaged input is expected to contain
    :param fail_if_no_data: when True, print an error and exit(1) if no data input was supplied
    :param has_class_ids: when True, inputs are grouped by class ID (enrollment style) and
        data_input is a dict of {class_id: [data messages]} instead of a flat list
    :return: tuple (data_input, transfer_mode, send_pathname)
    """

    # Send a file path when requested (server and client must share a filesystem);
    # otherwise serialize the file contents into the request.
    send_pathname = bool(args.path)

    if send_pathname:
        # fixme more generic for image and video?
        transfer_mode = InputTransferType.PATH
    else:
        transfer_mode = InputTransferType.SERIALIZED

    data_input = []
    if args.input_list:
        # parse input list to make sure we actually have one or more files...
        data_input = parse_data_list2(args.input_list, transfer_mode, expected_data_type, has_class_ids)
        if len(data_input) == 0:
            # BUG FIX: the original referenced the nonexistent attribute 'args.list' and set
            # an unused local 'args_bad'.  Report the problem with the correct attribute and
            # let the fail_if_no_data check below decide whether to exit.
            print("Data input list '{}' contains no valid files".format(args.input_list))

    elif args.input:
        # Do we want to support sending a path that is not local?  In case they want to
        # specify a path that is available for the server
        if has_class_ids:
            # Enrollment-style input: map the class ID to its data message(s)
            data_input = {args.enroll: [convert_filename_to_data(args.input, transfer_mode, expected_data_type)]}
        else:
            data_input.append(convert_filename_to_data(args.input, transfer_mode, expected_data_type))

    # Basic validation: if no data input was supplied, make sure that is okay with the caller
    if not data_input and fail_if_no_data:
        print('The command requires data input(s).')
        exit(1)

    return data_input, transfer_mode, send_pathname

olivepy.client.enroll_client

olivepy.client.learn_client

olivepy.client.status_client

heartbeat_notification(heatbeat)

Callback method, notified by the async client that a heartbeat message has been received from the OLIVE server

Source code in olivepy/client/status_client.py
def heartbeat_notification(heatbeat):
    """Callback method, notified by the async client that a heartbeat message has been received from the OLIVE server"""

    # No heartbeat at all means the server (or the connection to it) is down
    if not heatbeat:
        print("No OLIVE heatbeat received.  Olive server or connection down")
        return

    if heatbeat.HasField("stats"):
        stats = heatbeat.stats
        # Percentage-style metrics use a fixed-width float format
        for fmt, value in (
                ("System CPU Used:    %02.01f%%", stats.cpu_percent),
                ("System CPU Average: %02.01f%%", stats.cpu_average),
                ("System MEM Used:    %02.01f%%", stats.mem_percent),
                ("System MEM Max:     %02.01f%%", stats.max_mem_percent),
                ("System SWAP Used:   %02.01f%%", stats.swap_percent),
                ("System SWAP Max:    %02.01f%%", stats.max_swap_percent)):
            print(fmt % value)
        # Job counters and the server version are printed verbatim
        for label, value in (
                ("Number active jobs: ", stats.pool_busy),
                ("Number pending jobs: ", stats.pool_pending),
                ("Number finished jobs: ", stats.pool_finished),
                ("Max number jobs: ", stats.max_num_jobs),
                ("Server version: ", stats.server_version)):
            print(label + str(value))
        print("\n")

olivepy.client.utils_client

main()

Returns:

Type Description
Source code in olivepy/client/utils_client.py
def main():
    """
    Entry point for the 'olivepyutils' CLI.

    Loads a Workflow Definition file and, depending on the flags supplied, saves it as
    JSON text, saves it as a binary workflow file, and/or prints its task info.

    :return: None.  Exits with status 1 if the command line arguments are invalid.
    """
    parser = argparse.ArgumentParser(prog='olivepyutils')

    parser.add_argument('workflow', action='store',
                        help='The workflow definition to use.')

    parser.add_argument('--save_as_text', action='store',
                        help='Save the workflow to a JSON formatted text file having this name.')

    parser.add_argument('--save_as_binary', action='store',
                        help='Save the workflow to a binary formatted workflow file.')

    parser.add_argument('--print_workflow', action='store_true',
                        help='Print the workflow definition file info (before it is actualized/sent to server)')

    args_bad = False
    args = parser.parse_args()

    # Defensive check only: 'workflow' is a required positional, so argparse normally
    # errors out before this point if it is missing.
    if args.workflow is None:
        print('No workflow definition is specified.')
        args_bad = True

    # At least one action flag is required, otherwise there is nothing to do
    if not (args.save_as_text or args.print_workflow or args.save_as_binary):
        args_bad = True
        print('The command requires one or more tasks.')

    if args_bad:
        print('Run the command with --help or -h to see all the command line options.')
        quit(1)

    try:
        # first, create the workflow definition from the workflow file:
        owd = ow.OliveWorkflowDefinition(args.workflow)

        if args.save_as_text:
            print("Saving Workflow Definition '{}' as '{}'".format(args.workflow, args.save_as_text))
            owd._save_as_json(args.save_as_text)

        if args.save_as_binary:
            print("Saving Workflow Definition '{}' as '{}'".format(args.workflow, args.save_as_binary))
            owd._save_as_binary(args.save_as_binary)

        if args.print_workflow:
            wdef_json = owd.to_json(indent=1)
            print("Workflow Definition Task Info: {}".format(wdef_json))
            print("")

    except Exception as e:
        print("Workflow failed with error: {}".format(e))

olivepy.client.workflow_client

heartbeat_notification(heatbeat)

Callback method, notified by the async client that a heartbeat message has been received from the OLIVE server

Source code in olivepy/client/workflow_client.py
def heartbeat_notification(heatbeat):
    """
    Callback method, notified by the async client that a heartbeat message has been received from the OLIVE server
    """

    # A missing heartbeat means we have lost contact with the server
    if not heatbeat:
        print("Too long since a heatbeat message was received.  Olive server or connection down")
        return

    if heatbeat.HasField("stats"):
        stats = heatbeat.stats
        # Fixed-width percentage metrics first
        percent_lines = [
            ("System CPU Used:    %02.01f%%", stats.cpu_percent),
            ("System CPU Average: %02.01f%%", stats.cpu_average),
            ("System MEM Used:    %02.01f%%", stats.mem_percent),
            ("System MEM Max:     %02.01f%%", stats.max_mem_percent),
            ("System SWAP Used:   %02.01f%%", stats.swap_percent),
            ("System SWAP Max:    %02.01f%%", stats.max_swap_percent),
        ]
        for template, metric in percent_lines:
            print(template % metric)
        # Then the raw job counters and server version
        counter_lines = [
            ("Number active jobs: ", stats.pool_busy),
            ("Number pending jobs: ", stats.pool_pending),
            ("Number finished jobs: ", stats.pool_finished),
            ("Max number jobs: ", stats.max_num_jobs),
            ("Server version: ", stats.server_version),
        ]
        for prefix, metric in counter_lines:
            print(prefix + str(metric))
        print("\n")

main()

Client for interacting with the workflow API

Source code in olivepy/client/workflow_client.py
def main():
    """
    Client for interacting with the workflow API.

    Actualizes a Workflow Definition on an OLIVE server, optionally prints
    workflow/task/class information, and submits the supplied data inputs for analysis.

    :return: None.  Exits with status 1 if the command line arguments are invalid.
    """
    parser = argparse.ArgumentParser(prog='olivepyworkflow', description="Perform OLIVE analysis using a Workflow "
                                                                         "Definition file")

    # Required positional option
    parser.add_argument('workflow', action='store',
                        help='The workflow definition to use.')

    parser.add_argument('--tasks', action='store_true',
                        help='Print the workflow analysis tasks.')

    parser.add_argument('--class_ids', action='store_true',
                        help='Print the class IDs available for analysis in the specified workflow.')

    parser.add_argument('--print_actualized', action='store_true',
                        help='Print the actualized workflow info.')

    parser.add_argument('--print_workflow', action='store_true',
                        help='Print the workflow definition file info (before it is actualized, if requested)')

    # Connection arguments
    parser.add_argument('-s', '--server', action='store', default='localhost',
                        help='The machine the server is running on. Defaults to %(default)s.')
    parser.add_argument('-P', '--port', type=int, action='store', default=5588,
                        help='The port to use.')
    parser.add_argument('-t', '--timeout', type=int, action='store', default=10,
                        help='The timeout (in seconds) to wait for a response from the server ')

    # Data input arguments
    parser.add_argument('-i', '--input', action='store',
                        help='The data input to analyze.  Either a pathname to an audio/image/video file or a string for text input.  For text input, also specify the --text flag')
    parser.add_argument('--input_list', action='store',
                        help='A list of files to analyze. One file per line.')
    parser.add_argument('--text', action='store_true',
                        help='Indicates that input (or input list) is a literal text string to send in the analysis request.')

    parser.add_argument('--options', action='store',
                        help='A JSON formatted string of workflow options such as '
                             '[{"task":"SAD", "options":{"filter_length":99, "interpolate":1.0}] or '
                             '{"filter_length":99, "interpolate":1.0, "name":"midge"}, where the former '
                             'options are only applied to the SAD task, and the later are applied to all tasks ')
    parser.add_argument('--path', action='store_true',
                        help='Send the path of the audio instead of a buffer.  '
                             'Server and client must share a filesystem to use this option')
    parser.add_argument('--debug', action='store_true',
                        help='Debug mode ')

    args_bad = False
    args = parser.parse_args()

    # Simple logging config
    if args.debug:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    logging.basicConfig(level=log_level)

    # Defensive check only: 'workflow' is a required positional, so argparse normally
    # errors out before this point if it is missing.
    if args.workflow is None:
        print('No workflow definition is specified.')
        args_bad = True

    # Data input is only required when we are not just printing workflow information
    if (args.tasks or args.class_ids or args.print_actualized or args.print_workflow):
        data_required = False
    else:
        data_required = True

    # Our workflow should consume one of the 4 data types (but not a combination of types)
    if args.text:
        # special case of handling text data
        expected_data_type = OliveInputDataType.TEXT_DATA_TYPE
    else:
        expected_data_type = OliveInputDataType.BINARY_DATA_TYPE
    data_input, transfer_mode, send_pathname = client_com.extract_input_data(args, expected_data_type=expected_data_type, fail_if_no_data=data_required)

    json_opts = None
    if args.options:
        json_opts = args.options
        print("Options: {}".format(json_opts))

    if args_bad:
        print('Run the command with --help or -h to see all the command line options.')
        quit(1)

    enable_status_socket = False
    # Create the connection to the OLIVE server
    client = oc.AsyncOliveClient("olivepy_workflow", args.server, args.port, args.timeout)
    client.connect(monitor_status=enable_status_socket)
    try:
        # first, create the workflow definition from the workflow file:
        workflow_def = ow.OliveWorkflowDefinition(args.workflow)

        if args.print_workflow:
            wdef_json = workflow_def.to_json(indent=1)
            print("Workflow Definition: \n{}".format(wdef_json))
            print("")

        # Submit that workflow definition to the client for actualization (instantiation):
        workflow = workflow_def.create_workflow(client)

        if args.print_actualized:
            print("Actualized Workflow: {}".format(workflow.to_json(indent=1)))
            print("")

        if args.tasks:
            #  Print the analysis tasks:
            print("Analysis Tasks: {}".format(workflow.get_analysis_tasks()))

            for enroll_job_name in workflow.get_enrollment_job_names():
                print("Enrollment job '{}' has Tasks: {}".format(enroll_job_name, workflow.get_enrollment_tasks(enroll_job_name)))
            for unenroll_job_name in workflow.get_unenrollment_job_names():
                print("Unenrollment job '{}' has Tasks: {}".format(unenroll_job_name, workflow.get_unenrollment_tasks(unenroll_job_name)))

        if args.class_ids:
            #  Print the class IDs available for the workflow tasks:
            # support other types: type=olive_pb2.WORKFLOW_ENROLLMENT_TYPE?
            class_status_response = workflow.get_analysis_class_ids()
            print("Class Info: {}".format(class_status_response.to_json(indent=1)))

        # Package each input for submission to the server
        # (loop variable renamed: the original shadowed the builtin 'input')
        buffers = []
        for input_msg in data_input:
            buffers.append(workflow.package_workflow_input(input_msg, expected_data_type))

        if data_required:
            print("Sending analysis request...")
            response = workflow.analyze(buffers, options=json_opts)

            print("Workflow analysis results:")
            print("{}".format(response.to_json(indent=1)))

    except Exception as e:
        print("Workflow failed with error: {}".format(e))
    finally:
        # Always release the connection, even on failure
        client.disconnect()

olivepy.client.workflow_enroll_client

main()

Returns:

Type Description
Source code in olivepy/client/workflow_enroll_client.py
def main():
    """
    Entry point for the 'olivepyworkflowenroll' CLI.

    Enrolls or un-enrolls class IDs for the enrollment/unenrollment job(s) defined in an
    OLIVE workflow.

    :return: None.  Exits with status 1 on invalid arguments or if the workflow does not
        support the requested (un)enrollment job(s).
    """
    parser = argparse.ArgumentParser(prog='olivepyworkflowenroll', description="Perform OLIVE enrollment using a Workflow "
                                                                         "Definition file")

    # Required positional option
    parser.add_argument('workflow', action='store',
                        help='The workflow definition to use.')

    parser.add_argument('--print_jobs', action='store_true',
                        help='Print the supported workflow enrollment jobs.')

    parser.add_argument('--job', action='store',
                        help='Enroll/Unenroll an Class ID for a job(s) in the specified workflow. If not specified enroll or unenroll for ALL enrollment/unenrollment jobs')

    parser.add_argument('--enroll', action='store',
                        help='Enroll using this (class) name.  Should be used with the job argument to specify a target job to enroll with (if there are more than one enrollment jobs) ')

    parser.add_argument('--unenroll', action='store',
                        help='Enroll using this (class) name.  Should be used with the job argument to specify a job to unenroll (if there are more than one unenrollment jobs)')

    parser.add_argument('-i', '--input', action='store',
                        help='The data input to enroll.  Either a pathname to an audio/image/video file or a string for text input')
    parser.add_argument('--input_list', action='store',
                        help='A list of files to enroll. One file per line plus the class id to enroll.')

    parser.add_argument('--path', action='store_true',
                        help='Send the path of the audio instead of a buffer.  '
                             'Server and client must share a filesystem to use this option')

    # Connection arguments
    parser.add_argument('-s', '--server', action='store', default='localhost',
                        help='The machine the server is running on. Defaults to %(default)s.')
    parser.add_argument('-P', '--port', type=int, action='store', default=5588,
                        help='The port to use.')
    parser.add_argument('-t', '--timeout', type=int, action='store', default=10,
                        help='The timeout (in seconds) to wait for a response from the server ')

    args_bad = False
    args = parser.parse_args()

    # Defensive check only: 'workflow' is a required positional, so argparse normally
    # errors out before this point if it is missing.
    if args.workflow is None:
        print('No workflow definition is specified.')
        args_bad = True

    # Data input is only required when actually enrolling
    require_data = True
    if args.unenroll or args.print_jobs:
        require_data = False

    # Inputs are grouped by class ID: {class_id: [data messages]}
    expected_data_type = OliveInputDataType.BINARY_DATA_TYPE
    data_input, transfer_mode, send_pathname = client_com.extract_input_data(args, expected_data_type=expected_data_type, fail_if_no_data=require_data, has_class_ids=True)

    print("enrolling {} files".format(len(data_input)))

    # TODO GET CLASS IDS FROM ENROLLMENT FILE

    # Work out whether this is an enrollment or an un-enrollment request
    enroll = False
    unenroll = False
    action_str = ""
    if args.enroll:
        action_str = "Enrollment"
        enroll = True
        if args.unenroll:
            print("Enrollment and un-enrollment are mutually exclusive.  Pick one and run again")
            args_bad = True
    elif args.unenroll:
        action_str = "Unenrollment"
        unenroll = True
    elif len(data_input) > 1:
        # Data supplied without an explicit action: treat it as an enrollment
        action_str = "Enrollment"
        enroll = True
    elif not args.print_jobs:
        args_bad = True
        print("Must use one of the options: --enroll, --unenroll, or --print_jobs ")
    # BUG FIX: the original unconditionally reset action_str to "" after this chain,
    # which blanked the "Workflow {} results:" messages printed below.

    if args.job:
        jobs = []
        jobs.extend(str.split(args.job, ','))

    if args_bad:
        print('Run the command with --help or -h to see all the command line options.')
        quit(1)

    # Create the connection to the OLIVE server
    client = oc.AsyncOliveClient("olivepy_workflow", args.server, args.port, args.timeout)
    client.connect()
    try:
        # first, create the workflow definition from the workflow file:
        owd = ow.OliveWorkflowDefinition(args.workflow)

        # Submit that workflow definition to the client for actualization (instantiation):
        workflow = owd.create_workflow(client)

        if args.print_jobs:
            #  Print available jobs:
            print("Enrollment jobs '{}'".format(workflow.get_enrollment_job_names()))
            print("Un-Enrollment jobs '{}'".format(workflow.get_unenrollment_job_names()))

        if not args.job:
            # No target job(s) specified, so the action applies to all supported jobs
            if enroll:
                print("Enrolling for all jobs: {}".format(workflow.get_enrollment_job_names()))
            if unenroll:
                print("Unenrolling for all job: {}".format(workflow.get_unenrollment_job_names()))
            jobs = []

        if len(data_input) > 0:
            # Enrollment path: validate the requested jobs, then package and submit the data
            enroll_jobs = workflow.get_enrollment_job_names()
            if enroll_jobs is None:
                print("ERROR: This workflow has no jobs that support enrollment")
                quit(1)

            for t in jobs:
                if t not in enroll_jobs:
                    print(
                        "Error: Job '{}'  can not be enrolled via this workflow.  Only jobs(s) '{}' support enrollment.".format(
                            t, enroll_jobs))
                    quit(1)

            # Package each input, grouped by the class ID it is enrolled under
            enroll_buffers = {}
            for classid in data_input.keys():
                for input_msg in data_input[classid]:
                    if classid not in enroll_buffers:
                        enroll_buffers[classid] = []
                    enroll_buffers[classid].append(
                        workflow.package_workflow_input(input_msg, expected_data_type))

            print("Workflow {} results:".format(action_str.lower()))
            for classid in enroll_buffers.keys():
                buffers = enroll_buffers[classid]
                print("enrolling {} files for class: {}".format(len(buffers), classid))
                response = workflow.enroll(buffers, classid, jobs)
                print("{}".format(response.to_json(indent=1)))
        elif unenroll:
            # TODO use options
            unenroll_jobs = workflow.get_unenrollment_job_names()
            if unenroll_jobs is None:
                print("ERROR: This workflow has no job that support unenrollment")
                quit(1)

            for t in jobs:
                if t not in unenroll_jobs:
                    print(
                        "Error: Job '{}' can not be un-enrolled via this workflow.  Only job(s) '{}' support "
                        "un-enrollment.".format(t, unenroll_jobs))
                    quit(1)

            response = workflow.unenroll(args.unenroll, jobs)
            print("Workflow {} results:".format(action_str.lower()))
            print("{}".format(response.to_json(indent=1)))

    except Exception as e:
        print("Workflow failed with error: {}".format(e))
    finally:
        # Always release the connection, even on failure
        client.disconnect()

parse_pem_file(data_lines)

Parse a PEM file, grouping the results by audio file and channel

Parameters:

Name Type Description Default
data_lines required

Returns:

Type Description

a dictionary of audio files to score and the channel region: {'filename': {channel: [(start_region, end_region)]} }

Source code in olivepy/client/workflow_enroll_client.py
def parse_pem_file(data_lines):
    """
    Parse PEM-formatted lines, grouping the scoring regions by audio file and channel.

    The class/label field of each PEM record is ignored; only the file, its channel(s),
    and the start/end times are kept.

    :param data_lines: iterable of PEM text lines
    :return: a dictionary of audio files to score and the channel region:
        {'filename': {channel: [(start_region, end_region)]}}
    """
    pem = Pem()
    pem.add_records_from_data_lines(data_lines)

    regions = {}
    for file_id in pem.get_ids():
        # Expand any environment variables embedded in the file path
        audio_id = os.path.expandvars(file_id)
        regions[audio_id] = {}
        file_regions = regions[audio_id]

        for record in pem.get_records(file_id):
            # The channel field may be a single int or a comma-separated string of channels
            if type(record.channel) is str:
                channels = list(map(int, str.split(record.channel, ',')))
            elif type(record.channel) is int:
                channels = [record.channel]
            else:
                print("Unsupported channel value: {}".format(record.channel))
                channels = []

            for ch in channels:
                file_regions.setdefault(ch, []).append((record.start_t, record.end_t))

    return regions