Source code for watson_machine_learning_client.experiments

################################################################################
#
# Licensed Materials - Property of IBM
# (C) Copyright IBM Corp. 2017
# US Government Users Restricted Rights - Use, duplication disclosure restricted
# by GSA ADP Schedule Contract with IBM Corp.
#
################################################################################

from __future__ import print_function
import requests
import json
import re
import time
import copy
from watson_machine_learning_client.wml_client_error import MissingValue, WMLClientError, MissingMetaProp
from watson_machine_learning_client.href_definitions import is_uid
from watson_machine_learning_client.wml_resource import WMLResource
from multiprocessing import Pool
from watson_machine_learning_client.utils import print_text_header_h1, print_text_header_h2, EXPERIMENT_DETAILS_TYPE, EXPERIMENT_RUN_DETAILS_TYPE, format_metrics, STR_TYPE, STR_TYPE_NAME, docstring_parameter, group_metrics, str_type_conv, meta_props_str_conv
from watson_machine_learning_client.hpo import HPOParameter, HPOMethodParam
from watson_machine_learning_client.metanames import ExperimentMetaNames


def _get_details_helper(url, headers, setting=None):
    from watson_machine_learning_client.log_util import get_logger
    logger = get_logger(u'experiments._get_details_helper')

    response_get = requests.get(
        url + u'/runs',
        headers=headers)
    if response_get.status_code == 200:
        logger.debug(u'Successfully got runs details ({}): {}'.format(response_get.status_code, response_get.text))
        details = response_get.json()

        if u'resources' in details:
            resources = details[u'resources']
        else:
            resources = [details]

        if setting is not None:
            for r in resources:
                r[u'entity'].update({u'_parent_settings': setting})
        return resources
    else:
        logger.warning(u'Failure during getting runs details ({}): {}'.format(response_get.status_code, response_get.text))
        return []


[docs]class Experiments(WMLResource): """ Run new experiment. """ ConfigurationMetaNames = ExperimentMetaNames() """MetaNames for experiments creation.""" @staticmethod def HPOParameter(name, values=None, max=None, min=None, step=None): return HPOParameter(name, values, max, min, step) @staticmethod def HPOMethodParam(name=None, value=None): return HPOMethodParam(name, value) def __init__(self, client): WMLResource.__init__(self, __name__, client) self._experiments_uids_cache = {} def _store(self, meta_props): """ Store experiment. :param meta_props: meta data of the experiment configuration. To see available meta names use: >>> client.experiments.ConfigurationMetaNames.get() :type meta_props: dict :returns: stored experiment details :rtype: dict **Example** >>> metadata = { >>> client.experiments.ConfigurationMetaNames.NAME: 'my_experiment', >>> client.experiments.ConfigurationMetaNames.EVALUATION_METRICS: ['accuracy'], >>> client.experiments.ConfigurationMetaNames.TRAINING_DATA_REFERENCE: {'connection': {'endpoint_url': 'https://s3-api.us-geo.objectstorage.softlayer.net', 'access_key_id': '***', 'secret_access_key': '***'}, 'source': {'bucket': 'train-data'}, 'type': 's3'}, >>> client.experiments.ConfigurationMetaNames.TRAINING_RESULTS_REFERENCE: {'connection': {'endpoint_url': 'https://s3-api.us-geo.objectstorage.softlayer.net', 'access_key_id': '***', 'secret_access_key': '***'}, 'target': {'bucket': 'result-data'}, 'type': 's3'}, >>> client.experiments.ConfigurationMetaNames.TRAINING_REFERENCES: [ >>> { >>> 'training_definition_url': definition_url_1 >>> }, >>> { >>> 'training_definition_url': definition_url_2 >>> }, >>> ], >>> } >>> experiment_details = client.experiments._store(meta_props=metadata) >>> experiment_url = client.experiments.get_url(experiment_details) """ Experiments._validate_type(meta_props, u'meta_props', dict, True) meta_props_str_conv(meta_props) self.ConfigurationMetaNames._validate(meta_props) meta_props = copy.deepcopy(meta_props) if any(u'training_definition_url' not in x for x in meta_props[self.ConfigurationMetaNames.TRAINING_REFERENCES]): raise MissingMetaProp(u'training_references.training_definition_url') for ref in meta_props[self.ConfigurationMetaNames.TRAINING_REFERENCES]: if u'name' not in ref or u'command' not in ref: training_definition_response = requests.get(ref[u'training_definition_url'].replace(u'/content', u''), headers=self._client._get_headers()) result = self._handle_response(200, u'getting training definition', training_definition_response) if not u'name' in ref: ref.update({u'name': result[u'entity'][u'name']}) if not u'command' in ref: ref.update({u'command': result[u'entity'][u'command']}) response_experiment_post = requests.post( self._href_definitions.get_experiments_href(), json=self.ConfigurationMetaNames._generate_resource_metadata(meta_props), headers=self._client._get_headers() ) return self._handle_response(201, u'saving experiment', response_experiment_post) @docstring_parameter({'str_type': STR_TYPE_NAME}) def _update_experiment(self, experiment_uid, changes): """ Updates existing experiment metadata. :param experiment_uid: UID of experiment which definition should be updated :type experiment_uid: str :param changes: elements which should be changed, where keys are ConfigurationMetaNames :type changes: dict :return: metadata of updated experiment :rtype: dict **Example** >>> metadata = { >>> client.experiments.ConfigurationMetaNames.NAME:"updated_exp" >>> } >>> exp_details = client.experiments.update(experiment_uid, changes=metadata) """ experiment_uid = str_type_conv(experiment_uid) self._validate_type(experiment_uid, u'experiment_uid', STR_TYPE, True) self._validate_type(changes, u'changes', dict, True) meta_props_str_conv(changes) details = self._client.repository.get_experiment_details(experiment_uid) patch_payload = self.ConfigurationMetaNames._generate_patch_payload(details['entity'], changes, with_validation=True) url = self._href_definitions.get_experiment_href(experiment_uid) response = requests.patch(url, json=patch_payload, headers=self._client._get_headers()) updated_details = self._handle_response(200, u'experiment patch', response) return updated_details def _get_experiment_uid(self, experiment_run_uid=None, experiment_run_url=None): if experiment_run_uid is None and experiment_run_url is None: raise MissingValue(u'experiment_run_id/experiment_run_url') if experiment_run_uid is not None and experiment_run_uid in self._experiments_uids_cache: return self._experiments_uids_cache[experiment_run_uid] if experiment_run_url is not None: m = re.search(u'.+/v3/experiments/{[^\/]+}/runs/{[^\/]+}', experiment_run_url) _experiment_id = m.group(1) _experiment_run_id = m.group(2) self._experiments_uids_cache.update({_experiment_run_id: _experiment_id}) return _experiment_id details = self.get_details() resources = details[u'resources'] try: el = [x for x in resources if x[u'metadata'][u'guid'] == experiment_run_uid][0] except: raise WMLClientError(u'Cannot find experiment_uid for experiment_run_uid: \'{}\''.format(experiment_run_uid)) experiment_uid = el[u'experiment'][u'guid'] self._experiments_uids_cache.update({experiment_run_uid: experiment_uid}) return experiment_uid
[docs] @docstring_parameter({'str_type': STR_TYPE_NAME}) def run(self, experiment_uid, asynchronous=True): """ Run experiment. :param experiment_uid: ID of stored experiment :type experiment_uid: str :param asynchronous: Default `True` means that experiment is started and progress can be checked later. `False` - method will wait till experiment end and print experiment stats. :type asynchronous: bool :return: experiment run details :rtype: dict **Example**: >>> experiment_run_status = client.experiments.run(experiment_uid) >>> experiment_run_status = client.experiments.run(experiment_uid, asynchronous=False) """ WMLResource._chk_and_block_create_update_for_python36(self) experiment_uid = str_type_conv(experiment_uid) Experiments._validate_type(experiment_uid, u'experiment_uid', STR_TYPE, True) Experiments._validate_type(asynchronous, u'asynchronous', bool, True) run_url = self._href_definitions.get_experiment_runs_href(experiment_uid) response = requests.post(run_url, headers=self._client._get_headers()) # TODO should be 201 result_details = self._handle_response(200, u'experiment run', response) experiment_run_uid = self.get_run_uid(result_details) self._experiments_uids_cache.update({experiment_run_uid: experiment_uid}) if asynchronous: return result_details else: print_text_header_h1(u'Running \'{}\' experiment'.format(experiment_uid)) print('Experiment run uid: {}\n'.format(experiment_run_uid)) status = self.get_status(experiment_run_uid) tries_no = 10 error = None training_uids = None import logging root_logger = logging.getLogger("watson_machine_learning_client.wml_client_error") root_logger.disabled = True while tries_no > 0 and training_uids is None: tries_no -= 1 try: run_details = self.get_run_details(experiment_run_uid) training_uids = self.get_training_uids(run_details) except Exception as e: error = e time.sleep(1) root_logger.disabled = False if training_uids is None: raise WMLClientError("Getting training uids failed.", error) total = 100 current_progress = 0 state = "initializing" train_state = "initializing" last_curr_uid = None last_state = state last_train_state = train_state curr_uid = None import math training_runs = [] completed_uids = [] while state not in ["error", "cancelled", "completed"]: run_details = self.get_run_details(experiment_run_uid) training_runs = self.get_training_runs(run_details) running_uids = [x['training_guid'] for x in training_runs if x['state'] == 'running'] uncompleted_uids = [x['training_guid'] for x in training_runs if x['state'] not in ["error", "cancelled", "completed"]] completed_uids = [x['training_guid'] for x in training_runs if x['state'] in ["error", "cancelled", "completed"]] progress = math.floor(total * len(completed_uids)/len(training_runs)) state = self.get_status(experiment_run_uid)[u'state'] if (curr_uid is None or curr_uid in completed_uids) and len(uncompleted_uids) > 0: if len(running_uids) > 0: curr_uid = running_uids[0] else: curr_uid = uncompleted_uids[0] if curr_uid is not None: train_state = self._client.training.get_status(curr_uid)[u'state'] if curr_uid is not None and (last_state != state or last_train_state != train_state or last_curr_uid != curr_uid): last_state = state last_train_state = train_state last_curr_uid = curr_uid indent = ' ' * (3 - len(str(progress))) print('{}%{} - Processing {} ({}/{}): experiment_state={}, training_state={}'.format(progress, indent, curr_uid, min(len(completed_uids) + 1, len(training_runs)), len(training_runs), state, train_state)) print('100% - Finished processing training runs: experiment_state={}'.format(state)) if u'completed' in state: print_text_header_h2(u'Run of \'{}\' finished successfully.'.format(str(experiment_uid))) else: print_text_header_h2( u'Run of \'{}\' failed with status: \'{}\'.'.format(experiment_uid, str(status))) result_details = self.get_run_details(experiment_run_uid) self._logger.debug(u'Response({}): {}'.format(state, result_details)) return result_details
[docs] def get_status(self, experiment_run_uid): """ Get experiment status. :param experiment_run_uid: ID of experiment run :type experiment_run_uid: bool :returns: experiment status :rtype: dict **Example**: >>> experiment_status = client.experiments.get_status(experiment_run_uid) """ experiment_run_uid = str_type_conv(experiment_run_uid) Experiments._validate_type(experiment_run_uid, u'experiment_run_uid', STR_TYPE, True) details = self.get_run_details(experiment_run_uid) try: status = WMLResource._get_required_element_from_dict(details, u'details', [u'entity', u'experiment_run_status']) except Exception as e: self._logger.debug('Missing experiment run status in run details:', e) raise WMLClientError(u'Missing experiment run status in run details. Probably status wasn\'t updated in time. Try again.') return status
[docs] def get_run_details(self, experiment_run_uid): """ Get metadata of particular experiment run. :param experiment_run_uid: experiment run UID :type experiment_run_uid: bool :returns: experiment run metadata :rtype: dict **Example**: >>> experiment_run_details = client.experiments.get_run_details(experiment_run_uid) """ experiment_run_uid = str_type_conv(experiment_run_uid) Experiments._validate_type(experiment_run_uid, u'experiment_run_uid', STR_TYPE, True) experiment_uid = self._get_experiment_uid(experiment_run_uid) url = self._href_definitions.get_experiment_run_href(experiment_uid, experiment_run_uid) response_get = requests.get( url, headers=self._client._get_headers()) response = self._handle_response(200, u'getting experiment run details', response_get) return response
[docs] @docstring_parameter({'str_type': STR_TYPE_NAME}) def get_details(self, experiment_uid=None, limit=None): """ Get metadata of experiment(s) run(s). If no experiment_uid is provided, runs will be listed for all existing experiments. :param experiment_uid: experiment UID (optional) :type experiment_uid: str :param limit: limit number of fetched records (optional) :type limit: int :returns: experiment(s) run(s) metadata :rtype: dict (if uid is not None) or {"resources": [dict]} (if uid is None) **Example**: >>> experiment_details = client.experiments.get_details(experiment_uid) >>> experiment_details = client.experiments.get_details() """ experiment_uid = str_type_conv(experiment_uid) return self._get_extended_details(experiment_uid, False)
@staticmethod @docstring_parameter({'str_type': STR_TYPE_NAME}) def _get_uid(experiment_details): """ Get uid of stored experiment. :param experiment_details: stored experiment details :type experiment_details: dict :returns: uid of stored experiment :rtype: str **Example** >>> experiment_uid = client.experiments.get_uid(experiment_details) """ Experiments._validate_type(experiment_details, u'experiment_details', object, True) Experiments._validate_type_of_details(experiment_details, EXPERIMENT_DETAILS_TYPE) return WMLResource._get_required_element_from_dict(experiment_details, u'experiment_details', [u'metadata', u'guid']) @staticmethod @docstring_parameter({'str_type': STR_TYPE_NAME}) def _get_url(experiment_details): """ Get url of stored experiment. :param experiment_details: stored experiment details :type experiment_details: dict :returns: url of stored experiment :rtype: str **Example** >>> experiment_url = client.experiments.get_url(experiment_details) """ Experiments._validate_type(experiment_details, u'experiment_details', object, True) Experiments._validate_type_of_details(experiment_details, EXPERIMENT_DETAILS_TYPE) return WMLResource._get_required_element_from_dict(experiment_details, u'experiment_details', [u'metadata', u'url'])
[docs] @staticmethod @docstring_parameter({'str_type': STR_TYPE_NAME}) def get_run_url(experiment_run_details): """ Get experiment run url. :param experiment_run_details: details of experiment run :type experiment_run_details: object :returns: experiment run url :rtype: str **Example**: >>> experiment_run_url = client.experiments.get_run_url(experiment_run_details) """ Experiments._validate_type(experiment_run_details, u'experiment_run_details', dict, True) Experiments._validate_type_of_details(experiment_run_details, EXPERIMENT_RUN_DETAILS_TYPE) try: url = WMLResource._get_required_element_from_dict(experiment_run_details, u'experiment_run_details', [u'metadata', u'url']) except Exception as e: raise WMLClientError(u'Failure during getting experiment run url from details.', e) # TODO still, it is not working properly #if not is_url(url): # raise WMLClientError('Experiment url: \'{}\' is invalid.'.format(url)) return url
[docs] @staticmethod @docstring_parameter({'str_type': STR_TYPE_NAME}) def get_run_uid(experiment_run_details): """ Get experiment run uid. :param experiment_run_details: details of experiment run :type experiment_run_details: object :returns: experiment run uid :rtype: str **Example**: >>> experiment_run_uid = client.experiments.get_run_uid(experiment_run_details) """ Experiments._validate_type(experiment_run_details, u'experiment_run_details', dict, True) Experiments._validate_type_of_details(experiment_run_details, EXPERIMENT_RUN_DETAILS_TYPE) try: uid = WMLResource._get_required_element_from_dict(experiment_run_details, u'experiment_run_details', [u'metadata', u'guid']) except Exception as e: raise WMLClientError(u'Failure during getting experiment run uid from details.', e) if not is_uid(uid): raise WMLClientError(u'Experiment run uid: \'{}\' is invalid.'.format(uid)) return uid
[docs] @staticmethod def get_training_runs(experiment_run_details): """ Get experiment training runs details. :param experiment_run_details: details of experiment run :type: object :returns: training runs :rtype: array **Example**: >>> training_runs = client.experiments.get_training_runs(experiment_run_details) """ Experiments._validate_type(experiment_run_details, u'experiment_run_details', dict, True) Experiments._validate_type_of_details(experiment_run_details, EXPERIMENT_RUN_DETAILS_TYPE) try: training_runs = WMLResource._get_required_element_from_dict(experiment_run_details, u'experiment_run_details', [u'entity', u'training_statuses']) except Exception as e: raise WMLClientError(u'Failure during getting experiment training runs from details.', e) if training_runs is None or len(training_runs) <= 0: raise MissingValue(u'training_runs') return training_runs
[docs] @staticmethod def get_training_uids(experiment_run_details): """ Get experiment training uids. :param experiment_run_details: details of experiment run :type experiment_run_details: object :returns: training uids :rtype: array **Example**: >>> training_uids = client.experiments.get_training_uids(experiment_run_details) """ Experiments._validate_type(experiment_run_details, u'experiment_run_details', dict, True) Experiments._validate_type_of_details(experiment_run_details, EXPERIMENT_RUN_DETAILS_TYPE) try: training_uids = [x[u'training_guid'] for x in WMLResource._get_required_element_from_dict(experiment_run_details, u'experiment_run_details', [u'entity', u'training_statuses'])] except Exception as e: raise WMLClientError(u'Failure during getting experiment training runs from details.', e) if training_uids is None or len(training_uids) <= 0: raise MissingValue(u'training_uids') return training_uids
[docs] @docstring_parameter({'str_type': STR_TYPE_NAME}) def delete(self, experiment_run_uid): """ Delete experiment run. :param experiment_run_uid: experiment run UID :type experiment_run_uid: str :returns: returns the status message ("SUCCESS" or FAILED") :rtype: str **Example**: >>> client.experiments.delete(experiment_run_uid) .. hint:: This function is only for deleting experiment runs. To delete whole experiment use: >>> client.repository.delete(experiment_uid) """ experiment_run_uid = str_type_conv(experiment_run_uid) Experiments._validate_type(experiment_run_uid, u'experiment_run_uid', STR_TYPE, True) experiment_uid = self._get_experiment_uid(experiment_run_uid) run_url = self._href_definitions.get_experiment_run_href(experiment_uid, experiment_run_uid) response = requests.delete(run_url, headers=self._client._get_headers()) return self._handle_response(204, u'experiment deletion', response, False)
def _get_extended_details(self, experiment_uid=None, extended=True): experiment_uid = str_type_conv(experiment_uid) Experiments._validate_type(experiment_uid, u'experiment_uid', STR_TYPE, False) if experiment_uid is None: experiments = self._client.repository.get_experiment_details() try: urls_and_settings = [(experiment[u'metadata'][u'url'], experiment[u'entity'][u'settings'] if extended else None) for experiment in experiments[u'resources']] self._logger.debug(u'Preparing details for urls and settings: {}'.format(urls_and_settings)) res = [] pool = Pool(processes=4) tasks = [] for url_and_setting in urls_and_settings: url = url_and_setting[0] setting = url_and_setting[1] tasks.append(pool.apply_async(_get_details_helper, (url, self._client._get_headers(), setting))) for task in tasks: res.extend(task.get()) pool.close() except Exception as e: raise WMLClientError(u'Error during getting all experiments details.', e) return {u'resources': res} else: url = self._href_definitions.get_experiment_runs_href(experiment_uid) response_get = requests.get( url, headers=self._client._get_headers()) result = self._handle_response(200, u'getting experiment details', response_get) if extended: setting = self._client.repository.get_experiment_details(experiment_uid)[u'entity'][u'settings'] for r in result[u'resources']: r[u'entity'].update({u'_parent_settings': setting}) return result
[docs] @docstring_parameter({'str_type': STR_TYPE_NAME}) def list_runs(self, experiment_uid=None, limit=None): """ List experiment runs. If experiment_uid set to None and limit set to None only 50 first records will be displayed. :param experiment_uid: experiment UID (optional) :type experiment_uid: str :param limit: limit number of fetched records (optional) :type limit: int :returns: None :rtype: None .. note:: This function only prints the list of experiments-runs **Example**: >>> client.experiments.list_runs() >>> client.experiments.list_runs(experiment_uid) """ experiment_uid = str_type_conv(experiment_uid) Experiments._validate_type(experiment_uid, u'experiment_uid', STR_TYPE, False) from tabulate import tabulate details = self._get_extended_details(experiment_uid) resources = details['resources'] values = [(m[u'experiment'][u'guid'], m[u'metadata'][u'guid'], m[u'entity'][u'_parent_settings'][u'name'], m[u'entity'][u'experiment_run_status'][u'state'], m[u'metadata'][u'created_at']) for m in resources] if experiment_uid is not None: table = tabulate([[u'GUID (experiment)', u'GUID (run)', u'NAME (experiment)', u'STATE', u'CREATED']] + values) print(table) else: self._list(values, [u'GUID (experiment)', u'GUID (run)', u'NAME (experiment)', u'STATE', u'CREATED'], limit, 50)
[docs] @docstring_parameter({'str_type': STR_TYPE_NAME}) def list_training_runs(self, experiment_run_uid): """ List training runs triggered by experiment run. :param experiment_run_uid: experiment run UID :type experiment_run_uid: str :returns: None :rtype: None .. note:: This function only prints the list of training-runs associated with an experiment-run. **Example**: >>> client.experiments.list_training_runs(experiment_run_uid) """ experiment_run_uid = str_type_conv(experiment_run_uid) Experiments._validate_type(experiment_run_uid, 'experiment_run_uid', STR_TYPE, True) from tabulate import tabulate details = self._client.experiments.get_run_details(experiment_run_uid) training_statuses = details[u'entity'][u'training_statuses'] values = [(m[u'training_guid'], m[u'training_reference_name'], m[u'state'], m[u'submitted_at'], m[u'finished_at'] if u'finished_at' in m else u'-', format_metrics(self._client.training.get_latest_metrics(m[u'training_guid'])) if len(self._client.training.get_latest_metrics(m[u'training_guid'])) > 0 else u'-') for m in training_statuses] table = tabulate([[u'GUID (training)', u'NAME', u'STATE', u'SUBMITTED', u'FINISHED', u'PERFORMANCE']] + values) print(table)
[docs] @docstring_parameter({'str_type': STR_TYPE_NAME}) def monitor_logs(self, experiment_run_uid): """ Monitor experiment run log files (prints log content to console). :param experiment_run_uid: ID of experiment run :type experiment_run_uid: str **Example**: >>> client.experiments.monitor_logs(experiment_run_uid) """ experiment_run_uid = str_type_conv(experiment_run_uid) Experiments._validate_type(experiment_run_uid, u'experiment_run_uid', STR_TYPE, True) print_text_header_h1(u'Monitor started for experiment run: ' + str(experiment_run_uid)) # TODO More correct but not as nice working version # try: # experiment_uid = self.get_run_details(experiment_run_uid)[u'experiment'][u'guid'] # except Exception as e: # raise WMLClientError(u'Failure during getting experiment uid from experiment run details.', e) # # from lomond import WebSocket # experiment_monitor_endpoint = self._wml_credentials[u'url'].replace(u'https', # u'wss') + u'/v3/experiments/' + experiment_uid + u'/runs/' + experiment_run_uid + '/monitor' # websocket = WebSocket(experiment_monitor_endpoint) # try: # websocket.add_header(bytes('Authorization', 'utf-8'), bytes('bearer ' + self._client.service_instance._get_token(), 'utf-8')) # except: # websocket.add_header(bytes('Authorization'), bytes('bearer ' + self._client.service_instance._get_token())) # # previous_guid = '' # # for event in websocket: # if event.name == u'text': # text = json.loads(event.text) # if (u'entity' in str(text)) and (u'training_statuses' in str(text)): # training_statuses = text[u'entity'][u'training_statuses'] # for i in training_statuses: # if (u'training_guid' in i) and (u'message' in i): # msg = i[u'message'].strip() # guid = i[u'training_guid'].strip() # if msg != '': # if guid == previous_guid: # print(msg) # else: # if 'training_reference_name' in str(i): # name = i['training_reference_name'] # if name != '': # h2_text = guid + " (" + name + ")" # else: # h2_text = guid # else: # h2_text = guid # print_text_header_h2(h2_text) # print(msg) # previous_guid = guid state = "initialized" import logging root_logger = logging.getLogger("watson_machine_learning_client.wml_client_error") root_logger.disabled = True uids_get_tries = 10 training_run_uids = None error = None while uids_get_tries > 0 and training_run_uids is None: uids_get_tries -= 1 try: run_details = self.get_run_details(experiment_run_uid) training_run_uids = self.get_training_uids(run_details) except Exception as e: error = e time.sleep(1) root_logger.disabled = False if training_run_uids is None: raise WMLClientError("Training run uids couldn't be retrieved.", error) training_run_uids = [x for x in training_run_uids if "_" not in x] finished = [] while state not in ("error", "cancelled", "completed"): run_details = self.get_run_details(experiment_run_uid) training_runs = self.get_training_runs(run_details) running_uids = [x['training_guid'] for x in training_runs if x['state'] == 'running' and x['training_guid'] not in finished] for running_uid in running_uids: self._client.training._simple_monitor_logs(running_uid, lambda: print_text_header_h2(u'Log monitor started for training run: ' + str(running_uid))) finished.append(running_uid) status = self.get_status(experiment_run_uid) state = status['state'] for training_uid in training_run_uids: if training_uid not in finished: self._client.training._simple_monitor_logs(training_uid, lambda: print_text_header_h2(u'Log monitor started for training run: ' + str(training_uid))) print_text_header_h2(u'Log monitor done.')
[docs] @docstring_parameter({'str_type': STR_TYPE_NAME}) def monitor_metrics(self, experiment_run_uid): """ Monitor metrics log file (prints metrics to console). :param experiment_run_uid: ID of experiment run :type run_uid: str **Example** >>> client.experiments.monitor_metrics(experiment_run_uid) """ experiment_run_uid = str_type_conv(experiment_run_uid) try: experiment_uid = self.get_run_details(experiment_run_uid)[u'experiment'][u'guid'] except Exception as e: raise WMLClientError(u'Failure during getting experiment uid from experiment run details.', e) print_text_header_h1(u'Metric monitor started for experiment run: ' + str(experiment_run_uid)) run_details = self.get_run_details(experiment_run_uid) status = run_details["entity"]["experiment_run_status"]["state"] if (status == "completed" or status == "error" or status=="failed" or status=="canceled"): training_statuses = run_details["entity"]["training_statuses"] for each_training_run in range(len(training_statuses)): self._client.training._COS_metrics(training_statuses[each_training_run]["training_guid"], lambda: print_text_header_h1('Metric monitor started for training run: ' + str(training_statuses[each_training_run]["training_guid"]))) else: from lomond import WebSocket experiment_monitor_endpoint = self._wml_credentials[u'url'].replace(u'https', u'wss') + u'/v3/experiments/' + experiment_uid + u'/runs/' + experiment_run_uid + '/monitor' websocket = WebSocket(experiment_monitor_endpoint) try: websocket.add_header(bytes('Authorization', 'utf-8'), bytes('bearer ' + self._client.service_instance._get_token(), 'utf-8')) except: websocket.add_header(bytes('Authorization'), bytes('bearer ' + self._client.service_instance._get_token())) previous_guid = u'' for event in websocket: if event.name == u'text': text = json.loads(event.text) if (u'entity' in str(text)) and (u'training_statuses' in str(text)): training_statuses = text[u'entity'][u'training_statuses'] for i in training_statuses: if u'training_guid' in i: guid = i[u'training_guid'].strip() if u'metrics' in i: metrics = i[u'metrics'] if len(metrics) > 0: metric = metrics[0] values = u' '.join([x[u'name'] + ':' + str(x[u'value']) for x in metric[u'values']]) metric_msg = u'{} iteration:{} phase:{} {}'.format(metric[u'timestamp'], metric[u'iteration'], metric[u'phase'], values) if metric_msg != u'': if guid == previous_guid: print(metric_msg) else: if 'training_reference_name' in str(i): name = i['training_reference_name'] if name != '': h2_text = guid + " (" + name + ")" else: h2_text = guid else: h2_text = guid print_text_header_h2(h2_text) print(metric_msg) previous_guid = guid websocket.close() print_text_header_h2(u'Metric monitor done.')
[docs] def get_latest_metrics(self, experiment_run_uid): """ Get latest metrics values for experiment run. :param experiment_run_uid: ID of training run :type experiment_run_uid: str :returns: metric values :rtype: list of dicts **Example** >>> client.experiments.get_latest_metrics(experiment_run_uid) """ experiment_run_uid = str_type_conv(experiment_run_uid) Experiments._validate_type(experiment_run_uid, 'experiment_run_uid', str, True) training_statuses = self.get_run_details(experiment_run_uid)['entity']['training_statuses'] metrics = [] for status in training_statuses: training_guid_metrics = status['metrics'] if len(training_guid_metrics) > 0: grouped_metrics = group_metrics(training_guid_metrics) for key, value in grouped_metrics.items(): sorted_value = sorted(value, key=lambda k: k['iteration']) metrics.append({"training_guid": status['training_guid'], "training_reference_name": status['training_reference_name'], "metrics": sorted_value[-1]}) return metrics
[docs] def get_metrics(self, experiment_run_uid): """ Get all metrics values for experiment run. :param experiment_run_uid: ID of training run :type experiment_run_uid: str :returns: metric values :rtype: list of dicts **Example** >>> client.experiments.get_metrics(experiment_run_uid) """ experiment_run_uid = str_type_conv(experiment_run_uid) Experiments._validate_type(experiment_run_uid, 'experiment_run_uid', STR_TYPE, True) training_statuses = self.get_run_details(experiment_run_uid)['entity']['training_statuses'] metrics = [] for status in training_statuses: training_guid_metrics = status['metrics'] if len(training_guid_metrics) > 0: metrics.append({"training_guid": status['training_guid'], "training_reference_name": status['training_reference_name'], "metrics": training_guid_metrics}) return metrics
[docs] def list_definitions(self, limit=None): """ List stored experiments. If limit is set to None there will be only first 50 records shown. :returns: None :rtype: None .. note:: This function only prints the list of experiments **Example**: >>> client.experiments.list_definitions() """ self._client.repository.list_experiments(limit=limit)
[docs] @docstring_parameter({'str_type': STR_TYPE_NAME}) def get_definition_details(self, experiment_uid=None, limit=None): """ Get metadata of stored experiments. If neither experiment uid nor url is specified all experiments metadata is returned. :param experiment_uid: stored experiment UID (optional) :type experiment_uid: str :param limit: limit number of fetched records (optional) :type limit: int :returns: stored experiment(s) metadata :rtype: dict **Example** >>> experiment_details = client.experiments.get_definition_details(experiment_uid) >>> experiment_details = client.experiments.get_definition_details() """ return self._client.repository.get_experiment_details(experiment_uid, limit=limit)
[docs] @docstring_parameter({'str_type': STR_TYPE_NAME}) def get_definition_uid(self, experiment_details): """ Get uid of stored experiment. :param experiment_details: stored experiment details :type experiment_details: dict :returns: uid of stored experiment :rtype: str **Example** >>> experiment_uid = client.experiments.get_definition_uid(experiment_details) """ return self._client.repository.get_experiment_uid(experiment_details)
[docs] @docstring_parameter({'str_type': STR_TYPE_NAME}) def get_definition_url(self, experiment_details): """ Get url of stored experiment. :param experiment_details: stored experiment details :type experiment_details: dict :returns: url of stored experiment :rtype: str **Example** >>> experiment_url = client.experiments.get_definition_url(experiment_details) """ return self._client.repository.get_experiment_url(experiment_details)