Skip to content

Build powerplants

Build the existing capacities for each node from GEM (global energy monitor) tracker data. This script is intended for use as part of the Snakemake workflow.

The GEM data has to be downloaded manually and placed in the source directory of the snakemake rule. download page: https://globalenergymonitor.org/projects/global-integrated-power-tracker/download-data/

Nodes can be assigned to specific GEM IDs based on their GPS location or administrative region location.

assign_node_from_gps(gem_data, nodes)

Assign each plant's node based on the GPS coordinates of the plant. Will cause issues if the node-matching tolerance is too low.

Parameters:

Name Type Description Default
gem_data DataFrame

GEM data

required
nodes GeoDataFrame

node geometries (nodes as index).

required

Returns: pd.DataFrame: DataFrame with assigned nodes.

Source code in workflow/scripts/build_powerplants.py
def assign_node_from_gps(gem_data: pd.DataFrame, nodes: gpd.GeoDataFrame) -> pd.DataFrame:
    """
    Assign each plant to a network node based on its GPS coordinates.

    Uses a nearest-neighbour spatial join, so every plant is matched to the
    closest node geometry. Will cause issues if the node tolerance is too low.

    Args:
        gem_data (pd.DataFrame): GEM data with "Longitude" and "Latitude" columns.
        nodes (gpd.GeoDataFrame): node geometries (nodes as index).
    Returns:
        pd.DataFrame: DataFrame with assigned nodes (a "node" column)."""

    # Build point geometries vectorised with points_from_xy instead of a slow
    # row-wise apply, and operate on a copy so the caller's frame is not
    # mutated by adding a "geometry" column.
    gem_gdf = gpd.GeoDataFrame(
        gem_data.copy(),
        geometry=gpd.points_from_xy(gem_data["Longitude"], gem_data["Latitude"]),
        crs="EPSG:4326",
    )

    # how="right" keeps every GEM plant; plants with no nearby node end up
    # with a NaN "node" and are reported below.
    joined = nodes.reset_index(names="node").sjoin_nearest(gem_gdf, how="right")
    missing = joined[joined.node.isna()]
    if not missing.empty:
        logger.warning(
            f"Some GEM locations are not covered by the nodes at GPS: {missing['Plant name'].head()}"
        )
    return joined

clean_gem_data(gem_data, gem_cfg)

Clean the GEM data by - mapping GEM types onto pypsa types - filtering for relevant project statuses - cleaning invalid entries (e.g "not found"->nan)

Parameters:

Name Type Description Default
gem_data DataFrame

GEM dataset.

required
gem_cfg dict

Configuration dictionary, 'global_energy_monitor.yaml'

required

Returns: pd.DataFrame: Cleaned GEM data.

Source code in workflow/scripts/build_powerplants.py
def clean_gem_data(gem_data: pd.DataFrame, gem_cfg: dict) -> pd.DataFrame:
    """
    Clean the GEM data by
     - mapping GEM types onto pypsa types
     - filtering for relevant project statuses
     - cleaning invalid entries (e.g "not found"->nan)

    Args:
        gem_data (pd.DataFrame): GEM dataset.
        gem_cfg (dict): Configuration dictionary, 'global_energy_monitor.yaml'
    Returns:
        pd.DataFrame: Cleaned GEM data."""

    valid_project_states = gem_cfg["status"]
    # Explicit .copy() so the in-place edits below operate on an independent
    # frame: rename(inplace=True)/column assignment on a query() slice raises
    # SettingWithCopyWarning and is unreliable under pandas copy-on-write.
    GEM = gem_data.query("Status in @valid_project_states").copy()
    GEM = GEM.rename(columns={"Plant _ Project name": "Plant name"})
    GEM["Retired year"] = GEM["Retired year"].replace("not found", np.nan)
    GEM["Start year"] = GEM["Start year"].replace("not found", np.nan)
    GEM = GEM[gem_cfg["relevant_columns"]].copy()

    # Remove all whitespace (including tabs, newlines) from admin columns
    admin_cols = [col for col in ADM_COLS.values() if col in GEM.columns]
    GEM[admin_cols] = GEM[admin_cols].apply(lambda x: x.str.replace(r"\s+", "", regex=True))

    # split oil and gas, rename bioenergy
    gas_mask = GEM.query("Type == 'oil/gas' & Fuel.str.contains('gas', case=False, na=False)").index
    GEM.loc[gas_mask, "Type"] = "gas"
    GEM["Type"] = GEM["Type"].str.replace("bioenergy", "biomass")

    # split CHP (potential issue: split before type split. After would be better)
    if gem_cfg["CHP"].get("split", False):
        GEM.loc[:, "CHP"] = GEM.loc[:, "CHP"].map({"yes": True}).fillna(False)
        chp_mask = GEM[GEM["CHP"] == True].index

        # also treat plants whose name matches a configured alias as CHP
        aliases = gem_cfg["CHP"].get("aliases", [])
        for alias in aliases:
            chp_mask = chp_mask.append(
                GEM[GEM["Plant name"].str.contains(alias, case=False, na=False)].index
            )
        chp_mask = chp_mask.unique()
        GEM.loc[chp_mask, "Type"] = "CHP " + GEM.loc[chp_mask, "Type"]

    GEM["tech"] = ""
    for tech, mapping in gem_cfg["tech_map"].items():
        if not isinstance(mapping, dict):
            raise ValueError(
                f"Mapping for {tech} is a {type(mapping)} - expected dict. Check your config."
            )

        # Boolean mask instead of query(f"Type == '{tech}'"): the f-string
        # form breaks for tech names containing quotes.
        tech_mask = GEM.index[GEM["Type"] == tech]
        if tech_mask.empty:
            continue
        GEM.loc[tech_mask, "Type"] = GEM.loc[tech_mask, "Technology"].map(mapping)

        # apply defaults if requested
        if "default" not in mapping:
            continue
        fill_val = mapping["default"]
        if fill_val is not None:
            GEM.loc[tech_mask, "Type"] = GEM.loc[tech_mask, "Type"].fillna(value=fill_val)
        else:
            # default=None: unmapped technologies stay NaN here and are
            # removed by the dropna below.
            GEM.loc[tech_mask, "Type"] = GEM.loc[tech_mask, "Type"].dropna()

    # rows whose Type could not be mapped are dropped
    return GEM.dropna(subset=["Type"])

group_by_year(df, year_bins, base_year=2020)

Group the DataFrame by year bins.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with a 'Start year' column.

required
year_bins list

List of year bins to group by.

required
base_year int

cut-off for the historical period. Default is 2020.

2020

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame with a new 'grouping_year' column.

Source code in workflow/scripts/build_powerplants.py
def group_by_year(df: pd.DataFrame, year_bins: list, base_year: int = 2020) -> pd.DataFrame:
    """
    Group the DataFrame by year bins.

    Drops plants that started before the first bin (minus a small tolerance)
    or retired on/before ``base_year``, then assigns each remaining row to a
    bin from ``year_bins`` in a new 'grouping_year' column.

    Args:
        df (pd.DataFrame): DataFrame with 'Start year' and 'Retired year' columns.
        year_bins (list): List of year bins to group by.
        base_year (int): cut-off for the historical period. Default is 2020.

    Returns:
        pd.DataFrame: DataFrame with a new 'grouping_year' column.
    """
    # allow starts up to 2.5 years (half a typical 5-year bin) before the first bin
    min_start_year = min(year_bins) - 2.5
    # BUG FIX: base_year was previously overwritten with a hard-coded 2020,
    # silently ignoring the caller's argument.
    df = df[df["Start year"] > min_start_year]
    df = df[df["Retired year"].isna() | (df["Retired year"] > base_year)].reset_index(drop=True)
    # right=True: a start year exactly on a bin edge is assigned to that bin
    df["grouping_year"] = np.take(year_bins, np.digitize(df["Start year"], year_bins, right=True))

    return df

load_gem_excel(path, sheetname='Units', country_col='Country/area', country_names=['China'])

Load a Global Energy Monitor Excel file as a dataframe.

Parameters:

Name Type Description Default
path PathLike

Path to the Excel file.

required
sheetname str

Name of the sheet to load. Default is "Units".

'Units'
country_col str

Column name for country names. Default is "Country/area".

'Country/area'
country_names list

List of country names to filter by. Default is ["China"].

['China']
Source code in workflow/scripts/build_powerplants.py
def load_gem_excel(
    path: os.PathLike,
    sheetname: str = "Units",
    country_col: str = "Country/area",
    country_names: list | None = None,
) -> pd.DataFrame:
    """
    Load a Global Energy Monitor Excel file as a dataframe.

    Args:
        path (os.PathLike): Path to the Excel file.
        sheetname (str): Name of the sheet to load. Default is "Units".
        country_col (str): Column name for country names. Default is "Country/area".
        country_names (list | None): List of country names to filter by.
            Default (None) filters for ["China"].
    Returns:
        pd.DataFrame: The (optionally country-filtered) sheet contents.
    """
    # None sentinel instead of a mutable list default (shared across calls)
    if country_names is None:
        country_names = ["China"]

    df = pd.read_excel(path, sheet_name=sheetname, engine="openpyxl")
    # replace problem characters in column names ("/" breaks query identifiers)
    df.columns = df.columns.str.replace("/", "_")
    country_col = country_col.replace("/", "_")

    if country_col not in df.columns:
        logger.warning(f"Column {country_col} not found in {path}. Returning unfiltered DataFrame.")
        return df

    return df.query(f"{country_col} in @country_names")