Source code for citylearn.internal.building_ops

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Dict, Mapping, Optional, Tuple, Union

import numpy as np

from citylearn.energy_model import HeatPump
from citylearn.preprocessing import Normalize, PeriodicNormalization

if TYPE_CHECKING:
    from citylearn.building import Building

LOGGER = logging.getLogger()


[docs] class BuildingOpsService: """Internal observation/action operations for `Building`.""" def __init__(self, building: "Building"): self.building = building
[docs] def observations( self, include_all: bool = None, normalize: bool = None, periodic_normalization: bool = None, check_limits: bool = None, ) -> Mapping[str, float]: """Observations at current time step.""" building = self.building normalize = False if normalize is None else normalize periodic_normalization = False if periodic_normalization is None else periodic_normalization include_all = False if include_all is None else include_all check_limits = False if check_limits is None else check_limits data = self.get_observations_data(include_all=include_all) if include_all: valid_observations = list(set(data.keys()) | set(building.active_observations)) else: valid_observations = building.active_observations observations = {k: data[k] for k in valid_observations if k in data.keys()} observations = self.update_ev_charger_observations( observations, valid_observations, building.electric_vehicle_chargers, include_all=include_all, ) observations = self.update_washing_machine_observations( observations, valid_observations, building.washing_machines, ) unknown_observations = set(observations.keys()).difference(set(valid_observations)) assert len(unknown_observations) == 0, f'Unknown observations: {unknown_observations}' non_periodic_low_limit, non_periodic_high_limit = building.non_periodic_normalized_observation_space_limits periodic_low_limit, periodic_high_limit = building.periodic_normalized_observation_space_limits periodic_observations = building.get_periodic_observation_metadata() if check_limits: for key in building.active_observations: value = observations[key] lower = non_periodic_low_limit[key] upper = non_periodic_high_limit[key] if not lower <= value <= upper: report = { 'Building': building.name, 'episode': building.episode_tracker.episode, 'time_step': f'{building.time_step + 1}/{building.episode_tracker.episode_time_steps}', 'observation': key, 'value': value, 'lower': lower, 'upper': upper, } LOGGER.debug(f'Observation outside space limit: {report}') if periodic_normalization: observations_copy = {k: v for k, v in observations.items()} observations = {} periodic_normalizer = PeriodicNormalization(x_max=0) for key, value in observations_copy.items(): if key in periodic_observations: periodic_normalizer.x_max = max(periodic_observations[key]) sin_x, cos_x = value * periodic_normalizer observations[f'{key}_cos'] = cos_x observations[f'{key}_sin'] = sin_x else: observations[key] = value if normalize: normalizer = Normalize(0.0, 1.0) for key, value in observations.items(): normalizer.x_min = periodic_low_limit[key] normalizer.x_max = periodic_high_limit[key] observations[key] = value * normalizer return observations
[docs] def update_ev_charger_observations(self, observations, valid_observations, ev_chargers, include_all: bool = False): """Update observations for each electric vehicle charger.""" building = self.building for charger in ev_chargers: charger_id = charger.charger_id sim = charger.charger_simulation t = building.time_step endogenous_t = t if include_all else max(t - 1, 0) connected_state_key = f'electric_vehicle_charger_{charger_id}_connected_state' incoming_state_key = f'electric_vehicle_charger_{charger_id}_incoming_state' departure_key = f'connected_electric_vehicle_at_charger_{charger_id}_departure_time' req_soc_key = f'connected_electric_vehicle_at_charger_{charger_id}_required_soc_departure' soc_key = f'connected_electric_vehicle_at_charger_{charger_id}_soc' capacity_key = f'connected_electric_vehicle_at_charger_{charger_id}_battery_capacity' arrival_key = f'incoming_electric_vehicle_at_charger_{charger_id}_estimated_arrival_time' soc_arrival_key = f'incoming_electric_vehicle_at_charger_{charger_id}_estimated_soc_arrival' state = sim.electric_vehicle_charger_state[t] if t < len(sim.electric_vehicle_charger_state) else np.nan if charger.connected_electric_vehicle and state == 1: if connected_state_key in valid_observations: observations[connected_state_key] = 1 if departure_key in valid_observations: observations[departure_key] = int(sim.electric_vehicle_departure_time[t]) if req_soc_key in valid_observations: observations[req_soc_key] = float(sim.electric_vehicle_required_soc_departure[t]) if soc_key in valid_observations: observations[soc_key] = charger.connected_electric_vehicle.battery.soc[endogenous_t] if capacity_key in valid_observations: observations[capacity_key] = float(charger.connected_electric_vehicle.battery.capacity) else: if connected_state_key in valid_observations: observations[connected_state_key] = 0 if departure_key in valid_observations: observations[departure_key] = -1 if req_soc_key in valid_observations: observations[req_soc_key] = -0.1 if soc_key in valid_observations: observations[soc_key] = -0.1 if capacity_key in valid_observations: observations[capacity_key] = -1.0 if charger.incoming_electric_vehicle and state == 2: if incoming_state_key in valid_observations: observations[incoming_state_key] = 1 if arrival_key in valid_observations: observations[arrival_key] = int(sim.electric_vehicle_estimated_arrival_time[t]) if soc_arrival_key in valid_observations: observations[soc_arrival_key] = float(sim.electric_vehicle_estimated_soc_arrival[t]) else: if incoming_state_key in valid_observations: observations[incoming_state_key] = 0 if arrival_key in valid_observations: observations[arrival_key] = -1 if soc_arrival_key in valid_observations: observations[soc_arrival_key] = -0.1 return observations
[docs] def update_washing_machine_observations(self, observations, valid_observations, washing_machines): """Update observations for each washing machine.""" for washing_machine in washing_machines: washing_machine_name = washing_machine.name washing_machine_observations = washing_machine.observations() start_key = f'{washing_machine_name}_start_time_step' if start_key in valid_observations: observations[start_key] = next( (value for key, value in washing_machine_observations.items() if '_start_time_step' in key), -1, ) end_key = f'{washing_machine_name}_end_time_step' if end_key in valid_observations: observations[end_key] = next( (value for key, value in washing_machine_observations.items() if '_end_time_step' in key), -1, ) return observations
[docs] def get_observations_data(self, include_all: bool = False) -> Mapping[str, Union[float, int]]: """Build base observation dictionary without normalization.""" building = self.building electric_vehicle_chargers_dict = {} washing_machines_dict = {} t = building.time_step endogenous_t = t if include_all else max(t - 1, 0) for charger in building.electric_vehicle_chargers or []: charger_id = charger.charger_id connected_car = charger.connected_electric_vehicle if connected_car is not None: last_charged_kwh = 0.0 if 0 <= endogenous_t < len(charger.past_charging_action_values_kwh): last_charged_kwh = float(charger.past_charging_action_values_kwh[endogenous_t]) battery_soc = connected_car.battery.soc[endogenous_t] previous_battery_soc = connected_car.battery.initial_soc if endogenous_t == 0 else connected_car.battery.soc[endogenous_t - 1] required_soc = charger.charger_simulation.electric_vehicle_required_soc_departure[t] hours_until_departure = charger.charger_simulation.electric_vehicle_departure_time[t] battery_capacity = connected_car.battery.capacity min_capacity = (1 - connected_car.battery.depth_of_discharge) * battery_capacity electric_vehicle_chargers_dict[charger_id] = { 'connected': True, 'last_charged_kwh': last_charged_kwh, 'previous_battery_soc': previous_battery_soc, 'battery_soc': battery_soc, 'battery_capacity': battery_capacity, 'min_capacity': min_capacity, 'required_soc': required_soc, 'hours_until_departure': hours_until_departure, 'max_charging_power': charger.max_charging_power, 'max_discharging_power': charger.max_discharging_power, } else: electric_vehicle_chargers_dict[charger_id] = { 'connected': False, 'last_charged_kwh': 0.0, 'previous_battery_soc': None, 'battery_soc': None, 'battery_capacity': None, 'min_capacity': None, 'required_soc': None, 'hours_until_departure': None, 'max_charging_power': charger.max_charging_power, 'max_discharging_power': charger.max_discharging_power, } for washing_machine in building.washing_machines or []: washing_machine_name = washing_machine.name def _safe(arr, idx, default): try: return arr[idx] except Exception: return default start_time_step = _safe(washing_machine.washing_machine_simulation.wm_start_time_step, t, -1) end_time_step = _safe(washing_machine.washing_machine_simulation.wm_end_time_step, t, -1) load_profile = _safe(washing_machine.washing_machine_simulation.load_profile, t, 0.0) washing_machines_dict[washing_machine_name] = { 'wm_start_time_step': start_time_step, 'wm_end_time_step': end_time_step, 'load_profile': load_profile, } observations = {} for key, series in building._energy_simulation_observation_sources: if t < len(series): observations[key] = series[t] for key, series in building._weather_observation_sources: if t < len(series): observations[key] = series[t] for key, series in building._pricing_observation_sources: if t < len(series): observations[key] = series[t] for key, series in building._carbon_observation_sources: if t < len(series): observations[key] = series[t] observations.update({ 'solar_generation': abs(building.solar_generation[t]), **{ 'cooling_storage_soc': building.cooling_storage.soc[endogenous_t], 'heating_storage_soc': building.heating_storage.soc[endogenous_t], 'dhw_storage_soc': building.dhw_storage.soc[endogenous_t], 'electrical_storage_soc': building.electrical_storage.soc[endogenous_t], }, 'cooling_demand': building.energy_from_cooling_device[endogenous_t] + abs(min(building.cooling_storage.energy_balance[endogenous_t], 0.0)), 'heating_demand': building.energy_from_heating_device[endogenous_t] + abs(min(building.heating_storage.energy_balance[endogenous_t], 0.0)), 'dhw_demand': building.energy_from_dhw_device[endogenous_t] + abs(min(building.dhw_storage.energy_balance[endogenous_t], 0.0)), 'net_electricity_consumption': building.net_electricity_consumption[endogenous_t], 'cooling_electricity_consumption': building.cooling_electricity_consumption[endogenous_t], 'heating_electricity_consumption': building.heating_electricity_consumption[endogenous_t], 'dhw_electricity_consumption': building.dhw_electricity_consumption[endogenous_t], 'cooling_storage_electricity_consumption': building.cooling_storage_electricity_consumption[endogenous_t], 'heating_storage_electricity_consumption': building.heating_storage_electricity_consumption[endogenous_t], 'dhw_storage_electricity_consumption': building.dhw_storage_electricity_consumption[endogenous_t], 'electrical_storage_electricity_consumption': building.electrical_storage_electricity_consumption[endogenous_t], 'washing_machine_electricity_consumption': building.washing_machines_electricity_consumption[endogenous_t], 'cooling_device_efficiency': building.cooling_device.get_cop(building.weather.outdoor_dry_bulb_temperature[t], heating=False), 'heating_device_efficiency': building.heating_device.get_cop(building.weather.outdoor_dry_bulb_temperature[t], heating=True) if isinstance(building.heating_device, HeatPump) else building.heating_device.efficiency, 'dhw_device_efficiency': building.dhw_device.get_cop(building.weather.outdoor_dry_bulb_temperature[t], heating=True) if isinstance(building.dhw_device, HeatPump) else building.dhw_device.efficiency, 'indoor_dry_bulb_temperature_cooling_set_point': building.energy_simulation.indoor_dry_bulb_temperature_cooling_set_point[t], 'indoor_dry_bulb_temperature_heating_set_point': building.energy_simulation.indoor_dry_bulb_temperature_heating_set_point[t], 'indoor_dry_bulb_temperature_cooling_delta': building.energy_simulation.indoor_dry_bulb_temperature[t] - building.energy_simulation.indoor_dry_bulb_temperature_cooling_set_point[t], 'indoor_dry_bulb_temperature_heating_delta': building.energy_simulation.indoor_dry_bulb_temperature[t] - building.energy_simulation.indoor_dry_bulb_temperature_heating_set_point[t], 'comfort_band': building.energy_simulation.comfort_band[t], 'occupant_count': building.energy_simulation.occupant_count[t], 'power_outage': building.power_outage_signal[t], 'electric_vehicles_chargers_dict': electric_vehicle_chargers_dict, 'washing_machines_dict': washing_machines_dict, }) if ( getattr(building, '_charging_constraints_enabled', False) and getattr(building, '_expose_charging_constraints', False) and isinstance(building._charging_constraints_state, dict) ): state = building._charging_constraints_state headroom = state.get('building_headroom_kw') if headroom is not None: observations['charging_building_headroom_kw'] = headroom export_headroom = state.get('building_export_headroom_kw') if export_headroom is not None: observations['charging_building_export_headroom_kw'] = export_headroom for phase_name, value in (state.get('phase_headroom_kw') or {}).items(): if value is not None: observations[f'charging_phase_{phase_name}_headroom_kw'] = value for phase_name, value in (state.get('phase_export_headroom_kw') or {}).items(): if value is not None: observations[f'charging_phase_{phase_name}_export_headroom_kw'] = value if getattr(building, '_charging_constraints_enabled', False): if getattr(building, '_expose_charging_violation', False): observations['charging_constraint_violation_kwh'] = building._charging_constraint_last_penalty_kwh if getattr(building, '_phase_encoding_observations', None): observations.update(building._phase_encoding_observations) return observations
[docs] def apply_actions( self, cooling_or_heating_device_action: float = None, cooling_device_action: float = None, heating_device_action: float = None, cooling_storage_action: float = None, heating_storage_action: float = None, dhw_storage_action: float = None, electrical_storage_action: float = None, washing_machine_actions: dict = None, electric_vehicle_storage_actions: dict = None, ): """Update demand and charge/discharge storage devices.""" building = self.building if electric_vehicle_storage_actions is not None: electric_vehicle_storage_actions = dict(electric_vehicle_storage_actions) if 'cooling_or_heating_device' in building.active_actions: assert 'cooling_device' not in building.active_actions and 'heating_device' not in building.active_actions, \ 'cooling_device and heating_device actions must be set to False when cooling_or_heating_device is True.' \ ' They will be implicitly set based on the polarity of cooling_or_heating_device.' cooling_device_action = abs(min(cooling_or_heating_device_action, 0.0)) heating_device_action = abs(max(cooling_or_heating_device_action, 0.0)) else: assert not ('cooling_device' in building.active_actions and 'heating_device' in building.active_actions), \ 'cooling_device and heating_device actions cannot both be set to True to avoid both actions having' \ ' values > 0.0 in the same time step. Use cooling_or_heating_device action instead to control' \ ' both cooling_device and heating_device in a building.' cooling_device_action = np.nan if 'cooling_device' not in building.active_actions else cooling_device_action heating_device_action = np.nan if 'heating_device' not in building.active_actions else heating_device_action cooling_storage_action = 0.0 if 'cooling_storage' not in building.active_actions else cooling_storage_action heating_storage_action = 0.0 if 'heating_storage' not in building.active_actions else heating_storage_action dhw_storage_action = 0.0 if 'dhw_storage' not in building.active_actions else dhw_storage_action electrical_storage_action = 0.0 if 'electrical_storage' not in building.active_actions else electrical_storage_action electric_vehicle_storage_actions, electrical_storage_action = self.apply_charging_constraints_to_actions( electric_vehicle_storage_actions, electrical_storage_action, ) actions = { 'cooling_demand': (building.update_cooling_demand, (cooling_device_action,)), 'heating_demand': (building.update_heating_demand, (heating_device_action,)), 'cooling_device': (building.update_energy_from_cooling_device, ()), 'cooling_storage': (building.update_cooling_storage, (cooling_storage_action,)), 'heating_device': (building.update_energy_from_heating_device, ()), 'heating_storage': (building.update_heating_storage, (heating_storage_action,)), 'dhw_device': (building.update_energy_from_dhw_device, ()), 'dhw_storage': (building.update_dhw_storage, (dhw_storage_action,)), 'non_shiftable_load': (building.update_non_shiftable_load, ()), 'electrical_storage': (building.update_electrical_storage, (electrical_storage_action,)), } priority_list = list(actions.keys()) if electric_vehicle_storage_actions is not None: electric_vehicle_priority_list = [] for charger_id, action in electric_vehicle_storage_actions.items(): action_key = f'electric_vehicle_storage_{charger_id}' if action_key not in building.active_actions: raise ValueError('This action should not be applied. Verify') for charger in building.electric_vehicle_chargers: if charger.charger_id == charger_id: actions[action_key] = (charger.update_connected_electric_vehicle_soc, (action,)) electric_vehicle_priority_list.append(action_key) priority_list = priority_list + electric_vehicle_priority_list if washing_machine_actions is not None: washing_machine_priority_list = [] for washing_machine_name, action in washing_machine_actions.items(): action_key = f'{washing_machine_name}' if action_key not in building.active_actions: raise ValueError('This action should not be applied. Verify') for washing_machine in building.washing_machines: if washing_machine.name == washing_machine_name: actions[action_key] = (washing_machine.start_cycle, (action,)) washing_machine_priority_list.append(action_key) priority_list = priority_list + washing_machine_priority_list if electrical_storage_action < 0.0: key = 'electrical_storage' priority_list.remove(key) priority_list = [key] + priority_list for key in ['cooling', 'heating', 'dhw']: storage = f'{key}_storage' device = f'{key}_device' if actions[storage][1][0] < 0.0: storage_ix = priority_list.index(storage) device_ix = priority_list.index(device) priority_list[storage_ix] = device priority_list[device_ix] = storage for key in priority_list: func, args = actions[key] try: func(*args) except NotImplementedError: pass
def _safe_scalar(self, value, default: float = 0.0) -> float: try: scalar = float(value) except (TypeError, ValueError): return float(default) if not np.isfinite(scalar): return float(default) return scalar def _safe_index(self, values, idx: int, default: float = 0.0) -> float: try: return self._safe_scalar(values[idx], default) except Exception: return float(default) def _current_phase_names(self): building = self.building if getattr(building, '_electrical_service_mode', 'single_phase') == 'three_phase': return ['L1', 'L2', 'L3'] return ['L1'] def _split_unassigned_power(self, power_kw: float) -> Dict[str, float]: building = self.building phase_names = self._current_phase_names() if len(phase_names) == 1: return {'L1': float(power_kw)} split_mode = str(getattr(building, '_electrical_service_default_split', 'balanced')).strip().lower() if split_mode in {'l1', 'l2', 'l3'}: return {phase: float(power_kw if phase.lower() == split_mode else 0.0) for phase in phase_names} share = float(power_kw) / len(phase_names) return {phase: share for phase in phase_names} def _split_power_by_connection(self, power_kw: float, phase_connection: Optional[str]) -> Dict[str, float]: phase_names = self._current_phase_names() if len(phase_names) == 1: return {'L1': float(power_kw)} if phase_connection in {'L1', 'L2', 'L3'}: return {phase: float(power_kw if phase == phase_connection else 0.0) for phase in phase_names} if phase_connection == 'all_phases': share = float(power_kw) / len(phase_names) return {phase: share for phase in phase_names} return self._split_unassigned_power(power_kw) def _estimate_non_controllable_base_power(self) -> Tuple[float, Dict[str, float]]: building = self.building t = building.time_step phase_names = self._current_phase_names() if building.power_outage: return 0.0, {phase: 0.0 for phase in phase_names} temperature = self._safe_index(building.weather.outdoor_dry_bulb_temperature, t, 0.0) cooling_demand = self._safe_index(building.energy_from_cooling_device, t, 0.0) + self._safe_index( building.cooling_storage.energy_balance, t, 0.0 ) cooling_kw = self._safe_scalar(building.cooling_device.get_input_power(cooling_demand, temperature, heating=False), 0.0) heating_demand = self._safe_index(building.energy_from_heating_device, t, 0.0) + self._safe_index( building.heating_storage.energy_balance, t, 0.0 ) if isinstance(building.heating_device, HeatPump): heating_kw = self._safe_scalar( building.heating_device.get_input_power(heating_demand, temperature, heating=True), 0.0, ) else: heating_kw = self._safe_scalar(building.heating_device.get_input_power(heating_demand), 0.0) dhw_demand = self._safe_index(building.energy_from_dhw_device, t, 0.0) + self._safe_index( building.dhw_storage.energy_balance, t, 0.0 ) if isinstance(building.dhw_device, HeatPump): dhw_kw = self._safe_scalar(building.dhw_device.get_input_power(dhw_demand, temperature, heating=True), 0.0) else: dhw_kw = self._safe_scalar(building.dhw_device.get_input_power(dhw_demand), 0.0) non_shiftable_kw = self._safe_index(building.energy_to_non_shiftable_load, t, 0.0) solar_kw = self._safe_index(building.solar_generation, t, 0.0) washing_kw = sum(self._safe_index(wm.electricity_consumption, t, 0.0) for wm in building.washing_machines or []) base_total_kw = cooling_kw + heating_kw + dhw_kw + non_shiftable_kw + solar_kw + washing_kw base_phase_kw = self._split_unassigned_power(base_total_kw) return float(base_total_kw), base_phase_kw def _charger_requested_power_kw(self, charger, action: float) -> float: if action is None: return 0.0 action = self._safe_scalar(action, 0.0) action = float(np.clip(action, -1.0, 1.0)) if action > 0.0: max_power = self._safe_scalar(getattr(charger, 'max_charging_power', 0.0), 0.0) return action * max_power if max_power > 0.0 else 0.0 if action < 0.0: max_power = self._safe_scalar(getattr(charger, 'max_discharging_power', 0.0), 0.0) return -abs(action) * max_power if max_power > 0.0 else 0.0 return 0.0 def _charger_action_from_power_kw(self, charger, target_power_kw: float) -> float: target_power_kw = self._safe_scalar(target_power_kw, 0.0) if target_power_kw > 0.0: max_power = self._safe_scalar(getattr(charger, 'max_charging_power', 0.0), 0.0) min_power = self._safe_scalar(getattr(charger, 'min_charging_power', 0.0), 0.0) if max_power <= 0.0: return 0.0 if min_power > 0.0 and target_power_kw < min_power: return 0.0 return float(np.clip(target_power_kw / max_power, 0.0, 1.0)) if target_power_kw < 0.0: max_power = self._safe_scalar(getattr(charger, 'max_discharging_power', 0.0), 0.0) min_power = self._safe_scalar(getattr(charger, 'min_discharging_power', 0.0), 0.0) requested = abs(target_power_kw) if max_power <= 0.0: return 0.0 if min_power > 0.0 and requested < min_power: return 0.0 return float(-np.clip(requested / max_power, 0.0, 1.0)) return 0.0 def _storage_requested_power_kw(self, action: Optional[float]) -> float: building = self.building if action is None: return 0.0 action = self._safe_scalar(action, 0.0) action = float(np.clip(action, -1.0, 1.0)) nominal_power = self._safe_scalar(getattr(building.electrical_storage, 'nominal_power', 0.0), 0.0) return action * nominal_power if nominal_power > 0.0 else 0.0 def _storage_action_from_power_kw(self, target_power_kw: float) -> float: building = self.building nominal_power = self._safe_scalar(getattr(building.electrical_storage, 'nominal_power', 0.0), 0.0) if nominal_power <= 0.0: return 0.0 return float(np.clip(target_power_kw / nominal_power, -1.0, 1.0)) def _compute_totals(self, base_total_kw: float, base_phase_kw: Mapping[str, float], controls, scales): total_kw = float(base_total_kw) phase_kw = {phase: float(value) for phase, value in base_phase_kw.items()} for control_id, control in controls.items(): scale = self._safe_scalar(scales.get(control_id, 1.0), 1.0) total_kw += control['request_total_kw'] * scale for phase_name, value in control['request_phase_kw'].items(): phase_kw[phase_name] = phase_kw.get(phase_name, 0.0) + (value * scale) return total_kw, phase_kw def _scale_for_import_scope(self, current_value_kw, limit_kw, controls, scales, component_getter) -> bool: limit_kw = self._safe_scalar(limit_kw, np.nan) current_value_kw = self._safe_scalar(current_value_kw, 0.0) if not np.isfinite(limit_kw): return False if limit_kw is None or current_value_kw <= limit_kw + 1e-9: return False relevant = [] for control_id, control in controls.items(): component_kw = component_getter(control) if component_kw > 0.0 and self._safe_scalar(scales.get(control_id, 0.0), 0.0) > 0.0: relevant.append((control_id, component_kw)) if not relevant: return False current_relevant_kw = sum(scales[control_id] * component_kw for control_id, component_kw in relevant) if current_relevant_kw <= 1e-9: return False fixed_kw = current_value_kw - current_relevant_kw allowed_kw = limit_kw - fixed_kw factor = 0.0 if allowed_kw <= 0.0 else min(1.0, allowed_kw / current_relevant_kw) if factor >= 1.0 - 1e-9: return False for control_id, _ in relevant: scales[control_id] *= factor return True def _scale_for_export_scope(self, current_value_kw, limit_kw, controls, scales, component_getter) -> bool: limit_kw = self._safe_scalar(limit_kw, np.nan) current_value_kw = self._safe_scalar(current_value_kw, 0.0) if not np.isfinite(limit_kw): return False if limit_kw is None: return False current_export_kw = max(-current_value_kw, 0.0) if current_export_kw <= limit_kw + 1e-9: return False relevant = [] for control_id, control in controls.items(): component_kw = component_getter(control) if component_kw < 0.0 and self._safe_scalar(scales.get(control_id, 0.0), 0.0) > 0.0: relevant.append((control_id, component_kw)) if not relevant: return False current_relevant_export_kw = sum(scales[control_id] * abs(component_kw) for control_id, component_kw in relevant) if current_relevant_export_kw <= 1e-9: return False fixed_kw = current_value_kw + current_relevant_export_kw allowed_export_kw = limit_kw + fixed_kw factor = 0.0 if allowed_export_kw <= 0.0 else min(1.0, allowed_export_kw / current_relevant_export_kw) if factor >= 1.0 - 1e-9: return False for control_id, _ in relevant: scales[control_id] *= factor return True def _apply_legacy_charging_constraints(self, actions: Optional[Mapping[str, float]]) -> Optional[Mapping[str, float]]: building = self.building if not actions: building._set_default_charging_headroom() return actions positive_requests = {} scales = {} for charger_id, action in actions.items(): if action is None or action <= 0.0: continue charger = building._charger_lookup.get(charger_id) if charger is None: continue max_power = getattr(charger, 'max_charging_power', 0.0) or 0.0 if max_power <= 0.0: continue positive_requests[charger_id] = action * max_power scales[charger_id] = 1.0 violation_kw = 0.0 if positive_requests: total_kw = sum(positive_requests.values()) building_limit = building._building_charger_limit_kw building_limit = self._safe_scalar(building_limit, np.nan) if np.isfinite(building_limit) and building_limit >= 0.0 and total_kw > building_limit: scale = 0.0 if building_limit == 0 else building_limit / total_kw for charger_id in scales: scales[charger_id] *= scale violation_kw += total_kw - building_limit for phase in building._phase_limits: limit = phase.get('limit_kw') limit = self._safe_scalar(limit, np.nan) if not np.isfinite(limit) or limit < 0.0: continue chargers = phase.get('chargers', []) or [] phase_sum = sum( positive_requests.get(charger_id, 0.0) * scales.get(charger_id, 1.0) for charger_id in chargers if charger_id in positive_requests ) if phase_sum > limit: phase_scale = 0.0 if limit == 0 else limit / phase_sum for charger_id in chargers: if charger_id in scales: scales[charger_id] *= phase_scale violation_kw += phase_sum - limit scaled_positive_kw = { charger_id: positive_requests[charger_id] * scales.get(charger_id, 1.0) for charger_id in positive_requests } used_kw = sum(scaled_positive_kw.values()) actions = dict(actions) for charger_id, action in list(actions.items()): if action is None or action <= 0.0: continue charger = building._charger_lookup.get(charger_id) if charger is None: continue max_power = getattr(charger, 'max_charging_power', 0.0) or 0.0 if max_power <= 0.0: actions[charger_id] = 0.0 continue target_kw = scaled_positive_kw.get(charger_id, 0.0) actions[charger_id] = max(0.0, min(action, target_kw / max_power)) if getattr(building, '_expose_charging_constraints', False): building_limit = self._safe_scalar(building._building_charger_limit_kw, np.nan) building_headroom = None if not np.isfinite(building_limit) else building_limit - used_kw phase_headroom = {} for phase in building._phase_limits: limit = phase.get('limit_kw') limit = self._safe_scalar(limit, np.nan) if not np.isfinite(limit): phase_headroom[phase['name']] = None else: used = sum(scaled_positive_kw.get(charger_id, 0.0) for charger_id in phase.get('chargers', [])) phase_headroom[phase['name']] = limit - used building._charging_constraints_state = { 'building_headroom_kw': building_headroom, 'building_export_headroom_kw': None, 'phase_headroom_kw': phase_headroom, 'phase_export_headroom_kw': {}, 'total_power_kw': used_kw, 'phase_power_kw': {}, } penalty_kwh = self._safe_scalar(violation_kw * (building.seconds_per_time_step / 3600), 0.0) building._charging_constraint_penalty_kwh = penalty_kwh building._charging_constraint_last_penalty_kwh = penalty_kwh phase_power = {} if getattr(building, '_electrical_service_enabled', False): phase_power = dict((building._charging_constraints_state or {}).get('phase_power_kw') or {}) building._record_charging_constraint_state( violation_kwh=penalty_kwh, total_power_kw=float((building._charging_constraints_state or {}).get('total_power_kw', used_kw)), phase_power_kw=phase_power, ) else: building._set_default_charging_headroom() building._record_charging_constraint_state( violation_kwh=0.0, total_power_kw=0.0, phase_power_kw={}, ) return actions def _apply_electrical_service_constraints( self, actions: Optional[Mapping[str, float]], electrical_storage_action: Optional[float], ) -> Tuple[Optional[Mapping[str, float]], Optional[float]]: building = self.building phase_names = self._current_phase_names() base_total_kw, base_phase_kw = self._estimate_non_controllable_base_power() base_phase_kw = {phase: base_phase_kw.get(phase, 0.0) for phase in phase_names} controls = {} adjusted_actions = None if actions is None else dict(actions) for charger_id, action in (actions or {}).items(): charger = building._charger_lookup.get(charger_id) if charger is None: continue request_total_kw = self._charger_requested_power_kw(charger, action) if abs(request_total_kw) <= 1e-9: continue phase_connection = building._charger_phase_map.get(charger_id) request_phase_kw = self._split_power_by_connection(request_total_kw, phase_connection) controls[charger_id] = { 'request_total_kw': request_total_kw, 'request_phase_kw': request_phase_kw, } storage_control_id = '__electrical_storage__' request_storage_kw = self._storage_requested_power_kw(electrical_storage_action) if abs(request_storage_kw) > 1e-9: request_phase_kw = self._split_power_by_connection(request_storage_kw, building.electrical_storage_phase_connection) controls[storage_control_id] = { 'request_total_kw': request_storage_kw, 'request_phase_kw': request_phase_kw, } scales = {control_id: 1.0 for control_id in controls} total_limits = building._electrical_service_limits.get('total', {}) per_phase_limits = building._electrical_service_limits.get('per_phase', {}) for _ in range(8): changed = False total_kw, phase_kw = self._compute_totals(base_total_kw, base_phase_kw, controls, scales) changed |= self._scale_for_import_scope( total_kw, total_limits.get('import_kw'), controls, scales, component_getter=lambda c: c['request_total_kw'], ) changed |= self._scale_for_export_scope( total_kw, total_limits.get('export_kw'), controls, scales, component_getter=lambda c: c['request_total_kw'], ) for phase_name in phase_names: phase_limit = per_phase_limits.get(phase_name, {}) changed |= self._scale_for_import_scope( phase_kw.get(phase_name, 0.0), phase_limit.get('import_kw'), controls, scales, component_getter=lambda c, p=phase_name: c['request_phase_kw'].get(p, 0.0), ) changed |= self._scale_for_export_scope( phase_kw.get(phase_name, 0.0), phase_limit.get('export_kw'), controls, scales, component_getter=lambda c, p=phase_name: c['request_phase_kw'].get(p, 0.0), ) if not changed: break total_kw, phase_kw = self._compute_totals(base_total_kw, base_phase_kw, controls, scales) total_kw = self._safe_scalar(total_kw, 0.0) phase_kw = {phase: self._safe_scalar(value, 0.0) for phase, value in phase_kw.items()} if adjusted_actions is not None: for charger_id in adjusted_actions: charger = building._charger_lookup.get(charger_id) if charger is None: continue control = controls.get(charger_id) target_kw = 0.0 if control is None else control['request_total_kw'] * scales.get(charger_id, 1.0) adjusted_actions[charger_id] = self._charger_action_from_power_kw(charger, target_kw) adjusted_storage_action = electrical_storage_action if electrical_storage_action is not None: storage_control = controls.get(storage_control_id) target_kw = 0.0 if storage_control is None else storage_control['request_total_kw'] * scales.get(storage_control_id, 1.0) adjusted_storage_action = self._storage_action_from_power_kw(target_kw) violation_kw = 0.0 import_limit = self._safe_scalar(total_limits.get('import_kw'), np.nan) export_limit = self._safe_scalar(total_limits.get('export_kw'), np.nan) if np.isfinite(import_limit): violation_kw += max(total_kw - import_limit, 0.0) if np.isfinite(export_limit): violation_kw += max(-total_kw - export_limit, 0.0) phase_headroom = {} phase_export_headroom = {} for phase_name in phase_names: phase_total = self._safe_scalar(phase_kw.get(phase_name, 0.0), 0.0) phase_limit = per_phase_limits.get(phase_name, {}) phase_import_limit = self._safe_scalar(phase_limit.get('import_kw'), np.nan) phase_export_limit = self._safe_scalar(phase_limit.get('export_kw'), np.nan) phase_headroom[phase_name] = None if not np.isfinite(phase_import_limit) else (phase_import_limit - phase_total) phase_export_headroom[phase_name] = None if not np.isfinite(phase_export_limit) else (phase_export_limit + phase_total) if np.isfinite(phase_import_limit): violation_kw += max(phase_total - phase_import_limit, 0.0) if np.isfinite(phase_export_limit): violation_kw += max(-phase_total - phase_export_limit, 0.0) building_headroom = None if not np.isfinite(import_limit) else (import_limit - total_kw) building_export_headroom = None if not np.isfinite(export_limit) else (export_limit + total_kw) building._charging_constraints_state = { 'building_headroom_kw': building_headroom, 'building_export_headroom_kw': building_export_headroom, 'phase_headroom_kw': phase_headroom, 'phase_export_headroom_kw': phase_export_headroom, 'total_power_kw': total_kw, 'phase_power_kw': phase_kw, } penalty_kwh = self._safe_scalar(violation_kw * (building.seconds_per_time_step / 3600.0), 0.0) building._charging_constraint_penalty_kwh = penalty_kwh building._charging_constraint_last_penalty_kwh = penalty_kwh building._record_charging_constraint_state( violation_kwh=penalty_kwh, total_power_kw=float(total_kw), phase_power_kw=phase_kw, ) return adjusted_actions, adjusted_storage_action
[docs] def apply_charging_constraints_to_actions( self, actions: Optional[Mapping[str, float]], electrical_storage_action: Optional[float] = None, ) -> Tuple[Optional[Mapping[str, float]], Optional[float]]: """Apply configured electrical constraints and return adjusted EV/storage actions.""" building = self.building building._charging_constraint_penalty_kwh = 0.0 building._charging_constraint_last_penalty_kwh = 0.0 if not building._charging_constraints_enabled: return actions, electrical_storage_action if getattr(building, '_electrical_service_enabled', False): return self._apply_electrical_service_constraints(actions, electrical_storage_action) adjusted_actions = self._apply_legacy_charging_constraints(actions) return adjusted_actions, electrical_storage_action