adjusting the ingest function to update json body for ingest and to update a 'current' object for latest status retrieval

schema
lance allen 6 years ago
parent 0e2f600338
commit cf2c6e56db

@ -1,70 +1,85 @@
#!/usr/bin/env python #!/usr/bin/env python
from datetime import datetime from datetime import datetime
from json import dumps from json import dumps, loads
from pytz import utc
import config as app_config import config as app_config
import boto3 import boto3
from pprint import pprint
bucket_name = app_config.backend['s3']['name'] bucket_name = app_config.backend['s3']['name']
bucket_region = app_config.backend['s3']['region'] bucket_region = app_config.backend['s3']['region']
athena = boto3.client('athena')
def response(code, method, device):
query_tmpl = ''' """Returns a JSON response to HTTP clients"""
SELECT return {
* "response": code,
FROM "method": method,
history "device": device
WHERE }
token = '%(token)s'
AND
year = %(year)d def status(request):
AND """View status of the device"""
month = %(month)d device_name = request.matched_route.name
AND
day = %(day)d if request.method == "GET":
ORDER BY s3 = boto3.client("s3")
timestamp asc object = s3.get_object(
''' Bucket=bucket_name,
Key="current/{}.json".format(device_name)
response = athena.start_query_execution( )
QueryString=query, json_body = loads(object["Body"].read())
QueryExecutionContext={ return json_body
'Database': 'locations' else:
}, return response(204, request.method, device_name)
ResultConfiguration={
'OutputLocation': output_location
}
)
print('Execution scheduled with ID -> ', response['QueryExecutionId'])
def ingest(request): def ingest(request):
"""Ingest incoming JSON data from Owntracks devices into S3""" """Ingest incoming JSON data from Owntracks devices into S3"""
now = datetime.now() now = datetime.now(tz=utc)
device_name = request.matched_route.name device_name = request.matched_route.name
# https://www.w3.org/Protocols/rfc2616/rfc2616-sec6.html#sec6.1.1 # https://www.w3.org/Protocols/rfc2616/rfc2616-sec6.html#sec6.1.1
if request.method == "POST": if request.method == "POST":
# Discard request if empty body string
if not request.body:
return response(400, request.method, device_name)
# Otherwise continue
s3 = boto3.client("s3") s3 = boto3.client("s3")
json_data = request.json_body json_data = request.json_body
json_data["device_name"] = device_name json_data["device_name"] = device_name
json_data["timestamp"] = int(now.timestamp())
json_data["datestamp"] = now.strftime("%Y-%m-%d %H:%M:%S %Z")
print("[DEBUG] Received object!\n{}".format(json_data))
# Put the object in the date stamped path
s3.put_object( s3.put_object(
ACL="private", ACL="private",
Body=dumps(json_data), Body=dumps(json_data),
Bucket=bucket_name, Bucket=bucket_name,
Key="year={}/month={}/day={}/{}.json".format( Key="year={}/month={}/day={}/{}_{}.json".format(
now.year, now.month, now.day, json_data["tst"] now.year, now.month, now.day,
json_data["timestamp"], json_data["_type"]
), ),
ServerSideEncryption='AES256' ServerSideEncryption='AES256'
) )
# Also update the "current" object
if json_data["_type"] == "location":
s3.put_object(
ACL="private",
Body=dumps(json_data),
Bucket=bucket_name,
Key="current/{}-status.json".format(device_name),
ServerSideEncryption='AES256'
)
response_code = 202 response_code = 202
else: else:
response_code = 204 response_code = 204
return { return response(response_code, request.method, device_name)
"response": response_code,
"method": request.method,
"device": device_name
}

Loading…
Cancel
Save