@ -4,6 +4,7 @@
from boto3 import client as boto3_client
from json import loads as json_loads
from json import dumps as json_dumps
from ast import literal_eval
from arrow import get as arrow_get
from arrow import utcnow as arrow_utcnow
from flask import current_app as app
@ -20,6 +21,7 @@ class Secret(object):
def __init__ ( self , secret_name = ' ' ) :
self . secret_name = secret_name
# Perform a check if secret name provided
if self . secret_name :
self . check ( )
@ -40,12 +42,14 @@ class Secret(object):
self . expired = None
def check_tags_expired ( self , json_data ) :
""" Given a DescribeSecret JSON response and assess whether
the ' Expiration ' tag shows the secret is expired or not
""" Given a DescribeSecret JSON response , assess whether
the metadata tags show the secret is expired or not
"""
now = arrow_utcnow ( )
for tag in json_data [ ' Tags ' ] :
# Set expiration equal to tag value - perform date delta vs now
if ' Expiration ' in tag . values ( ) :
self . expiration = tag [ ' Value ' ]
expiration_date = arrow_get ( tag [ ' Value ' ] )
@ -55,10 +59,20 @@ class Secret(object):
else :
self . expired = False
# Set value if it's set to expire after first read
if ' ExpireOnRead ' in tag . values ( ) :
self . expire_on_read = literal_eval ( str ( tag [ ' Value ' ] ) . title ( ) )
else :
self . expire_on_read = False
# If 'LastAccessedDate' in json then the record has been read
if ' LastAccessedDate ' in json_data and self . expire_on_read :
self . expired = True
return
def create ( self , username , password , message , expiration = ' ' ) :
def create ( self , username , password , message , expiration = ' ' , expire_on_read = False ):
""" Create a secret """
now = arrow_utcnow ( )
@ -67,6 +81,7 @@ class Secret(object):
self . username = str ( username )
self . password = str ( password )
self . message = str ( message )
self . expire_on_read = bool ( expire_on_read )
data_object = {
" username " : self . username ,
" password " : self . password ,
@ -75,7 +90,7 @@ class Secret(object):
if not expiration :
# Set default time expiration
self . expiration = str ( now . shift ( hours = app . config [ ' DEFAULT_HOURS ' ] ) )
self . expiration = str ( now . shift ( hours = app . config . get ( ' DEFAULT_HOURS ' ) ) )
else :
# Otherwise validate we have a good expiration date
try :
@ -98,6 +113,10 @@ class Secret(object):
' Key ' : ' Expiration ' ,
' Value ' : self . expiration
} ,
{
' Key ' : ' ExpireOnRead ' ,
' Value ' : str ( self . expire_on_read )
}
]
)
@ -113,38 +132,6 @@ class Secret(object):
)
secret_value = json_loads ( response [ " SecretString " ] )
secret_value [ ' expiration ' ] = self . expiration
secret_value [ ' expire_on_read ' ] = self . expire_on_read
return secret_value
# One-off functions used by cleanup Lambda
def list_secrets ( boto_client ) :
""" List all secrets """
next_token = " "
pagination_finished = False
secrets = [ ]
response = boto_client . list_secrets (
MaxResults = 20
)
while not pagination_finished :
for secret in response [ ' SecretList ' ] :
secrets . append ( secret )
if ' NextToken ' in response :
next_token = response [ ' NextToken ' ]
response = boto_client . list_secrets (
MaxResults = 20 ,
NextToken = next_token
)
else :
pagination_finished = True
return secrets
def delete_secret ( boto_client , secret_name ) :
""" Remove a secret """
response = boto_client . delete_secret (
SecretId = secret_name ,
ForceDeleteWithoutRecovery = True
)
return response