from collections import defaultdict
from copy import deepcopy
from enum import Enum
import hashlib
import importlib
import logging
import os
from pathlib import Path
from typing import Any, List, Mapping, Tuple, Union
from gymnasium import Env, spaces
import csv
import datetime
import numpy as np
import pandas as pd
import random
from citylearn.base import Environment, EpisodeTracker
from citylearn.building import Building, DynamicsBuilding
from citylearn.cost_function import CostFunction
from citylearn.data import CarbonIntensity, DataSet, ChargerSimulation, EnergySimulation, LogisticRegressionOccupantParameters, Pricing, WashingMachineSimulation, Weather
from citylearn.electric_vehicle import ElectricVehicle
from citylearn.energy_model import Battery, PV, WashingMachine
from citylearn.reward_function import MultiBuildingRewardFunction, RewardFunction
from citylearn.utilities import FileHandler
LOGGER = logging.getLogger()
logging.getLogger('matplotlib.font_manager').disabled = True
logging.getLogger('matplotlib.pyplot').disabled = True
[docs]
class EvaluationCondition(Enum):
"""Evaluation conditions.
Used in `citylearn.CityLearnEnv.calculate` method.
"""
# general (soft private)
_DEFAULT = ''
_STORAGE_SUFFIX = '_without_storage'
_PARTIAL_LOAD_SUFFIX = '_and_partial_load'
_PV_SUFFIX = '_and_pv'
# Building type
WITH_STORAGE_AND_PV = _DEFAULT
WITHOUT_STORAGE_BUT_WITH_PV = _STORAGE_SUFFIX
WITHOUT_STORAGE_AND_PV = WITHOUT_STORAGE_BUT_WITH_PV +_PV_SUFFIX
# DynamicsBuilding type
WITH_STORAGE_AND_PARTIAL_LOAD_AND_PV = WITH_STORAGE_AND_PV
WITHOUT_STORAGE_BUT_WITH_PARTIAL_LOAD_AND_PV = WITHOUT_STORAGE_BUT_WITH_PV
WITHOUT_STORAGE_AND_PARTIAL_LOAD_BUT_WITH_PV = WITHOUT_STORAGE_BUT_WITH_PARTIAL_LOAD_AND_PV + _PARTIAL_LOAD_SUFFIX
WITHOUT_STORAGE_AND_PARTIAL_LOAD_AND_PV = WITHOUT_STORAGE_AND_PARTIAL_LOAD_BUT_WITH_PV + _PV_SUFFIX
[docs]
class CityLearnEnv(Environment, Env):
r"""CityLearn nvironment class.
Parameters
----------
schema: Union[str, Path, Mapping[str, Any]]
Name of CityLearn data set, filepath to JSON representation or :code:`dict` object of a CityLearn schema.
Call :py:meth:`citylearn.data.DataSet.get_names` for list of available CityLearn data sets.
root_directory: Union[str, Path]
Absolute path to directory that contains the data files including the schema.
buildings: Union[List[Building], List[str], List[int]], optional
Buildings to include in environment. If list of :code:`citylearn.building.Building` is provided, will override :code:`buildings` definition in schema.
If list of :str: is provided will include only schema :code:`buildings` keys that are contained in provided list of :code:`str`.
If list of :int: is provided will include only schema :code:`buildings` whose index is contained in provided list of :code:`int`.
simulation_start_time_step: int, optional
Time step to start reading data files contents.
simulation_end_time_step: int, optional
Time step to end reading from data files contents.
episode_time_steps: Union[int, List[Tuple[int, int]]], optional
If type is `int`, it is the number of time steps in an episode. If type is `List[Tuple[int, int]]]` is provided,
it is a list of episode start and end time steps between `simulation_start_time_step` and `simulation_end_time_step`.
Defaults to (`simulation_end_time_step` - `simulation_start_time_step`) + 1. Will ignore `rolling_episode_split` if `episode_splits` is of type `List[Tuple[int, int]]]`.
rolling_episode_split: bool, default: False
True if episode sequences are split such that each time step is a candidate for `episode_start_time_step` otherwise, False to split episodes in steps of `episode_time_steps`.
random_episode_split: bool, default: False
True if episode splits are to be selected at random during training otherwise, False to select sequentially.
seconds_per_time_step: float
Number of seconds in 1 `time_step` and must be set to >= 1.
reward_function: Union[RewardFunction, str], optional
Reward function class instance or path to function class e.g. 'citylearn.reward_function.IndependentSACReward'.
If provided, will override :code:`reward_function` definition in schema.
reward_function_kwargs: Mapping[str, Any], optional
Parameters to be parsed to :py:attr:`reward_function` at intialization.
central_agent: bool, optional
Expect 1 central agent to control all buildings.
shared_observations: List[str], optional
Names of common observations across all buildings i.e. observations that have the same value irrespective of the building.
active_observations: Union[List[str], List[List[str]]], optional
List of observations to be made available in the buildings. Can be specified for all buildings in a :code:`List[str]` or for
each building independently in a :code:`List[List[str]]`. Will override the observations defined in the :code:`schema`.
inactive_observations: Union[List[str], List[List[str]]], optional
List of observations to be made unavailable in the buildings. Can be specified for all buildings in a :code:`List[str]` or for
each building independently in a :code:`List[List[str]]`. Will override the observations defined in the :code:`schema`.
active_actions: Union[List[str], List[List[str]]], optional
List of actions to be made available in the buildings. Can be specified for all buildings in a :code:`List[str]` or for
each building independently in a :code:`List[List[str]]`. Will override the actions defined in the :code:`schema`.
inactive_actions: Union[List[str], List[List[str]]], optional
List of actions to be made unavailable in the buildings. Can be specified for all buildings in a :code:`List[str]` or for
each building independently in a :code:`List[List[str]]`. Will override the actions defined in the :code:`schema`.
simulate_power_outage: Union[bool, List[bool]]
Whether to simulate power outages. Can be specified for all buildings as single :code:`bool` or for
each building independently in a :code:`List[bool]`. Will override power outage defined in the :code:`schema`.
solar_generation: Union[bool, List[bool]]
Wehther to allow solar generation. Can be specified for all buildings as single :code:`bool` or for
each building independently in a :code:`List[bool]`. Will override :code:`pv` defined in the :code:`schema`.
random_seed: int, optional
Pseudorandom number generator seed for repeatable results.
Other Parameters
----------------
render_directory: Union[str, Path], optional
Base directory where rendering and export artifacts are stored. Relative paths are resolved from the project root.
render_directory_name: str, optional
Folder name created inside the project root for rendering and export artifacts when ``render_directory`` is not provided.
Defaults to ``render_logs``.
render_session_name: str, optional
Name of the subfolder created under ``render_directory``/``render_directory_name`` for export artifacts. When omitted,
a timestamp is used.
render_mode: str, optional
Rendering strategy. Accepted values are ``'none'`` (default), ``'during'`` for streaming exports each step, and
``'end'`` for exports performed at episode completion while still allowing manual snapshots via :meth:`render`.
**kwargs : dict
Other keyword arguments used to initialize super classes.
Notes
-----
Parameters passed to `citylearn.citylearn.CityLearnEnv.__init__` that are also defined in `schema` will override their `schema` definition.
"""
DEFAULT_RENDER_START_DATE = datetime.date(2024, 1, 1)
def __init__(self,
schema: Union[str, Path, Mapping[str, Any]], root_directory: Union[str, Path] = None, buildings: Union[List[Building], List[str], List[int]] = None,
electric_vehicles: Union[List[ElectricVehicle], List[str], List[int]] = None,
simulation_start_time_step: int = None, simulation_end_time_step: int = None, episode_time_steps: Union[int, List[Tuple[int, int]]] = None, rolling_episode_split: bool = None,
random_episode_split: bool = None, seconds_per_time_step: float = None, reward_function: Union[RewardFunction, str] = None, reward_function_kwargs: Mapping[str, Any] = None,
central_agent: bool = None, shared_observations: List[str] = None, active_observations: Union[List[str], List[List[str]]] = None,
inactive_observations: Union[List[str], List[List[str]]] = None, active_actions: Union[List[str], List[List[str]]] = None,
inactive_actions: Union[List[str], List[List[str]]] = None, simulate_power_outage: bool = None, solar_generation: bool = None, random_seed: int = None, time_step_ratio: int = None,
start_date: Union[str, datetime.date] = None, render_session_name: str = None, render_mode: str = 'none', **kwargs: Any
):
render_directory = kwargs.pop('render_directory', None)
render_directory_name = kwargs.pop('render_directory_name', 'render_logs')
render_flag = kwargs.pop('render', None)
kw_render_mode = kwargs.pop('render_mode', None)
requested_render_mode = render_mode if kw_render_mode is None else kw_render_mode
requested_render_mode = 'none' if requested_render_mode is None else str(requested_render_mode).lower()
kw_render_session_name = kwargs.pop('render_session_name', None)
if kw_render_session_name is not None:
render_session_name = kw_render_session_name if render_session_name is None else render_session_name
self.schema = schema
schema_start_date = self.schema.get('start_date') if isinstance(self.schema, dict) else None
schema_render_mode = self.schema.get('render_mode') if isinstance(self.schema, dict) else None
if schema_render_mode is not None:
requested_render_mode = str(schema_render_mode).lower()
if requested_render_mode not in {'none', 'during', 'end'}:
raise ValueError("render_mode must be one of {'none', 'during', 'end'}.")
self.render_mode = requested_render_mode
self._buffer_render = self.render_mode == 'end'
self._defer_render_flush = False
self._render_buffer = defaultdict(list)
self._render_start_date = self._parse_render_start_date(start_date if start_date is not None else schema_start_date)
self.previous_month = None
self.current_day = self._render_start_date.day
self.year = self._render_start_date.year
self._final_kpis_exported = False
self.__rewards = None
self.buildings = []
self.random_seed = self.schema.get('random_seed', None) if random_seed is None else random_seed
schema_render_session = self.schema.get('render_session_name') if isinstance(self.schema, dict) else None
self.render_session_name = render_session_name if render_session_name is not None else schema_render_session
if self.render_session_name is not None:
self.render_session_name = str(self.render_session_name).strip()
if self.render_session_name == '':
self.render_session_name = None
elif Path(self.render_session_name).is_absolute():
raise ValueError('render_session_name must be a relative path. Use render_directory to choose an absolute location.')
elif '..' in Path(self.render_session_name).parts:
raise ValueError('render_session_name cannot contain parent directory references (“..”).')
root_directory, buildings, electric_vehicles, episode_time_steps, rolling_episode_split, random_episode_split, \
seconds_per_time_step, reward_function, central_agent, shared_observations, episode_tracker = self._load(
deepcopy(self.schema),
root_directory=root_directory,
buildings=buildings,
electric_vehicles=electric_vehicles,
simulation_start_time_step=simulation_start_time_step,
simulation_end_time_step=simulation_end_time_step,
episode_time_steps=episode_time_steps,
rolling_episode_split=rolling_episode_split,
random_episode=random_episode_split,
seconds_per_time_step=seconds_per_time_step,
time_step_ratio=time_step_ratio,
reward_function=reward_function,
reward_function_kwargs=reward_function_kwargs,
central_agent=central_agent,
shared_observations=shared_observations,
active_observations=active_observations,
inactive_observations=inactive_observations,
active_actions=active_actions,
inactive_actions=inactive_actions,
simulate_power_outage=simulate_power_outage,
solar_generation=solar_generation,
random_seed=self.random_seed,
)
self.root_directory = root_directory
self.buildings = buildings
self.electric_vehicles = electric_vehicles
get_time_step_ratio = buildings[0].time_step_ratio if len(buildings) > 0 else 1.0
self.time_step_ratio = get_time_step_ratio
# now call super class initialization and set episode tracker now that buildings are set
super().__init__(seconds_per_time_step=seconds_per_time_step, random_seed=self.random_seed, episode_tracker=episode_tracker, time_step_ratio=self.time_step_ratio)
# set other class variables
self.episode_time_steps = episode_time_steps
self.rolling_episode_split = rolling_episode_split
self.random_episode_split = random_episode_split
self.central_agent = central_agent
self.shared_observations = shared_observations
# set reward function
self.reward_function = reward_function
# rendering switch: schema['render'] overrides explicit flag, otherwise rely on render_mode defaults
schema_render = self.schema.get('render', None) if isinstance(self.schema, dict) else None
if schema_render is not None:
render_enabled_flag = bool(schema_render)
elif render_flag is not None:
render_enabled_flag = bool(render_flag)
else:
render_enabled_flag = self.render_mode in {'during', 'end'}
self.render_enabled = render_enabled_flag
# reset environment and initializes episode time steps
self.reset()
# reset episode tracker to start after initializing episode time steps during reset
self.episode_tracker.reset_episode_index()
# set reward metadata
self.reward_function.env_metadata = self.get_metadata()
# reward history tracker
self.__episode_rewards = []
# reward history tracker
if self.root_directory is None:
self.root_directory = os.path.dirname(os.path.abspath(__file__))
project_root = Path(__file__).resolve().parents[1]
render_directory_name = render_directory_name or 'render_logs'
if render_directory is not None:
render_root = Path(render_directory).expanduser()
if not render_root.is_absolute():
render_root = project_root / render_root
else:
render_root = project_root / render_directory_name
self.render_output_root = render_root.expanduser().resolve()
self._render_timestamp = None
self._render_directory_path = None
self._render_dir_initialized = False
self.new_folder_path = None
self._render_start_datetime = None
if self.render_enabled:
self._ensure_render_output_dir(ensure_exists=False)
@property
def render_start_date(self) -> datetime.date:
"""Date used as the origin for rendered timestamps."""
return self._render_start_date
@property
def schema(self) -> Mapping[str, Any]:
"""`dict` object of CityLearn schema."""
return self.__schema
@property
def render_enabled(self) -> bool:
"""Whether environment rendering/logging is enabled."""
return getattr(self, '_CityLearnEnv__render_enabled', False)
@property
def root_directory(self) -> Union[str, Path]:
"""Absolute path to directory that contains the data files including the schema."""
return self.__root_directory
@property
def buildings(self) -> List[Building]:
"""Buildings in CityLearn environment."""
return self.__buildings
@property
def electric_vehicles(self) -> List[ElectricVehicle]:
"""Electric Vehicles in CityLearn environment."""
return self.__electric_vehicles
@property
def time_steps(self) -> int:
"""Number of time steps in current episode split."""
return self.episode_tracker.episode_time_steps
@property
def episode_time_steps(self) -> Union[int, List[Tuple[int, int]]]:
"""If type is `int`, it is the number of time steps in an episode. If type is `List[Tuple[int, int]]]` is provided, it is a list of
episode start and end time steps between `simulation_start_time_step` and `simulation_end_time_step`. Defaults to (`simulation_end_time_step`
- `simulation_start_time_step`) + 1. Will ignore `rolling_episode_split` if `episode_splits` is of type `List[Tuple[int, int]]]`."""
return self.__episode_time_steps
@property
def rolling_episode_split(self) -> bool:
"""True if episode sequences are split such that each time step is a candidate for `episode_start_time_step` otherwise,
False to split episodes in steps of `episode_time_steps`."""
return self.__rolling_episode_split
@property
def random_episode_split(self) -> bool:
"""True if episode splits are to be selected at random during training otherwise, False to select sequentially."""
return self.__random_episode_split
@property
def episode(self) -> int:
"""Current episode index."""
return self.episode_tracker.episode
@property
def reward_function(self) -> RewardFunction:
"""Reward function class instance."""
return self.__reward_function
@property
def rewards(self) -> List[List[float]]:
"""Reward time series"""
return self.__rewards
@property
def episode_rewards(self) -> List[Mapping[str, Union[float, List[float]]]]:
"""Reward summary statistics for elapsed episodes."""
return self.__episode_rewards
@property
def central_agent(self) -> bool:
"""Expect 1 central agent to control all buildings."""
return self.__central_agent
@property
def shared_observations(self) -> List[str]:
"""Names of common observations across all buildings i.e. observations that have the same value irrespective of the building."""
return self.__shared_observations
@property
def terminated(self) -> bool:
"""Check if simulation has reached completion."""
return self.time_step == self.time_steps - 1
@property
def truncated(self) -> bool:
"""Check if episode truncates due to a time limit or a reason that is not defined as part of the task MDP."""
return False
@property
def observation_space(self) -> List[spaces.Box]:
"""Controller(s) observation spaces.
Returns
-------
observation_space : List[spaces.Box]
List of agent(s) observation spaces.
Notes
-----
If `central_agent` is True, a list of 1 `spaces.Box` object is returned that contains all buildings' limits with the limits in the same order as `buildings`.
The `shared_observations` limits are only included in the first building's limits. If `central_agent` is False, a list of `space.Box` objects as
many as `buildings` is returned in the same order as `buildings`.
"""
if self.central_agent:
low_limit = []
high_limit = []
shared_observations = []
for i, b in enumerate(self.buildings):
for l, h, s in zip(b.observation_space.low, b.observation_space.high, b.active_observations):
if i == 0 or s not in self.shared_observations or s not in shared_observations:
low_limit.append(l)
high_limit.append(h)
else:
pass
if s in self.shared_observations and s not in shared_observations:
shared_observations.append(s)
else:
pass
observation_space = [spaces.Box(low=np.array(low_limit), high=np.array(high_limit), dtype=np.float32)]
else:
observation_space = [b.observation_space for b in self.buildings]
return observation_space
@property
def action_space(self) -> List[spaces.Box]:
"""Controller(s) action spaces.
Returns
-------
action_space : List[spaces.Box]
List of agent(s) action spaces.
Notes
-----
If `central_agent` is True, a list of 1 `spaces.Box` object is returned that contains all buildings' limits with the limits in the same order as `buildings`.
If `central_agent` is False, a list of `space.Box` objects as many as `buildings` is returned in the same order as `buildings`.
"""
if self.central_agent:
low_limit = [v for b in self.buildings for v in b.action_space.low]
high_limit = [v for b in self.buildings for v in b.action_space.high]
action_space = [spaces.Box(low=np.array(low_limit), high=np.array(high_limit), dtype=np.float32)]
else:
action_space = [b.action_space for b in self.buildings]
return action_space
@property
def observations(self) -> List[List[float]]:
"""Observations at current time step.
Notes
-----
If `central_agent` is True, a list of 1 sublist containing all building observation values is returned in the same order as `buildings`.
The `shared_observations` values are only included in the first building's observation values. If `central_agent` is False, a list of sublists
is returned where each sublist is a list of 1 building's observation values and the sublist in the same order as `buildings`.
"""
if self.central_agent:
observations = []
shared_observations = []
for i, b in enumerate(self.buildings):
for k, v in b.observations(normalize=False, periodic_normalization=False, check_limits=True).items():
if i == 0 or k not in self.shared_observations or k not in shared_observations:
observations.append(v)
else:
pass
if k in self.shared_observations and k not in shared_observations:
shared_observations.append(k)
else:
pass
observations = [observations]
else:
observations = [list(b.observations(normalize=False, periodic_normalization=False, check_limits=True).values()) for b in self.buildings]
return observations
@property
def observation_names(self) -> List[List[str]]:
"""Names of returned observations.
Notes
-----
If `central_agent` is True, a list of 1 sublist containing all building observation names is returned in the same order as `buildings`.
The `shared_observations` names are only included in the first building's observation names. If `central_agent` is False, a list of sublists
is returned where each sublist is a list of 1 building's observation names and the sublist in the same order as `buildings`.
"""
if self.central_agent:
observation_names = []
for i, b in enumerate(self.buildings):
for k, _ in b.observations(normalize=False, periodic_normalization=False).items():
if i == 0 or k not in self.shared_observations or k not in observation_names:
observation_names.append(k)
else:
pass
observation_names = [observation_names]
else:
observation_names = [list(b.observations().keys()) for b in self.buildings]
return observation_names
@property
def action_names(self) -> List[List[str]]:
"""Names of received actions.
Notes
-----
If `central_agent` is True, a list of 1 sublist containing all building action names is returned in the same order as `buildings`.
If `central_agent` is False, a list of sublists is returned where each sublist is a list of 1 building's action names and the sublist
in the same order as `buildings`.
"""
if self.central_agent:
action_names = []
for b in self.buildings:
action_names += b.active_actions
action_names = [action_names]
else:
action_names = [b.active_actions for b in self.buildings]
return action_names
@property
def net_electricity_consumption_emission_without_storage_and_partial_load_and_pv(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_emission_without_storage_and_partial_load_and_pv` time series, in [kg_co2]."""
return pd.DataFrame([
b.net_electricity_consumption_emission_without_storage_and_partial_load_and_pv
if isinstance(b, DynamicsBuilding) else b.net_electricity_consumption_emission_without_storage_and_pv
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_cost_without_storage_and_partial_load_and_pv(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_cost_without_storage_and_partial_load_and_pv` time series, in [$]."""
return pd.DataFrame([
b.net_electricity_consumption_cost_without_storage_and_partial_load_and_pv
if isinstance(b, DynamicsBuilding) else b.net_electricity_consumption_cost_without_storage_and_pv
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_without_storage_and_partial_load_and_pv(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_without_storage_and_partial_load_and_pv` time series, in [kWh]."""
return pd.DataFrame([
b.net_electricity_consumption_without_storage_and_partial_load_and_pv
if isinstance(b, DynamicsBuilding) else b.net_electricity_consumption_without_storage_and_pv
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_emission_without_storage_and_partial_load(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_emission_without_storage_and_partial_load` time series, in [kg_co2]."""
return pd.DataFrame([
b.net_electricity_consumption_emission_without_storage_and_partial_load
if isinstance(b, DynamicsBuilding) else b.net_electricity_consumption_emission_without_storage
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_cost_without_storage_and_partial_load(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_cost_without_storage_and_partial_load` time series, in [$]."""
return pd.DataFrame([
b.net_electricity_consumption_cost_without_storage_and_partial_load
if isinstance(b, DynamicsBuilding) else b.net_electricity_consumption_cost_without_storage
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_without_storage_and_partial_load(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_without_storage_and_partial_load` time series, in [kWh]."""
return pd.DataFrame([
b.net_electricity_consumption_without_storage_and_partial_load
if isinstance(b, DynamicsBuilding) else b.net_electricity_consumption_without_storage
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_emission_without_storage_and_pv(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_emission_without_storage_and_pv` time series, in [kg_co2]."""
return pd.DataFrame([
b.net_electricity_consumption_emission_without_storage_and_pv
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_cost_without_storage_and_pv(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_cost_without_storage_and_pv` time series, in [$]."""
return pd.DataFrame([
b.net_electricity_consumption_cost_without_storage_and_pv
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_without_storage_and_pv(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_without_storage_and_pv` time series, in [kWh]."""
return pd.DataFrame([
b.net_electricity_consumption_without_storage_and_pv
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_emission_without_storage(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_emission_without_storage` time series, in [kg_co2]."""
return pd.DataFrame([
b.net_electricity_consumption_emission_without_storage
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_cost_without_storage(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_cost_without_storage` time series, in [$]."""
return pd.DataFrame([
b.net_electricity_consumption_cost_without_storage
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_without_storage(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_without_storage` time series, in [kWh]."""
return pd.DataFrame([
b.net_electricity_consumption_without_storage
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_emission_without_storage(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_emission_without_storage` time series, in [kg_co2]."""
return pd.DataFrame([
b.net_electricity_consumption_emission_without_storage
for b in self.buildings
]).sum(axis = 0, min_count = 1).tolist()
@property
def net_electricity_consumption_cost_without_storage(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_cost_without_storage` time series, in [$]."""
return pd.DataFrame([
b.net_electricity_consumption_cost_without_storage
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_without_storage(self) -> np.ndarray:
"""Summed `Building.net_electricity_consumption_without_storage` time series, in [kWh]."""
return pd.DataFrame([
b.net_electricity_consumption_without_storage
for b in self.buildings
]).sum(axis = 0, min_count = 1).to_numpy()
@property
def net_electricity_consumption_emission(self) -> List[float]:
"""Summed `Building.net_electricity_consumption_emission` time series, in [kg_co2]."""
return self.__net_electricity_consumption_emission
@property
def net_electricity_consumption_cost(self) -> List[float]:
"""Summed `Building.net_electricity_consumption_cost` time series, in [$]."""
return self.__net_electricity_consumption_cost
@property
def net_electricity_consumption(self) -> List[float]:
"""Summed `Building.net_electricity_consumption` time series, in [kWh]."""
return self.__net_electricity_consumption
@property
def cooling_electricity_consumption(self) -> np.ndarray:
"""Summed `Building.cooling_electricity_consumption` time series, in [kWh]."""
return pd.DataFrame([b.cooling_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def heating_electricity_consumption(self) -> np.ndarray:
"""Summed `Building.heating_electricity_consumption` time series, in [kWh]."""
return pd.DataFrame([b.heating_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def dhw_electricity_consumption(self) -> np.ndarray:
"""Summed `Building.dhw_electricity_consumption` time series, in [kWh]."""
return pd.DataFrame([b.dhw_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def cooling_storage_electricity_consumption(self) -> np.ndarray:
"""Summed `Building.cooling_storage_electricity_consumption` time series, in [kWh]."""
return pd.DataFrame([b.cooling_storage_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def heating_storage_electricity_consumption(self) -> np.ndarray:
"""Summed `Building.heating_storage_electricity_consumption` time series, in [kWh]."""
return pd.DataFrame([b.heating_storage_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def dhw_storage_electricity_consumption(self) -> np.ndarray:
"""Summed `Building.dhw_storage_electricity_consumption` time series, in [kWh]."""
return pd.DataFrame([b.dhw_storage_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def electrical_storage_electricity_consumption(self) -> np.ndarray:
"""Summed `Building.electrical_storage_electricity_consumption` time series, in [kWh]."""
return pd.DataFrame([b.electrical_storage_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_cooling_device_to_cooling_storage(self) -> np.ndarray:
"""Summed `Building.energy_from_cooling_device_to_cooling_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_cooling_device_to_cooling_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_heating_device_to_heating_storage(self) -> np.ndarray:
"""Summed `Building.energy_from_heating_device_to_heating_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_heating_device_to_heating_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_dhw_device_to_dhw_storage(self) -> np.ndarray:
"""Summed `Building.energy_from_dhw_device_to_dhw_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_dhw_device_to_dhw_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_to_electrical_storage(self) -> np.ndarray:
"""Summed `Building.energy_to_electrical_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_to_electrical_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_cooling_device(self) -> np.ndarray:
"""Summed `Building.energy_from_cooling_device` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_cooling_device for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_heating_device(self) -> np.ndarray:
"""Summed `Building.energy_from_heating_device` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_heating_device for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_dhw_device(self) -> np.ndarray:
"""Summed `Building.energy_from_dhw_device` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_dhw_device for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_to_non_shiftable_load(self) -> np.ndarray:
"""Summed `Building.energy_to_non_shiftable_load` time series, in [kWh]."""
return pd.DataFrame([b.energy_to_non_shiftable_load for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_cooling_storage(self) -> np.ndarray:
"""Summed `Building.energy_from_cooling_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_cooling_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def total_self_consumption(self) -> np.ndarray:
"""Total self-consumption from electrical and thermal storage, in [kWh]."""
return (
self.energy_from_electrical_storage +
self.energy_from_cooling_storage +
self.energy_from_heating_storage +
self.energy_from_dhw_storage
)
@property
def energy_from_heating_storage(self) -> np.ndarray:
"""Summed `Building.energy_from_heating_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_heating_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_dhw_storage(self) -> np.ndarray:
"""Summed `Building.energy_from_dhw_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_dhw_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def energy_from_electrical_storage(self) -> np.ndarray:
"""Summed `Building.energy_from_electrical_storage` time series, in [kWh]."""
return pd.DataFrame([b.energy_from_electrical_storage for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def cooling_demand(self) -> np.ndarray:
"""Summed `Building.cooling_demand`, in [kWh]."""
return pd.DataFrame([b.cooling_demand for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def heating_demand(self) -> np.ndarray:
"""Summed `Building.heating_demand`, in [kWh]."""
return pd.DataFrame([b.heating_demand for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def dhw_demand(self) -> np.ndarray:
"""Summed `Building.dhw_demand`, in [kWh]."""
return pd.DataFrame([b.dhw_demand for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def non_shiftable_load(self) -> np.ndarray:
"""Summed `Building.non_shiftable_load`, in [kWh]."""
return pd.DataFrame([b.non_shiftable_load for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def solar_generation(self) -> np.ndarray:
"""Summed `Building.solar_generation, in [kWh]`."""
return pd.DataFrame([b.solar_generation for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()
@property
def power_outage(self) -> np.ndarray:
"""Time series of number of buildings experiencing power outage."""
return pd.DataFrame([b.power_outage_signal for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()[:self.time_step + 1]
@schema.setter
def schema(self, schema: Union[str, Path, Mapping[str, Any]]):
dataset = DataSet()
if isinstance(schema, (str, Path)) and os.path.isfile(schema):
schema_filepath = Path(schema) if isinstance(schema, str) else schema
schema = FileHandler.read_json(schema)
schema['root_directory'] = os.path.split(schema_filepath.absolute())[0] if schema['root_directory'] is None \
else schema['root_directory']
elif isinstance(schema, str) and schema in dataset.get_dataset_names():
schema = dataset.get_schema(schema)
schema['root_directory'] = '' if schema['root_directory'] is None else schema['root_directory']
elif isinstance(schema, dict):
schema = deepcopy(schema)
schema['root_directory'] = '' if schema['root_directory'] is None else schema['root_directory']
else:
raise UnknownSchemaError()
self.__schema = schema
@render_enabled.setter
def render_enabled(self, enabled: bool):
self.__render_enabled = bool(enabled)
@root_directory.setter
def root_directory(self, root_directory: Union[str, Path]):
self.__root_directory = root_directory
@buildings.setter
def buildings(self, buildings: List[Building]):
self.__buildings = buildings
@electric_vehicles.setter
def electric_vehicles(self, electric_vehicles: List[ElectricVehicle]):
self.__electric_vehicles = electric_vehicles
@Environment.episode_tracker.setter
def episode_tracker(self, episode_tracker: EpisodeTracker):
Environment.episode_tracker.fset(self, episode_tracker)
for b in self.buildings:
b.episode_tracker = self.episode_tracker
@episode_time_steps.setter
def episode_time_steps(self, episode_time_steps: Union[int, List[Tuple[int, int]]]):
self.__episode_time_steps = self.episode_tracker.simulation_time_steps if episode_time_steps is None else episode_time_steps
@rolling_episode_split.setter
def rolling_episode_split(self, rolling_episode_split: bool):
self.__rolling_episode_split = False if rolling_episode_split is None else rolling_episode_split
@random_episode_split.setter
def random_episode_split(self, random_episode_split: bool):
self.__random_episode_split = False if random_episode_split is None else random_episode_split
@reward_function.setter
def reward_function(self, reward_function: RewardFunction):
self.__reward_function = reward_function
@central_agent.setter
def central_agent(self, central_agent: bool):
self.__central_agent = central_agent
@shared_observations.setter
def shared_observations(self, shared_observations: List[str]):
self.__shared_observations = self.get_default_shared_observations() if shared_observations is None else shared_observations
@Environment.random_seed.setter
def random_seed(self, seed: int):
Environment.random_seed.fset(self, seed)
for b in self.buildings:
b.random_seed = self.random_seed
@Environment.time_step_ratio.setter
def time_step_ratio(self, time_step_ratio: int):
Environment.time_step_ratio.fset(self, time_step_ratio)
for b in self.buildings:
b.time_step_ratio = self.time_step_ratio
[docs]
@staticmethod
def get_default_shared_observations() -> List[str]:
"""Names of default common observations across all buildings i.e. observations that have the same value irrespective of the building.
Notes
-----
May be used to assigned :attr:`shared_observations` value during `CityLearnEnv` object initialization.
"""
return [
'month', 'day_type', 'hour', 'minutes', 'daylight_savings_status',
'outdoor_dry_bulb_temperature', 'outdoor_dry_bulb_temperature_predicted_1',
'outdoor_dry_bulb_temperature_predicted_2', 'outdoor_dry_bulb_temperature_predicted_3',
'outdoor_relative_humidity', 'outdoor_relative_humidity_predicted_1',
'outdoor_relative_humidity_predicted_2', 'outdoor_relative_humidity_predicted_3',
'diffuse_solar_irradiance', 'diffuse_solar_irradiance_predicted_1',
'diffuse_solar_irradiance_predicted_2', 'diffuse_solar_irradiance_predicted_3',
'direct_solar_irradiance', 'direct_solar_irradiance_predicted_1',
'direct_solar_irradiance_predicted_2', 'direct_solar_irradiance_predicted_3',
'carbon_intensity', 'electricity_pricing', 'electricity_pricing_predicted_1',
'electricity_pricing_predicted_2', 'electricity_pricing_predicted_3',
]
[docs]
def step(self, actions: List[List[float]]) -> Tuple[List[List[float]], List[float], bool, bool, dict]:
"""Apply actions at current timestep, update variables/reward, then advance time.
Parameters
----------
actions: List[List[float]]
Fractions of `buildings` storage devices' capacities to charge/discharge by.
If `central_agent` is True, `actions` parameter should be a list of 1 list containing all buildings' actions and follows
the ordering of buildings in `buildings`. If `central_agent` is False, `actions` parameter should be a list of sublists
where each sublists contains the actions for each building in `buildings` and follows the ordering of buildings in `buildings`.
Returns
-------
observations: List[List[float]]
:attr:`observations` current value.
reward: List[float]
:meth:`get_reward` current value.
terminated: bool
A boolean value for if the episode has ended, in which case further :meth:`step` calls will return undefined results.
A done signal may be emitted for different reasons: Maybe the task underlying the environment was solved successfully,
a certain timelimit was exceeded, or the physics simulation has entered an invalid observation.
truncated: bool
A boolean value for if episode truncates due to a time limit or a reason that is not defined as part of the task MDP.
Will always return False in this base class.
info: dict
A dictionary that may contain additional information regarding the reason for a `terminated` signal.
`info` contains auxiliary diagnostic information (helpful for debugging, learning, and logging).
Override :meth"`get_info` to get custom key-value pairs in `info`.
"""
actions = self._parse_actions(actions)
# Apply actions at current timestep t
for building, building_actions in zip(self.buildings, actions):
building.apply_actions(**building_actions)
# Update environment/building variables for timestep t (reflect effects of actions)
self.update_variables()
# NOTE:
# This call to retrieve each building's observation dictionary is an expensive call especially since the observations
# are retrieved again to send to agent but the observations in dict form is needed for the reward function to easily
# extract building-level values. Can't think of a better way to handle this without giving the reward direct access to
# env, which is not the best design for competition integrity sake. Will revisit the building.observations() function
# to see how it can be optimized.
reward_observations = [b.observations(include_all=True, normalize=False, periodic_normalization=False) for b in self.buildings]
reward = self.reward_function.calculate(observations=reward_observations)
self.__rewards.append(reward)
# Advance to next timestep t+1
self.next_time_step()
# store episode reward summary at the end of episode (upon reaching final timestep)
if self.terminated:
if self.render_mode == 'during' and self.render_enabled:
# Final step was already streamed during the most recent `next_time_step` call.
pass
rewards = np.array(self.__rewards[1:], dtype='float32')
self.__episode_rewards.append({
'min': rewards.min(axis=0).tolist(),
'max': rewards.max(axis=0).tolist(),
'sum': rewards.sum(axis=0).tolist(),
'mean': rewards.mean(axis=0).tolist()
})
if self.render_mode == 'end' and self.render_enabled:
if self.time_step > 0:
final_index = min(self.time_steps - 1, self.time_step - 1)
else:
final_index = 0
has_buffered_rows = any(self._render_buffer.values())
if not has_buffered_rows:
state_snapshot = self._override_render_time_step(final_index)
self._defer_render_flush = True
try:
self.render()
finally:
self._restore_render_time_step(state_snapshot)
self._defer_render_flush = False
self._flush_render_buffer()
if self.render_enabled and not self._final_kpis_exported:
self.export_final_kpis()
return self.observations, reward, self.terminated, self.truncated, self.get_info()
[docs]
def get_info(self) -> Mapping[Any, Any]:
"""Other information to return from the `citylearn.CityLearnEnv.step` function."""
return {}
def _parse_actions(self, actions: List[List[float]]) -> List[Mapping[str, float]]:
"""Return mapping of action name to action value for each building."""
actions = list(actions)
building_actions = []
if self.central_agent:
actions = actions[0]
number_of_actions = len(actions)
expected_number_of_actions = self.action_space[0].shape[0]
assert number_of_actions == expected_number_of_actions, \
f'Expected {expected_number_of_actions} actions but {number_of_actions} were parsed to env.step.'
for building in self.buildings:
size = building.action_space.shape[0]
building_actions.append(actions[0:size])
actions = actions[size:]
else:
building_actions = [list(a) for a in actions]
# check that appropriate number of building actions have been provided
for b, a in zip(self.buildings, building_actions):
number_of_actions = len(a)
expected_number_of_actions = b.action_space.shape[0]
assert number_of_actions == expected_number_of_actions,\
f'Expected {expected_number_of_actions} for {b.name} but {number_of_actions} actions were provided.'
active_actions = [[k for k, v in b.action_metadata.items() if v] for b in self.buildings]
# Create a list of dictionaries for actions including EV-specific actions
parsed_actions = []
for i, building in enumerate(self.buildings):
action_dict = {}
electric_vehicle_actions = {}
washing_machine_actions = {}
# Populate the action_dict with regular actions
for k, action in zip(active_actions[i], building_actions[i]):
if 'electric_vehicle_storage' in k:
# Collect EV actions separately
charger_id = k.replace("electric_vehicle_storage_", "")
electric_vehicle_actions[charger_id] = action
elif 'washing_machine' in k:
# Collect Washing Machine actions separately
washing_machine_actions[k] = action
else:
action_dict[f'{k}_action'] = action
# Add EV actions to the action_dict if they exist
if electric_vehicle_actions:
action_dict['electric_vehicle_storage_actions'] = electric_vehicle_actions # aqui podes criar dicionario
if washing_machine_actions:
action_dict['washing_machine_actions'] = washing_machine_actions
# Fill missing actions with default NaN
for k in building.action_metadata:
if (
f'{k}_action' not in action_dict and
'electric_vehicle_storage' not in k and
'washing_machine' not in k
):
action_dict[f'{k}_action'] = np.nan
parsed_actions.append(action_dict)
return parsed_actions
[docs]
def evaluate(self, control_condition: EvaluationCondition = None, baseline_condition: EvaluationCondition = None, comfort_band: float = None) -> pd.DataFrame:
r"""Evaluate cost functions at current time step.
Calculates and returns building-level and district-level cost functions normalized w.r.t. the no control scenario.
Parameters
----------
control_condition: EvaluationCondition, default: :code:`EvaluationCondition.WITH_STORAGE_AND_PARTIAL_LOAD_AND_PV`
Condition for net electricity consumption, cost and emission to use in calculating cost functions for the control/flexible scenario.
baseline_condition: EvaluationCondition, default: :code:`EvaluationCondition.WITHOUT_STORAGE_AND_PARTIAL_LOAD_BUT_WITH_PV`
Condition for net electricity consumption, cost and emission to use in calculating cost functions for the baseline scenario
that is used to normalize the control_condition scenario.
comfort_band: float, optional
Comfort band above dry_bulb_temperature_cooling_set_point and below dry_bulb_temperature_heating_set_point beyond
which occupant is assumed to be uncomfortable. Defaults to :py:attr:`citylearn.data.EnergySimulation.DEFUALT_COMFORT_BAND`.
Returns
-------
cost_functions: pd.DataFrame
Cost function summary including the following: electricity consumption, zero net energy, carbon emissions, cost,
discomfort (total, too cold, too hot, minimum delta, maximum delta, average delta), ramping, 1 - load factor,
average daily peak and average annual peak.
Notes
-----
The equation for the returned cost function values is :math:`\frac{C_{\textrm{control}}}{C_{\textrm{no control}}}`
where :math:`C_{\textrm{control}}` is the value when the agent(s) control the environment and :math:`C_{\textrm{no control}}`
is the value when none of the storages and partial load cooling and heating devices in the environment are actively controlled.
"""
# lambda functions to get building or district level properties w.r.t. evaluation condition
get_net_electricity_consumption = lambda x, c: getattr(x, f'net_electricity_consumption{c.value}')
get_net_electricity_consumption_cost = lambda x, c: getattr(x, f'net_electricity_consumption_cost{c.value}')
get_net_electricity_consumption_emission = lambda x, c: getattr(x, f'net_electricity_consumption_emission{c.value}')
# Safe division helper for KPI ratios
def _safe_div(control_value: float, baseline_value: float):
try:
c = control_value
b = baseline_value
# Treat None/NaN/inf as 0.0 for robust normalization on short horizons
def _coerce(x):
try:
v = float(x)
return v if np.isfinite(v) else 0.0
except Exception:
return 0.0
c = _coerce(c)
b = _coerce(b)
if b == 0.0:
return 1.0 if c == 0.0 else None
return c / b
except Exception:
return None
comfort_band = EnergySimulation.DEFUALT_COMFORT_BAND if comfort_band is None else comfort_band
building_level = []
for b in self.buildings:
if isinstance(b, DynamicsBuilding):
control_condition = EvaluationCondition.WITH_STORAGE_AND_PARTIAL_LOAD_AND_PV if control_condition is None else control_condition
baseline_condition = EvaluationCondition.WITHOUT_STORAGE_AND_PARTIAL_LOAD_BUT_WITH_PV if baseline_condition is None else baseline_condition
else:
control_condition = EvaluationCondition.WITH_STORAGE_AND_PV if control_condition is None else control_condition
baseline_condition = EvaluationCondition.WITHOUT_STORAGE_BUT_WITH_PV if baseline_condition is None else baseline_condition
discomfort_kwargs = {
'indoor_dry_bulb_temperature': b.indoor_dry_bulb_temperature,
'dry_bulb_temperature_cooling_set_point': b.indoor_dry_bulb_temperature_cooling_set_point,
'dry_bulb_temperature_heating_set_point': b.indoor_dry_bulb_temperature_heating_set_point,
'band': b.comfort_band if comfort_band is None else comfort_band,
'occupant_count': b.occupant_count,
}
unmet, cold, hot,\
cold_minimum_delta, cold_maximum_delta, cold_average_delta,\
hot_minimum_delta, hot_maximum_delta, hot_average_delta =\
CostFunction.discomfort(**discomfort_kwargs)
expected_energy = b.cooling_demand + b.heating_demand + b.dhw_demand + b.non_shiftable_load
served_energy = b.energy_from_cooling_device + b.energy_from_cooling_storage\
+ b.energy_from_heating_device + b.energy_from_heating_storage\
+ b.energy_from_dhw_device + b.energy_from_dhw_storage\
+ b.energy_to_non_shiftable_load
ec_c = CostFunction.electricity_consumption(get_net_electricity_consumption(b, control_condition))[-1]
ec_b = CostFunction.electricity_consumption(get_net_electricity_consumption(b, baseline_condition))[-1]
zne_c = CostFunction.zero_net_energy(get_net_electricity_consumption(b, control_condition))[-1]
zne_b = CostFunction.zero_net_energy(get_net_electricity_consumption(b, baseline_condition))[-1]
ce_c = CostFunction.carbon_emissions(get_net_electricity_consumption_emission(b, control_condition))[-1]
ce_b = CostFunction.carbon_emissions(get_net_electricity_consumption_emission(b, baseline_condition))[-1] if sum(b.carbon_intensity.carbon_intensity) != 0 else 0
cost_c = CostFunction.cost(get_net_electricity_consumption_cost(b, control_condition))[-1]
cost_b = CostFunction.cost(get_net_electricity_consumption_cost(b, baseline_condition))[-1] if sum(b.pricing.electricity_pricing) != 0 else 0
building_level_ = pd.DataFrame([{
'cost_function': 'electricity_consumption_total',
'value': _safe_div(ec_c, ec_b),
}, {
'cost_function': 'zero_net_energy',
'value': _safe_div(zne_c, zne_b),
}, {
'cost_function': 'carbon_emissions_total',
'value': _safe_div(ce_c, ce_b),
}, {
'cost_function': 'cost_total',
'value': _safe_div(cost_c, cost_b),
}, {
'cost_function': 'discomfort_proportion',
'value': unmet[-1],
}, {
'cost_function': 'discomfort_cold_proportion',
'value': cold[-1],
}, {
'cost_function': 'discomfort_hot_proportion',
'value': hot[-1],
}, {
'cost_function': 'discomfort_cold_delta_minimum',
'value': cold_minimum_delta[-1],
}, {
'cost_function': 'discomfort_cold_delta_maximum',
'value': cold_maximum_delta[-1],
}, {
'cost_function': 'discomfort_cold_delta_average',
'value': cold_average_delta[-1],
}, {
'cost_function': 'discomfort_hot_delta_minimum',
'value': hot_minimum_delta[-1],
}, {
'cost_function': 'discomfort_hot_delta_maximum',
'value': hot_maximum_delta[-1],
}, {
'cost_function': 'discomfort_hot_delta_average',
'value': hot_average_delta[-1],
}, {
'cost_function': 'one_minus_thermal_resilience_proportion',
'value': CostFunction.one_minus_thermal_resilience(power_outage=b.power_outage_signal, **discomfort_kwargs)[-1],
}, {
'cost_function': 'power_outage_normalized_unserved_energy_total',
'value': CostFunction.normalized_unserved_energy(expected_energy, served_energy, power_outage=b.power_outage_signal)[-1]
}, {
'cost_function': 'annual_normalized_unserved_energy_total',
'value': CostFunction.normalized_unserved_energy(expected_energy, served_energy)[-1]
}])
building_level_['name'] = b.name
building_level.append(building_level_)
building_level = pd.concat(building_level, ignore_index=True)
building_level['level'] = 'building'
## district level
# set default evaluation conditions
control_condition = EvaluationCondition.WITH_STORAGE_AND_PARTIAL_LOAD_AND_PV if control_condition is None else control_condition
baseline_condition = EvaluationCondition.WITHOUT_STORAGE_AND_PARTIAL_LOAD_BUT_WITH_PV if baseline_condition is None else baseline_condition
# District-level normalized KPIs with safe division to avoid 0/0 or div-by-zero
ramp_c = CostFunction.ramping(get_net_electricity_consumption(self, control_condition))[-1]
ramp_b = CostFunction.ramping(get_net_electricity_consumption(self, baseline_condition))[-1]
dlf24_c = CostFunction.one_minus_load_factor(get_net_electricity_consumption(self, control_condition), window=24)[-1]
dlf24_b = CostFunction.one_minus_load_factor(get_net_electricity_consumption(self, baseline_condition), window=24)[-1]
dlf730_c = CostFunction.one_minus_load_factor(get_net_electricity_consumption(self, control_condition), window=730)[-1]
dlf730_b = CostFunction.one_minus_load_factor(get_net_electricity_consumption(self, baseline_condition), window=730)[-1]
peak24_c = CostFunction.peak(get_net_electricity_consumption(self, control_condition), window=24)[-1]
peak24_b = CostFunction.peak(get_net_electricity_consumption(self, baseline_condition), window=24)[-1]
peak_all_c = CostFunction.peak(get_net_electricity_consumption(self, control_condition), window=self.time_steps)[-1]
peak_all_b = CostFunction.peak(get_net_electricity_consumption(self, baseline_condition), window=self.time_steps)[-1]
district_level = pd.DataFrame([{
'cost_function': 'ramping_average',
'value': _safe_div(ramp_c, ramp_b),
}, {
'cost_function': 'daily_one_minus_load_factor_average',
'value': _safe_div(dlf24_c, dlf24_b),
},{
'cost_function': 'monthly_one_minus_load_factor_average',
'value': _safe_div(dlf730_c, dlf730_b),
}, {
'cost_function': 'daily_peak_average',
'value': _safe_div(peak24_c, peak24_b),
}, {
'cost_function': 'all_time_peak_average',
'value': _safe_div(peak_all_c, peak_all_b),
}])
district_level = pd.concat([district_level, building_level], ignore_index=True, sort=False)
district_level = district_level.groupby(['cost_function'])[['value']].mean().reset_index()
district_level['name'] = 'District'
district_level['level'] = 'district'
cost_functions = pd.concat([district_level, building_level], ignore_index=True, sort=False)
return cost_functions
[docs]
def next_time_step(self):
r"""Advance all buildings to next `time_step`."""
if getattr(self, 'render_enabled', False):
if self.render_mode == 'during':
self.render()
elif self.render_mode == 'end':
self._defer_render_flush = True
try:
self.render()
finally:
self._defer_render_flush = False
for building in self.buildings:
building.next_time_step()
# Advance electric vehicles to the next time step. This function is used as EVs exist even without being connected to any building (e.g. when they are being used to commute)
# As such, this function simulates the EV to the next time step.
for electric_vehicle in self.electric_vehicles:
electric_vehicle.next_time_step()
super().next_time_step()
# Apply battery SOC simulation for EVs that are NOT connected
self.simulate_unconnected_ev_soc()
#This function is here so that, when the new time step is reached, the first thing to do is plug in/out the EVs according to their individual dataset
#It basicly associates an EV to a Building.Charger
self.associate_chargers_to_electric_vehicles()
[docs]
def associate_chargers_to_electric_vehicles(self):
r"""Associate charger to its corresponding electric_vehicle based on charger simulation state."""
def _resolve_arrival_soc(simulation: ChargerSimulation, step: int, prev_state: float, prev_id: Union[str, None], ev_identifier: str) -> Union[float, None]:
"""Return expected SOC (as fraction) for an EV connecting at `step`, or ``None`` when unavailable."""
candidate_index = None
if prev_state in (2, 3) and step > 0:
if isinstance(prev_id, str) and prev_id.strip() not in {"", "nan"} and prev_id != ev_identifier:
raise ValueError(
f"Charger dataset EV mismatch: expected '{ev_identifier}' but found '{prev_id}' at time step {step - 1}."
)
candidate_index = step - 1
elif 0 <= step < len(simulation.electric_vehicle_estimated_soc_arrival):
candidate_index = step
soc_value = None
if candidate_index is not None and 0 <= candidate_index < len(simulation.electric_vehicle_estimated_soc_arrival):
candidate = simulation.electric_vehicle_estimated_soc_arrival[candidate_index]
if isinstance(candidate, (float, np.floating)) and not np.isnan(candidate) and candidate >= 0:
soc_value = float(candidate)
if soc_value is None and 0 <= step < len(simulation.electric_vehicle_required_soc_departure):
fallback = simulation.electric_vehicle_required_soc_departure[step]
if isinstance(fallback, (float, np.floating)) and not np.isnan(fallback) and fallback >= 0:
soc_value = float(fallback)
return soc_value
for building in self.buildings:
if building.electric_vehicle_chargers is None:
continue
for charger in building.electric_vehicle_chargers:
sim = charger.charger_simulation
state = sim.electric_vehicle_charger_state[self.time_step]
if np.isnan(state) or state not in [1, 2]:
continue # Skip if no EV is connected or incoming
ev_id = sim.electric_vehicle_id[self.time_step]
prev_state = np.nan
prev_ev_id = None
if self.time_step > 0:
idx = self.time_step - 1
if idx < len(sim.electric_vehicle_charger_state):
prev_state = sim.electric_vehicle_charger_state[idx]
if idx < len(sim.electric_vehicle_id):
prev_ev_id = sim.electric_vehicle_id[idx]
if isinstance(ev_id, str) and ev_id.strip() not in ["", "nan"]:
for ev in self.electric_vehicles:
if ev.name == ev_id:
if state == 1:
charger.plug_car(ev)
is_new_connection = (
prev_state != 1
or not isinstance(prev_ev_id, str)
or prev_ev_id != ev_id
)
if is_new_connection:
soc_value = _resolve_arrival_soc(sim, self.time_step, prev_state, prev_ev_id, ev_id)
if soc_value is not None:
ev.battery.force_set_soc(soc_value)
elif state == 2:
charger.associate_incoming_car(ev)
[docs]
def simulate_unconnected_ev_soc(self):
"""Simulate SOC changes for EVs that are not under charger control at t+1."""
t = self.time_step
if t + 1 >= self.episode_tracker.episode_time_steps:
return
for ev in self.electric_vehicles:
ev_id = ev.name
found_in_charger = False
for building in self.buildings:
for charger in building.electric_vehicle_chargers or []:
sim : ChargerSimulation = charger.charger_simulation
curr_id = sim.electric_vehicle_id[t] if t < len(sim.electric_vehicle_id) else ""
next_id = sim.electric_vehicle_id[t + 1] if t + 1 < len(sim.electric_vehicle_id) else ""
curr_state = sim.electric_vehicle_charger_state[t] if t < len(sim.electric_vehicle_charger_state) else np.nan
next_state = sim.electric_vehicle_charger_state[t + 1] if t + 1 < len(sim.electric_vehicle_charger_state) else np.nan
currently_connected = isinstance(curr_id, str) and curr_id == ev_id and curr_state == 1
if currently_connected:
found_in_charger = True
break
is_connecting = (
isinstance(next_id, str)
and next_id == ev_id
and next_state == 1
and curr_state != 1
)
is_incoming = isinstance(curr_id, str) and curr_id == ev_id and curr_state == 2
if is_connecting:
found_in_charger = True
# Priority 1: current soc_arrival if incoming at t
if is_incoming:
if t < len(sim.electric_vehicle_estimated_soc_arrival):
soc = sim.electric_vehicle_estimated_soc_arrival[t]
else:
soc = np.nan
else:
if t + 1 < len(sim.electric_vehicle_estimated_soc_arrival):
soc = sim.electric_vehicle_estimated_soc_arrival[t + 1]
else:
soc = np.nan
if 0 <= soc <= 1:
ev.battery.force_set_soc(soc)
break
if found_in_charger:
break
if not found_in_charger:
# Not being connected or incoming in a valid charger — apply SOC drift
if t > 0:
last_soc = ev.battery.soc[t - 1]
variability = np.clip(np.random.normal(1.0, 0.2), 0.6, 1.4)
new_soc = np.clip(last_soc * variability, 0.0, 1.0)
ev.battery.force_set_soc(new_soc)
[docs]
def export_final_kpis(self, model: 'citylearn.agents.base.Agent' = None, filepath: str = "exported_kpis.csv"):
"""Export episode KPIs to csv.
Parameters
----------
model: citylearn.agents.base.Agent, optional
Agent whose environment should be evaluated. Defaults to the current environment.
filepath: str, default: ``"exported_kpis.csv"``
Output filename placed inside :pyattr:`new_folder_path`.
"""
# Ensure output directory exists even if rendering was disabled
self._ensure_render_output_dir()
file_path = os.path.join(self.new_folder_path, filepath)
if model is not None and getattr(model, 'env', None) is not None:
kpis = model.env.evaluate()
else:
kpis = self.evaluate()
kpis = kpis.pivot(index='cost_function', columns='name', values='value').round(3)
kpis = kpis.dropna(how='all')
kpis = kpis.fillna('')
kpis = kpis.reset_index()
kpis = kpis.rename(columns={'cost_function': 'KPI'})
kpis.to_csv(file_path, index=False, encoding='utf-8')
self._final_kpis_exported = True
[docs]
def render(self):
"""
Renders the current state of the CityLearn environment, logging data into separate CSV files.
Organizes files by episode number when simulation spans multiple episodes.
"""
if not getattr(self, 'render_enabled', False):
return
if self.render_mode == 'end' and not getattr(self, '_defer_render_flush', False):
self._flush_render_buffer()
return
# Ensure the output directory is prepared
self._ensure_render_output_dir()
iso_timestamp = self._get_iso_timestamp()
os.makedirs(self.new_folder_path, exist_ok=True)
episode_num = self.episode_tracker.episode
# Save community data - add episode number to filename
self._save_to_csv(f"exported_data_community_ep{episode_num}.csv",
{"timestamp": iso_timestamp, **self.as_dict()})
# Save building data
for idx, building in enumerate(self.buildings):
building_filename = f"exported_data_{building.name.lower()}_ep{episode_num}.csv"
self._save_to_csv(building_filename,
{"timestamp": iso_timestamp, **building.as_dict()})
# Battery data
battery = building.electrical_storage # save battery to render
battery_filename = f"exported_data_{building.name.lower()}_battery_ep{episode_num}.csv"
self._save_to_csv(battery_filename,
{"timestamp": iso_timestamp, **battery.as_dict()})
# Chargers
for charger_idx, charger in enumerate(building.electric_vehicle_chargers):
charger_filename = f"exported_data_{building.name.lower()}_{charger.charger_id}_ep{episode_num}.csv"
self._save_to_csv(charger_filename,
{"timestamp": iso_timestamp, **charger.as_dict()})
# Pricing data
pricing_filename = f"exported_data_pricing_ep{episode_num}.csv"
self._save_to_csv(pricing_filename,
{"timestamp": iso_timestamp, **self.buildings[0].pricing.as_dict(self.time_step)})
# EV data
for idx, ev in enumerate(self.__electric_vehicles):
ev_filename = f"exported_data_{ev.name.lower()}_ep{episode_num}.csv"
self._save_to_csv(ev_filename,
{"timestamp": iso_timestamp, **ev.as_dict()})
def _save_to_csv(self, filename, data):
"""
Saves data to a CSV file, appending it if the file exists. When `render_mode='end'`,
rows may be buffered in memory until a flush is requested.
"""
if self._buffer_render and getattr(self, '_defer_render_flush', False):
self._render_buffer[filename].append(dict(data))
return
self._write_render_rows(filename, [dict(data)])
def _flush_render_buffer(self):
"""Write any buffered render rows to disk."""
if not getattr(self, '_render_buffer', None):
return
has_pending_rows = any(self._render_buffer.values())
if not has_pending_rows:
self._render_buffer.clear()
return
try:
target_dir = Path(self.new_folder_path)
except Exception:
target_dir = None
if target_dir is not None:
print(f"Writing buffered render exports to {target_dir} ...")
original_defer = self._defer_render_flush
original_buffer_state = self._buffer_render
self._defer_render_flush = False
self._buffer_render = False
try:
for filename, rows in list(self._render_buffer.items()):
if rows:
self._write_render_rows(filename, rows)
finally:
self._render_buffer.clear()
self._buffer_render = original_buffer_state
self._defer_render_flush = original_defer
def _write_render_rows(self, filename: str, rows: List[Mapping[str, Any]]):
"""Write one or more render rows to disk with minimal rewrites."""
file_path = Path(self.new_folder_path) / filename
file_path.parent.mkdir(parents=True, exist_ok=True)
if not rows:
return
buffered_fieldnames = list(
dict.fromkeys(field for row in rows for field in row.keys())
)
if not file_path.exists():
fieldnames = buffered_fieldnames
with file_path.open('w', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for row in rows:
writer.writerow({field: row.get(field, '') for field in fieldnames})
return
# File exists – inspect current header.
needs_header_extension = False
with file_path.open('r', newline='') as existing:
reader = csv.DictReader(existing)
existing_fieldnames = reader.fieldnames or []
for field in buffered_fieldnames:
if field not in existing_fieldnames:
needs_header_extension = True
break
if needs_header_extension:
existing_rows = list(reader)
else:
existing_rows = None
if not needs_header_extension:
with file_path.open('a', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=existing_fieldnames)
for row in rows:
writer.writerow({field: row.get(field, '') for field in existing_fieldnames})
return
# Need to rewrite with the expanded header.
extended_fieldnames = list(
dict.fromkeys(existing_fieldnames + [f for f in buffered_fieldnames if f not in existing_fieldnames])
)
existing_rows = existing_rows or []
for row in existing_rows:
for field in extended_fieldnames:
row.setdefault(field, '')
with file_path.open('w', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=extended_fieldnames)
writer.writeheader()
writer.writerows(existing_rows)
for row in rows:
writer.writerow({field: row.get(field, '') for field in extended_fieldnames})
def _parse_render_start_date(self, start_date: Union[str, datetime.date]) -> datetime.date:
"""Return a valid start date for rendering timestamps."""
if start_date is None:
return self.DEFAULT_RENDER_START_DATE
if isinstance(start_date, datetime.datetime):
return start_date.date()
if isinstance(start_date, datetime.date):
return start_date
if isinstance(start_date, str):
try:
return datetime.date.fromisoformat(start_date)
except ValueError as exc:
raise ValueError(
"CityLearnEnv start_date must be in ISO format 'YYYY-MM-DD'."
) from exc
raise TypeError(
"CityLearnEnv start_date must be a date, datetime, or ISO format string."
)
def _ensure_render_output_dir(self, *, ensure_exists: bool = True):
"""Prepare the render output directory and optionally create it on disk.
Parameters
----------
ensure_exists: bool, default: True
When ``True`` the directory tree is created (and legacy exports removed when
reusing :pyattr:`render_session_name`). When ``False`` only internal state is
updated so that paths can be materialized later on demand.
"""
base_render_path = Path(getattr(self, 'render_output_root', Path(__file__).resolve().parents[1] / 'render_logs')).expanduser()
if ensure_exists:
try:
base_render_path.mkdir(parents=True, exist_ok=True)
except PermissionError:
fallback = (Path.cwd() / 'render_logs').resolve()
fallback.mkdir(parents=True, exist_ok=True)
self.render_output_root = fallback
base_render_path = fallback
render_dir = getattr(self, '_render_directory_path', None)
needs_new_dir = render_dir is None
if not needs_new_dir and ensure_exists:
render_dir = Path(render_dir)
try:
needs_new_dir = not render_dir.is_relative_to(base_render_path)
except AttributeError:
needs_new_dir = base_render_path not in render_dir.parents and render_dir != base_render_path
if needs_new_dir:
if self.render_session_name:
render_dir = (base_render_path / Path(self.render_session_name)).expanduser().resolve()
else:
if getattr(self, '_render_timestamp', None) is None:
self._render_timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
render_dir = (base_render_path / self._render_timestamp).resolve()
self._render_directory_path = render_dir
else:
render_dir = Path(self._render_directory_path)
if ensure_exists:
render_dir.mkdir(parents=True, exist_ok=True)
if not self._render_dir_initialized:
if self.render_session_name:
for csv_file in render_dir.glob('exported_*.csv'):
try:
csv_file.unlink()
except OSError:
pass
self._render_dir_initialized = True
self.new_folder_path = str(render_dir)
def _get_iso_timestamp(self):
# Reset time tracking if this is the first step of a new episode
if self.time_step == 0:
self._reset_time_tracking()
energy_sim = self.buildings[0].energy_simulation
month_series = getattr(energy_sim, 'month', None)
hour_series = getattr(energy_sim, 'hour', None)
minutes_series = getattr(energy_sim, 'minutes', None)
def _get_series_value(series, index, default):
if series is None:
return default
if index >= len(series):
return default
try:
return int(series[index])
except (TypeError, ValueError):
return default
month = _get_series_value(month_series, self.time_step, self.render_start_date.month)
hour = _get_series_value(hour_series, self.time_step, 1)
minutes = _get_series_value(minutes_series, self.time_step, 0)
next_index = self.time_step + 1
next_month = _get_series_value(month_series, next_index, month)
next_hour = _get_series_value(hour_series, next_index, hour)
next_minutes = _get_series_value(minutes_series, next_index, minutes)
raw_hour = hour
timestamp_year = self.year
timestamp_month = month
timestamp_day = self.current_day
hour_for_timestamp = raw_hour % 24
next_hour_mod = next_hour % 24
next_minutes_clamped = max(0, min(59, next_minutes))
minute_for_timestamp = max(0, min(59, minutes))
if raw_hour >= 24:
if next_month != month:
timestamp_month = next_month
if next_month < month:
timestamp_year = self.year + 1
timestamp_day = 1
else:
# Keep the current day; the day roll-over is handled via next_day logic.
timestamp_day = self.current_day
timestamp = f"{timestamp_year:04d}-{int(timestamp_month):02d}-{timestamp_day:02d}T{hour_for_timestamp:02d}:{minute_for_timestamp:02d}:00"
next_year = timestamp_year
next_day = timestamp_day
if next_month != month:
if next_month < month:
next_year = timestamp_year + 1
next_day = 1
elif next_hour_mod <= hour_for_timestamp and next_minutes_clamped <= minute_for_timestamp:
next_day = timestamp_day + 1
self.year = next_year
self.current_day = next_day
return timestamp
def _override_render_time_step(self, index: int):
"""Temporarily set time_step to `index` for the environment and descendants."""
snapshot = []
def _record(obj):
if hasattr(obj, 'time_step'):
snapshot.append((obj, obj.time_step))
obj.time_step = index
_record(self)
for building in getattr(self, 'buildings', []):
_record(building)
electrical_storage = getattr(building, 'electrical_storage', None)
if electrical_storage is not None:
_record(electrical_storage)
for charger in getattr(building, 'electric_vehicle_chargers', []) or []:
_record(charger)
for washing_machine in getattr(building, 'washing_machines', []) or []:
_record(washing_machine)
for ev in getattr(self, 'electric_vehicles', []):
_record(ev)
battery = getattr(ev, 'battery', None)
if battery is not None:
_record(battery)
return snapshot
@staticmethod
def _restore_render_time_step(snapshot):
for obj, value in snapshot:
try:
obj.time_step = value
except AttributeError:
pass
def _reset_time_tracking(self):
"""Reset all time tracking variables."""
start_offset = getattr(self.episode_tracker, 'episode_start_time_step', 0)
base_datetime = datetime.datetime.combine(self.render_start_date, datetime.time())
base_datetime += datetime.timedelta(seconds=start_offset * self.seconds_per_time_step)
self._render_start_datetime = base_datetime
self.year = base_datetime.year
self.current_day = base_datetime.day
# Add any other time-related variables that need resetting
[docs]
def reset(self, seed: int = None, options: Mapping[str, Any] = None) -> Tuple[List[List[float]], dict]:
r"""Reset `CityLearnEnv` to initial state.
Parameters
----------
seed: int, optional
Use to updated :code:`citylearn.CityLearnEnv.random_seed` if value is provided.
options: Mapping[str, Any], optional
Use to pass additional data to environment on reset. Not used in this base class
but included to conform to gymnasium interface.
Returns
-------
observations: List[List[float]]
:attr:`observations`.
info: dict
A dictionary that may contain additional information regarding the reason for a `terminated` signal.
`info` contains auxiliary diagnostic information (helpful for debugging, learning, and logging).
Override :meth"`get_info` to get custom key-value pairs in `info`.
"""
# object reset
super().reset()
self._final_kpis_exported = False
# update seed
if seed is not None:
self.random_seed = seed
else:
pass
# update time steps for time series
self.episode_tracker.next_episode(
self.episode_time_steps,
self.rolling_episode_split,
self.random_episode_split,
self.random_seed,
)
for building in self.buildings:
building.reset()
for ev in self.electric_vehicles:
ev.reset()
self.associate_chargers_to_electric_vehicles()
# reset reward function (does nothing by default)
self.reward_function.reset()
# variable reset
self.__rewards = [[]]
self.__net_electricity_consumption = []
self.__net_electricity_consumption_cost = []
self.__net_electricity_consumption_emission = []
self.update_variables()
return self.observations, self.get_info()
[docs]
def update_variables(self):
for b in self.buildings:
b.update_variables()
# Helper to set or append district-level aggregates for current timestep
def _set_or_append(lst, value):
# If list length matches current index => append
if len(lst) == self.time_step:
lst.append(value)
# If already has an entry for current timestep => overwrite
elif len(lst) == self.time_step + 1:
lst[self.time_step] = value
else:
# Out-of-sync: resize to current index and append
del lst[self.time_step + 1:]
if len(lst) < self.time_step:
# pad if needed
lst.extend([0.0] * (self.time_step - len(lst)))
lst.append(value)
# net electricity consumption
total = sum(b.net_electricity_consumption[self.time_step] for b in self.buildings)
_set_or_append(self.__net_electricity_consumption, total)
# net electricity consumption cost
total_cost = sum(b.net_electricity_consumption_cost[self.time_step] for b in self.buildings)
_set_or_append(self.__net_electricity_consumption_cost, total_cost)
# net electricity consumption emission
total_emission = sum(b.net_electricity_consumption_emission[self.time_step] for b in self.buildings)
_set_or_append(self.__net_electricity_consumption_emission, total_emission)
[docs]
def load_agent(self, agent: Union[str, 'citylearn.agents.base.Agent'] = None, **kwargs) -> Union[Any, 'citylearn.agents.base.Agent']:
"""Return :class:`Agent` or sub class object as defined by the `schema`.
Parameters
----------
agent: Union[str, 'citylearn.agents.base.Agent], optional
Agent class or string describing path to agent class, e.g. 'citylearn.agents.base.BaselineAgent'.
If a value is not provided, defaults to the agent defined in the schema:agent:type.
**kwargs : dict
Agent initialization attributes. For most agents e.g. CityLearn and Stable-Baselines3 agents,
an intialized :py:attr:`env` must be parsed to the agent :py:meth:`init` function.
Returns
-------
agent: Agent
Initialized agent.
"""
# set agent class
if agent is not None:
agent_type = agent
if not isinstance(agent_type, str):
agent_type = [agent_type.__module__] + [agent_type.__name__]
agent_type = '.'.join(agent_type)
else:
pass
# set agent init attributes
else:
agent_type = self.schema['agent']['type']
if kwargs is not None and len(kwargs) > 0:
agent_attributes = dict(kwargs)
elif agent is None:
agent_attributes = dict(self.schema['agent'].get('attributes', {}))
else:
agent_attributes = {}
if 'env' not in agent_attributes:
agent_attributes['env'] = self
agent_module = '.'.join(agent_type.split('.')[0:-1])
agent_name = agent_type.split('.')[-1]
agent_constructor = getattr(importlib.import_module(agent_module), agent_name)
agent = agent_constructor(**agent_attributes)
return agent
def _load(self, schema: Mapping[str, Any], **kwargs) -> Tuple[Union[Path, str], List[Building], List[ElectricVehicle], Union[int, List[Tuple[int, int]]], bool, bool, float, RewardFunction, bool, List[str], EpisodeTracker]:
"""Return `CityLearnEnv` and `Controller` objects as defined by the `schema`.
Parameters
----------
schema: Mapping[str, Any]
N:code:`dict` object of a CityLearn schema.
Returns
-------
root_directory: Union[Path, str]
Absolute path to directory that contains the data files including the schema.
buildings : List[Building]
Buildings in CityLearn environment.
electric_vehicles : List[ElectricVehicle]
Electric Vehicles in CityLearn environment.
episode_time_steps: Union[int, List[Tuple[int, int]]]
Number of time steps in an episode. Defaults to (`simulation_end_time_step` - `simulation_start_time_step`) + 1.
rolling_episode_split: bool
True if episode sequences are split such that each time step is a candidate for `episode_start_time_step` otherwise, False to split episodes
in steps of `episode_time_steps`.
random_episode_split: bool
True if episode splits are to be selected at random during training otherwise, False to select sequentially.
seconds_per_time_step: float
Number of seconds in 1 `time_step` and must be set to >= 1.
reward_function : RewardFunction
Reward function class instance.
central_agent : bool
Expect 1 central agent to control all building storage device.
shared_observations : List[str]
Names of common observations across all buildings i.e. observations that have the same value irrespective of the building.
"""
schema['root_directory'] = kwargs['root_directory'] if kwargs.get('root_directory') is not None else schema['root_directory']
schema['random_seed'] = schema.get('random_seed', None) if kwargs.get('random_seed', None) is None else schema.get('random_seed', None)
schema['central_agent'] = kwargs['central_agent'] if kwargs.get('central_agent') is not None else schema['central_agent']
#Separated chargers observations to create one for each charger at each building based on active ones at the schema
schema['chargers_observations_helper'] = {key: value for key, value in schema["observations"].items() if "electric_vehicle_" in key}
schema['chargers_actions_helper'] = {key: value for key, value in schema["actions"].items() if "electric_vehicle_" in key}
schema['chargers_shared_observations_helper'] = {key: value for key, value in schema["observations"].items()
if "electric_vehicle_" in key and value.get("shared_in_central_agent", True)}
schema['washing_machine_observations_helper'] = {key: value for key, value in schema["observations"].items() if "washing_machine_" in key}
schema['washing_machine_actions_helper'] = {key: value for key, value in schema["actions"].items() if "washing_machine" in key}
schema['observations'] = {
key: value
for key, value in schema["observations"].items()
if key not in set(schema['chargers_observations_helper']) | set(schema['washing_machine_observations_helper'])
}
schema['actions'] = {
key: value
for key, value in schema['actions'].items()
if key not in set(schema['chargers_actions_helper']) | set(schema['washing_machine_actions_helper'])
}
# Update shared observations, excluding any keys that start with 'electric_vehicle_'
schema['shared_observations'] = (
kwargs['shared_observations'] if kwargs.get('shared_observations') is not None else [
k for k, v in schema['observations'].items()
if not k.startswith("electric_vehicle_")
and "washing_machine" not in k
and v.get('shared_in_central_agent', False)
]
)
schema['episode_time_steps'] = kwargs['episode_time_steps'] if kwargs.get('episode_time_steps') is not None else schema.get('episode_time_steps', None)
schema['rolling_episode_split'] = kwargs['rolling_episode_split'] if kwargs.get('rolling_episode_split') is not None else schema.get('rolling_episode_split', None)
schema['random_episode_split'] = kwargs['random_episode_split'] if kwargs.get('random_episode_split') is not None else schema.get('random_episode_split', None)
schema['seconds_per_time_step'] = kwargs['seconds_per_time_step'] if kwargs.get('seconds_per_time_step') is not None else schema['seconds_per_time_step']
schema['simulation_start_time_step'] = kwargs['simulation_start_time_step'] \
if kwargs.get('simulation_start_time_step') is not None else schema['simulation_start_time_step']
schema['simulation_end_time_step'] = kwargs['simulation_end_time_step'] \
if kwargs.get('simulation_end_time_step') is not None else schema['simulation_end_time_step']
episode_tracker = EpisodeTracker(schema['simulation_start_time_step'], schema['simulation_end_time_step'])
# get sizing data to reduce read time
dataset = DataSet()
pv_sizing_data = dataset.get_pv_sizing_data()
battery_sizing_data = dataset.get_battery_sizing_data()
# get buildings to include
buildings_to_include = list(schema['buildings'].keys())
buildings = []
if kwargs.get('buildings') is not None and len(kwargs['buildings']) > 0:
if isinstance(kwargs['buildings'][0], Building):
buildings: List[Building] = kwargs['buildings']
for b in buildings:
b.episode_tracker = episode_tracker
buildings_to_include = []
elif isinstance(kwargs['buildings'][0], str):
buildings_to_include = [b for b in buildings_to_include if b in kwargs['buildings']]
elif isinstance(kwargs['buildings'][0], int):
buildings_to_include = [buildings_to_include[i] for i in kwargs['buildings']]
else:
raise Exception('Unknown buildings type. Allowed types are citylearn.building.Building, int and str.')
else:
buildings_to_include = [b for b in buildings_to_include if schema['buildings'][b]['include']]
# load buildings
for i, building_name in enumerate(buildings_to_include):
buildings.append(self._load_building(i, building_name, schema, episode_tracker, pv_sizing_data, battery_sizing_data,**kwargs))
# Load electric vehicles (if present in the schema)
electric_vehicles = []
if kwargs.get('electric_vehicles_def') is not None and len(kwargs['electric_vehicles_def']) > 0:
electric_vehicle_schemas = kwargs['electric_vehicles_def']
else:
electric_vehicle_schemas = schema.get('electric_vehicles_def', {})
for electric_vehicle_name, electric_vehicle_schema in electric_vehicle_schemas.items():
if electric_vehicle_schema['include']:
time_step_ratio = buildings[0].time_step_ratio if len(buildings) > 0 else 1.0
electric_vehicles.append(self._load_electric_vehicle(electric_vehicle_name,schema,electric_vehicle_schema,episode_tracker, time_step_ratio))
# set reward function
# Extract reward configuration from schema
reward_schema = schema['reward_function']
reward_type = reward_schema['type']
reward_attrs = reward_schema.get('attributes', {})
# Determine if it's a multi-building configuration (i.e., a mapping from building names to reward types)
is_multi = isinstance(reward_type, dict)
if is_multi:
# Fallback to 'default' reward type if one isn't specified per building
default_type = reward_type.get('default')
if default_type is None and reward_type:
default_type = next(iter(reward_type.values())) # Use the first available type if 'default' not set
# Same fallback logic for attributes
default_attrs = reward_attrs.get('default')
if default_attrs is None and reward_attrs:
default_attrs = next(iter(reward_attrs.values()))
reward_functions = {}
for building in buildings:
name = building.name
# Use building-specific reward type or fallback to default
r_type = reward_type.get(name, default_type)
r_attr = reward_attrs.get(name, default_attrs) or {} # Ensure it's a dict, not None
if r_type is None:
raise ValueError(f"No reward function defined for building '{name}' and no default provided")
# Dynamically load class from dotted path string
module_name = '.'.join(r_type.split('.')[:-1])
class_name = r_type.split('.')[-1]
module = importlib.import_module(module_name)
constructor = getattr(module, class_name)
# Instantiate reward function for this building
reward_functions[name] = constructor(None, **r_attr)
# Combine individual building reward functions into a multi-building one
reward_function = MultiBuildingRewardFunction(None, reward_functions)
else:
# Handle the single reward function case
if 'reward_function' in kwargs and kwargs['reward_function'] is not None:
reward_function_type = kwargs['reward_function']
# If a class is passed instead of a string, convert to dotted path
if not isinstance(reward_function_type, str):
reward_function_type = f"{reward_function_type.__module__}.{reward_function_type.__name__}"
else:
reward_function_type = reward_type # Use type from schema
# Get attributes from kwargs or schema, default to empty dict
reward_function_attributes = kwargs.get('reward_function_kwargs') or reward_attrs or {}
# Dynamically load class from dotted path string
module_name = '.'.join(reward_function_type.split('.')[:-1])
class_name = reward_function_type.split('.')[-1]
module = importlib.import_module(module_name)
constructor = getattr(module, class_name)
# Instantiate the single reward function
reward_function = constructor(None, **reward_function_attributes)
return (
schema['root_directory'], buildings, electric_vehicles, schema['episode_time_steps'], schema['rolling_episode_split'],
schema['random_episode_split'],
schema['seconds_per_time_step'], reward_function, schema['central_agent'], schema['shared_observations'],
episode_tracker
)
def _load_building(self, index: int, building_name: str, schema: dict, episode_tracker: EpisodeTracker, pv_sizing_data: pd.DataFrame, battery_sizing_data: pd.DataFrame, **kwargs) -> Building:
"""Initializes and returns a building model."""
building_schema = schema['buildings'][building_name]
building_kwargs = {}
if building_schema.get('charging_constraints') is not None:
building_kwargs['charging_constraints'] = building_schema['charging_constraints']
seconds_per_time_step = schema['seconds_per_time_step']
noise_std = building_schema.get('noise_std', 0.0)
# data
energy_simulation = pd.read_csv(os.path.join(schema['root_directory'], building_schema['energy_simulation']))
energy_simulation = EnergySimulation(**energy_simulation.to_dict('list'), seconds_per_time_step=seconds_per_time_step, noise_std=noise_std)
building_kwargs['time_step_ratio'] = energy_simulation.time_step_ratios[index]
weather = pd.read_csv(os.path.join(schema['root_directory'], building_schema['weather']))
weather = Weather(**weather.to_dict('list'), noise_std=noise_std)
if building_schema.get('carbon_intensity', None) is not None:
carbon_intensity = pd.read_csv(os.path.join(schema['root_directory'], building_schema['carbon_intensity']))
carbon_intensity = CarbonIntensity(**carbon_intensity.to_dict('list'), noise_std=noise_std)
else:
carbon_intensity = CarbonIntensity(np.zeros(energy_simulation.hour.shape[0], dtype='float32'), noise_std=noise_std)
if building_schema.get('pricing', None) is not None:
pricing = pd.read_csv(os.path.join(schema['root_directory'], building_schema['pricing']))
pricing = Pricing(**pricing.to_dict('list'), noise_std=noise_std)
else:
pricing = Pricing(
np.zeros(energy_simulation.hour.shape[0], dtype='float32'),
np.zeros(energy_simulation.hour.shape[0], dtype='float32'),
np.zeros(energy_simulation.hour.shape[0], dtype='float32'),
np.zeros(energy_simulation.hour.shape[0], dtype='float32'),
noise_std=noise_std
)
# construct building
building_type = 'citylearn.citylearn.Building' if building_schema.get('type', None) is None else building_schema['type']
building_type_module = '.'.join(building_type.split('.')[0:-1])
building_type_name = building_type.split('.')[-1]
building_constructor = getattr(importlib.import_module(building_type_module), building_type_name)
# set dynamics
if building_schema.get('dynamics', None) is not None:
dynamics_type = building_schema['dynamics']['type']
dynamics_module = '.'.join(dynamics_type.split('.')[0:-1])
dynamics_name = dynamics_type.split('.')[-1]
dynamics_constructor = getattr(importlib.import_module(dynamics_module), dynamics_name)
attributes = building_schema['dynamics'].get('attributes', {})
attributes['filepath'] = os.path.join(schema['root_directory'], attributes['filename'])
_ = attributes.pop('filename')
building_kwargs[f'dynamics'] = dynamics_constructor(**attributes)
else:
building_kwargs['dynamics'] = None
# set occupant
if building_schema.get('occupant', None) is not None:
building_occupant = building_schema['occupant']
occupant_type = building_occupant['type']
occupant_module = '.'.join(occupant_type.split('.')[0:-1])
occupant_name = occupant_type.split('.')[-1]
occupant_constructor = getattr(importlib.import_module(occupant_module), occupant_name)
attributes: dict = building_occupant.get('attributes', {})
parameters_filepath = os.path.join(schema['root_directory'], building_occupant['parameters_filename'])
parameters = pd.read_csv(parameters_filepath)
attributes['parameters'] = LogisticRegressionOccupantParameters(**parameters.to_dict('list'))
attributes['episode_tracker'] = episode_tracker
attributes['random_seed'] = schema['random_seed']
for k in ['increase', 'decrease']:
attributes[f'setpoint_{k}_model_filepath'] = os.path.join(schema['root_directory'], attributes[f'setpoint_{k}_model_filename'])
_ = attributes.pop(f'setpoint_{k}_model_filename')
building_kwargs['occupant'] = occupant_constructor(**attributes)
else:
building_kwargs['occupant'] = None
# set power outage model
building_schema_power_outage = building_schema.get('power_outage', {})
simulate_power_outage = kwargs.get('simulate_power_outage')
simulate_power_outage = building_schema_power_outage.get('simulate_power_outage') if simulate_power_outage is None else simulate_power_outage
simulate_power_outage = simulate_power_outage[index] if isinstance(simulate_power_outage,list) else simulate_power_outage
stochastic_power_outage = building_schema_power_outage.get('stochastic_power_outage')
if building_schema_power_outage.get('stochastic_power_outage_model', None) is not None:
stochastic_power_outage_model_type = building_schema_power_outage['stochastic_power_outage_model']['type']
stochastic_power_outage_model_module = '.'.join(stochastic_power_outage_model_type.split('.')[0:-1])
stochastic_power_outage_model_name = stochastic_power_outage_model_type.split('.')[-1]
stochastic_power_outage_model_constructor = getattr(
importlib.import_module(stochastic_power_outage_model_module),
stochastic_power_outage_model_name
)
attributes = building_schema_power_outage.get('stochastic_power_outage_model', {}).get('attributes', {})
stochastic_power_outage_model = stochastic_power_outage_model_constructor(**attributes)
else:
stochastic_power_outage_model = None
# ------------------ Chargers ------------------
# Initialize chargers list
chargers_list = []
#Adding chargers to buildings if they exist
if building_schema.get("chargers", None) is not None:
for charger_name, charger_config in building_schema["chargers"].items():
noise_std = charger_config.get('noise_std', 0.0)
charger_simulation_file = pd.read_csv(
os.path.join(schema['root_directory'], charger_config['charger_simulation'])
).iloc[schema['simulation_start_time_step']:schema['simulation_end_time_step'] + 1].copy()
charger_simulation = ChargerSimulation(*charger_simulation_file.values.T, noise_std=noise_std)
charger_type = charger_config['type']
charger_module = '.'.join(charger_type.split('.')[0:-1])
charger_class_name = charger_type.split('.')[-1]
charger_class = getattr(importlib.import_module(charger_module), charger_class_name)
charger_attributes = charger_config.get('attributes', {})
charger_attributes['episode_tracker'] = episode_tracker
charger_object = charger_class(charger_simulation=charger_simulation, charger_id=charger_name, **charger_attributes, seconds_per_time_step=schema['seconds_per_time_step'], time_step_ratio = building_kwargs['time_step_ratio'])
chargers_list.append(charger_object)
washing_machines_list = []
# Adding washing machines to buildings if they exist
if kwargs.get('washing_machines') is not None and len(kwargs['washing_machines']) > 0:
washing_machine_schemas = kwargs['washing_machines']
else:
washing_machine_schemas = building_schema.get('washing_machines', {})
for washing_machine_name, washing_machine_schema in washing_machine_schemas.items():
washing_machines_list.append(self._load_washing_machine(washing_machine_name,schema,washing_machine_schema,episode_tracker))
observation_metadata, action_metadata = self.process_metadata(schema, building_schema, chargers_list, washing_machines_list, index, energy_simulation,**kwargs)
building: Building = building_constructor(
energy_simulation=energy_simulation,
washing_machines = washing_machines_list,
electric_vehicle_chargers=chargers_list,
weather=weather,
observation_metadata=observation_metadata,
action_metadata=action_metadata,
carbon_intensity=carbon_intensity,
pricing=pricing,
name=building_name,
seconds_per_time_step=schema['seconds_per_time_step'],
random_seed=schema['random_seed'],
episode_tracker=episode_tracker,
simulate_power_outage=simulate_power_outage,
stochastic_power_outage=stochastic_power_outage,
stochastic_power_outage_model=stochastic_power_outage_model,
**building_kwargs,
)
# update devices
device_metadata = {
'cooling_device': {'autosizer': building.autosize_cooling_device},
'heating_device': {'autosizer': building.autosize_heating_device},
'dhw_device': {'autosizer': building.autosize_dhw_device},
'dhw_storage': {'autosizer': building.autosize_dhw_storage},
'cooling_storage': {'autosizer': building.autosize_cooling_storage},
'heating_storage': {'autosizer': building.autosize_heating_storage},
'electrical_storage': {'autosizer': building.autosize_electrical_storage},
'washing_machine': {'autosizer': building.autosize_electrical_storage},
'pv': {'autosizer': building.autosize_pv},
}
solar_generation = kwargs.get('solar_generation')
solar_generation = True if solar_generation is None else solar_generation
solar_generation = solar_generation[index] if isinstance(solar_generation, list) else solar_generation
for device_name in device_metadata:
if building_schema.get(device_name, None) is None:
device = None
elif device_name == 'pv' and not solar_generation:
device = None
else:
device_type: str = building_schema[device_name]['type']
device_module = '.'.join(device_type.split('.')[0:-1])
device_type_name = device_type.split('.')[-1]
constructor = getattr(importlib.import_module(device_module), device_type_name)
attributes = building_schema[device_name].get('attributes', {})
attributes['seconds_per_time_step'] = schema['seconds_per_time_step']
# in case device technical specifications are to be randomly sampled, make sure each device per building has a unique seed
md5 = hashlib.md5()
device_random_seed = 0
for string in [building_name, building_type, device_name, device_type]:
md5.update(string.encode())
hash_to_integer_base = 16
device_random_seed += int(md5.hexdigest(), hash_to_integer_base)
device_random_seed = int(str(device_random_seed * (schema['random_seed'] + 1))[:9])
attributes = {
**attributes,
'random_seed': attributes['random_seed'] if attributes.get('random_seed', None) is not None else device_random_seed
}
device = constructor(**attributes)
autosize = False if building_schema[device_name].get('autosize', None) is None else building_schema[device_name]['autosize']
building.__setattr__(device_name, device)
if autosize:
autosizer = device_metadata[device_name]['autosizer']
autosize_kwargs = {} if building_schema[device_name].get('autosize_attributes', None) is None else \
building_schema[device_name]['autosize_attributes']
if isinstance(device, PV):
autosize_kwargs['epw_filepath'] = os.path.join(schema['root_directory'], autosize_kwargs['epw_filepath'])
autosize_kwargs['sizing_data'] = pv_sizing_data
elif isinstance(device, Battery):
autosize_kwargs['sizing_data'] = battery_sizing_data
else:
pass
autosizer(**autosize_kwargs)
else:
pass
# set back the random seed to to building's random seed
device.random_seed = schema['random_seed']
building.observation_space = building.estimate_observation_space()
building.action_space = building.estimate_action_space()
return building
def _load_electric_vehicle(self, electric_vehicle_name: str, schema: dict, electric_vehicle_schema: dict, episode_tracker: EpisodeTracker, time_step_ratio) -> ElectricVehicle:
"""Initializes and returns an electric vehicle model."""
# Construct the battery object
capacity = electric_vehicle_schema["battery"]["attributes"]["capacity"]
nominal_power = electric_vehicle_schema["battery"]["attributes"]["nominal_power"]
initial_soc = electric_vehicle_schema["battery"]["attributes"].get("initial_soc", random.uniform(0, 1))
depth_of_discharge = electric_vehicle_schema["battery"]["attributes"].get("depth_of_discharge", 0.10)
battery = Battery(
capacity=capacity,
nominal_power=nominal_power,
initial_soc=initial_soc,
seconds_per_time_step=schema['seconds_per_time_step'],
time_step_ratio=time_step_ratio,
random_seed=schema['random_seed'],
episode_tracker=episode_tracker,
depth_of_discharge=depth_of_discharge
)
# Get the EV constructor
electric_vehicle_type = 'citylearn.citylearn.ElectricVehicle' \
if electric_vehicle_schema.get('type', None) is None else electric_vehicle_schema['type']
electric_vehicle_type_module = '.'.join(electric_vehicle_type.split('.')[0:-1])
electric_vehicle_type_name = electric_vehicle_type.split('.')[-1]
electric_vehicle_constructor = getattr(importlib.import_module(electric_vehicle_type_module), electric_vehicle_type_name)
# Initialize EV
ev: ElectricVehicle = electric_vehicle_constructor(
battery=battery,
name=electric_vehicle_name,
seconds_per_time_step=schema['seconds_per_time_step'],
random_seed=schema['random_seed'],
episode_tracker=episode_tracker
)
return ev
def _load_washing_machine(
self,
washing_machine_name: str,
schema: dict,
washing_machine_schema: dict,
episode_tracker: EpisodeTracker
) -> WashingMachine:
"""
Load simulation data and initialize a WashingMachine instance.
Parameters
----------
washing_machine_name : str
Unique identifier for the washing machine.
schema : dict
Global schema containing configuration for simulation, such as time step size and paths.
washing_machine_schema : dict
Sub-schema specific to washing machine setup (e.g., file paths for energy profiles).
episode_tracker : EpisodeTracker
Object that tracks simulation episode and time step data.
Returns
-------
WashingMachine
An initialized WashingMachine object using the provided simulation data.
"""
file_path = os.path.join(schema['root_directory'], washing_machine_schema['washing_machine_energy_simulation'])
# Load CSV file and slice it to the relevant simulation range
washing_machine_simulation = pd.read_csv(file_path).iloc[
schema['simulation_start_time_step']:schema['simulation_end_time_step'] + 1
].copy()
# Convert DataFrame into a WashingMachineSimulation object
washing_machine_simulation = WashingMachineSimulation(*washing_machine_simulation.values.T)
# Create and return the WashingMachine object
wm = WashingMachine(
washing_machine_simulation=washing_machine_simulation,
episode_tracker=episode_tracker,
name=washing_machine_name,
seconds_per_time_step=schema['seconds_per_time_step'],
random_seed=schema['random_seed'],
)
return wm
def __str__(self) -> str:
"""
Return a string representation of the current simulation state.
Useful for logging or quick inspection of internal values.
"""
return str(self.as_dict())
[docs]
def as_dict(self) -> dict:
"""
Convert the current simulation state to a dictionary.
This includes key performance indicators such as energy usage, emissions,
and electricity pricing at the current time step.
Returns
-------
dict
Dictionary with energy and environmental metrics for the current step.
"""
if len(self.net_electricity_consumption) == 0:
idx = 0
else:
idx = max(0, min(self.time_step, len(self.net_electricity_consumption) - 1))
return {
"Net Electricity Consumption-kWh": self.net_electricity_consumption[idx],
"Self Consumption-kWh": self.total_self_consumption[idx],
"Stored energy by community- kWh": self.energy_to_electrical_storage[idx],
"Total Solar Generation-kWh": self.solar_generation[idx],
"CO2-kg_co2": self.net_electricity_consumption_emission[idx],
"Price-$": self.net_electricity_consumption_cost[idx],
}
[docs]
class Error(Exception):
"""Base class for other exceptions."""
[docs]
class UnknownSchemaError(Error):
"""Raised when a schema is not a data set name, dict nor filepath."""
__MESSAGE = 'Unknown schema parsed into constructor. Schema must be name of CityLearn data set,'\
' a filepath to JSON representation or `dict` object of a CityLearn schema.'\
' Call citylearn.data.DataSet.get_names() for list of available CityLearn data sets.'
def __init__(self,message=None):
super().__init__(self.__MESSAGE if message is None else message)