Skip to content

Prepare existing capacities

Functions to prepare existing assets for the network

SHORT-TERM FIX until PowerPlantMatching is implemented — split out of add_existing_baseyear for REMIND compatibility.

assign_year_bins(df, year_bins)

Assign a year bin to the existing capacities according to the config

Parameters:

Name Type Description Default
df DataFrame

DataFrame with existing capacities and build years (DateIn)

required
year_bins list

years to bin the existing capacities to

required

Returns: pd.DataFrame: DataFrame regridded to the year bins

Source code in workflow/scripts/prepare_existing_capacities.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def assign_year_bins(df: pd.DataFrame, year_bins: list) -> pd.DataFrame:
    """
    Assign a year bin to the existing capacities according to the config

    Args:
        df (pd.DataFrame): DataFrame with existing capacities and build years (DateIn)
        year_bins (list): years to bin the existing capacities to
    Returns:
        pd.DataFrame: DataFrame regridded to the year bins
    """

    binned = df.copy()
    # np.digitize with right=True maps each DateIn to the first bin edge >= it;
    # np.take then turns the bin index back into the bin year itself
    bin_positions = np.digitize(binned.DateIn, year_bins, right=True)
    binned["grouping_year"] = np.take(year_bins, bin_positions)
    return binned.fillna(0)

convert_CHP_to_poweronly(capacities)

Convert CHP capacities to power-only capacities by removing the heat part

Parameters:

Name Type Description Default
capacities DataFrame

DataFrame with existing capacities

required

Returns: pd.DataFrame: DataFrame with converted capacities

Source code in workflow/scripts/prepare_existing_capacities.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def convert_CHP_to_poweronly(capacities: pd.DataFrame) -> pd.DataFrame:
    """Convert CHP capacities to power-only capacities by removing the heat part

    Note: modifies ``capacities`` in place and returns it.

    Args:
        capacities (pd.DataFrame): DataFrame with existing capacities
    Returns:
        pd.DataFrame: DataFrame with converted capacities
    """
    is_chp = capacities.Tech.str.contains("CHP")

    # map the CHP fuel types onto their power-only counterparts
    fuel = capacities.loc[is_chp, "Fueltype"]
    fuel = fuel.str.replace("central coal CHP", "coal power plant")
    fuel = fuel.str.replace("central gas CHP", "gas CCGT")
    capacities.loc[is_chp, "Fueltype"] = fuel

    # derive the Tech field from the converted Fueltype
    tech = capacities.loc[is_chp, "Fueltype"]
    for pattern, replacement in (
        (" CHP", ""),
        ("CHP ", " "),
        ("gas ", ""),
        ("coal power plant", "coal"),
    ):
        tech = tech.str.replace(pattern, replacement)
    capacities.loc[is_chp, "Tech"] = tech

    return capacities

determine_simulation_timespan(config, year)

Determine the simulation timespan in years, so that the network object is not needed.

Parameters: config (dict): the snakemake config; year (int): the year to simulate.

Returns: int: the simulation timespan in years

Source code in workflow/scripts/prepare_existing_capacities.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def determine_simulation_timespan(config: dict, year: int) -> int:
    """Determine the simulation timespan in years (so the network object is not needed)
    Args:
        config (dict): the snakemake config
        year (int): the year to simulate
    Returns:
        int: the simulation timespan in years
    """

    cfg = config["snapshots"]
    # build the periodic (leap-day-free) snapshot index for the modelled year
    snapshots = make_periodic_snapshots(
        year=year,
        freq=cfg["freq"],
        start_day_hour=cfg["start"],
        end_day_hour=cfg["end"],
        bounds=cfg["bounds"],
        # naive local timezone
        tz=None,
        end_year=year + 1 if cfg["end_year_plus1"] else None,
    )

    # fraction of a year covered by the snapshots
    # NOTE(review): "freq" and "frequency" are read as two distinct config keys
    # here — confirm that is intended and not a typo
    return cfg["frequency"] * len(snapshots) / YEAR_HRS

distribute_vre_by_grade(cap_by_year, grade_capacities)

distribute vre capacities by grade potential, use up better grades first

Parameters:

Name Type Description Default
cap_by_year Series

the vre tech potential p_nom_max added per year

required
grade_capacities Series

the vre grade potential for the tech and bus

required

Returns: pd.DataFrame: DataFrame with the distributed vre capacities (shape: years x buses)

Source code in workflow/scripts/prepare_existing_capacities.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def distribute_vre_by_grade(cap_by_year: pd.Series, grade_capacities: pd.Series) -> pd.DataFrame:
    """distribute vre capacities by grade potential, use up better grades first

    The first argument is treated as the per-source availability (consumed from
    the highest-sorted index downwards) and the second as the per-step demand.
    NOTE(review): the parameter names/docstring suggest the opposite roles —
    confirm against the caller.

    Args:
        cap_by_year (pd.Series): the vre tech potential p_nom_max added per year
        grade_capacities (pd.Series): the vre grade potential for the tech and bus
    Returns:
        pd.DataFrame: distributed capacities (rows: availability index sorted
            descending, columns: demand index sorted ascending)
    """

    # better entries first; work on a private float copy so the caller's
    # series is never mutated through the shared .values buffer
    availability = cap_by_year.sort_index(ascending=False)
    to_distribute = grade_capacities.fillna(0).sort_index()
    n_steps = len(to_distribute)
    n_sources = len(availability)

    # allocation per source per step (shape: sources x steps);
    # float dtype — an int array would silently truncate fractional capacities
    allocation = np.zeros((n_sources, n_steps), dtype=float)
    remaining = availability.to_numpy(dtype=float, copy=True)

    for j in range(n_steps):
        needed = to_distribute.values[j]
        if n_sources == 0 or needed <= 0:
            continue
        cumsum = np.cumsum(remaining)
        # sources fully consumed by this step's demand
        used_up = cumsum < needed
        allocation[used_up, j] = remaining[used_up]

        if cumsum[-1] >= needed:
            # first source that covers the remainder of the demand
            cutoff = int(np.argmax(cumsum >= needed))
            already_covered = cumsum[cutoff - 1] if cutoff > 0 else 0.0
            allocation[cutoff, j] = needed - already_covered
        # else: demand exceeds total availability -> everything available was
        # already allocated above (previously this over-allocated source 0)

        # subtract what was used from availability
        remaining -= allocation[:, j]

    # label columns with the SORTED demand index so labels match the data order
    return pd.DataFrame(data=allocation, columns=to_distribute.index, index=availability.index)

fix_existing_capacities(existing_df, costs, year_bins, baseyear)

Fill missing DateOut from technology lifetimes, drop expired assets, drop too-new assets

Parameters:

Name Type Description Default
existing_df DataFrame

the existing capacities

required
costs DataFrame

the technoeconomic data

required
year_bins list

the year groups

required
baseyear int

the base year (run year)

required

Returns:

Type Description
DataFrame

pd.DataFrame: fixed capacities

Source code in workflow/scripts/prepare_existing_capacities.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def fix_existing_capacities(
    existing_df: pd.DataFrame, costs: pd.DataFrame, year_bins: list, baseyear: int
) -> pd.DataFrame:
    """Fill missing DateOut from technology lifetimes, drop expired assets and
    assets built after the last grouping year.

    Args:
        existing_df (pd.DataFrame): the existing capacities; expects DateIn,
            grouping_year and cluster_bus columns (modified in place)
        costs (pd.DataFrame): the technoeconomic data with a ``lifetime`` column
        year_bins (list): the year groups
        baseyear (int): the base year (run year)

    Returns:
        pd.DataFrame: fixed capacities (cluster_bus renamed to bus)
    """
    existing_df.DateIn = existing_df.DateIn.astype(int)
    if "DateOut" not in existing_df.columns:
        existing_df["DateOut"] = np.nan
    # names matching costs split across FuelType and Tech, apply to both. Fillna means no overwrite
    lifetimes = existing_df.Fueltype.map(costs.lifetime).fillna(
        existing_df.Tech.map(costs.lifetime)
    )
    if lifetimes.isna().any():
        raise ValueError(
            f"Some assets have no lifetime assigned: \n{lifetimes[lifetimes.isna()]}. "
            "Please check the costs file for the missing lifetimes."
        )
    # fill ONLY missing DateOut values with DateIn + lifetime; previously DateIn
    # was also added to already-known DateOut years, pushing them far into the future
    existing_df.loc[:, "DateOut"] = existing_df.DateOut.fillna(
        lifetimes + existing_df.DateIn
    )

    # TODO go through the pypsa-EUR fuel drops for the new ppmatching style
    # drop assets which are already phased out / decommissioned
    phased_out = existing_df[existing_df["DateOut"] < baseyear].index
    existing_df.drop(phased_out, inplace=True)

    newer_assets = (existing_df.DateIn > max(year_bins)).sum()
    if newer_assets:
        logger.warning(
            f"There are {newer_assets} assets with build year "
            f"after last power grouping year {max(year_bins)}. "
            "These assets are dropped and not considered. "
            "Consider redefining the grouping years to keep them."
        )
        to_drop = existing_df[existing_df.DateIn > max(year_bins)].index
        existing_df.drop(to_drop, inplace=True)

    # remaining lifetime measured from the capacity's grouping year
    existing_df["lifetime"] = existing_df.DateOut - existing_df["grouping_year"]

    existing_df.rename(columns={"cluster_bus": "bus"}, inplace=True)
    return existing_df

read_existing_capacities(paths_dict, techs)

Read existing capacities from csv files and format them.

Parameters: paths_dict (dict[str, os.PathLike]): dictionary with paths to the csv files; techs (list): list of technologies to read.

Returns: pd.DataFrame: DataFrame with existing capacities

Source code in workflow/scripts/prepare_existing_capacities.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def read_existing_capacities(paths_dict: dict[str, os.PathLike], techs: list) -> pd.DataFrame:
    """Read existing capacities from csv files and format them
    Args:
        paths_dict (dict[str, os.PathLike]): dictionary with paths to the csv files
        techs (list): list of technologies to read
    Returns:
        pd.DataFrame: DataFrame with existing capacities
    """
    # TODO fix centralise (make a dict from start?)
    tech_to_carrier = {
        "coal": "coal power plant",
        "CHP coal": "central coal CHP",
        "CHP gas": "central gas CHP",
        "OCGT": "gas OCGT",
        "CCGT": "gas CCGT",
        "solar": "solar",
        "solar thermal": "central solar thermal",
        "onwind": "onwind",
        "offwind": "offwind",
        "coal boiler": "central coal boiler",
        "ground heat pump": "central ground-sourced heat pump",
        "nuclear": "nuclear",
    }

    records = pd.DataFrame()
    for tech, carrier_name in tech_to_carrier.items():
        if tech not in techs:
            continue
        caps = pd.read_csv(paths_dict[tech], index_col=0).fillna(0.0)
        caps.columns = caps.columns.astype(int)
        caps = caps.sort_index()

        # one row per (node, tech, year) with a strictly positive capacity
        for year, per_node in caps.items():
            for node, capacity in per_node.items():
                if capacity <= 0.0:
                    continue
                label = f"{node}-{tech}-{year}"
                records.at[label, "Fueltype"] = carrier_name
                records.at[label, "Tech"] = tech
                records.at[label, "Capacity"] = capacity
                records.at[label, "DateIn"] = year
                records.at[label, "cluster_bus"] = node

    return records