Connecting

AWS IoT Core 연동하기 본문

클라우드

AWS IoT Core 연동하기

팬도라 2023. 9. 11. 15:28
반응형

본 글은 BMS (Battery Management System)를 구성하는 자체적인 시나리오를 기반으로 AWS IoT Core와 연동합니다.  

1. 서론

배터리 관리 시스템(BMS) 개요

배터리 관리 시스템(BMS)은 최신 에너지 저장 시스템의 중요한 구성 요소입니다. 주요 목적은 배터리 성능을 모니터링, 제어 및 최적화하는 것입니다. 안전하고 효율적인 배터리 작동을 보장함으로써 BMS는 전기 자동차, 재생 에너지 시스템 및 휴대용 전자 장치를 포함한 다양한 응용 분야에서 중요한 역할을 합니다.

프로젝트 목표 및 목적

이 프로젝트의 주요 목표는 실제 BMS의 기능을 에뮬레이트하는 가상 배터리 관리 시스템(BMS)을 개발하는 것입니다. 가상 BMS는 AWS IoT 서비스와 통합되어 원활한 데이터 수집, 처리 및 분석이 가능합니다. 프로젝트의 구체적인 목표는 다음과 같습니다.

  • 배터리 매개변수를 모니터링할 수 있는 가상 BMS 장치를 설계하고 시뮬레이션합니다
  • 가상 BMS에서 배터리 밸런싱, 보호 및 충전 상태 추정을 위한 기능을 구현합니다.
  • 데이터 통신 및 관리를 위해 가상 BMS를 AWS IoT Core와 통합합니다.
  • 실시간 인사이트를 위해 대화형 대시보드를 통해 배터리 관련 지표를 시각화합니다.

2. BMS 가상 디바이스 디자인

가상 디바이스의 필요성

가상 디바이스는 실제 장치의 작동을 시뮬레이트하여, 실제 장치의 구현 전에 시스템의 작동을 이해하고, 문제를 식별하고, 솔루션을 테스트하는 데 도움이 됩니다. 이는 개발 과정에서 시간과 자원을 절약하며, 최종 제품의 품질과 신뢰성을 향상 시킵니다.

가상 디바이스에서 수집되어야 하는 데이터

가상 BMS는 배터리 상태를 효과적으로 모니터링하기 위해 중요한 데이터를 수집해야 합니다. 수집할 기본 매개변수는 다음과 같습니다.

  • 전압: 배터리의 충전 상태(SoC) 및 상태(SoH)를 평가합니다.
  • 전류: 전력 흐름을 계산하고 배터리 용량을 예측합니다.
  • 온도: 배터리 온도를 모니터링하고 열 폭주를 방지합니다.
  • 셀 밸런싱 상태: 개별 셀의 균일한 충전 및 방전을 보장합니다.
  • 배터리 보호 상태: 비정상적인 조건의 경우 안전 조치를 트리거합니다.

가상 디바이스에서 수행되어야 하는 기능

BMS 가상 디바이스는 다음과 같은 기능을 수행해야 합니다:

  • 센서 데이터 모니터링: 배터리와 관련된 모든 중요한 매개변수를 실시간으로 모니터링합니다.
  • 알림 및 경고: 임계값을 초과하거나 예상치 못한 상황이 발생할 경우, 적절한 알림 또는 경고를 생성합니다.
  • 데이터 로깅 및 보고: 센서 데이터를 로깅하고, 이를 기반으로 통계 및 보고서를 생성합니다.
  • 원격 제어: 필요에 따라 배터리의 작동을 원격으로 제어할 수 있습니다.

3. AWS IoT Core와의 통합

AWS IoT Core 개요

AWS IoT Core는 완전 관리형 MQTT 메시지 브로커 기능을 제공함으로 별도의 MQTTX와 같은 브로커를 설치하지 않고 AWS 환경과 서비스를 결합하여 사용할 수 있습니다.
AWS에서 제공하는 주요 특징은 다음과 같습니다.

효율적으로 확장되는 솔루션 구축

  • 지속적이고 상시 연결을 지원하고, 고급 메시지 보존 정책을 지원하고, 수백만 개의 디바이스와 주제를 동시에 처리하는 관리형 MQTT 브로커에 대한 기본 지원을 통해 AWS IoT Core는 인프라 비용, 라이선스 요금 및 기타 운영 비용을 절감하면서 연결된 솔루션을 자동으로 확장하여 수조 건의 메시지를 처리할 수 있습니다.MQTT 5 및 MQTT 3 사양 간 호환성 실현
  • AWS IoT Core는 향상된 최신 MQTT 5 표준을 지원하고 MQTT 3 표준과 역호환되므로 MQTT 연결 사양이 혼합된 이기종 배포를 효과적으로 관리하는 동시에 설계에 상당한 기능 개선을 통합할 수 있습니다.시스템 효율성 향상
  • MQTT 메시지에 대한 프로그래밍 가능한 규칙을 정의하여 메시지를 효율적으로 처리하고 대상으로 라우팅하고 공유 구독, 사용자 속성, 세션 만료 등의 다양한 기능을 구현하여 플릿 전반에서 메시지 관리를 개선할 수 있습니다.커뮤니케이션 메시지 보호
  • AWS IoT Core에는 솔루션을 취약성으로부터 보호하기 위한 다중 인증 방법 및 액세스 정책에 대한 기능이 포함되어 있습니다. 또한 AWS IoT Core Device Advisor를 사용하면 사전 구축된 테스트 제품군에 액세스하여 디바이스를 클라우드에 온보딩하기 전에도 개발 단계에서 디바이스의 MQTT 기능을 검증할 수 있습니다.

가상 디바이스를 AWS IoT Core에 연결하기

Raspberry Pi에 구성된 BMS 가상 디바이스가 MQTT 통신을 통해 AWS IoT Core와 연동되고, 이를 활용하여 OpenSearch, Lambda, RDS와 연동되는 모습을 볼 수 있습니다.
이번 챕터에서는 가상디바이스를 IoT Core에 연동하는 과정을 살펴보겠습니다.

BMS 가상 디바이스

위에서 설명한 내용을 기반으로 파이썬을 활용한 BMS 가상 디바이스 코드를 다음과 같이 작성합니다.

import random  
import time  
import json  
import pandas as pd  
import numpy as np  
import pvlib  
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient  
​  
​  
# AWS IoT Core   
ENDPOINT = $(ENDPOINT URL) 
CLIENT_ID = 'B001'  
TOPIC = 'bms/battery_data/B001/metric'  
PORT = 8883  
​  
#Create an MQTT client  
mqttc = AWSIoTMQTTClient(CLIENT_ID)  
mqttc.configureEndpoint(ENDPOINT,PORT)  
#Create credentials by locating your certificates and private key  
mqttc.configureCredentials("./certs/AmazonRootCA1.pem","./certs/private.pem.key","./certs/certificate.pem.crt")  
​  
#Connect to the MQTT broker  
try:  
    mqttc.connect()  
    print("AWS IoT Core Connected")  
except Exception as e:  
    print(f"Failed to connect to AWS IoT Core: {e}")  
    exit()  # Terminate the script if connection fails  
​  
# Define battery parameters  
battery_id = "B001"  
capacity = 3600  # Wh  
nominal_voltage = 48  # V  
battery_soc = 0.5  # Initial State of Charge (SoC)  
battery_soh = 1.0  # Initial State of Health (SoH)  
aging_rate = 0.005  # 0.5% capacity reduction per day  
​  
# Define solar charging parameters  
solar_voltage_range = (11, 150)  # V  
max_solar_current = 15  # A  
max_solar_power = 1600  # W  
​  
# Define AC power consumption profiles for equipment  
tv_power = 100  # W (during the day)  
lights_power = 50  # W (during the evening)  
refrigerator_power = 150  # W (intermittently throughout the day)  
​  
# Define safety thresholds  
low_battery_warning_threshold = 0.2  # 20% SoC  
overcharge_threshold = 0.95  # 95% SoC  
overcurrent_threshold = 30  # A  
overvoltage_threshold = 52  # V  
​  
# Define user profiles with randomized time of use  
user_profiles = {}  
​  
# Modify the battery model to reflect the variation in voltage with SoC  
def get_battery_voltage(soc):  
    return nominal_voltage * (0.5 + 0.5 * soc)  # Assume that the voltage varies linearly from 50% to 100% of the nominal voltage  
​  
# Modify the user profile generation code to generate random user profiles  
def generate_user_profiles():  
    equipment_list = ["TV", "Lights", "Refrigerator"]  
    for i in range(10):  # Assume there are 10 users  
        equipment = random.sample(equipment_list, random.randint(1, len(equipment_list)))  # Each user has 1 to 3 types of equipment  
        time_of_use = [(random.randint(0, 24), random.randint(0, 24)) for _ in range(random.randint(1, 3))]  # Each user uses the equipment at 1 to 3 different times of the day  
        user_profiles[f"user{i+1}"] = {"equipment": equipment, "time_of_use": time_of_use}  
​  
generate_user_profiles()  # Generate user profiles  
​  
# Define temperature and humidity profiles with randomized daily fluctuations  
temp_min, temp_max = 15, 35  # Temperature range in Celsius  
humidity_min, humidity_max = 0.2, 0.8  # Humidity range  
​  
def get_temperature_and_humidity(hour):  
    # Return temperature and humidity based on time of day  
    # Random fluctuations: ±2 degrees for temperature  
    temperature_fluctuation = np.random.uniform(-2, 2)  
    if 6 <= hour < 12:  
        temperature = 20 + temperature_fluctuation  # Morning: 20 degrees  
    elif 12 <= hour < 18:  
        temperature = 25 + temperature_fluctuation  # Afternoon: 25 degrees  
    elif 18 <= hour < 24:  
        temperature = 22 + temperature_fluctuation  # Evening: 22 degrees  
    else:  
        temperature = 18 + temperature_fluctuation  # Night: 18 degrees  
​  
    # Calculate humidity based on temperature  
    humidity = 0.6 + 0.01 * (temperature - 20) + np.random.uniform(-0.02, 0.02)  
​  
    # Ensure humidity stays within the valid range  
    humidity = max(min(humidity, humidity_max), humidity_min)  
​  
    # Round temperature and humidity to two decimal places  
    temperature = round(temperature, 2)  
    humidity = round(humidity * 100)  
​  
    return temperature, humidity  
​  
def simulate_solar_power():  
    # Get current time and solar position  
    current_time = pd.Timestamp(time.time(), unit='s', tz='Asia/Seoul')  
    latitude = 37.5665  # Latitude for Seoul  
    longitude = 126.9780  # Longitude for Seoul  
    solar_position = pvlib.solarposition.get_solarposition(current_time, latitude, longitude)  
​  
    # Calculate solar power generation using the pvlib library  
    solar_power = pvlib.irradiance.get_total_irradiance(  
        surface_tilt=30,  # Assume a tilt angle of 30 degrees for the solar panels  
        surface_azimuth=180,  # Assume the panels are facing south  
        solar_zenith=solar_position['apparent_zenith'].values[0],  # Use the calculated solar zenith angle  
        solar_azimuth=solar_position['azimuth'].values[0],  # Use the calculated solar azimuth angle  
        dni=1000,  # Assume direct normal irradiance of 1000 W/m^2  
        ghi=800,   # Assume global horizontal irradiance of 800 W/m^2  
        dhi=200    # Assume diffuse horizontal irradiance of 200 W/m^2  
    )['poa_global']  # Get the plane-of-array global irradiance  
​  
    # Make sure solar power is within the valid range  
    solar_power = max(0, min(solar_power, max_solar_power))  
    return solar_power  
​  
def get_user_profiles(hour):  
    # Determine the active user profiles based on the time of day  
    return {user_id: profile for user_id, profile in user_profiles.items() if any(start <= hour < end for start, end in profile["time_of_use"])}  
​  
def simulate_ac_power_consumption(profile, time_interval):  
    # Simulate AC power consumption based on user equipment usage patterns  
    equipment_power = sum(globals()[equipment.lower() + "_power"] for equipment in profile["equipment"])  
    return equipment_power * time_interval  # Power consumption in watts  
​  
def simulate_battery_behavior(solar_power, load_power, time_interval, temperature, humidity):  
    global battery_soc, battery_soh, capacity  
​  
    # Calculate net power: positive for charging, negative for discharging  
    net_power = solar_power - load_power  # Power in watts  
​  
    # Update battery state of charge (SoC)  
    delta_soc = net_power * time_interval / (capacity * get_battery_voltage(battery_soc))  # SoC change  
    battery_soc = min(max(battery_soc + delta_soc, 0), 1)  # Keep SoC between 0 and 1  
​  
    # Calculate battery aging based on the aging rate  
    battery_soh -= aging_rate * time_interval / (24 * 60)  # Aging rate per day  
​  
    # Ensure the battery state of health (SoH) stays within the valid range  
    battery_soh = max(battery_soh, 0)  
​  
    # Update the battery capacity based on the SoH  
    capacity = 3600 * battery_soh  # Update capacity based on SoH  
​  
    # If battery life is over, replace with a new one  
    if battery_soh <= 0:  
        battery_soh = 1.0  
​  
def check_safety_measures():  
    # Check safety measures and return error codes  
    return {  
        "low_battery_warning": bool(battery_soc < low_battery_warning_threshold),  
        "overcharge_protection": bool(battery_soc > overcharge_threshold),  
        # Simplified current based on power = voltage * current  
        "overcurrent_protection": bool((simulate_solar_power() - sum(simulate_ac_power_consumption(profile, 1) for profile in get_user_profiles(time.localtime().tm_hour).values())) / get_battery_voltage(battery_soc) > overcurrent_threshold),  
        "overvoltage_protection": bool(get_battery_voltage(battery_soc) > overvoltage_threshold),  
    }  
​  
# Main simulation loop  
start_time = time.time()  # Get the current time  
for i in range(24 * 60):  # Simulate for 24 hours in 1-minute time steps  
    current_time = time.localtime(start_time + i * 60)  # Get the current time  
​  
    # Get current temperature and humidity  
    temperature, humidity = get_temperature_and_humidity(current_time.tm_hour)  
​  
    # Simulate solar charging if it's daytime  
    solar_power = simulate_solar_power()  
​  
    # Get current user profiles and simulate AC power consumption regardless of daytime or nighttime  
    active_user_profiles = get_user_profiles(current_time.tm_hour)  
    load_power = sum(simulate_ac_power_consumption(profile, 1) for profile in active_user_profiles.values())  # 1-minute time interval  
​  
    # Simulate battery behavior  
    simulate_battery_behavior(solar_power, load_power, 1, temperature, humidity)  # 1-minute time interval  
​  
    # Check safety measures  
    safety_measures = check_safety_measures()  
​  
    # Prepare data payload  
    payload = {  
    "battery_id": battery_id,  # Add battery ID to payload  
    "location": {  
        "lat": 35.173008,  # Directly include latitude  
        "lon": 129.127909,  # Directly include longitude  
    },  
    "battery_data": {  
        "battery_voltage": get_battery_voltage(battery_soc),  
        "battery_current": solar_power / get_battery_voltage(battery_soc) if solar_power > 0 else load_power / get_battery_voltage(battery_soc),  
        "battery_soc": battery_soc,  
        "battery_soh": battery_soh,  
        "battery_temperature": temperature,  # Update battery temperature  
        "battery_humidity": humidity,  # Update battery humidity  
    },  
    "solar_charging_data": {  
        "solar_voltage": solar_power / max_solar_current if solar_power > 0 else 0,  
        "solar_current": max_solar_current if solar_power > 0 else 0,  
        "solar_power": solar_power,  
    },  
    "ac_power_consumption_data": {  
        "ac_power_consumption": load_power,  
    },  
    "error_codes": safety_measures  
}  
    # Display the data payload as a JSON string every 1 minute  
    if i % 12 == 0:  # Print every 12 iterations, which corresponds to every 1 minute  
        try:  
            mqttc.publish(TOPIC, json.dumps(payload), 0)  
            print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), " : Message published. Data:" + json.dumps(payload) + "\n")  
        except: print(f"Failed to publish message: {e}")  
​  
    # Wait for the next time step  
    time.sleep(5)  # Wait for 5 seconds  
​  
# Disconnect from AWS IoT Core  
mqttc.disconnect()
  • AWS IoT Core 설정:
    • ENDPOINT, CLIENT_ID, TOPIC, PORT와 같은 AWS IoT Core 관련 설정을 정의합니다. 이 정보는 AWS IoT Core에 연결하기 위해 사용됩니다.
    • AWS IoT Core에 연결하는 MQTT 클라이언트를 생성하고 설정합니다.
  • 배터리 및 태양광 설정:
    • 가상 배터리 및 태양광 관련 매개변수를 정의합니다. 이러한 설정은 시뮬레이션의 기본 매개변수를 설정합니다. 배터리 ID, 용량, 정격 전압, 초기 SoC (상태 of Charge), 초기 SoH (상태 of Health), 배터리의 노화 속도 등이 여기에 포함됩니다.
  • 사용자 프로필 및 환경 시뮬레이션:
    • generate_user_profiles 함수를 사용하여 가상 사용자 프로필을 생성합니다. 각 사용자는 TV, 조명, 냉장고와 같은 가정용 전기 장치를 사용하며, 사용 시간은 무작위로 설정됩니다.
    • get_temperature_and_humidity 함수를 사용하여 시간대별로 온도와 습도를 시뮬레이션합니다.
  • 태양광 발전 시뮬레이션:
    • simulate_solar_power 함수를 사용하여 현재 시간에 따른 태양광 발전량을 시뮬레이션합니다. 이는 태양광 패널의 위치 및 기타 환경 변수를 고려하여 계산됩니다.
  • 사용자 프로필 가져오기:
    • 현재 시간을 기준으로 사용자 프로필을 가져오는 get_user_profiles 함수를 사용합니다. 이는 사용자가 어떤 장치를 사용하고 언제 사용하는지를 정의합니다.
  • 전력 소비 시뮬레이션:
    • simulate_ac_power_consumption 함수를 사용하여 사용자 프로필을 기반으로 AC 전력 소비를 시뮬레이션합니다.
  • 배터리 동작 시뮬레이션:
    • simulate_battery_behavior 함수를 사용하여 배터리 동작을 시뮬레이션합니다. 이 함수에서는 배터리의 충전 및 방전 동작을 모의하며 배터리의 상태를 업데이트합니다.
  • 안전 조치 확인:
    • check_safety_measures 함수를 사용하여 배터리의 안전 조치를 확인하고 오류 코드를 생성합니다. 이러한 조치에는 배터리의 저전압 경고, 과충전 방지, 과전류 방지, 과전압 방지 등이 포함됩니다.
  • 데이터 페이로드 생성:
    • 배터리, 태양광, AC 전력 소비, 위치 정보 및 안전 조치 관련 데이터를 포함하는 JSON 형식의 데이터 페이로드를 생성합니다. 이 데이터는 AWS IoT Core로 전송됩니다.
  • 데이터 게시 및 시뮬레이션 루프:
    • mqttc.publish를 사용하여 데이터 페이로드를 AWS IoT Core의 특정 주제(TOPIC)로 게시합니다. 이 작업은 시뮬레이션 루프 내에서 1분마다 수행됩니다.
    • 전체 시뮬레이션은 24시간(1일) 동안 진행되며, 1분 간격으로 데이터를 게시하고 새로운 데이터를 생성합니다.
  • AWS IoT Core 연결 해제:
    • 시뮬레이션이 완료되면 AWS IoT Core와의 연결을 해제합니다.

IoT Core 연동

AWS 콘솔에 로그인하고 AWS 서비스 검색 창에서 IoT Core를 선택하여 AWS IoT 대시보드에 액세스할 수 있는지 확인합니다.

AWS IoT 관리 메뉴에서 관리 -> 모든 디바이스 -> 사물을 클릭합니다.

사물 생성 항목에서 단일 사물 생성을 선택하고 다음으로 넘어갑니다.

사물 속성 지정 항목에서 사물의 이름을 지정하고 하단은 기본값으로 두고 넘어갑니다.

이제 인증서의 정책을 생성하여 수신받을 TOPIC을 제한하는 항목입니다. 보안을 위해서는 본인이 받아야 하는 TOPIC만 지정해야 하지만 이번 시간에서는 모든 TOPIC을 수신받을 수 있도록 지정하겠습니다.

정책 생성을 클릭하면 새로운 탭에 하단과 같은 화면이 표시됩니다. 정책 속성 부분에서 정책 이름과 정책 내용을 작성하겠습니다.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iot:Publish",
        "iot:Subscribe",
        "iot:Connect",
        "iot:Receive"
      ],
      "Resource": [
        "*"
      ]
    }
  ]
}

NOTI

  • 만약 위의 방식대로 하지 않고, 디바이스 한 개 연결을 통해 생성하였다면, 인증서 정책에 수신 받는 TOPIC이 지정되어 있으므로, 정책 설정 부분에 JSON을 수정하여 우리가 받을 TOPIC을 지정해 주어야 합니다.

인증서 정책이 성공적으로 생성되었다면 정책 연결 화면에 방금전에 생성된 정책을 추가할 수 있습니다. 이후 사물 생성을 클릭하여 다음으로 넘어갑니다.

이제 생성한 사물에 대한 인증서를 다운로드 받을 수 있습니다. 해당 부분에서만 인증서를 다운받을 수 있기 때문에 반드시 다운로드 받습니다.

  • Device Certificate
  • Public key file
  • Private key file
  • Amazon Root CA1

사물 설정이 완료 되었습니다. 작성한 코드를 열고 cert 폴더를 생성합니다.
사용할 인증서는AmazonRootCA1.pem, private.pem.key, certificate.pem.crt 이므로 적절하게 이름을 변경합니다.

코드를 실행하기 이전 AWSIoTPythonSDK 설치를 진행해야 합니다.

git clone https://github.com/aws/aws-iot-device-sdk-python.git
cd aws-iot-device-sdk-python
python3 setup.py install

이제 코드를 실행하고 AWS IoT -> 테스트 -> MQTT 테스트 클라이언트 항목 -> 주제 구독 -> 주제 필터에 지정한 TOPIC를 입력하고 구독을 진행하면 1분 간격으로 데이터가 수신되는 모습을 확인할 수 있습니다.

Comments