app.utils package

Submodules

app.utils.gcs module

app.utils.gcs.add_autoscoring_task_to_cloud_tasks(tasks_client, scoringrun, raw_id, files, override_task_id=False, db=None)
app.utils.gcs.check_blob_exists(storage_client, bucketId, objectId)
app.utils.gcs.copy_blob(storage_client, bucket_name, src_file_name, dst_file_name)
app.utils.gcs.create_blob(storage_client, bucket_name, file_data, file_name, content_type='text/plain')
app.utils.gcs.delete_blob(storage_client, bucket_name, file_name)
app.utils.gcs.download_blob_as_bytes(storage_client, bucketId, objectId)
app.utils.gcs.download_blob_as_string(storage_client, bucketId, objectId)
app.utils.gcs.generate_autoscoring_task_autoscoringconfig(scoringrun, db=None)
app.utils.gcs.generate_autoscoring_task_channelmapping(scoringrun)
app.utils.gcs.get_presigned_gcs_url(storage_client, bucket_name, full_filename, expiry_in_days=7, method='POST')
app.utils.gcs.get_storage_client()
app.utils.gcs.get_tasks_client()
app.utils.gcs.list_blobs(storage_client, bucketId, prefix)
app.utils.gcs.trigger_edf_validator(tasks_client, scoringrun)
app.utils.gcs.yield_blob_as_chunks(storage_client, bucketId, objectId)

app.utils.utils module

app.utils.utils.compliance_log(msg)
app.utils.utils.lights_from_seconds_to_epoch(lights_off_seconds, lights_on_seconds)
app.utils.utils.lights_on_calculation(lights_on_epoch, edf_duration)
app.utils.utils.verify_token_get_or_create_internaluser(db, token, verify_token=None)
app.utils.utils.verify_token_m2m(token, verify_token=None)
class app.utils.utils.verify_token_oidc(token, grant_type='user')

Bases: object

Performs all token verification using PyJWT.

download_public_certs()
verify()

Module contents