diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 44ab3031..ceffa091 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -13,10 +13,10 @@ jobs:
fail-fast: true
matrix:
os: ["windows-latest", "ubuntu-latest", "macos-latest"]
- python-version: ["3.9", "3.10", "3.11"]
+ python-version: ["3.9", "3.11", "3.12"]
experimental: [false]
include:
- - python-version: "3.11"
+ - python-version: "3.12"
os: "ubuntu-latest"
experimental: true
diff --git a/bin/get_snos.py b/bin/get_snos.py
new file mode 100644
index 00000000..f7534bcd
--- /dev/null
+++ b/bin/get_snos.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2024 Pytroll
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""
+Getting SNOs for two configurable satellite platforms.
+
+SNO = Simultaneous Nadir Overpass: When two platforms' sub-satellite nadir
+views cross each other in space and time. One can set a threshold in time
+allowing the times to differ by up to a few minutes.
+
+"""
+
+from pyorbital.sno_utils import SNOfinder
+from pyorbital.logger import setup_logging_from_config
+import datetime as dt
+from datetime import timezone
+import logging
+
+
+logger = logging.getLogger('snos')
+
+# LOG = logging.getLogger('snos')
+# handler = logging.StreamHandler(sys.stderr)
+# handler.setLevel(0)
+# LOG.setLevel(0)
+# LOG.addHandler(handler)
+
+
+def get_arguments():
+    """Get the command line arguments required to run the script."""
+ import argparse
+
+ parser = argparse.ArgumentParser(description='Calculate SNOS between two satellite platforms')
+ parser.add_argument("-s", "--start-datetime",
+ required=True,
+ dest="start_datetime",
+ type=str,
+ default=None,
+ help="The datetime string corresponding to the start time of when SNOS should be calculated")
+ parser.add_argument("-e", "--end-datetime",
+ required=True,
+ dest="end_datetime",
+ type=str,
+ default=None,
+ help="The datetime string corresponding to the end time of when SNOS should be calculated")
+ parser.add_argument("-t", "--time-window",
+ required=True,
+ dest="time_window",
+ type=int,
+ default=None,
+ help=("The time window in number of minutes - the maximum time allowed between " +
+ "the two SNO observations"))
+ parser.add_argument("--platform-name-A",
+ required=True,
+ dest="platform_name_one",
+ type=str,
+ default=None,
+ help="The name of the satellite platform (A)")
+ parser.add_argument("--platform-name-B",
+ required=True,
+ dest="platform_name_two",
+ type=str,
+ default=None,
+ help="The name of the satellite platform (B)")
+ parser.add_argument("--tolerance",
+ required=True,
+ dest="arc_len_min",
+ type=int,
+ default=None,
+ help=("The length in minutes of the delta arc used to find the point where "
+ "the two sub-satellite tracks cross. (2 is good, 5 is fast). "
+ "The lower this is the more accurate the SNO finding. The "
+ "higher this value is the less accurate but the faster the SNO finder is."))
+ parser.add_argument("-c", "--configfile",
+ required=True,
+ dest="configfile",
+ type=str,
+ default=None,
+ help="The path to the configuration file")
+ parser.add_argument("-l", "--log-config", dest="log_config",
+ type=str,
+ default=None,
+ help="Log config file to use instead of the standard logging.")
+
+ args = parser.parse_args()
+ return args
+
+
+if __name__ == "__main__":
+ """Find SNOs for the two platforms within the time period given."""
+ args = get_arguments()
+ if args.log_config:
+ setup_logging_from_config(args)
+
+ logger.info("Starting up.")
+
+ platform_id_one = args.platform_name_one
+ platform_id_two = args.platform_name_two
+
+ minutes_thr = int(args.time_window)
+ starttime = dt.datetime.strptime(args.start_datetime, "%Y%m%d%H%M")
+ starttime = starttime.replace(tzinfo=timezone.utc)
+
+ endtime = dt.datetime.strptime(args.end_datetime, "%Y%m%d%H%M")
+ endtime = endtime.replace(tzinfo=timezone.utc)
+ arclength_minutes = args.arc_len_min
+
+ sno_finder = SNOfinder(platform_id_one, platform_id_two, (starttime, endtime),
+ minutes_thr, arclength_minutes)
+ sno_finder.set_configuration(args.configfile)
+ sno_finder.initialize()
+ results = sno_finder.get_snos_within_time_window()
+
+ logger.info("Finished getting SNOs")
+ print(results)
+
+ sno_finder.dataframe2geojson(results)
+ sno_finder.write_geojson('./results.geojson')
diff --git a/continuous_integration/environment.yaml b/continuous_integration/environment.yaml
index aea32b87..c4da08ca 100644
--- a/continuous_integration/environment.yaml
+++ b/continuous_integration/environment.yaml
@@ -22,6 +22,8 @@ dependencies:
- mock
- zarr
- geoviews
+ - geopy
+ - geojson
- pytest
- pytest-cov
- fsspec
diff --git a/examples/snos.yaml_template b/examples/snos.yaml_template
new file mode 100644
index 00000000..3235aeb7
--- /dev/null
+++ b/examples/snos.yaml_template
@@ -0,0 +1,18 @@
+# The position of a Direct Readout station of interest. It is not necessary to
+# change this in order to calculate the SNOs, but if/when set the sno-script
+# will mark those crossings where the satellites being matched are within the
+# reception horizon (marked=True).
+station:
+  # Greenwich London, approx.:
+ longitude: 0.0
+ latitude: 51.478
+ altitude: 30.0
+
+# Here a list of directories can be given, to determine where to look up the
+# TLE files covering the time period of interest.
+tle-dirs:
+ - /path/to/tle/files
+
+
+# The format of tle filenames.
+tle-file-format: 'tle-%Y%m%d.txt'
diff --git a/pyorbital/config.py b/pyorbital/config.py
new file mode 100644
index 00000000..9df6efe5
--- /dev/null
+++ b/pyorbital/config.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2024 Pyorbital developers
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Helper functions to read and extract configuration parameters."""
+
+import yaml
+from yaml import SafeLoader
+from collections.abc import Mapping
+
+
+def _recursive_dict_update(d, u):
+ """Recursive dictionary update.
+
+ Copied from:
+
+ http://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth
+
+ """
+ for k, v in u.items():
+ if isinstance(v, Mapping):
+ r = _recursive_dict_update(d.get(k, {}), v)
+ d[k] = r
+ else:
+ d[k] = u[k]
+ return d
+
+
+def get_config(configfile):
+ """Get the configuration from file.
+
+ :configfile: The file path of the yaml configuration file.
+
+ :return: A configuration dictionary.
+
+ """
+ config = {}
+ with open(configfile, 'r') as fp_:
+ config = _recursive_dict_update(config, yaml.load(fp_, Loader=SafeLoader))
+
+ return config
diff --git a/pyorbital/logger.py b/pyorbital/logger.py
index b4506643..323c7414 100644
--- a/pyorbital/logger.py
+++ b/pyorbital/logger.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-# Copyright (c) 2023 Pyorbital developers
+# Copyright (c) 2023 - 2024 Pyorbital developers
# This program is free software: you can redistribute it and/or modify
@@ -20,6 +20,37 @@
"""Functionality to support standard logging."""
import logging
+import logging.config
+import logging.handlers
+import yaml
+
+
+LOG_FORMAT = "[%(asctime)s %(levelname)-8s] %(message)s"
+
+log_levels = {
+ 0: logging.WARN,
+ 1: logging.INFO,
+ 2: logging.DEBUG,
+}
+
+
+def setup_logging_from_config(cmd_args):
+ """Set up logging."""
+ if cmd_args.log_config is not None:
+ with open(cmd_args.log_config) as fd:
+ log_dict = yaml.safe_load(fd.read())
+ logging.config.dictConfig(log_dict)
+ return
+
+ root = logging.getLogger('')
+ root.setLevel(log_levels[cmd_args.verbosity])
+
+ fh_ = logging.StreamHandler()
+
+ formatter = logging.Formatter(LOG_FORMAT)
+ fh_.setFormatter(formatter)
+
+ root.addHandler(fh_)
def debug_on():
diff --git a/pyorbital/sno_utils.py b/pyorbital/sno_utils.py
new file mode 100644
index 00000000..55c66eed
--- /dev/null
+++ b/pyorbital/sno_utils.py
@@ -0,0 +1,476 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2024 Pytroll Community
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Utility functions to find simultaneous nadir overpasses (SNOs)."""
+
+import json
+import datetime as dt
+from datetime import timedelta, timezone
+from geopy import distance
+# from geojson import dump
+import numpy as np
+import pandas as pd
+
+from pyresample.spherical import SCoordinate
+import pyresample as pr
+from pyorbital.config import get_config
+from pyorbital.orbital import Orbital
+from pyorbital.tlefile import SATELLITES
+from pyorbital.tle_archive import get_datetime_from_tle
+from pyorbital.tle_archive import TwoLineElementsFinder
+
+from trollsift.parser import Parser
+from pathlib import Path
+import time
+import logging
+
+
+class PlatformNotSupported(Exception):
+ """Exception error when platform is not supported."""
+
+ pass
+
+
+OSCAR_NAMES = {'npp': 'Suomi-NPP',
+ 'snpp': 'Suomi-NPP',
+ 'aqua': 'EOS-Aqua',
+ 'metopb': 'Metop-B',
+ 'metopa': 'Metop-A',
+ 'noaa19': 'NOAA-19',
+ 'noaa18': 'NOAA-18',
+ 'sentinel3a': 'Sentinel-3A',
+ 'sentinel3b': 'Sentinel-3B',
+ 'fengyun3d': 'FY-3D',
+ 'noaa15': 'NOAA-15',
+ 'noaa16': 'NOAA-16',
+ 'calipso': 'CALIPSO'
+ }
+
+
+TLE_SATNAME = {'npp': 'SUOMI NPP',
+ 'snpp': 'SUOMI NPP',
+ 'aqua': 'AQUA',
+ 'metopb': 'METOP-B',
+ 'metopa': 'METOP-A',
+ 'Metop-C': 'METOP-C',
+ 'Metop-B': 'METOP-B',
+ 'Metop-A': 'METOP-A',
+ 'noaa19': 'NOAA 19',
+ 'noaa18': 'NOAA 18',
+ 'sentinel3a': 'SENTINEL-3A',
+ 'sentinel3b': 'SENTINEL-3B',
+ 'fengyun3d': 'FENGYUN 3D',
+ 'noaa15': 'NOAA 15',
+ 'noaa16': 'NOAA 16',
+ 'NOAA-18': 'NOAA 18',
+ 'NOAA-19': 'NOAA 19'
+ }
+
+ZERO_SECONDS = timedelta(seconds=0)
+
+LOG = logging.getLogger(__name__)
+
+tic = time.time()
+
+
+class SNOfinder:
+ """Find Simultaneous Nadir Overpass (SNO) points between two satellites.
+
+ Finding or predicting SNO points between two satellites.
+ """
+
+ def __init__(self, platform_id, calipso_id, time_window, sno_min_thr, arc_len_min=2):
+ """Initialize the SNO finder class."""
+ self.platform_id = platform_id
+ self.calipso_id = calipso_id
+ self.time_start = time_window[0]
+ self.time_end = time_window[1]
+ self.arc_len_min = arc_len_min
+ self.sno_minute_threshold = sno_min_thr
+ self._tle_buffer_calipso = {}
+ self._tle_buffer_other = {}
+
+ self.tle_finder_one = None
+ self.tle_finder_other = None
+
+ self._epoch_begin = dt.datetime(1970, 1, 1).replace(tzinfo=timezone.utc)
+ self.t_diff = timedelta(days=1)
+ self._minthr_step = 20 # min less than half an orbit
+ self.timestep_double = timedelta(seconds=60 * self._minthr_step * 2.0)
+
+ self._check_times()
+ self._check_platforms()
+
+ self.tle_id_platform_one = get_satellite_catalogue_number_from_name(self.calipso_id)
+ self.tle_id_platform_other = get_satellite_catalogue_number_from_name(self.platform_id)
+
+ def _check_times(self):
+ """Check if the selected time period is supported."""
+ if self._epoch_begin > (self.time_start - self.t_diff):
+ raise IOError(f"Start time too early: {str(self.time_start)}")
+
+ def _check_platforms(self):
+ # Check if satellite is supported:
+ if OSCAR_NAMES.get(self.platform_id, self.platform_id).upper() not in SATELLITES.keys():
+ raise PlatformNotSupported(f"Platform {self.platform_id} not supported!")
+
+ def set_configuration(self, configfile):
+ """Set the basic configuration from yaml config file."""
+ conf = get_config(configfile)
+ self._conf = conf
+
+ self.station = {}
+ self.station['lon'] = conf['station']['longitude']
+ self.station['lat'] = conf['station']['latitude']
+ self.station['alt'] = conf['station']['altitude']
+
+ def initialize(self):
+ """Initialize."""
+ self.get_tle_filenames()
+ self.tle_finder_one = TwoLineElementsFinder(self.tle_id_platform_one, str(self.filename_calipso))
+ self.tle_finder_one.populate_tle_buffer()
+ self._tle_buffer_calipso = self.tle_finder_one.tle_buffer
+
+ self.tle_finder_other = TwoLineElementsFinder(self.tle_id_platform_other, str(self.filename_other))
+ self.tle_finder_other.populate_tle_buffer()
+ self._tle_buffer_other = self.tle_finder_other.tle_buffer
+
+ def get_tlefilename(self, platform_id):
+ """From a platform id try find the filename with all TLEs."""
+ tle_dirs = self._conf['tle-dirs']
+ tle_file_format = self._conf['tle-file-format']
+
+ pobj = Parser(tle_file_format)
+
+ filename = None
+ for tledir in tle_dirs:
+ filename = Path(tledir) / pobj.compose({'platform': platform_id})
+ if filename.exists():
+ break
+
+ if filename and filename.exists():
+ return filename
+ else:
+ return None
+
+ def get_tle_filenames(self):
+ """From the two platform ids get the two filenames with all the TLE data."""
+ self.filename_calipso = self.get_tlefilename(self.calipso_id)
+ self.filename_other = self.get_tlefilename(self.platform_id)
+
+ def dataframe2geojson(self, df_):
+ """Convert the resulting Pandas dataframe to a Geojson object."""
+ gjson = {"type": "FeatureCollection", "features": []}
+
+ # Go through dataframe, append entries to geojson format
+ for _, row in df_.iterrows():
+ feature = {"type": "Feature", "geometry": {"type": "Point",
+ "coordinates": [row['sno_longitude'],
+ row['sno_latitude']]},
+ "properties": {"datetime1": row['satAdatetime'].isoformat(),
+ "datetime2": row['satBdatetime'].isoformat(),
+ "tdif_fmin": row['minutes_diff'],
+ "within_area": row["within_local_reception_area"]}}
+ gjson['features'].append(feature)
+ self.geojson_results = gjson
+
+ def write_geojson(self, filename):
+ """Write the geojson results to file."""
+ import json
+ with open(filename, 'w') as fp:
+ json.dump(self.geojson_results, fp)
+
+ def get_snos_within_time_window(self):
+ """Search and retrieve the SNOs inside the time window defined."""
+ calipso_obj = self._epoch_begin
+ other_obj = self._epoch_begin
+
+ tle_calipso = None
+ tle_the_other = None
+ tobj = self.time_start
+ tobj_tmp = self.time_start
+ i = 0
+ results = []
+ while tobj < self.time_end:
+ i = i + 1
+ if i % 100 == 0:
+ print(time.time() - tic)
+
+ if not tle_calipso or abs(calipso_obj - tobj) > self.t_diff:
+ tle_calipso = self.tle_finder_one.get_best_tle_from_archive(tobj)
+ self._tle_buffer_calipso = self.tle_finder_one.tle_buffer
+
+ if tle_calipso:
+ calipso_obj = get_datetime_from_tle(tle_calipso)
+
+ if not tle_the_other or abs(other_obj - tobj) > self.t_diff:
+ tle_the_other = self.tle_finder_other.get_best_tle_from_archive(tobj)
+ self._tle_buffer_other = self.tle_finder_other.tle_buffer
+
+ if tle_the_other:
+ other_obj = get_datetime_from_tle(tle_the_other)
+
+ calipso = Orbital(self.calipso_id, line1=tle_calipso.line1, line2=tle_calipso.line2)
+ the_other = Orbital(self.platform_id, line1=tle_the_other.line1, line2=tle_the_other.line2)
+
+ retv = self.search_intersection(tobj, calipso, the_other)
+ if retv:
+ arc_calipso, arc_the_other = retv
+ sno = get_sno_point(calipso, the_other,
+ arc_calipso, arc_the_other,
+ tobj, self.sno_minute_threshold, self.station)
+
+ if sno:
+ self.write_output_on_screen(sno, arc_calipso, arc_the_other, i)
+ results.append(sno)
+
+ tobj = tobj + self.timestep_double
+ if tobj - tobj_tmp > timedelta(days=1):
+ tobj_tmp = tobj
+ LOG.debug(tobj_tmp.strftime("%Y-%m-%d"))
+
+ print(str(results[0]))
+ return pd.DataFrame(results)
+
+ def search_intersection(self, dtime, orb_obj1, orb_obj2):
+ """Get two pieces of a great arc circle and check if there is an intersection."""
+        # Make sure the two satellites pass the SNO in the same step. We need an overlap of at least half minthr minutes.
+ timestep_plus_30s = timedelta(seconds=60 * self._minthr_step * 1.0 + (self.sno_minute_threshold*0.5)*60 + 30)
+
+ arc_orb_obj1_vector = get_arc_vector(dtime, timestep_plus_30s, orb_obj1, self.arc_len_min)
+ arc_orb_obj2_vector = get_arc_vector(dtime, timestep_plus_30s, orb_obj2, self.arc_len_min)
+ # Approximate tracks with one arc each self.arc_len_min minutes.
+ # For each pair of arcs check if they intersect.
+        # There is at most one intersection. Quit when we find it.
+ for arc_orb_obj1 in arc_orb_obj1_vector:
+ for arc_orb_obj2 in arc_orb_obj2_vector:
+ if arc_orb_obj1.intersects(arc_orb_obj2):
+ return arc_orb_obj1, arc_orb_obj2
+
+ return
+
+ def write_output_on_screen(self, sno, arc_calipso, arc_the_other, idx):
+ """Write output results on stdout."""
+ # For debugging:
+ create_geojson_line('./calipso_arc_%d.geojson' % idx, arc_calipso)
+
+ seconds_a = int(sno['satAdatetime'].strftime("%S")) + \
+ float(sno['satAdatetime'].strftime("%f"))/1000000.
+ seconds_b = int(sno['satBdatetime'].strftime("%S")) + \
+ float(sno['satBdatetime'].strftime("%f"))/1000000.
+
+ print(" " +
+ str(sno['satBdatetime'].strftime("%Y%m%d %H:%M")) +
+ "%5.1fs" % seconds_b + " "*5 +
+ str(sno['satAdatetime'].strftime("%Y%m%d %H:%M")) +
+ "%5.1fs" % seconds_a +
+ " "*6 + "(%7.2f, %7.2f)" % (sno['sno_latitude'], sno['sno_longitude']) +
+ " " + "%4.1f min" % (sno['minutes_diff']) + " " + str(sno['within_local_reception_area'])
+ )
+
+
+def get_satellite_catalogue_number_from_name(satname):
+ """Return the satellite catalogue number from the platform name."""
+ return SATELLITES[OSCAR_NAMES.get(satname, satname).upper()]
+
+
+def create_geojson_line(filename, arc):
+ """From a pyresample arc vector store it to a Geojson file."""
+ # geojson = {"type": "Feature"}
+ geojson = {"type": "Feature", "geometry": {"type": "LineString",
+ "coordinates": [
+ [arc.start.vertices_in_degrees[0][0],
+ arc.start.vertices_in_degrees[0][1]],
+ [arc.end.vertices_in_degrees[0][0],
+ arc.end.vertices_in_degrees[0][1]]
+ ]}}
+
+ with open(filename, 'w') as fp:
+ json.dump(geojson, fp)
+
+
+def get_arc_vector(timeobj, delta_t, sat, arc_len_min):
+    """Get the arc defining the sub-satellite track in a certain time window.
+
+ The time window is given by the start time 'timeobj' to start time
+ 'timeobj' plus time step 'delta_t'.
+ """
+ # Get positions at several points between +/- delta_t:
+ len_one_arc_s = 60 * arc_len_min # s
+ num = int(delta_t.seconds/len_one_arc_s)
+ # print(2*num)
+ pos_vec = [sat.get_lonlatalt(timeobj + ind * 1.0/num * delta_t) for ind in range(-num, num + 1, 1)]
+
+    # Calculate the Arc for each pixel. Later we will see if arcs cross each other.
+    # We could use only start and end. But then the SNO would sometimes be approximated too much.
+ arcs = [pr.spherical.Arc(SCoordinate(lon=np.deg2rad(point1[0]),
+ lat=np.deg2rad(point1[1])),
+ SCoordinate(lon=np.deg2rad(point2[0]),
+ lat=np.deg2rad(point2[1])))
+ for point1, point2 in zip(pos_vec[0:-1], pos_vec[1:])]
+
+ return arcs
+
+
+def get_sno_point(calipso, the_other, arc_calipso, arc_the_other, tobj, minthr, station):
+ """Get the SNO point if there is any.
+
+ If the two sub-satellite tracks of the overpasses intersects
+ get the sub-satellite position and time where they cross,
+    and determine if the time deviation is smaller than the required threshold:
+ """
+ import math
+ intersect = arc_calipso.intersection(arc_the_other)
+ point = (math.degrees(intersect.lon),
+ math.degrees(intersect.lat))
+ # SNO around tobj check for passes between +- one hour.
+ # So that wanted pass for sure is next pass!
+ nextp = the_other.get_next_passes(tobj - timedelta(seconds=60*60), 2, # Number of hours to find overpasses
+ point[0], point[1], 0)
+
+ minthr_step = 20 # min less than half an orbit probably
+ dtime = timedelta(seconds=60 * minthr_step * 2.0)
+
+ if len(nextp) > 0:
+ riset, fallt, maxt = nextp[0]
+ else:
+ print("No next passes found for, probably a bug!")
+ tobj = tobj + dtime
+ return None
+
+ nextp = calipso.get_next_passes(tobj - timedelta(seconds=60*60),
+ 2,
+ point[0],
+ point[1],
+ 0)
+ if len(nextp) > 0:
+ riset, fallt, maxt_calipso = nextp[0]
+ else:
+ print("No next passes found for, probably a bug!")
+ tobj = tobj + dtime
+ return None
+
+ # Get observer look from Norrkoping to the satellite when it is
+ # in zenith over the SNO point:
+
+ azi, elev = the_other.get_observer_look(maxt,
+ station['lon'], station['lat'], station['alt'])
+ isNorrk = (elev > 0.0)
+
+ tdelta = (maxt_calipso - maxt)
+ tdmin = (tdelta.seconds + tdelta.days * 24*3600) / 60.
+
+ if abs(tdmin) < minthr:
+ match = {}
+ match['satAdatetime'] = maxt
+ match['satBdatetime'] = maxt_calipso
+ match['sno_longitude'] = point[0]
+ match['sno_latitude'] = point[1]
+ match['minutes_diff'] = tdmin
+ match['within_local_reception_area'] = isNorrk
+ return match
+
+
+def get_closest_sno_to_reference(all_features, rfeature, tol_seconds=ZERO_SECONDS):
+ """Check all features found and find the one matching the reference.
+
+ Returns the distance between the two SNOs in km.
+ """
+ rgeom = rfeature['geometry']
+ rlon, rlat = rgeom['coordinates']
+ rprop = rfeature['properties']
+
+ rtime1 = dt.datetime.fromisoformat(rprop['datetime1'])
+ rtime2 = dt.datetime.fromisoformat(rprop['datetime2'])
+
+ overlap_found = False
+ for tfeat in all_features['features']:
+ tgeom = tfeat['geometry']
+ tlon, tlat = tgeom['coordinates']
+ tprop = tfeat['properties']
+
+ ttime1 = dt.datetime.fromisoformat(tprop['datetime1'])
+ ttime2 = dt.datetime.fromisoformat(tprop['datetime2'])
+
+ # Check for time overlap:
+ if check_overlapping_times((rtime1, rtime2), (ttime1, ttime2), tol_seconds):
+ # print("Overlap in times")
+ if tol_seconds > ZERO_SECONDS:
+ print("Approximate overlap in times")
+ km_dist = distance.distance((tlat, tlon), (rlat, rlon)).kilometers
+ overlap_found = True
+ print(f"Distance between SNOs: {km_dist:5.1f} km")
+ break
+
+ if overlap_found:
+ return km_dist, (rtime1, rtime2), (ttime1, ttime2)
+ else:
+ return None, (rtime1, rtime2), None
+
+
+def geojson_compare_derived_snos_against_reference(this_features, ref_features):
+ """Compare the SNOs derived against a reference set.
+
+ We loop over the Geojson features of the reference dataset and see if a
+ corresponding SNO can be found among the Geojson features output of the SNO
+ finder.
+ """
+ for rfeat in ref_features['features']:
+ km_dist, twindow_ref, twindow_this = get_closest_sno_to_reference(this_features, rfeat)
+ if not km_dist:
+ rgeom = rfeat['geometry']
+ rlon, rlat = rgeom['coordinates']
+
+ print("Failed to find strict overlap in times")
+ km_dist, twindow_ref, twindow_this = get_closest_sno_to_reference(this_features, rfeat,
+ timedelta(seconds=10))
+ if not km_dist:
+ print("Failed to find overlap in times")
+
+
+def check_overlapping_times(twindow1, twindow2, tol_seconds=ZERO_SECONDS):
+ """Check if two time windows overlap.
+
+ A tolerance can be given to allow accepting time windows that are very
+    close but not actually overlapping.
+ """
+ twindow1 = check_time_window(twindow1)
+ twindow2 = check_time_window(twindow2)
+
+ rtime1 = twindow1[0] - tol_seconds
+ rtime2 = twindow1[1] + tol_seconds
+ ttime1 = twindow2[0] - tol_seconds
+ ttime2 = twindow2[1] + tol_seconds
+
+ if (rtime1 >= ttime1 and rtime1 <= ttime2) or \
+ (rtime2 >= ttime1 and rtime2 <= ttime2):
+ return True
+ return False
+
+
+def check_time_window(twindow):
+ """Check the consistency of a time window, so that the first item is always the older."""
+ time1 = twindow[0]
+ time2 = twindow[1]
+ if time1 > time2:
+ tmp_time = time1
+ time1 = time2
+ time2 = tmp_time
+ twindow = (time1, time2)
+
+ return twindow
diff --git a/pyorbital/tests/conftest.py b/pyorbital/tests/conftest.py
new file mode 100644
index 00000000..26f11477
--- /dev/null
+++ b/pyorbital/tests/conftest.py
@@ -0,0 +1,132 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2024 Pyorbital developers
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Fixtures and helper functions for unittests."""
+
+import pytest
+
+
+TEST_YAML_CONFIG_CONTENT = """
+station:
+ longitude: 16.15
+ latitude: 58.58
+ altitude: 0.0
+
+# Here a list of directories can be given, to determine where to look up the
+# TLE files covering the time period of interest.
+tle-dirs:
+ - /path/to/tle/files
+
+# The format of tle filenames.
+tle-file-format: 'TLE_{platform:s}.txt'
+"""
+
+
+@pytest.fixture
+def fake_yamlconfig_file(tmp_path):
+ """Write fake yaml config file."""
+ file_path = tmp_path / 'snos_mystation.yaml'
+ with open(file_path, 'w') as fpt:
+ fpt.write(TEST_YAML_CONFIG_CONTENT)
+
+ yield file_path
+
+
+TEST_TLE_FILE1_CALIPSO = """1 29108U 06016B 13365.56860824 .00000817 00000-0 19114-3 0 5182
+2 29108 98.2183 306.3131 0001262 81.5858 278.5464 14.57147103408355
+1 29108U 06016B 14001.73596105 .00000831 00000-0 19433-3 0 5198
+2 29108 98.2189 307.4643 0001241 81.3032 278.8307 14.57149241408523
+1 29108U 06016B 14002.83464415 .00000945 00000-0 21969-3 0 5203
+2 29108 98.2193 308.5492 0001225 81.5021 278.6342 14.57151825408689
+1 29108U 06016B 14004.00199336 -.00003567 00000-0 -78180-3 0 5214
+2 29108 98.2205 309.7016 0001599 74.2235 285.7542 14.57124395408850
+"""
+
+TEST_TLE_FILE1_SNPP = """1 37849U 11061A 13365.52212068 .00000110 00000-0 73158-4 0 6643
+2 37849 98.7742 301.9172 0001322 129.8543 353.2962 14.19526864112801
+1 37849U 11061A 14001.84985892 .00000068 00000-0 53438-4 0 6650
+2 37849 98.7742 303.2326 0001351 129.0203 295.4516 14.19527275112993
+1 37849U 11061A 14002.47164083 .00000106 00000-0 50370-4 0 6667
+2 37849 98.7717 303.8433 0001170 118.9961 241.1812 14.19527267113086
+1 37849U 11061A 14003.12858987 .00000112 00000-0 74213-4 0 6662
+2 37849 98.7741 304.4994 0001384 128.2427 347.2535 14.19527795113176
+1 37849U 11061A 14004.18951829 .00000140 00000-0 87286-4 0 6674
+2 37849 98.7741 305.5505 0001426 126.5558 7.5558 14.19528199113326
+"""
+
+TEST_TLE_FILE2_CALIPSO = """1 29108U 06016B 13365.56860824 .00000817 00000-0 19114-3 0 9995
+2 29108 098.2183 306.3131 0001262 081.5858 278.5464 14.57147103408355
+1 29108U 06016B 14001.73596105 .00000831 00000-0 19433-3 0 9990
+2 29108 098.2189 307.4643 0001241 081.3032 278.8307 14.57149241408523
+1 29108U 06016B 14002.83464415 .00000945 00000-0 21969-3 0 9993
+2 29108 098.2193 308.5492 0001225 081.5021 278.6342 14.57151825408689
+1 29108U 06016B 14004.00199336 -.00003567 00000-0 -78180-3 0 9993
+2 29108 098.2205 309.7016 0001599 074.2235 285.7542 14.57124395408850
+1 29108U 06016B 14004.34536874 -.00007900 00000-0 -17461-2 0 9991
+2 29108 098.2198 310.0372 0001753 016.4256 343.5560 14.57094406408902
+1 29108U 06016B 14004.75741906 -.00011581 00000-0 -25675-2 0 9996
+2 29108 098.2201 310.4470 0001148 111.5107 248.5630 14.57068459408967
+1 29108U 06016B 14005.10078897 -.00014693 00000-0 -32632-2 0 9996
+2 29108 098.2210 310.7843 0001695 087.8266 272.2525 14.57045430409010
+1 29108U 06016B 14006.26824747 -.00016004 00000-0 -35589-2 0 9999
+2 29108 098.2212 311.9389 0000985 074.8347 285.2961 14.57006491409187
+1 29108U 06016B 14006.61162114 -.00010721 00000-0 -23779-2 0 9999
+2 29108 098.2215 312.2750 0001785 054.7782 305.4877 14.57027072409230
+1 29108U 06016B 14006.95496482 -.00007161 00000-0 -15838-2 0 9995
+2 29108 098.2196 312.6152 0001053 142.1452 218.0627 14.57036401409285
+1 29108U 06016B 14007.36698875 .00001082 00000-0 25039-3 0 9990
+2 29108 098.2203 313.0220 0001243 089.7863 270.4344 14.57065316409340
+1 29108U 06016B 14008.46571865 .00001229 00000-0 28299-3 0 9995
+2 29108 098.2204 314.1066 0001330 084.9063 275.2275 14.57068253409504
+1 29108U 06016B 14009.56446154 .00001213 00000-0 27957-3 0 9992
+2 29108 098.2198 315.1909 0001273 090.7695 269.3617 14.57071150409666
+1 29108U 06016B 14010.73187492 .00001212 00000-0 27917-3 0 9995
+2 29108 098.2194 316.3431 0001281 087.5323 272.5997 14.57074056409830
+1 29108U 06016B 14011.83061533 .00001150 00000-0 26558-3 0 9995
+2 29108 098.2191 317.4273 0001267 090.5743 269.5612 14.57076467409992
+"""
+
+
+@pytest.fixture
+def fake_tle_file1_calipso(tmp_path):
+ """Write fake TLE file for Calipso."""
+ file_path = tmp_path / 'TLE_calipso.txt'
+ with open(file_path, 'w') as fpt:
+ fpt.write(TEST_TLE_FILE1_CALIPSO)
+
+ yield file_path
+
+
+@pytest.fixture
+def fake_tle_file2_calipso(tmp_path):
+ """Write fake TLE file for Calipso."""
+ file_path = tmp_path / 'TLE_calipso.txt'
+ with open(file_path, 'w') as fpt:
+ fpt.write(TEST_TLE_FILE2_CALIPSO)
+
+ yield file_path
+
+
+@pytest.fixture
+def fake_tle_file1_snpp(tmp_path):
+ """Write fake TLE file for Suomi-NPP."""
+ file_path = tmp_path / 'TLE_snpp.txt'
+ with open(file_path, 'w') as fpt:
+ fpt.write(TEST_TLE_FILE1_SNPP)
+
+ yield file_path
diff --git a/pyorbital/tests/test_helper.py b/pyorbital/tests/test_helper.py
new file mode 100644
index 00000000..7fc359a8
--- /dev/null
+++ b/pyorbital/tests/test_helper.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2024 Adam.Dybbroe
+
+# Author(s):
+
+# Adam.Dybbroe
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Helper functions for unittesting."""
+
+import pandas as pd
+import io
+from datetime import timedelta, timezone
+import datetime as dt
+# from geojson import dump
+
+
+def get_dataframe_from_ascii(sno_ascii_data):
+ """Get data as pandas dataframe from ascii test file."""
+ # Define the column names
+ column_names = [
+ 'year1', 'month1', 'day1', 'hour1', 'minute1', 'second1',
+ 'year2', 'month2', 'day2', 'hour2', 'minute2', 'second2',
+ 'latitude', 'longitude'
+ ]
+ # Define the widths of each column in the fixed-width format
+ colspecs = [
+ (0, 6), (6, 9), (9, 12), (12, 15), (15, 18), (18, 24), # First datetime components
+ (24, 33), (33, 36), (36, 39), (39, 42), (42, 45), (45, 51), # Second datetime components
+ (51, 64), (64, 73) # Latitude and longitude
+ ]
+
+ str_data = io.StringIO(sno_ascii_data)
+ # Read the data using pandas
+ df = pd.read_fwf(str_data, colspecs=colspecs, header=None, names=column_names)
+ return df
+
+
+def df2geojson(df):
+ """Convert dataframe to Geojson."""
+ # geojson skeleton
+ geojson = {"type": "FeatureCollection", "features": []}
+
+ # go through dataframe, append entries to geojson format
+ for _, row in df.iterrows():
+ dtobj1 = dt.datetime(int(row['year1']), int(row['month1']), int(row['day1']),
+ int(row['hour1']), int(row['minute1']),
+ tzinfo=timezone.utc) + timedelta(seconds=float(row['second1']))
+ dtobj2 = dt.datetime(int(row['year2']), int(row['month2']), int(row['day2']),
+ int(row['hour2']), int(row['minute2']),
+ tzinfo=timezone.utc) + timedelta(seconds=float(row['second2']))
+
+ feature = {"type": "Feature", "geometry": {"type": "Point",
+ "coordinates": [row['longitude'],
+ row['latitude']]},
+ "properties": {"datetime1": dtobj1.isoformat(),
+ "datetime2": dtobj2.isoformat()}}
+ geojson['features'].append(feature)
+
+ # with open('test1_sno.geojson', 'w') as fp:
+ # json.dump(geojson, fp)
+
+ return geojson
diff --git a/pyorbital/tests/test_snos.py b/pyorbital/tests/test_snos.py
new file mode 100644
index 00000000..91d21c37
--- /dev/null
+++ b/pyorbital/tests/test_snos.py
@@ -0,0 +1,124 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2024 Adam.Dybbroe
+
+# Author(s):
+
+# Adam.Dybbroe
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Test the SNO finder functionality."""
+
+
+import pytest
+import numpy as np
+import datetime as dt
+from datetime import timezone
+from pyorbital.sno_utils import SNOfinder
+from pyorbital.sno_utils import check_overlapping_times
+from pyorbital.sno_utils import PlatformNotSupported
+# from pyorbital.config import get_config
+from pyorbital.tests.test_helper import get_dataframe_from_ascii
+from pyorbital.tests.test_helper import df2geojson
+
+# from pyorbital.sno_utils import geojson_compare_derived_snos_against_reference
+from pyorbital.sno_utils import get_closest_sno_to_reference
+
+# The data below are until further regarded as the "truth".
# The SNOs here have been found by running a Fortran program provided by NOAA colleagues to SMHI around ~2008
+# Adam Dybbroe, 2024-06-14
# Fixed-width reference data: each row holds the two crossing datetimes
# (year month day hour minute seconds, twice) followed by the latitude and
# longitude of the crossing point. Parsed with get_dataframe_from_ascii.
TEST1_SNOS_ASCII = """
 2014 1 3 4 41 40.9 2014 1 3 4 43 16.7 78.34 -0.56
 2014 1 3 5 32 15.5 2014 1 3 5 32 34.3 -78.74 169.19
 2014 1 3 6 22 51.0 2014 1 3 6 21 51.8 79.08 -21.21
"""

# Larger reference data set over several days (same column layout as above).
TEST2_SNOS_ASCII = """
 2014 1 3 4 41 40.9 2014 1 3 4 43 16.7 78.34 -0.56
 2014 1 3 5 32 15.5 2014 1 3 5 32 34.3 -78.74 169.19
 2014 1 3 6 22 51.0 2014 1 3 6 21 51.8 79.08 -21.21
 2014 1 5 20 58 34.8 2014 1 5 20 59 41.5 78.51 116.17
 2014 1 5 21 49 10.0 2014 1 5 21 48 59.9 -78.88 -74.14
 2014 1 5 22 39 45.8 2014 1 5 22 38 17.8 79.19 95.39
 2014 1 8 13 15 31.6 2014 1 8 13 16 25.4 78.55 -127.76
 2014 1 8 14 6 6.9 2014 1 8 14 5 43.8 -78.92 41.90
 2014 1 8 14 56 43.0 2014 1 8 14 55 2.2 79.23 -148.59
 2014 1 11 4 41 53.6 2014 1 11 4 43 42.6 -78.21 178.74
 2014 1 11 5 32 27.1 2014 1 11 5 32 58.4 78.64 -11.43
 2014 1 11 6 23 2.8 2014 1 11 6 22 17.1 -78.99 158.18
"""
+
+
def test_snofinder_unsupported_platform(fake_yamlconfig_file, fake_tle_file1_calipso, fake_tle_file1_snpp):
    """Test the SNO finder when one of the platform names are not supported."""
    time_window = (dt.datetime(2014, 1, 3, tzinfo=timezone.utc),
                   dt.datetime(2014, 1, 4, tzinfo=timezone.utc))
    # SNOs allowed to have 2 minute deviation between the two platforms
    minutes_threshold = 2

    with pytest.raises(PlatformNotSupported) as exec_info:
        _ = SNOfinder('PARASOL', 'calipso', time_window, minutes_threshold)

    assert str(exec_info.value) == "Platform PARASOL not supported!"
+
+
def test_get_snos_calipso_snpp(fake_yamlconfig_file, fake_tle_file1_calipso, fake_tle_file1_snpp):
    """Test finding calipso and SNPP SNOs within time window.

    The SNOs derived here are compared against the reference results in
    TEST1_SNOS_ASCII (produced by an independent implementation).
    """
    platform_one = 'snpp'
    platform_two = 'calipso'
    time_window = (dt.datetime(2014, 1, 3, tzinfo=timezone.utc),
                   dt.datetime(2014, 1, 4, tzinfo=timezone.utc))
    sno_min_thr = 2  # SNOs allowed to have 2 minute deviation between the two platforms
    mysnofinder = SNOfinder(platform_one, platform_two, time_window, sno_min_thr)
    mysnofinder.set_configuration(fake_yamlconfig_file)
    # Prepend the fake TLE directories so they take precedence over any
    # directories named in the fake config.
    mysnofinder._conf['tle-dirs'].insert(0, str(fake_tle_file1_snpp.parent))
    mysnofinder._conf['tle-dirs'].insert(0, str(fake_tle_file1_calipso.parent))
    mysnofinder.initialize()

    results = mysnofinder.get_snos_within_time_window()

    np.testing.assert_array_equal(results['within_local_reception_area'].values,
                                  np.array([True, False, True]))

    mysnofinder.dataframe2geojson(results)

    # Reference SNOs from the ASCII "truth" data, converted to geojson features.
    test_ref = get_dataframe_from_ascii(TEST1_SNOS_ASCII)
    ref_features = df2geojson(test_ref)

    # Each derived SNO must be closer to its reference than the given threshold
    # (NOTE(review): units of eval_res[0] are not visible here -- presumably km,
    # confirm against get_closest_sno_to_reference), and the associated time
    # windows must overlap.
    for idx, expected in enumerate([6.5, 12, 18]):
        eval_res = get_closest_sno_to_reference(mysnofinder.geojson_results, ref_features['features'][idx])
        assert eval_res[0] < expected
        assert check_overlapping_times(eval_res[1], eval_res[2])
+
+
# Two time-window pairs built from the third SNO of TEST1_SNOS_ASCII:
# TWIN1 holds the reference crossing times, TWIN2 a slightly shifted variant.
# Times: 2014-01-03 06:22:53.516417 2014-01-03 06:21:54.322415
# ('2014-01-03 06:22:51', '2014-01-03 06:21:51.800000')
TWIN1 = (dt.datetime(2014, 1, 3, 6, 22, 51).replace(tzinfo=timezone.utc),
         dt.datetime(2014, 1, 3, 6, 21, 51, 800000).replace(tzinfo=timezone.utc))
TWIN2 = (dt.datetime(2014, 1, 3, 6, 22, 53, 516417).replace(tzinfo=timezone.utc),
         dt.datetime(2014, 1, 3, 6, 21, 51, 800000).replace(tzinfo=timezone.utc))
+
+
@pytest.mark.parametrize("twin1, twin2, expected",
                         [(TWIN1, TWIN2, True)
                          ]
                         )
def test_check_overlapping_times(twin1, twin2, expected):
    """Test the check to see if two time windows overlap."""
    result = check_overlapping_times(twin1, twin2)
    # Bug fix: compare against the parametrized expectation. Previously the
    # `expected` parameter was ignored and only truthiness was asserted, so a
    # parametrization with expected=False could never be added.
    assert result == expected
diff --git a/pyorbital/tests/test_tle_archive.py b/pyorbital/tests/test_tle_archive.py
new file mode 100644
index 00000000..67ffc190
--- /dev/null
+++ b/pyorbital/tests/test_tle_archive.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2024 Pyorbital developers
+
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Testing the TLE archive functions."""
+
+
+import numpy as np
+import datetime as dt
+from datetime import timezone
+import pyorbital
+from pyorbital.tle_archive import TwoLineElementsFinder
+
+
def test_populate_tle_buffer(fake_tle_file1_calipso):
    """Test populate the TLE buffer.

    Checks that the buffer keys are the (UTC) TLE epochs in file order and
    that each buffered Tle corresponds to its pair of lines in the file.
    """
    tle_filename = str(fake_tle_file1_calipso)
    tleid_calipso = '29108'  # Catalogue id used to select the Calipso TLEs
    tle_finder = TwoLineElementsFinder(tleid_calipso, tle_filename)
    tle_finder.populate_tle_buffer()
    tlebuff = tle_finder.tle_buffer

    with open(tle_filename, 'r') as fpt:
        tlelines = fpt.readlines()

    # The epochs of the four TLEs in the fixture file, in file order.
    expected_dtimes = [dt.datetime(2013, 12, 31, 13, 38, 47, 751936, tzinfo=timezone.utc),
                       dt.datetime(2014, 1, 1, 17, 39, 47, 34720, tzinfo=timezone.utc),
                       dt.datetime(2014, 1, 2, 20, 1, 53, 254560, tzinfo=timezone.utc),
                       dt.datetime(2014, 1, 4, 0, 2, 52, 226304, tzinfo=timezone.utc)]
    for idx, key in enumerate(tlebuff.keys()):
        assert key == expected_dtimes[idx]

    # Buffer preserves file order: entry idx maps to file lines 2*idx / 2*idx+1.
    for idx, key in enumerate(tlebuff.keys()):
        tleobj = tlebuff[key]
        assert isinstance(tleobj, pyorbital.tlefile.Tle)
        assert tlelines[idx*2].strip() == tleobj.line1
        assert tlelines[idx*2+1].strip() == tleobj.line2
+
+
def test_get_best_tle_from_archive(fake_tle_file1_calipso):
    """Test getting the best fitting TLE from an archive file with many TLEs.

    The same retrieval-and-compare sequence was previously triplicated; it is
    now table driven, which also makes adding cases cheap.
    """
    tle_filename = str(fake_tle_file1_calipso)
    tleid_calipso = '29108'

    # (requested time, epoch of the TLE expected to be returned)
    cases = [
        (dt.datetime(2014, 1, 2, tzinfo=timezone.utc),
         dt.datetime(2014, 1, 1, 17, 39, 47, 34720, tzinfo=timezone.utc)),
        (dt.datetime(2014, 1, 1, 12, tzinfo=timezone.utc),
         dt.datetime(2013, 12, 31, 13, 38, 47, 751936, tzinfo=timezone.utc)),
        (dt.datetime(2014, 1, 1, 16, tzinfo=timezone.utc),
         dt.datetime(2014, 1, 1, 17, 39, 47, 34720, tzinfo=timezone.utc)),
    ]
    for dtobj, expected in cases:
        tle_finder = TwoLineElementsFinder(tleid_calipso, tle_filename)
        tleobj = tle_finder.get_best_tle_from_archive(dtobj)

        # Convert the numpy epoch to a UTC datetime and compare within 1 ms.
        ts = (tleobj.epoch - np.datetime64('1970-01-01T00:00:00Z')) / np.timedelta64(1, 's')
        dtime_valid = dt.datetime.fromtimestamp(ts, tz=timezone.utc)
        assert abs(expected - dtime_valid).total_seconds() < 0.001
+
+
# NOTE(review): if re-enabled, rename this function -- it currently has the
# same name as test_get_best_tle_from_archive above and would shadow it.
# def test_get_best_tle_from_archive(fake_tle_file2_calipso):
#     """Test getting the best fitting TLE from a file with many TLEs."""
+# tle_filename = str(fake_tle_file2_calipso)
+# tleid_calipso = '29108'
+# dtobj = dt.datetime(2014, 1, 6, 12, tzinfo=timezone.utc)
+
+# tle_finder = TwoLineElementsFinder(tleid_calipso, tle_filename)
+# tleobj = tle_finder.get_best_tle_from_archive(dtobj)
+
+# ts = (tleobj.epoch - np.datetime64('1970-01-01T00:00:00Z')) / np.timedelta64(1, 's')
+# dtime_valid = dt.datetime.fromtimestamp(ts, tz=timezone.utc)
+# expected = dt.datetime(2014, 1, 1, 17, 39, 47, 34720, tzinfo=timezone.utc)
+
+# assert abs(expected - dtime_valid).total_seconds() < 0.001
diff --git a/pyorbital/tle_archive.py b/pyorbital/tle_archive.py
new file mode 100644
index 00000000..d22f2f1b
--- /dev/null
+++ b/pyorbital/tle_archive.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Copyright (c) 2024 Pyorbital developers
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Functions to support handling many archived TLEs."""
+
+
+import numpy as np
+import datetime as dt
+from datetime import timezone
+from pyorbital.tlefile import Tle
+
# Maximum allowed deviation (in days) between a requested time and a TLE epoch.
max_tle_days_diff = 3
+
+
class TwoLineElementsFinder:
    """Finding the most adequate TLEs for a given platform and time."""

    def __init__(self, platform_catalogue_id, tle_filename, tle_buffer=None):
        """Initialize the TLE finder.

        Args:
            platform_catalogue_id: Catalogue id (string) identifying the
                platform; matched against the first line of each TLE pair.
            tle_filename: Path to the file holding the archive of TLEs.
            tle_buffer: Optional pre-populated mapping of TLE epoch
                (timezone aware datetime) to Tle object.
        """
        self.tle_id = platform_catalogue_id
        self.filename = tle_filename
        self._tles_as_list = self._read_tles_from_file()
        # Reuse an externally supplied buffer when given, otherwise start empty.
        self.tle_buffer = tle_buffer if tle_buffer else {}

    def _read_tles_from_file(self):
        """Read all lines of the TLE archive file."""
        with open(self.filename, 'r') as fh_:
            return fh_.readlines()

    def populate_tle_buffer(self):
        """Populate the TLE buffer, mapping TLE epoch (UTC datetime) to Tle object."""
        # TLEs come in pairs of lines; the platform id appears in line 1.
        for ind in range(0, len(self._tles_as_list), 2):
            if self.tle_id in self._tles_as_list[ind]:
                tle = Tle(self.tle_id, line1=self._tles_as_list[ind], line2=self._tles_as_list[ind+1])
                # Use the module-level helper for the epoch conversion instead
                # of duplicating the numpy arithmetic here.
                self.tle_buffer[get_datetime_from_tle(tle)] = tle

    def _first_tle_within_days(self, time_requested, max_days):
        """Return (epoch, Tle) of the first buffered TLE within *max_days* days, else None.

        Note: this returns the first match in buffer (= archive file) order,
        not necessarily the closest epoch in time.
        """
        for tobj, tle in self.tle_buffer.items():
            if abs(tobj - time_requested).days <= max_days:
                return tobj, tle
        return None

    def get_best_tle_from_archive(self, time_requested):
        """From the archive get the Two-Line elements that best fit for the requested time.

        First a TLE with an epoch less than a day from *time_requested* is
        looked for; failing that, the allowed deviation is widened one day at
        a time up to ``max_tle_days_diff`` days. The TLE buffer is populated
        on first use. Returns None when no TLE is close enough.
        """
        # Read tle data if not already in buffer.
        if not self.tle_buffer:
            self.populate_tle_buffer()

        match = self._first_tle_within_days(time_requested, 0)
        if match:
            return match[1]

        for delta_days in range(1, max_tle_days_diff + 1):
            match = self._first_tle_within_days(time_requested, delta_days)
            if match:
                tobj, tle = match
                # Bug fix: the two times were swapped in the original message --
                # it is the *requested* time that lacks a TLE, and the TLE from
                # tobj that is used instead.
                print("Did not find TLE for {:s}, Using TLE from {:s}".format(
                    time_requested.strftime("%Y%m%d"), tobj.strftime("%Y%m%d")))
                return tle

        # Bug fix: the original message was never formatted and printed a
        # literal '{:s}'; also return None explicitly on failure.
        print("Did not find TLE for {:s} +/- {:d} days".format(
            time_requested.strftime("%Y%m%d"), max_tle_days_diff))
        return None
+
+
def get_datetime_from_tle(tle_obj):
    """Return the epoch of a TLE object as a timezone aware (UTC) datetime.

    The Tle epoch (a numpy datetime64) is converted via the number of seconds
    elapsed since the Unix epoch.
    """
    seconds_since_1970 = (tle_obj.epoch - np.datetime64('1970-01-01T00:00:00Z')) / np.timedelta64(1, 's')
    return dt.datetime.fromtimestamp(seconds_since_1970, tz=timezone.utc)