Skip to content

Commit

Permalink
add methods to execute and return queries from trustyai service
Browse files Browse the repository at this point in the history
  • Loading branch information
Christina Xu committed Feb 14, 2024
1 parent e40dc58 commit fdf54b5
Showing 1 changed file with 167 additions and 0 deletions.
167 changes: 167 additions & 0 deletions src/trustyai/utils/extras/metrics_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""Python client for TrustyAI metrics"""
from typing import List, Optional, Any, Union

import json
import pandas as pd
import requests
import subprocess
import datetime as dt

def json_to_df(
data_path: str,
batch_list: List[int]
) -> pd.DataFrame:
"""
Converts batched data in json files to a single pandas DataFrame
"""
df = pd.DataFrame()
for batch in batch_list:
file = data_path + f'{batch}.json'
with open(file) as train_file:
batch_data = json.load(train_file)['inputs'][0]['data']
batch_df = pd.DataFrame.from_dict(batch_data)
df = pd.concat([df, batch_df])
return df


def df_to_json(
df: pd.DataFrame,
name: str,
json_file: str
) -> None:
"""
Converts pandas DataFrame to json file
"""
if str(df.dtypes[0]) == 'float64':
data_type = 'FP64'
inputs = [{'name': name, 'shape': list(df.shape), "datatype": data_type, 'data': df.values.tolist()}]
data_dict = {"inputs": inputs}
with open(json_file, 'w') as outfile:
json.dump(data_dict, outfile)


class trustyaiMetricsService:
"""
Executes and returns queries from TrustyAI service on ODH
"""
def __init__(
self,
token: str
):
self.token = token
self.trusty_url = 'https://' + (subprocess.check_output('oc get route/trustyai-service --template={{.spec.host}}', shell=True)).decode()
self.thanos_url = 'https://' + (subprocess.check_output('oc get route thanos-querier -n openshift-monitoring --template={{.spec.host}}', shell=True)).decode()
self.headers = {'Authorization': 'Bearer ' + token, 'Content-Type': 'application/json'}

print(f'TRUSTY ROUTE: {self.trusty_url}')
print(f'THANOS QUERIER HOST: {self.thanos_url}')


def upload_payload_data(
self,
json_file: str
) -> None:
"""
Uploads data to TrustyAI service
"""
payload = open(json_file, 'r')
r = requests.post(f'{self.trusty_url}/data/upload', data=payload, headers=self.headers, verify=False)
payload.close()
if r.status_code == 200:
print('Data sucessfully uploaded to TrustyAI service')
else:
print(f"Error {r.status_code}: {r.reason}")

def get_model_metadata(
self
):
"""
Retrieves model data from TrustyAI
"""
r = requests.get(f'{self.trusty_url}/info', headers=self.headers, verify=False)
if r.status_code == 200:
model_metadata = json.loads(r.text)
return model_metadata
else:
print(f"Error {r.status_code}: {r.reason}")


def label_data_fields(
self,
model: str,
payload: str
):
"""
Assigns feature names to model input data
"""
def print_name_mapping(self):
r = requests.get(f'{self.trusty_url}/info', headers= self.headers, verify=False)
name_mapping = json.loads(r.text)[0]
for k, v in name_mapping['data']['inputSchema']['nameMapping'].items():
print(f'{k} -> {v}')

r = requests.post(f'{self.trusty_url}/info/names',json=payload, headers=self.headers, verify=False)
if r.status_code == 200:
print_name_mapping()
else:
print(f"Error {r.status_code}: {r.reason}")


def get_metric_request(
self,
payload: str,
metric: str,
reoccuring: bool
):
"""
Retrieve or schedule a metric request
"""
if reoccuring:
r = requests.post(f'{self.trusty_url}/metrics/{metric}/request', json=payload, headers=self.headers, verify=False)
else:
r = requests.post(f'{self.trusty_url}/metrics/{metric}', json=payload, headers=self.headers, verify=False)

if r.status_code == 200:
return r.text
else:
print(f"Error {r.status_code}: {r.reason}")


def upload_data_to_model(
self,
model_name: str,
json_file: str
):
"""
Sends an inference request to the model
"""
model_route = subprocess.check_output(f'oc get route {model_name}' + ' --template={{.spec.host}}{{.spec.path}}', shell=True).decode()
with open(json_file) as batch_file:
r = requests.post(url=f'https://{model_route}/infer', data=batch_file, headers=self.headers, verify=False)
if r.status_code == 200:
return r.text
else:
print(f"Error {r.status_code}: {r.reason}")

def get_metric_data(
self,
namespace: str,
metric: str,
range: List[str]
):
"""
Retrives metric data for a specific range in time
"""
params = {
"query": "{metric}{{namespace='{namespace}'}}{range}".format(metric=metric,namespace=namespace, range=range)
}
r = requests.get(f'{self.thanos_url}/api/v1/query?', params=params, headers=self.headers, verify=False)
if r.status_code == 200:
data_dict = json.loads(r.text)['data']['result'][0]['values']
df = pd.DataFrame(data_dict, columns=['timestamp', metric])
df['timestamp'] = df['timestamp'].apply(lambda epoch: dt.datetime.fromtimestamp(epoch).strftime('%Y-%m-%d %H:%M:%S'))
return df
else:
print(f"Error {r.status_code}: {r.reason}")


0 comments on commit fdf54b5

Please sign in to comment.