#!/usr/bin/env python
import sys
import numpy as np
import xarray as xr
from synsatipy.starter import pyrttov, __rttov_version__
import synsatipy.data_handler as data_handler
import synsatipy.output as output
[docs]
class attributes:
""" """
pass
[docs]
class synsat_attributes:
""" """
synsat = attributes()
atlas = attributes()
pass
[docs]
class SynSatBase(pyrttov.Rttov, synsat_attributes):
"""Class for calculating MSG Synsats.
The is a child class of pyrttov.Rttov.
Notes
-----
General Notes on the Workflow:
1. Options need to be provided
2. Instrument (MSG-SEVIRI) is loaded
3. Data needs to be read
4. Atlasses are loaded (depend on time coordinate in data)
5. RTTOV is called
"""
def __init__(self, *args, **kwargs):
"""
Only added optional parameters are listed.
Convertion: key should start with "synsat_{keywordname}"
"""
# first, extract all specific optional arguments
synsat_kwargs = {}
keylist = kwargs.copy().keys()
for keyname in keylist:
if "synsat_" in keyname:
synsat_kwargs[keyname] = kwargs.pop(keyname)
# write keywords to attributes
attr = self.synsat
attr.atlasses_loaded = False
attr.kwargs = synsat_kwargs
# locate itself
pyrttov_path = pyrttov.__path__[0]
rttov_install_dir = "/".join(pyrttov_path.split("/")[:-2])
attr.rttov_install_dir = rttov_install_dir
attr.rttov_version = __rttov_version__
# inheritate all important methods & attributes
super().__init__(*args, **kwargs)
# set default options
self.set_default_options(**synsat_kwargs)
# load instrument based on specified instrument
self.load_instrument(**synsat_kwargs)
return
[docs]
def set_default_options(self, **synsat_kwargs):
"""
Sets default options for the RTTOV wrapper.
Parameters
----------
**synsat_kwargs : dict
Additional keyword arguments.
Returns
-------
None
"""
self.Options.AddInterp = True
self.Options.AddSolar = True
self.Options.AddClouds = True
self.Options.VerboseWrapper = True
self.synsat.nprofiles = None
return
[docs]
def load_instrument(self, **synsat_kwargs):
"""
Loads the specified instrument configuration.
Parameters
----------
**synsat_kwargs : dict
Additional keyword arguments including synsat_instrument.
Returns
-------
None
"""
implemented_instruments = ["seviri", "abi", "fci"]
# Default to SEVIRI if not specified
instrument = synsat_kwargs.get("synsat_instrument", "seviri").lower()
if instrument == "seviri":
# Load SEVIRI configuration
self.load_msg_seviri(**synsat_kwargs)
elif instrument == "abi":
# Load GOES-ABI configuration
self.load_goes_abi(**synsat_kwargs)
elif instrument == "fci":
# Load MTG-FCI configuration
self.load_mtg_fci(**synsat_kwargs)
else:
print(
f"... [synsat] WARNING: {instrument} is not a valid instrument. "
f"Supported instruments are: {', '.join(implemented_instruments)}"
)
# Raise an error if the instrument is not supported
raise ValueError(f"Unsupported instrument: {instrument}")
return
[docs]
def load_msg_seviri(self, synsat_msg_number=3, **synsat_kwargs):
"""
Loads configuration specific for the MSG-SEVIRI instrument.
Parameters
----------
synsat_msg_number : int
MSG number. (Default value = 3)
**synsat_kwargs : dict
Additional keyword arguments.
Returns
-------
None
"""
# SEVIRI specifics
# ================
seviri_allchannel_names = [
"vis006",
"vis008",
"nir016",
"ir039",
"wv062",
"wv073",
"ir087",
"ir097",
"ir108",
"ir120",
"ir134",
"hrv",
]
seviri_var_names = [
"rho006",
"rho008",
"rho016",
"bt039",
"bt062",
"bt073",
"bt087",
"bt097",
"bt108",
"bt120",
"bt134",
"rhohrv",
]
seviri_var_units = (
3
* [
"-",
]
+ 8
* [
"K",
]
+ ["-"]
)
# MSG options
# ===========
# For SEVIRI exclude ozone and hi-res vis channels (9 and 12) in this
# example
# chan_list_seviri = (1, 2, 3, 4, 5, 6, 7, 9, 10, 11)
default_chan_list = (5, 6, 7, 9, 10, 11)
chan_list_seviri = synsat_kwargs.get("synsat_channel_list", default_chan_list)
attr = self.synsat
attr.instrument = "SEVIRI"
subsatellite_lon = synsat_kwargs.get("synsat_subsatellite_lon", 0.0)
attr.subsatellite_lon = subsatellite_lon
chan_index = np.array(chan_list_seviri) - 1
attr.channels = np.array(seviri_var_names)[chan_index]
attr.units = np.array(seviri_var_units)[chan_index]
nchan_seviri = len(chan_list_seviri)
# check if solar channel are included
attr.solar_calculations = np.any(np.array(chan_list_seviri) < 5)
# Set the options for each Rttov instance:
# - the path to the coefficient file must always be specified
# - turn RTTOV interpolation on (because input pressure levels differ from
# coefficient file levels)
# - set the verbose_wrapper flag to true so the wrapper provides more
# information
# - enable solar simulations for SEVIRI
# - enable CO2 simulations for HIRS (the CO2 profiles are ignored for
# the SEVIRI and MHS simulations)
# - enable the store_trans wrapper option for MHS to provide access to
# RTTOV transmission structure
# Add cloud opt. props file
cldaer_filename = f"{attr.rttov_install_dir}/rtcoef_rttov13/cldaer_visir/sccldcoef_msg_{synsat_msg_number}_seviri.dat"
self.FileSccld = cldaer_filename
print(f"... [synsat] set cloud / aerosol file to {cldaer_filename}")
coef_filename = f"{attr.rttov_install_dir}/rtcoef_rttov13/rttov13pred54L/rtcoef_msg_{synsat_msg_number}_seviri_o3.dat"
self.FileCoef = coef_filename
print(f"... [synsat] load coefficient file {coef_filename}")
# save vars to attributes
attr.chan_list_instrument = chan_list_seviri
attr.nchan_instrument = nchan_seviri
attr.coef_filename = coef_filename
# Load the instruments: for HIRS and MHS do not supply a channel list and
# so read all channels
try:
self.loadInst(chan_list_seviri)
except self.RttovError as e:
sys.stderr.write("Error loading instrument(s): {!s}".format(e))
sys.exit(1)
return
[docs]
def load_goes_abi(self, synsat_goes_number=16, **synsat_kwargs):
"""
Loads configuration specific for the GOES-ABI instrument.
Parameters
----------
synsat_goes_number : int
GOES satellite number. (Default value = 16)
**synsat_kwargs : dict
Additional keyword arguments.
Returns
-------
None
"""
# ABI specifics
# ================
abi_allchannel_names = [
"ch01", # 0.47 μm - Blue
"ch02", # 0.64 μm - Red
"ch03", # 0.86 μm - Veggie
"ch04", # 1.37 μm - Cirrus
"ch05", # 1.6 μm - Snow/Ice
"ch06", # 2.2 μm - Cloud Particle Size
"ch07", # 3.9 μm - Shortwave Window
"ch08", # 6.2 μm - Upper-Level Water Vapor
"ch09", # 6.9 μm - Mid-Level Water Vapor
"ch10", # 7.3 μm - Lower-Level Water Vapor
"ch11", # 8.4 μm - Cloud-Top Phase
"ch12", # 9.6 μm - Ozone
"ch13", # 10.3 μm - Clean IR Longwave Window
"ch14", # 11.2 μm - IR Longwave Window
"ch15", # 12.3 μm - Dirty Longwave Window
"ch16", # 13.3 μm - CO2 Longwave
]
abi_var_names = [
"rho047", # Reflectivity channels (1-6)
"rho064",
"rho086",
"rho137",
"rho160",
"rho220",
"bt039", # Brightness temperature channels (7-16)
"bt062",
"bt069",
"bt073",
"bt084",
"bt096",
"bt103",
"bt112",
"bt123",
"bt133",
]
abi_var_units = (
6 * ["-",] + 10 * ["K",]
)
# GOES-ABI options
# ===========
# Default to IR channels (channels 7-16)
default_chan_list = (7, 8, 9, 10, 11, 12, 13, 14, 15, 16)
chan_list_instrument = synsat_kwargs.get("synsat_channel_list", default_chan_list)
attr = self.synsat
attr.instrument = "ABI"
subsatellite_lon = synsat_kwargs.get("synsat_subsatellite_lon", -75.2)
attr.subsatellite_lon = subsatellite_lon
chan_index = np.array(chan_list_instrument) - 1
attr.channels = np.array(abi_var_names)[chan_index]
attr.units = np.array(abi_var_units)[chan_index]
nchan_instrument = len(chan_list_instrument)
# check if solar channel are included
attr.solar_calculations = np.any(np.array(chan_list_instrument) < 7)
# Add cloud opt. props file
cldaer_filename = f"{attr.rttov_install_dir}/rtcoef_rttov13/cldaer_visir/sccldcoef_goes_{synsat_goes_number}_abi.dat"
self.FileSccld = cldaer_filename
print(f"... [synsat] set cloud / aerosol file to {cldaer_filename}")
coef_filename = f"{attr.rttov_install_dir}/rtcoef_rttov13/rttov13pred54L/rtcoef_goes_{synsat_goes_number}_abi_o3.dat"
self.FileCoef = coef_filename
print(f"... [synsat] load coefficient file {coef_filename}")
# save vars to attributes
attr.chan_list_instrument = chan_list_instrument
attr.nchan_instrument = nchan_instrument
attr.coef_filename = coef_filename
# Load the instruments
try:
self.loadInst(chan_list_instrument)
except self.RttovError as e:
sys.stderr.write("Error loading instrument(s): {!s}".format(e))
sys.exit(1)
return
[docs]
def load_mtg_fci(self, synsat_mtg_number=1, **synsat_kwargs):
"""
Loads configuration specific for the MTG-FCI instrument.
Parameters
----------
synsat_mtg_number : int
MTG satellite number. (Default value = 1)
**synsat_kwargs : dict
Additional keyword arguments.
Returns
-------
None
"""
# FCI specifics
# ================
fci_allchannel_names = [
"vis04", # 0.444 μm - Blue
"vis05", # 0.510 μm - Green
"vis06", # 0.640 μm - Red
"vis08", # 0.865 μm - Vegetation Red Edge
"vis09", # 0.914 μm - Water Vapour
"nir13", # 1.375 μm - Cirrus
"nir16", # 1.610 μm - Snow/Ice/Cloud Phase
"nir22", # 2.250 μm - Aerosol/Cloud Particle Size
"ir38", # 3.80 μm - Hot objects/Fire/Night microphysics
"wv63", # 6.25 μm - Upper-Level Water Vapour
"wv73", # 7.35 μm - Lower-Level Water Vapour
"ir87", # 8.70 μm - Cloud Phase/SO2
"ir97", # 9.66 μm - Ozone
"ir105", # 10.50 μm - Clean IR Window
"ir123", # 12.30 μm - Dirty IR Window
"ir133", # 13.30 μm - CO2
]
fci_var_names = [
"rho044", # Reflectivity channels (1-8)
"rho051",
"rho064",
"rho087",
"rho091",
"rho138",
"rho161",
"rho225",
"bt038", # Brightness temperature channels (8-16)
"bt063",
"bt073",
"bt087",
"bt097",
"bt105",
"bt123",
"bt133",
]
fci_var_units = (
8 * ["-",] + 8 * ["K",]
)
# MTG-FCI options
# ===========
# Default to IR and water vapor channels (channels 9-16)
default_chan_list = (9, 10, 11, 12, 13, 14, 15, 16)
chan_list_instrument = synsat_kwargs.get("synsat_channel_list", default_chan_list)
attr = self.synsat
attr.instrument = "FCI"
subsatellite_lon = synsat_kwargs.get("synsat_subsatellite_lon", 0.0)
attr.subsatellite_lon = subsatellite_lon
chan_index = np.array(chan_list_instrument) - 1
attr.channels = np.array(fci_var_names)[chan_index]
attr.units = np.array(fci_var_units)[chan_index]
nchan_instrument = len(chan_list_instrument)
# check if solar channel are included
attr.solar_calculations = np.any(np.array(chan_list_instrument) < 9)
# Add cloud opt. props file
cldaer_filename = f"{attr.rttov_install_dir}/rtcoef_rttov13/cldaer_visir/sccldcoef_mtg_{synsat_mtg_number}_fci.dat"
self.FileSccld = cldaer_filename
print(f"... [synsat] set cloud / aerosol file to {cldaer_filename}")
coef_filename = f"{attr.rttov_install_dir}/rtcoef_rttov13/rttov13pred54L/rtcoef_mtg_{synsat_mtg_number}_fci_o3.dat"
self.FileCoef = coef_filename
print(f"... [synsat] load coefficient file {coef_filename}")
# save vars to attributes
attr.chan_list_instrument = chan_list_instrument
attr.nchan_instrument = nchan_instrument
attr.coef_filename = coef_filename
# Load the instruments
try:
self.loadInst(chan_list_instrument)
except self.RttovError as e:
sys.stderr.write("Error loading instrument(s): {!s}".format(e))
sys.exit(1)
return
[docs]
def load_atlasses(self, synsat_default_month=8, **kwargs):
"""
Load the emissivity and BRDF atlases.
Parameters
----------
synsat_default_month : int
Month to use for the emissivity and BRDF atlases.
(Default value = 8)
**kwargs : dict
Additional keyword arguments.
Returns
-------
None
"""
# ------------------------------------------------------------------------
# Load the emissivity and BRDF atlases
# ------------------------------------------------------------------------
# Load the emissivity and BRDF atlases:
# - load data for the month in the profile data
# - load the IR emissivity atlas data for multiple instruments so it can be used for SEVIRI and HIRS
# - SEVIRI is the only VIS/NIR instrument we can use the single-instrument initialisation for the BRDF atlas
attr = self.synsat
if not attr.nprofiles is None:
# WARNING: this assumes that first month is representative for all profiles
synsat_month = self.Profiles.DateTimes[0, 1]
else:
synsat_month = synsat_default_month
irAtlas = pyrttov.Atlas()
irAtlas.AtlasPath = "{}/{}".format(attr.rttov_install_dir, "emis_data")
irAtlas.loadIrEmisAtlas(
synsat_month, ang_corr=True
) # Include angular correction, but do not initialise for single-instrument
if attr.solar_calculations:
brdfAtlas = pyrttov.Atlas()
brdfAtlas.AtlasPath = "{}/{}".format(attr.rttov_install_dir, "brdf_data")
brdfAtlas.loadBrdfAtlas(
synsat_month, self
) # Supply Rttov object to enable single-instrument initialisation
brdfAtlas.IncSea = False # Do not use BRDF atlas for sea surface types
# Set up the surface emissivity/reflectance arrays and associate with the Rttov objects
if attr.rttov_version >= 13.2:
nemis_classes = 5
else:
nemis_classes = 4
surfemisrefl_seviri = np.zeros(
(nemis_classes, attr.nprofiles, attr.nchan_instrument), dtype=np.float64
)
self.SurfEmisRefl = surfemisrefl_seviri
# Surface emissivity/reflectance arrays must be initialised *before every call to RTTOV*
# Negative values will cause RTTOV to supply emissivity/BRDF values (i.e. equivalent to
# calcemis/calcrefl TRUE - see RTTOV user guide)
surfemisrefl_seviri[:, :, :] = -1.0
# Call emissivity and BRDF atlases
try:
# Do not supply a channel list for SEVIRI: this returns emissivity/BRDF values for all
# *loaded* channels which is what is required
surfemisrefl_seviri[0, :, :] = irAtlas.getEmisBrdf(self)
if attr.solar_calculations:
surfemisrefl_seviri[1, :, :] = brdfAtlas.getEmisBrdf(self)
except self.RttovError as e:
# If there was an error the emissivities/BRDFs will not have been modified so it
# is OK to continue and call RTTOV with calcemis/calcrefl set to TRUE everywhere
sys.stderr.write("Error calling atlas: {!s}".format(e))
attr.atlasses_loaded = True
attr.atlasses_initialied_for_nprofiles = attr.nprofiles
return
[docs]
def run_workflow(self, **kwargs):
"""
Run the full RTTOV workflow. This includes loading the atlasses, running RTTOV
and extracting the output.
Parameters
----------
**kwargs : dict
Additional keyword arguments
Returns
-------
None
"""
# load example data
if self.synsat.nprofiles is None:
raise Exception("... [synsat] ERROR: no data loaded")
# prepare & load atlasses
self.load_atlasses(**kwargs)
# run RTTOV
self.runDirect()
return
[docs]
class SynSat(SynSatBase):
"""
SynSat class for calculating MSG Synsats.
The is a child class of SynSatBase.
Notes
-----
This class is the main class for calculating MSG Synsats.
Parameters
----------
*args : list
List of arguments passed to the parent class.
**kwargs : dict
Additional keyword arguments passed to the parent class.
"""
def __init__(self, *args, **kwargs):
# inheritate all important methods & attributes
super().__init__(*args, **kwargs)
[docs]
def load(self, inputfile_or_data, **kwargs):
"""
Load data from file or dataset.
Parameters
----------
inputfile_or_data : str or xr.Dataset
The input file or dataset.
**kwargs : dict
Additional keyword arguments.
Returns
-------
None
"""
model = kwargs.get("model", "auto")
lon0 = self.synsat.subsatellite_lon
# use data handler to load data
sdat = data_handler.DataHandler(model=model)
# check if file or dataset is provided
if type(inputfile_or_data) == type(""):
inputfile = inputfile_or_data
print(f"... [synsat] read data from file {inputfile}")
self.synsat.input_filename = inputfile
sdat.open_data(inputfile, lon0 = lon0, **kwargs)
elif type(inputfile_or_data) == type(xr.Dataset()):
sdat.input_data = inputfile_or_data
sdat.stack_data_as_profile(**kwargs)
self.synsat.data_handler = sdat
return
[docs]
def chunked_run(self, **kwargs):
"""
Runs a small chunk of the RTTOV workflow.
Parameters
----------
**kwargs : dict
Additional keyword arguments.
Returns
-------
None
"""
# transform input data into profiles
sdat = self.synsat.data_handler
lon0 = self.synsat.subsatellite_lon
profs = sdat.data2profile(lon0 = lon0, **kwargs)
# forward profiles to RTTOV
self.Profiles = profs
self.synsat.nprofiles = profs.Nprofiles
# and run workflow
self.run_workflow()
self.synsat.chunked_result += [self.BtRefl]
[docs]
def run(self, **kwargs):
"""
Run the complete RTTOV workflow. This is a wrapper for the chunked_run method.
Parameters
----------
efficiency_factor : int, optional
Multiplicative factor for the number of profiles processed per call when
chunked processing is enabled (i.e., when 'chunked' is in kwargs).
The actual number of profiles per chunk is calculated as
efficiency_factor * NprofsPerCall. Default is 4.
**kwargs : dict
Additional keyword arguments. If 'chunked' is included in kwargs, the
workflow will process data in chunks using the efficiency_factor parameter.
Returns
-------
None
"""
# init results field
self.synsat.chunked_result = []
if "chunked" not in kwargs:
isel = {"profile": slice(0, None)}
self.chunked_run(isel=isel, **kwargs)
else:
sdat = self.synsat.data_handler
efficiency_factor = kwargs.get("efficiency_factor", 4)
ntot = sdat.total_number_of_profiles
nprof_per_call = efficiency_factor * self.Options.NprofsPerCall
if np.mod(ntot, nprof_per_call) == 0:
residual = 0
else:
residual = 1
nchunks = ntot // nprof_per_call + residual
for ichunks in range(nchunks):
prof0 = ichunks * nprof_per_call
prof1 = (ichunks + 1) * nprof_per_call
if prof1 >= ntot:
prof1 = None
isel = {"profile": slice(prof0, prof1)}
print(f"... [synsat] running {ichunks}/{nchunks} chunk with", isel)
self.chunked_run(isel=isel, **kwargs)
self.synsat.result = np.row_stack(self.synsat.chunked_result)
[docs]
def save(self, output_filename):
"""
Save the output data to a netcdf file.
Parameters
----------
output_filename : str
The output filename.
Returns
-------
None
"""
out = self.extract_output()
print(f"... [synsat] write synsat data to {output_filename}")
out.to_netcdf(output_filename)
return