Skip to content

Solve network

Functions to add constraints and prepare the network for the solver. Associated with the solve_networks rule in the Snakefile.

add_battery_constraints(n)

Add constraint ensuring that charger = discharger, i.e. 1 * charger_size - efficiency * discharger_size = 0

Source code in workflow/scripts/solve_network.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def add_battery_constraints(n: pypsa.Network):
    """
    Add constraint ensuring that charger = discharger, i.e.
    1 * charger_size - efficiency * discharger_size = 0
    """
    if not n.links.p_nom_extendable.any():
        return

    discharger_bool = n.links.index.str.contains("battery discharger")
    charger_bool = n.links.index.str.contains("battery charger")

    dischargers_ext = n.links[discharger_bool].query("p_nom_extendable").index
    chargers_ext = n.links[charger_bool].query("p_nom_extendable").index

    eff = n.links.efficiency[dischargers_ext].values
    lhs = n.model["Link-p_nom"].loc[chargers_ext] - n.model["Link-p_nom"].loc[dischargers_ext] * eff

    n.model.add_constraints(lhs == 0, name="Link-charger_ratio")

add_chp_constraints(n)

Add constraints to couple the heat and electricity output of CHP plants (using the cb and cv parameter). See the DEA technology cataloge

Parameters:

Name Type Description Default
n Network

the pypsa network object to which's model the constraints are added

required
Source code in workflow/scripts/solve_network.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def add_chp_constraints(n: pypsa.Network):
    """Add constraints to couple the heat and electricity output of CHP plants
         (using the cb and cv parameter). See the DEA technology cataloge

    Args:
        n (pypsa.Network): the pypsa network object to which's model the constraints are added
    """
    electric = n.links.index.str.contains("CHP") & n.links.index.str.contains("generator")
    heat = n.links.index.str.contains("CHP") & n.links.index.str.contains("boiler")

    electric_ext = n.links[electric].query("p_nom_extendable").index
    heat_ext = n.links[heat].query("p_nom_extendable").index

    electric_fix = n.links[electric].query("~p_nom_extendable").index
    heat_fix = n.links[heat].query("~p_nom_extendable").index

    p = n.model["Link-p"]  # dimension: [time, link]

    # output ratio between heat and electricity and top_iso_fuel_line for extendable
    if not electric_ext.empty:
        p_nom = n.model["Link-p_nom"]

        lhs = (
            p_nom.loc[electric_ext]
            * (n.links.p_nom_ratio * n.links.efficiency)[electric_ext].values
            - p_nom.loc[heat_ext] * n.links.efficiency[heat_ext].values
        )
        n.model.add_constraints(lhs == 0, name="chplink-fix_p_nom_ratio")

        rename = {"Link-ext": "Link"}
        lhs = p.loc[:, electric_ext] + p.loc[:, heat_ext] - p_nom.rename(rename).loc[electric_ext]
        n.model.add_constraints(lhs <= 0, name="chplink-top_iso_fuel_line_ext")

    # top_iso_fuel_line for fixed
    if not electric_fix.empty:
        lhs = p.loc[:, electric_fix] + p.loc[:, heat_fix]
        rhs = n.links.p_nom[electric_fix]
        n.model.add_constraints(lhs <= rhs, name="chplink-top_iso_fuel_line_fix")

    # back-pressure
    if not n.links[electric].index.empty:
        lhs = (
            p.loc[:, heat] * (n.links.efficiency[heat] * n.links.c_b[electric].values)
            - p.loc[:, electric] * n.links.efficiency[electric]
        )
        n.model.add_constraints(lhs <= 0, name="chplink-backpressure")

add_co2_constraints_prices(network, co2_control)

Add co2 constraints or prices

Parameters:

Name Type Description Default
network Network

the network to which prices or constraints are to be added

required
co2_control dict

the config

required

Raises:

Type Description
ValueError

unrecognised co2 control option

Source code in workflow/scripts/solve_network.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def add_co2_constraints_prices(network: pypsa.Network, co2_control: dict):
    """Add co2 constraints or prices

    Args:
        network (pypsa.Network): the network to which prices or constraints are to be added
        co2_control (dict): the config

    Raises:
        ValueError: unrecognised co2 control option
    """

    if co2_control["control"] is None:
        pass
    elif co2_control["control"] == "price":
        logger.info("Adding CO2 price to marginal costs of generators and storage units")
        add_emission_prices(network, emission_prices={"co2": co2_control["co2_pr_or_limit"]})

    elif co2_control["control"].startswith("budget"):
        co2_limit = co2_control["co2_pr_or_limit"]
        logger.info("Adding CO2 constraint based on scenario {co2_limit}")
        network.add(
            "GlobalConstraint",
            "co2_limit",
            type="primary_energy",
            carrier_attribute="co2_emissions",
            sense="<=",
            constant=co2_limit,
        )
    else:
        logger.error(f"Unhandled CO2 control config {co2_control} due to unknown control.")
        raise ValueError(f"Unhandled CO2 control config {co2_control} due to unknown control")

add_emission_prices(n, emission_prices={'co2': 0.0}, exclude_co2=False)

from pypsa-eur: add GHG price to marginal costs of generators and storage units

Parameters:

Name Type Description Default
n Network

the pypsa network

required
emission_prices dict

emission prices per GHG. Defaults to {"co2": 0.0}.

{'co2': 0.0}
exclude_co2 bool

do not charge for CO2 emissions. Defaults to False.

False
Source code in workflow/scripts/solve_network.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def add_emission_prices(n: pypsa.Network, emission_prices={"co2": 0.0}, exclude_co2=False):
    """from pypsa-eur: add GHG price to marginal costs of generators and storage units

    Args:
        n (pypsa.Network): the pypsa network
        emission_prices (dict, optional): emission prices per GHG. Defaults to {"co2": 0.0}.
        exclude_co2 (bool, optional): do not charge for CO2 emissions. Defaults to False.
    """
    if exclude_co2:
        emission_prices.pop("co2")
    em_price = (
        pd.Series(emission_prices).rename(lambda x: x + "_emissions")
        * n.carriers.filter(like="_emissions")
    ).sum(axis=1)

    n.meta.update({"emission_prices": emission_prices})

    gen_em_price = n.generators.carrier.map(em_price) / n.generators.efficiency

    n.generators["marginal_cost"] += gen_em_price
    n.generators_t["marginal_cost"] += gen_em_price[n.generators_t["marginal_cost"].columns]
    # storage units su
    su_em_price = n.storage_units.carrier.map(em_price) / n.storage_units.efficiency_dispatch
    n.storage_units["marginal_cost"] += su_em_price

    logger.info("Added emission prices to marginal costs of generators and storage units")
    logger.info(f"\tEmission prices: {emission_prices}")

add_land_use_constraint(n, planning_horizons)

Add land use constraints for renewable energy potential. This ensures that the brownfield + greenfield vre installations for each generator tech do not exceed the technical potential.

Parameters:

Name Type Description Default
n Network

the network object to add the constraints to

required
planning_horizons str | int

the planning horizon year as string

required
Source code in workflow/scripts/solve_network.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
def add_land_use_constraint(n: pypsa.Network, planning_horizons: str | int) -> None:
    """
    Add land use constraints for renewable energy potential. This ensures that the brownfield +
     greenfield vre installations for each generator tech do not exceed the technical potential.

    Args:
        n (pypsa.Network): the network object to add the constraints to
        planning_horizons (str | int): the planning horizon year as string
    """
    # warning: this will miss existing offwind which is not classed AC-DC and has carrier 'offwind'

    for carrier in [
        "solar",
        "solar thermal",
        "onwind",
        "offwind",
        "offwind-ac",
        "offwind-dc",
        "offwind-float",
    ]:
        ext_i = (n.generators.carrier == carrier) & ~n.generators.p_nom_extendable
        grouper = n.generators.loc[ext_i].index.str.replace(f" {carrier}.*$", "", regex=True)
        existing = n.generators.loc[ext_i, "p_nom"].groupby(grouper).sum()
        existing.index += f" {carrier}"
        n.generators.loc[existing.index, "p_nom_max"] -= existing

    # check if existing capacities are larger than technical potential
    existing_large = n.generators[n.generators["p_nom_min"] > n.generators["p_nom_max"]].index
    if len(existing_large):
        logger.warning(
            f"Existing capacities larger than technical potential for {existing_large},\
                        adjust technical potential to existing capacities"
        )
        n.generators.loc[existing_large, "p_nom_max"] = n.generators.loc[
            existing_large, "p_nom_min"
        ]

    n.generators["p_nom_max"] = n.generators["p_nom_max"].clip(lower=0)

add_operational_reserve_margin(n, config)

Build operational reserve margin constraints based on the formulation given in https://genxproject.github.io/GenX.jl/stable/Model_Reference/core/#GenX.operational_reserves_core!-Tuple{JuMP.Model,%20Dict,%20Dict}

The constraint is network wide and not at each node!

Parameters:

Name Type Description Default
n Network

the network object to optimize

required
config dict

the configuration dictionary

required
Example

config.yaml requires to specify operational_reserve: operational_reserve: activate: true epsilon_load: 0.02 # percentage of load at each snapshot epsilon_vres: 0.02 # percentage of VRES at each snapshot contingency: 400000 # MW

Source code in workflow/scripts/solve_network.py
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
def add_operational_reserve_margin(n: pypsa.network, config):
    """
    Build operational reserve margin constraints based on the formulation given in
    https://genxproject.github.io/GenX.jl/stable/Model_Reference/core/#GenX.operational_reserves_core!-Tuple{JuMP.Model,%20Dict,%20Dict}

    The constraint is network wide and not at each node!

    Args:
        n (pypsa.Network): the network object to optimize
        config (dict): the configuration dictionary

    Example:
        config.yaml requires to specify operational_reserve:
        operational_reserve:
            activate: true
            epsilon_load: 0.02 # percentage of load at each snapshot
            epsilon_vres: 0.02 # percentage of VRES at each snapshot
            contingency: 400000 # MW
    """
    reserve_config = config["operational_reserve"]
    VRE_TECHS = config["Techs"].get("non_dispatchable", ["onwind", "offwind", "solar"])
    EPSILON_LOAD, EPSILON_VRES = reserve_config["epsilon_load"], reserve_config["epsilon_vres"]
    CONTINGENCY = float(reserve_config["contingency"])

    # AC producers
    ac_mask = n.generators.bus.map(n.buses.carrier) == "AC"
    ac_buses = n.buses.query("carrier =='AC'").index
    attached_carriers = filter_carriers(n, "AC")
    # conceivably a link could have a negative efficiency and flow towards bus0 - don't consider
    prod_links = n.links.query("carrier in @attached_carriers & not bus0 in @ac_buses")
    transport_links = prod_links.bus0.map(n.buses.carrier) == prod_links.bus1.map(n.buses.carrier)
    prod_links = prod_links.loc[transport_links == False]
    prod_gen = n.generators.loc[ac_mask]
    producers_all = prod_links.index.append(prod_gen.index)
    producers_all.name = "Producers-p"

    # RSERVES
    n.model.add_variables(0, np.inf, coords=[n.snapshots, prod_gen.index], name="Generator-r")
    n.model.add_variables(0, np.inf, coords=[n.snapshots, prod_links.index], name="Link-r")

    # Define Reserve and weigh VRES by their mean availability ("capacity credit")
    vres_gen = prod_gen.query("carrier in @VRE_TECHS")
    non_vre = prod_gen.index.difference(vres_gen.index)
    # full capacity credit for non-VRE producers (questionable, maybe should be weighted by availability)
    summed_reserve = (n.model["Link-r"] * prod_links.efficiency).sum("Link") + n.model[
        "Generator-r"
    ].loc[:, non_vre].sum("Generator")

    # VRE capacity credit & margin reqs
    ext_idx = vres_gen.query("p_nom_extendable").index
    avail = n.generators_t.p_max_pu.loc[:, vres_gen.index]
    vres_idx = avail.columns

    if not vres_gen.empty:
        # Reserve score based on actual avail (perfect foresight) not mean/expected avail
        vre_reserve_score = (n.model["Generator-r"].loc[:, vres_gen.index] * avail).sum("Generator")
        summed_reserve += vre_reserve_score
    if not ext_idx.empty and not vres_idx.empty:
        # reqs from brownfield VRE generators. epsilon is the margin for VRES forecast error
        avail_factor = n.generators_t.p_max_pu[ext_idx]
        p_nom_vres = n.model["Generator-p_nom"].loc[ext_idx].rename({"Generator-ext": "Generator"})
        vre_req_ext = (p_nom_vres * (EPSILON_VRES * xr.DataArray(avail_factor))).sum("Generator")
    else:
        vre_req_ext = 0

    if not vres_idx.empty:
        # reqs extendable VRE generators
        avail_factor = n.generators_t.p_max_pu[vres_idx.difference(ext_idx)]
        renewable_capacity = n.generators.p_nom[vres_idx.difference(ext_idx)]
        vre_req_fix = (avail_factor * renewable_capacity).sum(axis=1)
    else:
        vre_req_fix = 0

    lhs = summed_reserve - vre_req_ext
    # Right-hand-side
    demand = get_as_dense(n, "Load", "p_set").sum(axis=1)
    rhs = EPSILON_LOAD * demand + EPSILON_VRES * vre_req_fix + CONTINGENCY

    n.model.add_constraints(lhs >= rhs, name="Reserve-margin")

    # Need additional constraints (reserve + dispatch <= p_nom): gen_r + gen_p <= gen_p_nom (capacity)
    to_constrain = {"Link": prod_links, "Generator": prod_gen}
    for component, producer in to_constrain.items():
        logger.info(f"adding secondary reserve constraint for {component}s")

        fix_i = producer.query("p_nom_extendable==False").index
        ext_i = producer.query("p_nom_extendable==True").index

        dispatch = n.model[f"{component}-p"].loc[:, producer.index]
        reserve = n.model[f"{component}-r"].loc[:, ext_i.union(fix_i)]

        capacity_variable = n.model[f"{component}-p_nom"].loc[ext_i]
        capacity_variable = capacity_variable.rename({f"{component}-ext": f"{component}"})
        capacity_fixed = getattr(n, component.lower() + "s").p_nom[fix_i]

        p_max_pu = get_as_dense(n, f"{component}", "p_max_pu")

        lhs = dispatch + reserve
        # MAY have to check what happens in case pmaxpu is not defined for all items
        rhs = capacity_variable * p_max_pu[ext_i] + (p_max_pu[fix_i] * capacity_fixed)
        n.model.add_constraints(
            lhs - rhs.loc[lhs.indexes] <= 0, name=f"{component}-p-reserve-upper"
        )

add_remind_paid_off_constraints(n)

Paid-off components can be placed wherever PyPSA wants but have a total limit.

Add constraints to ensure that the paid off capacity from REMIND is not exceeded across the network & that it does not exceed the technical potential.

Parameters:

Name Type Description Default
n Network

the network object to which's model the constraints are added

required
Source code in workflow/scripts/solve_network.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
def add_remind_paid_off_constraints(n: pypsa.Network) -> None:
    """
    Paid-off components can be placed wherever PyPSA wants but have a total limit.

    Add constraints to ensure that the paid off capacity from REMIND is not
    exceeded across the network & that it does not exceed the technical potential.

    Args:
        n (pypsa.Network): the network object to which's model the constraints are added
    """

    if not n.config["run"].get("is_remind_coupled", False):
        logger.info("Skipping paid off constraints as REMIND is not coupled")
        return

    # In coupled-mode components (Generators, Links,..) have limits p/e_nom_rcl & a tech_group
    # These columns are added by `add_existing_baseyear.add_paid_off_capacity`.
    # p/e_nom_rcl is the availale paid-off capacity per tech group and is nan for non paid-off (usual) generators.
    # rcl is a legacy name from Aodenweller
    for component in ["Generator", "Link", "Store"]:

        prefix = "e" if component == "Store" else "p"
        paid_off_col = f"{prefix}_nom_max_rcl"

        paid_off = getattr(n, component.lower() + "s").copy()
        # if there are no paid_off components
        if paid_off_col not in paid_off.columns:
            continue
        else:
            paid_off.dropna(subset=[paid_off_col], inplace=True)

        paid_off_totals = paid_off.set_index("tech_group")[paid_off_col].drop_duplicates()

        # LHS: p_nom per technology grp < totals
        groupers = [paid_off["tech_group"]]
        grouper_lhs = xr.DataArray(pd.MultiIndex.from_arrays(groupers), dims=[f"{component}-ext"])
        p_nom_groups = (
            n.model[f"{component}-{prefix}_nom"].loc[paid_off.index].groupby(grouper_lhs).sum()
        )

        # get indices to sort RHS. the grouper is multi-indexed (legacy from PyPSA-Eur)
        idx = p_nom_groups.indexes["group"]
        idx = [x[0] for x in idx]

        # Add constraint
        if not p_nom_groups.empty:
            n.model.add_constraints(
                p_nom_groups <= paid_off_totals[idx].values,
                name=f"paidoff_cap_totals_{component.lower()}",
            )

    # === ensure normal e/p_nom_max is respected for (paid_off + normal) components
    # e.g. if PV has 100MW tech potential at nodeA, paid_off+normal p_nom_opt <100MW
    for component in ["Generator", "Link", "Store"]:
        paidoff_comp = getattr(n, component.lower() + "s").copy()

        prefix = "e" if component == "Store" else "p"
        paid_off_col = f"{prefix}_nom_max_rcl"
        # if there are no paid_off components
        if paid_off_col not in paidoff_comp.columns:
            continue
        else:
            paidoff_comp.dropna(subset=[paid_off_col], inplace=True)

        # techs that only exist as paid-off don't have usual counterparts
        remind_only_techs = n.config["existing_capacities"].get("remind_only_tech_groups", [])
        paidoff_comp = paidoff_comp.query("tech_group not in @remind_only_techs")

        if paidoff_comp.empty:
            continue

        # find equivalent usual (not paid-off) components
        usual_comps_idx = paidoff_comp.index.str.replace("_paid_off", "")
        usual_comps = getattr(n, component.lower() + "s").loc[usual_comps_idx].copy()
        usual_comps = usual_comps[~usual_comps.p_nom_max.isin([np.inf, np.nan])]

        paid_off_wlimits = paidoff_comp.loc[usual_comps.index + "_paid_off"]
        to_constrain = pd.concat([usual_comps, paid_off_wlimits], axis=0)
        if to_constrain.empty:
            continue
        to_constrain.rename_axis(index=f"{component}-ext", inplace=True)
        # otherwise n.model query will fail. This is needed in case freeze_compoents was used
        # it is fine so long as p_nom is zero for the frozen components
        to_constrain = to_constrain.query("p_nom_extendable==True")
        to_constrain["grouper"] = to_constrain.index.str.replace("_paid_off", "")

        grouper = xr.DataArray(to_constrain.grouper, dims=[f"{component}-ext"])

        lhs = n.model[f"{component}-{prefix}_nom"].loc[to_constrain.index].groupby(grouper).sum()
        # RHS
        idx = lhs.indexes["grouper"]

        if not lhs.empty:
            n.model.add_constraints(
                lhs <= usual_comps.loc[idx].p_nom_max.values,
                name=f"constrain_paidoff&usual_{component}_potential",
            )

add_transmission_constraints(n)

Add constraint ensuring that transmission lines p_nom are the same for both directions, i.e. p_nom positive = p_nom negative

Parameters:

Name Type Description Default
n Network

the network object to optimize

required
Source code in workflow/scripts/solve_network.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
def add_transmission_constraints(n: pypsa.Network):
    """
    Add constraint ensuring that transmission lines p_nom are the same for both directions, i.e.
    p_nom positive = p_nom negative

    Args:
        n (pypsa.Network): the network object to optimize
    """

    if not n.links.p_nom_extendable.any():
        return

    positive_bool = n.links.index.str.contains("positive")
    negative_bool = n.links.index.str.contains("reversed")

    positive_ext = n.links[positive_bool].query("p_nom_extendable").index
    negative_ext = n.links[negative_bool].query("p_nom_extendable").index

    lhs = n.model["Link-p_nom"].loc[positive_ext]
    rhs = n.model["Link-p_nom"].loc[negative_ext]

    n.model.add_constraints(lhs == rhs, name="Link-transmission")

extra_functionality(n, _)

Add supplementary constraints to the network model. pypsa.linopf.network_lopf. If you want to enforce additional custom constraints, this is a good location to add them. The arguments opts and snakemake.config are expected to be attached to the network.

Parameters:

Name Type Description Default
n Network

the network object to optimize

required
Source code in workflow/scripts/solve_network.py
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
def extra_functionality(n: pypsa.Network, _) -> None:
    """
    Add supplementary constraints to the network model. ``pypsa.linopf.network_lopf``.
    If you want to enforce additional custom constraints, this is a good location to add them.
    The arguments ``opts`` and ``snakemake.config`` are expected to be attached to the network.

    Args:
        n (pypsa.Network): the network object to optimize
    """
    config = n.config
    add_battery_constraints(n)
    add_transmission_constraints(n)
    add_chp_constraints(n)
    if config["run"].get("is_remind_coupled", False):
        logger.info("Adding remind paid off constraints")
        add_remind_paid_off_constraints(n)

    reserve = config.get("operational_reserve", {})
    if reserve.get("activate", False):
        logger.info("Adding operational reserve margin constraints")
        add_operational_reserve_margin(n, config)

    logger.info("Added extra functionality to the network model")

freeze_components(n, config, exclude=['H2 turbine'])

Set p_nom_extendable=False for the components in the network. Applies to vre_techs and conventional technologies not in the exclude list.

Parameters:

Name Type Description Default
n Network

the network object

required
config dict

the configuration dictionary

required
exclude list

list of technologies to exclude from freezing. Defaults to ["OCGT"]

['H2 turbine']
Source code in workflow/scripts/solve_network.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def freeze_components(n: pypsa.Network, config: dict, exclude: list = ["H2 turbine"]):
    """Set p_nom_extendable=False for the components in the network.
    Applies to vre_techs and conventional technologies not in the exclude list.

    Args:
        n (pypsa.Network): the network object
        config (dict): the configuration dictionary
        exclude (list, optional): list of technologies to exclude from freezing.
            Defaults to ["OCGT"]
    """

    # Freeze VRE and conventional techs
    freeze = config["Techs"]["vre_techs"] + config["Techs"]["conv_techs"]
    freeze = [f for f in freeze if f not in exclude]
    if "coal boiler" in freeze:
        freeze += ["coal boiler central", "coal boiler decentral"]
    if "gas boiler" in freeze:
        freeze += ["gas boiler central", "gas boiler decentral"]

    # very ugly -> how to make more robust?
    to_fix = {
        "OCGT": "gas OCGT",
        "CCGT": "gas CCGT",
        "CCGT-CCS": "gas ccs",
        "coal power plant": "coal",
        "coal-CCS": "coal ccs",
    }
    freeze += [to_fix[k] for k in to_fix if k in freeze]

    for comp in ["generators", "links"]:
        query = "carrier in @freeze & p_nom_extendable == True"
        components = getattr(n, comp)
        # p_nom_max_rcl.isna(): exclude paid_off as needed
        if "p_nom_max_rcl" in components.columns:
            query += " & p_nom_max_rcl.isna()"
        mask = components.query(query).index
        components.loc[mask, "p_nom_extendable"] = False

prepare_network(n, solve_opts, config, plan_year, co2_pathway)

prepare the network for the solver, Args: n (pypsa.Network): the network object to optimize solve_opts (dict): solving options config (dict): the snakemake configuration dictionary plan_year (int): planning horizon year for which network is solved co2_pathway (str): the CO2 pathway name to use

Returns:

Type Description
Network

pypsa.Network: network object with additional constraints

Source code in workflow/scripts/solve_network.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
def prepare_network(
    n: pypsa.Network, solve_opts: dict, config: dict, plan_year: int, co2_pathway: str
) -> pypsa.Network:
    """prepare the network for the solver,
    Args:
        n (pypsa.Network): the network object to optimize
        solve_opts (dict): solving options
        config (dict): the snakemake configuration dictionary
        plan_year (int): planning horizon year for which network is solved
        co2_pathway (str): the CO2 pathway name to use

    Returns:
        pypsa.Network: network object with additional constraints
    """

    co2_opts = ConfigManager(config).fetch_co2_restriction(co2_pathway, int(plan_year))
    add_co2_constraints_prices(n, co2_opts)

    if "clip_p_max_pu" in solve_opts:
        for df in (n.generators_t.p_max_pu, n.storage_units_t.inflow):
            df.where(df > solve_opts["clip_p_max_pu"], other=0.0, inplace=True)

    # TODO duplicated with freeze components
    if solve_opts.get("load_shedding"):
        n.add("Carrier", "Load Shedding")
        buses_i = n.buses.query("carrier == 'AC'").index
        n.add(
            "Generator",
            buses_i,
            " load",
            bus=buses_i,
            carrier="Load Shedding",
            marginal_cost=solve_opts.get("voll", 1e5),  # EUR/MWh
            # intersect between macroeconomic and surveybased
            # willingness to pay
            # http://journal.frontiersin.org/article/10.3389/fenrg.2015.00055/full
            p_nom=1e6,  # MW
        )

    if solve_opts.get("noisy_costs"):
        for t in n.iterate_components(n.one_port_components):
            # if 'capital_cost' in t.df:
            #    t.df['capital_cost'] += 1e1 + 2.*(np.random.random(len(t.df)) - 0.5)
            if "marginal_cost" in t.df:
                t.df["marginal_cost"] += 1e-2 + 2e-3 * (np.random.random(len(t.df)) - 0.5)

        for t in n.iterate_components(["Line", "Link"]):
            t.df["capital_cost"] += (1e-1 + 2e-2 * (np.random.random(len(t.df)) - 0.5)) * t.df[
                "length"
            ]

    if config["run"].get("is_remind_coupled", False) & (
        config["existing_capacities"].get("freeze_new", False)
    ):
        freeze_components(
            n,
            config,
            exclude=config["existing_capacities"].get("never_freeze", []),
        )

    if solve_opts.get("nhours"):
        nhours = solve_opts["nhours"]
        n.set_snapshots(n.snapshots[:nhours])
        n.snapshot_weightings[:] = YEAR_HRS / nhours

    if config["existing_capacities"].get("add", False):
        add_land_use_constraint(n, plan_year)

    return n

set_transmission_limit(n, kind, factor, n_years=1)

Set global transimission limit constraints - adapted from pypsa-eur

Parameters:

Name Type Description Default
n Network

the network object

required
kind str

the kind of limit to set, either 'c' for cost or 'v' for volume or l for length

required
factor float or str

the factor to apply to the base year quantity, per year

required
n_years int

the number of years to consider for the limit. Defaults to 1.

1
Source code in workflow/scripts/solve_network.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def set_transmission_limit(n: pypsa.Network, kind: str, factor: float, n_years=1):
    """
    Set global transimission limit constraints - adapted from pypsa-eur

    Args:
        n (pypsa.Network): the network object
        kind (str): the kind of limit to set, either 'c' for cost or 'v' for volume or l for length
        factor (float or str): the factor to apply to the base year quantity, per year
        n_years (int, optional): the number of years to consider for the limit. Defaults to 1.
    """
    logger.info(
        f"Adding global transmission limit for {kind} with factor {factor}/year & {n_years} years"
    )
    links_dc = n.links.query("carrier in ['AC','DC']").index
    # links_dc_rev = n.links.query("carrier in ['AC','DC'] & Link.str.contains('reverse')").index

    _lines_s_nom = (
        np.sqrt(3)
        * n.lines.type.map(n.line_types.i_nom)
        * n.lines.num_parallel
        * n.lines.bus0.map(n.buses.v_nom)
    )
    lines_s_nom = n.lines.s_nom.where(n.lines.type == "", _lines_s_nom)

    col = "capital_cost" if kind == "c" else "length"
    ref = lines_s_nom @ n.lines[col] + n.links.loc[links_dc, "p_nom"] @ n.links.loc[links_dc, col]

    if factor == "opt" or float(factor) ** n_years > 1.0:
        n.lines["s_nom_min"] = lines_s_nom
        n.lines["s_nom_extendable"] = True

        n.links.loc[links_dc, "p_nom_extendable"] = True

    elif float(factor) ** n_years == 1.0:
        # if factor is 1.0, then we do not need to extend
        n.lines["s_nom_min"] = lines_s_nom
        n.lines["s_nom_extendable"] = False
        n.links.loc[links_dc, "p_nom_extendable"] = False

        # factor = 1 + 1e-7  # to avoid numerical issues with the constraints

    elif float(factor) ** n_years < 1.0:
        n.lines["s_nom_min"] = 0
        n.links.loc[links_dc, "p_nom_min"] = 0
        # n.links.loc[links_dc_rev, "p_nom_min"] = 0

    if factor != "opt":
        con_type = "expansion_cost" if kind == "c" else "volume_expansion"
        rhs = float(factor) ** n_years * ref
        logger.info(
            f"Adding global transmission limit for {kind} to {float(factor)**n_years} current value"
        )
        n.add(
            "GlobalConstraint",
            f"l{kind}_limit",
            type=f"transmission_{con_type}_limit",
            sense="<=",
            constant=rhs,
            carrier_attribute="AC, DC",
        )

solve_network(n, config, solving, opts='', **kwargs)

perform the optimisation Args: n (pypsa.Network): the pypsa network object config (dict): the configuration dictionary solving (dict): the solving configuration dictionary opts (str): optional wildcards such as ll (not used in pypsa-china)

Returns:

Type Description
Network

pypsa.Network: the optimized network

Source code in workflow/scripts/solve_network.py
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
def solve_network(
    n: pypsa.Network, config: dict, solving: dict, opts: str = "", **kwargs
) -> pypsa.Network:
    """perform the optimisation
    Args:
        n (pypsa.Network): the pypsa network object
        config (dict): the configuration dictionary
        solving (dict): the solving configuration dictionary
        opts (str): optional wildcards such as ll (not used in pypsa-china)

    Returns:
        pypsa.Network: the optimized network
    """
    set_of_options = solving["solver"]["options"]
    solver_options = solving["solver_options"][set_of_options] if set_of_options else {}
    solver_name = solving["solver"]["name"]
    cf_solving = solving["options"]
    track_iterations = cf_solving.get("track_iterations", False)
    min_iterations = cf_solving.get("min_iterations", 4)
    max_iterations = cf_solving.get("max_iterations", 6)
    transmission_losses = cf_solving.get("transmission_losses", 0)

    # add to network for extra_functionality
    n.config = config
    n.opts = opts

    skip_iterations = cf_solving.get("skip_iterations", False)
    if not n.lines.s_nom_extendable.any():
        skip_iterations = True
        logger.info("No expandable lines found. Skipping iterative solving.")

    if skip_iterations:
        status, condition = n.optimize(
            solver_name=solver_name,
            transmission_losses=transmission_losses,
            extra_functionality=extra_functionality,
            **solver_options,
            **kwargs,
        )
    else:
        status, condition = n.optimize.optimize_transmission_expansion_iteratively(
            solver_name=solver_name,
            track_iterations=track_iterations,
            min_iterations=min_iterations,
            max_iterations=max_iterations,
            transmission_losses=transmission_losses,
            extra_functionality=extra_functionality,
            **solver_options,
            **kwargs,
        )

    if status != "ok":
        logger.warning(f"Solving status '{status}' with termination condition '{condition}'")
    if "infeasible" in condition:
        raise RuntimeError("Solving status 'infeasible'")

    return n