Scripted access (TAP)

This tutorial will get you started to use Python and the IVOA TAP protocol to run scripted queries and retrieve results from the CosmoSim archive.

All the scripts can be also found at CosmoSim TAP tutorial on Github.

Installation of pyvo

In order to interact with the TAP interface of www.cosmosim.org you only require python 3+ and pyvo 1+.

pip install pyvo>=1.0

Importing PyVo and checking the version

It is useful to always print the version of pyvo you are using. Most of non-working scripts fail because of an old version of pyvo.

from pkg_resources import parse_version
import pyvo

#
# Verify the version of pyvo
#
if parse_version(pyvo.__version__) < parse_version('1.0'):
    raise ImportError('pyvo version must be at least than 1.0')

print('\npyvo version %s \n' % (pyvo.__version__,))

Authentication

After registration you can access your API Token by clicking on your user name in the right side of the menu bar. Then select API Token.

aip-token

You will see a long alphanumerical word. Just copy it where ever you see <your-token> in the following examples.

aip-token-blured

The API Token identifies you and provides access to the results tables of your queries.

The connection to the TAP service can be done that way:

import requests
import pyvo

#
# Setup tap_service connection
#
service_name = "CosmoSim"
url = "https://www.cosmosim.org/tap"
token = 'Token <your-token>'

print('TAP service %s \n' % (service_name,))

# Setup authorization
tap_session = requests.Session()
tap_session.headers['Authorization'] = token

tap_service = pyvo.dal.TAPService(url, session=tap_session)

Short queries

Many queries last less than a few seconds, we call them short queries. The latter can be executed with synchronized jobs. You will retrieve the results interactively.

lang = "PostgreSQL"

query = '''-- Select surrounding halos
SELECT bdmid, x, y, z, rvir, mvir
  FROM mdr1.bdmv
 WHERE pdist(1000, x,y,z, 998,450,500) < 5
 LIMIT 10
'''

tap_result = tap_service.run_sync(query, language=lang)


Remark: the lang parameter can take two values, either PostgreSQL or ADQL. This allows to access some features present in the one or the other language. For more details about the differences between both, please refer to Documentation or to IOVA docs

The result tap_result is a so called TAPResults that is essentially a wrapper around an Astropy votable.Table. For standard conversion see Convert to various python types.

Asynchronous jobs

For slightly longer queries, typically counting or larger selections (>10000 objects) a synchronized job will fail because of timeouts (from http protocol or server settings). This is why we provide the possibility to submit asynchronous jobs. These type of jobs will run on the server side, store the results such that you can retrieve them at a later time. They come in 3 flavors: * 1 Minute queue * 1 Hour queue * 5 Hours queue

The 1 minute queue

Most of the asynchronous queries will require less than 1 minute, basically all queries without JOIN, or CONE SEARCH. Therefore this queue is the default and should be preferred.

#
# Submit the query as an async job
#
query_name = "select_snapshot_by_redshifts"
lang = 'PostgreSQL' # ADQL or PostgreSQL
query = '''
-- Select simulation snapshots by redishift
SELECT distinct zred, aexp, snapnum
  FROM mdr1.redshifts
 ORDER BY snapnum
  DESC      
'''

job = tap_service.submit_job(query, language=lang, runid=query_name, queue="1m")
job.run()

#
# Wait to be completed (or an error occurs)
#
job.wait(phases=["COMPLETED", "ERROR", "ABORTED"], timeout=60.0)
print('JOB %s: %s' % (job.job.runid, job.phase))

#
# Fetch the results
#
job.raise_if_error()
print('\nfetching the results...')
tap_results = job.fetch_result()
print('...DONE\n')

As for sync jobs, the result is a TAPResults object.

The 1 hour queue

If you want to extract information on specific stars from various tables you have to JOIN tables. Your query may need more than a few seconds. For that, the 1 hour queue provide a good balance. It should be noticed that for such a queue the wait method should not be used to prevent an overload of the server at peak usage. Therefore using the script with the sleep() method is recommended.

#
# Submit the query as an async job
#
lang = 'PostgreSQL'
query_name = "radial_prof_massive_bdmv"

query = '''
-- Radial profile of most massive BDMV (z=0)
SELECT * FROM bolshoi.bdmvprof
 WHERE bdmid =
       (SELECT bdmid FROM bolshoi.bdmv
         WHERE snapnum=416 ORDER BY mvir DESC LIMIT 1)
 ORDER BY rbin
'''

job = tap_service.submit_job(query, language=lang, runid=query_name, queue="1h")
job.run()

print('JOB %s: SUBMITTED' % (job.job.runid,))

#
# Wait for the query to finish
#
while job.phase not in ("COMPLETED", "ERROR", "ABORTED"):
    print('WAITING...')
    time.sleep(3600.0) # do nothing for some time

print('JOB ' + (job.phase))

#
# Fetch the results
#
job.raise_if_error()
print('\nfetching the results...')
results = job.fetch_result()
print('...DONE\n')

The 5 hours queue

Some complex queries like Cross-Matching or geometric search may take more than the short queues allow. For this purpose we provide the 5 hours queue. If you need longer queues please contact us.

When running a long query, you surely don't want to block CPU ressources for a python process that just wait for 5 hours, for the queue to finish. Therefore long queries are typically done in two parts (= two scripts), one that submits the request, another one that retrieve the results.

Submitting a job and store job_urls for later retrieval

We first submit the query as an async job to the 5h queue, and store the job (the url) of the newly created job into a file job_url.txt. With this url we are able to retrieve the results (when it has finished) at any later time.

#
# Submit the query as an async job
#
query_name = ""
lang = 'PostgreSQL'

query = '''
-- Mass accression history of a halo
SELECT p.foftreeid, p.treesnapnum, p.mass, p.np
  FROM mdr1.fofmtree AS p,
       (SELECT foftreeid, mainleafid FROM mdr1.fofmtree
         WHERE fofid=85000000000) AS mycl
 WHERE p.foftreeid BETWEEN mycl.foftreeid AND mycl.mainleafid
 ORDER BY p.treesnapnum
'''

job = tap_service.submit_job(query, language=lang, runid=query_name, queue="5h")
job.run()

print('JOB %s: SUBMITTED' % (job.job.runid,))
print('JOB %s: %s' % (job.job.runid, job.phase))

#
# Save the job's url in a file to later retrieve results.
#
print('URL: %s' % (job.url,))

with open('job_url.txt', 'w') as fd:
    fd.write(job.url)

Retrieve the results at a later time

In order to retrieve the results, we will first recreate the job from the job_url stored in the job_url.txt file and verify that the job is finished, by asking for its current phase. In case the job is finished we will retrieve the results as usual.

#
# Recreate the job from url and session (token)
#

# read the url
with open('job_url.txt', 'r') as fd:
    job_url = fd.readline()

# recreate the job
job = pyvo.dal.AsyncTAPJob(job_url, session=tap_session)

#
# Check the job status
#
print('JOB %s: %s' % (job.job.runid, job.phase))

# if still running --> exit
if job.phase not in ("COMPLETED", "ERROR", "ABORTED"):
    exit(0)

#
# Fetch the results
#
job.raise_if_error()
print('\nfetching the results...')
tap_results = job.fetch_result()
print('\n...DONE\n')

Thanks to this method you can submit a job, go for a coffee, write a paper and retrieve the results when it suits you. The job and its results are stored on the server side under your user account.

Submitting multiple queries

Some time it is needed to submit several queries at one time. Either because the entire query may last longer than 5 hours and you need to cut it in smaller parts, or because you need non JOIN-able information from various tables.

Your query is too long? Chunk it!

Before contacting us and ask for longer queue time: You may try to cut long queries into smaller chunks, and execute them as a list of shorter queries.

There are three typical ways to do this: * chunk via halo total mass: mtot * chunk via halo bound mass: mvir * chunk via Friend-of-Friend tree ID: foftreeid

Here is an example how to submit a list of queries one after another.

def submit_queries(tap_service, queries, lang='PostgreSQL', queue='1m', urls_filename='jobs_url.txt'):
    '''Submit a serie of tap queries

    Parameters:
    --------
    tap_service: pyvo.dal.tap.TAPService
        The TAP service to which the query will be submitted

    queries: list(tuple)
        A list consisting of (query_name, query_string) pairs

    lang: str, default: PostgreSQL
        The language in which the queries a written

    queue: str, default: 1m
        The name of the queue to use

    urls_filename: str, default: jobs_url.txt
        The filename of the file holding the urls of the submitted jobs (for later retrieval)
    '''

    # list of failed jobs
    failed = []


    # open the file to store the jobs: for later retrieval
    fd = open('jobs_url.txt', 'w')


    # submit all queries one after another
    for name, query in queries:

        print('> Submitting : {name}'.format(name=name))

        # Create the async job
        try:
            job = tap_service.submit_job(query, language=lang, runid=name, queue=queue)
        except Exception as e:
            print('ERROR could not create the job.')
            print(e)
            failed.append(runid)
            continue        

        # Run the run
        try:
            job.run()
        except Exception as e:
            print('Error: could not run the job. Are you sure about: \n - validity of the SQL query?\n - valid language?\n - sufficient quotas?\n')
            print(e)
            failed.append(name)
            continue

        # Save the submitted jobs into a file
        fd.write(job.url + '\n')


    # Verify that all jobs have been submitted
    try:
        assert(failed == [])
        print("\nAll jobs were properly submitted.")
    except AssertionError:
        print("\nThe following jobs had failed: {jobs}".format(jobs=failed))

    fd.close()

To fetch the results of the submitted queries, one can fetch the results via the job url.

def fetch_results_of_complete_jobs(tap_service, urls_filename):
    '''Fetch the results of complete jobs

    Parameters:
    -----------
    tap_service: pyvo.dal.tap.TAPService
        The TAP service to which the query will be submitted

    urls_filename: str
        The filename of the file holding the urls of the jobs
    '''

    running_job_names = []

    #
    # Recreate the job from url and session (token)
    #

    # read the url
    with open(urls_filename, 'r') as fd:
        job_urls = fd.readlines()

    # reopen the file to store the non finished jobs
    fd = open(urls_filename, 'w')

    # Query status and if COMPLETE fetch results
    for job_url in job_urls:

        # recreate the job
        job = pyvo.dal.AsyncTAPJob(job_url.rstrip('\n'), session=tap_session)

        #
        # Check the job status
        #
        print('JOB {name}: {status}'.format(name=job.job.runid , status=job.phase))

        # if still running --> exit
        if job.phase not in ("COMPLETED", "ERROR", "ABORTED"):
            running_job_names.append(job.job.runid)
            fd.write(job_url)
            continue

        #
        # Fetch the results
        #
        try:
            job.raise_if_error()
            print('fetching the results...')
            tap_results = job.fetch_result()
            print('writing results to disk...\n')
            tap_results.votable.to_xml('./' + str(job.job.runid) + '.xml')

        except Exception as e:
            running_job_names.append(job.job.runid)
            fd.write(job_url)
            print(e)

    print('...DONE\n')

    # Output still running jobs
    try:
        assert(running_job_names == [])
    except AssertionError:
        print("The following jobs are still executing: {}".format(running_job_names))

    fd.close()

List of file queries

Sometimes it is useful to just send all .sql queries present in a directory. For such purpose you can use comments to provide the proper parameters.

Let us consider the file radial_prof_massive_bdmv.sql

-- Radial profile of most massive BDMV (z=0)

-- LANGUAGE = PostgreSQL
-- QUEUE = 1h

SELECT * FROM bolshoi.bdmvprof
 WHERE bdmid =
       (SELECT bdmid FROM bolshoi.bdmv
         WHERE snapnum=416 ORDER BY mvir DESC LIMIT 1)
 ORDER BY rbin

The language and queue are prescibed as comments. The query can then be submitted in a script like the following:

import glob

#
# Submit the query as an Asynchrone job
#

# find all .sql files in current directory
queries_filename = sorted(glob.glob('./*.sql'))
print('Sending %d examples' % (len(queries_filename),))

# initialize test results
jobs = []
failed =  []

# send all queries
for query_filename in queries_filename:

    # read the .SQL file
    with open(query_filename, 'r') as fd:
        query = ' '.join(fd.readlines())

    # Set language from comments (default: PostgreSQL)
    if 'LANGUAGE = ADQL' in query:
        lang = 'ADQL'
    else:
        lang = 'PostgreSQL'

    # Set queue from comments (default: 1m)
    if 'QUEUE = 5h' in query:
        queue = "5h"
    elif 'QUEUE = 1h' in query:
        queue = "1h"
    elif 'QUEUE = 1m' in query:
        queue = "1m"
    else:
        queue = "1m"


    # Set the runid from sql filename
    base = os.path.basename(query_filename)
    runid = os.path.splitext(base)[0]

    print('\n> Query : %s\n%s\n' % (runid, query))

The rest of the submission process and retrieval can be done in any manner.

Convert result to various python types

The results obtained via the fetch_results() method returns a so called TAPResults object. The latter is essencially a votable. In case you are not familiar with votables here is a few tricks to get back to some more general pythonic types.

Print data to screen

  • Print the data: python tap_results.to_table().pprint(max_lines=10) It is important to notice the max_lines keyword, printing too many lines may crash a low-memory machine.

  • Show as html (in a browser): python tap_results.to_table().show_in_browser(max_lines=10) It is important to notice the max_lines keyword, printing too many lines may crash a low-memory machine.

  • Show in a notebook (ipython, jupyter or jupyterlab): python tap_results.to_table().show_in_notebook(display_length=10) It is important to notice the display_length keyword, printing too many lines may crash a low-memory machine.

Write data to file

  • Write the results to a VOTable python tap_results.votable.to_xml("filename.xml") or python tap_results.to_table().write("filename.xml", format='votable')

  • Write the results to FITs file python tap_results.to_table().write("filename.fits", format='fits')

  • Write the results to CSV file (not recommanded: lose the metadata) python tap_results.to_table().write("filename.csv")

Convert to alternative format

  • Get a numpy array: python np_array = tap_results.to_table().as_array()

  • Get a Panda's DataFrame python df = tap_results.to_table().to_pandas()

    • Get the header of DataFrame: python df.head()

Archiving your jobs

If you submit several large queries you may go over quota: set to 100 GB. In order to avoid to get over quota you may consider archiving your jobs. Archiving removes the data from the server side but keeps the SQL query. This allows to resubmit a query at a later time.

Deleting (Archiving) a job with pyvo can be simply done that way:

job.delete()

Archiving all COMPLETED jobs

A nice feature of the TAP service is to retrieve all jobs that are marked as COMPLETED and archive them at ones. This can be done as follows:

#
# Archiving all COMPLETED jobs
#

# obtain the list of completed job_descriptions
completed_job_descriptions = tap_service.get_job_list(phases='COMPLETED')

# Archiving each of them
for job_description in completed_job_descriptions:

    # get the jobid
    jobid = job_description.jobid

    # recreate the url by hand
    job_url = tap_service.baseurl + '/async/' + jobid

    # recreate the job
    job = pyvo.dal.AsyncTAPJob(job_url, session=tap_session)

    print('Archiving: {url}'.format(url=job_url))
    job.delete() # archive job

Rerunning ARCHIVED jobs

Rerunning and retrieving results from a job that have been archived previously, can be achieved that way:

#
# Rerunning Archived jobs
#

# obtain the list of the two last ARCHIVED job_descriptions
archived_job_descriptions = tap_service.get_job_list(phases='ARCHIVED', last=2)

# rerunning the two last Archived jobs
for job_description in archived_job_descriptions:

    # get jobid
    jobid = job_description.jobid

    # recreate the url by hand
    job_url = tap_service.baseurl + '/async/' + jobid

    # recreate the archived job
    archived_job = pyvo.dal.AsyncTAPJob(job_url, session=tap_session)

    # get the language (with a bit of magic)
    lang = [parameter._content for parameter in archived_job._job.parameters if parameter._id == 'query_language'][0]

    # extract the query
    query = archived_job.query

    # resubmit the query with corresponding parameters
    job = tap_service.submit_job(query, language=lang, runid='rerun', queue='1m')
    print('%(url)s :\n%(query)s\n' % {"url": job_url, "query": query})

    # start the job
    try:
        job.run()
    except pyvo.dal.DALServiceError:
        raise ValueError("Please check that the SQL query is valid, and that the SQL language is correct.")    

Retrieving the results is done alike explained above.

If you prefer you can also filter for a given runid.

#
# Filtering by runid
#

target_runid = 'radial_prof_massive_bdmv'

# obtain the list of completed job_descriptions
archived_job_descriptions = tap_service.get_job_list(phases='ARCHIVED')

for job_description in archived_job_descriptions:

    # select the job with runid fitting target_runid
    if job_description.runid == target_runid:

        # get jobid
        jobid = job_description.jobid

        # recreate the url by hand
        job_url = tap_service.baseurl + '/async/' + jobid

        # recreate the archived job
        archived_job = pyvo.dal.AsyncTAPJob(job_url, session=tap_session)

        # get the language (with a bit of magic)
        lang = [parameter._content for parameter in archived_job._job.parameters if parameter._id == 'query_language'][0]

        # extract the query
        query = archived_job.query

        # resubmit the query with corresponding parameters
        job = tap_service.submit_job(query, language=lang, runid='rerun', queue='1m')
        print('%(url)s :\n%(query)s\n' % {"url": job_url, "query": query})

        # start the job
        try:
            job.run()
        except pyvo.dal.DALServiceError:
            raise ValueError("Please check that the SQL query is valid, and that the SQL language is correct.")