diff --git a/doc/source/conf.py b/doc/source/conf.py
index ab8067a..f3bba78 100644
--- a/doc/source/conf.py
+++ b/doc/source/conf.py
@@ -12,16 +12,17 @@
# serve to show the default.
"""Configurations for sphinx based documentation."""
-import sys
+import datetime as dt
import os
+import sys
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
-sys.path.insert(0, os.path.abspath('../../'))
-sys.path.insert(0, os.path.abspath('../../pyorbital'))
-from pyorbital import __version__ # noqa
+sys.path.insert(0, os.path.abspath("../../"))
+sys.path.insert(0, os.path.abspath("../../pyorbital"))
+from pyorbital.version import __version__ # noqa
# -- General configuration -----------------------------------------------------
@@ -30,30 +31,32 @@
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
-extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', 'sphinx.ext.coverage', 'sphinx.ext.napoleon']
+extensions = ["sphinx.ext.autodoc", "sphinx.ext.doctest", "sphinx.ext.coverage", "sphinx.ext.napoleon"]
# Add any paths that contain templates here, relative to this directory.
-templates_path = ['.templates']
+templates_path = [".templates"]
# The suffix of source filenames.
-source_suffix = '.rst'
+source_suffix = ".rst"
# The encoding of source files.
# #source_encoding = 'utf-8-sig'
# The master toctree document.
-master_doc = 'index'
+master_doc = "index"
# General information about the project.
-project = u'pyorbital'
-copyright = u'2012-2023, The Pytroll crew'
+project = u"pyorbital"
+copyright = u"2012, 2024-{}, The PyTroll Team".format(dt.datetime.utcnow().strftime("%Y")) # noqa: A001
+
+
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
-version = __version__.split('+')[0]
+version = __version__.split("+")[0]
# The full version, including alpha/beta/rc tags.
release = __version__
@@ -69,7 +72,7 @@
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
-exclude_patterns = []
+# exclude_patterns = []
# The reST default role (used for this markup: `text`) to use for all documents.
# #default_role = None
@@ -86,7 +89,7 @@
# show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
-pygments_style = 'sphinx'
+pygments_style = "sphinx"
# A list of ignored prefixes for module index sorting.
# #modindex_common_prefix = []
@@ -180,7 +183,7 @@
# #html_file_suffix = None
# Output file base name for HTML help builder.
-htmlhelp_basename = 'pyorbitaldoc'
+htmlhelp_basename = "pyorbitaldoc"
# -- Options for LaTeX output --------------------------------------------------
@@ -194,8 +197,8 @@
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass [howto/manual]).
latex_documents = [
- ('index', 'pyorbital.tex', u'pyorbital Documentation',
- u'The Pytroll crew', 'manual'),
+ ("index", "pyorbital.tex", u"pyorbital Documentation",
+ u"The Pytroll crew", "manual"),
]
# The name of an image file (relative to this directory) to place at the top of
@@ -227,6 +230,6 @@
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
- ('index', 'pyorbital', u'pyorbital Documentation',
- [u'The Pytroll crew'], 1)
+ ("index", "pyorbital", u"pyorbital Documentation",
+ [u"The Pytroll crew"], 1)
]
diff --git a/pyorbital/astronomy.py b/pyorbital/astronomy.py
index 881e97d..2793553 100644
--- a/pyorbital/astronomy.py
+++ b/pyorbital/astronomy.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
-# Copyright (c) 2011, 2013
+# Copyright (c) 2011, 2013, 2024
#
# Author(s):
#
@@ -36,7 +36,6 @@
produce scalar outputs.
"""
-import datetime
import numpy as np
@@ -48,23 +47,20 @@
def jdays2000(utc_time):
- """Get the days since year 2000.
- """
- return _days(dt2np(utc_time) - np.datetime64('2000-01-01T12:00'))
+ """Get the days since year 2000."""
+ return _days(dt2np(utc_time) - np.datetime64("2000-01-01T12:00"))
def jdays(utc_time):
- """Get the julian day of *utc_time*.
- """
+ """Get the julian day of *utc_time*."""
return jdays2000(utc_time) + 2451545.0
def _days(dt):
- """Get the days (floating point) from *d_t*.
- """
+ """Get the days (floating point) from *d_t*."""
if hasattr(dt, "shape"):
dt = np.asanyarray(dt, dtype=np.timedelta64)
- return dt / np.timedelta64(1, 'D')
+ return dt / np.timedelta64(1, "D")
def gmst(utc_time):
@@ -81,14 +77,16 @@ def gmst(utc_time):
def _lmst(utc_time, longitude):
"""Local mean sidereal time, computed from *utc_time* and *longitude*.
- In radians.
+
+ utc_time: The UTC time as a datetime.datetime object.
+ Logitude: The longitude in radians.
+ Returns: local mean sideral time in radians.
"""
return gmst(utc_time) + longitude
def sun_ecliptic_longitude(utc_time):
- """Ecliptic longitude of the sun at *utc_time*.
- """
+ """Ecliptic longitude of the sun at *utc_time*."""
jdate = jdays2000(utc_time) / 36525.0
# mean anomaly, rad
m_a = np.deg2rad(357.52910 +
@@ -105,8 +103,7 @@ def sun_ecliptic_longitude(utc_time):
def sun_ra_dec(utc_time):
- """Right ascension and declination of the sun at *utc_time*.
- """
+ """Right ascension and declination of the sun at *utc_time*."""
jdate = jdays2000(utc_time) / 36525.0
eps = np.deg2rad(23.0 + 26.0 / 60.0 + 21.448 / 3600.0 -
(46.8150 * jdate + 0.00059 * jdate * jdate -
@@ -124,9 +121,12 @@ def sun_ra_dec(utc_time):
def _local_hour_angle(utc_time, longitude, right_ascension):
- """Hour angle at *utc_time* for the given *longitude* and
- *right_ascension*
- longitude in radians
+ """Derive the hour angle at *utc_time* for the given *longitude* and *right_ascension*.
+
+ utc_time: datetime.datetime instance of the UTC time
+ longitude: Longitude in radians.
+ right_ascension: The right ascension in radians.
+ Returns: Hour angle in radians.
"""
return _lmst(utc_time, longitude) - right_ascension
@@ -152,9 +152,12 @@ def get_alt_az(utc_time, lon, lat):
def cos_zen(utc_time, lon, lat):
- """Cosine of the sun-zenith angle for *lon*, *lat* at *utc_time*.
+ """Derive the cosine of the sun-zenith angle for *lon*, *lat* at *utc_time*.
+
utc_time: datetime.datetime instance of the UTC time
- lon and lat in degrees.
+ lon: Longitude in degrees
+ lat: Latitude in degrees.
+ Returns: Cosine of the sun zenith angle.
"""
lon = np.deg2rad(lon)
lat = np.deg2rad(lat)
@@ -169,8 +172,9 @@ def cos_zen(utc_time, lon, lat):
def sun_zenith_angle(utc_time, lon, lat):
"""Sun-zenith angle for *lon*, *lat* at *utc_time*.
+
lon,lat in degrees.
- The angle returned is given in degrees
+ The sun zenith angle returned is in degrees.
"""
sza = np.rad2deg(np.arccos(cos_zen(utc_time, lon, lat)))
if not isinstance(lon, float):
@@ -179,8 +183,7 @@ def sun_zenith_angle(utc_time, lon, lat):
def sun_earth_distance_correction(utc_time):
- """Calculate the sun earth distance correction, relative to 1 AU.
- """
+ """Calculate the sun earth distance correction, relative to 1 AU."""
# Computation according to
# https://web.archive.org/web/20150117190838/http://curious.astro.cornell.edu/question.php?number=582
# with
@@ -208,7 +211,6 @@ def observer_position(utc_time, lon, lat, alt):
http://celestrak.com/columns/v02n03/
"""
-
lon = np.deg2rad(lon)
lat = np.deg2rad(lat)
diff --git a/pyorbital/geoloc.py b/pyorbital/geoloc.py
index 00f10a5..60a324c 100644
--- a/pyorbital/geoloc.py
+++ b/pyorbital/geoloc.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-# Copyright (c) 2011 - 2019 Pytroll Community
+# Copyright (c) 2011 - 2019, 2024 Pytroll Community
# Author(s):
@@ -21,8 +21,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-"""Module to compute geolocalization of a satellite scene.
-"""
+"""Module to compute geolocalization of a satellite scene."""
# TODO:
# - Attitude correction
@@ -31,13 +30,13 @@
# - test !!!
from __future__ import print_function
+
import numpy as np
# DIRTY STUFF. Needed the get_lonlatalt function to work on pos directly if
# we want to print out lonlats in the end.
from pyorbital import astronomy
-from pyorbital.orbital import XKMPER, F
-from pyorbital.orbital import Orbital
+from pyorbital.orbital import XKMPER, F, Orbital
A = 6378.137 # WGS84 Equatorial radius (km)
B = 6356.75231414 # km, GRS80
@@ -45,6 +44,7 @@
def geodetic_lat(point, a=A, b=B):
+ """Get the Geodetic latitude of a point."""
x, y, z = point
r = np.sqrt(x * x + y * y)
geoc_lat = np.arctan2(z, r)
@@ -84,8 +84,9 @@ class ScanGeometry(object):
"""
def __init__(self, fovs, times, attitude=(0, 0, 0)):
+ """Initialize the class."""
self.fovs = np.array(fovs)
- self._times = np.array(times) * np.timedelta64(1000000000, 'ns')
+ self._times = np.array(times) * np.timedelta64(1000000000, "ns")
self.attitude = attitude
def vectors(self, pos, vel, roll=0.0, pitch=0.0, yaw=0.0):
@@ -120,6 +121,7 @@ def vectors(self, pos, vel, roll=0.0, pitch=0.0, yaw=0.0):
return qrotate(xy_rotated, nadir, yaw)
def times(self, start_of_scan):
+ """Return an array with the times of each scan line."""
# tds = [timedelta(seconds=i) for i in self._times]
# tds = self._times.astype('timedelta64[us]')
try:
@@ -129,12 +131,15 @@ def times(self, start_of_scan):
class Quaternion(object):
+ """Some class, that I don't know what is doing..."""
def __init__(self, scalar, vector):
+ """Initialize the class."""
self.__x, self.__y, self.__z = vector.reshape((3, -1))
self.__w = scalar.ravel()
def rotation_matrix(self):
+ """Get the rotation matrix."""
x, y, z, w = self.__x, self.__y, self.__z, self.__w
zero = np.zeros_like(x)
return np.array(
@@ -240,27 +245,28 @@ def compute_pixels(orb, sgeom, times, rpy=(0.0, 0.0, 0.0)):
def norm(v):
+ """Return the norm of the vector *v*."""
return np.sqrt(np.dot(v, v.conj()))
def mnorm(m, axis=None):
- """norm of a matrix of vectors stacked along the *axis* dimension."""
+ """Norm of a matrix of vectors stacked along the *axis* dimension."""
if axis is None:
axis = np.ndim(m) - 1
return np.sqrt((m**2).sum(axis))
def vnorm(m):
- """norms of a matrix of column vectors."""
+ """Norms of a matrix of column vectors."""
return np.sqrt((m**2).sum(0))
def hnorm(m):
- """norms of a matrix of row vectors."""
+ """Norms of a matrix of row vectors."""
return np.sqrt((m**2).sum(1))
-if __name__ == '__main__':
+if __name__ == "__main__":
# NOAA 18 (from the 2011-10-12, 16:55 utc)
# 1 28654U 05018A 11284.35271227 .00000478 00000-0 28778-3 0 9246
# 2 28654 99.0096 235.8581 0014859 135.4286 224.8087 14.11526826329313
diff --git a/pyorbital/geoloc_example.py b/pyorbital/geoloc_example.py
index c3c1296..1b8b567 100644
--- a/pyorbital/geoloc_example.py
+++ b/pyorbital/geoloc_example.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-# Copyright (c) 2013 Martin Raspaud
+# Copyright (c) 2013, 2024 Martin Raspaud
# Author(s):
@@ -20,14 +20,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-"""Simple usage for geoloc.
-"""
+"""Simple usage for geoloc."""
-import numpy as np
from datetime import datetime
-from pyorbital.geoloc import ScanGeometry, compute_pixels, get_lonlatalt
-from mpl_toolkits.basemap import Basemap
+
import matplotlib.pyplot as plt
+import numpy as np
+from mpl_toolkits.basemap import Basemap
+
+from pyorbital.geoloc import ScanGeometry, compute_pixels, get_lonlatalt
# Couple of example Two Line Elements
tle1 = "1 33591U 09005A 12345.45213434 .00000391 00000-0 24004-3 0 6113"
@@ -72,14 +73,14 @@
print(pos_time)
# Plot the result
-m = Basemap(projection='stere', llcrnrlat=24, urcrnrlat=70, llcrnrlon=-25, urcrnrlon=120,
- lat_ts=58, lat_0=58, lon_0=14, resolution='l')
+m = Basemap(projection="stere", llcrnrlat=24, urcrnrlat=70, llcrnrlon=-25, urcrnrlon=120,
+ lat_ts=58, lat_0=58, lon_0=14, resolution="l")
# convert and plot the predicted pixels in red
x, y = m(pos_time[0], pos_time[1])
-p1 = m.plot(x, y, marker='+', color='red', markerfacecolor='red', markeredgecolor='red', markersize=1, markevery=1,
+p1 = m.plot(x, y, marker="+", color="red", markerfacecolor="red", markeredgecolor="red", markersize=1, markevery=1,
zorder=4, linewidth=0.0)
-m.fillcontinents(color='0.85', lake_color=None, zorder=3)
+m.fillcontinents(color="0.85", lake_color=None, zorder=3)
m.drawparallels(np.arange(-90., 90., 5.), labels=[1, 0, 1, 0], fontsize=10, dashes=[1, 0],
color=[0.8, 0.8, 0.8], zorder=1)
m.drawmeridians(np.arange(-180., 180., 5.), labels=[0, 1, 0, 1], fontsize=10, dashes=[1, 0],
diff --git a/pyorbital/geoloc_instrument_definitions.py b/pyorbital/geoloc_instrument_definitions.py
index 465ebc6..28cfb2a 100644
--- a/pyorbital/geoloc_instrument_definitions.py
+++ b/pyorbital/geoloc_instrument_definitions.py
@@ -41,7 +41,6 @@
from pyorbital.geoloc import ScanGeometry
-
################################################################
#
# AVHRR
@@ -78,7 +77,7 @@ def avhrr(scans_nb, scan_points,
def avhrr_gac(scan_times, scan_points,
scan_angle=55.37, frequency=0.5):
- """Definition of the avhrr instrument, gac version
+ """Definition of the avhrr instrument, gac version.
Source: NOAA KLM User's Guide, Appendix J
http://www.ncdc.noaa.gov/oa/pod-guide/ncdc/docs/klm/html/j/app-j.htm
@@ -154,9 +153,8 @@ def viirs(scans_nb, scan_indices=slice(0, None),
99 emtpy (excluded) scans
"""
-
entire_width = np.arange(chn_pixels)
- scan_points = entire_width[scan_indices].astype('int')
+ scan_points = entire_width[scan_indices].astype("int")
scan_pixels = len(scan_points)
# Initial angle 55.84 deg replaced with 56.28 deg found in
@@ -210,7 +208,7 @@ def viirs_edge_geom(scans_nb):
################################################################
def amsua(scans_nb, scan_points=None):
- """ Describe AMSU-A instrument geometry
+ """Describe AMSU-A instrument geometry.
Parameters:
scans_nb | int - number of scan lines
@@ -222,7 +220,6 @@ def amsua(scans_nb, scan_points=None):
pyorbital.geoloc.ScanGeometry object
"""
-
scan_len = 30 # 30 samples per scan
scan_rate = 8 # single scan, seconds
scan_angle = -48.3 # swath, degrees
@@ -255,7 +252,7 @@ def amsua(scans_nb, scan_points=None):
################################################################
def mhs(scans_nb, scan_points=None):
- """ Describe MHS instrument geometry
+ """Describe MHS instrument geometry.
See:
@@ -274,7 +271,6 @@ def mhs(scans_nb, scan_points=None):
pyorbital.geoloc.ScanGeometry object
"""
-
scan_len = 90 # 90 samples per scan
scan_rate = 8 / 3. # single scan, seconds
scan_angle = -49.444 # swath, degrees
@@ -332,7 +328,6 @@ def hirs4(scans_nb, scan_points=None):
pyorbital.geoloc.ScanGeometry object
"""
-
scan_len = 56 # 56 samples per scan
scan_rate = 6.4 # single scan, seconds
scan_angle = -49.5 # swath, degrees
@@ -363,7 +358,7 @@ def hirs4(scans_nb, scan_points=None):
################################################################
def atms(scans_nb, scan_points=None):
- """ Describe ATMS instrument geometry
+ """Describe ATMS instrument geometry
See:
- https://dtcenter.org/com-GSI/users/docs/presentations/2013_workshop/
@@ -382,7 +377,6 @@ def atms(scans_nb, scan_points=None):
pyorbital.geoloc.ScanGeometry object
"""
-
scan_len = 96 # 96 samples per scan
scan_rate = 8 / 3. # single scan, seconds
scan_angle = -52.7 # swath, degrees
@@ -413,7 +407,7 @@ def atms(scans_nb, scan_points=None):
################################################################
def mwhs2(scans_nb, scan_points=None):
- """Describe MWHS-2 instrument geometry
+ """Describe MWHS-2 instrument geometry.
The scanning period is 2.667 s. Main beams of the antenna scan over the ob-
serving swath (±53.35◦ from nadir) in the cross-track direction at a
@@ -434,7 +428,6 @@ def mwhs2(scans_nb, scan_points=None):
pyorbital.geoloc.ScanGeometry object
"""
-
scan_len = 98 # 98 samples per scan
scan_rate = 8 / 3. # single scan, seconds
scan_angle = -53.35 # swath, degrees
@@ -485,7 +478,6 @@ def olci(scans_nb, scan_points=None):
Source: Sentinel-3 OLCI Coverage
https://sentinel.esa.int/web/sentinel/user-guides/sentinel-3-olci/coverage
"""
-
if scan_points is None:
scan_len = 4000 # samples per scan
scan_points = np.arange(4000)
@@ -519,7 +511,6 @@ def ascat(scan_nb, scan_points=None):
sub-satellite track.
"""
-
if scan_points is None:
scan_len = 42 # samples per scan
scan_points = np.arange(42)
diff --git a/pyorbital/logger.py b/pyorbital/logger.py
index b450664..c3070a7 100644
--- a/pyorbital/logger.py
+++ b/pyorbital/logger.py
@@ -38,12 +38,12 @@ def logging_on(level=logging.WARNING):
console = logging.StreamHandler()
console.setFormatter(logging.Formatter("[%(levelname)s: %(asctime)s :"
" %(name)s] %(message)s",
- '%Y-%m-%d %H:%M:%S'))
+ "%Y-%m-%d %H:%M:%S"))
console.setLevel(level)
- logging.getLogger('').addHandler(console)
+ logging.getLogger("").addHandler(console)
_is_logging_on = True
- log = logging.getLogger('')
+ log = logging.getLogger("")
log.setLevel(level)
for h in log.handlers:
h.setLevel(level)
@@ -58,7 +58,7 @@ def emit(self, record):
def logging_off():
"""Turn logging off."""
- logging.getLogger('').handlers = [NullHandler()]
+ logging.getLogger("").handlers = [NullHandler()]
def get_logger(name):
diff --git a/pyorbital/orbital.py b/pyorbital/orbital.py
index 07f2e40..bba8bd9 100644
--- a/pyorbital/orbital.py
+++ b/pyorbital/orbital.py
@@ -27,9 +27,9 @@
import logging
import warnings
from datetime import datetime, timedelta
-import pytz
import numpy as np
+import pytz
from scipy import optimize
from pyorbital import astronomy, dt2np, tlefile
@@ -169,7 +169,7 @@ def __str__(self):
def get_last_an_time(self, utc_time):
"""Calculate time of last ascending node relative to the specified time."""
# Propagate backwards to ascending node
- dt = np.timedelta64(10, 'm')
+ dt = np.timedelta64(10, "m")
t_old = np.datetime64(_get_tz_unaware_utctime(utc_time))
t_new = t_old - dt
pos0, vel0 = self.get_position(t_old, normalize=False)
@@ -298,6 +298,7 @@ def get_orbit_number(self, utc_time, tbus_style=False, as_float=False):
"""Calculate orbit number at specified time.
Args:
+ utc_time: UTC time as a datetime.datetime object.
tbus_style: If True, use TBUS-style orbit numbering (TLE orbit number + 1)
as_float: Return a continuous orbit number as float.
"""
@@ -318,7 +319,7 @@ def get_orbit_number(self, utc_time, tbus_style=False, as_float=False):
self.orbit_elements.an_period = self.orbit_elements.an_time - \
self.get_last_an_time(self.orbit_elements.an_time
- - np.timedelta64(10, 'm'))
+ - np.timedelta64(10, "m"))
dt = astronomy._days(utc_time - self.orbit_elements.an_time)
orbit_period = astronomy._days(self.orbit_elements.an_period)
@@ -385,7 +386,7 @@ def get_max_parab(fun, start, end, tol=0.01):
f_c = fun(c)
x = b
- with np.errstate(invalid='raise'):
+ with np.errstate(invalid="raise"):
while True:
try:
x = x - 0.5 * (((b - a) ** 2 * (f_b - f_c)
@@ -398,7 +399,7 @@ def get_max_parab(fun, start, end, tol=0.01):
f_x = fun(x)
# sometimes the estimation diverges... return best guess
if f_x > f_b:
- logger.info('Parabolic interpolation did not converge, returning best guess so far.')
+ logger.info("Parabolic interpolation did not converge, returning best guess so far.")
return b
a, b, c = (a + x) / 2.0, x, (x + c) / 2.0
@@ -449,7 +450,7 @@ def _get_time_at_horizon(self, utc_time, obslon, obslat, **kwargs):
warnings.warn("_get_time_at_horizon is replaced with get_next_passes",
DeprecationWarning, stacklevel=2)
if "precision" in kwargs:
- precision = kwargs['precision']
+ precision = kwargs["precision"]
else:
precision = timedelta(seconds=0.001)
if "max_iterations" in kwargs:
@@ -497,7 +498,7 @@ def utc2local(self, utc_time):
lon, _, _ = self.get_lonlatalt(utc_time)
return utc_time + timedelta(hours=lon * 24 / 360.0)
- def get_equatorial_crossing_time(self, tstart, tend, node='ascending', local_time=False,
+ def get_equatorial_crossing_time(self, tstart, tend, node="ascending", local_time=False,
rtol=1E-9):
"""Estimate the equatorial crossing time of an orbit.
@@ -524,19 +525,19 @@ def get_equatorial_crossing_time(self, tstart, tend, node='ascending', local_tim
# Orbit doesn't cross the equator in the given time interval
return None
elif n_end - n_start > 1:
- warnings.warn('Multiple revolutions between start and end time. Computing crossing '
- 'time for the last revolution in that interval.', stacklevel=2)
+ warnings.warn("Multiple revolutions between start and end time. Computing crossing "
+ "time for the last revolution in that interval.", stacklevel=2)
# Let n'(t) = n(t) - offset. Determine offset so that n'(tstart) < 0 and n'(tend) > 0 and
# n'(tcross) = 0.
offset = int(n_end)
- if node == 'descending':
+ if node == "descending":
offset = offset + 0.5
# Use bisection algorithm to find the root of n'(t), which is the crossing time. The
# algorithm requires continuous time coordinates, so convert timestamps to microseconds
# since 1970.
- time_unit = 'us' # same precision as datetime
+ time_unit = "us" # same precision as datetime
def _nprime(time_f):
"""Continuous orbit number as a function of time."""
@@ -618,6 +619,7 @@ class _SGDP4(object):
"""Class for the SGDP4 computations."""
def __init__(self, orbit_elements):
+ """Initialize class."""
self.mode = None
# perigee = orbit_elements.perigee
@@ -636,11 +638,11 @@ def __init__(self, orbit_elements):
# A30 = -XJ3 * AE**3
if not (0 < self.eo < ECC_LIMIT_HIGH):
- raise OrbitalError('Eccentricity out of range: %e' % self.eo)
+ raise OrbitalError("Eccentricity out of range: %e" % self.eo)
elif not ((0.0035 * 2 * np.pi / XMNPDA) < self.xn_0 < (18 * 2 * np.pi / XMNPDA)):
- raise OrbitalError('Mean motion out of range: %e' % self.xn_0)
+ raise OrbitalError("Mean motion out of range: %e" % self.xn_0)
elif not (0 < self.xincl < np.pi):
- raise OrbitalError('Inclination out of range: %e' % self.xincl)
+ raise OrbitalError("Inclination out of range: %e" % self.xincl)
if self.eo < 0:
self.mode = self.SGDP4_ZERO_ECC
@@ -776,7 +778,7 @@ def __init__(self, orbit_elements):
15.0 * c1sq * (2.0 * self.d2 + c1sq)))
elif self.mode == SGDP4_DEEP_NORM:
- raise NotImplementedError('Deep space calculations not supported')
+ raise NotImplementedError("Deep space calculations not supported")
def propagate(self, utc_time):
kep = {}
@@ -786,7 +788,7 @@ def propagate(self, utc_time):
# print utc_time.shape
# print self.t_0
utc_time = dt2np(utc_time)
- ts = (utc_time - self.t_0) / np.timedelta64(1, 'm')
+ ts = (utc_time - self.t_0) / np.timedelta64(1, "m")
em = self.eo
xinc = self.xincl
@@ -796,7 +798,7 @@ def propagate(self, utc_time):
omega = self.omegao + self.omgdot * ts
if self.mode == SGDP4_ZERO_ECC:
- raise NotImplementedError('Mode SGDP4_ZERO_ECC not implemented')
+ raise NotImplementedError("Mode SGDP4_ZERO_ECC not implemented")
elif self.mode == SGDP4_NEAR_SIMP:
raise NotImplementedError('Mode "Near-space, simplified equations"'
' not implemented')
@@ -819,12 +821,12 @@ def propagate(self, utc_time):
xl = xmp + omega + xnode + self.xnodp * templ
else:
- raise NotImplementedError('Deep space calculations not supported')
+ raise NotImplementedError("Deep space calculations not supported")
if np.any(a < 1):
- raise Exception('Satellite crashed at time %s', utc_time)
+ raise Exception("Satellite crashed at time %s", utc_time)
elif np.any(e < ECC_LIMIT_LOW):
- raise ValueError('Satellite modified eccentricity too low: %s < %e'
+ raise ValueError("Satellite modified eccentricity too low: %s < %e"
% (str(e[e < ECC_LIMIT_LOW]), ECC_LIMIT_LOW))
e = np.where(e < ECC_EPS, ECC_EPS, e)
@@ -844,14 +846,14 @@ def propagate(self, utc_time):
elsq = axn**2 + ayn**2
if np.any(elsq >= 1):
- raise Exception('e**2 >= 1 at %s', utc_time)
+ raise Exception("e**2 >= 1 at %s", utc_time)
- kep['ecc'] = np.sqrt(elsq)
+ kep["ecc"] = np.sqrt(elsq)
epw = np.fmod(xlt - xnode, 2 * np.pi)
# needs a copy in case of an array
capu = np.array(epw)
- maxnr = kep['ecc']
+ maxnr = kep["ecc"]
for i in range(10):
sinEPW = np.sin(epw)
cosEPW = np.cos(epw)
@@ -899,7 +901,7 @@ def propagate(self, utc_time):
xinck = xinc + 1.5 * temp2 * self.cosIO * self.sinIO * cos2u
if np.any(rk < 1):
- raise Exception('Satellite crashed at time %s', utc_time)
+ raise Exception("Satellite crashed at time %s", utc_time)
temp0 = np.sqrt(a)
temp2 = XKE / (a * temp0)
@@ -909,14 +911,14 @@ def propagate(self, utc_time):
(self.x1mth2 * cos2u + 1.5 * self.x3thm1)) *
(XKMPER / AE * XMNPDA / 86400.0))
- kep['radius'] = rk * XKMPER / AE
- kep['theta'] = uk
- kep['eqinc'] = xinck
- kep['ascn'] = xnodek
- kep['argp'] = omega
- kep['smjaxs'] = a * XKMPER / AE
- kep['rdotk'] = rdotk
- kep['rfdotk'] = rfdotk
+ kep["radius"] = rk * XKMPER / AE
+ kep["theta"] = uk
+ kep["eqinc"] = xinck
+ kep["ascn"] = xnodek
+ kep["argp"] = omega
+ kep["smjaxs"] = a * XKMPER / AE
+ kep["rdotk"] = rdotk
+ kep["rfdotk"] = rfdotk
return kep
@@ -940,12 +942,12 @@ def kep2xyz(kep):
(Not sure what 'kep' actually refers to, just guessing! FIXME!)
"""
- sinT = np.sin(kep['theta'])
- cosT = np.cos(kep['theta'])
- sinI = np.sin(kep['eqinc'])
- cosI = np.cos(kep['eqinc'])
- sinS = np.sin(kep['ascn'])
- cosS = np.cos(kep['ascn'])
+ sinT = np.sin(kep["theta"])
+ cosT = np.cos(kep["theta"])
+ sinI = np.sin(kep["eqinc"])
+ cosI = np.cos(kep["eqinc"])
+ sinS = np.sin(kep["ascn"])
+ cosS = np.cos(kep["ascn"])
xmx = -sinS * cosI
xmy = cosS * cosI
@@ -954,17 +956,17 @@ def kep2xyz(kep):
uy = xmy * sinT + sinS * cosT
uz = sinI * sinT
- x = kep['radius'] * ux
- y = kep['radius'] * uy
- z = kep['radius'] * uz
+ x = kep["radius"] * ux
+ y = kep["radius"] * uy
+ z = kep["radius"] * uz
vx = xmx * cosT - cosS * sinT
vy = xmy * cosT - sinS * sinT
vz = sinI * cosT
- v_x = kep['rdotk'] * ux + kep['rfdotk'] * vx
- v_y = kep['rdotk'] * uy + kep['rfdotk'] * vy
- v_z = kep['rdotk'] * uz + kep['rfdotk'] * vz
+ v_x = kep["rdotk"] * ux + kep["rfdotk"] * vx
+ v_y = kep["rdotk"] * uy + kep["rfdotk"] * vy
+ v_z = kep["rdotk"] * uz + kep["rfdotk"] * vz
return np.array((x, y, z)), np.array((v_x, v_y, v_z))
diff --git a/pyorbital/tests/test_aiaa.py b/pyorbital/tests/test_aiaa.py
index 6362ab4..290c718 100644
--- a/pyorbital/tests/test_aiaa.py
+++ b/pyorbital/tests/test_aiaa.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-# Copyright (c) 2011 - 2023 Pytroll Community
+# Copyright (c) 2011 - 2024 Pytroll Community
# Author(s):
@@ -20,8 +20,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-"""Test cases from the AIAA article.
-"""
+"""Test cases from the AIAA article."""
+
# TODO: right formal unit tests.
from __future__ import print_function, with_statement
@@ -38,10 +38,10 @@
class LineOrbital(Orbital):
- """Read TLE lines instead of file.
- """
+ """Read TLE lines instead of file."""
def __init__(self, satellite, line1, line2):
+ """Initialize the class."""
satellite = satellite.upper()
self.satellite_name = satellite
self.tle = tlefile.read(satellite, line1=line1, line2=line2)
@@ -50,8 +50,7 @@ def __init__(self, satellite, line1, line2):
def get_results(satnumber, delay):
- """Get expected results from result file.
- """
+ """Get expected results from result file."""
path = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(path, "aiaa_results")) as f_2:
line = f_2.readline()
@@ -87,10 +86,9 @@ class AIAAIntegrationTest(unittest.TestCase):
@unittest.skipIf(
not os.path.exists(os.path.join(_DATAPATH, "SGP4-VER.TLE")),
- 'SGP4-VER.TLE not available')
+ "SGP4-VER.TLE not available")
def test_aiaa(self):
- """Do the tests against AIAA test cases.
- """
+ """Do the tests against AIAA test cases."""
path = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(path, "SGP4-VER.TLE")) as f__:
test_line = f__.readline()
@@ -116,12 +114,11 @@ def test_aiaa(self):
test_line = f__.readline()
continue
except ChecksumError:
- self.assertTrue(test_line.split()[1] in [
- "33333", "33334", "33335"])
+ assert test_line.split()[1] in ["33333", "33334", "33335"]
for delay in times:
try:
test_time = delay.astype(
- 'timedelta64[m]') + o.tle.epoch
+ "timedelta64[m]") + o.tle.epoch
pos, vel = o.get_position(test_time, False)
res = get_results(
int(o.tle.satnumber), float(delay))
@@ -136,14 +133,14 @@ def test_aiaa(self):
delta_pos = 5e-6 # km = 5 mm
delta_vel = 5e-9 # km/s = 5 um/s
delta_time = 1e-3 # 1 millisecond
- self.assertTrue(abs(res[0] - pos[0]) < delta_pos)
- self.assertTrue(abs(res[1] - pos[1]) < delta_pos)
- self.assertTrue(abs(res[2] - pos[2]) < delta_pos)
- self.assertTrue(abs(res[3] - vel[0]) < delta_vel)
- self.assertTrue(abs(res[4] - vel[1]) < delta_vel)
- self.assertTrue(abs(res[5] - vel[2]) < delta_vel)
+ assert abs(res[0] - pos[0]) < delta_pos
+ assert abs(res[1] - pos[1]) < delta_pos
+ assert abs(res[2] - pos[2]) < delta_pos
+ assert abs(res[3] - vel[0]) < delta_vel
+ assert abs(res[4] - vel[1]) < delta_vel
+ assert abs(res[5] - vel[2]) < delta_vel
if res[6] is not None:
dt = astronomy._days(res[6] - test_time) * 24 * 60
- self.assertTrue(abs(dt) < delta_time)
+ assert abs(dt) < delta_time
test_line = f__.readline()
diff --git a/pyorbital/tests/test_geoloc.py b/pyorbital/tests/test_geoloc.py
index e579778..eceb9d3 100644
--- a/pyorbital/tests/test_geoloc.py
+++ b/pyorbital/tests/test_geoloc.py
@@ -23,10 +23,11 @@
"""Test the geoloc module."""
from datetime import datetime
+
import numpy as np
from pyorbital.geoloc import ScanGeometry, geodetic_lat, qrotate, subpoint
-from pyorbital.geoloc_instrument_definitions import avhrr, viirs, amsua, mhs, hirs4, atms, ascat
+from pyorbital.geoloc_instrument_definitions import amsua, ascat, atms, avhrr, hirs4, mhs, viirs
class TestQuaternion:
@@ -110,8 +111,8 @@ def test_scan_geometry(self):
times = instrument.times(start_of_scan)
assert times[0, 1] == start_of_scan
- assert times[0, 0] == start_of_scan - np.timedelta64(100, 'ms')
- assert times[0, 2] == start_of_scan + np.timedelta64(100, 'ms')
+ assert times[0, 0] == start_of_scan - np.timedelta64(100, "ms")
+ assert times[0, 2] == start_of_scan + np.timedelta64(100, "ms")
def test_geodetic_lat(self):
"""Test the determination of the geodetic latitude."""
diff --git a/pyorbital/tests/test_orbital.py b/pyorbital/tests/test_orbital.py
index 74e9dac..1cc46b8 100644
--- a/pyorbital/tests/test_orbital.py
+++ b/pyorbital/tests/test_orbital.py
@@ -22,12 +22,14 @@
"""Test the geoloc orbital."""
-import pytest
import unittest
-from unittest import mock
from datetime import datetime, timedelta
-import pytz
+from unittest import mock
+
import numpy as np
+import pytest
+import pytz
+
from pyorbital import orbital
eps_deg = 10e-3
@@ -45,7 +47,7 @@ def test_get_orbit_number(self):
"92.4533 267.6830 14.19582686 11574")
dobj = datetime(2012, 1, 18, 8, 4, 19)
orbnum = sat.get_orbit_number(dobj)
- self.assertEqual(orbnum, 1163)
+ assert orbnum == 1163
def test_sublonlat(self):
"""Test getting the sub-satellite position."""
@@ -59,12 +61,9 @@ def test_sublonlat(self):
expected_lon = -68.199894472013213
expected_lat = 23.159747677881075
expected_alt = 392.01953430856935
- self.assertTrue(np.abs(lon - expected_lon) < eps_deg,
- 'Calculation of sublon failed')
- self.assertTrue(np.abs(lat - expected_lat) < eps_deg,
- 'Calculation of sublat failed')
- self.assertTrue(np.abs(alt - expected_alt) < eps_deg,
- 'Calculation of altitude failed')
+ assert np.abs(lon - expected_lon) < eps_deg, "Calculation of sublon failed"
+ assert np.abs(lat - expected_lat) < eps_deg, "Calculation of sublat failed"
+ assert np.abs(alt - expected_alt) < eps_deg, "Calculation of altitude failed"
def test_observer_look(self):
"""Test getting the observer look angles."""
@@ -77,10 +76,8 @@ def test_observer_look(self):
az, el = sat.get_observer_look(d, -84.39733, 33.775867, 0)
expected_az = 122.45169655331965
expected_el = 1.9800219611255456
- self.assertTrue(np.abs(az - expected_az) < eps_deg,
- 'Calculation of azimut failed')
- self.assertTrue(np.abs(el - expected_el) < eps_deg,
- 'Calculation of elevation failed')
+ assert np.abs(az - expected_az) < eps_deg, "Calculation of azimut failed"
+ assert np.abs(el - expected_el) < eps_deg, "Calculation of elevation failed"
def test_orbit_num_an(self):
"""Test getting orbit number - ascending node."""
@@ -90,7 +87,7 @@ def test_orbit_num_an(self):
line2="2 29499 98.6804 312.6735 0001758 "
"111.9178 248.2152 14.21501774254058")
d = datetime(2011, 9, 14, 5, 30)
- self.assertEqual(sat.get_orbit_number(d), 25437)
+ assert sat.get_orbit_number(d) == 25437
def test_orbit_num_non_an(self):
"""Test getting orbit number - not ascending node."""
@@ -99,8 +96,8 @@ def test_orbit_num_non_an(self):
".00000017 00000-0 27793-4 0 9819",
line2="2 29499 98.6639 121.6164 0001449 "
"71.9056 43.3132 14.21510544330271")
- dt = np.timedelta64(98, 'm')
- self.assertEqual(sat.get_orbit_number(sat.tle.epoch + dt), 33028)
+ dt = np.timedelta64(98, "m")
+ assert sat.get_orbit_number(sat.tle.epoch + dt) == 33028
def test_orbit_num_equator(self):
"""Test getting orbit numbers when being around equator."""
@@ -113,13 +110,13 @@ def test_orbit_num_equator(self):
t2 = datetime(2013, 3, 2, 22, 2, 26)
on1 = sat.get_orbit_number(t1)
on2 = sat.get_orbit_number(t2)
- self.assertEqual(on1, 6973)
- self.assertEqual(on2, 6974)
+ assert on1 == 6973
+ assert on2 == 6974
pos1, vel1 = sat.get_position(t1, normalize=False)
pos2, vel2 = sat.get_position(t2, normalize=False)
del vel1, vel2
- self.assertTrue(pos1[2] < 0)
- self.assertTrue(pos2[2] > 0)
+ assert pos1[2] < 0
+ assert pos2[2] > 0
def test_get_next_passes_apogee(self):
"""Regression test #22."""
@@ -128,12 +125,10 @@ def test_get_next_passes_apogee(self):
line2 = "2 24793 86.3994 209.3241 0002020 " \
"89.8714 270.2713 14.34246429 90794"
- orb = orbital.Orbital('IRIDIUM 7 [+]', line1=line1, line2=line2)
+ orb = orbital.Orbital("IRIDIUM 7 [+]", line1=line1, line2=line2)
d = datetime(2018, 3, 7, 3, 30, 15)
res = orb.get_next_passes(d, 1, 170.556, -43.368, 0.5, horizon=40)
- self.assertTrue(abs(
- res[0][2] - datetime(2018, 3, 7, 3, 48, 13, 178439)) <
- timedelta(seconds=0.01))
+ assert abs(res[0][2] - datetime(2018, 3, 7, 3, 48, 13, 178439)) < timedelta(seconds=0.01)
def test_get_next_passes_tricky(self):
"""Check issue #34 for reference."""
@@ -143,33 +138,28 @@ def test_get_next_passes_tricky(self):
line2 = "2 43125 097.5269 314.3317 0010735 "\
"157.6344 202.5362 15.23132245036381"
- orb = orbital.Orbital('LEMUR-2-BROWNCOW', line1=line1, line2=line2)
+ orb = orbital.Orbital("LEMUR-2-BROWNCOW", line1=line1, line2=line2)
d = datetime(2018, 9, 8)
res = orb.get_next_passes(d, 72, -8.174163, 51.953319, 0.05, horizon=5)
- self.assertTrue(abs(
- res[0][2] - datetime(2018, 9, 8, 9, 5, 46, 375248)) <
- timedelta(seconds=0.01))
- self.assertTrue(abs(
- res[-1][2] - datetime(2018, 9, 10, 22, 15, 3, 143469)) <
- timedelta(seconds=0.01))
+ assert abs(res[0][2] - datetime(2018, 9, 8, 9, 5, 46, 375248)) < timedelta(seconds=0.01)
+ assert abs(res[-1][2] - datetime(2018, 9, 10, 22, 15, 3, 143469)) < timedelta(seconds=0.01)
- self.assertTrue(len(res) == 15)
+ assert len(res) == 15
def test_get_next_passes_issue_22(self):
"""Check that max."""
- line1 = '1 28654U 05018A 21083.16603416 .00000102 00000-0 79268-4 0 9999'
- line2 = '2 28654 99.0035 147.6583 0014816 159.4931 200.6838 14.12591533816498'
+ line1 = "1 28654U 05018A 21083.16603416 .00000102 00000-0 79268-4 0 9999"
+ line2 = "2 28654 99.0035 147.6583 0014816 159.4931 200.6838 14.12591533816498"
orb = orbital.Orbital("NOAA 18", line1=line1, line2=line2)
t = datetime(2021, 3, 9, 22)
next_passes = orb.get_next_passes(t, 1, -15.6335, 27.762, 0.)
rise, fall, max_elevation = next_passes[0]
assert rise < max_elevation < fall
- print(next_passes)
- @mock.patch('pyorbital.orbital.Orbital.get_lonlatalt')
+ @mock.patch("pyorbital.orbital.Orbital.get_lonlatalt")
def test_utc2local(self, get_lonlatalt):
"""Test converting UTC to local time."""
get_lonlatalt.return_value = -45, None, None
@@ -178,20 +168,19 @@ def test_utc2local(self, get_lonlatalt):
".00000017 00000-0 27793-4 0 9819",
line2="2 29499 98.6639 121.6164 0001449 "
"71.9056 43.3132 14.21510544330271")
- self.assertEqual(sat.utc2local(datetime(2009, 7, 1, 12)),
- datetime(2009, 7, 1, 9))
+ assert sat.utc2local(datetime(2009, 7, 1, 12)) == datetime(2009, 7, 1, 9)
- @mock.patch('pyorbital.orbital.Orbital.utc2local')
- @mock.patch('pyorbital.orbital.Orbital.get_orbit_number')
+ @mock.patch("pyorbital.orbital.Orbital.utc2local")
+ @mock.patch("pyorbital.orbital.Orbital.get_orbit_number")
def test_get_equatorial_crossing_time(self, get_orbit_number, utc2local):
"""Test get the equatorial crossing time."""
def get_orbit_number_patched(utc_time, **kwargs):
utc_time = np.datetime64(utc_time)
- diff = (utc_time - np.datetime64('2009-07-01 12:38:12')) / np.timedelta64(7200, 's')
+ diff = (utc_time - np.datetime64("2009-07-01 12:38:12")) / np.timedelta64(7200, "s")
return 1234 + diff
get_orbit_number.side_effect = get_orbit_number_patched
- utc2local.return_value = 'local_time'
+ utc2local.return_value = "local_time"
sat = orbital.Orbital("METOP-A",
line1="1 29499U 06044A 13060.48822809 "
".00000017 00000-0 27793-4 0 9819",
@@ -202,20 +191,20 @@ def get_orbit_number_patched(utc_time, **kwargs):
res = sat.get_equatorial_crossing_time(tstart=datetime(2009, 7, 1, 12),
tend=datetime(2009, 7, 1, 13))
exp = datetime(2009, 7, 1, 12, 38, 12)
- self.assertTrue((res - exp) < timedelta(seconds=0.01))
+ assert res - exp < timedelta(seconds=0.01)
# Descending node
res = sat.get_equatorial_crossing_time(tstart=datetime(2009, 7, 1, 12),
tend=datetime(2009, 7, 1, 14, 0),
- node='descending')
+ node="descending")
exp = datetime(2009, 7, 1, 13, 38, 12)
- self.assertTrue((res - exp) < timedelta(seconds=0.01))
+ assert res - exp < timedelta(seconds=0.01)
# Conversion to local time
res = sat.get_equatorial_crossing_time(tstart=datetime(2009, 7, 1, 12),
tend=datetime(2009, 7, 1, 14),
local_time=True)
- self.assertEqual(res, 'local_time')
+ assert res == "local_time"
class TestGetObserverLook(unittest.TestCase):
@@ -250,8 +239,9 @@ def test_basic_numpy(self):
def test_basic_dask(self):
"""Test with dask array inputs."""
- from pyorbital import orbital
import dask.array as da
+
+ from pyorbital import orbital
sat_lon = da.from_array(self.sat_lon, chunks=2)
sat_lat = da.from_array(self.sat_lat, chunks=2)
sat_alt = da.from_array(self.sat_alt, chunks=2)
@@ -266,9 +256,10 @@ def test_basic_dask(self):
def test_xarray_with_numpy(self):
"""Test with xarray DataArray with numpy array as inputs."""
- from pyorbital import orbital
import xarray as xr
+ from pyorbital import orbital
+
def _xarr_conv(input):
return xr.DataArray(input)
sat_lon = _xarr_conv(self.sat_lon)
@@ -285,10 +276,11 @@ def _xarr_conv(input):
def test_xarray_with_dask(self):
"""Test with xarray DataArray with dask array as inputs."""
- from pyorbital import orbital
import dask.array as da
import xarray as xr
+ from pyorbital import orbital
+
def _xarr_conv(input):
return xr.DataArray(da.from_array(input, chunks=2))
sat_lon = _xarr_conv(self.sat_lon)
@@ -339,14 +331,15 @@ def test_basic_numpy(self):
azi, elev = orbital.get_observer_look(self.sat_lon, self.sat_lat,
self.sat_alt, self.t,
self.lon, self.lat, self.alt)
- self.assertEqual(np.sum(np.isnan(azi)), 0)
- self.assertFalse(np.isnan(azi).any())
+ assert np.sum(np.isnan(azi)) == 0
+ assert not np.isnan(azi).any()
np.testing.assert_allclose(elev, self.exp_elev)
def test_basic_dask(self):
"""Test with dask array inputs."""
- from pyorbital import orbital
import dask.array as da
+
+ from pyorbital import orbital
sat_lon = da.from_array(self.sat_lon, chunks=2)
sat_lat = da.from_array(self.sat_lat, chunks=2)
sat_alt = da.from_array(self.sat_alt, chunks=2)
@@ -356,15 +349,16 @@ def test_basic_dask(self):
azi, elev = orbital.get_observer_look(sat_lon, sat_lat,
sat_alt, self.t,
lon, lat, alt)
- self.assertEqual(np.sum(np.isnan(azi)), 0)
- self.assertFalse(np.isnan(azi).any())
+ assert np.sum(np.isnan(azi)) == 0
+ assert not np.isnan(azi).any()
np.testing.assert_allclose(elev.compute(), self.exp_elev)
def test_xarray_with_numpy(self):
"""Test with xarray DataArray with numpy array as inputs."""
- from pyorbital import orbital
import xarray as xr
+ from pyorbital import orbital
+
def _xarr_conv(input):
return xr.DataArray(input)
sat_lon = _xarr_conv(self.sat_lon)
@@ -376,16 +370,17 @@ def _xarr_conv(input):
azi, elev = orbital.get_observer_look(sat_lon, sat_lat,
sat_alt, self.t,
lon, lat, alt)
- self.assertEqual(np.sum(np.isnan(azi)), 0)
- self.assertFalse(np.isnan(azi).any())
+ assert np.sum(np.isnan(azi)) == 0
+ assert not np.isnan(azi).any()
np.testing.assert_allclose(elev.data, self.exp_elev)
def test_xarray_with_dask(self):
"""Test with xarray DataArray with dask array as inputs."""
- from pyorbital import orbital
import dask.array as da
import xarray as xr
+ from pyorbital import orbital
+
def _xarr_conv(input):
return xr.DataArray(da.from_array(input, chunks=2))
sat_lon = _xarr_conv(self.sat_lon)
@@ -397,8 +392,8 @@ def _xarr_conv(input):
azi, elev = orbital.get_observer_look(sat_lon, sat_lat,
sat_alt, self.t,
lon, lat, alt)
- self.assertEqual(np.sum(np.isnan(azi)), 0)
- self.assertFalse(np.isnan(azi).any())
+ assert np.sum(np.isnan(azi)) == 0
+ assert not np.isnan(azi).any()
np.testing.assert_allclose(elev.data.compute(), self.exp_elev)
@@ -408,44 +403,46 @@ class TestRegressions(unittest.TestCase):
def test_63(self):
"""Check that no runtimewarning is raised, #63."""
import warnings
- from pyorbital.orbital import Orbital
+
from dateutil import parser
- warnings.filterwarnings('error')
+
+ from pyorbital.orbital import Orbital
+ warnings.filterwarnings("error")
orb = Orbital("Suomi-NPP",
line1="1 37849U 11061A 19292.84582509 .00000011 00000-0 25668-4 0 9997",
line2="2 37849 98.7092 229.3263 0000715 98.5313 290.6262 14.19554485413345")
orb.get_next_passes(parser.parse("2019-10-21 16:00:00"), 12, 123.29736, -13.93763, 0)
- warnings.filterwarnings('default')
+ warnings.filterwarnings("default")
-@pytest.mark.parametrize('dtime',
+@pytest.mark.parametrize("dtime",
[datetime(2024, 6, 25, 11, 0, 18),
datetime(2024, 6, 25, 11, 5, 0, 0, pytz.UTC),
- np.datetime64('2024-06-25T11:10:00.000000')
+ np.datetime64("2024-06-25T11:10:00.000000")
]
)
def test_get_last_an_time_scalar_input(dtime):
"""Test getting the time of the last ascending node - input time is a scalar."""
from pyorbital.orbital import Orbital
orb = Orbital("NOAA-20",
- line1='1 43013U 17073A 24176.73674251 .00000000 00000+0 11066-3 0 00014',
- line2='2 43013 98.7060 114.5340 0001454 139.3958 190.7541 14.19599847341971')
+ line1="1 43013U 17073A 24176.73674251 .00000000 00000+0 11066-3 0 00014",
+ line2="2 43013 98.7060 114.5340 0001454 139.3958 190.7541 14.19599847341971")
- expected = np.datetime64('2024-06-25T10:44:18.234375')
+ expected = np.datetime64("2024-06-25T10:44:18.234375")
result = orb.get_last_an_time(dtime)
- assert abs(expected - result) < np.timedelta64(1, 's')
+ assert abs(expected - result) < np.timedelta64(1, "s")
-@pytest.mark.parametrize('dtime',
- [datetime(2024, 6, 25, 11, 5, 0, 0, pytz.timezone('Europe/Stockholm')),
+@pytest.mark.parametrize("dtime",
+ [datetime(2024, 6, 25, 11, 5, 0, 0, pytz.timezone("Europe/Stockholm")),
]
)
def test_get_last_an_time_wrong_input(dtime):
"""Test getting the time of the last ascending node - wrong input."""
from pyorbital.orbital import Orbital
orb = Orbital("NOAA-20",
- line1='1 43013U 17073A 24176.73674251 .00000000 00000+0 11066-3 0 00014',
- line2='2 43013 98.7060 114.5340 0001454 139.3958 190.7541 14.19599847341971')
+ line1="1 43013U 17073A 24176.73674251 .00000000 00000+0 11066-3 0 00014",
+ line2="2 43013 98.7060 114.5340 0001454 139.3958 190.7541 14.19599847341971")
with pytest.raises(ValueError) as exec_info:
_ = orb.get_last_an_time(dtime)
diff --git a/pyorbital/tests/test_tlefile.py b/pyorbital/tests/test_tlefile.py
index b7eccc3..a921b8b 100644
--- a/pyorbital/tests/test_tlefile.py
+++ b/pyorbital/tests/test_tlefile.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
-# Copyright (c) 2014-2023 Pytroll Community
+# Copyright (c) 2014-2024 Pytroll Community
#
# Author(s):
#
@@ -24,23 +24,26 @@
"""Test TLE file reading, TLE downloading and stroging TLEs to database."""
-from pyorbital.tlefile import Tle
-from pyorbital.tlefile import (_get_config_path,
- read_platform_numbers,
- _get_local_tle_path_from_env,
- _get_uris_and_open_func,
- check_is_platform_supported,
- PKG_CONFIG_DIR)
-
-import logging
import datetime
+import logging
+import os
+import time
import unittest
-from unittest.mock import patch
+from contextlib import suppress
from unittest import mock
+from unittest.mock import patch
+
import pytest
-import os
-from contextlib import suppress
-import time
+
+from pyorbital.tlefile import (
+ PKG_CONFIG_DIR,
+ Tle,
+ _get_config_path,
+ _get_local_tle_path_from_env,
+ _get_uris_and_open_func,
+ check_is_platform_supported,
+ read_platform_numbers,
+)
line0 = "ISS (ZARYA)"
line1 = "1 25544U 98067A 08264.51782528 -.00002182 00000-0 -11606-4 0 2927"
@@ -56,85 +59,85 @@
NOAA19_3LINES = "NOAA 19\n" + NOAA19_2LINES
-tle_xml = '\n'.join(
+tle_xml = "\n".join(
('',
- '',
- '',
- '',
- '',
- '' + line1 + '',
- '' + line2 + '',
- '',
- '',
- '',
- '',
- '',
- '',
- '' + line1_2 + '',
- '' + line2_2 + '',
- '',
- '',
- '',
- ''))
+ "",
+ "",
+ "",
+ "",
+ "" + line1 + "",
+ "" + line2 + "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "" + line1_2 + "",
+ "" + line2_2 + "",
+ "",
+ "",
+ "",
+ ""))
@pytest.fixture
def fake_platforms_file(tmp_path):
"""Return file path to a fake platforms.txt file."""
- file_path = tmp_path / 'platforms.txt'
- lines = ['# Some header lines - line 1\n',
- '# Some header lines - line 2\n',
- 'NOAA-21 54234\n',
- 'NOAA-20 43013\n',
- 'UNKNOWN SATELLITE 99999\n'
+ file_path = tmp_path / "platforms.txt"
+ lines = ["# Some header lines - line 1\n",
+ "# Some header lines - line 2\n",
+ "NOAA-21 54234\n",
+ "NOAA-20 43013\n",
+ "UNKNOWN SATELLITE 99999\n"
]
- with open(file_path, 'w') as fpt:
+ with open(file_path, "w") as fpt:
fpt.writelines(lines)
- yield file_path
+ return file_path
@pytest.fixture(scope="session")
def fake_local_tles_dir(tmp_path_factory):
"""Make a list of fake tle files in a directory."""
- tle_dir = tmp_path_factory.mktemp('tle_files')
- file_path = tle_dir / 'tle-202211180230.txt'
+ tle_dir = tmp_path_factory.mktemp("tle_files")
+ file_path = tle_dir / "tle-202211180230.txt"
file_path.touch()
time.sleep(1)
- file_path = tle_dir / 'tle-202211180430.txt'
+ file_path = tle_dir / "tle-202211180430.txt"
file_path.touch()
time.sleep(1)
- file_path = tle_dir / 'tle-202211180630.txt'
+ file_path = tle_dir / "tle-202211180630.txt"
file_path.touch()
time.sleep(1)
- file_path = tle_dir / 'tle-202211180830.txt'
+ file_path = tle_dir / "tle-202211180830.txt"
file_path.touch()
- yield tle_dir
+ return tle_dir
@pytest.fixture
def mock_env_ppp_config_dir(monkeypatch):
"""Mock environment variable PPP_CONFIG_DIR."""
- monkeypatch.setenv('PPP_CONFIG_DIR', '/path/to/old/mpop/config/dir')
+ monkeypatch.setenv("PPP_CONFIG_DIR", "/path/to/old/mpop/config/dir")
@pytest.fixture
def mock_env_ppp_config_dir_missing(monkeypatch):
"""Mock that the environment variable PPP_CONFIG_DIR is missing."""
- monkeypatch.delenv('PPP_CONFIG_DIR', raising=False)
+ monkeypatch.delenv("PPP_CONFIG_DIR", raising=False)
@pytest.fixture
def mock_env_tles_missing(monkeypatch):
"""Mock that the environment variable TLES is missing."""
- monkeypatch.delenv('TLES', raising=False)
+ monkeypatch.delenv("TLES", raising=False)
@pytest.fixture
def mock_env_tles(monkeypatch, fake_local_tles_dir):
"""Mock environment variable TLES."""
- monkeypatch.setenv('TLES', os.path.join(fake_local_tles_dir, '*'))
+ monkeypatch.setenv("TLES", os.path.join(fake_local_tles_dir, "*"))
def test_get_config_path_no_env_defined(caplog, mock_env_ppp_config_dir_missing):
@@ -143,15 +146,15 @@ def test_get_config_path_no_env_defined(caplog, mock_env_ppp_config_dir_missing)
res = _get_config_path()
assert res == PKG_CONFIG_DIR
- assert caplog.text == ''
+ assert caplog.text == ""
def test_check_is_platform_supported_existing(caplog, mock_env_ppp_config_dir_missing):
"""Test the function to check if an existing platform is supported on default."""
with caplog.at_level(logging.INFO):
- check_is_platform_supported('NOAA-21')
+ check_is_platform_supported("NOAA-21")
- logoutput_lines = caplog.text.split('\n')
+ logoutput_lines = caplog.text.split("\n")
expected1 = "Satellite NOAA-21 is supported. NORAD number: 54234"
expected2 = "Satellite names and NORAD numbers are defined in {path}".format(path=PKG_CONFIG_DIR)
@@ -162,11 +165,11 @@ def test_check_is_platform_supported_existing(caplog, mock_env_ppp_config_dir_mi
def test_check_is_platform_supported_unknown(caplog, mock_env_ppp_config_dir_missing):
"""Test the function to check if an unknown platform is supported on default."""
- sat = 'UNKNOWN'
+ sat = "UNKNOWN"
with caplog.at_level(logging.INFO):
check_is_platform_supported(sat)
- logoutput_lines = caplog.text.split('\n')
+ logoutput_lines = caplog.text.split("\n")
expected1 = "Satellite {satellite} is NOT supported.".format(satellite=sat)
expected2 = ("Please add it to a local copy of the platforms.txt file and put in " +
@@ -178,19 +181,12 @@ def test_check_is_platform_supported_unknown(caplog, mock_env_ppp_config_dir_mis
assert expected3 in logoutput_lines[2]
-@patch(
- 'pyorbital.version.get_versions',
- return_value=dict([('version', '1.9.1+1.some-futur.dirty'),
- ('full-revisionid', 'some-future-git-version-hash'),
- ('dirty', True),
- ('error', None),
- ('date', '2023-01-20T09:37:30+0100')
- ])
-)
+@patch("pyorbital.get_version",
+ return_value="1.9.1+1.some-future.dirty")
def test_get_config_path_ppp_config_set_but_not_pyorbital_future(mock, caplog, monkeypatch):
"""Test getting the config path."""
- monkeypatch.setenv('SATPY_CONFIG_PATH', '/path/to/satpy/etc')
- monkeypatch.setenv('PPP_CONFIG_DIR', '/path/to/old/mpop/config/dir')
+ monkeypatch.setenv("SATPY_CONFIG_PATH", "/path/to/satpy/etc")
+ monkeypatch.setenv("PPP_CONFIG_DIR", "/path/to/old/mpop/config/dir")
with caplog.at_level(logging.WARNING):
res = _get_config_path()
@@ -208,31 +204,31 @@ def test_get_config_path_ppp_config_set_but_not_pyorbital_is_deprecated(caplog,
set but the deprecated (old) Satpy/MPOP one is set.
"""
- monkeypatch.setenv('SATPY_CONFIG_PATH', '/path/to/satpy/etc')
- monkeypatch.setenv('PPP_CONFIG_DIR', '/path/to/old/mpop/config/dir')
+ monkeypatch.setenv("SATPY_CONFIG_PATH", "/path/to/satpy/etc")
+ monkeypatch.setenv("PPP_CONFIG_DIR", "/path/to/old/mpop/config/dir")
with caplog.at_level(logging.WARNING):
res = _get_config_path()
- assert res == '/path/to/old/mpop/config/dir'
+ assert res == "/path/to/old/mpop/config/dir"
- log_output = ('The use of PPP_CONFIG_DIR is deprecated and will be removed in version 1.9!' +
- ' Please use PYORBITAL_CONFIG_PATH if you need a custom config path for pyorbital!')
+ log_output = ("The use of PPP_CONFIG_DIR is deprecated and will be removed in version 1.9!" +
+ " Please use PYORBITAL_CONFIG_PATH if you need a custom config path for pyorbital!")
assert log_output in caplog.text
def test_get_config_path_ppp_config_set_and_pyorbital(caplog, monkeypatch):
"""Test getting the config path."""
- pyorbital_config_dir = '/path/to/pyorbital/config/dir'
- monkeypatch.setenv('PYORBITAL_CONFIG_PATH', pyorbital_config_dir)
- monkeypatch.setenv('PPP_CONFIG_DIR', '/path/to/old/mpop/config/dir')
+ pyorbital_config_dir = "/path/to/pyorbital/config/dir"
+ monkeypatch.setenv("PYORBITAL_CONFIG_PATH", pyorbital_config_dir)
+ monkeypatch.setenv("PPP_CONFIG_DIR", "/path/to/old/mpop/config/dir")
with caplog.at_level(logging.WARNING):
res = _get_config_path()
assert res == pyorbital_config_dir
- assert caplog.text == ''
+ assert caplog.text == ""
def test_get_config_path_pyorbital_ppp_missing(caplog, monkeypatch, mock_env_ppp_config_dir_missing):
@@ -240,8 +236,8 @@ def test_get_config_path_pyorbital_ppp_missing(caplog, monkeypatch, mock_env_ppp
The old mpop PPP_CONFIG_PATH is not set but the PYORBITAL one is.
"""
- pyorbital_config_dir = '/path/to/pyorbital/config/dir'
- monkeypatch.setenv('PYORBITAL_CONFIG_PATH', pyorbital_config_dir)
+ pyorbital_config_dir = "/path/to/pyorbital/config/dir"
+ monkeypatch.setenv("PYORBITAL_CONFIG_PATH", pyorbital_config_dir)
with caplog.at_level(logging.DEBUG):
res = _get_config_path()
@@ -255,7 +251,7 @@ def test_get_config_path_pyorbital_ppp_missing(caplog, monkeypatch, mock_env_ppp
def test_read_platform_numbers(fake_platforms_file):
"""Test reading the platform names and associated catalougue numbers."""
res = read_platform_numbers(str(fake_platforms_file))
- assert res == {'NOAA-21': '54234', 'NOAA-20': '43013', 'UNKNOWN SATELLITE': '99999'}
+ assert res == {"NOAA-21": "54234", "NOAA-20": "43013", "UNKNOWN SATELLITE": "99999"}
def test_get_local_tle_path_tle_env_missing(mock_env_tles_missing):
@@ -277,12 +273,12 @@ def test_get_uris_and_open_func_using_tles_env(caplog, fake_local_tles_dir, monk
"""
from collections.abc import Sequence
- monkeypatch.setenv('TLES', str(os.path.join(fake_local_tles_dir, "*")))
+ monkeypatch.setenv("TLES", str(os.path.join(fake_local_tles_dir, "*")))
with caplog.at_level(logging.DEBUG):
uris, _ = _get_uris_and_open_func()
assert isinstance(uris, Sequence)
- assert uris[0] == str(fake_local_tles_dir / 'tle-202211180830.txt')
+ assert uris[0] == str(fake_local_tles_dir / "tle-202211180830.txt")
log_message = "Reading TLE from {msg}".format(msg=str(fake_local_tles_dir))
assert log_message in caplog.text
@@ -301,30 +297,30 @@ class TLETest(unittest.TestCase):
def check_example(self, tle):
"""Check the *tle* instance against predetermined values."""
# line 1
- self.assertEqual(tle.satnumber, "25544")
- self.assertEqual(tle.classification, "U")
- self.assertEqual(tle.id_launch_year, "98")
- self.assertEqual(tle.id_launch_number, "067")
- self.assertEqual(tle.id_launch_piece.strip(), "A")
- self.assertEqual(tle.epoch_year, "08")
- self.assertEqual(tle.epoch_day, 264.51782528)
+ assert tle.satnumber == "25544"
+ assert tle.classification == "U"
+ assert tle.id_launch_year == "98"
+ assert tle.id_launch_number == "067"
+ assert tle.id_launch_piece.strip() == "A"
+ assert tle.epoch_year == "08"
+ assert tle.epoch_day == 264.51782528
epoch = (datetime.datetime(2008, 1, 1)
+ datetime.timedelta(days=264.51782528 - 1))
- self.assertEqual(tle.epoch, epoch)
- self.assertEqual(tle.mean_motion_derivative, -.00002182)
- self.assertEqual(tle.mean_motion_sec_derivative, 0.0)
- self.assertEqual(tle.bstar, -.11606e-4)
- self.assertEqual(tle.ephemeris_type, 0)
- self.assertEqual(tle.element_number, 292)
+ assert tle.epoch == epoch
+ assert tle.mean_motion_derivative == -2.182e-05
+ assert tle.mean_motion_sec_derivative == 0.0
+ assert tle.bstar == -1.1606e-05
+ assert tle.ephemeris_type == 0
+ assert tle.element_number == 292
# line 2
- self.assertEqual(tle.inclination, 51.6416)
- self.assertEqual(tle.right_ascension, 247.4627)
- self.assertEqual(tle.excentricity, .0006703)
- self.assertEqual(tle.arg_perigee, 130.5360)
- self.assertEqual(tle.mean_anomaly, 325.0288)
- self.assertEqual(tle.mean_motion, 15.72125391)
- self.assertEqual(tle.orbit, 56353)
+ assert tle.inclination == 51.6416
+ assert tle.right_ascension == 247.4627
+ assert tle.excentricity == 0.0006703
+ assert tle.arg_perigee == 130.536
+ assert tle.mean_anomaly == 325.0288
+ assert tle.mean_motion == 15.72125391
+ assert tle.orbit == 56353
def test_from_line(self):
"""Test parsing from line elements."""
@@ -333,11 +329,11 @@ def test_from_line(self):
def test_from_file(self):
"""Test reading and parsing from a file."""
+ from os import close, remove, write
from tempfile import mkstemp
- from os import write, close, remove
filehandle, filename = mkstemp()
try:
- write(filehandle, "\n".join([line0, line1, line2]).encode('utf-8'))
+ write(filehandle, "\n".join([line0, line1, line2]).encode("utf-8"))
close(filehandle)
tle = Tle("ISS (ZARYA)", filename)
self.check_example(tle)
@@ -346,11 +342,11 @@ def test_from_file(self):
def test_from_file_with_hyphenated_platform_name(self):
"""Test reading and parsing from a file with a slightly different name."""
+ from os import close, remove, write
from tempfile import mkstemp
- from os import write, close, remove
filehandle, filename = mkstemp()
try:
- write(filehandle, NOAA19_3LINES.encode('utf-8'))
+ write(filehandle, NOAA19_3LINES.encode("utf-8"))
close(filehandle)
tle = Tle("NOAA-19", filename)
assert tle.satnumber == "33591"
@@ -359,11 +355,11 @@ def test_from_file_with_hyphenated_platform_name(self):
def test_from_file_with_no_platform_name(self):
"""Test reading and parsing from a file with a slightly different name."""
+ from os import close, remove, write
from tempfile import mkstemp
- from os import write, close, remove
filehandle, filename = mkstemp()
try:
- write(filehandle, NOAA19_2LINES.encode('utf-8'))
+ write(filehandle, NOAA19_2LINES.encode("utf-8"))
close(filehandle)
tle = Tle("NOAA-19", filename)
assert tle.satnumber == "33591"
@@ -376,8 +372,8 @@ def test_from_mmam_xml(self):
save_dir = TemporaryDirectory()
with save_dir:
- fname = os.path.join(save_dir.name, '20210420_Metop-B_ADMIN_MESSAGE_NO_127.xml')
- with open(fname, 'w') as fid:
+ fname = os.path.join(save_dir.name, "20210420_Metop-B_ADMIN_MESSAGE_NO_127.xml")
+ with open(fname, "w") as fid:
fid.write(tle_xml)
tle = Tle("", tle_file=fname)
self.check_example(tle)
@@ -410,7 +406,7 @@ def test_init(self):
"""Test the initialization."""
assert self.dl.config is self.config
- @mock.patch('pyorbital.tlefile.requests')
+ @mock.patch("pyorbital.tlefile.requests")
def test_fetch_plain_tle_not_configured(self, requests):
"""Test downloading and a TLE file from internet."""
requests.get = mock.MagicMock()
@@ -419,10 +415,10 @@ def test_fetch_plain_tle_not_configured(self, requests):
# Not configured
self.dl.config["downloaders"] = {}
res = self.dl.fetch_plain_tle()
- self.assertTrue(res == {})
+ assert res == {}
requests.get.assert_not_called()
- @mock.patch('pyorbital.tlefile.requests')
+ @mock.patch("pyorbital.tlefile.requests")
def test_fetch_plain_tle_two_sources(self, requests):
"""Test downloading and a TLE file from internet."""
requests.get = mock.MagicMock()
@@ -432,16 +428,16 @@ def test_fetch_plain_tle_two_sources(self, requests):
self.dl.config["downloaders"] = FETCH_PLAIN_TLE_CONFIG
res = self.dl.fetch_plain_tle()
- self.assertTrue("source_1" in res)
- self.assertEqual(len(res["source_1"]), 3)
- self.assertEqual(res["source_1"][0].line1, line1)
- self.assertEqual(res["source_1"][0].line2, line2)
- self.assertTrue("source_2" in res)
- self.assertEqual(len(res["source_2"]), 1)
- self.assertTrue(mock.call("mocked_url_1") in requests.get.mock_calls)
- self.assertEqual(len(requests.get.mock_calls), 4)
-
- @mock.patch('pyorbital.tlefile.requests')
+ assert "source_1" in res
+ assert len(res["source_1"]) == 3
+ assert res["source_1"][0].line1 == line1
+ assert res["source_1"][0].line2 == line2
+ assert "source_2" in res
+ assert len(res["source_2"]) == 1
+ assert mock.call("mocked_url_1", timeout=15) in requests.get.mock_calls
+ assert len(requests.get.mock_calls) == 4
+
+ @mock.patch("pyorbital.tlefile.requests")
def test_fetch_plain_tle_server_is_a_teapot(self, requests):
"""Test downloading a TLE file from internet."""
requests.get = mock.MagicMock()
@@ -453,14 +449,15 @@ def test_fetch_plain_tle_server_is_a_teapot(self, requests):
res = self.dl.fetch_plain_tle()
# The sources are in the dict ...
- self.assertEqual(len(res), 2)
+ assert len(res) == 2
# ... but there are no TLEs
- self.assertEqual(len(res["source_1"]), 0)
- self.assertEqual(len(res["source_2"]), 0)
- self.assertTrue(mock.call("mocked_url_1") in requests.get.mock_calls)
- self.assertEqual(len(requests.get.mock_calls), 4)
+ assert len(res["source_1"]) == 0
+ assert len(res["source_2"]) == 0
- @mock.patch('pyorbital.tlefile.requests')
+ assert mock.call("mocked_url_1", timeout=15) in requests.get.mock_calls
+ assert len(requests.get.mock_calls) == 4
+
+ @mock.patch("pyorbital.tlefile.requests")
def test_fetch_spacetrack_login_fails(self, requests):
"""Test downloading TLEs from space-track.org."""
mock_post = mock.MagicMock()
@@ -469,7 +466,7 @@ def test_fetch_spacetrack_login_fails(self, requests):
requests.Session.return_value.__enter__.return_value = mock_session
self.dl.config["platforms"] = {
- 25544: 'ISS'
+ 25544: "ISS"
}
self.dl.config["downloaders"] = FETCH_SPACETRACK_CONFIG
@@ -477,13 +474,13 @@ def test_fetch_spacetrack_login_fails(self, requests):
mock_post.return_value.status_code = 418
res = self.dl.fetch_spacetrack()
# Empty list of TLEs is returned
- self.assertTrue(res == [])
+ assert res == []
# The login was anyway attempted
mock_post.assert_called_with(
- 'https://www.space-track.org/ajaxauth/login',
- data={'identity': 'username', 'password': 'passw0rd'})
+ "https://www.space-track.org/ajaxauth/login",
+ data={"identity": "username", "password": "passw0rd"})
- @mock.patch('pyorbital.tlefile.requests')
+ @mock.patch("pyorbital.tlefile.requests")
def test_fetch_spacetrack_get_fails(self, requests):
"""Test downloading TLEs from space-track.org."""
mock_post = mock.MagicMock()
@@ -494,7 +491,7 @@ def test_fetch_spacetrack_get_fails(self, requests):
requests.Session.return_value.__enter__.return_value = mock_session
self.dl.config["platforms"] = {
- 25544: 'ISS'
+ 25544: "ISS"
}
self.dl.config["downloaders"] = FETCH_SPACETRACK_CONFIG
@@ -502,12 +499,12 @@ def test_fetch_spacetrack_get_fails(self, requests):
mock_post.return_value.status_code = 200
mock_get.return_value.status_code = 418
res = self.dl.fetch_spacetrack()
- self.assertTrue(res == [])
+ assert res == []
mock_get.assert_called_with("https://www.space-track.org/"
"basicspacedata/query/class/tle_latest/"
"ORDINAL/1/NORAD_CAT_ID/25544/format/tle")
- @mock.patch('pyorbital.tlefile.requests')
+ @mock.patch("pyorbital.tlefile.requests")
def test_fetch_spacetrack_success(self, requests):
"""Test downloading TLEs from space-track.org."""
mock_post = mock.MagicMock()
@@ -517,9 +514,9 @@ def test_fetch_spacetrack_success(self, requests):
mock_session.get = mock_get
requests.Session.return_value.__enter__.return_value = mock_session
- tle_text = '\n'.join((line0, line1, line2))
+ tle_text = "\n".join((line0, line1, line2))
self.dl.config["platforms"] = {
- 25544: 'ISS'
+ 25544: "ISS"
}
self.dl.config["downloaders"] = FETCH_SPACETRACK_CONFIG
@@ -528,34 +525,34 @@ def test_fetch_spacetrack_success(self, requests):
mock_get.return_value.status_code = 200
mock_get.return_value.text = tle_text
res = self.dl.fetch_spacetrack()
- self.assertEqual(len(res), 1)
- self.assertEqual(res[0].line1, line1)
- self.assertEqual(res[0].line2, line2)
+ assert len(res) == 1
+ assert res[0].line1 == line1
+ assert res[0].line2 == line2
def test_read_tle_files(self):
"""Test reading TLE files from a file system."""
from tempfile import TemporaryDirectory
- tle_text = '\n'.join((line0, line1, line2))
+ tle_text = "\n".join((line0, line1, line2))
save_dir = TemporaryDirectory()
with save_dir:
- fname = os.path.join(save_dir.name, 'tle_20200129_1600.txt')
- with open(fname, 'w') as fid:
+ fname = os.path.join(save_dir.name, "tle_20200129_1600.txt")
+ with open(fname, "w") as fid:
fid.write(tle_text)
# Add a non-existent file, it shouldn't cause a crash
- nonexistent = os.path.join(save_dir.name, 'not_here.txt')
+ nonexistent = os.path.join(save_dir.name, "not_here.txt")
# Use a wildcard to collect files (passed to glob)
- starred_fname = os.path.join(save_dir.name, 'tle*txt')
+ starred_fname = os.path.join(save_dir.name, "tle*txt")
self.dl.config["downloaders"] = {
"read_tle_files": {
"paths": [fname, nonexistent, starred_fname]
}
}
res = self.dl.read_tle_files()
- self.assertEqual(len(res), 2)
- self.assertEqual(res[0].line1, line1)
- self.assertEqual(res[0].line2, line2)
+ assert len(res) == 2
+ assert res[0].line1 == line1
+ assert res[0].line2 == line2
def test_read_xml_admin_messages(self):
"""Test reading TLE files from a file system."""
@@ -563,13 +560,13 @@ def test_read_xml_admin_messages(self):
save_dir = TemporaryDirectory()
with save_dir:
- fname = os.path.join(save_dir.name, '20210420_Metop-B_ADMIN_MESSAGE_NO_127.xml')
- with open(fname, 'w') as fid:
+ fname = os.path.join(save_dir.name, "20210420_Metop-B_ADMIN_MESSAGE_NO_127.xml")
+ with open(fname, "w") as fid:
fid.write(tle_xml)
# Add a non-existent file, it shouldn't cause a crash
- nonexistent = os.path.join(save_dir.name, 'not_here.txt')
+ nonexistent = os.path.join(save_dir.name, "not_here.txt")
# Use a wildcard to collect files (passed to glob)
- starred_fname = os.path.join(save_dir.name, '*.xml')
+ starred_fname = os.path.join(save_dir.name, "*.xml")
self.dl.config["downloaders"] = {
"read_xml_admin_messages": {
"paths": [fname, nonexistent, starred_fname]
@@ -579,17 +576,17 @@ def test_read_xml_admin_messages(self):
# There are two sets of TLEs in the file. And as the same file is
# parsed twice, 4 TLE objects are returned
- self.assertEqual(len(res), 4)
- self.assertEqual(res[0].line1, line1)
- self.assertEqual(res[0].line2, line2)
- self.assertEqual(res[1].line1, line1_2)
- self.assertEqual(res[1].line2, line2_2)
+ assert len(res) == 4
+ assert res[0].line1 == line1
+ assert res[0].line2 == line2
+ assert res[1].line1 == line1_2
+ assert res[1].line2 == line2_2
def _get_req_response(code):
req = mock.MagicMock()
req.status_code = code
- req.text = '\n'.join((line0, line1, line2))
+ req.text = "\n".join((line0, line1, line2))
return req
@@ -598,21 +595,21 @@ class TestSQLiteTLE(unittest.TestCase):
def setUp(self):
"""Create a database instance."""
- from pyorbital.tlefile import SQLiteTLE
- from pyorbital.tlefile import Tle
from tempfile import TemporaryDirectory
+ from pyorbital.tlefile import SQLiteTLE, Tle
+
self.temp_dir = TemporaryDirectory()
- self.db_fname = os.path.join(self.temp_dir.name, 'tle.db')
+ self.db_fname = os.path.join(self.temp_dir.name, "tle.db")
self.platforms = {25544: "ISS"}
self.writer_config = {
- "output_dir": os.path.join(self.temp_dir.name, 'tle_dir'),
+ "output_dir": os.path.join(self.temp_dir.name, "tle_dir"),
"filename_pattern": "tle_%Y%m%d_%H%M%S.%f.txt",
"write_name": True,
"write_always": False
}
self.db = SQLiteTLE(self.db_fname, self.platforms, self.writer_config)
- self.tle = Tle('ISS', line1=line1, line2=line2)
+ self.tle = Tle("ISS", line1=line1, line2=line2)
def tearDown(self):
"""Clean temporary files."""
@@ -621,73 +618,72 @@ def tearDown(self):
def test_init(self):
"""Test that the init did what it should have."""
- from pyorbital.tlefile import table_exists, PLATFORM_NAMES_TABLE
+ from pyorbital.tlefile import PLATFORM_NAMES_TABLE, table_exists
columns = [col.strip() for col in
- PLATFORM_NAMES_TABLE.strip('()').split(',')]
+ PLATFORM_NAMES_TABLE.strip("()").split(",")]
num_columns = len(columns)
- self.assertTrue(os.path.exists(self.db_fname))
- self.assertTrue(table_exists(self.db.db, "platform_names"))
- res = self.db.db.execute('select * from platform_names')
+ assert os.path.exists(self.db_fname)
+ assert table_exists(self.db.db, "platform_names")
+ res = self.db.db.execute("select * from platform_names")
names = [description[0] for description in res.description]
- self.assertEqual(len(names), num_columns)
+ assert len(names) == num_columns
for col in columns:
- self.assertTrue(col.split(' ')[0] in names)
+ assert col.split(" ")[0] in names
def test_update_db(self):
"""Test updating database with new data."""
- from pyorbital.tlefile import (table_exists, SATID_TABLE,
- ISO_TIME_FORMAT)
+ from pyorbital.tlefile import ISO_TIME_FORMAT, SATID_TABLE, table_exists
# Get the column names
columns = [col.strip() for col in
- SATID_TABLE.replace("'{}' (", "").strip(')').split(',')]
+ SATID_TABLE.replace("'{}' (", "").strip(")").split(",")]
# Platform number
satid = str(list(self.platforms.keys())[0])
# Data from a platform that isn't configured
self.db.platforms = {}
- self.db.update_db(self.tle, 'foo')
- self.assertFalse(table_exists(self.db.db, satid))
- self.assertFalse(self.db.updated)
+ self.db.update_db(self.tle, "foo")
+ assert not table_exists(self.db.db, satid)
+ assert not self.db.updated
# Configured platform
self.db.platforms = self.platforms
- self.db.update_db(self.tle, 'foo')
- self.assertTrue(table_exists(self.db.db, satid))
- self.assertTrue(self.db.updated)
+ self.db.update_db(self.tle, "foo")
+ assert table_exists(self.db.db, satid)
+ assert self.db.updated
# Check that all the columns were added
- res = self.db.db.execute("select * from '%s'" % satid)
+ res = self.db.db.execute(f"select * from '{satid:d}'") # noseq
names = [description[0] for description in res.description]
for col in columns:
- self.assertTrue(col.split(' ')[0] in names)
+ assert col.split(" ")[0] in names
# Check the data
data = res.fetchall()
- self.assertEqual(len(data), 1)
+ assert len(data) == 1
# epoch
- self.assertEqual(data[0][0], '2008-09-20T12:25:40.104192')
+ assert data[0][0] == "2008-09-20T12:25:40.104192"
# TLE
- self.assertEqual(data[0][1], '\n'.join((line1, line2)))
+ assert data[0][1] == "\n".join((line1, line2))
# Date when the data were added should be close to current time
date_added = datetime.datetime.strptime(data[0][2], ISO_TIME_FORMAT)
now = datetime.datetime.utcnow()
- self.assertTrue((now - date_added).total_seconds() < 1.0)
+ assert (now - date_added).total_seconds() < 1.0
# Source of the data
- self.assertTrue(data[0][3] == 'foo')
+ assert data[0][3] == "foo"
# Try to add the same data again. Nothing should change even
# if the source is different if the epoch is the same
- self.db.update_db(self.tle, 'bar')
- res = self.db.db.execute("select * from '%s'" % satid)
+ self.db.update_db(self.tle, "bar")
+ res = self.db.db.execute(f"select * from '{satid:d}'") # noseq
data = res.fetchall()
- self.assertEqual(len(data), 1)
+ assert len(data) == 1
date_added2 = datetime.datetime.strptime(data[0][2], ISO_TIME_FORMAT)
- self.assertEqual(date_added, date_added2)
+ assert date_added == date_added2
# Source of the data
- self.assertTrue(data[0][3] == 'foo')
+ assert data[0][3] == "foo"
def test_write_tle_txt(self):
"""Test reading data from the database and writing it to a file."""
@@ -695,7 +691,7 @@ def test_write_tle_txt(self):
tle_dir = self.writer_config["output_dir"]
# Put some data in the database
- self.db.update_db(self.tle, 'foo')
+ self.db.update_db(self.tle, "foo")
# Fake that the database hasn't been updated
self.db.updated = False
@@ -704,34 +700,34 @@ def test_write_tle_txt(self):
self.db.write_tle_txt()
# The output dir hasn't been created
- self.assertFalse(os.path.exists(tle_dir))
+ assert not os.path.exists(tle_dir)
self.db.updated = True
self.db.write_tle_txt()
# The dir should be there
- self.assertTrue(os.path.exists(tle_dir))
+ assert os.path.exists(tle_dir)
# There should be one file in the directory
- files = glob.glob(os.path.join(tle_dir, 'tle_*txt'))
- self.assertEqual(len(files), 1)
+ files = glob.glob(os.path.join(tle_dir, "tle_*txt"))
+ assert len(files) == 1
# The file should have been named with the date ('%' characters
# not there anymore)
- self.assertTrue('%' not in files[0])
+ assert "%" not in files[0]
# The satellite name should be in the file
- with open(files[0], 'r') as fid:
- data = fid.read().split('\n')
- self.assertEqual(len(data), 3)
- self.assertTrue('ISS' in data[0])
- self.assertEqual(data[1], line1)
- self.assertEqual(data[2], line2)
+ with open(files[0], "r") as fid:
+ data = fid.read().split("\n")
+ assert len(data) == 3
+ assert "ISS" in data[0]
+ assert data[1] == line1
+ assert data[2] == line2
# Call the writing again, nothing should be written. In
# real-life this assumes a re-run has been done without new
# TLE data
self.db.updated = False
self.db.write_tle_txt()
- files = glob.glob(os.path.join(tle_dir, 'tle_*txt'))
- self.assertEqual(len(files), 1)
+ files = glob.glob(os.path.join(tle_dir, "tle_*txt"))
+ assert len(files) == 1
# Force writing with every call
# Do not write the satellite name
@@ -740,10 +736,10 @@ def test_write_tle_txt(self):
# Wait a bit to ensure different filename
time.sleep(2)
self.db.write_tle_txt()
- files = sorted(glob.glob(os.path.join(tle_dir, 'tle_*txt')))
- self.assertEqual(len(files), 2)
- with open(files[1], 'r') as fid:
- data = fid.read().split('\n')
- self.assertEqual(len(data), 2)
- self.assertEqual(data[0], line1)
- self.assertEqual(data[1], line2)
+ files = sorted(glob.glob(os.path.join(tle_dir, "tle_*txt")))
+ assert len(files) == 2
+ with open(files[1], "r") as fid:
+ data = fid.read().split("\n")
+ assert len(data) == 2
+ assert data[0] == line1
+ assert data[1] == line2
diff --git a/pyorbital/tlefile.py b/pyorbital/tlefile.py
index d2f0410..6a827e4 100644
--- a/pyorbital/tlefile.py
+++ b/pyorbital/tlefile.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
-# Copyright (c) 2011-2023 Pytroll Community
+# Copyright (c) 2011-2024 Pytroll Community
#
# Author(s):
#
@@ -25,58 +25,63 @@
"""Classes and functions for handling TLE files."""
+import datetime as dt
+import glob
import io
import logging
-import datetime as dt
-from urllib.request import urlopen
import os
-import glob
-import numpy as np
-import requests
import sqlite3
-from xml.etree import ElementTree as ET
from itertools import zip_longest
+from urllib.request import urlopen
+
+#from xml.etree import ElementTree as ET
+import defusedxml.ElementTree as ET
+import numpy as np
+import requests
-TLE_GROUPS = ('active',
- 'weather',
- 'resource',
- 'cubesat',
- 'stations',
- 'sarsat',
- 'noaa',
- 'amateur',
- 'engineering')
-
-TLE_URLS = [f'https://celestrak.org/NORAD/elements/gp.php?GROUP={group}&FORMAT=tle'
+TLE_GROUPS = ("active",
+ "weather",
+ "resource",
+ "cubesat",
+ "stations",
+ "sarsat",
+ "noaa",
+ "amateur",
+ "engineering")
+
+TLE_URLS = [f"https://celestrak.org/NORAD/elements/gp.php?GROUP={group}&FORMAT=tle"
for group in TLE_GROUPS]
LOGGER = logging.getLogger(__name__)
-PKG_CONFIG_DIR = os.path.join(os.path.realpath(os.path.dirname(__file__)), 'etc')
+PKG_CONFIG_DIR = os.path.join(os.path.realpath(os.path.dirname(__file__)), "etc")
+
+class TleDownloadTimeoutError(Exception):
+ """TLE download timeout exception."""
def _check_support_limit_ppp_config_dir():
"""Check the version where PPP_CONFIG_DIR will no longer be supported."""
- from pyorbital import version
- return version.get_versions()['version'] >= '1.9'
+ from pyorbital import get_version
+ return get_version() >= "1.9"
def _get_config_path():
"""Get the config path for Pyorbital."""
- if 'PPP_CONFIG_DIR' in os.environ and 'PYORBITAL_CONFIG_PATH' not in os.environ:
+ if "PPP_CONFIG_DIR" in os.environ and "PYORBITAL_CONFIG_PATH" not in os.environ:
if _check_support_limit_ppp_config_dir():
LOGGER.warning(
- 'The use of PPP_CONFIG_DIR is no longer supported!' +
- ' Please use PYORBITAL_CONFIG_PATH if you need a custom config path for pyorbital!')
- LOGGER.debug('Using the package default for configuration: %s', PKG_CONFIG_DIR)
+ "The use of PPP_CONFIG_DIR is no longer supported!" +
+ " Please use PYORBITAL_CONFIG_PATH if you need a custom config path for pyorbital!")
+ LOGGER.debug("Using the package default for configuration: %s", PKG_CONFIG_DIR)
return PKG_CONFIG_DIR
else:
LOGGER.warning(
- 'The use of PPP_CONFIG_DIR is deprecated and will be removed in version 1.9!' +
- ' Please use PYORBITAL_CONFIG_PATH if you need a custom config path for pyorbital!')
- pyorbital_config_path = os.getenv('PPP_CONFIG_DIR', PKG_CONFIG_DIR)
+ "The use of PPP_CONFIG_DIR is deprecated and will be removed in version 1.9!" +
+ " Please use PYORBITAL_CONFIG_PATH if you need a custom config path for pyorbital!")
+ pyorbital_config_path = os.getenv("PPP_CONFIG_DIR", PKG_CONFIG_DIR)
else:
- pyorbital_config_path = os.getenv('PYORBITAL_CONFIG_PATH', PKG_CONFIG_DIR)
+ pyorbital_config_path = os.getenv("PYORBITAL_CONFIG_PATH", PKG_CONFIG_DIR)
LOGGER.debug("Path to the Pyorbital configuration (where e.g. platforms.txt is found): %s",
str(pyorbital_config_path))
@@ -89,9 +94,9 @@ def get_platforms_filepath():
Check that the file exists or raise an error.
"""
config_path = _get_config_path()
- platform_file = os.path.join(config_path, 'platforms.txt')
+ platform_file = os.path.join(config_path, "platforms.txt")
if not os.path.isfile(platform_file):
- platform_file = os.path.join(PKG_CONFIG_DIR, 'platforms.txt')
+ platform_file = os.path.join(PKG_CONFIG_DIR, "platforms.txt")
if not os.path.isfile(platform_file):
raise OSError("Platform file {filepath} does not exist!".format(filepath=platform_file))
@@ -102,15 +107,15 @@ def read_platform_numbers(filename, in_upper=False, num_as_int=False):
"""Read platform numbers from $PYORBITAL_CONFIG_PATH/platforms.txt."""
out_dict = {}
- with open(filename, 'r') as fid:
+ with open(filename, "r") as fid:
for row in fid:
# skip comment lines
- if not row.startswith('#'):
+ if not row.startswith("#"):
parts = row.split()
if len(parts) < 2:
continue
# The satellite name might have whitespace
- platform = ' '.join(parts[:-1])
+ platform = " ".join(parts[:-1])
num = parts[-1]
if in_upper:
platform = platform.upper()
@@ -161,12 +166,17 @@ def read(platform, tle_file=None, line1=None, line2=None):
"""
return Tle(platform, tle_file=tle_file, line1=line1, line2=line2)
+# req = urllib.request.Request('http://www.example.com')
+# with urllib.request.urlopen(req) as response:
+# the_page = response.read()
def fetch(destination):
"""Fetch TLE from internet and save it to `destination`."""
with io.open(destination, mode="w", encoding="utf-8") as dest:
for url in TLE_URLS:
- response = urlopen(url)
+ if not url.lower().startswith("http"):
+ raise ValueError(f"{str(url)} is not accepted!")
+ response = urlopen(url) # nosec
dest.write(response.read().decode("utf-8"))
@@ -248,7 +258,7 @@ def _read_tle(self):
if not tle:
raise KeyError("Found no TLE entry for '%s'" % self._platform)
- self._line1, self._line2 = tle.split('\n')
+ self._line1, self._line2 = tle.split("\n")
def _parse_tle(self):
"""Parse values from TLE data."""
@@ -272,7 +282,7 @@ def _read_tle_decimal(rep):
self.epoch_day = float(self._line1[20:32])
self.epoch = \
np.datetime64(dt.datetime.strptime(self.epoch_year, "%y") +
- dt.timedelta(days=self.epoch_day - 1), 'us')
+ dt.timedelta(days=self.epoch_day - 1), "us")
self.mean_motion_derivative = float(self._line1[33:43])
self.mean_motion_sec_derivative = _read_tle_decimal(self._line1[44:52])
self.bstar = _read_tle_decimal(self._line1[53:61])
@@ -295,20 +305,20 @@ def __str__(self):
import pprint
s_var = io.StringIO()
d_var = dict(([(k, v) for k, v in
- list(self.__dict__.items()) if k[0] != '_']))
+ list(self.__dict__.items()) if k[0] != "_"]))
pprint.pprint(d_var, s_var)
return s_var.getvalue()[:-1]
def _get_local_tle_path_from_env():
"""Get the path to possible local TLE files using the environment variable."""
- return os.environ.get('TLES')
+ return os.environ.get("TLES")
def _get_uris_and_open_func(tle_file=None):
"""Get the uri's and the adequate file open call for the TLE files."""
def _open(filename):
- return io.open(filename, 'rb')
+ return io.open(filename, "rb")
local_tle_path = _get_local_tle_path_from_env()
@@ -337,13 +347,13 @@ def _open(filename):
return uris, open_func
-def _get_first_tle(uris, open_func, platform=''):
+def _get_first_tle(uris, open_func, platform=""):
return _get_tles_from_uris(uris, open_func, platform=platform, only_first=True)
-def _get_tles_from_uris(uris, open_func, platform='', only_first=True):
+def _get_tles_from_uris(uris, open_func, platform="", only_first=True):
tles = []
- designator = "1 " + SATELLITES.get(platform, '')
+ designator = "1 " + SATELLITES.get(platform, "")
for url in uris:
fid = open_func(url)
for l_0 in fid:
@@ -375,7 +385,7 @@ def _get_tles_from_uris(uris, open_func, platform='', only_first=True):
def _decode(itm):
if isinstance(itm, str):
return itm
- return itm.decode('utf-8')
+ return itm.decode("utf-8")
PLATFORM_NAMES_TABLE = "(satid text primary key, platform_name text)"
@@ -402,7 +412,10 @@ def fetch_plain_tle(self):
tles[source] = []
failures = []
for uri in sources[source]:
- req = requests.get(uri)
+ try:
+ req = requests.get(uri, timeout=15) # 15 seconds
+ except requests.exceptions.Timeout:
+ raise TleDownloadTimeoutError(f"Failed to make request to {str(uri)} within 15 seconds!")
if req.status_code == 200:
tles[source] += _parse_tles_for_downloader((req.text,), io.StringIO)
else:
@@ -410,7 +423,7 @@ def fetch_plain_tle(self):
if len(failures) > 0:
logging.error(
"Could not fetch TLEs from %s, %d failure(s): [%s]",
- source, len(failures), ', '.join(failures))
+ source, len(failures), ", ".join(failures))
logging.info("Downloaded %d TLEs from %s",
len(tles[source]), source)
return tles
@@ -422,8 +435,8 @@ def fetch_spacetrack(self):
download_url = ("https://www.space-track.org/basicspacedata/query/"
"class/tle_latest/ORDINAL/1/NORAD_CAT_ID/%s/format/"
"tle")
- download_url = download_url % ','.join(
- [str(key) for key in self.config['platforms']])
+ download_url = download_url % ",".join(
+ [str(key) for key in self.config["platforms"]])
user = self.config["downloaders"]["fetch_spacetrack"]["user"]
password = self.config["downloaders"]["fetch_spacetrack"]["password"]
@@ -470,15 +483,15 @@ def read_xml_admin_messages(self):
def _parse_tles_for_downloader(item, open_func):
- return [Tle('', tle_file=io.StringIO(tle)) for tle in
- _get_tles_from_uris(item, open_func, platform='', only_first=False)]
+ return [Tle("", tle_file=io.StringIO(tle)) for tle in
+ _get_tles_from_uris(item, open_func, platform="", only_first=False)]
def collect_filenames(paths):
"""Collect all filenames from *paths*."""
fnames = []
for path in paths:
- if '*' in path:
+ if "*" in path:
fnames += glob.glob(path)
else:
if not os.path.exists(path):
@@ -494,10 +507,10 @@ def read_tles_from_mmam_xml_files(paths):
fnames = collect_filenames(paths)
tles = []
for fname in fnames:
- data = read_tle_from_mmam_xml_file(fname).split('\n')
+ data = read_tle_from_mmam_xml_file(fname).split("\n")
for two_lines in _group_iterable_to_chunks(2, data):
- tl_stream = io.StringIO('\n'.join(two_lines))
- tles.append(Tle('', tle_file=tl_stream))
+ tl_stream = io.StringIO("\n".join(two_lines))
+ tles.append(Tle("", tle_file=tl_stream))
return tles
@@ -559,7 +572,7 @@ def update_db(self, tle, source):
self.platforms[num], num)
cmd = SATID_VALUES.format(num)
epoch = tle.epoch.item().isoformat()
- tle = '\n'.join([tle.line1, tle.line2])
+ tle = "\n".join([tle.line1, tle.line2])
now = dt.datetime.utcnow().isoformat()
try:
with self.db:
@@ -572,7 +585,7 @@ def update_db(self, tle, source):
def write_tle_txt(self):
"""Write TLE data to a text file."""
- if not self.updated and not self.writer_config.get('write_always',
+ if not self.updated and not self.writer_config.get("write_always",
False):
return
pattern = os.path.join(self.writer_config["output_dir"],
@@ -588,9 +601,8 @@ def write_tle_txt(self):
for satid, platform_name in self.platforms.items():
if self.writer_config.get("write_name", False):
data.append(platform_name)
- query = ("SELECT epoch, tle FROM '%s' ORDER BY "
- "epoch DESC LIMIT 1" % satid)
- epoch, tle = self.db.execute(query).fetchone()
+ query = f"SELECT epoch, tle FROM '{satid:d}' ORDER BY epoch DESC LIMIT 1" # noseq
+ epoch, tle = self.db.execute(query).fetchone() # nosec
date_epoch = dt.datetime.strptime(epoch, ISO_TIME_FORMAT)
tle_age = (
dt.datetime.utcnow() - date_epoch).total_seconds() / 3600.
@@ -598,8 +610,8 @@ def write_tle_txt(self):
satid, platform_name, int(tle_age))
data.append(tle)
- with open(fname, 'w') as fid:
- fid.write('\n'.join(data))
+ with open(fname, "w") as fid:
+ fid.write("\n".join(data))
logging.info("Wrote %d TLEs to %s", len(data), fname)
@@ -612,14 +624,14 @@ def table_exists(db, name):
"""Check if the table 'name' exists in the database."""
name = str(name)
query = "SELECT 1 FROM sqlite_master WHERE type='table' and name=?"
- return db.execute(query, (name,)).fetchone() is not None
+ return db.execute(query, (name,)).fetchone() is not None # nosec
def main():
"""Run a test TLE reading."""
- tle_data = read('Noaa-19')
+ tle_data = read("Noaa-19")
print(tle_data)
-if __name__ == '__main__':
+if __name__ == "__main__":
main()