Skip to content

Add existing baseyear

Functions to add brownfield capacities to the network for a reference year - adds VREs per grade and corrects technical potential. Best available grade is chosen

add_base_year(n, plan_year)

Add base year information to newly built components in the network.

Sets the 'build_year' attribute for all extendable generators and links in the network to the specified planning year. This is used to track when infrastructure is added to the system.

Parameters:

Name Type Description Default
n Network

The PyPSA network object to modify.

required
plan_year int

The planning year to assign as the build year for new components.

required
Note

This function modifies the network in-place by updating the 'build_year' attribute for components marked as extendable (p_nom_extendable==True).

Example

network = pypsa.Network() add_base_year(network, 2030)

All extendable components now have build_year = 2030

Source code in workflow/scripts/add_existing_baseyear.py
def add_base_year(n: pypsa.Network, plan_year: int):
    """Stamp the planning year onto newly built components.

    Every extendable generator and link (p_nom_extendable == True) gets its
    'build_year' attribute set to `plan_year`, so newly added infrastructure
    can be traced back to the horizon in which it was built.

    Args:
        n: The PyPSA network object to modify (changed in place).
        plan_year: The planning year recorded as the build year.

    Example:
        >>> network = pypsa.Network()
        >>> add_base_year(network, 2030)
        # All extendable components now have build_year = 2030
    """

    for attr_name in ["links", "generators"]:
        frame = getattr(n, attr_name)
        # mirror query("p_nom_extendable==True") semantics with a boolean mask
        extendable = frame.index[frame["p_nom_extendable"] == True]  # noqa: E712
        frame.loc[extendable, "build_year"] = plan_year

add_existing_vre_capacities(n, costs, vre_caps, config)

Add existing VRE capacities to the network and distribute them by vre grade potential. Adapted from pypsa-eur but the VRE capacities are province resolved.

NOTE that using this function requires adding the land-use constraint in solve_network so that the existing capacities are subtracted from the available potential

Parameters:

Name Type Description Default
n Network

the network

required
costs DataFrame

costs of the technologies

required
vre_caps DataFrame

existing brownfield VRE capacities in MW

required
config dict

snakemake configuration dictionary

required

Returns: pd.DataFrame: DataFrame with existing VRE capacities distributed by CF grade

Source code in workflow/scripts/add_existing_baseyear.py
def add_existing_vre_capacities(
    n: pypsa.Network,
    costs: pd.DataFrame,
    vre_caps: pd.DataFrame,
    config: dict,
) -> pd.DataFrame:
    """
    Add existing VRE capacities to the network and distribute them by vre grade potential.
    Adapted from pypsa-eur but the VRE capacities are province resolved.

    NOTE that using this function requires adding the land-use constraint in solve_network so
      that the existing capacities are subtracted from the available potential

    Args:
        n (pypsa.Network): the network
        costs (pd.DataFrame): costs of the technologies (must provide a "lifetime"
            value for each mapped carrier)
        vre_caps (pd.DataFrame): existing brownfield VRE capacities in MW, with
            columns ["Tech", "bus", "DateIn", "Capacity"]
        config (dict): snakemake configuration dictionary
    Returns:
        pd.DataFrame: DataFrame with existing VRE capacities distributed by CF grade,
            one row per "<bus> <bin> <carrier>-<year>" unit

    """

    # model carrier name -> dataset technology label; restrict to the VRE techs
    # actually enabled in the config
    tech_map = {"solar": "PV", "onwind": "Onshore", "offwind-ac": "Offshore", "offwind": "Offshore"}
    tech_map = {k: tech_map[k] for k in tech_map if k in config["Techs"]["vre_techs"]}

    # historical data by tech, location and build year
    grouped_vre = vre_caps.groupby(["Tech", "bus", "DateIn"]).Capacity.sum()
    vre_df = grouped_vre.unstack().reset_index()  # wide: one column per DateIn year
    df_agg = pd.DataFrame()  # accumulator for the per-unit rows returned below

    # iterate over vre carriers
    for carrier in tech_map:
        df = vre_df[vre_df.Tech == carrier].drop(columns=["Tech"])
        df.set_index("bus", inplace=True)
        df.columns = df.columns.astype(int)  # DateIn years as ints

        # fetch existing vre generators (n grade bins per node)
        gen_i = n.generators.query("carrier == @carrier").index
        carrier_gens = n.generators.loc[gen_i]
        res_capacities = []
        # for each bus, distribute the vre capacities by grade potential - best first
        for bus, group in carrier_gens.groupby("bus"):
            if bus not in df.index:
                # no historical capacity recorded for this bus
                continue
            res_capacities.append(distribute_vre_by_grade(group.p_nom_max, df.loc[bus]))

        if res_capacities:
            # rows: generator names ("<bus> <bin> <carrier>..."), columns: years
            res_capacities = pd.concat(res_capacities, axis=0)

            for year in df.columns:
                for gen in res_capacities.index:
                    # strip the trailing " <carrier>..." part to recover "<bus> <bin>"
                    bus_bin = re.sub(f" {carrier}.*", "", gen)
                    bus, bin_id = bus_bin.rsplit(" ", maxsplit=1)
                    name = f"{bus_bin} {carrier}-{int(year)}"
                    capacity = res_capacities.loc[gen, year]
                    if capacity > 0.0:
                        # e.g. "offwind-ac" -> "offwind" for the costs lookup
                        cost_key = carrier.split("-", maxsplit=1)[0]
                        df_agg.at[name, "Fueltype"] = carrier
                        df_agg.at[name, "Capacity"] = capacity
                        df_agg.at[name, "DateIn"] = int(year)
                        df_agg.at[name, "grouping_year"] = int(year)
                        df_agg.at[name, "lifetime"] = costs.at[cost_key, "lifetime"]
                        df_agg.at[name, "DateOut"] = year + costs.at[cost_key, "lifetime"] - 1
                        df_agg.at[name, "bus"] = bus
                        df_agg.at[name, "resource_class"] = bin_id

    if df_agg.empty:
        return df_agg

    df_agg.loc[:, "Tech"] = df_agg.Fueltype
    return df_agg

add_paid_off_capacity(network, paid_off_caps, costs, cutoff=100)

Add capacities that have been paid off to the network. This is intended for REMIND coupling, where (some of) the REMIND investments can be freely allocated to the optimal node. NB: an additional constraint is needed to ensure that the capacity is not exceeded.

Parameters:

Name Type Description Default
network Network

the network to which the capacities are added.

required
paid_off_caps DataFrame

DataFrame with paid off capacities & columns [tech_group, Capacity, techs]

required
costs DataFrame

techno-economic data for the technologies

required
cutoff int

minimum capacity to be considered. Defaults to 100 MW.

100
Source code in workflow/scripts/add_existing_baseyear.py
def add_paid_off_capacity(
    network: pypsa.Network, paid_off_caps: pd.DataFrame, costs: pd.DataFrame, cutoff: int = 100
):
    """
    Add capacities that have been paid off to the network. This is intended
    for REMIND coupling, where (some of) the REMIND investments can be freely allocated
    to the optimal node. NB: an additional constraint is needed to ensure that
    the capacity is not exceeded.

    Paid-off capacities are added as zero-capital-cost copies ("<name>_paid_off")
    of the existing extendable components; the per-tech-group cap is carried in
    the "*_nom_max_rcl" column for the constraint added later to the model.

    Args:
        network (pypsa.Network): the network to which the capacities are added.
        paid_off_caps (pd.DataFrame): DataFrame with paid off capacities & columns
            [tech_group, Capacity, techs]
        costs (pd.DataFrame): techno-economic data for the technologies
        cutoff (int, optional): minimum capacity to be considered. Defaults to 100 MW."""

    paid_off = paid_off_caps.reset_index()

    # explode tech list per tech group (constraint will apply to group)
    paid_off.techs = paid_off.techs.apply(to_list)
    paid_off = paid_off.explode("techs")
    paid_off["carrier"] = paid_off.techs.str.replace("'", "")
    paid_off.set_index("carrier", inplace=True)
    # clip small capacities
    paid_off["p_nom_max"] = paid_off.Capacity.apply(lambda x: 0 if x < cutoff else x)
    paid_off.drop(columns=["Capacity", "techs"], inplace=True)
    paid_off = paid_off.query("p_nom_max > 0")

    # per-component settings: the time-dependent attributes listed here must be
    # copied manually after network.add (see loop below)
    component_settings = {
        "Generator": {
            "join_col": "carrier",
            "attrs_to_fix": ["p_min_pu", "p_max_pu"],
        },
        "Link": {
            "join_col": "carrier",
            "attrs_to_fix": ["p_min_pu", "p_max_pu", "efficiency", "efficiency2"],
        },
        "Store": {
            "join_col": "carrier",
            "attrs_to_fix": [],
        },
    }

    # TODO make a centralised setting or update cpl config
    rename_carriers = {"OCGT": "gas OCGT", "CCGT": "gas CCGT"}
    paid_off.rename(rename_carriers, inplace=True)

    for component, settings in component_settings.items():
        # stores size energy ("e_nom"), generators/links size power ("p_nom")
        prefix = "e" if component == "Store" else "p"
        paid_off_comp = paid_off.rename(columns={"p_nom_max": f"{prefix}_nom_max"})

        # exclude brownfield capacities
        df = getattr(network, component.lower() + "s").query(f"{prefix}_nom_extendable == True")
        # join will add the tech_group and p_nom_max_rcl columns, used later for constraints
        # rcl is legacy name from Adrian for region country limit
        paid = df.join(paid_off_comp, on=[settings["join_col"]], how="right", rsuffix="_rcl")
        paid.dropna(subset=[f"{prefix}_nom_max", f"{prefix}_nom_max_rcl"], inplace=True)
        # right join leaves NaN index rows for carriers with no matching component
        paid = paid.loc[paid.index.dropna()]
        if paid.empty:
            continue

        # REMIND cap is in output, PyPSA link in input
        if component == "Link":
            paid.loc[:, "p_nom_max_rcl"] /= paid.loc[:, "efficiency"]

        paid.index += "_paid_off"
        # set permissive options for the paid-off capacities (constraint per group added to model later)
        paid["capital_cost"] = 0
        paid[f"{prefix}_nom_min"] = 0.0
        paid[f"{prefix}_nom"] = 0.0
        paid[f"{prefix}_nom_max"] = np.inf

        # add to the network (DataFrame columns unpacked as per-attribute Series)
        network.add(component, paid.index, **paid)
        # now add the dynamic attributes not carried over by n.add (per unit avail etc)
        for missing_attr in settings["attrs_to_fix"]:
            df_t = getattr(network, component.lower() + "s_t")[missing_attr]
            if not df_t.empty:
                # copy the original component's profile onto its _paid_off twin
                base_cols = [
                    x for x in paid.index.str.replace("_paid_off", "") if x in df_t.columns
                ]
                df_t.loc[:, pd.Index(base_cols) + "_paid_off"] = df_t[base_cols].rename(
                    columns=lambda x: x + "_paid_off"
                )

    # biomass may not exist in the network yet; build it from scratch if needed
    if "biomass" in paid_off.index and "biomass" not in network.generators.carrier.unique():
        _add_paidoff_biomass(
            network,
            costs,
            paid_off.loc["biomass", "p_nom_max"],
            tech_group=paid_off.loc["biomass", "tech_group"],
        )

add_power_capacities_installed_before_baseyear(n, costs, config, installed_capacities, eff_penalty_hist=0.0)

Add existing power capacities to the network. Note: hydro dams brownfield handled by prepare_network

Parameters:

Name Type Description Default
n Network

the network

required
costs DataFrame

techno-economic data

required
config dict

configuration dictionary

required
installed_capacities DataFrame

installed capacities in MW

required
eff_penalty_hist float

efficiency penalty for historical plants (1-x)*current

0.0
Source code in workflow/scripts/add_existing_baseyear.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
def add_power_capacities_installed_before_baseyear(
    n: pypsa.Network,
    costs: pd.DataFrame,
    config: dict,
    installed_capacities: pd.DataFrame,
    eff_penalty_hist: float = 0.0,
):
    """
    Add existing power capacities to the network.
    Note: hydro dams brownfield handled by prepare_network

    Capacities are aggregated per (grouping_year, cleaned tech name, resource
    class) and bus, then added as non-extendable components (Generator, Link or
    StorageUnit depending on the tech) with build_year and lifetime set.

    Args:
        n (pypsa.Network): the network
        costs (pd.DataFrame): techno-economic data
        config (dict): configuration dictionary
        installed_capacities (pd.DataFrame): installed capacities in MW
        eff_penalty_hist (float): efficiency penalty for historical plants (1-x)*current
    """

    logger.info("adding power capacities installed before baseyear")

    df = installed_capacities.copy()
    # fix fuel type CHP order to match network
    df["tech_clean"] = df["Fueltype"].str.replace(r"^CHP (.+)$", r"\1 CHP", regex=True)
    df["tech_clean"] = df["tech_clean"].str.replace("central ", "")
    df["tech_clean"] = df["tech_clean"].str.replace("decentral ", "")

    # dataset fuel type -> network carrier name
    # TODO fix this based on config / centralise / other
    carrier_map = {
        "coal": "coal",
        "coal power plant": "coal",
        "CHP coal": "CHP coal",
        "coal CHP": "CHP coal",
        "CHP gas": "CHP gas",
        "gas CHP": "CHP gas",
        "gas OCGT": "gas OCGT",
        "gas CCGT": "gas CCGT",
        "solar": "solar",
        "solar thermal": "solar thermal",
        "onwind": "onwind",
        "offwind": "offwind",
        "coal boiler": "coal boiler central",
        "ground-sourced heat pump": "heat pump",
        "ground heat pump": "heat pump",
        "air heat pump": "heat pump",
        "nuclear": "nuclear",
        "PHS": "PHS",
    }
    # dataset fuel type -> row of the techno-economic cost table
    costs_map = {
        "coal power plant": "coal",
        "coal CHP": "central coal CHP",
        "gas CHP": "central gas CHP",
        "gas OCGT": "OCGT",
        "gas CCGT": "CCGT",
        "solar": "solar",
        "solar thermal": "central solar thermal",
        "onwind": "onwind",
        "offwind": "offwind",
        "coal boiler": "central coal boiler",
        "heat pump": "central ground-sourced heat pump",
        "ground-sourced heat pump": "central ground-sourced heat pump",
        "nuclear": "nuclear",
    }

    # add techs that may have a direct match to the technoecon data
    missing_techs = {k: k for k in df.Fueltype.unique() if k not in costs_map}
    costs_map.update(missing_techs)

    # VRE rows carry a resource (CF) grade; default to "" for non-graded techs
    if "resource_class" not in df.columns:
        df["resource_class"] = ""
    else:
        df.resource_class.fillna("", inplace=True)
    logger.info(df.grouping_year.unique())
    # TODO: exclude collapse of coal & coal CHP IF CCS retrofitting is enabled
    if config["existing_capacities"].get("collapse_years", False):
        df.grouping_year = 1 # 0 is default
    df.grouping_year = df.grouping_year.astype(int, errors="ignore")

    # capacity per bus for each (grouping_year, tech, resource_class) combination
    df_ = df.pivot_table(
        index=["grouping_year", "tech_clean", "resource_class"],
        columns="bus",
        values="Capacity",
        aggfunc="sum",
    )

    df_.fillna(0, inplace=True)

    defined_carriers = n.carriers.index.unique().to_list()
    vre_carriers = ["solar", "onwind", "offwind"]

    # TODO do we really need to loop over the years? / so many things?
    # something like df_.unstack(level=0) would be more efficient
    for grouping_year, generator, resource_grade in df_.index:
        # NOTE(review): "brownwfield" looks like a typo for "brownfield"; if the
        # data uses the correct spelling this comparison never matches and
        # build_year keeps the raw grouping year - confirm against the input data
        build_year = 1 if grouping_year == "brownwfield" else grouping_year
        logger.info(f"Adding existing generator {generator} with year grp {grouping_year}")
        if not carrier_map.get(generator, "missing") in defined_carriers:
            # warn only - the component is still added below with the unknown carrier
            logger.warning(
                f"Carrier {carrier_map.get(generator, None)} for {generator} not defined in network"
                "Consider adding to the CARRIER_MAP"
            )
        # NOTE(review): because of the elif, this missing-costs check is skipped
        # whenever the carrier warning above fires - confirm that is intended
        elif costs_map.get(generator) is None:
            raise ValueError(f"{generator} not defined in technoecon map - check costs_map")

        # capacity is the capacity in MW at each node for this
        capacity = df_.loc[grouping_year, generator, resource_grade]
        if capacity.values.max() == 0:
            continue
        capacity = capacity[capacity > config["existing_capacities"]["threshold_capacity"]].dropna()
        buses = capacity.index
        # fix index for network.add (merge grade to name)
        if resource_grade:
            capacity.index += " " + resource_grade
        capacity.index += " " + costs_map[generator]

        costs_key = costs_map[generator]

        # VRE: generators reusing the availability profile of the existing
        # extendable generators of the same carrier
        if generator in vre_carriers:
            mask = n.generators_t.p_max_pu.columns.map(n.generators.carrier) == generator
            p_max_pu = n.generators_t.p_max_pu.loc[:, mask]
            n.add(
                "Generator",
                capacity.index,
                suffix=f"-{grouping_year}",
                bus=buses,
                carrier=carrier_map[generator],
                p_nom=capacity,
                p_nom_min=capacity,
                p_nom_extendable=False,
                marginal_cost=costs.at[costs_key, "marginal_cost"],
                efficiency=costs.at[costs_key, "efficiency"],
                p_max_pu=p_max_pu[capacity.index],
                build_year=build_year,
                lifetime=costs.at[costs_key, "lifetime"],
                location=buses,
            )

        # thermal generators attached directly to the electricity bus
        elif generator in ["nuclear", "coal power plant", "biomass", "oil"]:
            n.add(
                "Generator",
                capacity.index,
                suffix="-" + str(grouping_year),
                bus=buses,
                carrier=carrier_map[generator],
                p_nom=capacity,
                p_nom_max=capacity,
                p_nom_min=capacity,
                p_nom_extendable=False,
                # nuclear gets dispatch limits from the config; others are unconstrained
                p_max_pu=config["nuclear_reactors"]["p_max_pu"] if generator == "nuclear" else 1,
                p_min_pu=config["nuclear_reactors"]["p_min_pu"] if generator == "nuclear" else 0,
                marginal_cost=costs.at[costs_key, "marginal_cost"],
                efficiency=costs.at[costs_key, "efficiency"] * (1 - eff_penalty_hist),
                build_year=build_year,
                lifetime=costs.at[costs_key, "lifetime"],
                location=buses,
            )

        # gas plants modelled as links from the gas bus, capacity in fuel-input terms
        # TODO this does not add the carrier to the list
        elif generator in ["gas CCGT", "gas OCGT"]:
            bus0 = buses + " gas"
            carrier_ = carrier_map[generator]
            # ugly fix to register the carrier. Emissions for sub carrier are 0: they are accounted for at gas bus
            n.carriers.loc[carrier_] = {
                "co2_emissions": 0,
                "color": config["plotting"]["tech_colors"][carrier_],
                "nice_name": config["plotting"]["nice_names"][carrier_],
                "max_growth": np.inf,
                "max_relative_growth": 0,
            }
            # now add link - carrier should exist
            n.add(
                "Link",
                capacity.index,
                suffix="-" + str(grouping_year),
                bus0=bus0,
                bus1=buses,
                carrier=carrier_map[generator],
                marginal_cost=costs.at[costs_key, "efficiency"]
                * costs.at[costs_key, "VOM"],  # NB: VOM is per MWel
                # NB: fixed cost is per MWel
                p_nom=capacity / costs.at[costs_key, "efficiency"],
                p_nom_min=capacity / costs.at[costs_key, "efficiency"],
                p_nom_max=capacity / costs.at[costs_key, "efficiency"],
                p_nom_extendable=False,
                efficiency=costs.at[costs_key, "efficiency"] * (1 - eff_penalty_hist),
                build_year=build_year,
                lifetime=costs.at[costs_key, "lifetime"],
                location=buses,
            )
        # skip all heat-sector techs entirely when heat coupling is disabled
        elif generator in [
            "solar thermal",
            "CHP coal",
            "CHP gas",
            "heat pump",
            "coal boiler",
        ] and not config.get("heat_coupling", False):
            logger.info(f"Skipped {generator} because heat coupling is not activated")

        # reachable only when heat coupling is enabled (see branch above)
        elif generator == "solar thermal":
            p_max_pu = n.generators_t.p_max_pu[capacity.index]
            p_max_pu.columns = capacity.index
            n.add(
                "Generator",
                capacity.index,
                suffix=f"-{str(grouping_year)}",
                bus=buses + " central heat",
                carrier=carrier_map[generator],
                p_nom=capacity,
                p_nom_min=capacity,
                p_nom_max=capacity,
                p_nom_extendable=False,
                marginal_cost=costs.at["central " + generator, "marginal_cost"],
                p_max_pu=p_max_pu,
                build_year=build_year,
                lifetime=costs.at["central " + generator, "lifetime"],
                location=buses,
            )

        # coal CHP: one link to electricity ("generator") + one to heat ("boiler")
        elif generator in ["CHP coal", "coal CHP"]:
            bus0 = buses + " coal fuel"
            n.add(
                "Link",
                capacity.index,
                suffix=f" generator-{str(grouping_year)}",
                bus0=bus0,
                bus1=buses,
                carrier=carrier_map[generator],
                marginal_cost=costs.at["central coal CHP", "efficiency"]
                * costs.at["central coal CHP", "VOM"],  # NB: VOM is per MWel
                p_nom=capacity / costs.at["central coal CHP", "efficiency"],
                p_nom_min=capacity / costs.at["central coal CHP", "efficiency"],
                p_nom_max=capacity / costs.at["central coal CHP", "efficiency"],
                p_nom_extendable=False,
                efficiency=costs.at["central coal CHP", "efficiency"] * (1 - eff_penalty_hist),
                heat_to_power=config["chp_parameters"]["coal"]["heat_to_power"],
                build_year=build_year,
                lifetime=costs.at["central coal CHP", "lifetime"],
                location=buses,
            )
            # simplified treatment based on a decrease with c_v and a max htpwr ratio
            htpr = config["chp_parameters"]["coal"]["heat_to_power"]

            n.add(
                "Link",
                capacity.index,
                suffix=f" boiler-{str(grouping_year)}",
                bus0=bus0,
                bus1=buses + " central heat",
                carrier=carrier_map[generator],
                marginal_cost=costs.at["central coal CHP", "efficiency"]
                * costs.at["central coal CHP", "VOM"],  # NB: VOM is per MWel
                # p_max will be constrained by chp constraints
                p_nom=capacity * htpr,
                p_nom_min=capacity * htpr,
                p_nom_max=capacity * htpr,
                p_nom_extendable=False,
                # total eff will be fixed by CHP constraints
                efficiency=config["chp_parameters"]["coal"]["total_eff"],
                build_year=build_year,
                lifetime=costs.at["central coal CHP", "lifetime"],
                location=buses,
            )

        # gas CHP: same two-link structure as coal CHP
        # NOTE(review): this branch prices gas CHP with the "central gas CHP CC"
        # cost row, while the coal branch uses plain "central coal CHP" - confirm
        # the carbon-capture variant is intended for historical gas CHP
        elif generator in ["CHP gas", "gas CHP"]:
            bus0 = buses + " gas"
            n.add(
                "Link",
                capacity.index,
                suffix=f" generator-{str(grouping_year)}",
                bus0=bus0,
                bus1=buses,
                carrier=carrier_map[generator],
                marginal_cost=costs.at["central gas CHP CC", "efficiency"]
                * costs.at["central gas CHP CC", "VOM"],  # NB: VOM is per MWel
                capital_cost=costs.at["central gas CHP CC", "efficiency"]
                * costs.at["central gas CHP CC", "capital_cost"],  # NB: fixed cost is per MWel,
                p_nom=capacity / costs.at["central gas CHP CC", "efficiency"],
                p_nom_min=capacity / costs.at["central gas CHP CC", "efficiency"],
                p_nom_extendable=False,
                efficiency=costs.at["central gas CHP CC", "efficiency"] * (1 - eff_penalty_hist),
                heat_to_power=config["chp_parameters"]["gas"]["heat_to_power"],
                build_year=build_year,
                lifetime=costs.at["central gas CHP CC", "lifetime"],
                location=buses,
            )
            # simplified treatment based on a decrease with c_v and a max htpwr ratio
            htpr = config["chp_parameters"]["gas"]["heat_to_power"]

            n.add(
                "Link",
                capacity.index,
                suffix=f" boiler-{str(grouping_year)}",
                bus0=bus0,
                bus1=buses + " central heat",
                carrier=carrier_map[generator],
                marginal_cost=costs.at["central gas CHP CC", "efficiency"]
                * costs.at["central gas CHP CC", "VOM"],  # NB: VOM is per MWel
                # pmax will be constrained by chp constraints
                p_nom=capacity * htpr,
                p_nom_min=capacity * htpr,
                p_nom_max=capacity * htpr,
                p_nom_extendable=False,
                # will be constrained by chp constraints
                efficiency=config["chp_parameters"]["gas"]["total_eff"],
                build_year=build_year,
                lifetime=costs.at["central gas CHP CC", "lifetime"],
                location=buses,
            )

        elif generator.find("coal boiler") != -1:
            bus0 = buses + " coal"
            cat = "central" if generator.find("decentral") == -1 else "decentral"
            # NOTE(review): cat.lstrip() + generator concatenates with no space
            # (e.g. "central" + "coal boiler" -> "centralcoal boiler"), and the
            # suffix/bus1 below also join cat, generator and "heat" without
            # separators - looks like missing spaces; confirm against the costs
            # index and bus naming scheme
            n.add(
                "Link",
                capacity.index,
                suffix="" + cat + generator + "-" + str(grouping_year),
                bus0=bus0,
                bus1=capacity.index + cat + "heat",
                carrier=carrier_map[generator],
                marginal_cost=costs.at[cat.lstrip() + generator, "efficiency"]
                * costs.at[cat.lstrip() + generator, "VOM"],
                capital_cost=costs.at[cat.lstrip() + generator, "efficiency"]
                * costs.at[cat.lstrip() + generator, "capital_cost"],
                p_nom=capacity / costs.at[cat.lstrip() + generator, "efficiency"],
                p_nom_min=capacity / costs.at[cat.lstrip() + generator, "efficiency"],
                p_nom_max=capacity / costs.at[cat.lstrip() + generator, "efficiency"],
                p_nom_extendable=False,
                efficiency=costs.at[cat.lstrip() + generator, "efficiency"],
                build_year=build_year,
                lifetime=costs.at[cat.lstrip() + generator, "lifetime"],
                location=buses,
            )

        # TODO fix read operation in func, fix snakemake in function, make air pumps?
        elif generator == "heat pump":
            # TODO separate the read operation from the add operation
            # COP profiles come from the snakemake input (module-level global)
            # and are shifted to the modelled planning year
            with pd.HDFStore(snakemake.input.cop_name, mode="r") as store:
                gshp_cop = store["gshp_cop_profiles"]
                gshp_cop.index = gshp_cop.index.tz_localize(None)
                gshp_cop = shift_profile_to_planning_year(
                    gshp_cop, snakemake.wildcards.planning_horizons
                )
                gshp_cop = gshp_cop.loc[n.snapshots]
            n.add(
                "Link",
                capacity.index,
                suffix="-" + str(grouping_year),
                bus0=capacity.index,
                bus1=capacity.index + " central heat",
                carrier="heat pump",
                efficiency=(
                    gshp_cop[capacity.index]
                    if config["time_dep_hp_cop"]
                    else costs.at["decentral ground-sourced heat pump", "efficiency"]
                ),
                capital_cost=costs.at["decentral ground-sourced heat pump", "efficiency"]
                * costs.at["decentral ground-sourced heat pump", "capital_cost"],
                marginal_cost=costs.at["decentral ground-sourced heat pump", "efficiency"]
                * costs.at["decentral ground-sourced heat pump", "marginal_cost"],
                p_nom=capacity / costs.at["decentral ground-sourced heat pump", "efficiency"],
                p_nom_min=capacity / costs.at["decentral ground-sourced heat pump", "efficiency"],
                p_nom_max=capacity / costs.at["decentral ground-sourced heat pump", "efficiency"],
                p_nom_extendable=False,
                build_year=build_year,
                lifetime=costs.at["decentral ground-sourced heat pump", "lifetime"],
                location=buses,
            )

        elif generator == "PHS":
            # pure pumped hydro storage, fixed, 6h energy by default, no inflow
            n.add(
                "StorageUnit",
                capacity.index,
                suffix="-" + str(grouping_year),
                bus=buses,
                carrier="PHS",
                p_nom=capacity,
                p_nom_min=capacity,
                p_nom_max=capacity,
                p_nom_extendable=False,
                max_hours=config["hydro"]["PHS_max_hours"],
                # round-trip efficiency split evenly between charge and discharge
                efficiency_store=np.sqrt(costs.at["PHS", "efficiency"]),
                efficiency_dispatch=np.sqrt(costs.at["PHS", "efficiency"]),
                cyclic_state_of_charge=True,
                marginal_cost=0.0,
                location=buses,
            )

        else:
            logger.warning(
                f"Skipped existing capacitity for '{generator}'"
                + " - tech not implemented as existing capacity"
            )

    return

distribute_vre_by_grade(cap_by_year, grade_capacities)

Distribute VRE capacities by grade, ensuring potentials are respected and prioritizing better grades first

Allocates variable renewable energy (VRE) capacity additions across different resource quality grades. The algorithm preferentially uses higher-quality grades before moving to lower-quality ones, implementing a "fill-up" strategy.

Parameters:

Name Type Description Default
cap_by_year Series

Annual VRE capacity additions indexed by year. Values represent the total capacity to be added in each year (MW or GW).

required
grade_capacities Series

Available capacity potential by resource grade for a bus, indexed by grade identifier. Higher-quality grades should have lower indices for proper prioritization.

required

Returns:

Type Description
DataFrame

DataFrame with distributed capacities where: - Rows are indexed by years (from cap_by_year) - Columns are indexed by grades (from grade_capacities) - Values represent allocated capacity for each year-grade combination

Example

cap_additions = pd.Series([100, 200], index=[2020, 2030]) grade_potentials = pd.Series([50, 75, 100], index=['grade_1', 'grade_2', 'grade_3']) result = distribute_vre_by_grade(cap_additions, grade_potentials) print(result.sum(axis=1)) # Should match original yearly totals

Note

The function assumes grade_capacities are ordered with best grades first. If total demand exceeds available capacity, the algorithm allocates as much as possible following the grade priority.

Source code in workflow/scripts/add_existing_baseyear.py
def distribute_vre_by_grade(cap_by_year: pd.Series, grade_capacities: pd.Series) -> pd.DataFrame:
    """Distribute capacities across resource grades, respecting potentials and
    filling the best grades first.

    NOTE on argument semantics: despite the parameter names, the caller
    (``add_existing_vre_capacities``) passes the per-grade potentials
    (``group.p_nom_max``) as ``cap_by_year`` and the per-build-year capacity
    totals (``df.loc[bus]``) as ``grade_capacities``. The first argument is
    therefore the *availability* drawn from (rows of the result) and the
    second the *demand* being distributed (columns of the result).

    Fixes over the previous implementation:
      - allocations are float (previously truncated to int, silently losing
        fractional MW),
      - when a demand entry exceeds the total remaining availability, only
        the available amount is allocated (previously the first source was
        over-allocated beyond its potential),
      - result columns follow the sorted demand index actually used during
        allocation (previously the unsorted input index could mislabel them),
      - an empty availability series no longer raises.

    Args:
        cap_by_year (pd.Series): availability per source (grade potentials),
            allocated in descending index order.
        grade_capacities (pd.Series): demand per entry (capacity per build
            year); NaNs are treated as zero.

    Returns:
        pd.DataFrame: allocation matrix with rows = sources (index of
            ``cap_by_year``, sorted descending) and columns = demand entries
            (index of ``grade_capacities``, sorted ascending). Each column sums
            to min(demand, total remaining availability).

    Example:
        >>> pots = pd.Series([100.0, 50.0], index=["b", "a"])
        >>> caps = pd.Series([120.0], index=[2020])
        >>> distribute_vre_by_grade(pots, caps).loc["a", 2020]
        20.0
    """

    availability = cap_by_year.sort_index(ascending=False)
    demand = grade_capacities.fillna(0).sort_index()

    n_cols = len(demand)
    n_sources = len(availability)

    # allocation per source (rows) and demand entry (columns); float to keep
    # fractional capacities
    allocation = np.zeros((n_sources, n_cols), dtype=float)
    # work on a copy so the caller's series is never mutated
    remaining = availability.to_numpy(dtype=float, copy=True)

    for j, needed in enumerate(demand.to_numpy(dtype=float)):
        if needed <= 0 or n_sources == 0:
            continue
        cumsum = np.cumsum(remaining)
        # sources fully exhausted by this demand entry
        exhausted = cumsum < needed
        allocation[exhausted, j] = remaining[exhausted]
        if cumsum[-1] >= needed:
            # first source that can cover the residual demand gets the remainder
            cutoff = int(np.argmax(cumsum >= needed))
            already_allocated = cumsum[cutoff - 1] if cutoff > 0 else 0.0
            allocation[cutoff, j] = needed - already_allocated
        # else: demand exceeds everything left - allocate all, nothing more

        # subtract what was used from the availability
        remaining -= allocation[:, j]

    return pd.DataFrame(data=allocation, columns=demand.index, index=availability.index)

filter_brownfield_capacities(existing_df, plan_year)

Filter brownfield capacities to remove retired/not yet built plants. Parameters: existing_df (pd.DataFrame): DataFrame containing asset information with at least the columns 'DateOut', 'DateIn', 'grouping_year', and 'cluster_bus'. plan_year (int): The modelled year/horizon. Returns: pd.DataFrame: The filtered and updated DataFrame.

Source code in workflow/scripts/add_existing_baseyear.py
def filter_brownfield_capacities(existing_df: pd.DataFrame, plan_year: int) -> pd.DataFrame:
    """
    Filter brownfield capacities, removing plants retired or not yet built.

    The frame is modified in place (rows dropped, 'cluster_bus' renamed to
    'bus') and returned for convenience.

    Parameters:
        existing_df (pd.DataFrame): asset data with at least the columns
            'DateOut', 'DateIn' and 'cluster_bus'.
        plan_year (int): the modelled year/horizon.
    Returns:
        pd.DataFrame: the filtered and updated DataFrame (same object).
    """

    # already decommissioned before the modelled year
    retired = existing_df.index[existing_df["DateOut"] < plan_year]
    existing_df.drop(index=retired, inplace=True)

    # commissioned only after the modelled year
    not_yet_built = existing_df.index[existing_df["DateIn"] > plan_year]
    existing_df.drop(index=not_yet_built, inplace=True)

    existing_df.rename(columns={"cluster_bus": "bus"}, inplace=True)

    return existing_df