From 5d2b32c183be684de9710992690f883a49fdf48c Mon Sep 17 00:00:00 2001 From: Muzzammil Nakhuda Date: Wed, 18 Aug 2021 19:53:22 -0400 Subject: [PATCH] submit_jobdef.py and submit_jobreq.py are two commmand-line tools that calls functions from jobmodule.py jobmodule class. (#88) submit_jobdef submits a job using job definition id and submit_jobreq submits a job using job request id. Added information about them in the README.md and EXAMPLES.md --- EXAMPLES.md | 17 ++- README.md | 2 + jobmodule.py | 312 +++++++++++++++++++++++++++++++++++++++++++++++ submit_jobdef.py | 105 ++++++++++++++++ submit_jobreq.py | 103 ++++++++++++++++ 5 files changed, 538 insertions(+), 1 deletion(-) create mode 100644 jobmodule.py create mode 100644 submit_jobdef.py create mode 100644 submit_jobreq.py diff --git a/EXAMPLES.md b/EXAMPLES.md index 41fc0e0..2cf9c36 100644 --- a/EXAMPLES.md +++ b/EXAMPLES.md @@ -326,4 +326,19 @@ For example: /gelcontent/GELCorp/Marketing/Reports,user,Douglas,grant,"read,update,add,remove,delete,secure","read,update,add,remove,delete,secure" /gelcontent/GELCorp/Marketing/Analyses,group,Marketing,grant,"read,add,remove","read,update,delete,add,remove" /gelcontent/GELCorp/Marketing/Work in Progress,group,Marketing,grant,"read,update,add,remove,delete,secure","read,update,add,remove,delete,secure" -``` \ No newline at end of file +``` + +**submit_jobdef.py** +```bash +./submit_jobdef.py -id {jobDefinitionId} +./submit_jobdef.py -id {jobDefinitionId} -v +./submit_jobdef.py -id {jobDefinitionId} -context "SAS Studio compute context" +./submit_jobdef.py -id {jobDefinitionId} -context "SAS Studio compute context" -v + +``` + +**submit_jobreq.py** +```bash +./submit_jobreq.py -id {jobRequestId} +./submit_jobreq.py -id {jobRequestId} -v +``` diff --git a/README.md b/README.md index 62b4b01..416a287 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,8 @@ Additional tools provide more complex functionality by combining multiple calls * **getauditrecords.py** lists audit records from SAS Infrastructure Data Server in CSV or JSON format using REST calls. * **listmodelobjects.py** lists basic information about model content (models, projects and repositories). * **applyfolderauthorization.py** apply authorization rules to folders in bulk from a source CSV file. +* **submit_jd_job.py** This file calls various functions from jobmodule.py. It will submit a job based on job definition id. Based on other arguments, it will either provide the sasout and saslog information if available in the location provided or it will display more information based on if verbose was selected. It will also display more information if needed based on log level selected. +* **submit_jr_job.py** This file calls various functions from jobmodule.py. It will submit a job based on job request id. Based on other arguments, it will either provide the sasout and saslog information if available in the location provided or it will display more information based on if verbose was selected. It will also display more information if needed based on log level selected. Check back for additional tools and if you build a tool feel free to contribute it to the collection. diff --git a/jobmodule.py b/jobmodule.py new file mode 100644 index 0000000..c2a714e --- /dev/null +++ b/jobmodule.py @@ -0,0 +1,312 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# jobmodule.py +# August 2021 +# +# This module has the following functions in the folder. +# submit_job_definition is used by submit_jobdef.py to submit a job based on the job definition id. Depending if a +# corresponding job request was found for the job definition it would either call execute_job or would go to +# submit_job_request to create a new job request based on a default template in variable jobReq +# +# submit_job_request is used by submit_jobreq.py to submit a job based on job request id. If the call is coming from +# submit_job_definition then it will create a job request after which it will call execute_job. +# +# execute_job is called by both submit_job_definition and submit_job_request and is responsible for submitting the job. +# +# check_context verifies if the context provided by the user is the correct context, if it's not the program will error out +# +# +# +# NOTE: Above functions don't use callrestapi from the shared module, instead it makes requests calls. +# getauthtoken +# getbaseurl +# file_accessible +# +# There is another method called cancel_job, which allows users to manually cancel the job by pressing Ctrl + C on the +# keyboard. +# +# +# Copyright © 2018, SAS Institute Inc., Cary, NC, USA. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the License); you may not use this file except in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing permissions and limitations under the License. +# + +import requests, sys, os, time, json +from sharedfunctions import callrestapi + +class jobmodule: + def __init__(self): + self.head = {"Content-type": "application/json", "Accept": "application/json", "Authorization": jobmodule.getauthtoken(jobmodule.getbaseurl())} + self.verbose = None + self.sasjob_status = None + self.sasjob_status_details = None + self.saslog_location = None + self.sasout_location = None + self.sasres_location = None + self.job_definition_id = None + self.job_requests_id = None + self.job_execution_id = None + self.sasjob_error_details = None + self.cancel_job_uri = None + self.cancel_job_method = None + + + + def submit_job_definition(self, contextName="SAS Job Execution compute context", id=None, verbose=False): + if verbose: + self.verbose = True + # jobID = "dbbc02b9-e191-4a0b-b549-388df207c933" + # contextName = "SAS Job Execution compute context" + self.job_definition_id = id + jobID = id + contextName = contextName + jobDefinitionUri = "/jobDefinitions/definitions/" + jobID + url = self.getbaseurl() + jobDefinitionUri + result = requests.get(url=url, headers=self.head) + if result.status_code == 404: + print("ERROR! Job Definition ID is invalid. No Job Definition was found with id: {}".format(id)) + return + print ("Job Definition id: {}".format(id)) + if self.verbose: + print ("Checking Response IDs associated with the job definition id.") + name = result.json()['name'] + desc = result.json()['name'].strip() + " created by: " + os.getlogin() + " using pyviyatools" + url = self.getbaseurl() + "/jobExecution/jobRequests?filter=in('jobDefinitionUri','{}')&sortBy=modifiedTimeStamp:descending".format(jobDefinitionUri) + result = requests.get(url=url,headers=self.head) + count = result.json()['count'] + if count == 0: + if self.verbose: + print ("No Job Responses were found associated to the job request. Creating new job request") + jobReq = { + "name": name, + "description": desc, + "jobDefinitionUri": jobDefinitionUri, + "arguments": { + "_contextName": contextName, + "_omitJsonLog": "true" + } + } + + url = self.getbaseurl() + "jobExecution/jobRequests" + self.submit_job_request(url=url, data=jobReq) + elif count >= 1: + if self.verbose: + print ("Job Request Found using job request id {}".format(result.json()['items'][0]['id'])) + self.job_requests_id = result.json()['items'][0]['id'] + for links in result.json()['items'][0]['links']: + if links['rel'] == 'submitJob': + jobSubmitURI = links['uri'] + url = self.getbaseurl() + jobSubmitURI + self.execute_job(url=url) + + def submit_job_request(self, id=None, job_req_json=None, verbose=False): + if verbose: + self.verbose = verbose + if id is not None: + self.job_requests_id = id + if self.verbose: + print ("Checking if the job request id {} is valid".format(id)) + url = self.getbaseurl() + "/jobExecution/jobRequests/{}".format(id) + result = requests.get(url=url,headers=self.head) + if result.status_code == 404: + print("ERROR! Job Request ID is invalid. No Job Request was found with id: {}".format(kwargs['jobID'])) + sys.exit(1) + print ("Job Request id {} is found.") + for result in result.json()['links']: + if result['rel'] == 'submitJob': + submit_job_uri = self.getbaseurl() + result['uri'] + self.execute_job(url=submit_job_uri) + elif job_req_json is not None: + if self.verbose: + print ("Submitting a new job request.") + url = self.getbaseurl() + "/jobExecution/jobRequests" + result = requests.post(url=url,data=job_req_json,headers=self.head) + print ("New Job Request has been created. {}".format(result.json()['id'])) + self.job_requests_id = result.json()['id'] + for links in result.json()['links']: + submit_job_url = self.getbaseurl() + links['uri'].strip() + self.execute_job(url=submit_job_url) + + def execute_job(self, url): + sasout_loc = None + saslog_loc = None + sasresinfo = None + job_error_details = None + job_status_details = None + jobStatusURI = None + result = requests.post(url=url, headers=self.head) + print ("Job Submitted.") + print ("Job id: {} \nState: {}".format(result.json()['id'],result.json()['state'])) + for links in result.json()['links']: + if links['rel'] == 'self': + jobStatusURI = links['uri'] + url = self.getbaseurl() + jobStatusURI + if links['rel'] == 'updateState': + self.cancel_job_uri = links['uri'] + "?value=canceled" + self.cancel_job_method = links['method'] + + print ("Get Job Results > {}".format(url)) + result = requests.get(url=url, headers=self.head) + while result.json()['state'] == 'running': + time.sleep(0.01) + url = self.getbaseurl() + jobStatusURI + result = requests.get(url=url,headers=self.head) + + url = self.getbaseurl() + jobStatusURI + result = requests.get(url=url,headers=self.head) + job_status = result.json()['state'] + if 'stateDetails' in result.json(): + print ("Job {} with {}".format(job_status,job_status_details)) + job_status_details = result.json()['stateDetails'] + else: + print ("Job {}".format(job_status)) + + if 'error' in result.json(): + job_error_details = json.dumps(result.json()['error']) + if 'results' in result.json(): + computeJob = result.json()['results']['COMPUTE_JOB'] + if computeJob + ".list" in result.json()['results']: + sasout_loc = result.json()['results'][computeJob + ".list.txt"] + saslog_loc = result.json()['logLocation'] + sasresinfo = json.dumps(result.json()['results']) + + self.sasjob_status = job_status + self.sasjob_status_details = job_status_details + self.saslog_location = saslog_loc + self.sasout_location = sasout_loc + self.sasres_location = sasresinfo + self.sasjob_error_details = job_error_details + + def cancel_job(self): + if self.sasjob_status == "running": + result = callrestapi(self.cancel_job_uri, self.cancel_job_method, acceptType="text/plain", contentType="text/plain") + return result.text + + + def check_context(self,contextName): + context = ["SAS Job Execution compute context", "SAS Studio compute context"] + if contextName not in context: + print("Context provided, {} ,is not the default context".format(contextName)) + check_context_uri = self.getbaseurl() + "/compute/contexts?filter=eq('name','{}')".format(contextName) + check_context_resp = requests.get(check_context_uri,headers=self.head) + count = check_context_resp.json()['count'] + if count == 0: + session_context_nf_uri = self.getbaseurl() + "/compute/contexts" + session_context_nf_result = requests.get(session_context_nf_uri,headers=self.head) + listOfNames = [] + for names in session_context_nf_result.json()['items']: + listOfNames.append(names['name']) + print( + "Invalid Context Name. {} is not available in SAS Viya. Here is the list of valid context names {}".format( + contextName, listOfNames)) + sys.exit(1) + + return True + + + @staticmethod + def getauthtoken(baseurl): + + #get authentication information for the header + credential_file=os.path.join(os.path.expanduser('~'),'.sas','credentials.json') + + # check that credential file is available and can be read + access_file=jobmodule.file_accessible(credential_file, 'r') + + if access_file==False: + oaval=None + print("ERROR: Cannot read authentication credentials at: ", credential_file) + print("ERROR: Try refreshing your token with sas-admin auth login") + sys.exit() + + with open(credential_file) as json_file: + data = json.load(json_file) + type(data) + + # the sas-admin profile init creates an empty credential file + # check that credential is in file, if it is add it to the header, if not exit + + # get the profile environment variable to use it + # if it is not set default to the default profile + + cur_profile=os.environ.get("SAS_CLI_PROFILE","Default") + + #print("LOGON: ", cur_profile ) + + if cur_profile in data: + oauthToken=data[cur_profile]['access-token'] + oauthTokenType="bearer" + oaval=oauthTokenType + ' ' + oauthToken + + if oauthToken == "": + oaval = None + print("ERROR: access token not in file: ", credential_file) + print("ERROR: Try refreshing your token with sas-admin auth login") + sys.exit() + + else: + + oaval=None + print("ERROR: access token not in file: ", credential_file) + print("ERROR: Try refreshing your token with sas-admin auth login") + sys.exit() + + return oaval + + + @staticmethod + def getbaseurl(): + # check that profile file is available and can be read + + # note the path to the profile is hard-coded right now + endpointfile=os.path.join(os.path.expanduser('~'),'.sas','config.json') + access_file= jobmodule.file_accessible(endpointfile, 'r') + + #profile does not exist + if access_file==False: + print("ERROR: Cannot read CLI profile at:",endpointfile,". Recreate profile with sas-admin profile init.") + sys.exit() + + #profile is empty file + if os.stat(endpointfile).st_size==0: + print("ERROR: Cannot read CLI profile empty file at:",endpointfile,". Recreate profile with sas-admin profile init.") + sys.exit() + + # get json from profile + with open(endpointfile) as json_file: + data = json.load(json_file) + + # get the profile environment variable to use it + # if it is not set default to the default profile + + cur_profile=os.environ.get("SAS_CLI_PROFILE","Default") + #print("URL: ",cur_profile ) + + # check that information is in profile + if cur_profile in data: + baseurl=data[cur_profile]['sas-endpoint'] + else: + + baseurl=None + print("ERROR: profile "+cur_profile+" does not exist. Recreate profile with sas-admin profile init.") + sys.exit() + + + return baseurl + + @staticmethod + def file_accessible(filepath, mode): + + try: + f = open(filepath, mode) + f.close() + except IOError as e: + return False + + return True diff --git a/submit_jobdef.py b/submit_jobdef.py new file mode 100644 index 0000000..e0da090 --- /dev/null +++ b/submit_jobdef.py @@ -0,0 +1,105 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# submit_jobdef.py +# August 2021 +# +# This function takes in the job id, the context and verbose variables. id is the only required argument. +# based on the id, it will check to see if it is the context is correct, it will start the process of submitting the job +# in the jobmodule submit_job_definition function. +# +# As the jobmodule waits for the job to finish, this script allows users to Ctrl + C out of the program by giving them +# a choice to either cancel the job or keep it running in the background. +# +# +# Copyright © 2018, SAS Institute Inc., Cary, NC, USA. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the License); you may not use this file except in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing permissions and limitations under the License. +# + + +import argparse +import logging +import os +import sys +import time +from datetime import datetime + +from jobmodule import jobmodule + + +arguments = None +start_time = time.time() +parser = argparse.ArgumentParser(description="Submit Job using job definition") +parser.add_argument("-id",help="Provide Job Definition ID", action='store',required=True) +parser.add_argument("-c","--context", help="Provide Context Name. Default ['SAS Job Execution compute context']", default="SAS Job Execution compute context") +parser.add_argument("-v", "--verbose", help="Print Verbosity", action='store_true', default=False) +args = parser.parse_args() +try: + + profile = os.environ.get("SAS_CLI_PROFILE", "Default") + x = jobmodule() + + if x.check_context(args.context): + x.submit_job_definition(contextName=args.context, id=args.id, verbose=args.verbose) + + + print("=================================") + print("Job Definition ID: {}".format(x.job_definition_id)) + print("Job Request ID: {}".format(x.job_requests_id)) + print("Job ID: {}".format(x.job_execution_id)) + print("Job Status: {}".format(x.sasjob_status)) + print("Job Status Details: {}".format(x.sasjob_status_details)) + print("Job Error Code: {}".format(x.sasjob_error_details)) + print("Job Log: {}".format(x.saslog_location)) + print("Job List: {}".format(x.sasout_location)) + print("Job Res: {}".format(x.sasres_location)) + print("=================================") + +except KeyboardInterrupt as ke: + job_continue = None + if x.sasjob_status == "running": + job_continue = input("Do you want to cancel job? [y/N]: ") + while job_continue not in ['Y','y','N','n','']: + job_continue = input("Do you want to cancel job? [y/N]: ") + + if job_continue in ['Y','y']: + if x.cancel_job_uri is not None and x.cancel_job_method is not None: + cancel_job_result = x.cancel_job() + print("Result of Cancellation: {}".format(cancel_job_result)) + print("Job with id: {} has been cancelled!".format(x.job_execution_id)) + x.sasjob_status = "canceled" + else: + print ("Job is still running in the background with ID: {}".format(x.job_execution_id)) + +except Exception as e: + print ("There was an unexpected error.") + if x.sasjob_status == "running": + print ("The job was still running when the error occured, please check REST API log and/or check the job.") + x.sasjob_status = "failed" + + logging.error("Exception: ") + logging.error(e) + +finally: + if x.sasjob_status_details is not None: + print("Job {} with {}".format(x.sasjob_status,x.sasjob_status_details)) + if x.sasjob_error_details is not None: + print("More details {}".format(x.sasjob_error_details)) + else: + print("Job {}".format(x.sasjob_status)) + + if x.sasjob_status == "failed" or x.sasjob_status == "canceled": + sys.exit(1) + if x.sasjob_status == "completed": + if x.sasjob_status_details is None: + if x.sasjob_status_details == "info": + print ("Job has some information.") + sys.exit(0) + elif x.sasjob_status_details is not None: + sys.exit(2) diff --git a/submit_jobreq.py b/submit_jobreq.py new file mode 100644 index 0000000..07344f6 --- /dev/null +++ b/submit_jobreq.py @@ -0,0 +1,103 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# submit_jobreq.py +# August 2021 +# +# This function takes in the job id and verbose variables. id is the only required argument. +# based on the id, it will check to see, it will start the process of submitting the job +# in the jobmodule submit_job_request function. +# +# As the jobmodule waits for the job to finish, this script allows users to Ctrl + C out of the program by giving them +# a choice to either cancel the job or keep it running in the background. +# +# +# Copyright © 2018, SAS Institute Inc., Cary, NC, USA. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the License); you may not use this file except in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing permissions and limitations under the License. +# + +import argparse +import logging +import os +import sys +import time +from datetime import datetime + +from jobmodule import jobmodule + + +arguments = None +start_time = time.time() +parser = argparse.ArgumentParser(description="Submit Job using job definition") +parser.add_argument("-id",help="Provide Job Definition ID", action='store',required=True) +parser.add_argument("-v", "--verbose", help="Print Verbosity", action='store_true', default=False) +args = parser.parse_args() +try: + + profile = os.environ.get("SAS_CLI_PROFILE", "Default") + x = jobmodule() + + if x.check_context(args.context): + x.submit_job_request(id=args.id, verbose=args.verbose) + + + print("=================================") + print("Job Definition ID: {}".format(x.job_definition_id)) + print("Job Request ID: {}".format(x.job_requests_id)) + print("Job ID: {}".format(x.job_execution_id)) + print("Job Status: {}".format(x.sasjob_status)) + print("Job Status Details: {}".format(x.sasjob_status_details)) + print("Job Error Code: {}".format(x.sasjob_error_details)) + print("Job Log: {}".format(x.saslog_location)) + print("Job List: {}".format(x.sasout_location)) + print("Job Res: {}".format(x.sasres_location)) + print("=================================") + +except KeyboardInterrupt as ke: + job_continue = None + if x.sasjob_status == "running": + job_continue = input("Do you want to cancel job? [y/N]: ") + while job_continue not in ['Y','y','N','n','']: + job_continue = input("Do you want to cancel job? [y/N]: ") + + if job_continue in ['Y','y']: + if x.cancel_job_uri is not None and x.cancel_job_method is not None: + cancel_job_result = x.cancel_job() + print("Result of Cancellation: {}".format(cancel_job_result)) + print("Job with id: {} has been cancelled!".format(x.job_execution_id)) + x.sasjob_status = "canceled" + else: + print ("Job is still running in the background with ID: {}".format(x.job_execution_id)) + +except Exception as e: + print ("There was an unexpected error.") + if x.sasjob_status == "running": + print ("The job was still running when the error occured, please check REST API log and/or check the job.") + x.sasjob_status = "failed" + + logging.error("Exception: ") + logging.error(e) + +finally: + if x.sasjob_status_details is not None: + print("Job {} with {}".format(x.sasjob_status,x.sasjob_status_details)) + if x.sasjob_error_details is not None: + print("More details {}".format(x.sasjob_error_details)) + else: + print("Job {}".format(x.sasjob_status)) + + if x.sasjob_status == "failed" or x.sasjob_status == "canceled": + sys.exit(1) + if x.sasjob_status == "completed": + if x.sasjob_status_details is None: + if x.sasjob_status_details == "info": + print ("Job has some information.") + sys.exit(0) + elif x.sasjob_status_details is not None: + sys.exit(2)