Browse Source
submit_jobdef submits a job using job definition id and submit_jobreq submits a job using job request id. Added information about them in the README.md and EXAMPLES.mdmaster
committed by
GitHub
5 changed files with 538 additions and 1 deletions
@ -0,0 +1,312 @@ |
|||
#!/usr/bin/python |
|||
# -*- coding: utf-8 -*- |
|||
# |
|||
# jobmodule.py |
|||
# August 2021 |
|||
# |
|||
# This module has the following functions in the folder. |
|||
# submit_job_definition is used by submit_jobdef.py to submit a job based on the job definition id. Depending if a |
|||
# corresponding job request was found for the job definition it would either call execute_job or would go to |
|||
# submit_job_request to create a new job request based on a default template in variable jobReq |
|||
# |
|||
# submit_job_request is used by submit_jobreq.py to submit a job based on job request id. If the call is coming from |
|||
# submit_job_definition then it will create a job request after which it will call execute_job. |
|||
# |
|||
# execute_job is called by both submit_job_definition and submit_job_request and is responsible for submitting the job. |
|||
# |
|||
# check_context verifies if the context provided by the user is the correct context, if it's not the program will error out |
|||
# |
|||
# |
|||
# |
|||
# NOTE: Above functions don't use callrestapi from the shared module, instead it makes requests calls. |
|||
# getauthtoken |
|||
# getbaseurl |
|||
# file_accessible |
|||
# |
|||
# There is another method called cancel_job, which allows users to manually cancel the job by pressing Ctrl + C on the |
|||
# keyboard. |
|||
# |
|||
# |
|||
# Copyright © 2018, SAS Institute Inc., Cary, NC, USA. All Rights Reserved. |
|||
# |
|||
# Licensed under the Apache License, Version 2.0 (the License); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
|||
# |
|||
# http://www.apache.org/licenses/LICENSE-2.0 |
|||
# |
|||
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either |
|||
# express or implied. See the License for the specific language governing permissions and limitations under the License. |
|||
# |
|||
|
|||
import requests, sys, os, time, json |
|||
from sharedfunctions import callrestapi |
|||
|
|||
class jobmodule: |
|||
def __init__(self): |
|||
self.head = {"Content-type": "application/json", "Accept": "application/json", "Authorization": jobmodule.getauthtoken(jobmodule.getbaseurl())} |
|||
self.verbose = None |
|||
self.sasjob_status = None |
|||
self.sasjob_status_details = None |
|||
self.saslog_location = None |
|||
self.sasout_location = None |
|||
self.sasres_location = None |
|||
self.job_definition_id = None |
|||
self.job_requests_id = None |
|||
self.job_execution_id = None |
|||
self.sasjob_error_details = None |
|||
self.cancel_job_uri = None |
|||
self.cancel_job_method = None |
|||
|
|||
|
|||
|
|||
def submit_job_definition(self, contextName="SAS Job Execution compute context", id=None, verbose=False): |
|||
if verbose: |
|||
self.verbose = True |
|||
# jobID = "dbbc02b9-e191-4a0b-b549-388df207c933" |
|||
# contextName = "SAS Job Execution compute context" |
|||
self.job_definition_id = id |
|||
jobID = id |
|||
contextName = contextName |
|||
jobDefinitionUri = "/jobDefinitions/definitions/" + jobID |
|||
url = self.getbaseurl() + jobDefinitionUri |
|||
result = requests.get(url=url, headers=self.head) |
|||
if result.status_code == 404: |
|||
print("ERROR! Job Definition ID is invalid. No Job Definition was found with id: {}".format(id)) |
|||
return |
|||
print ("Job Definition id: {}".format(id)) |
|||
if self.verbose: |
|||
print ("Checking Response IDs associated with the job definition id.") |
|||
name = result.json()['name'] |
|||
desc = result.json()['name'].strip() + " created by: " + os.getlogin() + " using pyviyatools" |
|||
url = self.getbaseurl() + "/jobExecution/jobRequests?filter=in('jobDefinitionUri','{}')&sortBy=modifiedTimeStamp:descending".format(jobDefinitionUri) |
|||
result = requests.get(url=url,headers=self.head) |
|||
count = result.json()['count'] |
|||
if count == 0: |
|||
if self.verbose: |
|||
print ("No Job Responses were found associated to the job request. Creating new job request") |
|||
jobReq = { |
|||
"name": name, |
|||
"description": desc, |
|||
"jobDefinitionUri": jobDefinitionUri, |
|||
"arguments": { |
|||
"_contextName": contextName, |
|||
"_omitJsonLog": "true" |
|||
} |
|||
} |
|||
|
|||
url = self.getbaseurl() + "jobExecution/jobRequests" |
|||
self.submit_job_request(url=url, data=jobReq) |
|||
elif count >= 1: |
|||
if self.verbose: |
|||
print ("Job Request Found using job request id {}".format(result.json()['items'][0]['id'])) |
|||
self.job_requests_id = result.json()['items'][0]['id'] |
|||
for links in result.json()['items'][0]['links']: |
|||
if links['rel'] == 'submitJob': |
|||
jobSubmitURI = links['uri'] |
|||
url = self.getbaseurl() + jobSubmitURI |
|||
self.execute_job(url=url) |
|||
|
|||
def submit_job_request(self, id=None, job_req_json=None, verbose=False): |
|||
if verbose: |
|||
self.verbose = verbose |
|||
if id is not None: |
|||
self.job_requests_id = id |
|||
if self.verbose: |
|||
print ("Checking if the job request id {} is valid".format(id)) |
|||
url = self.getbaseurl() + "/jobExecution/jobRequests/{}".format(id) |
|||
result = requests.get(url=url,headers=self.head) |
|||
if result.status_code == 404: |
|||
print("ERROR! Job Request ID is invalid. No Job Request was found with id: {}".format(kwargs['jobID'])) |
|||
sys.exit(1) |
|||
print ("Job Request id {} is found.") |
|||
for result in result.json()['links']: |
|||
if result['rel'] == 'submitJob': |
|||
submit_job_uri = self.getbaseurl() + result['uri'] |
|||
self.execute_job(url=submit_job_uri) |
|||
elif job_req_json is not None: |
|||
if self.verbose: |
|||
print ("Submitting a new job request.") |
|||
url = self.getbaseurl() + "/jobExecution/jobRequests" |
|||
result = requests.post(url=url,data=job_req_json,headers=self.head) |
|||
print ("New Job Request has been created. {}".format(result.json()['id'])) |
|||
self.job_requests_id = result.json()['id'] |
|||
for links in result.json()['links']: |
|||
submit_job_url = self.getbaseurl() + links['uri'].strip() |
|||
self.execute_job(url=submit_job_url) |
|||
|
|||
def execute_job(self, url): |
|||
sasout_loc = None |
|||
saslog_loc = None |
|||
sasresinfo = None |
|||
job_error_details = None |
|||
job_status_details = None |
|||
jobStatusURI = None |
|||
result = requests.post(url=url, headers=self.head) |
|||
print ("Job Submitted.") |
|||
print ("Job id: {} \nState: {}".format(result.json()['id'],result.json()['state'])) |
|||
for links in result.json()['links']: |
|||
if links['rel'] == 'self': |
|||
jobStatusURI = links['uri'] |
|||
url = self.getbaseurl() + jobStatusURI |
|||
if links['rel'] == 'updateState': |
|||
self.cancel_job_uri = links['uri'] + "?value=canceled" |
|||
self.cancel_job_method = links['method'] |
|||
|
|||
print ("Get Job Results > {}".format(url)) |
|||
result = requests.get(url=url, headers=self.head) |
|||
while result.json()['state'] == 'running': |
|||
time.sleep(0.01) |
|||
url = self.getbaseurl() + jobStatusURI |
|||
result = requests.get(url=url,headers=self.head) |
|||
|
|||
url = self.getbaseurl() + jobStatusURI |
|||
result = requests.get(url=url,headers=self.head) |
|||
job_status = result.json()['state'] |
|||
if 'stateDetails' in result.json(): |
|||
print ("Job {} with {}".format(job_status,job_status_details)) |
|||
job_status_details = result.json()['stateDetails'] |
|||
else: |
|||
print ("Job {}".format(job_status)) |
|||
|
|||
if 'error' in result.json(): |
|||
job_error_details = json.dumps(result.json()['error']) |
|||
if 'results' in result.json(): |
|||
computeJob = result.json()['results']['COMPUTE_JOB'] |
|||
if computeJob + ".list" in result.json()['results']: |
|||
sasout_loc = result.json()['results'][computeJob + ".list.txt"] |
|||
saslog_loc = result.json()['logLocation'] |
|||
sasresinfo = json.dumps(result.json()['results']) |
|||
|
|||
self.sasjob_status = job_status |
|||
self.sasjob_status_details = job_status_details |
|||
self.saslog_location = saslog_loc |
|||
self.sasout_location = sasout_loc |
|||
self.sasres_location = sasresinfo |
|||
self.sasjob_error_details = job_error_details |
|||
|
|||
def cancel_job(self): |
|||
if self.sasjob_status == "running": |
|||
result = callrestapi(self.cancel_job_uri, self.cancel_job_method, acceptType="text/plain", contentType="text/plain") |
|||
return result.text |
|||
|
|||
|
|||
def check_context(self,contextName): |
|||
context = ["SAS Job Execution compute context", "SAS Studio compute context"] |
|||
if contextName not in context: |
|||
print("Context provided, {} ,is not the default context".format(contextName)) |
|||
check_context_uri = self.getbaseurl() + "/compute/contexts?filter=eq('name','{}')".format(contextName) |
|||
check_context_resp = requests.get(check_context_uri,headers=self.head) |
|||
count = check_context_resp.json()['count'] |
|||
if count == 0: |
|||
session_context_nf_uri = self.getbaseurl() + "/compute/contexts" |
|||
session_context_nf_result = requests.get(session_context_nf_uri,headers=self.head) |
|||
listOfNames = [] |
|||
for names in session_context_nf_result.json()['items']: |
|||
listOfNames.append(names['name']) |
|||
print( |
|||
"Invalid Context Name. {} is not available in SAS Viya. Here is the list of valid context names {}".format( |
|||
contextName, listOfNames)) |
|||
sys.exit(1) |
|||
|
|||
return True |
|||
|
|||
|
|||
@staticmethod |
|||
def getauthtoken(baseurl): |
|||
|
|||
#get authentication information for the header |
|||
credential_file=os.path.join(os.path.expanduser('~'),'.sas','credentials.json') |
|||
|
|||
# check that credential file is available and can be read |
|||
access_file=jobmodule.file_accessible(credential_file, 'r') |
|||
|
|||
if access_file==False: |
|||
oaval=None |
|||
print("ERROR: Cannot read authentication credentials at: ", credential_file) |
|||
print("ERROR: Try refreshing your token with sas-admin auth login") |
|||
sys.exit() |
|||
|
|||
with open(credential_file) as json_file: |
|||
data = json.load(json_file) |
|||
type(data) |
|||
|
|||
# the sas-admin profile init creates an empty credential file |
|||
# check that credential is in file, if it is add it to the header, if not exit |
|||
|
|||
# get the profile environment variable to use it |
|||
# if it is not set default to the default profile |
|||
|
|||
cur_profile=os.environ.get("SAS_CLI_PROFILE","Default") |
|||
|
|||
#print("LOGON: ", cur_profile ) |
|||
|
|||
if cur_profile in data: |
|||
oauthToken=data[cur_profile]['access-token'] |
|||
oauthTokenType="bearer" |
|||
oaval=oauthTokenType + ' ' + oauthToken |
|||
|
|||
if oauthToken == "": |
|||
oaval = None |
|||
print("ERROR: access token not in file: ", credential_file) |
|||
print("ERROR: Try refreshing your token with sas-admin auth login") |
|||
sys.exit() |
|||
|
|||
else: |
|||
|
|||
oaval=None |
|||
print("ERROR: access token not in file: ", credential_file) |
|||
print("ERROR: Try refreshing your token with sas-admin auth login") |
|||
sys.exit() |
|||
|
|||
return oaval |
|||
|
|||
|
|||
@staticmethod |
|||
def getbaseurl(): |
|||
# check that profile file is available and can be read |
|||
|
|||
# note the path to the profile is hard-coded right now |
|||
endpointfile=os.path.join(os.path.expanduser('~'),'.sas','config.json') |
|||
access_file= jobmodule.file_accessible(endpointfile, 'r') |
|||
|
|||
#profile does not exist |
|||
if access_file==False: |
|||
print("ERROR: Cannot read CLI profile at:",endpointfile,". Recreate profile with sas-admin profile init.") |
|||
sys.exit() |
|||
|
|||
#profile is empty file |
|||
if os.stat(endpointfile).st_size==0: |
|||
print("ERROR: Cannot read CLI profile empty file at:",endpointfile,". Recreate profile with sas-admin profile init.") |
|||
sys.exit() |
|||
|
|||
# get json from profile |
|||
with open(endpointfile) as json_file: |
|||
data = json.load(json_file) |
|||
|
|||
# get the profile environment variable to use it |
|||
# if it is not set default to the default profile |
|||
|
|||
cur_profile=os.environ.get("SAS_CLI_PROFILE","Default") |
|||
#print("URL: ",cur_profile ) |
|||
|
|||
# check that information is in profile |
|||
if cur_profile in data: |
|||
baseurl=data[cur_profile]['sas-endpoint'] |
|||
else: |
|||
|
|||
baseurl=None |
|||
print("ERROR: profile "+cur_profile+" does not exist. Recreate profile with sas-admin profile init.") |
|||
sys.exit() |
|||
|
|||
|
|||
return baseurl |
|||
|
|||
@staticmethod |
|||
def file_accessible(filepath, mode): |
|||
|
|||
try: |
|||
f = open(filepath, mode) |
|||
f.close() |
|||
except IOError as e: |
|||
return False |
|||
|
|||
return True |
|||
@ -0,0 +1,105 @@ |
|||
#!/usr/bin/python |
|||
# -*- coding: utf-8 -*- |
|||
# |
|||
# submit_jobdef.py |
|||
# August 2021 |
|||
# |
|||
# This function takes in the job id, the context and verbose variables. id is the only required argument. |
|||
# based on the id, it will check to see if it is the context is correct, it will start the process of submitting the job |
|||
# in the jobmodule submit_job_definition function. |
|||
# |
|||
# As the jobmodule waits for the job to finish, this script allows users to Ctrl + C out of the program by giving them |
|||
# a choice to either cancel the job or keep it running in the background. |
|||
# |
|||
# |
|||
# Copyright © 2018, SAS Institute Inc., Cary, NC, USA. All Rights Reserved. |
|||
# |
|||
# Licensed under the Apache License, Version 2.0 (the License); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
|||
# |
|||
# http://www.apache.org/licenses/LICENSE-2.0 |
|||
# |
|||
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either |
|||
# express or implied. See the License for the specific language governing permissions and limitations under the License. |
|||
# |
|||
|
|||
|
|||
import argparse |
|||
import logging |
|||
import os |
|||
import sys |
|||
import time |
|||
from datetime import datetime |
|||
|
|||
from jobmodule import jobmodule |
|||
|
|||
|
|||
arguments = None |
|||
start_time = time.time() |
|||
parser = argparse.ArgumentParser(description="Submit Job using job definition") |
|||
parser.add_argument("-id",help="Provide Job Definition ID", action='store',required=True) |
|||
parser.add_argument("-c","--context", help="Provide Context Name. Default ['SAS Job Execution compute context']", default="SAS Job Execution compute context") |
|||
parser.add_argument("-v", "--verbose", help="Print Verbosity", action='store_true', default=False) |
|||
args = parser.parse_args() |
|||
try: |
|||
|
|||
profile = os.environ.get("SAS_CLI_PROFILE", "Default") |
|||
x = jobmodule() |
|||
|
|||
if x.check_context(args.context): |
|||
x.submit_job_definition(contextName=args.context, id=args.id, verbose=args.verbose) |
|||
|
|||
|
|||
print("=================================") |
|||
print("Job Definition ID: {}".format(x.job_definition_id)) |
|||
print("Job Request ID: {}".format(x.job_requests_id)) |
|||
print("Job ID: {}".format(x.job_execution_id)) |
|||
print("Job Status: {}".format(x.sasjob_status)) |
|||
print("Job Status Details: {}".format(x.sasjob_status_details)) |
|||
print("Job Error Code: {}".format(x.sasjob_error_details)) |
|||
print("Job Log: {}".format(x.saslog_location)) |
|||
print("Job List: {}".format(x.sasout_location)) |
|||
print("Job Res: {}".format(x.sasres_location)) |
|||
print("=================================") |
|||
|
|||
except KeyboardInterrupt as ke: |
|||
job_continue = None |
|||
if x.sasjob_status == "running": |
|||
job_continue = input("Do you want to cancel job? [y/N]: ") |
|||
while job_continue not in ['Y','y','N','n','']: |
|||
job_continue = input("Do you want to cancel job? [y/N]: ") |
|||
|
|||
if job_continue in ['Y','y']: |
|||
if x.cancel_job_uri is not None and x.cancel_job_method is not None: |
|||
cancel_job_result = x.cancel_job() |
|||
print("Result of Cancellation: {}".format(cancel_job_result)) |
|||
print("Job with id: {} has been cancelled!".format(x.job_execution_id)) |
|||
x.sasjob_status = "canceled" |
|||
else: |
|||
print ("Job is still running in the background with ID: {}".format(x.job_execution_id)) |
|||
|
|||
except Exception as e: |
|||
print ("There was an unexpected error.") |
|||
if x.sasjob_status == "running": |
|||
print ("The job was still running when the error occured, please check REST API log and/or check the job.") |
|||
x.sasjob_status = "failed" |
|||
|
|||
logging.error("Exception: ") |
|||
logging.error(e) |
|||
|
|||
finally: |
|||
if x.sasjob_status_details is not None: |
|||
print("Job {} with {}".format(x.sasjob_status,x.sasjob_status_details)) |
|||
if x.sasjob_error_details is not None: |
|||
print("More details {}".format(x.sasjob_error_details)) |
|||
else: |
|||
print("Job {}".format(x.sasjob_status)) |
|||
|
|||
if x.sasjob_status == "failed" or x.sasjob_status == "canceled": |
|||
sys.exit(1) |
|||
if x.sasjob_status == "completed": |
|||
if x.sasjob_status_details is None: |
|||
if x.sasjob_status_details == "info": |
|||
print ("Job has some information.") |
|||
sys.exit(0) |
|||
elif x.sasjob_status_details is not None: |
|||
sys.exit(2) |
|||
@ -0,0 +1,103 @@ |
|||
#!/usr/bin/python |
|||
# -*- coding: utf-8 -*- |
|||
# |
|||
# submit_jobreq.py |
|||
# August 2021 |
|||
# |
|||
# This function takes in the job id and verbose variables. id is the only required argument. |
|||
# based on the id, it will check to see, it will start the process of submitting the job |
|||
# in the jobmodule submit_job_request function. |
|||
# |
|||
# As the jobmodule waits for the job to finish, this script allows users to Ctrl + C out of the program by giving them |
|||
# a choice to either cancel the job or keep it running in the background. |
|||
# |
|||
# |
|||
# Copyright © 2018, SAS Institute Inc., Cary, NC, USA. All Rights Reserved. |
|||
# |
|||
# Licensed under the Apache License, Version 2.0 (the License); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
|||
# |
|||
# http://www.apache.org/licenses/LICENSE-2.0 |
|||
# |
|||
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either |
|||
# express or implied. See the License for the specific language governing permissions and limitations under the License. |
|||
# |
|||
|
|||
import argparse |
|||
import logging |
|||
import os |
|||
import sys |
|||
import time |
|||
from datetime import datetime |
|||
|
|||
from jobmodule import jobmodule |
|||
|
|||
|
|||
arguments = None |
|||
start_time = time.time() |
|||
parser = argparse.ArgumentParser(description="Submit Job using job definition") |
|||
parser.add_argument("-id",help="Provide Job Definition ID", action='store',required=True) |
|||
parser.add_argument("-v", "--verbose", help="Print Verbosity", action='store_true', default=False) |
|||
args = parser.parse_args() |
|||
try: |
|||
|
|||
profile = os.environ.get("SAS_CLI_PROFILE", "Default") |
|||
x = jobmodule() |
|||
|
|||
if x.check_context(args.context): |
|||
x.submit_job_request(id=args.id, verbose=args.verbose) |
|||
|
|||
|
|||
print("=================================") |
|||
print("Job Definition ID: {}".format(x.job_definition_id)) |
|||
print("Job Request ID: {}".format(x.job_requests_id)) |
|||
print("Job ID: {}".format(x.job_execution_id)) |
|||
print("Job Status: {}".format(x.sasjob_status)) |
|||
print("Job Status Details: {}".format(x.sasjob_status_details)) |
|||
print("Job Error Code: {}".format(x.sasjob_error_details)) |
|||
print("Job Log: {}".format(x.saslog_location)) |
|||
print("Job List: {}".format(x.sasout_location)) |
|||
print("Job Res: {}".format(x.sasres_location)) |
|||
print("=================================") |
|||
|
|||
except KeyboardInterrupt as ke: |
|||
job_continue = None |
|||
if x.sasjob_status == "running": |
|||
job_continue = input("Do you want to cancel job? [y/N]: ") |
|||
while job_continue not in ['Y','y','N','n','']: |
|||
job_continue = input("Do you want to cancel job? [y/N]: ") |
|||
|
|||
if job_continue in ['Y','y']: |
|||
if x.cancel_job_uri is not None and x.cancel_job_method is not None: |
|||
cancel_job_result = x.cancel_job() |
|||
print("Result of Cancellation: {}".format(cancel_job_result)) |
|||
print("Job with id: {} has been cancelled!".format(x.job_execution_id)) |
|||
x.sasjob_status = "canceled" |
|||
else: |
|||
print ("Job is still running in the background with ID: {}".format(x.job_execution_id)) |
|||
|
|||
except Exception as e: |
|||
print ("There was an unexpected error.") |
|||
if x.sasjob_status == "running": |
|||
print ("The job was still running when the error occured, please check REST API log and/or check the job.") |
|||
x.sasjob_status = "failed" |
|||
|
|||
logging.error("Exception: ") |
|||
logging.error(e) |
|||
|
|||
finally: |
|||
if x.sasjob_status_details is not None: |
|||
print("Job {} with {}".format(x.sasjob_status,x.sasjob_status_details)) |
|||
if x.sasjob_error_details is not None: |
|||
print("More details {}".format(x.sasjob_error_details)) |
|||
else: |
|||
print("Job {}".format(x.sasjob_status)) |
|||
|
|||
if x.sasjob_status == "failed" or x.sasjob_status == "canceled": |
|||
sys.exit(1) |
|||
if x.sasjob_status == "completed": |
|||
if x.sasjob_status_details is None: |
|||
if x.sasjob_status_details == "info": |
|||
print ("Job has some information.") |
|||
sys.exit(0) |
|||
elif x.sasjob_status_details is not None: |
|||
sys.exit(2) |
|||
Loading…
Reference in new issue