import mdf_iter
import canedge_browser
import can_decoder
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from utils import (
    setup_fs,
    load_dbc_files,
    restructure_data,
    add_custom_sig,
    ProcessData,
)
import logging
import boto3
from botocore.exceptions import ClientError
1.1.2 S3 Upload
Upload the CSV file to S3 using the AWS SDK for Python (boto3).
def upload_file(file_name, bucket, object_name=None):
    """Upload a file to an S3 bucket

    :param file_name: File to upload
    :param bucket: Bucket to upload to
    :param object_name: S3 object name. If not specified then file_name is used
    :return: True if file was uploaded, else False
    """
    # If S3 object_name was not specified, use file_name
    if object_name is None:
        object_name = file_name

    # Upload the file
    s3_client = boto3.client("s3")
    try:
        response = s3_client.upload_file(file_name, bucket, object_name)
    except ClientError as e:
        logging.error(e)
        return False
    return True
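For reference, a minimal usage sketch (the bucket name and object key below are placeholders):

# Hypothetical usage of upload_file; bucket/key are placeholder values
if upload_file("/tmp/example.csv", "my-data-bucket", "LOG/example.csv"):
    print("Upload succeeded")
else:
    print("Upload failed")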
1.1.3 lambda_handler
To create a file inside Lambda, the path must point to /tmp/.
Here, only a single MDF4 file is converted to a CSV file and then uploaded.
(The original code loads every MDF4 file in the folder and appends each one to the dataframe.)
def ratio(s1, s2):
    return s2 / s1 if s1 else np.nan


def lambda_handler(event, context):
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    object_key = f'{event["Records"][0]["s3"]["object"]["key"].split(".")[0]}.csv'
    print(bucket)

    # specify devices to process (from local/S3), DBC files and start time
    devices = [f"{bucket}/LOG/958D2219"]
    dbc_paths = ["dbc_files/CSS-Electronics-SAE-J1939-DEMO.dbc"]
    start = datetime(year=2020, month=1, day=13, hour=0, tzinfo=timezone.utc)

    # setup filesystem (local/S3), load DBC files and list log files for processing
    fs = setup_fs(s3=True, endpoint="http://s3.ap-southeast-1.amazonaws.com")
    db_list = load_dbc_files(dbc_paths)
    log_file = f'{bucket}/{event["Records"][0]["s3"]["object"]["key"]}'

    # --------------------------------------------
    # perform data processing of the log file
    proc = ProcessData(fs, db_list, signals=[])
    df_phys_all = pd.DataFrame()

    df_raw, device_id = proc.get_raw_data(log_file)
    df_phys = proc.extract_phys(df_raw)
    proc.print_log_summary(device_id, log_file, df_phys)
    # collect the decoded data (pd.concat, since DataFrame.append was removed in pandas 2.0)
    df_phys_all = pd.concat([df_phys_all, df_phys])

    # --------------------------------------------
    # example: Add a custom signal
    df_phys_all = add_custom_sig(
        df_phys_all, "WheelBasedVehicleSpeed", "EngineSpeed", ratio, "RatioRpmSpeed"
    )

    # --------------------------------------------
    # example: resample and restructure data (parameters in columns)
    df_phys_join = restructure_data(df_phys=df_phys_all, res="1S")
    df_phys_join.to_csv(f"/tmp/{object_key.split('/')[-1]}")
    upload_file(f"/tmp/{object_key.split('/')[-1]}", bucket, object_key)
    print(df_phys_join)
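To exercise the handler locally, a minimal sketch of the S3 put-event payload it expects could look like the following (the bucket name and object key are placeholders):

# Minimal sketch of an S3 trigger event for local testing
# (bucket name and object key are placeholder values)
test_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-data-bucket"},
                "object": {"key": "LOG/958D2219/00000001.MF4"},
            }
        }
    ]
}
lambda_handler(test_event, None)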
1.2 Results
Below is the result after uploading an MF4 file.
The values stored in the MF4 file are decoded and saved as a CSV file with the same name.
2. DynamoDB
Values used by the dashboard (e.g. historical statistics) are processed and stored in DynamoDB.
If no existing item is found, a new one is created; if one exists, its values are loaded and the new values are added to them.
def create_statistical_item(name, items):
    import pandas as pd
    from datetime import datetime

    # Dataframe Size
    column_size = items.columns.size

    # Convert Dataframe Timestamp -> datetime.datetime
    first_timestamp = items.index[0].to_pydatetime()

    item = dynamodb_get_item(
        name, str(first_timestamp.year), str(first_timestamp.month)
    )
    if not item:
        item = {
            "Year": {"N": str(first_timestamp.year)},
            "Month": {"N": str(first_timestamp.month)},
            "Day": {"M": {}},
        }
    if str(first_timestamp.day) not in item["Day"]["M"].keys():
        item["Day"]["M"][str(first_timestamp.day)] = {"M": {}}
        for index in range(column_size):
            item["Day"]["M"][str(first_timestamp.day)]["M"][items.columns[index]] = {
                "N": str(items.sum().iloc[index])
            }
    else:
        for index in range(column_size):
            item["Day"]["M"][str(first_timestamp.day)]["M"][items.columns[index]] = {
                "N": str(
                    float(
                        item["Day"]["M"][str(first_timestamp.day)]["M"][
                            items.columns[index]
                        ]["N"]
                    )
                    + items.sum().iloc[index]
                )
            }
    return item
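The dynamodb_get_item helper is not shown here; a minimal sketch, assuming the table is keyed on Year (partition key) and Month (sort key) stored as numbers, might look like this:

def dynamodb_get_item(name, year, month):
    # Hypothetical helper: fetch an existing Year/Month item, or None if absent
    import boto3

    client = boto3.client("dynamodb")
    response = client.get_item(
        TableName=name, Key={"Year": {"N": year}, "Month": {"N": month}}
    )
    return response.get("Item")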
Changes (2021.05.31)
Some code was modified while changing the type of the Day attribute to a list.
The results below may therefore look different, but the overall process is unchanged.
def create_statistical_item(name, items):
    import pandas as pd
    from datetime import datetime

    # Dataframe Size
    row_size = items.index.size
    column_size = items.columns.size
    pre_item = {"Count": {"N": str(row_size)}}

    # Convert Dataframe Timestamp -> datetime.datetime
    first_timestamp = items.index[0].to_pydatetime()
    last_timestamp = items.index[-1].to_pydatetime()

    item = dynamodb_get_item(
        name, str(first_timestamp.year), str(first_timestamp.month)
    )
    if not item:
        item = {
            "Year": {"N": str(first_timestamp.year)},
            "Month": {"N": str(first_timestamp.month)},
            "Day": {"L": []},
        }
    """
    Structure of the "Day" attribute:
    "Day": {
        "L": [
            {
                "M": {
                    "Day": {"N": "13"},
                    "WheelBasedVehicleSpeed": {"N": "150000"},
                    ...
                }
            }
        ]
    }
    """
    day_set = [obj["M"]["Day"]["N"] for obj in item["Day"]["L"]]
    if str(first_timestamp.day) not in day_set:
        for index in range(column_size):
            pre_item[items.columns[index]] = {"N": str(items.sum().iloc[index])}
        pre_item["Day"] = {"N": str(first_timestamp.day)}
        item["Day"]["L"].append({"M": pre_item})
    else:
        for content in item["Day"]["L"]:
            if content["M"]["Day"]["N"] == str(first_timestamp.day):
                content["M"]["Count"]["N"] = str(
                    float(content["M"]["Count"]["N"]) + row_size
                )
                for index in range(column_size):
                    content["M"][items.columns[index]]["N"] = str(
                        float(content["M"][items.columns[index]]["N"])
                        + items.sum().iloc[index]
                    )
    return item
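For illustration, calling the function on a small resampled dataframe could look like this (the table name and signal values are made up):

# Hypothetical input: a 1-second-resampled dataframe with two decoded signals
df = pd.DataFrame(
    {"WheelBasedVehicleSpeed": [50.0, 52.5], "EngineSpeed": [1500.0, 1550.0]},
    index=pd.to_datetime(["2020-01-13 00:00:00", "2020-01-13 00:00:01"]),
)
item = create_statistical_item("MyStatsTable", df)  # table name is a placeholder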
2.1.6 Put Item
Adds the item to the table.
Checks whether the table exists and creates it if it does not.
The item created in 2.1.5 is then added to the table.
def dynamodb_put_item(name, items):
    """Add items from dataframe into AWS DynamoDB

    :param name: DynamoDB table name
    :param items: Groups of items which will be added to AWS DynamoDB
    """
    # Check the DynamoDB table list; create the table if it does not exist yet
    dynamodb_tables = dynamodb_list_table()
    if name not in dynamodb_tables:
        import time

        table = dynamodb_create_table(name)
        # Wait until the newly created table becomes ACTIVE
        while True:
            table = dynamodb_describe_table(name)
            if table["TableStatus"] != "ACTIVE":
                print(
                    f'Table Name: {table["TableName"]} Status: {table["TableStatus"]}'
                )
                time.sleep(5)
            else:
                print(
                    f'Table Name: {table["TableName"]} Status: {table["TableStatus"]}'
                )
                break

    import boto3
    import pandas as pd

    client = boto3.client("dynamodb")
    item = create_statistical_item(name, items)
    response = client.put_item(TableName=name, Item=item)
    return True
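The dynamodb_list_table, dynamodb_create_table, and dynamodb_describe_table helpers are not shown above; minimal sketches, assuming the Year/Month key schema used by create_statistical_item, might look like this:

def dynamodb_list_table():
    # Hypothetical helper: return the names of existing tables
    import boto3

    client = boto3.client("dynamodb")
    return client.list_tables()["TableNames"]


def dynamodb_create_table(name):
    # Hypothetical helper: create a table keyed on Year (partition) and Month (sort)
    import boto3

    client = boto3.client("dynamodb")
    response = client.create_table(
        TableName=name,
        KeySchema=[
            {"AttributeName": "Year", "KeyType": "HASH"},
            {"AttributeName": "Month", "KeyType": "RANGE"},
        ],
        AttributeDefinitions=[
            {"AttributeName": "Year", "AttributeType": "N"},
            {"AttributeName": "Month", "AttributeType": "N"},
        ],
        BillingMode="PAY_PER_REQUEST",
    )
    return response["TableDescription"]


def dynamodb_describe_table(name):
    # Hypothetical helper: return the table description (includes TableStatus)
    import boto3

    client = boto3.client("dynamodb")
    return client.describe_table(TableName=name)["Table"]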
Select Athena ▶ Data Sources ▶ Connect data source.
Select Query a data source ▶ Amazon DynamoDB ▶ Next.
Select Configure new AWS Lambda function.
Select Copy as SAM resource.
Edit the template.yaml file in the SAM configuration.
Add the copied content under the Resources section, and change AthenaCatalogName and SpillBucket as needed.
AthenaDynamoDBConnector:
  Type: AWS::Serverless::Application
  Properties:
    Location:
      ApplicationId: arn:aws:serverlessrepo:us-east-1:292517598671:applications/AthenaDynamoDBConnector
      SemanticVersion: 2021.18.1
    Parameters:
      # The name you will give to this catalog in Athena. It will also be used as the function name. This name must satisfy the pattern ^[a-z0-9-_]{1,64}$
      AthenaCatalogName: dynamodbcatalog
      # WARNING: If set to 'true' encryption for spilled data is disabled.
      # DisableSpillEncryption: 'false' # Uncomment to override default value
      # Lambda memory in MB (min 128 - 3008 max).
      # LambdaMemory: '3008' # Uncomment to override default value
      # Maximum Lambda invocation runtime in seconds. (min 1 - 900 max)
      # LambdaTimeout: '900' # Uncomment to override default value
      # The name of the bucket where this function can spill data.
      SpillBucket: !Ref MyFilesBucket
      # The prefix within SpillBucket where this function can spill data.
      # SpillPrefix: 'athena-spill' # Uncomment to override default value
Build the changes and apply them to CloudFormation.
sam build --use-container --build-image amazon/aws-sam-cli-build-image-python3.8
sam package --template-file template.yaml --s3-bucket aws-sam-cli-managed-default-samclisourcebucket-aaa
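After packaging, the stack still has to be deployed. As a hedged sketch, guided deployment prompts for the stack name and region; note that nested serverless applications such as this connector additionally require the CAPABILITY_AUTO_EXPAND capability:

sam deploy --guided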