Here are some code examples to use the Ansible Tower API from python scripts.
First, the code acquires an authentication token by sending a POST request to the API endpoint using specific credentials. The token is then extracted from the JSON response.
Next, the code utilizes this token to initiate a job template. It sets up the request headers with the authorization token and content type, and prepares the data payload with extra variables. A POST request is then sent to the job template launch endpoint, and the job ID is extracted from the response.
After starting the job, the code enters a loop to monitor the job’s status. It sends GET requests to the job status endpoint at one-second intervals, checking if the job’s status is “successful”. This loop continues until the job is marked as successful, printing the current loop count and status during each iteration. Once the job status is confirmed to be “successful” the loop exits, and the final status is printed.
import time
import requests
import urllib3
urllib3.disable_warnings()
########################################################################################################################
# Aquire token
########################################################################################################################
response = requests.post('https://10.1.1.1/api/v2/tokens/', verify=False, auth=('execute_user', 'Pa55w0rd'))
x = response.json()
token = x['token']
########################################################################################################################
# Start template
########################################################################################################################
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
data = {"extra_vars":{"name":"max","foobar":"tho"}}
start_job = requests.post("https://10.1.1.1/api/v2/job_templates/10/launch/", json=data, verify=False, headers=headers)
print(start_job.status_code)
job=start_job.json()
print(job)
jobid=job['job']
########################################################################################################################
# Loop to wait for state finished
########################################################################################################################
status= None
loop = 0
while status is None:
print(loop)
job_state = requests.get(f"https://10.1.1.1/api/v2/jobs/{jobid}/", headers=headers, verify=False)
job_state_data = job_state.json()
if job_state_data['status'] == "successful":
status = job_state_data['status']
print(status)
print("xxxxxx")
loop +=1
time.sleep(1)
print("Final:")
print(job_state_data['status'])
This example do the same without parameters.
import time
import requests
import urllib3
urllib3.disable_warnings()
########################################################################################################################
# Aquire token
########################################################################################################################
response = requests.post('https://10.1.1.1/api/v2/tokens/', verify=False, auth=('admin', 'Pa55w0rd'))
x = response.json()
token = x['token']
########################################################################################################################
# Start template
########################################################################################################################
headers = {
"Authorization": f"Bearer {token}"
}
start_job = requests.post("https://10.1.1.1/api/v2/job_templates/7/launch/", verify=False, headers=headers)
job=start_job.json()
jobid=job['job']
########################################################################################################################
# Loop to wait for state finished
########################################################################################################################
status= None
loop = 0
while status is None:
print(loop)
job_state = requests.get(f"https://10.1.1.1/api/v2/jobs/{jobid}/", headers=headers, verify=False)
job_state_data = job_state.json()
if job_state_data['status'] == "successful":
status = job_state_data['status']
print(status)
print("xxxxxx")
loop +=1
time.sleep(1)
print("Final:")
print(job_state_data['status'])