Skip to content

Etl

ETL TOOL BOX

  • Abstracted transformations (Transformation, register_etl)
  • ETL registry (list of named conversions)
  • pre-defined conversions (convert_loads, technoeconomic_data)

Transformation dataclass

Data class representing the YAML config for the ETL target

Source code in src/rpycpl/etl.py
37
38
39
40
41
42
43
44
45
46
47
@dataclass
class Transformation:
    """Data class representing the YAML config for the ETL target

    Only ``name`` is required. ``method`` defaults to ``None`` and every dict
    field defaults to a new empty dict per instance (``default_factory``), so
    instances never share a mutable default.
    """

    # required identifier of the transformation (no default)
    name: str
    # NOTE(review): presumably the ETL registry key of the function to run - confirm
    method: Optional[str] = None
    # The dict fields below are free-form (Dict[str, Any]); the schema expected
    # for each of them is not visible in this class - TODO confirm against the
    # code that loads the YAML config.
    frames: Dict[str, Any] = field(default_factory=dict)
    params: Dict[str, Any] = field(default_factory=dict)
    filters: Dict[str, Any] = field(default_factory=dict)
    kwargs: Dict[str, Any] = field(default_factory=dict)
    dependencies: Dict[str, Any] = field(default_factory=dict)

build_tech_groups(frames, map_param='investment')

Wrapper for the utils.build_tech_map function

Source code in src/rpycpl/etl.py
50
51
52
53
@register_etl("build_tech_map")
def build_tech_groups(frames, map_param="investment") -> pd.DataFrame:
    """Wrapper for the utils.build_tech_map function

    Registered in the ETL registry under the name "build_tech_map".

    Args:
        frames (dict): input frames, must contain the key "tech_mapping"
        map_param (str, Optional): passed on unchanged to build_tech_map.
            Defaults to "investment".
    Returns:
        pd.DataFrame: the result of build_tech_map for the tech mapping frame
    """
    tech_mapping = frames["tech_mapping"]
    return build_tech_map(tech_mapping, map_param)

convert_loads(loads, region=None)

conversion for loads

Parameters:

Name Type Description Default
loads dict

dictionary of dataframes with loads

required
region (str, Optional)

region to filter the data by

None

Returns: pd.DataFrame: converted loads (year: load type, value in MWh)

Source code in src/rpycpl/etl.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@register_etl("convert_load")
def convert_loads(loads: dict[str, pd.DataFrame], region: Optional[str] = None) -> pd.DataFrame:
    """conversion for loads

    Each frame is labelled with its load type (the part of its dictionary key
    before the first underscore), optionally filtered to one region, and its
    "value" column is multiplied by the TWyr -> MWh factor. The input frames
    are left untouched, so calling the function twice on the same data gives
    the same result.

    Args:
        loads (dict): dictionary of dataframes with loads. Each frame needs
            the columns "year" and "value"; a "region" column is optional.
        region (str, Optional): region to filter the data by. Ignored for
            frames without a "region" column.
    Returns:
        pd.DataFrame: converted loads (year: load type, value in MWh).
            An empty frame indexed by "year" if ``loads`` is empty.
    """
    TWYR2MWH = 365 * 24 * 1e6
    converted = []
    for key, frame in loads.items():
        # work on a copy: the caller's frame must not gain a "load" column
        # or be rescaled in place
        df = frame.copy()
        df["load"] = key.split("_")[0]
        if region is not None and "region" in df.columns:
            df = df.query("region == @region").drop(columns=["region"])
        df["value"] = df["value"] * TWYR2MWH
        converted.append(df)

    if not converted:
        # nothing to convert: return an empty result rather than failing on
        # the missing "year" column
        return pd.DataFrame(index=pd.Index([], name="year"))

    # single concat instead of growing a frame inside the loop
    return pd.concat(converted, axis=0).set_index("year")

convert_remind_capacities(frames, cutoff=0, region=None)

conversion for capacities

Parameters:

Name Type Description Default
frames dict

dictionary of dataframes with capacities

required
region (str, Optional)

region to filter the data by

None
cutoff (int, Optional)

min capacity in MW

0

Returns: pd.DataFrame: converted capacities (indexed by year, capacity in MW)

Source code in src/rpycpl/etl.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
@register_etl("convert_capacities")
def convert_remind_capacities(
    frames: dict[str, pd.DataFrame], cutoff=0, region: Optional[str] = None
) -> pd.DataFrame:
    """conversion for capacities

    Scales the "value" column of ``frames["capacities"]`` by the TW -> MW
    factor, optionally filters to one region, sets capacities below the
    cutoff to zero and, if a "tech_groups" frame is supplied, adds a
    "tech_group" column. The input frame is not modified.

    Args:
        frames (dict): dictionary of dataframes with capacities. Must contain
            "capacities" (columns "year", "value", "technology" when tech
            groups are mapped, optionally "region"); may contain
            "tech_groups" (its "group" column is used as technology -> group
            lookup via its index).
        region (str, Optional): region to filter the data by. Ignored when
            the capacities have no "region" column.
        cutoff (int, Optional): min capacity in MW, compared after the unit
            conversion. Smaller values are set to 0.
    Returns:
        pd.DataFrame: converted capacities indexed by year, with the value
            column renamed to "capacity" (MW)
    """
    TW2MW = 1e6
    # copy so that the caller's frame is not rescaled again on repeated calls
    caps = frames["capacities"].copy()
    caps["value"] = caps["value"] * TW2MW

    if region is not None and "region" in caps.columns:
        caps = caps.query("region == @region").drop(columns=["region"])

    # boolean mask rather than index labels: labels may repeat, and a
    # label-based assignment would then also zero rows above the cutoff
    caps.loc[caps["value"] < cutoff, "value"] = 0

    if "tech_groups" in frames:
        tech_map = frames["tech_groups"]
        caps["tech_group"] = caps["technology"].map(tech_map["group"].to_dict())

    return caps.rename(columns={"value": "capacity"}).set_index("year")

harmonize_capacities(pypsa_capacities, remind_capacities)

Harmonize the REMIND and PyPSA capacities:

  • scale down the pypsa capacities to not exceed the remind capacities
  • where REMIND exceeds the pypsa capacities, calculate a paid-off capacity which will be added to the pypsa model as zero-capex techs. The model can allocate it where it sees fit, but the total is constrained.

Parameters:

Name Type Description Default
pypsa_capacities dict[str, DataFrame]

Dictionary with the pypsa capacities {year: powerplantmatching_capacities}.

required
remind_capacities DataFrame

DataFrame with the remind capacities for all years

required

Returns: dict[str, pd.DataFrame]: Dictionary with the harmonized capacities {year: harmonized_capacities}.

Source code in src/rpycpl/etl.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
@register_etl("harmonize_capacities")
def harmonize_capacities(
    pypsa_capacities: dict[str, pd.DataFrame], remind_capacities: pd.DataFrame
) -> dict[str, pd.DataFrame]:
    """Harmonize the REMIND and PyPSA capacities year by year

    For every year in ``pypsa_capacities`` the REMIND capacities of that year
    are selected and passed, together with the PyPSA capacities, to
    scale_down_capacities. This function itself performs only that scale-down
    step; it does not compute a paid-off capacity.

    Args:
        pypsa_capacities (dict[str, pd.DataFrame]): Dictionary with the pypsa capacities
            {year: powerplantmatching_capacities}. The keys must be convertible to int.
        remind_capacities (pd.DataFrame): DataFrame with the remind capacities for all years
            (must be queryable by "year")
    Returns:
        dict[str, pd.DataFrame]: Dictionary with the harmonized capacities
            {year: harmonized_capacities}, using the same keys as the input.
    """

    result: dict[str, pd.DataFrame] = {}
    for year in pypsa_capacities:
        logger.debug(f"Harmonizing capacities for year {year}")
        # keep the local name "yr": the query string below refers to it as @yr
        yr = int(year)
        remind_for_year = remind_capacities.query("year == @yr")
        result[year] = scale_down_capacities(pypsa_capacities[year], remind_for_year)

    return result

paidoff_capacities(remind_capacities, harmonized_pypsa_caps)

Wrapper for the capacities_etl.calc_paid_off_capacity function.

Calculate the additional paid-off capacity available to PyPSA from REMIND investment decisions. The paid-off capacity is the difference between the REMIND capacities and the harmonized PyPSA capacities. The paid-off capacity is available to PyPSA as a zero-capex tech.

Parameters:

Name Type Description Default
remind_capacities DataFrame

DataFrame with REMIND capacities in MW.

required
harmonized_pypsa_caps dict[str, DataFrame]

Dictionary with harmonized PyPSA capacities by year (capped to REMIND cap)

required

Returns: pd.DataFrame: DataFrame with the available paid-off capacity by tech group.

Source code in src/rpycpl/etl.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
@register_etl("calc_paid_off_capacity")
def paidoff_capacities(
    remind_capacities: pd.DataFrame, harmonized_pypsa_caps: dict[str, pd.DataFrame]
) -> pd.DataFrame:
    """ETL-registry entry point for capacities_etl.calc_paid_off_capacity.

    The paid-off capacity is the additional capacity available to PyPSA from REMIND
       investment decisions: the difference between the REMIND capacities and the
       harmonized PyPSA capacities. It is made available to PyPSA as a zero-capex tech.
       The computation itself is delegated; both arguments are passed through unchanged.

    Args:
        remind_capacities (pd.DataFrame): DataFrame with REMIND capacities in MW.
        harmonized_pypsa_caps (dict[str, pd.DataFrame]): Dictionary with harmonized
            PyPSA capacities by year (capped to REMIND cap)
    Returns:
        pd.DataFrame: DataFrame with the available paid-off capacity by tech group.
    """

    paid_off = calc_paidoff_capacity(remind_capacities, harmonized_pypsa_caps)
    return paid_off

register_etl(name)

decorator factory to register ETL functions

Source code in src/rpycpl/etl.py
26
27
28
29
30
31
32
33
def register_etl(name):
    """decorator factory to register ETL functions

    Args:
        name: key under which the decorated function is stored in ETL_REGISTRY
    Returns:
        a decorator that stores the function in ETL_REGISTRY and returns it unchanged
    """

    def _register(etl_func):
        # record the function; an existing entry with the same name is replaced
        ETL_REGISTRY[name] = etl_func
        return etl_func

    return _register

technoeconomic_data(frames, mappings, pypsa_costs, currency_conversion, years=None)

Mapping adapted from Johannes Hemp, based on csv mapping table

Parameters:

Name Type Description Default
frames Dict[str, DataFrame]

dictionary of remind frames

required
mappings DataFrame

the mapping dataframe

required
pypsa_costs DataFrame

pypsa costs dataframe

required
currency_conversion float

conversion factor for the currency

required
years Optional[list]

years to consider, if None REMIND capex years is used

None

Returns: pd.DataFrame: dataframe with the mapped techno-economic data

Source code in src/rpycpl/etl.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
@register_etl("technoeconomic_data")
def technoeconomic_data(
    frames: Dict[str, pd.DataFrame],
    mappings: pd.DataFrame,
    pypsa_costs: pd.DataFrame,
    currency_conversion: float,
    years: Optional[list] = None,
) -> pd.DataFrame:
    """Mapping adapted from Johannes Hemp, based on csv mapping table

    Steps: normalise the mapping references, validate the mappings, collect
    the weight frames, build PyPSA-like costs from the REMIND frames, validate
    them against the mappings and map them onto the PyPSA technologies.
    Missing entries in the result are filled: 0 for "value", a single blank
    for every other column.

    Args:
        frames (Dict[str, pd.DataFrame]): dictionary of remind frames. "capex" is
            read when ``years`` is None; every key starting with "weights" is
            treated as a weight frame (columns "carrier" and "value" are renamed
            to "technology" and "weight").
        mappings (pd.DataFrame): the mapping dataframe (needs a "reference" column).
            It is copied, the caller's frame is not modified.
        pypsa_costs (pd.DataFrame): pypsa costs dataframe
        currency_conversion (float): conversion factor for the currency
        years (Optional[list]): years to consider, if None REMIND capex years is used
    Returns:
        pd.DataFrame: dataframe with the mapped techno-economic data
    """

    # convert the "reference" entries with to_list, on a copy so the caller's
    # mapping table keeps its original values
    mappings = mappings.copy()
    mappings["reference"] = mappings["reference"].apply(to_list)

    # check the data & mappings
    validate_mappings(mappings)

    if years is None:
        years = frames["capex"].year.unique()

    # tag every weight frame with the key it came from and align its column names
    weights = pd.concat(
        [
            frames[k]
            .assign(weight_type=k)
            .rename(columns={"carrier": "technology", "value": "weight"})
            for k in frames
            if k.startswith("weights")
        ]
    )

    costs_remind = make_pypsa_like_costs(frames)
    costs_remind = costs_remind.merge(weights, on=["technology", "year"], how="left")

    validate_remind_data(costs_remind, mappings)

    # apply the mappings to pypsa tech
    mapped_costs = map_to_pypsa_tech(
        remind_costs_formatted=costs_remind,
        pypsa_costs=pypsa_costs,
        mappings=mappings,
        weights=weights,
        years=years,
        currency_conversion=currency_conversion,
    )

    # Assign the filled column back explicitly: an in-place fillna on the
    # selected column is chained assignment and does not reach the frame when
    # pandas Copy-on-Write is active, which would let the blank fill below
    # put strings into the numeric "value" column.
    mapped_costs["value"] = mapped_costs["value"].fillna(0)
    return mapped_costs.fillna(" ")