mirror of
https://codeberg.org/polarisfm/youtube-dl
synced 2024-11-22 08:34:32 +01:00
Refactor
This commit is contained in:
parent
66a3e4551d
commit
b2d61f9698
@ -9,42 +9,44 @@ import unittest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from youtube_dl.archive import Archive
|
||||
# TODO refactor test to actually test YoutubeDL archive functionality
|
||||
|
||||
|
||||
class TestArchive(unittest.TestCase):
|
||||
def setUp(self):
|
||||
cur_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
test_archive = os.path.join(cur_dir, "test_archive.txt")
|
||||
self.archive = Archive(test_archive)
|
||||
|
||||
def tearDown(self):
|
||||
if os.path.exists(self.archive.filepath):
|
||||
os.remove(self.archive.filepath)
|
||||
|
||||
def test_archive_disabled(self):
|
||||
self.assertTrue(Archive(None)._disabled)
|
||||
self.assertTrue(Archive("")._disabled)
|
||||
self.assertFalse(Archive("/anything")._disabled)
|
||||
|
||||
def test_archive_read_empty_file(self):
|
||||
self.archive._read_file()
|
||||
|
||||
def test_archive_exists(self):
|
||||
self.archive.record_download("dl_id")
|
||||
self.assertTrue("dl_id" in self.archive._data)
|
||||
self.assertTrue("dl_id" in self.archive)
|
||||
|
||||
def test_archive_not_exists(self):
|
||||
self.assertFalse("dl_id" in self.archive)
|
||||
|
||||
def test_archive_multiple_writes(self):
|
||||
self.archive.record_download("dl_id 1")
|
||||
self.archive.record_download("dl_id 2")
|
||||
self.archive.record_download("dl_id 3")
|
||||
expected = "dl_id 1" + "\n" + "dl_id 2" + "\n" + "dl_id 3" + "\n"
|
||||
with open(self.archive.filepath, "r", encoding="utf-8") as f_in:
|
||||
self.assertEqual(f_in.read(), expected)
|
||||
# from youtube_dl.archive import Archive
|
||||
#
|
||||
#
|
||||
# class TestArchive(unittest.TestCase):
|
||||
# def setUp(self):
|
||||
# cur_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
# test_archive = os.path.join(cur_dir, "test_archive.txt")
|
||||
# self.archive = Archive(test_archive)
|
||||
#
|
||||
# def tearDown(self):
|
||||
# if os.path.exists(self.archive.filepath):
|
||||
# os.remove(self.archive.filepath)
|
||||
#
|
||||
# def test_archive_disabled(self):
|
||||
# self.assertTrue(Archive(None)._disabled)
|
||||
# self.assertTrue(Archive("")._disabled)
|
||||
# self.assertFalse(Archive("/anything")._disabled)
|
||||
#
|
||||
# def test_archive_read_empty_file(self):
|
||||
# self.archive._read_file()
|
||||
#
|
||||
# def test_archive_exists(self):
|
||||
# self.archive.record_download("dl_id")
|
||||
# self.assertTrue("dl_id" in self.archive._data)
|
||||
# self.assertTrue("dl_id" in self.archive)
|
||||
#
|
||||
# def test_archive_not_exists(self):
|
||||
# self.assertFalse("dl_id" in self.archive)
|
||||
#
|
||||
# def test_archive_multiple_writes(self):
|
||||
# self.archive.record_download("dl_id 1")
|
||||
# self.archive.record_download("dl_id 2")
|
||||
# self.archive.record_download("dl_id 3")
|
||||
# expected = "dl_id 1" + "\n" + "dl_id 2" + "\n" + "dl_id 3" + "\n"
|
||||
# with open(self.archive.filepath, "r", encoding="utf-8") as f_in:
|
||||
# self.assertEqual(f_in.read(), expected)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -28,7 +28,6 @@ import random
|
||||
|
||||
from string import ascii_letters
|
||||
|
||||
from .archive import Archive
|
||||
from .compat import (
|
||||
compat_basestring,
|
||||
compat_cookiejar,
|
||||
@ -417,7 +416,8 @@ class YoutubeDL(object):
|
||||
'Parameter outtmpl is bytes, but should be a unicode string. '
|
||||
'Put from __future__ import unicode_literals at the top of your code file or consider switching to Python 3.x.')
|
||||
|
||||
self.archive = Archive(self.params.get('download_archive'))
|
||||
self._archive_filepath = self.params.get('download_archive')
|
||||
self._archive_set = set()
|
||||
|
||||
self._setup_opener()
|
||||
|
||||
@ -2097,15 +2097,40 @@ class YoutubeDL(object):
|
||||
return extractor.lower() + ' ' + video_id
|
||||
|
||||
def in_download_archive(self, info_dict):
|
||||
""" Checks if video from info_dict exists in archive"""
|
||||
|
||||
if self._archive_filepath is None:
|
||||
return False
|
||||
|
||||
vid_id = self._make_archive_id(info_dict)
|
||||
if not vid_id:
|
||||
return False # Incomplete video information
|
||||
return vid_id in self.archive
|
||||
|
||||
if vid_id in self._archive_set:
|
||||
return True
|
||||
|
||||
# Read the archive file
|
||||
try:
|
||||
with locked_file(self._archive_filepath, 'r', encoding='utf-8') as archive_file:
|
||||
for line in archive_file:
|
||||
self._archive_set.add(line.strip())
|
||||
if line.strip() == vid_id:
|
||||
return True
|
||||
except IOError as ioe:
|
||||
if ioe.errno != errno.ENOENT:
|
||||
raise
|
||||
return False
|
||||
|
||||
def record_download_archive(self, info_dict):
|
||||
""" Records a new entry in the archive for the info_dict """
|
||||
|
||||
if self._archive_filepath is None:
|
||||
return
|
||||
vid_id = self._make_archive_id(info_dict)
|
||||
assert vid_id
|
||||
self.archive.record_download(vid_id)
|
||||
with locked_file(self._archive_filepath, 'a', encoding='utf-8') as archive_file:
|
||||
archive_file.write(vid_id + '\n')
|
||||
self._archive_set.add(vid_id)
|
||||
|
||||
@staticmethod
|
||||
def format_resolution(format, default='unknown'):
|
||||
|
@ -1,64 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# coding: utf-8
|
||||
import errno
|
||||
|
||||
from youtube_dl.utils import locked_file
|
||||
|
||||
|
||||
class Archive(object):
|
||||
""" Class that manages the download Archive. Provides optimizations to avoid
|
||||
excessive file parsing.
|
||||
|
||||
Initializing options:
|
||||
filepath: str The filepath of the archive.
|
||||
|
||||
Properties:
|
||||
_data: set Container for holding a cache of the archive
|
||||
_disabled: bool When true, all functions are effectively void
|
||||
filepath: str The filepath of the archive
|
||||
|
||||
Methods:
|
||||
__contains__ Checks a video id (archive id) exists in archive
|
||||
_read_archive Reads archive from disk
|
||||
record_download Adds a new download to archive
|
||||
"""
|
||||
|
||||
def __init__(self, filepath):
|
||||
""" Instantiate Archive
|
||||
filepath: str or None. The filepath of the archive. If None or empty
|
||||
string is provided instance can still be used
|
||||
but it's effectively doing nothing."""
|
||||
|
||||
self.filepath = None if filepath == "" else filepath
|
||||
self._disabled = self.filepath is None
|
||||
self._data = set()
|
||||
|
||||
def record_download(self, archive_id):
|
||||
""" Records a new archive_id in the archive """
|
||||
if self._disabled:
|
||||
return
|
||||
|
||||
with locked_file(self.filepath, "a", encoding="utf-8") as file_out:
|
||||
file_out.write(archive_id + "\n")
|
||||
self._data.add(archive_id)
|
||||
|
||||
def _read_file(self):
|
||||
""" Reads the data from archive file """
|
||||
if self._disabled:
|
||||
return
|
||||
|
||||
try:
|
||||
with locked_file(self.filepath, "r", encoding="utf-8") as file_in:
|
||||
self._data.update(line.strip() for line in file_in)
|
||||
except IOError as err:
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
|
||||
def __contains__(self, item):
|
||||
if not isinstance(item, str):
|
||||
raise ValueError(
|
||||
"An archive contains only strings. Provided {}".format(type(item)))
|
||||
|
||||
if item not in self._data:
|
||||
self._read_file()
|
||||
return item in self._data
|
Loading…
Reference in New Issue
Block a user