Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing test warnings #744

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 103 additions & 104 deletions pydub/audio_segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,132 +664,131 @@ def is_format(f):

return False

if is_format("wav"):
try:
try:
if is_format("wav"):
try:
if start_second is None and duration is None:
return cls._from_safe_wav(file)
elif start_second is not None and duration is None:
return cls._from_safe_wav(file)[start_second*1000:]
elif start_second is None and duration is not None:
return cls._from_safe_wav(file)[:duration*1000]
else:
return cls._from_safe_wav(file)[start_second*1000:(start_second+duration)*1000]
except:
file.seek(0)
elif is_format("raw") or is_format("pcm"):
sample_width = kwargs['sample_width']
frame_rate = kwargs['frame_rate']
channels = kwargs['channels']
metadata = {
'sample_width': sample_width,
'frame_rate': frame_rate,
'channels': channels,
'frame_width': channels * sample_width
}
if start_second is None and duration is None:
return cls._from_safe_wav(file)
return cls(data=file.read(), metadata=metadata)
elif start_second is not None and duration is None:
return cls._from_safe_wav(file)[start_second*1000:]
return cls(data=file.read(), metadata=metadata)[start_second*1000:]
elif start_second is None and duration is not None:
return cls._from_safe_wav(file)[:duration*1000]
return cls(data=file.read(), metadata=metadata)[:duration*1000]
else:
return cls._from_safe_wav(file)[start_second*1000:(start_second+duration)*1000]
except:
file.seek(0)
elif is_format("raw") or is_format("pcm"):
sample_width = kwargs['sample_width']
frame_rate = kwargs['frame_rate']
channels = kwargs['channels']
metadata = {
'sample_width': sample_width,
'frame_rate': frame_rate,
'channels': channels,
'frame_width': channels * sample_width
}
if start_second is None and duration is None:
return cls(data=file.read(), metadata=metadata)
elif start_second is not None and duration is None:
return cls(data=file.read(), metadata=metadata)[start_second*1000:]
elif start_second is None and duration is not None:
return cls(data=file.read(), metadata=metadata)[:duration*1000]
else:
return cls(data=file.read(), metadata=metadata)[start_second*1000:(start_second+duration)*1000]
return cls(data=file.read(), metadata=metadata)[start_second*1000:(start_second+duration)*1000]

conversion_command = [cls.converter,
'-y', # always overwrite existing files
]
conversion_command = [cls.converter,
'-y', # always overwrite existing files
]

# If format is not defined
# ffmpeg/avconv will detect it automatically
if format:
conversion_command += ["-f", format]
# If format is not defined
# ffmpeg/avconv will detect it automatically
if format:
conversion_command += ["-f", format]

if codec:
# force audio decoder
conversion_command += ["-acodec", codec]
if codec:
# force audio decoder
conversion_command += ["-acodec", codec]

read_ahead_limit = kwargs.get('read_ahead_limit', -1)
if filename:
conversion_command += ["-i", filename]
stdin_parameter = None
stdin_data = None
else:
if cls.converter == 'ffmpeg':
conversion_command += ["-read_ahead_limit", str(read_ahead_limit),
"-i", "cache:pipe:0"]
read_ahead_limit = kwargs.get('read_ahead_limit', -1)
if filename:
conversion_command += ["-i", filename]
stdin_parameter = None
stdin_data = None
else:
conversion_command += ["-i", "-"]
stdin_parameter = subprocess.PIPE
stdin_data = file.read()
if cls.converter == 'ffmpeg':
conversion_command += ["-read_ahead_limit", str(read_ahead_limit),
"-i", "cache:pipe:0"]
else:
conversion_command += ["-i", "-"]
stdin_parameter = subprocess.PIPE
stdin_data = file.read()

if codec:
info = None
else:
info = mediainfo_json(orig_file, read_ahead_limit=read_ahead_limit)
if info:
audio_streams = [x for x in info['streams']
if x['codec_type'] == 'audio']
# This is a workaround for some ffprobe versions that always say
# that mp3/mp4/aac/webm/ogg files contain fltp samples
audio_codec = audio_streams[0].get('codec_name')
if (audio_streams[0].get('sample_fmt') == 'fltp' and
audio_codec in ['mp3', 'mp4', 'aac', 'webm', 'ogg']):
bits_per_sample = 16
else:
bits_per_sample = audio_streams[0]['bits_per_sample']
if bits_per_sample == 8:
acodec = 'pcm_u8'
if codec:
info = None
else:
acodec = 'pcm_s%dle' % bits_per_sample

conversion_command += ["-acodec", acodec]
info = mediainfo_json(orig_file, read_ahead_limit=read_ahead_limit)
if info:
audio_streams = [x for x in info['streams']
if x['codec_type'] == 'audio']
# This is a workaround for some ffprobe versions that always say
# that mp3/mp4/aac/webm/ogg files contain fltp samples
audio_codec = audio_streams[0].get('codec_name')
if (audio_streams[0].get('sample_fmt') == 'fltp' and
audio_codec in ['mp3', 'mp4', 'aac', 'webm', 'ogg']):
bits_per_sample = 16
else:
bits_per_sample = audio_streams[0]['bits_per_sample']
if bits_per_sample == 8:
acodec = 'pcm_u8'
else:
acodec = 'pcm_s%dle' % bits_per_sample

conversion_command += [
"-vn", # Drop any video streams if there are any
"-f", "wav" # output options (filename last)
]
conversion_command += ["-acodec", acodec]

if start_second is not None:
conversion_command += ["-ss", str(start_second)]
conversion_command += [
"-vn", # Drop any video streams if there are any
"-f", "wav" # output options (filename last)
]

if duration is not None:
conversion_command += ["-t", str(duration)]
if start_second is not None:
conversion_command += ["-ss", str(start_second)]

conversion_command += ["-"]
if duration is not None:
conversion_command += ["-t", str(duration)]

if parameters is not None:
# extend arguments with arbitrary set
conversion_command.extend(parameters)
conversion_command += ["-"]

log_conversion(conversion_command)
if parameters is not None:
# extend arguments with arbitrary set
conversion_command.extend(parameters)

p = subprocess.Popen(conversion_command, stdin=stdin_parameter,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p_out, p_err = p.communicate(input=stdin_data)
log_conversion(conversion_command)

if p.returncode != 0 or len(p_out) == 0:
if close_file:
file.close()
raise CouldntDecodeError(
"Decoding failed. ffmpeg returned error code: {0}\n\nOutput from ffmpeg/avlib:\n\n{1}".format(
p.returncode, p_err.decode(errors='ignore') ))
p = subprocess.Popen(conversion_command, stdin=stdin_parameter,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p_out, p_err = p.communicate(input=stdin_data)

p_out = bytearray(p_out)
fix_wav_headers(p_out)
p_out = bytes(p_out)
obj = cls(p_out)
if p.returncode != 0 or len(p_out) == 0:
raise CouldntDecodeError(
"Decoding failed. ffmpeg returned error code: {0}\n\nOutput from ffmpeg/avlib:\n\n{1}".format(
p.returncode, p_err.decode(errors='ignore') ))

if close_file:
file.close()
p_out = bytearray(p_out)
fix_wav_headers(p_out)
p_out = bytes(p_out)
obj = cls(p_out)

if start_second is None and duration is None:
return obj
elif start_second is not None and duration is None:
return obj[0:]
elif start_second is None and duration is not None:
return obj[:duration * 1000]
else:
return obj[0:duration * 1000]
if start_second is None and duration is None:
return obj
elif start_second is not None and duration is None:
return obj[0:]
elif start_second is None and duration is not None:
return obj[:duration * 1000]
else:
return obj[0:duration * 1000]
finally:
if close_file:
file.close()

@classmethod
def from_mp3(cls, file, parameters=None):
Expand Down
29 changes: 21 additions & 8 deletions test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,18 @@ def test_export_pathlib_path(self):
seg1 = AudioSegment.from_file(self.mp3_path_str)
from pathlib import Path
path = Path(tempfile.gettempdir()) / 'pydub-test-export-8ajds.mp3'
out_f = None
try:
seg1.export(path, format='mp3')
out_f = seg1.export(path, format='mp3')
seg2 = AudioSegment.from_file(path, format='mp3')

self.assertTrue(len(seg1) > 0)
self.assertWithinTolerance(len(seg1),
len(seg2),
percentage=0.01)
finally:
if out_f:
out_f.close()
os.unlink(path)


Expand Down Expand Up @@ -280,7 +283,8 @@ def test_192khz_audio(self):
path = os.path.join(data_dir, base_file)
base = AudioSegment.from_file(path)

headers = extract_wav_headers(open(path, 'rb').read())
with open(path, 'rb') as f:
headers = extract_wav_headers(f.read())
data16_size = headers[-1].size
self.assertEqual(len(base.raw_data), data16_size)
self.assertEqual(base.frame_rate, 192000)
Expand Down Expand Up @@ -489,7 +493,8 @@ def test_set_channels(self):
if sys.platform == 'win32':
tmp_file.close()

mono.export(tmp_file.name, 'mp3')
out_f = mono.export(tmp_file.name, 'mp3')
out_f.close()
monomp3 = AudioSegment.from_mp3(tmp_file.name)

self.assertWithinTolerance(
Expand Down Expand Up @@ -640,6 +645,8 @@ def test_export_as_raw_with_parameters(self):
with self.assertRaises(AttributeError):
seg.export(format='raw', parameters=['-ar', '16000', '-ac', '1'])

@unittest.skipUnless('libvorbis' in get_supported_encoders(),
"Unsupported codecs")
def test_export_as_ogg(self):
seg = self.seg1
exported_ogg = seg.export(format='ogg')
Expand All @@ -649,6 +656,8 @@ def test_export_as_ogg(self):
len(seg),
percentage=0.01)

@unittest.skipUnless('libvorbis' in get_supported_encoders(),
"Unsupported codecs")
def test_export_forced_codec(self):
seg = self.seg1 + self.seg2

Expand Down Expand Up @@ -779,6 +788,8 @@ def test_export_ogg_as_mp3(self):
AudioSegment.from_file(self.ogg_file_path).export(tmp_mp3_file,
format="mp3")

@unittest.skipUnless('libvorbis' in get_supported_encoders(),
"Unsupported codecs")
def test_export_mp3_as_ogg(self):
with NamedTemporaryFile('w+b', suffix='.ogg') as tmp_ogg_file:
AudioSegment.from_file(self.mp3_file_path).export(tmp_ogg_file,
Expand All @@ -796,7 +807,7 @@ def test_export_mp3_as_webm(self):
AudioSegment.from_file(self.mp3_file_path).export(tmp_webm_file,
format="webm")

@unittest.skipUnless('aac' in get_supported_decoders(),
@unittest.skipUnless('aac' in get_supported_decoders() and 'libvorbis' in get_supported_encoders(),
"Unsupported codecs")
def test_export_mp4_as_ogg(self):
with NamedTemporaryFile('w+b', suffix='.ogg') as tmp_ogg_file:
Expand Down Expand Up @@ -973,7 +984,7 @@ def test_compress(self):
# average volume should be reduced
self.assertTrue(compressed.rms < self.seg1.rms)

@unittest.skipUnless('aac' in get_supported_decoders(),
@unittest.skipUnless('aac' in get_supported_decoders() and 'libvorbis' in get_supported_encoders(),
"Unsupported codecs")
def test_exporting_to_ogg_uses_default_codec_when_codec_param_is_none(self):
delete = sys.platform != 'win32'
Expand Down Expand Up @@ -1086,7 +1097,7 @@ def test_from_file_clean_fail(self):
tmp_wav_file.flush()
self.assertRaises(CouldntDecodeError, AudioSegment.from_file, tmp_wav_file.name)
files = os.listdir(tempfile.tempdir)
self.assertEquals(files, [os.path.basename(tmp_wav_file.name)])
self.assertEqual(files, [os.path.basename(tmp_wav_file.name)])

if sys.platform == 'win32':
os.remove(tmp_wav_file.name)
Expand All @@ -1109,7 +1120,7 @@ def setUp(self):

def test_split_on_silence_complete_silence(self):
seg = AudioSegment.silent(5000)
self.assertEquals( split_on_silence(seg), [] )
self.assertEqual( split_on_silence(seg), [] )

def test_split_on_silence_test1(self):
self.assertEqual(
Expand Down Expand Up @@ -1276,7 +1287,9 @@ def test_init_AudioSegment_data_buffer_with_bad_values_fails(self):

def test_exporting(self):
seg = AudioSegment.from_wav(self.wave_file)
exported = AudioSegment.from_wav(seg.export(format="wav"))
out_f = seg.export(format="wav")
exported = AudioSegment.from_wav(out_f)
out_f.close()

self.assertEqual(len(exported), len(seg))

Expand Down