You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

312 lines
13 KiB

#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# jobmodule.py
# August 2021
#
# This module has the following functions in the folder.
# submit_job_definition is used by submit_jobdef.py to submit a job based on the job definition id. Depending if a
# corresponding job request was found for the job definition it would either call execute_job or would go to
# submit_job_request to create a new job request based on a default template in variable jobReq
#
# submit_job_request is used by submit_jobreq.py to submit a job based on job request id. If the call is coming from
# submit_job_definition then it will create a job request after which it will call execute_job.
#
# execute_job is called by both submit_job_definition and submit_job_request and is responsible for submitting the job.
#
# check_context verifies if the context provided by the user is the correct context, if it's not the program will error out
#
#
#
# NOTE: Above functions don't use callrestapi from the shared module, instead it makes requests calls.
# getauthtoken
# getbaseurl
# file_accessible
#
# There is another method called cancel_job, which allows users to manually cancel the job by pressing Ctrl + C on the
# keyboard.
#
#
# Copyright © 2018, SAS Institute Inc., Cary, NC, USA. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the License); you may not use this file except in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing permissions and limitations under the License.
#
import requests, sys, os, time, json
from sharedfunctions import callrestapi
class jobmodule:
def __init__(self):
self.head = {"Content-type": "application/json", "Accept": "application/json", "Authorization": jobmodule.getauthtoken(jobmodule.getbaseurl())}
self.verbose = None
self.sasjob_status = None
self.sasjob_status_details = None
self.saslog_location = None
self.sasout_location = None
self.sasres_location = None
self.job_definition_id = None
self.job_requests_id = None
self.job_execution_id = None
self.sasjob_error_details = None
self.cancel_job_uri = None
self.cancel_job_method = None
def submit_job_definition(self, contextName="SAS Job Execution compute context", id=None, verbose=False):
if verbose:
self.verbose = True
# jobID = "dbbc02b9-e191-4a0b-b549-388df207c933"
# contextName = "SAS Job Execution compute context"
self.job_definition_id = id
jobID = id
contextName = contextName
jobDefinitionUri = "/jobDefinitions/definitions/" + jobID
url = self.getbaseurl() + jobDefinitionUri
result = requests.get(url=url, headers=self.head)
if result.status_code == 404:
print("ERROR! Job Definition ID is invalid. No Job Definition was found with id: {}".format(id))
return
print ("Job Definition id: {}".format(id))
if self.verbose:
print ("Checking Response IDs associated with the job definition id.")
name = result.json()['name']
desc = result.json()['name'].strip() + " created by: " + os.getlogin() + " using pyviyatools"
url = self.getbaseurl() + "/jobExecution/jobRequests?filter=in('jobDefinitionUri','{}')&sortBy=modifiedTimeStamp:descending".format(jobDefinitionUri)
result = requests.get(url=url,headers=self.head)
count = result.json()['count']
if count == 0:
if self.verbose:
print ("No Job Responses were found associated to the job request. Creating new job request")
jobReq = {
"name": name,
"description": desc,
"jobDefinitionUri": jobDefinitionUri,
"arguments": {
"_contextName": contextName,
"_omitJsonLog": "true"
}
}
url = self.getbaseurl() + "jobExecution/jobRequests"
self.submit_job_request(url=url, data=jobReq)
elif count >= 1:
if self.verbose:
print ("Job Request Found using job request id {}".format(result.json()['items'][0]['id']))
self.job_requests_id = result.json()['items'][0]['id']
for links in result.json()['items'][0]['links']:
if links['rel'] == 'submitJob':
jobSubmitURI = links['uri']
url = self.getbaseurl() + jobSubmitURI
self.execute_job(url=url)
def submit_job_request(self, id=None, job_req_json=None, verbose=False):
if verbose:
self.verbose = verbose
if id is not None:
self.job_requests_id = id
if self.verbose:
print ("Checking if the job request id {} is valid".format(id))
url = self.getbaseurl() + "/jobExecution/jobRequests/{}".format(id)
result = requests.get(url=url,headers=self.head)
if result.status_code == 404:
print("ERROR! Job Request ID is invalid. No Job Request was found with id: {}".format(kwargs['jobID']))
sys.exit(1)
print ("Job Request id {} is found.")
for result in result.json()['links']:
if result['rel'] == 'submitJob':
submit_job_uri = self.getbaseurl() + result['uri']
self.execute_job(url=submit_job_uri)
elif job_req_json is not None:
if self.verbose:
print ("Submitting a new job request.")
url = self.getbaseurl() + "/jobExecution/jobRequests"
result = requests.post(url=url,data=job_req_json,headers=self.head)
print ("New Job Request has been created. {}".format(result.json()['id']))
self.job_requests_id = result.json()['id']
for links in result.json()['links']:
submit_job_url = self.getbaseurl() + links['uri'].strip()
self.execute_job(url=submit_job_url)
def execute_job(self, url):
sasout_loc = None
saslog_loc = None
sasresinfo = None
job_error_details = None
job_status_details = None
jobStatusURI = None
result = requests.post(url=url, headers=self.head)
print ("Job Submitted.")
print ("Job id: {} \nState: {}".format(result.json()['id'],result.json()['state']))
for links in result.json()['links']:
if links['rel'] == 'self':
jobStatusURI = links['uri']
url = self.getbaseurl() + jobStatusURI
if links['rel'] == 'updateState':
self.cancel_job_uri = links['uri'] + "?value=canceled"
self.cancel_job_method = links['method']
print ("Get Job Results > {}".format(url))
result = requests.get(url=url, headers=self.head)
while result.json()['state'] == 'running':
time.sleep(0.01)
url = self.getbaseurl() + jobStatusURI
result = requests.get(url=url,headers=self.head)
url = self.getbaseurl() + jobStatusURI
result = requests.get(url=url,headers=self.head)
job_status = result.json()['state']
if 'stateDetails' in result.json():
print ("Job {} with {}".format(job_status,job_status_details))
job_status_details = result.json()['stateDetails']
else:
print ("Job {}".format(job_status))
if 'error' in result.json():
job_error_details = json.dumps(result.json()['error'])
if 'results' in result.json():
computeJob = result.json()['results']['COMPUTE_JOB']
if computeJob + ".list" in result.json()['results']:
sasout_loc = result.json()['results'][computeJob + ".list.txt"]
saslog_loc = result.json()['logLocation']
sasresinfo = json.dumps(result.json()['results'])
self.sasjob_status = job_status
self.sasjob_status_details = job_status_details
self.saslog_location = saslog_loc
self.sasout_location = sasout_loc
self.sasres_location = sasresinfo
self.sasjob_error_details = job_error_details
def cancel_job(self):
if self.sasjob_status == "running":
result = callrestapi(self.cancel_job_uri, self.cancel_job_method, acceptType="text/plain", contentType="text/plain")
return result.text
def check_context(self,contextName):
context = ["SAS Job Execution compute context", "SAS Studio compute context"]
if contextName not in context:
print("Context provided, {} ,is not the default context".format(contextName))
check_context_uri = self.getbaseurl() + "/compute/contexts?filter=eq('name','{}')".format(contextName)
check_context_resp = requests.get(check_context_uri,headers=self.head)
count = check_context_resp.json()['count']
if count == 0:
session_context_nf_uri = self.getbaseurl() + "/compute/contexts"
session_context_nf_result = requests.get(session_context_nf_uri,headers=self.head)
listOfNames = []
for names in session_context_nf_result.json()['items']:
listOfNames.append(names['name'])
print(
"Invalid Context Name. {} is not available in SAS Viya. Here is the list of valid context names {}".format(
contextName, listOfNames))
sys.exit(1)
return True
@staticmethod
def getauthtoken(baseurl):
#get authentication information for the header
credential_file=os.path.join(os.path.expanduser('~'),'.sas','credentials.json')
# check that credential file is available and can be read
access_file=jobmodule.file_accessible(credential_file, 'r')
if access_file==False:
oaval=None
print("ERROR: Cannot read authentication credentials at: ", credential_file)
print("ERROR: Try refreshing your token with sas-admin auth login")
sys.exit()
with open(credential_file) as json_file:
data = json.load(json_file)
type(data)
# the sas-admin profile init creates an empty credential file
# check that credential is in file, if it is add it to the header, if not exit
# get the profile environment variable to use it
# if it is not set default to the default profile
cur_profile=os.environ.get("SAS_CLI_PROFILE","Default")
#print("LOGON: ", cur_profile )
if cur_profile in data:
oauthToken=data[cur_profile]['access-token']
oauthTokenType="bearer"
oaval=oauthTokenType + ' ' + oauthToken
if oauthToken == "":
oaval = None
print("ERROR: access token not in file: ", credential_file)
print("ERROR: Try refreshing your token with sas-admin auth login")
sys.exit()
else:
oaval=None
print("ERROR: access token not in file: ", credential_file)
print("ERROR: Try refreshing your token with sas-admin auth login")
sys.exit()
return oaval
@staticmethod
def getbaseurl():
# check that profile file is available and can be read
# note the path to the profile is hard-coded right now
endpointfile=os.path.join(os.path.expanduser('~'),'.sas','config.json')
access_file= jobmodule.file_accessible(endpointfile, 'r')
#profile does not exist
if access_file==False:
print("ERROR: Cannot read CLI profile at:",endpointfile,". Recreate profile with sas-admin profile init.")
sys.exit()
#profile is empty file
if os.stat(endpointfile).st_size==0:
print("ERROR: Cannot read CLI profile empty file at:",endpointfile,". Recreate profile with sas-admin profile init.")
sys.exit()
# get json from profile
with open(endpointfile) as json_file:
data = json.load(json_file)
# get the profile environment variable to use it
# if it is not set default to the default profile
cur_profile=os.environ.get("SAS_CLI_PROFILE","Default")
#print("URL: ",cur_profile )
# check that information is in profile
if cur_profile in data:
baseurl=data[cur_profile]['sas-endpoint']
else:
baseurl=None
print("ERROR: profile "+cur_profile+" does not exist. Recreate profile with sas-admin profile init.")
sys.exit()
return baseurl
@staticmethod
def file_accessible(filepath, mode):
try:
f = open(filepath, mode)
f.close()
except IOError as e:
return False
return True