olivepy
client
module
olivepy.client.analyze_client
olivepy.client.client_common
extract_input_data(args, expected_data_type=<OliveInputDataType.AUDIO_DATA_TYPE: 2>, fail_if_no_data=True, has_class_ids=False)
Helper util to use the standard CLI arguments to package input (audio, video, image, text) for processing by the OLIVE server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args | | The parsed command-line arguments supplying the input options (`path`, `input`, `input_list`, and `enroll` when `has_class_ids` is set) | required |
expected_data_type | OliveInputDataType | The type of input data expected (audio, video, image, or text) | AUDIO_DATA_TYPE |
fail_if_no_data | bool | If True, exit when no data input was supplied | True |
has_class_ids | bool | If True, group inputs by the class ID to enroll | False |
Returns:
Type | Description |
---|---|
tuple | (data_input, transfer_mode, send_pathname) |
Source code in olivepy/client/client_common.py
Source code in olivepy/client/client_common.py
def extract_input_data(args, expected_data_type=OliveInputDataType.AUDIO_DATA_TYPE, fail_if_no_data=True, has_class_ids=False):
    """
    Helper util to use the standard CLI arguments to package input (audio, video, image, text) for processing by
    the OLIVE server.

    :param args: parsed CLI arguments; must provide 'path', 'input' and 'input_list' (and 'enroll' when has_class_ids is True)
    :param expected_data_type: the OliveInputDataType of the input to package
    :param fail_if_no_data: when True, exit(1) if no data input was supplied
    :param has_class_ids: when True, group inputs by the class ID to enroll (a dict of class ID to inputs)
    :return: a tuple (data_input, transfer_mode, send_pathname)
    """
    send_pathname = bool(args.path)
    # When sending a pathname the server and client must share a filesystem;
    # otherwise the data is serialized into the request
    if send_pathname:
        # fixme more generic for image and video?
        transfer_mode = InputTransferType.PATH
    else:
        transfer_mode = InputTransferType.SERIALIZED

    data_input = []
    if args.input_list:
        # parse input list to make sure we actually have one or more files...
        data_input = parse_data_list2(args.input_list, transfer_mode, expected_data_type, has_class_ids)
        if len(data_input) == 0:
            # BUG FIX: previously referenced args.list, which does not exist (AttributeError)
            print("Data input list '{}' contains no valid files".format(args.input_list))
    elif args.input:
        # Do we want to support sending a path that is not local? In case they want to specify a path that is a
        # available for the server
        if has_class_ids:
            # Enrollment-style input: map the class ID to its packaged input(s)
            data_input = {args.enroll: [convert_filename_to_data(args.input, transfer_mode, expected_data_type)]}
        else:
            data_input.append(convert_filename_to_data(args.input, transfer_mode, expected_data_type))

    # If no data input supplied, make sure that is okay with the other options provided
    if not data_input and fail_if_no_data:
        print('The command requires data input(s).')
        exit(1)

    return data_input, transfer_mode, send_pathname
olivepy.client.enroll_client
olivepy.client.learn_client
olivepy.client.status_client
heartbeat_notification(heatbeat)
Callback method, notified by the async client that a heartbeat message has been received from the OLIVE server
Source code in olivepy/client/status_client.py
def heartbeat_notification(heatbeat):
    """Callback method, notified by the async client that a heartbeat message has been received from the OLIVE server"""
    if not heatbeat:
        print("No OLIVE heatbeat received. Olive server or connection down")
        return
    if not heatbeat.HasField("stats"):
        # A heartbeat without stats carries nothing to report
        return

    stats = heatbeat.stats
    # Percentage metrics, printed with one decimal place
    for label, value in (("System CPU Used", stats.cpu_percent),
                         ("System CPU Average", stats.cpu_average),
                         ("System MEM Used", stats.mem_percent),
                         ("System MEM Max", stats.max_mem_percent),
                         ("System SWAP Used", stats.swap_percent),
                         ("System SWAP Max", stats.max_swap_percent)):
        print("%s: %02.01f%%" % (label, value))
    # Job pool counters and the server version string
    for label, value in (("Number active jobs", stats.pool_busy),
                         ("Number pending jobs", stats.pool_pending),
                         ("Number finished jobs", stats.pool_finished),
                         ("Max number jobs", stats.max_num_jobs),
                         ("Server version", stats.server_version)):
        print(label + ": " + str(value))
    print("\n")
olivepy.client.utils_client
main()
Returns:
Type | Description |
---|---|
Source code in olivepy/client/utils_client.py
def main():
    """
    Entry point for the 'olivepyutils' CLI: loads a Workflow Definition file and
    optionally prints it and/or saves it as JSON text or as a binary workflow file.

    :return: None; exits with status 1 when the arguments are invalid
    """
    parser = argparse.ArgumentParser(prog='olivepyutils')
    parser.add_argument('workflow', action='store',
                        help='The workflow definition to use.')
    parser.add_argument('--save_as_text', action='store',
                        help='Save the workflow to a JSON formatted text file having this name.')
    parser.add_argument('--save_as_binary', action='store',
                        help='Save the workflow to a binary formatted workflow file.')
    parser.add_argument('--print_workflow', action='store_true',
                        help='Print the workflow definition file info (before it is actualized/sent to server)')

    args_bad = False
    args = parser.parse_args()

    # Defensive check; argparse normally rejects a missing positional before we get here
    if args.workflow is None:
        print('No workflow definition is specified.')
        args_bad = True
    # At least one task (print and/or save) must be requested
    if not (args.save_as_text or args.print_workflow or args.save_as_binary):
        args_bad = True
        print('The command requires one or more tasks.')

    if args_bad:
        print('Run the command with --help or -h to see all the command line options.')
        # BUG FIX: quit() is only defined by the optional 'site' module; raise SystemExit instead
        raise SystemExit(1)

    try:
        # first, create the workflow definition from the workflow file:
        owd = ow.OliveWorkflowDefinition(args.workflow)
        if args.save_as_text:
            print("Saving Workflow Definition '{}' as '{}'".format(args.workflow, args.save_as_text))
            owd._save_as_json(args.save_as_text)
        if args.save_as_binary:
            print("Saving Workflow Definition '{}' as '{}'".format(args.workflow, args.save_as_binary))
            owd._save_as_binary(args.save_as_binary)
        if args.print_workflow:
            wdef_json = owd.to_json(indent=1)
            print("Workflow Definition Task Info: {}".format(wdef_json))
            print("")
    except Exception as e:
        print("Workflow failed with error: {}".format(e))
olivepy.client.workflow_client
heartbeat_notification(heatbeat)
Callback method, notified by the async client that a heartbeat message has been received from the OLIVE server
Source code in olivepy/client/workflow_client.py
def heartbeat_notification(heatbeat):
    """
    Callback method, notified by the async client that a heartbeat message has been received from the OLIVE server
    """
    if not heatbeat:
        print("Too long since a heatbeat message was received. Olive server or connection down")
        return
    if not heatbeat.HasField("stats"):
        # Nothing to report when the heartbeat has no stats payload
        return

    stats = heatbeat.stats
    # CPU/memory/swap utilization, one decimal place
    percent_fields = (("System CPU Used", stats.cpu_percent),
                      ("System CPU Average", stats.cpu_average),
                      ("System MEM Used", stats.mem_percent),
                      ("System MEM Max", stats.max_mem_percent),
                      ("System SWAP Used", stats.swap_percent),
                      ("System SWAP Max", stats.max_swap_percent))
    for label, value in percent_fields:
        print("%s: %02.01f%%" % (label, value))
    # Job pool counters and the server version
    count_fields = (("Number active jobs", stats.pool_busy),
                    ("Number pending jobs", stats.pool_pending),
                    ("Number finished jobs", stats.pool_finished),
                    ("Max number jobs", stats.max_num_jobs),
                    ("Server version", stats.server_version))
    for label, value in count_fields:
        print(label + ": " + str(value))
    print("\n")
main()
Client for interacting with the workflow API
Source code in olivepy/client/workflow_client.py
def main():
    """
    Client for interacting with the workflow API: actualizes a Workflow Definition
    on an OLIVE server, optionally prints workflow/task/class information, and
    submits the supplied data input(s) for analysis.

    :return: None; exits with status 1 when the arguments are invalid
    """
    parser = argparse.ArgumentParser(prog='olivepyworkflow', description="Perform OLIVE analysis using a Workflow "
                                                                         "Definition file")
    # Required positional option
    parser.add_argument('workflow', action='store',
                        help='The workflow definition to use.')
    parser.add_argument('--tasks', action='store_true',
                        help='Print the workflow analysis tasks.')
    parser.add_argument('--class_ids', action='store_true',
                        help='Print the class IDs available for analysis in the specified workflow.')
    parser.add_argument('--print_actualized', action='store_true',
                        help='Print the actualized workflow info.')
    parser.add_argument('--print_workflow', action='store_true',
                        help='Print the workflow definition file info (before it is actualized, if requested)')
    parser.add_argument('-s', '--server', action='store', default='localhost',
                        help='The machine the server is running on. Defaults to %(default)s.')
    parser.add_argument('-P', '--port', type=int, action='store', default=5588,
                        help='The port to use.')
    parser.add_argument('-t', '--timeout', type=int, action='store', default=10,
                        help='The timeout (in seconds) to wait for a response from the server ')
    parser.add_argument('-i', '--input', action='store',
                        help='The data input to analyze. Either a pathname to an audio/image/video file or a string for text input. For text input, also specify the --text flag')
    parser.add_argument('--input_list', action='store',
                        help='A list of files to analyze. One file per line.')
    parser.add_argument('--text', action='store_true',
                        help='Indicates that input (or input list) is a literal text string to send in the analysis request.')
    parser.add_argument('--options', action='store',
                        help='A JSON formatted string of workflow options such as '
                             '[{"task":"SAD", "options":{"filter_length":99, "interpolate":1.0}] or '
                             '{"filter_length":99, "interpolate":1.0, "name":"midge"}, where the former '
                             'options are only applied to the SAD task, and the later are applied to all tasks ')
    parser.add_argument('--path', action='store_true',
                        help='Send the path of the audio instead of a buffer. '
                             'Server and client must share a filesystem to use this option')
    parser.add_argument('--debug', action='store_true',
                        help='Debug mode ')

    args_bad = False
    args = parser.parse_args()

    # Simple logging config
    log_level = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(level=log_level)

    # Defensive check; argparse normally rejects a missing positional before we get here
    if args.workflow is None:
        print('No workflow definition is specified.')
        args_bad = True

    # Data input is only required when the request is not a pure info/print request
    data_required = not (args.tasks or args.class_ids or args.print_actualized or args.print_workflow)

    # Our workflow should consume one of the 4 data types (but not a combination of types) ....
    if args.text:
        # special case of handling text data
        expected_data_type = OliveInputDataType.TEXT_DATA_TYPE
    else:
        expected_data_type = OliveInputDataType.BINARY_DATA_TYPE
    data_input, transfer_mode, send_pathname = client_com.extract_input_data(args, expected_data_type=expected_data_type, fail_if_no_data=data_required)

    json_opts = None
    if args.options:
        json_opts = args.options
        print("Options: {}".format(json_opts))

    if args_bad:
        print('Run the command with --help or -h to see all the command line options.')
        # BUG FIX: quit() is only defined by the optional 'site' module; raise SystemExit instead
        raise SystemExit(1)

    enable_status_socket = False
    # Create the connection to the OLIVE server
    client = oc.AsyncOliveClient("olivepy_workflow", args.server, args.port, args.timeout)
    client.connect(monitor_status=enable_status_socket)
    try:
        # first, create the workflow definition from the workflow file:
        workflow_def = ow.OliveWorkflowDefinition(args.workflow)
        if args.print_workflow:
            wdef_json = workflow_def.to_json(indent=1)
            print("Workflow Definition: \n{}".format(wdef_json))
            print("")

        # Submit that workflow definition to the client for actualization (instantiation):
        workflow = workflow_def.create_workflow(client)
        if args.print_actualized:
            print("Actualized Workflow: {}".format(workflow.to_json(indent=1)))
            print("")

        if args.tasks:
            # Print the analysis tasks:
            print("Analysis Tasks: {}".format(workflow.get_analysis_tasks()))
            for enroll_job_name in workflow.get_enrollment_job_names():
                print("Enrollment job '{}' has Tasks: {}".format(enroll_job_name, workflow.get_enrollment_tasks(enroll_job_name)))
            for unenroll_job_name in workflow.get_unenrollment_job_names():
                print("Unenrollment job '{}' has Tasks: {}".format(unenroll_job_name, workflow.get_unenrollment_tasks(unenroll_job_name)))

        if args.class_ids:
            # Print the class IDs available for the workflow tasks:
            # support other types: type=olive_pb2.WORKFLOW_ENROLLMENT_TYPE?
            class_status_response = workflow.get_analysis_class_ids()
            print("Class Info: {}".format(class_status_response.to_json(indent=1)))

        # Package each data input for submission with the analysis request
        buffers = []
        for data_item in data_input:
            buffers.append(workflow.package_workflow_input(data_item, expected_data_type))

        if data_required:
            print("Sending analysis request...")
            response = workflow.analyze(buffers, options=json_opts)
            print("Workflow analysis results:")
            print("{}".format(response.to_json(indent=1)))
    except Exception as e:
        print("Workflow failed with error: {}".format(e))
    finally:
        client.disconnect()
olivepy.client.workflow_enroll_client
main()
Returns:
Type | Description |
---|---|
Source code in olivepy/client/workflow_enroll_client.py
def main():
    """
    Entry point for the 'olivepyworkflowenroll' CLI: uses a Workflow Definition to
    enroll (or unenroll) a class ID with one or more data inputs, or prints the
    enrollment/unenrollment jobs the workflow supports.

    :return: None; exits with status 1 on invalid arguments or unsupported jobs
    """
    parser = argparse.ArgumentParser(prog='olivepyworkflowenroll', description="Perform OLIVE enrollment using a Workflow "
                                                                               "Definition file")
    # Required positional option
    parser.add_argument('workflow', action='store',
                        help='The workflow definition to use.')
    parser.add_argument('--print_jobs', action='store_true',
                        help='Print the supported workflow enrollment jobs.')
    parser.add_argument('--job', action='store',
                        help='Enroll/Unenroll an Class ID for a job(s) in the specified workflow. If not specified enroll or unenroll for ALL enrollment/unenrollment jobst')
    parser.add_argument('--enroll', action='store',
                        help='Enroll using this (class) name. Should be used with the job argument to specify a target job to enroll with (if there are more than one enrollment jobs) ')
    parser.add_argument('--unenroll', action='store',
                        help='Enroll using this (class) name. Should be used with the job argument to specify a job to unenroll (if there are more than one unenrollment jobs)')
    parser.add_argument('-i', '--input', action='store',
                        help='The data input to enroll. Either a pathname to an audio/image/video file or a string for text input')
    parser.add_argument('--input_list', action='store',
                        help='A list of files to enroll. One file per line plus the class id to enroll.')
    parser.add_argument('--path', action='store_true',
                        help='Send the path of the audio instead of a buffer. '
                             'Server and client must share a filesystem to use this option')
    # Connection arguments
    parser.add_argument('-s', '--server', action='store', default='localhost',
                        help='The machine the server is running on. Defaults to %(default)s.')
    parser.add_argument('-P', '--port', type=int, action='store', default=5588,
                        help='The port to use.')
    parser.add_argument('-t', '--timeout', type=int, action='store', default=10,
                        help='The timeout (in seconds) to wait for a response from the server ')

    args_bad = False
    args = parser.parse_args()

    # Defensive check; argparse normally rejects a missing positional before we get here
    if args.workflow is None:
        print('No workflow definition is specified.')
        args_bad = True

    # Data input is only needed for enrollment; unenroll/print_jobs requests carry no data
    require_data = True
    if args.unenroll or args.print_jobs:
        require_data = False

    expected_data_type = OliveInputDataType.BINARY_DATA_TYPE
    data_input, transfer_mode, send_pathname = client_com.extract_input_data(args, expected_data_type=expected_data_type, fail_if_no_data=require_data, has_class_ids=True)

    print("enrolling {} files".format(len(data_input)))

    # Work out the requested action (enrollment vs. un-enrollment)
    # TODO GET CLASS IDS FROM ENROLLMENT FILE
    enroll = False
    unenroll = False
    action_str = ""
    if args.enroll:
        action_str = "Enrollment"
        enroll = True
        if args.unenroll:
            print("Enrollment and un-enrollment are mutually exclusive. Pick one and run again")
            args_bad = True
    elif args.unenroll:
        action_str = "Unenrollment"
        unenroll = True
    elif len(data_input) > 1:
        # BUG FIX: action_str was previously left unassigned on this path, causing a
        # NameError when the enrollment results header was printed below
        action_str = "Enrollment"
        enroll = True
    elif not args.print_jobs:
        args_bad = True
        print("Must use one of the options: --enroll, --unenroll, or --print_jobs ")

    if args.job:
        # One or more comma-separated job names to target
        jobs = []
        jobs.extend(str.split(args.job, ','))

    if args_bad:
        print('Run the command with --help or -h to see all the command line options.')
        # BUG FIX: quit() is only defined by the optional 'site' module; raise SystemExit instead
        raise SystemExit(1)

    # Create the connection to the OLIVE server
    client = oc.AsyncOliveClient("olivepy_workflow", args.server, args.port, args.timeout)
    client.connect()
    try:
        # first, create the workflow definition from the workflow file:
        owd = ow.OliveWorkflowDefinition(args.workflow)
        # Submit that workflow definition to the client for actualization (instantiation):
        workflow = owd.create_workflow(client)

        if args.print_jobs:
            # Print available jobs:
            print("Enrollment jobs '{}'".format(workflow.get_enrollment_job_names()))
            print("Un-Enrollment jobs '{}'".format(workflow.get_unenrollment_job_names()))

        if not args.job:
            # No explicit job(s) requested, so target every job the workflow supports
            if enroll:
                print("Enrolling for all jobs: {}".format(workflow.get_enrollment_job_names()))
            if unenroll:
                print("Unenrolling for all job: {}".format(workflow.get_unenrollment_job_names()))
            jobs = []

        if len(data_input) > 0:
            # Enrollment path: validate the target jobs, then enroll per class ID
            enroll_jobs = workflow.get_enrollment_job_names()
            if enroll_jobs is None:
                print("ERROR: This workflow has no jobs that support enrollment")
                raise SystemExit(1)
            for t in jobs:
                if t not in enroll_jobs:
                    print(
                        "Error: Job '{}' can not be enrolled via this workflow. Only jobs(s) '{}' support enrollment.".format(
                            t, enroll_jobs))
                    raise SystemExit(1)

            # Group the packaged inputs by the class ID they enroll
            enroll_buffers = {}
            for classid in data_input.keys():
                for input_msg in data_input[classid]:
                    if classid not in enroll_buffers:
                        enroll_buffers[classid] = []
                    enroll_buffers[classid].append(
                        workflow.package_workflow_input(input_msg, expected_data_type))

            print("Workflow {} results:".format(action_str.lower()))
            for classid in enroll_buffers.keys():
                buffers = enroll_buffers[classid]
                print("enrolling {} files for class: {}".format(len(buffers), classid))
                response = workflow.enroll(buffers, classid, jobs)
                print("{}".format(response.to_json(indent=1)))
        elif unenroll:
            # TODO use options
            unenroll_jobs = workflow.get_unenrollment_job_names()
            if unenroll_jobs is None:
                print("ERROR: This workflow has no job that support unenrollment")
                raise SystemExit(1)
            for t in jobs:
                if t not in unenroll_jobs:
                    print(
                        "Error: Job '{}' can not be un-enrolled via this workflow. Only job(s) '{}' support "
                        "un-enrollment.".format(t, unenroll_jobs))
                    raise SystemExit(1)

            response = workflow.unenroll(args.unenroll, jobs)
            print("Workflow {} results:".format(action_str.lower()))
            print("{}".format(response.to_json(indent=1)))
    except Exception as e:
        print("Workflow failed with error: {}".format(e))
    finally:
        client.disconnect()
parse_pem_file(data_lines)
Parse a PEM file, grouping the results by audio file and channel
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_lines | | The raw PEM file lines to parse | required |
Returns:
Type | Description |
---|---|
a dictionary of audio files to score and the channel region: {'filename': {channel: [(start_region, end_region)]} } |
Source code in olivepy/client/workflow_enroll_client.py
def parse_pem_file(data_lines):
    '''
    Parse a PEM file, grouping the results by audio file and channel.

    :param data_lines: the raw PEM text lines to parse
    :return: a dictionary of audio files to score and the channel region: {'filename': {channel: [(start_region, end_region)]} }
    '''
    # We process by file and channel - the class/label is ignored
    regions = {}
    input_pem = Pem()
    input_pem.add_records_from_data_lines(data_lines)

    for rec_id in input_pem.get_ids():  # renamed from 'id' to avoid shadowing the builtin
        # Expand any environment variables embedded in the file path
        audio_id = os.path.expandvars(rec_id)
        # Create a dictionary of the regions specified for the current file
        regions[audio_id] = {}
        for rec in input_pem.get_records(rec_id):
            # The channel may be a single int or a comma-separated string of channels
            if isinstance(rec.channel, str):
                channels = [int(ch) for ch in rec.channel.split(',')]
            elif isinstance(rec.channel, int):
                channels = [rec.channel]
            else:
                channels = []
                print("Unsupported channel value: {}".format(rec.channel))
            for ch in channels:
                # Accumulate (start, end) regions per channel
                regions[audio_id].setdefault(ch, []).append((rec.start_t, rec.end_t))
    return regions