This commit is contained in:
Sergio Kef 2020-10-23 04:45:40 +07:00 committed by GitHub
commit 8ce8db0f71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 70 additions and 6 deletions

53
test/test_archive.py Normal file
View File

@ -0,0 +1,53 @@
#!/usr/bin/env python
from __future__ import unicode_literals
# Allow direct execution
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# TODO refactor test to actually test YoutubeDL archive functionality
# from youtube_dl.archive import Archive
#
#
# class TestArchive(unittest.TestCase):
# def setUp(self):
# cur_dir = os.path.dirname(os.path.abspath(__file__))
# test_archive = os.path.join(cur_dir, "test_archive.txt")
# self.archive = Archive(test_archive)
#
# def tearDown(self):
# if os.path.exists(self.archive.filepath):
# os.remove(self.archive.filepath)
#
# def test_archive_disabled(self):
# self.assertTrue(Archive(None)._disabled)
# self.assertTrue(Archive("")._disabled)
# self.assertFalse(Archive("/anything")._disabled)
#
# def test_archive_read_empty_file(self):
# self.archive._read_file()
#
# def test_archive_exists(self):
# self.archive.record_download("dl_id")
# self.assertTrue("dl_id" in self.archive._data)
# self.assertTrue("dl_id" in self.archive)
#
# def test_archive_not_exists(self):
# self.assertFalse("dl_id" in self.archive)
#
# def test_archive_multiple_writes(self):
# self.archive.record_download("dl_id 1")
# self.archive.record_download("dl_id 2")
# self.archive.record_download("dl_id 3")
# expected = "dl_id 1" + "\n" + "dl_id 2" + "\n" + "dl_id 3" + "\n"
# with open(self.archive.filepath, "r", encoding="utf-8") as f_in:
# self.assertEqual(f_in.read(), expected)
if __name__ == "__main__":
unittest.main()

View File

@ -416,6 +416,9 @@ class YoutubeDL(object):
'Parameter outtmpl is bytes, but should be a unicode string. '
'Put from __future__ import unicode_literals at the top of your code file or consider switching to Python 3.x.')
self._archive_filepath = self.params.get('download_archive')
self._archive_set = set()
self._setup_opener()
if auto_init:
@ -2094,17 +2097,23 @@ class YoutubeDL(object):
return extractor.lower() + ' ' + video_id
def in_download_archive(self, info_dict):
fn = self.params.get('download_archive')
if fn is None:
""" Checks if video from info_dict exists in archive"""
if self._archive_filepath is None:
return False
vid_id = self._make_archive_id(info_dict)
if not vid_id:
return False # Incomplete video information
if vid_id in self._archive_set:
return True
# Read the archive file
try:
with locked_file(fn, 'r', encoding='utf-8') as archive_file:
with locked_file(self._archive_filepath, 'r', encoding='utf-8') as archive_file:
for line in archive_file:
self._archive_set.add(line.strip())
if line.strip() == vid_id:
return True
except IOError as ioe:
@ -2113,13 +2122,15 @@ class YoutubeDL(object):
return False
def record_download_archive(self, info_dict):
fn = self.params.get('download_archive')
if fn is None:
""" Records a new entry in the archive for the info_dict """
if self._archive_filepath is None:
return
vid_id = self._make_archive_id(info_dict)
assert vid_id
with locked_file(fn, 'a', encoding='utf-8') as archive_file:
with locked_file(self._archive_filepath, 'a', encoding='utf-8') as archive_file:
archive_file.write(vid_id + '\n')
self._archive_set.add(vid_id)
@staticmethod
def format_resolution(format, default='unknown'):