Source code for citylearn.citylearn

from collections import defaultdict
from copy import deepcopy
from enum import Enum
import hashlib
import importlib
import logging
import os
from pathlib import Path
from typing import Any, List, Mapping, Tuple, Union
from gymnasium import Env, spaces
import csv
import datetime
import numpy as np
import pandas as pd
import random
from citylearn.base import Environment, EpisodeTracker
from citylearn.building import Building, DynamicsBuilding
from citylearn.cost_function import CostFunction
from citylearn.data import CarbonIntensity, DataSet, ChargerSimulation, EnergySimulation, LogisticRegressionOccupantParameters, Pricing, WashingMachineSimulation, Weather
from citylearn.electric_vehicle import ElectricVehicle
from citylearn.energy_model import Battery, PV, WashingMachine
from citylearn.reward_function import MultiBuildingRewardFunction, RewardFunction
from citylearn.utilities import FileHandler

LOGGER = logging.getLogger()
logging.getLogger('matplotlib.font_manager').disabled = True
logging.getLogger('matplotlib.pyplot').disabled = True

class EvaluationCondition(Enum):
    """Evaluation conditions.

    Used in `citylearn.CityLearnEnv.calculate` method.

    Every member value is a string suffix built from the helper constants
    declared first in the class body. The suffixes mirror the endings of the
    baseline time-series names such as
    ``net_electricity_consumption_without_storage_and_pv``; presumably
    `calculate` appends them to a base name -- confirm against that method.

    Notes
    -----
    The declaration order is significant: members that share a value are
    aliases of the first name declared with that value, so the statements
    below must not be reordered.
    """

    # general (soft private) building blocks used to compose the values below
    _DEFAULT = ''
    _STORAGE_SUFFIX = '_without_storage'
    _PARTIAL_LOAD_SUFFIX = '_and_partial_load'
    _PV_SUFFIX = '_and_pv'

    # Building type
    WITH_STORAGE_AND_PV = _DEFAULT
    WITHOUT_STORAGE_BUT_WITH_PV = _STORAGE_SUFFIX
    WITHOUT_STORAGE_AND_PV = WITHOUT_STORAGE_BUT_WITH_PV + _PV_SUFFIX

    # DynamicsBuilding type
    WITH_STORAGE_AND_PARTIAL_LOAD_AND_PV = WITH_STORAGE_AND_PV
    WITHOUT_STORAGE_BUT_WITH_PARTIAL_LOAD_AND_PV = WITHOUT_STORAGE_BUT_WITH_PV
    WITHOUT_STORAGE_AND_PARTIAL_LOAD_BUT_WITH_PV = WITHOUT_STORAGE_BUT_WITH_PARTIAL_LOAD_AND_PV + _PARTIAL_LOAD_SUFFIX
    WITHOUT_STORAGE_AND_PARTIAL_LOAD_AND_PV = WITHOUT_STORAGE_AND_PARTIAL_LOAD_BUT_WITH_PV + _PV_SUFFIX
[docs] class CityLearnEnv(Environment, Env): r"""CityLearn environment class. Parameters ---------- schema: Union[str, Path, Mapping[str, Any]] Name of CityLearn data set, filepath to JSON representation or :code:`dict` object of a CityLearn schema. Call :py:meth:`citylearn.data.DataSet.get_dataset_names` for list of available CityLearn data sets. root_directory: Union[str, Path] Absolute path to directory that contains the data files including the schema. buildings: Union[List[Building], List[str], List[int]], optional Buildings to include in environment. If list of :code:`citylearn.building.Building` is provided, will override :code:`buildings` definition in schema. If list of :code:`str` is provided will include only schema :code:`buildings` keys that are contained in provided list of :code:`str`. If list of :code:`int` is provided will include only schema :code:`buildings` whose index is contained in provided list of :code:`int`. simulation_start_time_step: int, optional Time step to start reading data files contents. simulation_end_time_step: int, optional Time step to end reading from data files contents. episode_time_steps: Union[int, List[Tuple[int, int]]], optional If type is `int`, it is the number of time steps in an episode. If type is `List[Tuple[int, int]]`, it is a list of episode start and end time steps between `simulation_start_time_step` and `simulation_end_time_step`. Defaults to (`simulation_end_time_step` - `simulation_start_time_step`) + 1. Will ignore `rolling_episode_split` if `episode_time_steps` is of type `List[Tuple[int, int]]`. rolling_episode_split: bool, default: False True if episode sequences are split such that each time step is a candidate for `episode_start_time_step` otherwise, False to split episodes in steps of `episode_time_steps`. random_episode_split: bool, default: False True if episode splits are to be selected at random during training otherwise, False to select sequentially. 
seconds_per_time_step: float Number of seconds in 1 `time_step` and must be set to >= 1. reward_function: Union[RewardFunction, str], optional Reward function class instance or path to function class e.g. 'citylearn.reward_function.IndependentSACReward'. If provided, will override :code:`reward_function` definition in schema. reward_function_kwargs: Mapping[str, Any], optional Parameters to be passed to :py:attr:`reward_function` at initialization. central_agent: bool, optional Expect 1 central agent to control all buildings. shared_observations: List[str], optional Names of common observations across all buildings i.e. observations that have the same value irrespective of the building. active_observations: Union[List[str], List[List[str]]], optional List of observations to be made available in the buildings. Can be specified for all buildings in a :code:`List[str]` or for each building independently in a :code:`List[List[str]]`. Will override the observations defined in the :code:`schema`. inactive_observations: Union[List[str], List[List[str]]], optional List of observations to be made unavailable in the buildings. Can be specified for all buildings in a :code:`List[str]` or for each building independently in a :code:`List[List[str]]`. Will override the observations defined in the :code:`schema`. active_actions: Union[List[str], List[List[str]]], optional List of actions to be made available in the buildings. Can be specified for all buildings in a :code:`List[str]` or for each building independently in a :code:`List[List[str]]`. Will override the actions defined in the :code:`schema`. inactive_actions: Union[List[str], List[List[str]]], optional List of actions to be made unavailable in the buildings. Can be specified for all buildings in a :code:`List[str]` or for each building independently in a :code:`List[List[str]]`. Will override the actions defined in the :code:`schema`. simulate_power_outage: Union[bool, List[bool]] Whether to simulate power outages. 
Can be specified for all buildings as single :code:`bool` or for each building independently in a :code:`List[bool]`. Will override power outage defined in the :code:`schema`. solar_generation: Union[bool, List[bool]] Whether to allow solar generation. Can be specified for all buildings as single :code:`bool` or for each building independently in a :code:`List[bool]`. Will override :code:`pv` defined in the :code:`schema`. random_seed: int, optional Pseudorandom number generator seed for repeatable results. Other Parameters ---------------- render_directory: Union[str, Path], optional Base directory where rendering and export artifacts are stored. Relative paths are resolved from the project root. render_directory_name: str, optional Folder name created inside the project root for rendering and export artifacts when ``render_directory`` is not provided. Defaults to ``render_logs``. render_session_name: str, optional Name of the subfolder created under ``render_directory``/``render_directory_name`` for export artifacts. When omitted, a timestamp is used. render_mode: str, optional Rendering strategy. Accepted values are ``'none'`` (default), ``'during'`` for streaming exports each step, and ``'end'`` for exports performed at episode completion while still allowing manual snapshots via :meth:`render`. **kwargs : dict Other keyword arguments used to initialize super classes. Notes ----- Parameters passed to `citylearn.citylearn.CityLearnEnv.__init__` that are also defined in `schema` will override their `schema` definition. 
""" DEFAULT_RENDER_START_DATE = datetime.date(2024, 1, 1) def __init__(self, schema: Union[str, Path, Mapping[str, Any]], root_directory: Union[str, Path] = None, buildings: Union[List[Building], List[str], List[int]] = None, electric_vehicles: Union[List[ElectricVehicle], List[str], List[int]] = None, simulation_start_time_step: int = None, simulation_end_time_step: int = None, episode_time_steps: Union[int, List[Tuple[int, int]]] = None, rolling_episode_split: bool = None, random_episode_split: bool = None, seconds_per_time_step: float = None, reward_function: Union[RewardFunction, str] = None, reward_function_kwargs: Mapping[str, Any] = None, central_agent: bool = None, shared_observations: List[str] = None, active_observations: Union[List[str], List[List[str]]] = None, inactive_observations: Union[List[str], List[List[str]]] = None, active_actions: Union[List[str], List[List[str]]] = None, inactive_actions: Union[List[str], List[List[str]]] = None, simulate_power_outage: bool = None, solar_generation: bool = None, random_seed: int = None, time_step_ratio: int = None, start_date: Union[str, datetime.date] = None, render_session_name: str = None, render_mode: str = 'none', **kwargs: Any ): render_directory = kwargs.pop('render_directory', None) render_directory_name = kwargs.pop('render_directory_name', 'render_logs') render_flag = kwargs.pop('render', None) kw_render_mode = kwargs.pop('render_mode', None) requested_render_mode = render_mode if kw_render_mode is None else kw_render_mode requested_render_mode = 'none' if requested_render_mode is None else str(requested_render_mode).lower() kw_render_session_name = kwargs.pop('render_session_name', None) if kw_render_session_name is not None: render_session_name = kw_render_session_name if render_session_name is None else render_session_name self.schema = schema schema_start_date = self.schema.get('start_date') if isinstance(self.schema, dict) else None schema_render_mode = self.schema.get('render_mode') if 
isinstance(self.schema, dict) else None if schema_render_mode is not None: requested_render_mode = str(schema_render_mode).lower() if requested_render_mode not in {'none', 'during', 'end'}: raise ValueError("render_mode must be one of {'none', 'during', 'end'}.") self.render_mode = requested_render_mode self._buffer_render = self.render_mode == 'end' self._defer_render_flush = False self._render_buffer = defaultdict(list) self._render_start_date = self._parse_render_start_date(start_date if start_date is not None else schema_start_date) self.previous_month = None self.current_day = self._render_start_date.day self.year = self._render_start_date.year self._final_kpis_exported = False self.__rewards = None self.buildings = [] self.random_seed = self.schema.get('random_seed', None) if random_seed is None else random_seed schema_render_session = self.schema.get('render_session_name') if isinstance(self.schema, dict) else None self.render_session_name = render_session_name if render_session_name is not None else schema_render_session if self.render_session_name is not None: self.render_session_name = str(self.render_session_name).strip() if self.render_session_name == '': self.render_session_name = None elif Path(self.render_session_name).is_absolute(): raise ValueError('render_session_name must be a relative path. Use render_directory to choose an absolute location.') elif '..' 
in Path(self.render_session_name).parts: raise ValueError('render_session_name cannot contain parent directory references (“..”).') root_directory, buildings, electric_vehicles, episode_time_steps, rolling_episode_split, random_episode_split, \ seconds_per_time_step, reward_function, central_agent, shared_observations, episode_tracker = self._load( deepcopy(self.schema), root_directory=root_directory, buildings=buildings, electric_vehicles=electric_vehicles, simulation_start_time_step=simulation_start_time_step, simulation_end_time_step=simulation_end_time_step, episode_time_steps=episode_time_steps, rolling_episode_split=rolling_episode_split, random_episode=random_episode_split, seconds_per_time_step=seconds_per_time_step, time_step_ratio=time_step_ratio, reward_function=reward_function, reward_function_kwargs=reward_function_kwargs, central_agent=central_agent, shared_observations=shared_observations, active_observations=active_observations, inactive_observations=inactive_observations, active_actions=active_actions, inactive_actions=inactive_actions, simulate_power_outage=simulate_power_outage, solar_generation=solar_generation, random_seed=self.random_seed, ) self.root_directory = root_directory self.buildings = buildings self.electric_vehicles = electric_vehicles get_time_step_ratio = buildings[0].time_step_ratio if len(buildings) > 0 else 1.0 self.time_step_ratio = get_time_step_ratio # now call super class initialization and set episode tracker now that buildings are set super().__init__(seconds_per_time_step=seconds_per_time_step, random_seed=self.random_seed, episode_tracker=episode_tracker, time_step_ratio=self.time_step_ratio) # set other class variables self.episode_time_steps = episode_time_steps self.rolling_episode_split = rolling_episode_split self.random_episode_split = random_episode_split self.central_agent = central_agent self.shared_observations = shared_observations # set reward function self.reward_function = reward_function # rendering 
switch: schema['render'] overrides explicit flag, otherwise rely on render_mode defaults schema_render = self.schema.get('render', None) if isinstance(self.schema, dict) else None if schema_render is not None: render_enabled_flag = bool(schema_render) elif render_flag is not None: render_enabled_flag = bool(render_flag) else: render_enabled_flag = self.render_mode in {'during', 'end'} self.render_enabled = render_enabled_flag # reset environment and initializes episode time steps self.reset() # reset episode tracker to start after initializing episode time steps during reset self.episode_tracker.reset_episode_index() # set reward metadata self.reward_function.env_metadata = self.get_metadata() # reward history tracker self.__episode_rewards = [] # reward history tracker if self.root_directory is None: self.root_directory = os.path.dirname(os.path.abspath(__file__)) project_root = Path(__file__).resolve().parents[1] render_directory_name = render_directory_name or 'render_logs' if render_directory is not None: render_root = Path(render_directory).expanduser() if not render_root.is_absolute(): render_root = project_root / render_root else: render_root = project_root / render_directory_name self.render_output_root = render_root.expanduser().resolve() self._render_timestamp = None self._render_directory_path = None self._render_dir_initialized = False self.new_folder_path = None self._render_start_datetime = None if self.render_enabled: self._ensure_render_output_dir(ensure_exists=False) @property def render_start_date(self) -> datetime.date: """Date used as the origin for rendered timestamps.""" return self._render_start_date @property def schema(self) -> Mapping[str, Any]: """`dict` object of CityLearn schema.""" return self.__schema @property def render_enabled(self) -> bool: """Whether environment rendering/logging is enabled.""" return getattr(self, '_CityLearnEnv__render_enabled', False) @property def root_directory(self) -> Union[str, Path]: """Absolute path to 
directory that contains the data files including the schema.""" return self.__root_directory @property def buildings(self) -> List[Building]: """Buildings in CityLearn environment.""" return self.__buildings @property def electric_vehicles(self) -> List[ElectricVehicle]: """Electric Vehicles in CityLearn environment.""" return self.__electric_vehicles @property def time_steps(self) -> int: """Number of time steps in current episode split.""" return self.episode_tracker.episode_time_steps @property def episode_time_steps(self) -> Union[int, List[Tuple[int, int]]]: """If type is `int`, it is the number of time steps in an episode. If type is `List[Tuple[int, int]]]` is provided, it is a list of episode start and end time steps between `simulation_start_time_step` and `simulation_end_time_step`. Defaults to (`simulation_end_time_step` - `simulation_start_time_step`) + 1. Will ignore `rolling_episode_split` if `episode_splits` is of type `List[Tuple[int, int]]]`.""" return self.__episode_time_steps @property def rolling_episode_split(self) -> bool: """True if episode sequences are split such that each time step is a candidate for `episode_start_time_step` otherwise, False to split episodes in steps of `episode_time_steps`.""" return self.__rolling_episode_split @property def random_episode_split(self) -> bool: """True if episode splits are to be selected at random during training otherwise, False to select sequentially.""" return self.__random_episode_split @property def episode(self) -> int: """Current episode index.""" return self.episode_tracker.episode @property def reward_function(self) -> RewardFunction: """Reward function class instance.""" return self.__reward_function @property def rewards(self) -> List[List[float]]: """Reward time series""" return self.__rewards @property def episode_rewards(self) -> List[Mapping[str, Union[float, List[float]]]]: """Reward summary statistics for elapsed episodes.""" return self.__episode_rewards @property def 
central_agent(self) -> bool: """Expect 1 central agent to control all buildings.""" return self.__central_agent @property def shared_observations(self) -> List[str]: """Names of common observations across all buildings i.e. observations that have the same value irrespective of the building.""" return self.__shared_observations @property def terminated(self) -> bool: """Check if simulation has reached completion.""" return self.time_step == self.time_steps - 1 @property def truncated(self) -> bool: """Check if episode truncates due to a time limit or a reason that is not defined as part of the task MDP.""" return False @property def observation_space(self) -> List[spaces.Box]: """Controller(s) observation spaces. Returns ------- observation_space : List[spaces.Box] List of agent(s) observation spaces. Notes ----- If `central_agent` is True, a list of 1 `spaces.Box` object is returned that contains all buildings' limits with the limits in the same order as `buildings`. The `shared_observations` limits are only included in the first building's limits. If `central_agent` is False, a list of `space.Box` objects as many as `buildings` is returned in the same order as `buildings`. """ if self.central_agent: low_limit = [] high_limit = [] shared_observations = [] for i, b in enumerate(self.buildings): for l, h, s in zip(b.observation_space.low, b.observation_space.high, b.active_observations): if i == 0 or s not in self.shared_observations or s not in shared_observations: low_limit.append(l) high_limit.append(h) else: pass if s in self.shared_observations and s not in shared_observations: shared_observations.append(s) else: pass observation_space = [spaces.Box(low=np.array(low_limit), high=np.array(high_limit), dtype=np.float32)] else: observation_space = [b.observation_space for b in self.buildings] return observation_space @property def action_space(self) -> List[spaces.Box]: """Controller(s) action spaces. 
Returns ------- action_space : List[spaces.Box] List of agent(s) action spaces. Notes ----- If `central_agent` is True, a list of 1 `spaces.Box` object is returned that contains all buildings' limits with the limits in the same order as `buildings`. If `central_agent` is False, a list of `space.Box` objects as many as `buildings` is returned in the same order as `buildings`. """ if self.central_agent: low_limit = [v for b in self.buildings for v in b.action_space.low] high_limit = [v for b in self.buildings for v in b.action_space.high] action_space = [spaces.Box(low=np.array(low_limit), high=np.array(high_limit), dtype=np.float32)] else: action_space = [b.action_space for b in self.buildings] return action_space @property def observations(self) -> List[List[float]]: """Observations at current time step. Notes ----- If `central_agent` is True, a list of 1 sublist containing all building observation values is returned in the same order as `buildings`. The `shared_observations` values are only included in the first building's observation values. If `central_agent` is False, a list of sublists is returned where each sublist is a list of 1 building's observation values and the sublist in the same order as `buildings`. """ if self.central_agent: observations = [] shared_observations = [] for i, b in enumerate(self.buildings): for k, v in b.observations(normalize=False, periodic_normalization=False, check_limits=True).items(): if i == 0 or k not in self.shared_observations or k not in shared_observations: observations.append(v) else: pass if k in self.shared_observations and k not in shared_observations: shared_observations.append(k) else: pass observations = [observations] else: observations = [list(b.observations(normalize=False, periodic_normalization=False, check_limits=True).values()) for b in self.buildings] return observations @property def observation_names(self) -> List[List[str]]: """Names of returned observations. 
Notes ----- If `central_agent` is True, a list of 1 sublist containing all building observation names is returned in the same order as `buildings`. The `shared_observations` names are only included in the first building's observation names. If `central_agent` is False, a list of sublists is returned where each sublist is a list of 1 building's observation names and the sublist in the same order as `buildings`. """ if self.central_agent: observation_names = [] for i, b in enumerate(self.buildings): for k, _ in b.observations(normalize=False, periodic_normalization=False).items(): if i == 0 or k not in self.shared_observations or k not in observation_names: observation_names.append(k) else: pass observation_names = [observation_names] else: observation_names = [list(b.observations().keys()) for b in self.buildings] return observation_names @property def action_names(self) -> List[List[str]]: """Names of received actions. Notes ----- If `central_agent` is True, a list of 1 sublist containing all building action names is returned in the same order as `buildings`. If `central_agent` is False, a list of sublists is returned where each sublist is a list of 1 building's action names and the sublist in the same order as `buildings`. 
""" if self.central_agent: action_names = [] for b in self.buildings: action_names += b.active_actions action_names = [action_names] else: action_names = [b.active_actions for b in self.buildings] return action_names @property def net_electricity_consumption_emission_without_storage_and_partial_load_and_pv(self) -> np.ndarray: """Summed `Building.net_electricity_consumption_emission_without_storage_and_partial_load_and_pv` time series, in [kg_co2].""" return pd.DataFrame([ b.net_electricity_consumption_emission_without_storage_and_partial_load_and_pv if isinstance(b, DynamicsBuilding) else b.net_electricity_consumption_emission_without_storage_and_pv for b in self.buildings ]).sum(axis = 0, min_count = 1).to_numpy() @property def net_electricity_consumption_cost_without_storage_and_partial_load_and_pv(self) -> np.ndarray: """Summed `Building.net_electricity_consumption_cost_without_storage_and_partial_load_and_pv` time series, in [$].""" return pd.DataFrame([ b.net_electricity_consumption_cost_without_storage_and_partial_load_and_pv if isinstance(b, DynamicsBuilding) else b.net_electricity_consumption_cost_without_storage_and_pv for b in self.buildings ]).sum(axis = 0, min_count = 1).to_numpy() @property def net_electricity_consumption_without_storage_and_partial_load_and_pv(self) -> np.ndarray: """Summed `Building.net_electricity_consumption_without_storage_and_partial_load_and_pv` time series, in [kWh].""" return pd.DataFrame([ b.net_electricity_consumption_without_storage_and_partial_load_and_pv if isinstance(b, DynamicsBuilding) else b.net_electricity_consumption_without_storage_and_pv for b in self.buildings ]).sum(axis = 0, min_count = 1).to_numpy() @property def net_electricity_consumption_emission_without_storage_and_partial_load(self) -> np.ndarray: """Summed `Building.net_electricity_consumption_emission_without_storage_and_partial_load` time series, in [kg_co2].""" return pd.DataFrame([ 
b.net_electricity_consumption_emission_without_storage_and_partial_load if isinstance(b, DynamicsBuilding) else b.net_electricity_consumption_emission_without_storage for b in self.buildings ]).sum(axis = 0, min_count = 1).to_numpy() @property def net_electricity_consumption_cost_without_storage_and_partial_load(self) -> np.ndarray: """Summed `Building.net_electricity_consumption_cost_without_storage_and_partial_load` time series, in [$].""" return pd.DataFrame([ b.net_electricity_consumption_cost_without_storage_and_partial_load if isinstance(b, DynamicsBuilding) else b.net_electricity_consumption_cost_without_storage for b in self.buildings ]).sum(axis = 0, min_count = 1).to_numpy() @property def net_electricity_consumption_without_storage_and_partial_load(self) -> np.ndarray: """Summed `Building.net_electricity_consumption_without_storage_and_partial_load` time series, in [kWh].""" return pd.DataFrame([ b.net_electricity_consumption_without_storage_and_partial_load if isinstance(b, DynamicsBuilding) else b.net_electricity_consumption_without_storage for b in self.buildings ]).sum(axis = 0, min_count = 1).to_numpy() @property def net_electricity_consumption_emission_without_storage_and_pv(self) -> np.ndarray: """Summed `Building.net_electricity_consumption_emission_without_storage_and_pv` time series, in [kg_co2].""" return pd.DataFrame([ b.net_electricity_consumption_emission_without_storage_and_pv for b in self.buildings ]).sum(axis = 0, min_count = 1).to_numpy() @property def net_electricity_consumption_cost_without_storage_and_pv(self) -> np.ndarray: """Summed `Building.net_electricity_consumption_cost_without_storage_and_pv` time series, in [$].""" return pd.DataFrame([ b.net_electricity_consumption_cost_without_storage_and_pv for b in self.buildings ]).sum(axis = 0, min_count = 1).to_numpy() @property def net_electricity_consumption_without_storage_and_pv(self) -> np.ndarray: """Summed `Building.net_electricity_consumption_without_storage_and_pv` time 
series, in [kWh].""" return pd.DataFrame([ b.net_electricity_consumption_without_storage_and_pv for b in self.buildings ]).sum(axis = 0, min_count = 1).to_numpy() @property def net_electricity_consumption_emission_without_storage(self) -> np.ndarray: """Summed `Building.net_electricity_consumption_emission_without_storage` time series, in [kg_co2].""" return pd.DataFrame([ b.net_electricity_consumption_emission_without_storage for b in self.buildings ]).sum(axis = 0, min_count = 1).to_numpy() @property def net_electricity_consumption_cost_without_storage(self) -> np.ndarray: """Summed `Building.net_electricity_consumption_cost_without_storage` time series, in [$].""" return pd.DataFrame([ b.net_electricity_consumption_cost_without_storage for b in self.buildings ]).sum(axis = 0, min_count = 1).to_numpy() @property def net_electricity_consumption_without_storage(self) -> np.ndarray: """Summed `Building.net_electricity_consumption_without_storage` time series, in [kWh].""" return pd.DataFrame([ b.net_electricity_consumption_without_storage for b in self.buildings ]).sum(axis = 0, min_count = 1).to_numpy() @property def net_electricity_consumption_emission_without_storage(self) -> np.ndarray: """Summed `Building.net_electricity_consumption_emission_without_storage` time series, in [kg_co2].""" return pd.DataFrame([ b.net_electricity_consumption_emission_without_storage for b in self.buildings ]).sum(axis = 0, min_count = 1).tolist() @property def net_electricity_consumption_cost_without_storage(self) -> np.ndarray: """Summed `Building.net_electricity_consumption_cost_without_storage` time series, in [$].""" return pd.DataFrame([ b.net_electricity_consumption_cost_without_storage for b in self.buildings ]).sum(axis = 0, min_count = 1).to_numpy() @property def net_electricity_consumption_without_storage(self) -> np.ndarray: """Summed `Building.net_electricity_consumption_without_storage` time series, in [kWh].""" return pd.DataFrame([ 
b.net_electricity_consumption_without_storage for b in self.buildings ]).sum(axis = 0, min_count = 1).to_numpy() @property def net_electricity_consumption_emission(self) -> List[float]: """Summed `Building.net_electricity_consumption_emission` time series, in [kg_co2].""" return self.__net_electricity_consumption_emission @property def net_electricity_consumption_cost(self) -> List[float]: """Summed `Building.net_electricity_consumption_cost` time series, in [$].""" return self.__net_electricity_consumption_cost @property def net_electricity_consumption(self) -> List[float]: """Summed `Building.net_electricity_consumption` time series, in [kWh].""" return self.__net_electricity_consumption @property def cooling_electricity_consumption(self) -> np.ndarray: """Summed `Building.cooling_electricity_consumption` time series, in [kWh].""" return pd.DataFrame([b.cooling_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy() @property def heating_electricity_consumption(self) -> np.ndarray: """Summed `Building.heating_electricity_consumption` time series, in [kWh].""" return pd.DataFrame([b.heating_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy() @property def dhw_electricity_consumption(self) -> np.ndarray: """Summed `Building.dhw_electricity_consumption` time series, in [kWh].""" return pd.DataFrame([b.dhw_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy() @property def cooling_storage_electricity_consumption(self) -> np.ndarray: """Summed `Building.cooling_storage_electricity_consumption` time series, in [kWh].""" return pd.DataFrame([b.cooling_storage_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy() @property def heating_storage_electricity_consumption(self) -> np.ndarray: """Summed `Building.heating_storage_electricity_consumption` time series, in [kWh].""" return 
pd.DataFrame([b.heating_storage_electricity_consumption for b in self.buildings]).sum(axis = 0, min_count = 1).to_numpy()

@property
def dhw_storage_electricity_consumption(self) -> np.ndarray:
    """Summed `Building.dhw_storage_electricity_consumption` time series, in [kWh]."""

    return pd.DataFrame([b.dhw_storage_electricity_consumption for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def electrical_storage_electricity_consumption(self) -> np.ndarray:
    """Summed `Building.electrical_storage_electricity_consumption` time series, in [kWh]."""

    return pd.DataFrame([b.electrical_storage_electricity_consumption for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def energy_from_cooling_device_to_cooling_storage(self) -> np.ndarray:
    """Summed `Building.energy_from_cooling_device_to_cooling_storage` time series, in [kWh]."""

    return pd.DataFrame([b.energy_from_cooling_device_to_cooling_storage for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def energy_from_heating_device_to_heating_storage(self) -> np.ndarray:
    """Summed `Building.energy_from_heating_device_to_heating_storage` time series, in [kWh]."""

    return pd.DataFrame([b.energy_from_heating_device_to_heating_storage for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def energy_from_dhw_device_to_dhw_storage(self) -> np.ndarray:
    """Summed `Building.energy_from_dhw_device_to_dhw_storage` time series, in [kWh]."""

    return pd.DataFrame([b.energy_from_dhw_device_to_dhw_storage for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def energy_to_electrical_storage(self) -> np.ndarray:
    """Summed `Building.energy_to_electrical_storage` time series, in [kWh]."""

    return pd.DataFrame([b.energy_to_electrical_storage for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def energy_from_cooling_device(self) -> np.ndarray:
    """Summed `Building.energy_from_cooling_device` time series, in [kWh]."""

    return pd.DataFrame([b.energy_from_cooling_device for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def energy_from_heating_device(self) -> np.ndarray:
    """Summed `Building.energy_from_heating_device` time series, in [kWh]."""

    return pd.DataFrame([b.energy_from_heating_device for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def energy_from_dhw_device(self) -> np.ndarray:
    """Summed `Building.energy_from_dhw_device` time series, in [kWh]."""

    return pd.DataFrame([b.energy_from_dhw_device for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def energy_to_non_shiftable_load(self) -> np.ndarray:
    """Summed `Building.energy_to_non_shiftable_load` time series, in [kWh]."""

    return pd.DataFrame([b.energy_to_non_shiftable_load for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def energy_from_cooling_storage(self) -> np.ndarray:
    """Summed `Building.energy_from_cooling_storage` time series, in [kWh]."""

    return pd.DataFrame([b.energy_from_cooling_storage for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def total_self_consumption(self) -> np.ndarray:
    """Total self-consumption from electrical and thermal storage, in [kWh]."""

    return (
        self.energy_from_electrical_storage
        + self.energy_from_cooling_storage
        + self.energy_from_heating_storage
        + self.energy_from_dhw_storage
    )

@property
def energy_from_heating_storage(self) -> np.ndarray:
    """Summed `Building.energy_from_heating_storage` time series, in [kWh]."""

    return pd.DataFrame([b.energy_from_heating_storage for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def energy_from_dhw_storage(self) -> np.ndarray:
    """Summed `Building.energy_from_dhw_storage` time series, in [kWh]."""

    return pd.DataFrame([b.energy_from_dhw_storage for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def energy_from_electrical_storage(self) -> np.ndarray:
    """Summed `Building.energy_from_electrical_storage` time series, in [kWh]."""

    return pd.DataFrame([b.energy_from_electrical_storage for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def cooling_demand(self) -> np.ndarray:
    """Summed `Building.cooling_demand`, in [kWh]."""

    return pd.DataFrame([b.cooling_demand for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def heating_demand(self) -> np.ndarray:
    """Summed `Building.heating_demand`, in [kWh]."""

    return pd.DataFrame([b.heating_demand for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def dhw_demand(self) -> np.ndarray:
    """Summed `Building.dhw_demand`, in [kWh]."""

    return pd.DataFrame([b.dhw_demand for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def non_shiftable_load(self) -> np.ndarray:
    """Summed `Building.non_shiftable_load`, in [kWh]."""

    return pd.DataFrame([b.non_shiftable_load for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def solar_generation(self) -> np.ndarray:
    """Summed `Building.solar_generation`, in [kWh]."""

    return pd.DataFrame([b.solar_generation for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()

@property
def power_outage(self) -> np.ndarray:
    """Time series of number of buildings experiencing power outage."""

    # Only the steps up to and including the current `time_step` are returned.
    return pd.DataFrame([b.power_outage_signal for b in self.buildings]).sum(axis=0, min_count=1).to_numpy()[:self.time_step + 1]

@schema.setter
def schema(self, schema: Union[str, Path, Mapping[str, Any]]):
    """Resolve `schema` to a dictionary and store it.

    Accepts a path to an existing JSON file, the name of a data set known to
    `DataSet`, or a `dict` (which is deep-copied). `root_directory` is filled in
    when it is missing or `None`: with the schema file's parent directory for
    file paths, and with an empty string otherwise.

    Raises
    ------
    UnknownSchemaError
        If `schema` matches none of the supported forms.
    """

    dataset = DataSet()

    if isinstance(schema, (str, Path)) and os.path.isfile(schema):
        schema_filepath = Path(schema) if isinstance(schema, str) else schema
        schema = FileHandler.read_json(schema)
        # `.get` so that a schema without a `root_directory` key is treated like one set to None.
        root_directory = schema.get('root_directory')
        schema['root_directory'] = os.path.split(schema_filepath.absolute())[0] if root_directory is None else root_directory

    elif isinstance(schema, str) and schema in dataset.get_dataset_names():
        schema = dataset.get_schema(schema)
        root_directory = schema.get('root_directory')
        schema['root_directory'] = '' if root_directory is None else root_directory

    elif isinstance(schema, dict):
        # Copy so that the caller's dictionary is not modified.
        schema = deepcopy(schema)
        root_directory = schema.get('root_directory')
        schema['root_directory'] = '' if root_directory is None else root_directory

    else:
        raise UnknownSchemaError()

    self.__schema = schema

@render_enabled.setter
def render_enabled(self, enabled: bool):
    """Store `enabled` coerced to `bool`."""

    self.__render_enabled = bool(enabled)

@root_directory.setter
def root_directory(self, root_directory: Union[str, Path]):
    self.__root_directory = root_directory

@buildings.setter
def buildings(self, buildings: List[Building]):
    self.__buildings = buildings

@electric_vehicles.setter
def electric_vehicles(self, electric_vehicles: List[ElectricVehicle]):
    self.__electric_vehicles = electric_vehicles

@Environment.episode_tracker.setter
def episode_tracker(self, episode_tracker: EpisodeTracker):
    """Set the tracker on the environment and hand the same tracker to every building."""

    Environment.episode_tracker.fset(self, episode_tracker)

    for b in self.buildings:
        b.episode_tracker = self.episode_tracker

@episode_time_steps.setter
def episode_time_steps(self, episode_time_steps: Union[int, List[Tuple[int, int]]]):
    """Store `episode_time_steps`, defaulting to `episode_tracker.simulation_time_steps` when None."""

    self.__episode_time_steps = self.episode_tracker.simulation_time_steps if episode_time_steps is None else episode_time_steps

@rolling_episode_split.setter
def rolling_episode_split(self, rolling_episode_split: bool):
    """Store `rolling_episode_split`, defaulting to False when None."""

    self.__rolling_episode_split = False if rolling_episode_split is None else rolling_episode_split

@random_episode_split.setter
def random_episode_split(self, random_episode_split: bool):
    """Store `random_episode_split`, defaulting to False when None."""

    self.__random_episode_split = False if random_episode_split is None else random_episode_split

@reward_function.setter
def reward_function(self, reward_function: RewardFunction):
    self.__reward_function = reward_function

@central_agent.setter
def central_agent(self, central_agent: bool):
    self.__central_agent = central_agent

@shared_observations.setter
def shared_observations(self, shared_observations: List[str]):
    """Store `shared_observations`, defaulting to :meth:`get_default_shared_observations` when None."""

    self.__shared_observations = self.get_default_shared_observations() if shared_observations is None else shared_observations

@Environment.random_seed.setter
def random_seed(self, seed: int):
    """Set the seed on the environment and propagate the resulting value to every building."""

    Environment.random_seed.fset(self, seed)

    for b in self.buildings:
        b.random_seed = self.random_seed

@Environment.time_step_ratio.setter
def time_step_ratio(self, time_step_ratio: int):
    """Set the ratio on the environment and propagate the resulting value to every building."""

    Environment.time_step_ratio.fset(self, time_step_ratio)

    for b in self.buildings:
        b.time_step_ratio = self.time_step_ratio
def get_metadata(self) -> Mapping[str, Any]:
    """Return the parent class metadata extended with environment-level entries.

    Adds the reward function class name, the `central_agent` flag, the shared
    observation names and the metadata of every building.
    """

    metadata = {**super().get_metadata()}
    metadata['reward_function'] = self.reward_function.__class__.__name__
    metadata['central_agent'] = self.central_agent
    metadata['shared_observations'] = self.shared_observations
    metadata['buildings'] = [building.get_metadata() for building in self.buildings]

    return metadata
[docs] @staticmethod def get_default_shared_observations() -> List[str]: """Names of default common observations across all buildings i.e. observations that have the same value irrespective of the building. Notes ----- May be used to assigned :attr:`shared_observations` value during `CityLearnEnv` object initialization. """ return [ 'month', 'day_type', 'hour', 'minutes', 'daylight_savings_status', 'outdoor_dry_bulb_temperature', 'outdoor_dry_bulb_temperature_predicted_1', 'outdoor_dry_bulb_temperature_predicted_2', 'outdoor_dry_bulb_temperature_predicted_3', 'outdoor_relative_humidity', 'outdoor_relative_humidity_predicted_1', 'outdoor_relative_humidity_predicted_2', 'outdoor_relative_humidity_predicted_3', 'diffuse_solar_irradiance', 'diffuse_solar_irradiance_predicted_1', 'diffuse_solar_irradiance_predicted_2', 'diffuse_solar_irradiance_predicted_3', 'direct_solar_irradiance', 'direct_solar_irradiance_predicted_1', 'direct_solar_irradiance_predicted_2', 'direct_solar_irradiance_predicted_3', 'carbon_intensity', 'electricity_pricing', 'electricity_pricing_predicted_1', 'electricity_pricing_predicted_2', 'electricity_pricing_predicted_3', ]
def step(self, actions: List[List[float]]) -> Tuple[List[List[float]], List[float], bool, bool, dict]:
    """Apply actions at current timestep, update variables/reward, then advance time.

    Parameters
    ----------
    actions: List[List[float]]
        Fractions of `buildings` storage devices' capacities to charge/discharge by.
        If `central_agent` is True, `actions` parameter should be a list of 1 list containing all buildings' actions and follows
        the ordering of buildings in `buildings`. If `central_agent` is False, `actions` parameter should be a list of sublists
        where each sublists contains the actions for each building in `buildings` and follows the ordering of buildings in `buildings`.

    Returns
    -------
    observations: List[List[float]]
        :attr:`observations` current value.
    reward: List[float]
        :meth:`get_reward` current value.
    terminated: bool
        A boolean value for if the episode has ended, in which case further :meth:`step` calls will return undefined results.
        A done signal may be emitted for different reasons: Maybe the task underlying the environment was solved successfully,
        a certain timelimit was exceeded, or the physics simulation has entered an invalid observation.
    truncated: bool
        A boolean value for if episode truncates due to a time limit or a reason that is not defined as part of the task MDP.
        Will always return False in this base class.
    info: dict
        A dictionary that may contain additional information regarding the reason for a `terminated` signal.
        `info` contains auxiliary diagnostic information (helpful for debugging, learning, and logging).
        Override :meth:`get_info` to get custom key-value pairs in `info`.
    """

    actions = self._parse_actions(actions)

    # Apply actions at current timestep t
    for building, building_actions in zip(self.buildings, actions):
        building.apply_actions(**building_actions)

    # Update environment/building variables for timestep t (reflect effects of actions)
    self.update_variables()

    # NOTE:
    # This call to retrieve each building's observation dictionary is an expensive call especially since the observations
    # are retrieved again to send to agent but the observations in dict form is needed for the reward function to easily
    # extract building-level values. Can't think of a better way to handle this without giving the reward direct access to
    # env, which is not the best design for competition integrity sake. Will revisit the building.observations() function
    # to see how it can be optimized.
    reward_observations = [b.observations(include_all=True, normalize=False, periodic_normalization=False) for b in self.buildings]
    reward = self.reward_function.calculate(observations=reward_observations)
    self.__rewards.append(reward)

    # Advance to next timestep t+1
    self.next_time_step()

    # store episode reward summary at the end of episode (upon reaching final timestep)
    if self.terminated:
        # Nothing extra is done here for `render_mode == 'during'`: the final step was
        # already streamed during the most recent `next_time_step` call.

        # The first stored reward entry is excluded from the summary.
        rewards = np.array(self.__rewards[1:], dtype='float32')
        self.__episode_rewards.append({
            'min': rewards.min(axis=0).tolist(),
            'max': rewards.max(axis=0).tolist(),
            'sum': rewards.sum(axis=0).tolist(),
            'mean': rewards.mean(axis=0).tolist()
        })

        if self.render_mode == 'end' and self.render_enabled:
            # Index of the last simulated step, clamped to the valid range.
            if self.time_step > 0:
                final_index = min(self.time_steps - 1, self.time_step - 1)
            else:
                final_index = 0

            has_buffered_rows = any(self._render_buffer.values())

            if not has_buffered_rows:
                # Nothing was buffered during the episode: render the final step once,
                # with the render time step temporarily overridden, so there is something to flush.
                state_snapshot = self._override_render_time_step(final_index)
                self._defer_render_flush = True

                try:
                    self.render()
                finally:
                    self._restore_render_time_step(state_snapshot)
                    self._defer_render_flush = False

            self._flush_render_buffer()

        if self.render_enabled and not self._final_kpis_exported:
            self.export_final_kpis()

    return self.observations, reward, self.terminated, self.truncated, self.get_info()
def get_info(self) -> Mapping[Any, Any]:
    """Other information to return from the `citylearn.CityLearnEnv.step` function.

    This implementation returns a new, empty dictionary on every call.
    """

    info = {}

    return info
def _parse_actions(self, actions: List[List[float]]) -> List[Mapping[str, float]]:
    """Return mapping of action name to action value for each building.

    Parameters
    ----------
    actions: List[List[float]]
        When `central_agent` is True, only the first sublist is used and it is split
        between buildings according to each building's action space size. Otherwise
        each sublist holds the actions of the building at the same position.

    Returns
    -------
    parsed_actions: List[Mapping[str, float]]
        One dictionary per building. Regular actions are stored under `<name>_action`,
        electric vehicle actions are grouped by charger under `electric_vehicle_storage_actions`
        and washing machine actions under `washing_machine_actions`. Regular actions listed in
        the building's `action_metadata` that received no value are set to NaN.

    Raises
    ------
    AssertionError
        If the number of provided actions does not match the action space size.
    """

    actions = list(actions)
    building_actions = []

    # Validation raises explicitly (rather than using `assert`) so that it is not
    # removed when Python runs with optimizations enabled.
    if self.central_agent:
        actions = actions[0]
        number_of_actions = len(actions)
        expected_number_of_actions = self.action_space[0].shape[0]

        if number_of_actions != expected_number_of_actions:
            raise AssertionError(
                f'Expected {expected_number_of_actions} actions but {number_of_actions} were parsed to env.step.'
            )

        # Consume the flat action list building by building.
        for building in self.buildings:
            size = building.action_space.shape[0]
            building_actions.append(actions[0:size])
            actions = actions[size:]

    else:
        building_actions = [list(a) for a in actions]

    # check that appropriate number of building actions have been provided
    for b, a in zip(self.buildings, building_actions):
        number_of_actions = len(a)
        expected_number_of_actions = b.action_space.shape[0]

        if number_of_actions != expected_number_of_actions:
            raise AssertionError(
                f'Expected {expected_number_of_actions} for {b.name} but {number_of_actions} actions were provided.'
            )

    # Create a list of dictionaries for actions including EV-specific actions
    parsed_actions = []

    for i, building in enumerate(self.buildings):
        # Only actions flagged as active in the metadata receive a value.
        active_actions = [k for k, v in building.action_metadata.items() if v]
        action_dict = {}
        electric_vehicle_actions = {}
        washing_machine_actions = {}

        # Populate the action_dict with regular actions
        for k, action in zip(active_actions, building_actions[i]):
            if 'electric_vehicle_storage' in k:
                # Collect EV actions separately, keyed by what remains of the name after the prefix
                charger_id = k.replace("electric_vehicle_storage_", "")
                electric_vehicle_actions[charger_id] = action

            elif 'washing_machine' in k:
                # Collect Washing Machine actions separately
                washing_machine_actions[k] = action

            else:
                action_dict[f'{k}_action'] = action

        # Add EV actions to the action_dict if they exist
        if electric_vehicle_actions:
            action_dict['electric_vehicle_storage_actions'] = electric_vehicle_actions

        # Add washing machine actions as their own dictionary if they exist
        if washing_machine_actions:
            action_dict['washing_machine_actions'] = washing_machine_actions

        # Fill missing actions with default NaN
        for k in building.action_metadata:
            if (
                f'{k}_action' not in action_dict
                and 'electric_vehicle_storage' not in k
                and 'washing_machine' not in k
            ):
                action_dict[f'{k}_action'] = np.nan

        parsed_actions.append(action_dict)

    return parsed_actions
def evaluate(self, control_condition: EvaluationCondition = None, baseline_condition: EvaluationCondition = None, comfort_band: float = None) -> pd.DataFrame:
    r"""Evaluate cost functions at current time step.

    Calculates and returns building-level and district-level cost functions normalized w.r.t. the no control scenario.

    Parameters
    ----------
    control_condition: EvaluationCondition, default: :code:`EvaluationCondition.WITH_STORAGE_AND_PARTIAL_LOAD_AND_PV`
        Condition for net electricity consumption, cost and emission to use in calculating cost functions for the control/flexible scenario.
    baseline_condition: EvaluationCondition, default: :code:`EvaluationCondition.WITHOUT_STORAGE_AND_PARTIAL_LOAD_BUT_WITH_PV`
        Condition for net electricity consumption, cost and emission to use in calculating cost functions for the baseline scenario
        that is used to normalize the control_condition scenario.
    comfort_band: float, optional
        Comfort band above dry_bulb_temperature_cooling_set_point and below dry_bulb_temperature_heating_set_point beyond
        which occupant is assumed to be uncomfortable. Defaults to :py:attr:`citylearn.data.EnergySimulation.DEFUALT_COMFORT_BAND`.

    Returns
    -------
    cost_functions: pd.DataFrame
        Cost function summary including the following: electricity consumption, zero net energy, carbon emissions, cost,
        discomfort (total, too cold, too hot, minimum delta, maximum delta, average delta), ramping, 1 - load factor,
        average daily peak and average annual peak.

    Notes
    -----
    The equation for the returned cost function values is :math:`\frac{C_{\textrm{control}}}{C_{\textrm{no control}}}`
    where :math:`C_{\textrm{control}}` is the value when the agent(s) control the environment and :math:`C_{\textrm{no control}}`
    is the value when none of the storages and partial load cooling and heating devices in the environment are actively controlled.

    When a condition is not provided, each building uses the default for its own type
    (`DynamicsBuilding` or plain `Building`). The district level uses the defaults resolved
    for the first building.
    """

    def _series(source, attribute: str, condition: EvaluationCondition):
        """Return `source.<attribute><condition suffix>`."""

        return getattr(source, f'{attribute}{condition.value}')

    def _coerce(x) -> float:
        """Treat None/NaN/inf and non-numeric values as 0.0 for robust normalization on short horizons."""

        try:
            v = float(x)
        except (TypeError, ValueError, OverflowError):
            return 0.0

        return v if np.isfinite(v) else 0.0

    def _safe_div(control_value, baseline_value):
        """Safe division helper for KPI ratios: 0/0 gives 1.0 and x/0 gives None."""

        c = _coerce(control_value)
        d = _coerce(baseline_value)

        if d == 0.0:
            return 1.0 if c == 0.0 else None

        return c / d

    comfort_band = EnergySimulation.DEFUALT_COMFORT_BAND if comfort_band is None else comfort_band
    building_level = []
    district_control_condition = None
    district_baseline_condition = None

    for b in self.buildings:
        # Resolve defaults per building without overwriting the function arguments, so that
        # the default chosen for one building type is not reused for a building of another type.
        if isinstance(b, DynamicsBuilding):
            default_control = EvaluationCondition.WITH_STORAGE_AND_PARTIAL_LOAD_AND_PV
            default_baseline = EvaluationCondition.WITHOUT_STORAGE_AND_PARTIAL_LOAD_BUT_WITH_PV
        else:
            default_control = EvaluationCondition.WITH_STORAGE_AND_PV
            default_baseline = EvaluationCondition.WITHOUT_STORAGE_BUT_WITH_PV

        b_control = default_control if control_condition is None else control_condition
        b_baseline = default_baseline if baseline_condition is None else baseline_condition

        # The district level keeps following the first building's resolved conditions.
        if district_control_condition is None:
            district_control_condition = b_control

        if district_baseline_condition is None:
            district_baseline_condition = b_baseline

        discomfort_kwargs = {
            'indoor_dry_bulb_temperature': b.indoor_dry_bulb_temperature,
            'dry_bulb_temperature_cooling_set_point': b.indoor_dry_bulb_temperature_cooling_set_point,
            'dry_bulb_temperature_heating_set_point': b.indoor_dry_bulb_temperature_heating_set_point,
            # `comfort_band` has already been defaulted above and is never None here.
            'band': comfort_band,
            'occupant_count': b.occupant_count,
        }
        unmet, cold, hot,\
            cold_minimum_delta, cold_maximum_delta, cold_average_delta,\
                hot_minimum_delta, hot_maximum_delta, hot_average_delta =\
                    CostFunction.discomfort(**discomfort_kwargs)
        expected_energy = b.cooling_demand + b.heating_demand + b.dhw_demand + b.non_shiftable_load
        served_energy = b.energy_from_cooling_device + b.energy_from_cooling_storage\
            + b.energy_from_heating_device + b.energy_from_heating_storage\
                + b.energy_from_dhw_device + b.energy_from_dhw_storage\
                    + b.energy_to_non_shiftable_load

        control_consumption = _series(b, 'net_electricity_consumption', b_control)
        baseline_consumption = _series(b, 'net_electricity_consumption', b_baseline)

        ec_c = CostFunction.electricity_consumption(control_consumption)[-1]
        ec_b = CostFunction.electricity_consumption(baseline_consumption)[-1]
        zne_c = CostFunction.zero_net_energy(control_consumption)[-1]
        zne_b = CostFunction.zero_net_energy(baseline_consumption)[-1]
        ce_c = CostFunction.carbon_emissions(_series(b, 'net_electricity_consumption_emission', b_control))[-1]
        # The baseline is only evaluated when the carbon intensity series does not sum to zero.
        ce_b = CostFunction.carbon_emissions(_series(b, 'net_electricity_consumption_emission', b_baseline))[-1]\
            if sum(b.carbon_intensity.carbon_intensity) != 0 else 0
        cost_c = CostFunction.cost(_series(b, 'net_electricity_consumption_cost', b_control))[-1]
        # The baseline is only evaluated when the pricing series does not sum to zero.
        cost_b = CostFunction.cost(_series(b, 'net_electricity_consumption_cost', b_baseline))[-1]\
            if sum(b.pricing.electricity_pricing) != 0 else 0

        building_level_ = pd.DataFrame([{
            'cost_function': 'electricity_consumption_total',
            'value': _safe_div(ec_c, ec_b),
        }, {
            'cost_function': 'zero_net_energy',
            'value': _safe_div(zne_c, zne_b),
        }, {
            'cost_function': 'carbon_emissions_total',
            'value': _safe_div(ce_c, ce_b),
        }, {
            'cost_function': 'cost_total',
            'value': _safe_div(cost_c, cost_b),
        }, {
            'cost_function': 'discomfort_proportion',
            'value': unmet[-1],
        }, {
            'cost_function': 'discomfort_cold_proportion',
            'value': cold[-1],
        }, {
            'cost_function': 'discomfort_hot_proportion',
            'value': hot[-1],
        }, {
            'cost_function': 'discomfort_cold_delta_minimum',
            'value': cold_minimum_delta[-1],
        }, {
            'cost_function': 'discomfort_cold_delta_maximum',
            'value': cold_maximum_delta[-1],
        }, {
            'cost_function': 'discomfort_cold_delta_average',
            'value': cold_average_delta[-1],
        }, {
            'cost_function': 'discomfort_hot_delta_minimum',
            'value': hot_minimum_delta[-1],
        }, {
            'cost_function': 'discomfort_hot_delta_maximum',
            'value': hot_maximum_delta[-1],
        }, {
            'cost_function': 'discomfort_hot_delta_average',
            'value': hot_average_delta[-1],
        }, {
            'cost_function': 'one_minus_thermal_resilience_proportion',
            'value': CostFunction.one_minus_thermal_resilience(power_outage=b.power_outage_signal, **discomfort_kwargs)[-1],
        }, {
            'cost_function': 'power_outage_normalized_unserved_energy_total',
            'value': CostFunction.normalized_unserved_energy(expected_energy, served_energy, power_outage=b.power_outage_signal)[-1]
        }, {
            'cost_function': 'annual_normalized_unserved_energy_total',
            'value': CostFunction.normalized_unserved_energy(expected_energy, served_energy)[-1]
        }])
        building_level_['name'] = b.name
        building_level.append(building_level_)

    building_level = pd.concat(building_level, ignore_index=True)
    building_level['level'] = 'building'

    ## district level
    # set default evaluation conditions
    if district_control_condition is None:
        district_control_condition = EvaluationCondition.WITH_STORAGE_AND_PARTIAL_LOAD_AND_PV\
            if control_condition is None else control_condition

    if district_baseline_condition is None:
        district_baseline_condition = EvaluationCondition.WITHOUT_STORAGE_AND_PARTIAL_LOAD_BUT_WITH_PV\
            if baseline_condition is None else baseline_condition

    # Look the district series up once; every KPI below reuses them.
    district_control = _series(self, 'net_electricity_consumption', district_control_condition)
    district_baseline = _series(self, 'net_electricity_consumption', district_baseline_condition)

    # District-level normalized KPIs with safe division to avoid 0/0 or div-by-zero
    ramp_c = CostFunction.ramping(district_control)[-1]
    ramp_b = CostFunction.ramping(district_baseline)[-1]
    dlf24_c = CostFunction.one_minus_load_factor(district_control, window=24)[-1]
    dlf24_b = CostFunction.one_minus_load_factor(district_baseline, window=24)[-1]
    dlf730_c = CostFunction.one_minus_load_factor(district_control, window=730)[-1]
    dlf730_b = CostFunction.one_minus_load_factor(district_baseline, window=730)[-1]
    peak24_c = CostFunction.peak(district_control, window=24)[-1]
    peak24_b = CostFunction.peak(district_baseline, window=24)[-1]
    peak_all_c = CostFunction.peak(district_control, window=self.time_steps)[-1]
    peak_all_b = CostFunction.peak(district_baseline, window=self.time_steps)[-1]

    district_level = pd.DataFrame([{
        'cost_function': 'ramping_average',
        'value': _safe_div(ramp_c, ramp_b),
    }, {
        'cost_function': 'daily_one_minus_load_factor_average',
        'value': _safe_div(dlf24_c, dlf24_b),
    }, {
        'cost_function': 'monthly_one_minus_load_factor_average',
        'value': _safe_div(dlf730_c, dlf730_b),
    }, {
        'cost_function': 'daily_peak_average',
        'value': _safe_div(peak24_c, peak24_b),
    }, {
        'cost_function': 'all_time_peak_average',
        'value': _safe_div(peak_all_c, peak_all_b),
    }])

    # District values of the building-level cost functions are the mean over buildings.
    district_level = pd.concat([district_level, building_level], ignore_index=True, sort=False)
    district_level = district_level.groupby(['cost_function'])[['value']].mean().reset_index()
    district_level['name'] = 'District'
    district_level['level'] = 'district'
    cost_functions = pd.concat([district_level, building_level], ignore_index=True, sort=False)

    return cost_functions
def next_time_step(self):
    r"""Advance all buildings to next `time_step`.

    Order of operations: render the current step (when rendering is enabled), advance
    buildings, advance electric vehicles, advance the environment itself, then update
    unconnected electric vehicles and charger associations for the new time step.
    """

    rendering = getattr(self, 'render_enabled', False)

    if rendering and self.render_mode == 'during':
        self.render()

    elif rendering and self.render_mode == 'end':
        # Rows are buffered while the flag is set and written out later.
        self._defer_render_flush = True

        try:
            self.render()
        finally:
            self._defer_render_flush = False

    for b in self.buildings:
        b.next_time_step()

    # Electric vehicles exist even when they are not connected to any building
    # (e.g. while commuting), so they are advanced independently of the buildings.
    for ev in self.electric_vehicles:
        ev.next_time_step()

    super().next_time_step()

    # Apply battery SOC simulation for EVs that are NOT connected
    self.simulate_unconnected_ev_soc()

    # First thing once the new time step is reached: plug EVs in/out according to their
    # individual datasets, i.e. associate each EV to a building charger.
    self.associate_chargers_to_electric_vehicles()
def associate_chargers_to_electric_vehicles(self):
    r"""Associate charger to its corresponding electric_vehicle based on charger simulation state.

    For every charger whose state at the current time step is 1, the matching electric
    vehicle is plugged in and, when this is a new connection, its battery SOC is forced
    to the resolved arrival SOC if one is available. For state 2 the matching electric
    vehicle is registered as incoming. Other states are skipped.
    """

    def _resolve_arrival_soc(simulation: ChargerSimulation, step: int, prev_state: float, prev_id: Union[str, None], ev_identifier: str) -> Union[float, None]:
        """Return expected SOC (as fraction) for an EV connecting at `step`, or ``None`` when unavailable."""

        def _usable(value) -> bool:
            # Only non-negative, non-NaN floating point values are accepted.
            return isinstance(value, (float, np.floating)) and not np.isnan(value) and value >= 0

        arrival_soc = simulation.electric_vehicle_estimated_soc_arrival
        lookup_index = None

        if prev_state in (2, 3) and step > 0:
            previous_id_is_set = isinstance(prev_id, str) and prev_id.strip() not in {"", "nan"}

            if previous_id_is_set and prev_id != ev_identifier:
                raise ValueError(
                    f"Charger dataset EV mismatch: expected '{ev_identifier}' but found '{prev_id}' at time step {step - 1}."
                )

            lookup_index = step - 1

        elif 0 <= step < len(arrival_soc):
            lookup_index = step

        if lookup_index is not None and 0 <= lookup_index < len(arrival_soc):
            estimate = arrival_soc[lookup_index]

            if _usable(estimate):
                return float(estimate)

        # Fall back to the required departure SOC at `step`.
        departure_soc = simulation.electric_vehicle_required_soc_departure

        if 0 <= step < len(departure_soc):
            required = departure_soc[step]

            if _usable(required):
                return float(required)

        return None

    for building in self.buildings:
        if building.electric_vehicle_chargers is None:
            continue

        for charger in building.electric_vehicle_chargers:
            sim = charger.charger_simulation
            state = sim.electric_vehicle_charger_state[self.time_step]

            # Skip if no EV is connected or incoming
            if np.isnan(state) or state not in [1, 2]:
                continue

            ev_id = sim.electric_vehicle_id[self.time_step]
            prev_state = np.nan
            prev_ev_id = None

            if self.time_step > 0:
                previous_step = self.time_step - 1

                if previous_step < len(sim.electric_vehicle_charger_state):
                    prev_state = sim.electric_vehicle_charger_state[previous_step]

                if previous_step < len(sim.electric_vehicle_id):
                    prev_ev_id = sim.electric_vehicle_id[previous_step]

            if not isinstance(ev_id, str) or ev_id.strip() in ["", "nan"]:
                continue

            for ev in self.electric_vehicles:
                if ev.name != ev_id:
                    continue

                if state == 1:
                    charger.plug_car(ev)

                    # Same EV already connected at the previous step means nothing new to initialize.
                    already_connected = (
                        prev_state == 1
                        and isinstance(prev_ev_id, str)
                        and prev_ev_id == ev_id
                    )

                    if not already_connected:
                        soc_value = _resolve_arrival_soc(sim, self.time_step, prev_state, prev_ev_id, ev_id)

                        if soc_value is not None:
                            ev.battery.force_set_soc(soc_value)

                elif state == 2:
                    charger.associate_incoming_car(ev)
def simulate_unconnected_ev_soc(self):
    """Simulate SOC changes for EVs that are not under charger control at t+1.

    Nothing is done when `t + 1` is not below `episode_tracker.episode_time_steps`.
    Otherwise, for each electric vehicle the chargers are scanned in building order:

    - an EV connected at `t` (state 1) is left untouched;
    - an EV that connects at `t + 1` gets its SOC forced to the estimated arrival SOC
      (taken at `t` when the EV is incoming at `t`, otherwise at `t + 1`) when that
      value lies in [0, 1];
    - an EV matching neither case on any charger has its SOC at `t - 1` scaled by a
      random factor drawn with `np.random.normal` and clipped to [0.6, 1.4]; the result
      is clipped to [0, 1]. This only happens for `t > 0`.
    """

    def _at(series, index, default):
        # Bounds-checked lookup.
        return series[index] if index < len(series) else default

    t = self.time_step

    if t + 1 >= self.episode_tracker.episode_time_steps:
        return

    for ev in self.electric_vehicles:
        name = ev.name
        handled = False

        for building in self.buildings:
            for charger in building.electric_vehicle_chargers or []:
                sim = charger.charger_simulation
                ids = sim.electric_vehicle_id
                states = sim.electric_vehicle_charger_state

                curr_id = _at(ids, t, "")
                next_id = _at(ids, t + 1, "")
                curr_state = _at(states, t, np.nan)
                next_state = _at(states, t + 1, np.nan)

                matches_now = isinstance(curr_id, str) and curr_id == name

                if matches_now and curr_state == 1:
                    # Currently connected: the charger is in control of the SOC.
                    handled = True
                    break

                connects_next = (
                    isinstance(next_id, str)
                    and next_id == name
                    and next_state == 1
                    and curr_state != 1
                )

                if not connects_next:
                    continue

                handled = True
                incoming_now = matches_now and curr_state == 2

                # Priority 1: current soc_arrival if incoming at t
                soc_index = t if incoming_now else t + 1
                soc = _at(sim.electric_vehicle_estimated_soc_arrival, soc_index, np.nan)

                if 0 <= soc <= 1:
                    ev.battery.force_set_soc(soc)

                break

            if handled:
                break

        # Not being connected or incoming in a valid charger — apply SOC drift
        if not handled and t > 0:
            last_soc = ev.battery.soc[t - 1]
            variability = np.clip(np.random.normal(1.0, 0.2), 0.6, 1.4)
            new_soc = np.clip(last_soc * variability, 0.0, 1.0)
            ev.battery.force_set_soc(new_soc)
def export_final_kpis(self, model: 'citylearn.agents.base.Agent' = None, filepath: str = "exported_kpis.csv"):
    """Export episode KPIs to csv.

    The KPIs returned by `evaluate` are pivoted to one row per cost function and one
    column per name, rounded to 3 decimals, stripped of all-empty rows and written
    without the index. `_final_kpis_exported` is set to True afterwards.

    Parameters
    ----------
    model: citylearn.agents.base.Agent, optional
        Agent whose environment should be evaluated. Defaults to the current environment.
    filepath: str, default: ``"exported_kpis.csv"``
        Output filename placed inside :pyattr:`new_folder_path`.
    """

    # Ensure output directory exists even if rendering was disabled
    self._ensure_render_output_dir()
    destination = os.path.join(self.new_folder_path, filepath)

    # Evaluate the agent's environment when one is available, this environment otherwise.
    model_env = None if model is None else getattr(model, 'env', None)
    source = self if model_env is None else model_env
    kpis = source.evaluate()

    table = kpis.pivot(index='cost_function', columns='name', values='value')
    table = (
        table.round(3)
        .dropna(how='all')
        .fillna('')
        .reset_index()
        .rename(columns={'cost_function': 'KPI'})
    )
    table.to_csv(destination, index=False, encoding='utf-8')
    self._final_kpis_exported = True
def render(self):
    """
    Renders the current state of the CityLearn environment, logging data into separate CSV files.
    Organizes files by episode number when simulation spans multiple episodes.

    Does nothing when rendering is disabled. When `render_mode` is ``'end'`` and no deferred
    flush is in progress, the call only flushes the render buffer. Otherwise one row is saved
    for the community, each building, each building's battery and chargers, the pricing of
    the first building and each electric vehicle.
    """

    if not getattr(self, 'render_enabled', False):
        return

    if self.render_mode == 'end' and not getattr(self, '_defer_render_flush', False):
        self._flush_render_buffer()
        return

    # Ensure the output directory is prepared
    self._ensure_render_output_dir()

    iso_timestamp = self._get_iso_timestamp()
    os.makedirs(self.new_folder_path, exist_ok=True)
    episode_num = self.episode_tracker.episode

    # Save community data - add episode number to filename
    self._save_to_csv(f"exported_data_community_ep{episode_num}.csv",
                      {"timestamp": iso_timestamp, **self.as_dict()})

    # Save building data
    for building in self.buildings:
        building_filename = f"exported_data_{building.name.lower()}_ep{episode_num}.csv"
        self._save_to_csv(building_filename, {"timestamp": iso_timestamp, **building.as_dict()})

        # Battery data
        battery = building.electrical_storage
        battery_filename = f"exported_data_{building.name.lower()}_battery_ep{episode_num}.csv"
        self._save_to_csv(battery_filename, {"timestamp": iso_timestamp, **battery.as_dict()})

        # Chargers; a building without chargers may hold None here, which other
        # methods of this class guard against as well.
        for charger in building.electric_vehicle_chargers or []:
            charger_filename = f"exported_data_{building.name.lower()}_{charger.charger_id}_ep{episode_num}.csv"
            self._save_to_csv(charger_filename, {"timestamp": iso_timestamp, **charger.as_dict()})

    # Pricing data, taken from the first building
    pricing_filename = f"exported_data_pricing_ep{episode_num}.csv"
    self._save_to_csv(pricing_filename,
                      {"timestamp": iso_timestamp, **self.buildings[0].pricing.as_dict(self.time_step)})

    # EV data
    for ev in self.__electric_vehicles:
        ev_filename = f"exported_data_{ev.name.lower()}_ep{episode_num}.csv"
        self._save_to_csv(ev_filename, {"timestamp": iso_timestamp, **ev.as_dict()})
def _save_to_csv(self, filename, data):
    """Queue or write one render row for ``filename``.

    Parameters
    ----------
    filename: str
        CSV file name relative to :py:attr:`new_folder_path`.
    data: Mapping[str, Any]
        Column name to value mapping of the row; it is copied before use.

    Notes
    -----
    The row is appended to the in-memory buffer only when both
    ``_buffer_render`` and ``_defer_render_flush`` are truthy; otherwise it
    is written to disk immediately.
    """
    if self._buffer_render and getattr(self, '_defer_render_flush', False):
        # presumably `_render_buffer` is a defaultdict(list) keyed by file name -- TODO confirm
        self._render_buffer[filename].append(dict(data))
        return

    self._write_render_rows(filename, [dict(data)])

def _flush_render_buffer(self):
    """Write any buffered render rows to disk and empty the buffer.

    Buffering flags are switched off while writing and restored afterwards,
    and the buffer is cleared even if a write raises.
    """
    if not getattr(self, '_render_buffer', None):
        return

    has_pending_rows = any(self._render_buffer.values())
    if not has_pending_rows:
        # Only empty row lists are left: drop the keys and stop.
        self._render_buffer.clear()
        return

    try:
        target_dir = Path(self.new_folder_path)
    except Exception:
        # Best effort: the path is only needed for the progress message below.
        target_dir = None

    if target_dir is not None:
        print(f"Writing buffered render exports to {target_dir} ...")

    original_defer = self._defer_render_flush
    original_buffer_state = self._buffer_render
    self._defer_render_flush = False
    self._buffer_render = False

    try:
        # Iterate over a copy of the items so the buffer itself is not the iteration target.
        for filename, rows in list(self._render_buffer.items()):
            if rows:
                self._write_render_rows(filename, rows)
    finally:
        self._render_buffer.clear()
        self._buffer_render = original_buffer_state
        self._defer_render_flush = original_defer

def _write_render_rows(self, filename: str, rows: List[Mapping[str, Any]]):
    """Write one or more render rows to disk with minimal rewrites.

    Parameters
    ----------
    filename: str
        CSV file name relative to :py:attr:`new_folder_path`.
    rows: List[Mapping[str, Any]]
        Rows to write; nothing is written when the list is empty.

    Notes
    -----
    Three cases are handled: a new file is created with a header; an
    existing file whose header already contains every incoming field is
    appended to; otherwise the whole file is rewritten with the header
    extended by the new fields. Missing values are written as ``''``.
    """
    file_path = Path(self.new_folder_path) / filename
    file_path.parent.mkdir(parents=True, exist_ok=True)

    if not rows:
        return

    # Ordered union of the keys of all incoming rows (first occurrence wins).
    buffered_fieldnames = list(
        dict.fromkeys(field for row in rows for field in row.keys())
    )

    if not file_path.exists():
        fieldnames = buffered_fieldnames
        with file_path.open('w', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for row in rows:
                writer.writerow({field: row.get(field, '') for field in fieldnames})
        return

    # File exists – inspect current header.
    needs_header_extension = False
    with file_path.open('r', newline='') as existing:
        reader = csv.DictReader(existing)
        existing_fieldnames = reader.fieldnames or []
        for field in buffered_fieldnames:
            if field not in existing_fieldnames:
                needs_header_extension = True
                break

        # Existing rows are only loaded when the file has to be rewritten.
        if needs_header_extension:
            existing_rows = list(reader)
        else:
            existing_rows = None

    if not needs_header_extension:
        with file_path.open('a', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=existing_fieldnames)
            for row in rows:
                # Fields not present in the existing header are not written.
                writer.writerow({field: row.get(field, '') for field in existing_fieldnames})
        return

    # Need to rewrite with the expanded header.
    extended_fieldnames = list(
        dict.fromkeys(existing_fieldnames + [f for f in buffered_fieldnames if f not in existing_fieldnames])
    )

    existing_rows = existing_rows or []
    for row in existing_rows:
        for field in extended_fieldnames:
            row.setdefault(field, '')

    with file_path.open('w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=extended_fieldnames)
        writer.writeheader()
        writer.writerows(existing_rows)
        for row in rows:
            writer.writerow({field: row.get(field, '') for field in extended_fieldnames})

def _parse_render_start_date(self, start_date: Union[str, datetime.date]) -> datetime.date:
    """Return a valid start date for rendering timestamps.

    Parameters
    ----------
    start_date: Union[str, datetime.date]
        ``None``, a ``date``, a ``datetime`` or an ISO ``'YYYY-MM-DD'`` string.

    Returns
    -------
    start_date: datetime.date
        ``DEFAULT_RENDER_START_DATE`` for ``None``, otherwise the parsed date.

    Raises
    ------
    ValueError
        If a string is not in ISO date format.
    TypeError
        If the value is of any other type.
    """
    if start_date is None:
        return self.DEFAULT_RENDER_START_DATE

    # `datetime` is a subclass of `date`, so it must be tested first.
    if isinstance(start_date, datetime.datetime):
        return start_date.date()

    if isinstance(start_date, datetime.date):
        return start_date

    if isinstance(start_date, str):
        try:
            return datetime.date.fromisoformat(start_date)
        except ValueError as exc:
            raise ValueError(
                "CityLearnEnv start_date must be in ISO format 'YYYY-MM-DD'."
            ) from exc

    raise TypeError(
        "CityLearnEnv start_date must be a date, datetime, or ISO format string."
    )

def _ensure_render_output_dir(self, *, ensure_exists: bool = True):
    """Prepare the render output directory and optionally create it on disk.

    Parameters
    ----------
    ensure_exists: bool, default: True
        When ``True`` the directory tree is created (and legacy exports removed
        when reusing :py:attr:`render_session_name`). When ``False`` only internal
        state is updated so that paths can be materialized later on demand.

    Notes
    -----
    Sets :py:attr:`new_folder_path` to the resolved render directory. If the
    base directory cannot be created because of a ``PermissionError``, a
    ``render_logs`` directory under the current working directory is used
    and stored in ``render_output_root``.
    """
    base_render_path = Path(getattr(self, 'render_output_root', Path(__file__).resolve().parents[1] / 'render_logs')).expanduser()

    if ensure_exists:
        try:
            base_render_path.mkdir(parents=True, exist_ok=True)
        except PermissionError:
            fallback = (Path.cwd() / 'render_logs').resolve()
            fallback.mkdir(parents=True, exist_ok=True)
            self.render_output_root = fallback
            base_render_path = fallback

    render_dir = getattr(self, '_render_directory_path', None)
    needs_new_dir = render_dir is None

    # A previously chosen directory is only reused while it lies under the
    # (possibly changed) base path.
    if not needs_new_dir and ensure_exists:
        render_dir = Path(render_dir)
        try:
            needs_new_dir = not render_dir.is_relative_to(base_render_path)
        except AttributeError:
            # `Path.is_relative_to` is unavailable (added in Python 3.9).
            needs_new_dir = base_render_path not in render_dir.parents and render_dir != base_render_path

    if needs_new_dir:
        if self.render_session_name:
            render_dir = (base_render_path / Path(self.render_session_name)).expanduser().resolve()
        else:
            # The timestamp is generated once and reused for later directories.
            if getattr(self, '_render_timestamp', None) is None:
                self._render_timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            render_dir = (base_render_path / self._render_timestamp).resolve()
        self._render_directory_path = render_dir
    else:
        render_dir = Path(self._render_directory_path)

    if ensure_exists:
        render_dir.mkdir(parents=True, exist_ok=True)
        if not self._render_dir_initialized:
            # Named sessions reuse a directory: remove exports of earlier runs once.
            if self.render_session_name:
                for csv_file in render_dir.glob('exported_*.csv'):
                    try:
                        csv_file.unlink()
                    except OSError:
                        # Best effort: a file that cannot be removed is left in place.
                        pass
            self._render_dir_initialized = True

    self.new_folder_path = str(render_dir)

def _get_iso_timestamp(self):
    """Return the ``YYYY-MM-DDTHH:MM:00`` timestamp of the current time step.

    Month, hour and minutes are read from the energy simulation of the first
    building; year and day are tracked in ``self.year`` and
    ``self.current_day``, which this method also advances for the next call.

    Returns
    -------
    timestamp: str
        Timestamp string for the current ``time_step``.
    """
    # Reset time tracking if this is the first step of a new episode
    if self.time_step == 0:
        self._reset_time_tracking()

    energy_sim = self.buildings[0].energy_simulation
    month_series = getattr(energy_sim, 'month', None)
    hour_series = getattr(energy_sim, 'hour', None)
    minutes_series = getattr(energy_sim, 'minutes', None)

    def _get_series_value(series, index, default):
        # Return `series[index]` as int, or `default` when the series is
        # missing, too short, or the value cannot be converted.
        if series is None:
            return default
        if index >= len(series):
            return default
        try:
            return int(series[index])
        except (TypeError, ValueError):
            return default

    month = _get_series_value(month_series, self.time_step, self.render_start_date.month)
    hour = _get_series_value(hour_series, self.time_step, 1)
    minutes = _get_series_value(minutes_series, self.time_step, 0)

    # Values of the following time step fall back to the current ones.
    next_index = self.time_step + 1
    next_month = _get_series_value(month_series, next_index, month)
    next_hour = _get_series_value(hour_series, next_index, hour)
    next_minutes = _get_series_value(minutes_series, next_index, minutes)

    raw_hour = hour
    timestamp_year = self.year
    timestamp_month = month
    timestamp_day = self.current_day

    # presumably the data uses hours 1..24, with 24 meaning midnight -- TODO confirm
    hour_for_timestamp = raw_hour % 24
    next_hour_mod = next_hour % 24
    next_minutes_clamped = max(0, min(59, next_minutes))
    minute_for_timestamp = max(0, min(59, minutes))

    if raw_hour >= 24:
        if next_month != month:
            # Hour 24 at a month boundary is stamped as day 1 of the next month.
            timestamp_month = next_month
            if next_month < month:
                timestamp_year = self.year + 1
            timestamp_day = 1
        else:
            # Keep the current day; the day roll-over is handled via next_day logic.
            timestamp_day = self.current_day

    timestamp = f"{timestamp_year:04d}-{int(timestamp_month):02d}-{timestamp_day:02d}T{hour_for_timestamp:02d}:{minute_for_timestamp:02d}:00"

    next_year = timestamp_year
    next_day = timestamp_day

    if next_month != month:
        # A smaller next month means the year wrapped.
        if next_month < month:
            next_year = timestamp_year + 1
        next_day = 1
    elif next_hour_mod <= hour_for_timestamp and next_minutes_clamped <= minute_for_timestamp:
        # The time of day did not advance, so the next step is on a new day.
        next_day = timestamp_day + 1

    self.year = next_year
    self.current_day = next_day

    return timestamp

def _override_render_time_step(self, index: int):
    """Temporarily set time_step to `index` for the environment and descendants.

    Parameters
    ----------
    index: int
        Time step assigned to every object that has a ``time_step`` attribute.

    Returns
    -------
    snapshot: list
        ``(object, previous_time_step)`` pairs to pass to
        :py:meth:`_restore_render_time_step`.
    """
    snapshot = []

    def _record(obj):
        # Remember the current value before overwriting it.
        if hasattr(obj, 'time_step'):
            snapshot.append((obj, obj.time_step))
            obj.time_step = index

    _record(self)

    for building in getattr(self, 'buildings', []):
        _record(building)
        electrical_storage = getattr(building, 'electrical_storage', None)
        if electrical_storage is not None:
            _record(electrical_storage)
        for charger in getattr(building, 'electric_vehicle_chargers', []) or []:
            _record(charger)
        for washing_machine in getattr(building, 'washing_machines', []) or []:
            _record(washing_machine)

    for ev in getattr(self, 'electric_vehicles', []):
        _record(ev)
        battery = getattr(ev, 'battery', None)
        if battery is not None:
            _record(battery)

    return snapshot

@staticmethod
def _restore_render_time_step(snapshot):
    """Restore the ``time_step`` values captured by :py:meth:`_override_render_time_step`.

    Objects whose ``time_step`` cannot be assigned are skipped.
    """
    for obj, value in snapshot:
        try:
            obj.time_step = value
        except AttributeError:
            pass

def _reset_time_tracking(self):
    """Reset all time tracking variables.

    The render clock is set to ``render_start_date`` shifted by the episode
    start time step times ``seconds_per_time_step``; ``year`` and
    ``current_day`` are taken from that datetime.
    """
    start_offset = getattr(self.episode_tracker, 'episode_start_time_step', 0)
    base_datetime = datetime.datetime.combine(self.render_start_date, datetime.time())
    base_datetime += datetime.timedelta(seconds=start_offset * self.seconds_per_time_step)
    self._render_start_datetime = base_datetime
    self.year = base_datetime.year
    self.current_day = base_datetime.day
    # Add any other time-related variables that need resetting
def reset(self, seed: int = None, options: Mapping[str, Any] = None) -> Tuple[List[List[float]], dict]:
    r"""Return the environment to the start of the next episode.

    Parameters
    ----------
    seed: int, optional
        New value for :code:`citylearn.CityLearnEnv.random_seed`; the current
        seed is kept when omitted.
    options: Mapping[str, Any], optional
        Unused here; accepted to conform to the gymnasium interface.

    Returns
    -------
    observations: List[List[float]]
        :attr:`observations`.
    info: dict
        Auxiliary diagnostic information returned by :meth:`get_info`.
        Override :meth:`get_info` to provide custom key-value pairs.
    """

    # base class state
    super().reset()
    self._final_kpis_exported = False

    if seed is not None:
        self.random_seed = seed

    # move the time series window to the next episode
    self.episode_tracker.next_episode(
        self.episode_time_steps,
        self.rolling_episode_split,
        self.random_episode_split,
        self.random_seed,
    )

    for building in self.buildings:
        building.reset()

    for vehicle in self.electric_vehicles:
        vehicle.reset()

    self.associate_chargers_to_electric_vehicles()

    # the reward function reset does nothing by default
    self.reward_function.reset()

    # district level series start empty and are filled by update_variables
    self.__rewards = [[]]
    (
        self.__net_electricity_consumption,
        self.__net_electricity_consumption_cost,
        self.__net_electricity_consumption_emission,
    ) = [], [], []
    self.update_variables()

    return self.observations, self.get_info()
def update_variables(self):
    """Update building variables and the district-level aggregates.

    Every building is updated first. The district net electricity
    consumption, cost and emission for the current ``time_step`` are then
    stored as the sum over all buildings, at index ``time_step`` of the
    corresponding series.
    """

    for b in self.buildings:
        b.update_variables()

    step = self.time_step

    def _set_or_append(series, value):
        # Make `series[step]` equal `value` and `len(series)` equal `step + 1`.
        if len(series) == step + 1:
            # An entry for the current time step exists: overwrite it.
            series[step] = value
            return

        if len(series) > step:
            # Out of sync (too long): drop the current and all later entries so
            # that the append below lands exactly at index `step`.
            del series[step:]
        elif len(series) < step:
            # Out of sync (too short): pad the missing earlier entries.
            series.extend([0.0] * (step - len(series)))

        series.append(value)

    # net electricity consumption
    total = sum(b.net_electricity_consumption[step] for b in self.buildings)
    _set_or_append(self.__net_electricity_consumption, total)

    # net electricity consumption cost
    total_cost = sum(b.net_electricity_consumption_cost[step] for b in self.buildings)
    _set_or_append(self.__net_electricity_consumption_cost, total_cost)

    # net electricity consumption emission
    total_emission = sum(b.net_electricity_consumption_emission[step] for b in self.buildings)
    _set_or_append(self.__net_electricity_consumption_emission, total_emission)
def load_agent(self, agent: Union[str, 'citylearn.agents.base.Agent'] = None, **kwargs) -> Union[Any, 'citylearn.agents.base.Agent']:
    """Return :class:`Agent` or sub class object as defined by the `schema`.

    Parameters
    ----------
    agent: Union[str, 'citylearn.agents.base.Agent'], optional
        Agent class or string describing path to agent class, e.g. 'citylearn.agents.base.BaselineAgent'.
        If a value is not provided, defaults to the agent defined in the schema:agent:type.
        A class is used as is, so classes that cannot be imported by name
        (dynamically created or nested classes) are supported.

    **kwargs : dict
        Agent initialization attributes. When provided they replace the
        schema:agent:attributes entirely. For most agents e.g. CityLearn and
        Stable-Baselines3 agents, an initialized :py:attr:`env` must be passed
        to the agent :py:meth:`init` function; this environment is used when
        :code:`env` is not among the attributes.

    Returns
    -------
    agent: Agent
        Initialized agent.
    """

    # set agent class
    if agent is None or isinstance(agent, str):
        agent_type = self.schema['agent']['type'] if agent is None else agent
        module_name, _, class_name = agent_type.rpartition('.')
        agent_constructor = getattr(importlib.import_module(module_name), class_name)
    else:
        # Already a class: no need to round-trip through its dotted path.
        agent_constructor = agent

    # set agent init attributes
    if kwargs:
        agent_attributes = dict(kwargs)
    elif agent is None:
        # Schema attributes only apply to the schema-defined agent; a null
        # `attributes` entry is treated as empty.
        agent_attributes = dict(self.schema['agent'].get('attributes') or {})
    else:
        agent_attributes = {}

    agent_attributes.setdefault('env', self)

    return agent_constructor(**agent_attributes)
def _load(self, schema: Mapping[str, Any], **kwargs) -> Tuple[Union[Path, str], List[Building], List[ElectricVehicle], Union[int, List[Tuple[int, int]]], bool, bool, float, RewardFunction, bool, List[str], EpisodeTracker]:
    """Return `CityLearnEnv` and `Controller` objects as defined by the `schema`.

    Values passed through :code:`kwargs` take precedence over the matching
    schema entries whenever they are not :code:`None`. The :code:`schema`
    mapping is modified in place.

    Parameters
    ----------
    schema: Mapping[str, Any]
        :code:`dict` object of a CityLearn schema.

    Returns
    -------
    root_directory: Union[Path, str]
        Absolute path to directory that contains the data files including the schema.
    buildings : List[Building]
        Buildings in CityLearn environment.
    electric_vehicles : List[ElectricVehicle]
        Electric Vehicles in CityLearn environment.
    episode_time_steps: Union[int, List[Tuple[int, int]]]
        Number of time steps in an episode. Defaults to (`simulation_end_time_step` - `simulation_start_time_step`) + 1.
    rolling_episode_split: bool
        True if episode sequences are split such that each time step is a candidate for `episode_start_time_step` otherwise,
        False to split episodes in steps of `episode_time_steps`.
    random_episode_split: bool
        True if episode splits are to be selected at random during training otherwise, False to select sequentially.
    seconds_per_time_step: float
        Number of seconds in 1 `time_step` and must be set to >= 1.
    reward_function : RewardFunction
        Reward function class instance.
    central_agent : bool
        Expect 1 central agent to control all building storage device.
    shared_observations : List[str]
        Names of common observations across all buildings i.e. observations that have the same value irrespective of the building.
    episode_tracker : EpisodeTracker
        Tracker built from the simulation start and end time steps and shared with the loaded objects.
    """

    def _from_kwargs_or_schema(key: str, *, required: bool) -> Any:
        # A non-None keyword argument overrides the schema entry. Required
        # entries raise KeyError when absent from the schema, optional ones
        # default to None.
        override = kwargs.get(key)

        if override is not None:
            return override

        return schema[key] if required else schema.get(key, None)

    schema['root_directory'] = _from_kwargs_or_schema('root_directory', required=True)
    # The keyword argument must win here as well; previously the schema value
    # was used in both cases and a caller-provided seed was ignored.
    schema['random_seed'] = _from_kwargs_or_schema('random_seed', required=False)
    schema['central_agent'] = _from_kwargs_or_schema('central_agent', required=True)

    # Separated chargers observations to create one for each charger at each building based on active ones at the schema
    schema['chargers_observations_helper'] = {key: value for key, value in schema["observations"].items() if "electric_vehicle_" in key}
    schema['chargers_actions_helper'] = {key: value for key, value in schema["actions"].items() if "electric_vehicle_" in key}
    schema['chargers_shared_observations_helper'] = {key: value for key, value in schema["observations"].items() if "electric_vehicle_" in key and value.get("shared_in_central_agent", True)}
    schema['washing_machine_observations_helper'] = {key: value for key, value in schema["observations"].items() if "washing_machine_" in key}
    schema['washing_machine_actions_helper'] = {key: value for key, value in schema["actions"].items() if "washing_machine" in key}

    # Building-level observations and actions exclude the device-specific helpers above.
    excluded_observations = set(schema['chargers_observations_helper']) | set(schema['washing_machine_observations_helper'])
    excluded_actions = set(schema['chargers_actions_helper']) | set(schema['washing_machine_actions_helper'])
    schema['observations'] = {key: value for key, value in schema["observations"].items() if key not in excluded_observations}
    schema['actions'] = {key: value for key, value in schema['actions'].items() if key not in excluded_actions}

    # Update shared observations, excluding any keys that start with 'electric_vehicle_'
    schema['shared_observations'] = (
        kwargs['shared_observations'] if kwargs.get('shared_observations') is not None
        else [
            k for k, v in schema['observations'].items()
            if not k.startswith("electric_vehicle_")
            and "washing_machine" not in k
            and v.get('shared_in_central_agent', False)
        ]
    )

    schema['episode_time_steps'] = _from_kwargs_or_schema('episode_time_steps', required=False)
    schema['rolling_episode_split'] = _from_kwargs_or_schema('rolling_episode_split', required=False)
    schema['random_episode_split'] = _from_kwargs_or_schema('random_episode_split', required=False)
    schema['seconds_per_time_step'] = _from_kwargs_or_schema('seconds_per_time_step', required=True)
    schema['simulation_start_time_step'] = _from_kwargs_or_schema('simulation_start_time_step', required=True)
    schema['simulation_end_time_step'] = _from_kwargs_or_schema('simulation_end_time_step', required=True)
    episode_tracker = EpisodeTracker(schema['simulation_start_time_step'], schema['simulation_end_time_step'])

    # get sizing data to reduce read time
    dataset = DataSet()
    pv_sizing_data = dataset.get_pv_sizing_data()
    battery_sizing_data = dataset.get_battery_sizing_data()

    # get buildings to include
    buildings_to_include = list(schema['buildings'].keys())
    buildings = []

    if kwargs.get('buildings') is not None and len(kwargs['buildings']) > 0:
        if isinstance(kwargs['buildings'][0], Building):
            # Ready-made buildings override the schema definition entirely.
            buildings = kwargs['buildings']

            for b in buildings:
                b.episode_tracker = episode_tracker

            buildings_to_include = []

        elif isinstance(kwargs['buildings'][0], str):
            buildings_to_include = [b for b in buildings_to_include if b in kwargs['buildings']]

        elif isinstance(kwargs['buildings'][0], int):
            buildings_to_include = [buildings_to_include[i] for i in kwargs['buildings']]

        else:
            raise Exception('Unknown buildings type. Allowed types are citylearn.building.Building, int and str.')

    else:
        buildings_to_include = [b for b in buildings_to_include if schema['buildings'][b]['include']]

    # load buildings
    for i, building_name in enumerate(buildings_to_include):
        buildings.append(self._load_building(i, building_name, schema, episode_tracker, pv_sizing_data, battery_sizing_data, **kwargs))

    # Load electric vehicles (if present in the schema)
    electric_vehicles = []

    if kwargs.get('electric_vehicles_def') is not None and len(kwargs['electric_vehicles_def']) > 0:
        electric_vehicle_schemas = kwargs['electric_vehicles_def']
    else:
        electric_vehicle_schemas = schema.get('electric_vehicles_def', {})

    for electric_vehicle_name, electric_vehicle_schema in electric_vehicle_schemas.items():
        if electric_vehicle_schema['include']:
            # Vehicles use the time step ratio of the first building when there is one.
            time_step_ratio = buildings[0].time_step_ratio if len(buildings) > 0 else 1.0
            electric_vehicles.append(self._load_electric_vehicle(electric_vehicle_name, schema, electric_vehicle_schema, episode_tracker, time_step_ratio))

    # set reward function
    reward_schema = schema['reward_function']
    reward_type = reward_schema['type']
    reward_attrs = reward_schema.get('attributes', {})

    # A mapping from building names to reward types denotes a multi-building configuration.
    is_multi = isinstance(reward_type, dict)

    if is_multi:
        # Fallback to 'default' reward type, or to the first available type if 'default' is not set
        default_type = reward_type.get('default')

        if default_type is None and reward_type:
            default_type = next(iter(reward_type.values()))

        # Same fallback logic for attributes
        default_attrs = reward_attrs.get('default')

        if default_attrs is None and reward_attrs:
            default_attrs = next(iter(reward_attrs.values()))

        reward_functions = {}

        for building in buildings:
            name = building.name
            r_type = reward_type.get(name, default_type)
            r_attr = reward_attrs.get(name, default_attrs) or {}

            if r_type is None:
                raise ValueError(f"No reward function defined for building '{name}' and no default provided")

            constructor = self._import_attribute(r_type)
            reward_functions[name] = constructor(None, **r_attr)

        # Combine individual building reward functions into a multi-building one
        reward_function = MultiBuildingRewardFunction(None, reward_functions)

    else:
        # Single reward function: a keyword argument overrides the schema type.
        reward_function_type = kwargs.get('reward_function')

        if reward_function_type is None:
            reward_function_type = reward_type

        if isinstance(reward_function_type, str):
            constructor = self._import_attribute(reward_function_type)
        else:
            # A class was passed: use it as is.
            constructor = reward_function_type

        # Get attributes from kwargs or schema, default to empty dict
        reward_function_attributes = kwargs.get('reward_function_kwargs') or reward_attrs or {}
        reward_function = constructor(None, **reward_function_attributes)

    return (
        schema['root_directory'], buildings, electric_vehicles, schema['episode_time_steps'],
        schema['rolling_episode_split'], schema['random_episode_split'],
        schema['seconds_per_time_step'], reward_function, schema['central_agent'],
        schema['shared_observations'], episode_tracker
    )

def _load_building(self, index: int, building_name: str, schema: dict, episode_tracker: EpisodeTracker, pv_sizing_data: pd.DataFrame, battery_sizing_data: pd.DataFrame, **kwargs) -> Building:
    """Initializes and returns a building model.

    Parameters
    ----------
    index: int
        Position of the building among the buildings being loaded; used to
        select per-building entries of list-valued keyword arguments.
    building_name: str
        Key of the building in :code:`schema['buildings']`.
    schema: dict
        CityLearn schema as prepared by :py:meth:`_load`.
    episode_tracker: EpisodeTracker
        Tracker shared by the building and its devices.
    pv_sizing_data: pd.DataFrame
        Sizing data passed to the PV autosizer.
    battery_sizing_data: pd.DataFrame
        Sizing data passed to the battery autosizer.
    **kwargs : dict
        Overrides, e.g. :code:`simulate_power_outage`, :code:`solar_generation`
        and :code:`washing_machines`; also forwarded to :py:meth:`process_metadata`.

    Returns
    -------
    building: Building
        Building with data, devices, observation and action spaces set.
    """

    building_schema = schema['buildings'][building_name]
    building_kwargs = {}

    if building_schema.get('charging_constraints') is not None:
        building_kwargs['charging_constraints'] = building_schema['charging_constraints']

    seconds_per_time_step = schema['seconds_per_time_step']
    noise_std = building_schema.get('noise_std', 0.0)

    # data
    energy_simulation = pd.read_csv(os.path.join(schema['root_directory'], building_schema['energy_simulation']))
    energy_simulation = EnergySimulation(**energy_simulation.to_dict('list'), seconds_per_time_step=seconds_per_time_step, noise_std=noise_std)
    building_kwargs['time_step_ratio'] = energy_simulation.time_step_ratios[index]
    weather = pd.read_csv(os.path.join(schema['root_directory'], building_schema['weather']))
    weather = Weather(**weather.to_dict('list'), noise_std=noise_std)

    # Missing carbon intensity and pricing files are replaced with zero series
    # as long as the energy simulation.
    if building_schema.get('carbon_intensity', None) is not None:
        carbon_intensity = pd.read_csv(os.path.join(schema['root_directory'], building_schema['carbon_intensity']))
        carbon_intensity = CarbonIntensity(**carbon_intensity.to_dict('list'), noise_std=noise_std)
    else:
        carbon_intensity = CarbonIntensity(np.zeros(energy_simulation.hour.shape[0], dtype='float32'), noise_std=noise_std)

    if building_schema.get('pricing', None) is not None:
        pricing = pd.read_csv(os.path.join(schema['root_directory'], building_schema['pricing']))
        pricing = Pricing(**pricing.to_dict('list'), noise_std=noise_std)
    else:
        pricing = Pricing(
            np.zeros(energy_simulation.hour.shape[0], dtype='float32'),
            np.zeros(energy_simulation.hour.shape[0], dtype='float32'),
            np.zeros(energy_simulation.hour.shape[0], dtype='float32'),
            np.zeros(energy_simulation.hour.shape[0], dtype='float32'),
            noise_std=noise_std
        )

    # construct building
    building_type = 'citylearn.citylearn.Building' if building_schema.get('type', None) is None else building_schema['type']
    building_constructor = self._import_attribute(building_type)

    # set dynamics
    if building_schema.get('dynamics', None) is not None:
        dynamics_constructor = self._import_attribute(building_schema['dynamics']['type'])
        attributes = building_schema['dynamics'].get('attributes', {})
        # The schema stores a file name relative to the root directory.
        attributes['filepath'] = os.path.join(schema['root_directory'], attributes['filename'])
        _ = attributes.pop('filename')
        building_kwargs['dynamics'] = dynamics_constructor(**attributes)
    else:
        building_kwargs['dynamics'] = None

    # set occupant
    if building_schema.get('occupant', None) is not None:
        building_occupant = building_schema['occupant']
        occupant_constructor = self._import_attribute(building_occupant['type'])
        attributes: dict = building_occupant.get('attributes', {})
        parameters_filepath = os.path.join(schema['root_directory'], building_occupant['parameters_filename'])
        parameters = pd.read_csv(parameters_filepath)
        attributes['parameters'] = LogisticRegressionOccupantParameters(**parameters.to_dict('list'))
        attributes['episode_tracker'] = episode_tracker
        attributes['random_seed'] = schema['random_seed']

        for k in ['increase', 'decrease']:
            attributes[f'setpoint_{k}_model_filepath'] = os.path.join(schema['root_directory'], attributes[f'setpoint_{k}_model_filename'])
            _ = attributes.pop(f'setpoint_{k}_model_filename')

        building_kwargs['occupant'] = occupant_constructor(**attributes)
    else:
        building_kwargs['occupant'] = None

    # set power outage model
    building_schema_power_outage = building_schema.get('power_outage', {})
    simulate_power_outage = kwargs.get('simulate_power_outage')
    simulate_power_outage = building_schema_power_outage.get('simulate_power_outage') if simulate_power_outage is None else simulate_power_outage
    # A list holds one flag per building.
    simulate_power_outage = simulate_power_outage[index] if isinstance(simulate_power_outage, list) else simulate_power_outage
    stochastic_power_outage = building_schema_power_outage.get('stochastic_power_outage')

    if building_schema_power_outage.get('stochastic_power_outage_model', None) is not None:
        stochastic_power_outage_model_constructor = self._import_attribute(
            building_schema_power_outage['stochastic_power_outage_model']['type']
        )
        attributes = building_schema_power_outage.get('stochastic_power_outage_model', {}).get('attributes', {})
        stochastic_power_outage_model = stochastic_power_outage_model_constructor(**attributes)
    else:
        stochastic_power_outage_model = None

    # ------------------ Chargers ------------------
    chargers_list = []

    # Adding chargers to buildings if they exist
    if building_schema.get("chargers", None) is not None:
        for charger_name, charger_config in building_schema["chargers"].items():
            noise_std = charger_config.get('noise_std', 0.0)
            # Only the simulated period of the charger data is kept.
            charger_simulation_file = pd.read_csv(
                os.path.join(schema['root_directory'], charger_config['charger_simulation'])
            ).iloc[schema['simulation_start_time_step']:schema['simulation_end_time_step'] + 1].copy()
            charger_simulation = ChargerSimulation(*charger_simulation_file.values.T, noise_std=noise_std)
            charger_class = self._import_attribute(charger_config['type'])
            charger_attributes = charger_config.get('attributes', {})
            charger_attributes['episode_tracker'] = episode_tracker
            charger_object = charger_class(
                charger_simulation=charger_simulation,
                charger_id=charger_name,
                **charger_attributes,
                seconds_per_time_step=schema['seconds_per_time_step'],
                time_step_ratio=building_kwargs['time_step_ratio'],
            )
            chargers_list.append(charger_object)

    # Adding washing machines to buildings if they exist
    washing_machines_list = []

    if kwargs.get('washing_machines') is not None and len(kwargs['washing_machines']) > 0:
        washing_machine_schemas = kwargs['washing_machines']
    else:
        washing_machine_schemas = building_schema.get('washing_machines', {})

    for washing_machine_name, washing_machine_schema in washing_machine_schemas.items():
        washing_machines_list.append(self._load_washing_machine(washing_machine_name, schema, washing_machine_schema, episode_tracker))

    observation_metadata, action_metadata = self.process_metadata(schema, building_schema, chargers_list, washing_machines_list, index, energy_simulation, **kwargs)

    building: Building = building_constructor(
        energy_simulation=energy_simulation,
        washing_machines=washing_machines_list,
        electric_vehicle_chargers=chargers_list,
        weather=weather,
        observation_metadata=observation_metadata,
        action_metadata=action_metadata,
        carbon_intensity=carbon_intensity,
        pricing=pricing,
        name=building_name,
        seconds_per_time_step=schema['seconds_per_time_step'],
        random_seed=schema['random_seed'],
        episode_tracker=episode_tracker,
        simulate_power_outage=simulate_power_outage,
        stochastic_power_outage=stochastic_power_outage,
        stochastic_power_outage_model=stochastic_power_outage_model,
        **building_kwargs,
    )

    # update devices
    device_metadata = {
        'cooling_device': {'autosizer': building.autosize_cooling_device},
        'heating_device': {'autosizer': building.autosize_heating_device},
        'dhw_device': {'autosizer': building.autosize_dhw_device},
        'dhw_storage': {'autosizer': building.autosize_dhw_storage},
        'cooling_storage': {'autosizer': building.autosize_cooling_storage},
        'heating_storage': {'autosizer': building.autosize_heating_storage},
        'electrical_storage': {'autosizer': building.autosize_electrical_storage},
        # NOTE(review): washing machines reuse the electrical storage autosizer -- confirm this is intended.
        'washing_machine': {'autosizer': building.autosize_electrical_storage},
        'pv': {'autosizer': building.autosize_pv},
    }
    solar_generation = kwargs.get('solar_generation')
    solar_generation = True if solar_generation is None else solar_generation
    solar_generation = solar_generation[index] if isinstance(solar_generation, list) else solar_generation

    for device_name in device_metadata:
        # Devices absent from the schema, and PV when solar generation is
        # switched off, are left untouched on the building.
        if building_schema.get(device_name, None) is None:
            continue

        if device_name == 'pv' and not solar_generation:
            continue

        device_type: str = building_schema[device_name]['type']
        constructor = self._import_attribute(device_type)
        attributes = building_schema[device_name].get('attributes', {})
        attributes['seconds_per_time_step'] = schema['seconds_per_time_step']

        # in case device technical specifications are to be randomly sampled, make sure each device per building has a unique seed
        md5 = hashlib.md5()
        device_random_seed = 0

        for string in [building_name, building_type, device_name, device_type]:
            md5.update(string.encode())
            hash_to_integer_base = 16
            device_random_seed += int(md5.hexdigest(), hash_to_integer_base)

        # Keep the leading nine digits so the seed stays a moderately sized integer.
        device_random_seed = int(str(device_random_seed * (schema['random_seed'] + 1))[:9])

        attributes = {
            **attributes,
            'random_seed': attributes['random_seed'] if attributes.get('random_seed', None) is not None else device_random_seed
        }
        device = constructor(**attributes)
        autosize = False if building_schema[device_name].get('autosize', None) is None else building_schema[device_name]['autosize']
        setattr(building, device_name, device)

        if autosize:
            autosizer = device_metadata[device_name]['autosizer']
            autosize_kwargs = {} if building_schema[device_name].get('autosize_attributes', None) is None else \
                building_schema[device_name]['autosize_attributes']

            if isinstance(device, PV):
                autosize_kwargs['epw_filepath'] = os.path.join(schema['root_directory'], autosize_kwargs['epw_filepath'])
                autosize_kwargs['sizing_data'] = pv_sizing_data

            elif isinstance(device, Battery):
                autosize_kwargs['sizing_data'] = battery_sizing_data

            autosizer(**autosize_kwargs)

        # set back the random seed to the building's random seed
        device.random_seed = schema['random_seed']

    building.observation_space = building.estimate_observation_space()
    building.action_space = building.estimate_action_space()

    return building

@staticmethod
def _import_attribute(dotted_path: str) -> Any:
    """Return the attribute (typically a class) named by a dotted import path, e.g. 'package.module.ClassName'."""

    module_name, _, attribute_name = dotted_path.rpartition('.')

    return getattr(importlib.import_module(module_name), attribute_name)
def process_metadata(self, schema, building_schema, chargers_list, washing_machines_list, index, energy_simulation: 'EnergySimulation', **kwargs):
    """Build the observation and action activation maps for one building.

    The schema-level ``active`` flags are read first, then overridden by the
    optional ``active_*`` / ``inactive_*`` selections, and finally expanded
    into one entry per charger and per washing machine.

    Parameters
    ----------
    schema : dict
        Must contain the sections ``observations``, ``chargers_observations_helper``,
        ``washing_machine_observations_helper``, ``actions``, ``chargers_actions_helper``
        and ``washing_machine_actions_helper``. Each section maps a name to a
        mapping that has an ``active`` entry.
    building_schema : dict
        Building-level schema. Its optional ``inactive_observations`` and
        ``inactive_actions`` entries are used when the matching keyword
        argument is not supplied.
    chargers_list : list
        Objects exposing a ``charger_id`` attribute.
    washing_machines_list : list
        Objects exposing a ``name`` attribute.
    index : int
        Position used to pick this building's entry when a keyword override
        is given as a list of lists.
    energy_simulation : EnergySimulation
        Only its ``minutes`` attribute is read here.
    **kwargs
        Optional ``active_observations``, ``inactive_observations``,
        ``active_actions`` and ``inactive_actions``. Each is either a flat
        list of names or a list of lists of names.

    Returns
    -------
    tuple
        ``(observation_metadata, action_metadata)``, two dicts mapping a
        name to its activation flag.
    """

    def section_flags(section):
        # Every schema section maps a name to a mapping carrying an 'active' flag.
        return {name: spec['active'] for name, spec in schema[section].items()}

    def for_this_building(names):
        # A list of lists is indexed by building; a flat list is used as is.
        # The length check keeps an empty list from raising IndexError on names[0].
        if len(names) > 0 and isinstance(names[0], list):
            return names[index]

        return names

    def apply_overrides(flag_maps, active_key, inactive_key):
        # When an active selection is given it replaces the schema flags outright.
        active = kwargs.get(active_key)

        if active is not None:
            active = for_this_building(active)
            flag_maps = [{name: name in active for name in flags} for flags in flag_maps]

        # The keyword argument takes precedence over the building schema entry.
        inactive = kwargs.get(inactive_key)

        if inactive is not None:
            inactive = for_this_building(inactive)
        elif building_schema.get(inactive_key) is not None:
            inactive = building_schema[inactive_key]
        else:
            inactive = []

        # Inactive names always win, including over the active selection above.
        return [
            {name: False if name in inactive else flag for name, flag in flags.items()}
            for flags in flag_maps
        ]

    observation_metadata = section_flags('observations')

    # 'minutes' is optional in the simulation data; drop the observation when the data has none.
    if 'minutes' in observation_metadata and energy_simulation.minutes is None:
        observation_metadata.pop('minutes', None)

    observation_metadata, charger_observation_flags, washing_machine_observation_flags = apply_overrides(
        [
            observation_metadata,
            section_flags('chargers_observations_helper'),
            section_flags('washing_machine_observations_helper'),
        ],
        'active_observations',
        'inactive_observations',
    )

    action_metadata, charger_action_flags, washing_machine_action_flags = apply_overrides(
        [
            section_flags('actions'),
            section_flags('chargers_actions_helper'),
            section_flags('washing_machine_actions_helper'),
        ],
        'active_actions',
        'inactive_actions',
    )

    # Helper name -> per-charger observation name. The order is kept because
    # it fixes the insertion order of the returned dict.
    charger_observation_templates = (
        ('electric_vehicle_charger_connected_state',
         'electric_vehicle_charger_{charger_id}_connected_state'),
        ('connected_electric_vehicle_at_charger_departure_time',
         'connected_electric_vehicle_at_charger_{charger_id}_departure_time'),
        ('connected_electric_vehicle_at_charger_required_soc_departure',
         'connected_electric_vehicle_at_charger_{charger_id}_required_soc_departure'),
        ('connected_electric_vehicle_at_charger_soc',
         'connected_electric_vehicle_at_charger_{charger_id}_soc'),
        ('connected_electric_vehicle_at_charger_battery_capacity',
         'connected_electric_vehicle_at_charger_{charger_id}_battery_capacity'),
        ('electric_vehicle_charger_incoming_state',
         'electric_vehicle_charger_{charger_id}_incoming_state'),
        ('incoming_electric_vehicle_at_charger_estimated_arrival_time',
         'incoming_electric_vehicle_at_charger_{charger_id}_estimated_arrival_time'),
        ('incoming_electric_vehicle_at_charger_estimated_soc_arrival',
         'incoming_electric_vehicle_at_charger_{charger_id}_estimated_soc_arrival'),
    )

    for charger in chargers_list:
        charger_id = charger.charger_id

        # Observation names are composed from the charger's unique ID.
        for helper_name, template in charger_observation_templates:
            if charger_observation_flags.get(helper_name, False):
                observation_metadata[template.format(charger_id=charger_id)] = True

        if charger_action_flags.get('electric_vehicle_storage', False):
            action_metadata[f'electric_vehicle_storage_{charger_id}'] = True

    for washing_machine in washing_machines_list:
        washing_machine_name = washing_machine.name

        if washing_machine_observation_flags.get('washing_machine_start_time_step', False):
            observation_metadata[f'{washing_machine_name}_start_time_step'] = True

        if washing_machine_observation_flags.get('washing_machine_end_time_step', False):
            observation_metadata[f'{washing_machine_name}_end_time_step'] = True

        # The action is named after the washing machine itself.
        if washing_machine_action_flags.get('washing_machine', False):
            action_metadata[f'{washing_machine_name}'] = True

    return observation_metadata, action_metadata
def _load_electric_vehicle(self, electric_vehicle_name: str, schema: dict, electric_vehicle_schema: dict, episode_tracker: EpisodeTracker, time_step_ratio) -> ElectricVehicle:
    """Build an electric vehicle and its battery from a schema entry.

    Parameters
    ----------
    electric_vehicle_name : str
        Name given to the created vehicle.
    schema : dict
        Global schema; ``seconds_per_time_step`` and ``random_seed`` are read from it.
    electric_vehicle_schema : dict
        Vehicle entry. ``battery.attributes`` must provide ``capacity`` and
        ``nominal_power``; ``initial_soc``, ``depth_of_discharge`` and the
        top-level ``type`` are optional.
    episode_tracker : EpisodeTracker
        Passed on to both the battery and the vehicle.
    time_step_ratio
        Passed on to the battery only.

    Returns
    -------
    ElectricVehicle
        Instance of the class named by ``type``, or of the default class
        when ``type`` is missing or ``None``.
    """
    battery_attributes = electric_vehicle_schema["battery"]["attributes"]
    capacity = battery_attributes["capacity"]
    nominal_power = battery_attributes["nominal_power"]

    # NOTE: the fallback is drawn before the lookup, so the module-level random
    # generator advances even when the schema supplies `initial_soc`.
    fallback_initial_soc = random.uniform(0, 1)
    initial_soc = battery_attributes.get("initial_soc", fallback_initial_soc)
    depth_of_discharge = battery_attributes.get("depth_of_discharge", 0.10)

    seconds_per_time_step = schema['seconds_per_time_step']
    random_seed = schema['random_seed']

    battery = Battery(
        capacity=capacity,
        nominal_power=nominal_power,
        initial_soc=initial_soc,
        seconds_per_time_step=seconds_per_time_step,
        time_step_ratio=time_step_ratio,
        random_seed=random_seed,
        episode_tracker=episode_tracker,
        depth_of_discharge=depth_of_discharge
    )

    # Resolve the vehicle class from its dotted path: module before the last dot, class name after it.
    dotted_path = electric_vehicle_schema.get('type', None)

    if dotted_path is None:
        dotted_path = 'citylearn.citylearn.ElectricVehicle'

    module_path, _, class_name = dotted_path.rpartition('.')
    vehicle_class = getattr(importlib.import_module(module_path), class_name)

    vehicle: ElectricVehicle = vehicle_class(
        battery=battery,
        name=electric_vehicle_name,
        seconds_per_time_step=schema['seconds_per_time_step'],
        random_seed=schema['random_seed'],
        episode_tracker=episode_tracker
    )

    return vehicle


def _load_washing_machine(
    self,
    washing_machine_name: str,
    schema: dict,
    washing_machine_schema: dict,
    episode_tracker: EpisodeTracker
) -> WashingMachine:
    """Read a washing machine's simulation file and build the device.

    Parameters
    ----------
    washing_machine_name : str
        Name given to the created washing machine.
    schema : dict
        Global schema; ``root_directory``, ``simulation_start_time_step``,
        ``simulation_end_time_step``, ``seconds_per_time_step`` and
        ``random_seed`` are read from it.
    washing_machine_schema : dict
        Device entry; ``washing_machine_energy_simulation`` is the CSV path
        relative to ``root_directory``.
    episode_tracker : EpisodeTracker
        Passed on to the created washing machine.

    Returns
    -------
    WashingMachine
        Device backed by the rows of the CSV that fall inside the simulation range.
    """
    csv_path = os.path.join(
        schema['root_directory'],
        washing_machine_schema['washing_machine_energy_simulation'],
    )

    # Keep only the simulated range; the end time step is inclusive.
    all_rows = pd.read_csv(csv_path)
    first_row = schema['simulation_start_time_step']
    last_row = schema['simulation_end_time_step']
    simulated_rows = all_rows.iloc[first_row:last_row + 1].copy()

    # Each CSV column becomes one positional argument, in file order.
    columns = simulated_rows.values.T
    simulation = WashingMachineSimulation(*columns)

    return WashingMachine(
        washing_machine_simulation=simulation,
        episode_tracker=episode_tracker,
        name=washing_machine_name,
        seconds_per_time_step=schema['seconds_per_time_step'],
        random_seed=schema['random_seed'],
    )


def __str__(self) -> str:
    """Return the text form of the dictionary produced by ``as_dict``."""
    snapshot = self.as_dict()
    return str(snapshot)
def as_dict(self) -> dict:
    """Return the metrics of the current time step as a dictionary.

    Each value is read from the matching series at ``self.time_step``,
    clamped to the valid index range of that series. A series that holds
    no data yet is reported as ``None`` rather than raising ``IndexError``,
    so ``__str__`` (which relies on this method) stays usable.

    Returns
    -------
    dict
        Net electricity consumption, self consumption, energy sent to
        electrical storage, solar generation, emissions and cost, keyed by
        display label.
    """

    def current(series):
        # len() rather than truthiness, so array-like series whose truth
        # value may be ambiguous are handled too.
        count = len(series)

        if count == 0:
            return None

        return series[max(0, min(self.time_step, count - 1))]

    return {
        "Net Electricity Consumption-kWh": current(self.net_electricity_consumption),
        "Self Consumption-kWh": current(self.total_self_consumption),
        "Stored energy by community- kWh": current(self.energy_to_electrical_storage),
        "Total Solar Generation-kWh": current(self.solar_generation),
        "CO2-kg_co2": current(self.net_electricity_consumption_emission),
        "Price-$": current(self.net_electricity_consumption_cost),
    }
class Error(Exception):
    """Common parent for the exceptions defined in this module.

    Adds nothing to :class:`Exception`; it exists so the module's own
    errors can be caught through a single type.
    """
class UnknownSchemaError(Error):
    """Signals a schema that is not a data set name, a ``dict`` or a filepath.

    Parameters
    ----------
    message : str, optional
        Text carried by the exception. The built-in explanation is used
        when it is ``None``.
    """

    # Default explanation, assembled from adjacent literals.
    __MESSAGE = (
        'Unknown schema parsed into constructor. Schema must be name of CityLearn data set,'
        ' a filepath to JSON representation or `dict` object of a CityLearn schema.'
        ' Call citylearn.data.DataSet.get_names() for list of available CityLearn data sets.'
    )

    def __init__(self, message=None):
        if message is None:
            message = self.__MESSAGE

        super().__init__(message)