Files
libtorrent/bindings/python/test.py
T
2026-02-15 09:43:14 +01:00

1463 lines
54 KiB
Python

#!/usr/bin/env python3
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
from __future__ import annotations
import ast
from typing import cast, Generator
import libtorrent as lt
import unittest
import time
import datetime
import os
import shutil
import binascii
import subprocess as sub
import sys
import pickle
import threading
import tempfile
import socket
import select
import ssl
import http.server
import functools
import dummy_data
# include terminal interface for travis parallel executions of scripts which use
# terminal features: fix multiple stdin assignment at termios.tcgetattr
if os.name != "nt":
    import pty

# Session settings shared by most tests in this file: every alert category is
# enabled, DHT/LSD/NAT-PMP/UPnP are switched off, and the session listens on
# an OS-assigned port on all IPv4 interfaces.
settings: "lt.settings_pack" = dict(
    alert_mask=lt.alert.category_t.all_categories,
    enable_dht=False,
    enable_lsd=False,
    enable_natpmp=False,
    enable_upnp=False,
    listen_interfaces="0.0.0.0:0",
    file_pool_size=1,
)
def has_deprecated():
    """Return True when the ``lt`` module exposes a ``version`` attribute.

    The tests use the presence of that attribute as the signal that the
    bindings were built with deprecated functions included.
    """
    try:
        lt.version
    except AttributeError:
        return False
    return True
class test_create_torrent(unittest.TestCase):
    """Tests for lt.create_torrent, built from a torrent_info and from a file_storage."""

    def test_from_torrent_info(self):
        # A torrent regenerated from its own torrent_info is compared, byte
        # for byte (ignoring surrounding whitespace), with the file on disk.
        torrent_path = 'unordered.torrent'
        ti = lt.torrent_info(torrent_path)
        print(ti.ssl_cert())
        creator = lt.create_torrent(ti)
        entry = creator.generate()
        content = lt.bencode(entry).strip()
        with open(torrent_path, 'rb') as f:
            raw = f.read()
        file_content = bytearray(raw.strip())
        print(content)
        print(file_content)
        print(entry)
        self.assertEqual(content, file_content)

    def test_from_scratch(self):
        storage = lt.file_storage()
        storage.add_file('test/file1', 1000)
        storage.add_file('test/file2', 2000)
        self.assertEqual(storage.file_name(0), 'file1')
        self.assertEqual(storage.file_name(1), 'file2')

        creator = lt.create_torrent(storage, 0, lt.create_torrent.canonical_files_no_tail_padding)
        creator.add_url_seed('foo')
        creator.add_http_seed('bar')
        creator.add_tracker('bar')
        creator.set_root_cert('1234567890')
        creator.add_collection('1337')
        for piece in range(creator.num_pieces()):
            creator.set_hash(piece, b'abababababababababab')

        encoded = lt.bencode(creator.generate())
        print(encoded)

        # zero out the creation date: replace the integer that follows the
        # key with 0 so the output can be compared against a fixed string
        date_key = b'13:creation datei'
        parts = encoded.split(date_key, 1)
        after_date = parts[1].split(b'e', 1)[1]
        parts[1] = b'0e' + after_date
        encoded = date_key.join(parts)

        self.assertEqual(encoded, b'd8:announce3:bar13:creation datei0e9:httpseeds3:bar4:infod11:collectionsl4:1337e5:filesld6:lengthi1000e4:pathl5:file1eed4:attr1:p6:lengthi15384e4:pathl4:.pad5:15384eed6:lengthi2000e4:pathl5:file2eee4:name4:test12:piece lengthi16384e6:pieces40:abababababababababababababababababababab8:ssl-cert10:1234567890e8:url-list3:fooe')
class test_session_stats(unittest.TestCase):
    """Tests for add_torrent_params field access and the session stats metrics."""

    def test_add_torrent_params(self):
        atp = lt.add_torrent_params()
        # reading every attribute must work
        for field_name in dir(atp):
            print(field_name, getattr(atp, field_name))

        # assigning empty Python containers to these fields must work
        atp.renamed_files = {}
        atp.merkle_tree = []
        atp.unfinished_pieces = {}
        atp.have_pieces = []
        atp.banned_peers = []
        atp.verified_pieces = []
        atp.piece_priorities = []
        atp.url_seeds = []

    def test_unique(self):
        metrics = lt.session_stats_metrics()
        self.assertTrue(len(metrics) > 40)
        # no two metrics may share a value_index
        seen = set()
        for metric in metrics:
            index = metric.value_index
            self.assertTrue(index not in seen)
            seen.add(index)

    def test_find_idx(self):
        found = lt.find_metric_idx("peer.error_peers")
        self.assertEqual(found, 0)
class test_torrent_handle(unittest.TestCase):
    """Tests for lt.torrent_handle, mostly against 'url_seed_multi.torrent'.

    ``setup`` (lower-case) is not unittest's ``setUp`` hook, so it does not
    run automatically; tests that need a session with the torrent already
    added call it explicitly.
    """

    def setup(self):
        # creates self.ses (session), self.ti (torrent_info) and self.h (the
        # handle of the torrent once added to the session)
        self.ses = lt.session(settings)
        self.ti = lt.torrent_info('url_seed_multi.torrent')
        self.h = self.ses.add_torrent({
            'ti': self.ti, 'save_path': os.getcwd(),
            'flags': lt.torrent_flags.default_flags})

    def _wait_for_endpoints(self, timeout=30):
        """Poll until the first tracker of self.h lists at least one endpoint.

        Returns the tracker list that satisfied the condition. Fails the test
        after *timeout* seconds instead of blocking the run forever.
        """
        deadline = time.monotonic() + timeout
        while True:
            trackers = self.h.trackers()
            if len(trackers[0]['endpoints']) > 0:
                return trackers
            if time.monotonic() >= deadline:
                self.fail('tracker endpoints not populated within %s seconds' % timeout)
            time.sleep(0.1)

    def test_add_torrent_error(self):
        self.ses = lt.session(settings)
        self.ti = lt.torrent_info('url_seed_multi.torrent')
        with self.assertRaises(RuntimeError):
            self.ses.add_torrent({'ti': self.ti, 'save_path': os.getcwd(), 'info_hashes': b'abababababababababab'})

    def test_move_storage(self):
        # move_storage must accept str and bytes paths, with and without flags
        self.setup()
        self.h.move_storage(u'test-dir')
        self.h.move_storage(b'test-dir2')
        self.h.move_storage('test-dir3')
        self.h.move_storage(u'test-dir', flags=lt.move_flags_t.dont_replace)
        self.h.move_storage(u'test-dir', flags=2)
        self.h.move_storage(b'test-dir2', flags=2)
        self.h.move_storage('test-dir3', flags=2)

    def test_torrent_handle(self):
        self.setup()
        self.assertEqual(self.h.get_file_priorities(), [4, 4])
        self.assertEqual(self.h.get_piece_priorities(), [4])

        self.h.prioritize_files([0, 1])
        # workaround for asynchronous priority update
        time.sleep(1)
        self.assertEqual(self.h.get_file_priorities(), [0, 1])

        self.h.prioritize_pieces([0])
        self.assertEqual(self.h.get_piece_priorities(), [0])

        # also test the overload that takes a list of piece->priority mappings
        self.h.prioritize_pieces([(0, 1)])
        self.assertEqual(self.h.get_piece_priorities(), [1])

        self.h.connect_peer(('127.0.0.1', 6881))
        self.h.connect_peer(('127.0.0.2', 6881), source=4)
        self.h.connect_peer(('127.0.0.3', 6881), flags=2)
        self.h.connect_peer(('127.0.0.4', 6881), flags=2, source=4)

        torrent_files = self.h.torrent_file()
        print(torrent_files.map_file(0, 0, 0).piece)

        print(self.h.queue_position())

    def test_torrent_handle_in_set(self):
        self.setup()
        torrents = set()
        torrents.add(self.h)

        # get another instance of a torrent_handle that represents the same
        # torrent. Make sure that when we add it to a set, it just replaces the
        # existing object
        t = self.ses.get_torrents()
        self.assertEqual(len(t), 1)
        for h in t:
            torrents.add(h)

        self.assertEqual(len(torrents), 1)

    def test_torrent_handle_in_dict(self):
        self.setup()
        torrents = {}
        torrents[self.h] = 'foo'

        # get another instance of a torrent_handle that represents the same
        # torrent. Make sure that when we add it to a dict, it just replaces the
        # existing object
        t = self.ses.get_torrents()
        self.assertEqual(len(t), 1)
        for h in t:
            torrents[h] = 'bar'

        self.assertEqual(len(torrents), 1)
        self.assertEqual(torrents[self.h], 'bar')

    def test_replace_trackers(self):
        self.setup()
        trackers = []
        for idx, tracker_url in enumerate(('udp://tracker1.com', 'udp://tracker2.com')):
            tracker = lt.announce_entry(tracker_url)
            tracker.tier = idx
            tracker.fail_limit = 2
            trackers.append(tracker)
            self.assertEqual(tracker.url, tracker_url)
        self.h.replace_trackers(trackers)
        new_trackers = self.h.trackers()
        self.assertEqual(new_trackers[0]['url'], 'udp://tracker1.com')
        self.assertEqual(new_trackers[1]['tier'], 1)
        self.assertEqual(new_trackers[1]['fail_limit'], 2)

    def test_pickle_trackers(self):
        """Test lt objects converters are working and trackers can be pickled"""
        self.setup()
        tracker = lt.announce_entry('udp://tracker1.com')
        tracker.tier = 0
        tracker.fail_limit = 1
        trackers = [tracker]
        self.h.replace_trackers(trackers)
        # wait a bit until the endpoints list gets populated
        self._wait_for_endpoints()

        trackers = self.h.trackers()
        self.assertEqual(trackers[0]['url'], 'udp://tracker1.com')
        # this is not necessarily 0, it could also be (EHOSTUNREACH) if the
        # local machine doesn't support the address family
        expect_value = trackers[0]['endpoints'][0]['info_hashes'][0]['last_error']['value']
        # the pickled data is produced right here by this test, so loading it
        # back is safe
        pickled_trackers = pickle.dumps(trackers)
        unpickled_trackers = pickle.loads(pickled_trackers)
        self.assertEqual(unpickled_trackers[0]['url'], 'udp://tracker1.com')
        self.assertEqual(unpickled_trackers[0]['endpoints'][0]['info_hashes'][0]['last_error']['value'], expect_value)

    def test_file_status(self):
        self.setup()
        status = self.h.file_status()
        print(status)

    def test_piece_deadlines(self):
        self.setup()
        self.h.clear_piece_deadlines()

    def test_status_last_uploaded_downloaded(self):
        # we want to check at seconds precision but can't control session
        # time, wait for next full second to prevent second increment
        time.sleep(1 - datetime.datetime.now().microsecond / 1000000.0)

        self.setup()
        st = self.h.status()
        for attr in dir(st):
            print('%s: %s' % (attr, getattr(st, attr)))
        # nothing has been transferred yet, so neither timestamp is set
        self.assertEqual(st.last_upload, None)
        self.assertEqual(st.last_download, None)

    def test_serialize_trackers(self):
        """Test to ensure the dict contains only python built-in types"""
        self.setup()
        self.h.add_tracker({'url': 'udp://tracker1.com'})
        # wait a bit until the endpoints list gets populated
        self._wait_for_endpoints()

        import json
        print(json.dumps(self.h.trackers()[0]))

    def test_torrent_status(self):
        self.setup()
        st = self.h.status()
        ti = st.handle
        self.assertEqual(ti.info_hashes(), self.ti.info_hashes())
        # make sure we can compare torrent_status objects
        st2 = self.h.status()
        self.assertEqual(st2, st)
        print(st2)

    def test_read_resume_data(self):
        resume_data = lt.bencode({
            'file-format': 'libtorrent resume file',
            'info-hash': 'abababababababababab',
            'name': 'test',
            'save_path': '.',
            'peers': '\x01\x01\x01\x01\x00\x01\x02\x02\x02\x02\x00\x02',
            'file_priority': [0, 1, 1]})
        tp = lt.read_resume_data(resume_data)

        self.assertEqual(tp.name, 'test')
        self.assertEqual(tp.info_hashes.v1, lt.sha1_hash('abababababababababab'))
        self.assertEqual(tp.file_priorities, [0, 1, 1])
        self.assertEqual(tp.peers, [('1.1.1.1', 1), ('2.2.2.2', 2)])

        ses = lt.session(settings)
        h = ses.add_torrent(tp)
        for attr in dir(tp):
            print('%s: %s' % (attr, getattr(tp, attr)))

        h.connect_peer(('3.3.3.3', 3))

        # drain and print alerts for about a second
        for _ in range(0, 10):
            alerts = ses.pop_alerts()
            for a in alerts:
                print(a.message())
            time.sleep(0.1)

    def test_scrape(self):
        self.setup()
        # this is just to make sure this function can be called like this
        # from python
        self.h.scrape_tracker()

    def test_unknown_torrent_parameter(self):
        self.ses = lt.session(settings)
        # an unrecognised key in the add_torrent dict must raise KeyError
        with self.assertRaises(KeyError) as raised:
            self.h = self.ses.add_torrent({'unexpected-key-name': ''})
        print(raised.exception)

    def test_torrent_parameter(self):
        self.ses = lt.session(settings)
        self.ti = lt.torrent_info('url_seed_multi.torrent')
        self.h = self.ses.add_torrent({
            'ti': self.ti,
            'save_path': os.getcwd(),
            'trackers': ['http://test.com/announce'],
            'dht_nodes': [('1.2.3.4', 6881), ('4.3.2.1', 6881)],
            'file_priorities': [1, 1],
            'http_seeds': ['http://test.com/file3'],
            'url_seeds': ['http://test.com/announce-url'],
            'peers': [('5.6.7.8', 6881)],
            'banned_peers': [('8.7.6.5', 6881)],
            'renamed_files': {0: 'test.txt', 2: 'test.txt'}
        })
        self.st = self.h.status()
        self.assertEqual(self.st.save_path, os.getcwd())
        trackers = self.h.trackers()
        self.assertEqual(len(trackers), 1)
        self.assertEqual(trackers[0].get('url'), 'http://test.com/announce')
        self.assertEqual(trackers[0].get('tier'), 0)
        self.assertEqual(self.h.get_file_priorities(), [1, 1])
        self.assertEqual(self.h.http_seeds(), ['http://test.com/file3'])
        # url_seeds was already set, test that it did not get overwritten
        self.assertEqual(self.h.url_seeds(),
                         ['http://test.com/announce-url/', 'http://test.com/file/'])
        # piece priorities weren't set explicitly, but they were updated by the
        # file priorities being set
        self.assertEqual(self.h.get_piece_priorities(), [1])
        self.assertEqual(self.st.verified_pieces, [])
class TestAddPiece(unittest.TestCase):
    """Tests torrent_handle.add_piece() with str and bytes payloads.

    The torrent described by dummy_data is added to a fresh session that
    saves into a temporary directory; each test feeds it every piece and
    waits for the resulting file to match dummy_data.DATA.
    """

    def setUp(self):
        self.dir = tempfile.TemporaryDirectory()
        self.session = lt.session(settings)
        self.ti = lt.torrent_info(dummy_data.DICT)
        self.atp = lt.add_torrent_params()
        self.atp.ti = self.ti
        self.atp.save_path = self.dir.name
        self.handle = self.session.add_torrent(self.atp)

        def done_checking():
            # true once the torrent has left both "checking" states
            if self.handle.status().state == lt.torrent_status.checking_files:
                return False
            return self.handle.status().state != lt.torrent_status.checking_resume_data

        self.wait_for(done_checking, msg="checking")

    def wait_for(self, condition, msg="condition", timeout=5):
        # poll condition() every 0.1s; fail the test once `timeout` seconds
        # have passed without it becoming true
        give_up_at = time.time() + timeout
        while True:
            if condition():
                return
            self.assertLess(time.time(), give_up_at, msg="%s timed out" % msg)
            time.sleep(0.1)

    def wait_until_torrent_finished(self):
        def fully_downloaded():
            return self.handle.status().progress == 1.0

        def file_written():
            target = os.path.join(self.dir.name, dummy_data.NAME.decode())
            with open(target, mode="rb") as f:
                on_disk = f.read()
            return on_disk == dummy_data.DATA

        self.wait_for(fully_downloaded, msg="progress")
        self.wait_for(file_written, msg="file write")

    def test_with_str(self):
        for index, piece in enumerate(dummy_data.PIECES):
            self.handle.add_piece(index, piece.decode(), 0)

        self.wait_until_torrent_finished()

    def test_with_bytes(self):
        for index, piece in enumerate(dummy_data.PIECES):
            self.handle.add_piece(index, piece, 0)

        self.wait_until_torrent_finished()
class test_load_torrent(unittest.TestCase):
    """Tests for lt.load_torrent_buffer() and lt.load_torrent_file()."""

    def test_bytearray(self):
        # a bytearray object is interpreted as a bencoded buffer
        encoded = lt.bencode({'info': {
            'name': 'test_torrent', 'length': 1234,
            'piece length': 16 * 1024,
            'pieces': 'aaaaaaaaaaaaaaaaaaaa'}})
        atp = lt.load_torrent_buffer(bytearray(encoded))
        self.assertEqual(atp.ti.num_files(), 1)

    def test_bytes(self):
        # a bytes object is interpreted as a bencoded buffer
        encoded = lt.bencode({'info': {
            'name': 'test_torrent', 'length': 1234,
            'piece length': 16 * 1024,
            'pieces': 'aaaaaaaaaaaaaaaaaaaa'}})
        atp = lt.load_torrent_buffer(bytes(encoded))
        self.assertEqual(atp.ti.num_files(), 1)

    def test_info_section(self):
        atp = lt.load_torrent_file('base.torrent')
        info = atp.ti

        self.assertTrue(len(info.info_section()) != 0)
        self.assertTrue(len(info.hash_for_piece(0)) != 0)
class test_torrent_info(unittest.TestCase):
    """Tests for constructing, limiting and querying lt.torrent_info objects."""

    def test_non_ascii_file(self):
        # a .torrent file whose name has non-ASCII characters must load
        unicode_path = 'base-\u745E\u5177.torrent'
        try:
            shutil.copy('base.torrent', unicode_path)
        except shutil.SameFileError:
            pass

        ti = lt.torrent_info(unicode_path)

        self.assertTrue(len(ti.info_section()) != 0)
        self.assertTrue(len(ti.hash_for_piece(0)) != 0)

    def test_bencoded_constructor(self):
        # things that can be converted to a bencoded entry, will be interpreted
        # as such and encoded
        source = {'info': {
            'name': 'test_torrent', 'length': 1234,
            'piece length': 16 * 1024,
            'pieces': 'aaaaaaaaaaaaaaaaaaaa'}}
        info = lt.torrent_info(source)

        self.assertEqual(info.num_files(), 1)

        f = info.files()
        self.assertEqual(f.file_path(0), 'test_torrent')
        self.assertEqual(f.file_name(0), 'test_torrent')
        self.assertEqual(f.file_size(0), 1234)
        self.assertEqual(info.total_size(), 1234)
        self.assertEqual(info.creation_date(), 0)

    def test_bytearray(self):
        # a bytearray object is interpreted as a bencoded buffer
        encoded = lt.bencode({'info': {
            'name': 'test_torrent', 'length': 1234,
            'piece length': 16 * 1024,
            'pieces': 'aaaaaaaaaaaaaaaaaaaa'}})
        info = lt.torrent_info(bytearray(encoded))
        self.assertEqual(info.num_files(), 1)

    def test_bytes(self):
        # a bytes object is interpreted as a bencoded buffer
        encoded = lt.bencode({'info': {
            'name': 'test_torrent', 'length': 1234,
            'piece length': 16 * 1024,
            'pieces': 'aaaaaaaaaaaaaaaaaaaa'}})
        info = lt.torrent_info(bytes(encoded))
        self.assertEqual(info.num_files(), 1)

    def test_load_decode_depth_limit(self):
        # nesting deeper than max_decode_depth is expected to be rejected
        with self.assertRaises(RuntimeError):
            lt.torrent_info(
                {'test': {'test': {'test': {'test': {'test': {}}}}}, 'info': {
                    'name': 'test_torrent', 'length': 1234,
                    'piece length': 16 * 1024,
                    'pieces': 'aaaaaaaaaaaaaaaaaaaa'}}, {'max_decode_depth': 1})

    def test_load_max_pieces_limit(self):
        with self.assertRaises(RuntimeError):
            lt.torrent_info(
                {'info': {
                    'name': 'test_torrent', 'length': 1234000,
                    'piece length': 16 * 1024,
                    'pieces': 'aaaaaaaaaaaaaaaaaaaa'}}, {'max_pieces': 1})

    def test_load_max_buffer_size_limit(self):
        with self.assertRaises(RuntimeError):
            lt.torrent_info(
                {'info': {
                    'name': 'test_torrent', 'length': 1234000,
                    'piece length': 16 * 1024,
                    'pieces': 'aaaaaaaaaaaaaaaaaaaa'}}, {'max_buffer_size': 1})

    def test_info_section(self):
        ti = lt.torrent_info('base.torrent')

        self.assertTrue(len(ti.info_section()) != 0)
        self.assertTrue(len(ti.hash_for_piece(0)) != 0)

    def test_torrent_info_bytes_overload(self):
        # bytes will never be interpreted as a file name. It's interpreted as a
        # bencoded buffer
        with self.assertRaises(RuntimeError):
            ti = lt.torrent_info(b'base.torrent')

    def test_web_seeds(self):
        ti = lt.torrent_info('base.torrent')

        ws = [{'url': 'http://foo/test', 'auth': '', 'type': 0},
              {'url': 'http://bar/test', 'auth': '', 'type': 1}]
        ti.set_web_seeds(ws)

        web_seeds = ti.web_seeds()
        self.assertEqual(len(ws), len(web_seeds))
        # both lists have the same length here, so compare them pairwise
        for expected, actual in zip(ws, web_seeds):
            self.assertEqual(actual["url"], expected["url"])
            self.assertEqual(actual["auth"], expected["auth"])
            self.assertEqual(actual["type"], expected["type"])

    def test_announce_entry(self):
        ae = lt.announce_entry('test')
        self.assertEqual(ae.url, 'test')
        self.assertEqual(ae.tier, 0)
        self.assertEqual(ae.verified, False)
        self.assertEqual(ae.source, 0)

    def test_torrent_info_sha1_overload(self):
        expected = lt.sha1_hash(b'a' * 20)

        ti = lt.torrent_info(lt.info_hash_t(lt.sha1_hash(b'a' * 20)))
        self.assertEqual(ti.info_hash(), expected)
        self.assertEqual(ti.info_hashes().v1, expected)

        # the copy constructor must preserve the hashes
        ti_copy = lt.torrent_info(ti)
        self.assertEqual(ti_copy.info_hash(), expected)
        self.assertEqual(ti_copy.info_hashes().v1, expected)

    def test_torrent_info_sha256_overload(self):
        expected = lt.sha256_hash(b'a' * 32)

        ti = lt.torrent_info(lt.info_hash_t(lt.sha256_hash(b'a' * 32)))
        self.assertEqual(ti.info_hashes().v2, expected)

        ti_copy = lt.torrent_info(ti)
        self.assertEqual(ti_copy.info_hashes().v2, expected)

    def test_url_seed(self):
        ti = lt.torrent_info('base.torrent')

        ti.add_tracker('foobar1')
        ti.add_url_seed('foobar2')
        ti.add_url_seed('foobar3', 'username:password')
        ti.add_url_seed('foobar4', 'username:password', [])

        expected = [
            {'url': 'foobar2', 'type': 0, 'auth': ''},
            {'url': 'foobar3', 'type': 0, 'auth': 'username:password'},
            {'url': 'foobar4', 'type': 0, 'auth': 'username:password'},
        ]
        self.assertEqual(ti.web_seeds(), expected)

    def test_http_seed(self):
        ti = lt.torrent_info('base.torrent')

        ti.add_http_seed('foobar2')
        ti.add_http_seed('foobar3', 'username:password')
        ti.add_http_seed('foobar4', 'username:password', [])

        expected = [
            {'url': 'foobar2', 'type': 1, 'auth': ''},
            {'url': 'foobar3', 'type': 1, 'auth': 'username:password'},
            {'url': 'foobar4', 'type': 1, 'auth': 'username:password'},
        ]
        self.assertEqual(ti.web_seeds(), expected)
class test_alerts(unittest.TestCase):
    """Tests for the session alert queue and its notification mechanisms."""

    def test_alert(self):
        ses = lt.session(settings)
        ti = lt.torrent_info('base.torrent')
        h = ses.add_torrent({'ti': ti, 'save_path': os.getcwd()})
        st = h.status()
        time.sleep(1)
        ses.remove_torrent(h)
        ses.wait_for_alert(1000)  # milliseconds

        for alert in ses.pop_alerts():
            if alert.what() == 'add_torrent_alert':
                self.assertEqual(cast(lt.add_torrent_alert, alert).torrent_name, 'temp')
            print(alert.message())
            # dump every public member; call it first when it is a method
            for member_name in dir(alert):
                if member_name.startswith('__'):
                    continue
                member = getattr(alert, member_name)
                value = member() if callable(member) else member
                print('  ', member_name, ' = ', value)

        print(st.next_announce)
        self.assertEqual(st.name, 'temp')
        print(st.errc.message())
        print(st.pieces)
        print(st.last_seen_complete)
        print(st.completed_time)
        print(st.progress)
        print(st.num_pieces)
        print(st.distributed_copies)
        print(st.info_hashes)
        print(st.seeding_duration)
        print(st.last_upload)
        print(st.last_download)
        self.assertEqual(st.save_path, os.getcwd())

    def test_alert_fs(self):
        ses = lt.session(settings)
        s1, s2 = socket.socketpair()
        # the session is handed one end of the pair; the test waits on the other
        ses.set_alert_fd(s2.fileno())

        ses.pop_alerts()

        # make sure there's an alert to wake us up
        ses.post_session_stats()

        readable, _writable, _errored = select.select([s1], [], [])

        self.assertEqual(len(readable), 1)
        for sock in readable:
            sock.recv(10)

    def test_pop_alerts(self):
        ses = lt.session(settings)
        ses.async_add_torrent(
            {"ti": lt.torrent_info("base.torrent"), "save_path": "."})

        # this will cause an error (because of duplicate torrents) and the
        # torrent_info object created here will be deleted once the alert goes out
        # of scope. When that happens, it will decrement the python object, to allow
        # it to release the object.
        # we're trying to catch the error described in this post, with regards to
        # torrent_info.
        # https://mail.python.org/pipermail/cplusplus-sig/2007-June/012130.html
        ses.async_add_torrent(
            {"ti": lt.torrent_info("base.torrent"), "save_path": "."})
        time.sleep(1)
        # drain and print alerts for about a second
        for _ in range(0, 10):
            for alert in ses.pop_alerts():
                print(alert.message())
            time.sleep(0.1)

    def test_alert_notify(self):
        ses = lt.session(settings)
        notified = threading.Event()

        def on_alert():
            notified.set()

        ses.set_alert_notify(on_alert)
        ses.async_add_torrent(
            {"ti": lt.torrent_info("base.torrent"), "save_path": "."})
        # blocks until the session has invoked the notify callback
        notified.wait()
class test_bencoder(unittest.TestCase):
    """Tests for lt.bencode() / lt.bdecode() on the supported Python types."""

    def test_bencode(self):
        source = {'a': 1, 'b': [1, 2, 3], 'c': 'foo'}
        self.assertEqual(lt.bencode(source), b'd1:ai1e1:bli1ei2ei3ee1:c3:fooe')

    def test_bdecode(self):
        # decoded keys and string values come back as bytes
        decoded = lt.bdecode(b'd1:ai1e1:bli1ei2ei3ee1:c3:fooe')
        self.assertEqual(decoded, {b'a': 1, b'b': [1, 2, 3], b'c': b'foo'})

    def test_string(self):
        # non-ASCII text is expected to be emitted as UTF-8
        result = lt.bencode('foo\u00e5\u00e4\u00f6')
        self.assertEqual(result, b'9:foo\xc3\xa5\xc3\xa4\xc3\xb6')

    def test_bytes(self):
        result = lt.bencode(b'foo')
        self.assertEqual(result, b'3:foo')

    def test_float(self):
        # TODO: this should throw a TypeError in the future
        with self.assertWarns(DeprecationWarning):
            result = lt.bencode(1.337)
        self.assertEqual(result, b'0:')

    def test_object(self):
        class FooBar:
            dummy = 1

        # TODO: this should throw a TypeError in the future
        with self.assertWarns(DeprecationWarning):
            result = lt.bencode(FooBar())
        self.assertEqual(result, b'0:')

    def test_preformatted(self):
        # a tuple of ints is written out verbatim as raw bytes
        result = lt.bencode((1, 2, 3, 4, 5))
        self.assertEqual(result, b'\x01\x02\x03\x04\x05')
class test_sha1hash(unittest.TestCase):
    """Tests for lt.sha1_hash string conversion and hashing."""

    def test_sha1hash(self):
        # str() of a hash is the hex form of the bytes it was built from
        hex_digest = 'a0' * 20
        digest = lt.sha1_hash(binascii.unhexlify(hex_digest))
        self.assertEqual(hex_digest, str(digest))

    def test_hash(self):
        hash_of_b = hash(lt.sha1_hash(b'b' * 20))
        self.assertNotEqual(hash_of_b, hash(lt.sha1_hash(b'a' * 20)))
        hash_of_b = hash(lt.sha1_hash(b'b' * 20))
        self.assertEqual(hash_of_b, hash(lt.sha1_hash(b'b' * 20)))
class test_sha256hash(unittest.TestCase):
    """Tests for lt.sha256_hash string conversion and hashing."""

    def test_sha1hash(self):
        # NOTE(review): the method name says sha1 but the body exercises
        # sha256_hash; the name is kept so the test id does not change.
        hex_digest = 'a0' * 32
        digest = lt.sha256_hash(binascii.unhexlify(hex_digest))
        self.assertEqual(hex_digest, str(digest))

    def test_hash(self):
        hash_of_b = hash(lt.sha256_hash(b'b' * 32))
        self.assertNotEqual(hash_of_b, hash(lt.sha256_hash(b'a' * 32)))
        hash_of_b = hash(lt.sha256_hash(b'b' * 32))
        self.assertEqual(hash_of_b, hash(lt.sha256_hash(b'b' * 32)))
class test_info_hash(unittest.TestCase):
    """Tests for lt.info_hash_t built from a v1 hash, a v2 hash, or both."""

    def test_info_hash(self):
        v1 = lt.sha1_hash(b'a' * 20)
        v2 = lt.sha256_hash(b'b' * 32)

        # v1 only
        only_v1 = lt.info_hash_t(v1)
        self.assertTrue(only_v1.has_v1())
        self.assertFalse(only_v1.has_v2())
        self.assertEqual(only_v1.v1, v1)

        # v2 only
        only_v2 = lt.info_hash_t(v2)
        self.assertFalse(only_v2.has_v1())
        self.assertTrue(only_v2.has_v2())
        self.assertEqual(only_v2.v2, v2)

        # hybrid: both hashes present
        hybrid = lt.info_hash_t(v1, v2)
        self.assertTrue(hybrid.has_v1())
        self.assertTrue(hybrid.has_v2())
        self.assertEqual(hybrid.v1, v1)
        self.assertEqual(hybrid.v2, v2)

        # hash() distinguishes the variants and is stable for equal inputs
        self.assertNotEqual(hash(only_v1), hash(only_v2))
        self.assertNotEqual(hash(only_v1), hash(hybrid))
        self.assertEqual(hash(only_v1), hash(lt.info_hash_t(v1)))
        self.assertEqual(hash(only_v2), hash(lt.info_hash_t(v2)))
        self.assertEqual(hash(hybrid), hash(lt.info_hash_t(v1, v2)))
class test_magnet_link(unittest.TestCase):
    """Tests for parsing magnet URIs and adding torrents by info-hash only."""

    def test_parse_magnet_uri(self):
        ses = lt.session({})
        # the URI carries a base32 hash; the assertions use its hex form
        expected_hex = '178882f042c0c33426a6d81e0333ece346e68a68'
        params = lt.parse_magnet_uri('magnet:?xt=urn:btih:C6EIF4CCYDBTIJVG3APAGM7M4NDONCTI')
        self.assertEqual(str(params.info_hashes.v1), expected_hex)
        params.save_path = '.'
        handle = ses.add_torrent(params)
        self.assertEqual(str(handle.info_hash()), expected_hex)
        self.assertEqual(str(handle.info_hashes().v1), expected_hex)

    def test_parse_magnet_uri_dict(self):
        ses = lt.session({})
        expected_hex = '178882f042c0c33426a6d81e0333ece346e68a68'
        params = lt.parse_magnet_uri_dict('magnet:?xt=urn:btih:C6EIF4CCYDBTIJVG3APAGM7M4NDONCTI')
        self.assertEqual(binascii.hexlify(params['info_hashes']), b'178882f042c0c33426a6d81e0333ece346e68a68')
        params['save_path'] = '.'
        handle = ses.add_torrent(params)
        self.assertEqual(str(handle.info_hash()), expected_hex)
        self.assertEqual(str(handle.info_hashes().v1), expected_hex)

    def test_add_deprecated_magnet_link(self):
        ses = lt.session()
        atp = lt.add_torrent_params()
        atp.info_hashes = lt.info_hash_t(lt.sha1_hash(b"a" * 20))
        atp.save_path = "."
        handle = ses.add_torrent(atp)

        reported = handle.status().info_hashes
        self.assertTrue(reported == lt.info_hash_t(lt.sha1_hash(b"a" * 20)))

    def test_add_magnet_link(self):
        ses = lt.session()
        atp = lt.add_torrent_params()
        atp.save_path = "."
        atp.info_hash = lt.sha1_hash(b"a" * 20)
        handle = ses.add_torrent(atp)

        reported = handle.status().info_hashes
        self.assertTrue(reported == lt.info_hash_t(lt.sha1_hash(b"a" * 20)))
class test_peer_class(unittest.TestCase):
    """Tests for the session peer-class API and the peer-class filters."""

    def test_peer_class_ids(self):
        s = lt.session(settings)

        print('global_peer_class_id:', lt.session.global_peer_class_id)
        print('tcp_peer_class_id:', lt.session.tcp_peer_class_id)
        print('local_peer_class_id:', lt.session.local_peer_class_id)

        print('global: ', s.get_peer_class(s.global_peer_class_id))
        # look up the TCP class here; the local class is printed on the next line
        print('tcp: ', s.get_peer_class(s.tcp_peer_class_id))
        print('local: ', s.get_peer_class(s.local_peer_class_id))

    def test_peer_class(self):
        s = lt.session(settings)

        c = s.create_peer_class('test class')
        print('new class: ', s.get_peer_class(c))

        # a freshly created class reports these values
        nfo = s.get_peer_class(c)
        self.assertEqual(nfo['download_limit'], 0)
        self.assertEqual(nfo['upload_limit'], 0)
        self.assertEqual(nfo['ignore_unchoke_slots'], False)
        self.assertEqual(nfo['connection_limit_factor'], 100)
        self.assertEqual(nfo['download_priority'], 1)
        self.assertEqual(nfo['upload_priority'], 1)
        self.assertEqual(nfo['label'], 'test class')

        # every field written through set_peer_class must read back unchanged
        nfo['download_limit'] = 1337
        nfo['upload_limit'] = 1338
        nfo['ignore_unchoke_slots'] = True
        nfo['connection_limit_factor'] = 42
        nfo['download_priority'] = 2
        nfo['upload_priority'] = 3

        s.set_peer_class(c, nfo)

        nfo2 = s.get_peer_class(c)
        self.assertEqual(nfo, nfo2)

    def test_peer_class_filter(self):
        # only checks that the four filter operations are callable
        filt = lt.peer_class_type_filter()
        filt.add(lt.peer_class_type_filter.tcp_socket, lt.session.global_peer_class_id)
        filt.remove(lt.peer_class_type_filter.utp_socket, lt.session.local_peer_class_id)

        filt.disallow(lt.peer_class_type_filter.tcp_socket, lt.session.global_peer_class_id)
        filt.allow(lt.peer_class_type_filter.utp_socket, lt.session.local_peer_class_id)

    def test_peer_class_ip_filter(self):
        s = lt.session(settings)
        s.set_peer_class_type_filter(lt.peer_class_type_filter())
        s.set_peer_class_filter(lt.ip_filter())
class test_ip_filter(unittest.TestCase):
    """Tests for lt.ip_filter rules and export_filter()."""

    def test_export(self):
        ip_filter = lt.ip_filter()
        self.assertEqual(ip_filter.access('1.1.1.1'), 0)

        # flag the inclusive range 1.1.1.1 - 1.1.1.2; its neighbours stay at 0
        ip_filter.add_rule('1.1.1.1', '1.1.1.2', 1)
        self.assertEqual(ip_filter.access('1.1.1.0'), 0)
        self.assertEqual(ip_filter.access('1.1.1.1'), 1)
        self.assertEqual(ip_filter.access('1.1.1.2'), 1)
        self.assertEqual(ip_filter.access('1.1.1.3'), 0)

        # the export is a pair: IPv4 ranges first, then IPv6 ranges
        expected_v4 = [('0.0.0.0', '1.1.1.0'), ('1.1.1.1', '1.1.1.2'), ('1.1.1.3', '255.255.255.255')]
        expected_v6 = [('::', 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff')]
        exported = ip_filter.export_filter()
        self.assertEqual(exported, (expected_v4, expected_v6))
class test_session(unittest.TestCase):
    """Tests for lt.session: settings, session_params, adding torrents and stats alerts."""

    def test_settings(self):
        sett: "lt.settings_pack" = {'alert_mask': lt.alert.category_t.all_categories}
        s = lt.session(sett)
        sett = s.get_settings()
        self.assertEqual(sett['alert_mask'] & 0x7fffffff, 0x7fffffff)

    def test_session_params(self):
        sp = lt.session_params()
        sp.settings = {'alert_mask': lt.alert.category_t.all_categories}
        s = lt.session(sp)
        sett = s.get_settings()
        self.assertEqual(sett['alert_mask'] & 0x7fffffff, 0x7fffffff)

    def test_session_params_constructor(self):
        sp = lt.session_params({'alert_mask': lt.alert.category_t.all_categories})
        s = lt.session(sp)
        sett = s.get_settings()
        self.assertEqual(sett['alert_mask'] & 0x7fffffff, 0x7fffffff)

    def test_session_params_ip_filter(self):
        sp = lt.session_params()
        sp.ip_filter.add_rule("1.1.1.1", "1.1.1.2", 1337)
        self.assertEqual(sp.ip_filter.access("1.1.1.1"), 1337)
        self.assertEqual(sp.ip_filter.access("1.1.1.2"), 1337)
        self.assertEqual(sp.ip_filter.access("1.1.1.3"), 0)

    def test_session_params_roundtrip_buf(self):
        sp = lt.session_params()
        sp.settings = {'alert_mask': lt.alert.category_t.all_categories}

        buf = lt.write_session_params_buf(sp)
        sp2 = lt.read_session_params(buf)
        self.assertEqual(sp2.settings['alert_mask'] & 0x7fffffff, 0x7fffffff)

    def test_session_params_roundtrip_entry(self):
        sp = lt.session_params()
        sp.settings = {'alert_mask': lt.alert.category_t.all_categories}

        ent = lt.write_session_params(sp)
        print(ent)
        sp2 = lt.read_session_params(ent)
        self.assertEqual(sp2.settings['alert_mask'] & 0x7fffffff, 0x7fffffff)

    def test_add_torrent(self):
        # only checks that add_torrent accepts all of these dict keys
        s = lt.session(settings)
        s.add_torrent({'ti': lt.torrent_info('base.torrent'),
                       'save_path': '.',
                       'dht_nodes': [('1.2.3.4', 6881), ('4.3.2.1', 6881)],
                       'http_seeds': ['http://test.com/seed'],
                       'peers': [('5.6.7.8', 6881)],
                       'banned_peers': [('8.7.6.5', 6881)],
                       'file_priorities': [1, 1, 1, 2, 0]})

    def test_find_torrent(self):
        s = lt.session(settings)
        h = s.add_torrent({'info_hash': b"a" * 20,
                           'save_path': '.'})
        self.assertTrue(h.is_valid())

        h2 = s.find_torrent(lt.sha1_hash(b"a" * 20))
        self.assertTrue(h2.is_valid())
        # looking up a hash that was never added gives an invalid handle
        h3 = s.find_torrent(lt.sha1_hash(b"b" * 20))
        self.assertFalse(h3.is_valid())

        self.assertEqual(h, h2)
        self.assertNotEqual(h, h3)

    def test_add_torrent_info_hash(self):
        s = lt.session(settings)
        h = s.add_torrent({
            'info_hash': b'a' * 20,
            'info_hashes': b'a' * 32,
            'save_path': '.'})

        time.sleep(1)
        for a in s.pop_alerts():
            print(a)

        self.assertTrue(h.is_valid())
        # when both keys are given, the 32-byte 'info_hashes' value is the one reported
        self.assertEqual(h.status().info_hashes, lt.info_hash_t(lt.sha256_hash(b'a' * 32)))

    def test_session_status(self):
        # session.status() is only exercised when has_deprecated() is true
        if not has_deprecated():
            return

        s = lt.session()
        st = s.status()
        print(st)
        print(st.active_requests)
        print(st.dht_nodes)
        print(st.dht_node_cache)
        print(st.dht_torrents)
        print(st.dht_global_nodes)
        print(st.dht_total_allocations)

    def test_apply_settings(self):
        s = lt.session(settings)
        s.apply_settings({'num_want': 66, 'user_agent': 'test123'})
        self.assertEqual(s.get_settings()['num_want'], 66)
        self.assertEqual(s.get_settings()['user_agent'], 'test123')

    def test_post_session_stats(self):
        s = lt.session({'alert_mask': 0, 'enable_dht': False})
        s.post_session_stats()
        alerts = []
        # first the stats headers log line. but not if logging is disabled
        while len(alerts) == 0:
            s.wait_for_alert(1000)
            alerts = s.pop_alerts()

        # skip ahead to the header alert; alerts queued behind it stay in the list
        while len(alerts) > 0:
            a = alerts.pop(0)
            print(a)
            if isinstance(a, lt.session_stats_header_alert):
                break
        self.assertTrue(isinstance(a, lt.session_stats_header_alert))

        # then the actual stats values
        while len(alerts) == 0:
            s.wait_for_alert(1000)
            alerts = s.pop_alerts()
        a = alerts.pop(0)
        print(a)
        # check the alert type before reading .values, so an unexpected alert
        # is reported as a test failure rather than an AttributeError
        self.assertTrue(isinstance(a, lt.session_stats_alert))
        values = cast(lt.session_stats_alert, a).values
        self.assertTrue(isinstance(values, dict))
        self.assertTrue(len(values) > 0)

    def test_post_dht_stats(self):
        s = lt.session({'alert_mask': 0, 'enable_dht': False})
        s.post_dht_stats()
        alerts = []
        cnt = 0
        while len(alerts) == 0:
            s.wait_for_alert(1000)
            alerts = s.pop_alerts()
            cnt += 1
            if cnt > 60:
                # fail only this test; exiting the interpreter here would
                # abort the whole test run
                self.fail('no dht_stats_alert in 1 minute!')
        a = alerts.pop(0)
        self.assertTrue(isinstance(a, lt.dht_stats_alert))
        self.assertTrue(isinstance(cast(lt.dht_stats_alert, a).active_requests, list))
        self.assertTrue(isinstance(cast(lt.dht_stats_alert, a).routing_table, list))

    def test_unknown_settings(self):
        # an unrecognised settings key must raise KeyError
        with self.assertRaises(KeyError) as raised:
            lt.session({'unexpected-key-name': 42})
        print(raised.exception)

    def test_fingerprint(self):
        self.assertEqual(lt.generate_fingerprint('LT', 0, 1, 2, 3), '-LT0123-')
        # a version component of 10 is rendered as 'A'
        self.assertEqual(lt.generate_fingerprint('..', 10, 1, 2, 3), '-..A123-')

    def test_min_memory_preset(self):
        min_mem = lt.min_memory_usage()
        print(min_mem)

        self.assertTrue('allow_idna' not in min_mem)
        self.assertTrue('connection_speed' in min_mem)
        self.assertTrue('file_pool_size' in min_mem)

    def test_seed_mode_preset(self):
        seed_mode = lt.high_performance_seed()
        print(seed_mode)

        self.assertTrue('alert_queue_size' in seed_mode)
        self.assertTrue('connection_speed' in seed_mode)
        self.assertTrue('file_pool_size' in seed_mode)

    def test_default_settings(self):
        default = lt.default_settings()
        print(default)
class test_example_client(unittest.TestCase):
    """Smoke tests that run the example scripts (client.py, simple_client.py,
    make_torrent.py) in a subprocess and check they report no errors."""

    def _check_process(self, process, timeout=None):
        """Wait for *process* and assert it wrote nothing to stderr and did not fail.

        process -- a Popen started with stdout and stderr set to PIPE.
        timeout -- seconds to wait; None waits until the process exits. A
                   process still running when the timeout expires is killed,
                   which is the expected outcome for the long-running clients;
                   no exit code is checked in that case.

        communicate() is used rather than wait()/sleep() so both pipes are
        drained while waiting (a child that fills a pipe cannot stall), the
        child is always reaped and the pipes are closed.
        """
        try:
            out, err = process.communicate(timeout=timeout)
            returncode = process.returncode
        except sub.TimeoutExpired:
            # this is an expected use-case
            process.kill()
            out, err = process.communicate()
            returncode = None

        err = err.decode("utf-8")
        self.assertEqual('', err, 'process throw errors: \n' + err)
        # check error code if process did unexpected end
        if returncode is None:
            return
        # in case of error return: output stdout if nothing was on stderr
        if returncode != 0:
            print("stdout:\n" + out.decode("utf-8"))
        self.assertEqual(returncode, 0, "returncode: " + str(returncode) + "\n"
                         + "stderr: empty\n"
                         + "some configuration does not output errors like missing module members,"
                         + "try to call it manually to get the error message\n")

    def test_execute_client(self):
        if os.name == 'nt':
            # TODO: fix windows includes of client.py
            return
        # slave_fd fix multiple stdin assignment at termios.tcgetattr
        master_fd, slave_fd = pty.openpty()
        try:
            process = sub.Popen(
                [sys.executable, "client.py", "url_seed_multi.torrent"],
                stdin=slave_fd, stdout=sub.PIPE, stderr=sub.PIPE)
            self._check_process(process, timeout=5)
        finally:
            # do not leak the pty pair, whatever the outcome
            os.close(slave_fd)
            os.close(master_fd)

    def test_execute_simple_client(self):
        process = sub.Popen(
            [sys.executable, "simple_client.py", "url_seed_multi.torrent"],
            stdout=sub.PIPE, stderr=sub.PIPE)
        self._check_process(process, timeout=5)

    def test_execute_make_torrent(self):
        process = sub.Popen(
            [sys.executable, "make_torrent.py", "url_seed_multi.torrent",
             "http://test.com/test"], stdout=sub.PIPE, stderr=sub.PIPE)
        # make_torrent.py is expected to terminate on its own: no timeout
        self._check_process(process)

    def test_default_settings(self):
        default = lt.default_settings()
        # no setting may be exported under an empty name
        self.assertNotIn('', default)
        print(default)
class test_operation_t(unittest.TestCase):
    """Checks for lt.operation_t together with lt.operation_name()."""

    def test_enum(self):
        """lt.operation_name() returns the name of each checked member."""
        members = (
            "sock_accept",
            "unknown",
            "mkdir",
            "partfile_write",
            "hostname_lookup",
        )
        for member in members:
            operation = getattr(lt.operation_t, member)
            self.assertEqual(lt.operation_name(operation), member)
class test_error_code(unittest.TestCase):
    """Checks for lt.error_code and the error category accessors."""

    def test_error_code(self):
        """Construct error codes and compare each category's name()."""
        # the no-argument constructor only has to be callable
        lt.error_code()
        code = lt.error_code(10, lt.libtorrent_category())
        self.assertEqual(code.category().name(), 'libtorrent')

        # each lt.<name>_category() must report <name> as its name
        for name in ('libtorrent', 'upnp', 'http', 'socks', 'bdecode',
                     'generic', 'system'):
            category = getattr(lt, name + '_category')()
            self.assertEqual(category.name(), name)
class test_peer_info(unittest.TestCase):
    """Checks that attributes of a fresh lt.peer_info can be read."""

    def test_peer_info_members(self):
        """Read and print a fixed list of lt.peer_info attributes."""
        info = lt.peer_info()
        # 'pieces' is listed twice, so it is read and printed two times.
        attributes = (
            'client', 'pieces', 'pieces', 'last_request', 'last_active',
            'flags', 'source', 'pid', 'downloading_piece_index', 'ip',
            'local_endpoint', 'read_state', 'write_state',
        )
        for attribute in attributes:
            print(getattr(info, attribute))
class test_dht_settings(unittest.TestCase):
    """Checks for lt.dht_settings and session.dht_get_peers()."""

    def test_dht_get_peers(self):
        """Call dht_get_peers() on a fresh session with a 20-byte hash."""
        lt.session().dht_get_peers(lt.sha1_hash(b"a" * 20))

    def test_construct(self):
        """Read and print a fixed list of lt.dht_settings attributes."""
        dht = lt.dht_settings()
        # 'max_fail_count' is listed twice, so it is read and printed two
        # times.
        attributes = (
            'max_peers_reply', 'search_branching', 'max_fail_count',
            'max_fail_count', 'max_torrents', 'max_dht_items',
            'restrict_routing_ips', 'restrict_search_ips',
            'max_torrent_search_reply', 'extended_routing_table',
            'aggressive_lookups', 'privacy_lookups', 'enforce_node_id',
            'ignore_dark_internet', 'block_timeout', 'block_ratelimit',
            'read_only', 'item_lifetime',
        )
        for attribute in attributes:
            print(getattr(dht, attribute))
def get_isolated_settings() -> "lt.settings_pack":
    """Return a new settings dict with discovery features switched off.

    DHT, LSD, NAT-PMP and UPnP are disabled, the listen interface is
    127.0.0.1 with port 0, and the DHT bootstrap node list is empty.
    """
    disabled = ("enable_dht", "enable_lsd", "enable_natpmp", "enable_upnp")
    isolated = dict.fromkeys(disabled, False)
    isolated["listen_interfaces"] = "127.0.0.1:0"
    isolated["dht_bootstrap_nodes"] = ""
    return isolated
def loop_until_timeout(timeout, msg="condition"):
    """Yield None repeatedly until `timeout` seconds have elapsed.

    The deadline is measured with time.monotonic() from the first step of
    the generator. Once it has passed, AssertionError("<msg> timed out")
    is raised instead of yielding again, so a caller leaves the loop with
    break or return when its condition is met.
    """
    expires_at = time.monotonic() + timeout
    while True:
        time_left = time.monotonic() < expires_at
        if not time_left:
            raise AssertionError(f"{msg} timed out")
        yield
def unlink_all_files(path):
    """Delete every file found by os.walk(path); directories are kept."""
    for parent, _subdirs, names in os.walk(path):
        for name in names:
            os.unlink(os.path.join(parent, name))
# In test cases where libtorrent writes torrent data in a temporary directory,
# cleaning up the tempdir on Windows CI sometimes fails with a PermissionError
# having WinError 5 (Access Denied). I can't repro this WinError in any way;
# holding an open file handle results in a different WinError. Seems to be a
# race condition which only happens with very short-lived tests which write
# data. Work around by cleaning up the tempdir in a loop.
# TODO: why is this necessary?
def cleanup_with_windows_fix(tempdir, *, timeout):
    """Delete the files under `tempdir`, then call tempdir.cleanup().

    On win32 a PermissionError from the deletion is retried until it
    succeeds or `timeout` seconds have passed, at which point
    loop_until_timeout raises AssertionError. On other platforms the
    PermissionError propagates immediately.
    """
    # Only the files are removed here, so no depth-first traversal is needed.
    for _ in loop_until_timeout(timeout, msg="PermissionError clear"):
        try:
            unlink_all_files(tempdir.name)
        except PermissionError:
            # Every PermissionError is retried on win32; checking for
            # winerror == 5 was left out because mypy did not know the
            # attribute.
            if sys.platform != "win32":
                raise
        else:
            break
    # This removes directories in depth-first traversal.
    # It also marks the tempdir as explicitly cleaned so it doesn't trigger a
    # ResourceWarning.
    tempdir.cleanup()
def wait_for(session, alert_type, *, timeout, prefix=None):
    """Return the first alert of `alert_type` popped from `session`.

    Every popped alert is printed, including the ones that follow the
    match in the same batch. loop_until_timeout raises AssertionError
    when no matching alert arrives within `timeout` seconds. `prefix` is
    accepted but not used.
    """
    for _ in loop_until_timeout(timeout, msg=alert_type.__name__):
        first_match = None
        for alert in session.pop_alerts():
            print(f"{alert.what()}: {alert.message()}")
            if first_match is None and isinstance(alert, alert_type):
                first_match = alert
        if first_match is not None:
            return first_match
    # loop_until_timeout never finishes normally; it raises on timeout
    raise AssertionError("unreachable")
class LambdaRequestHandler(http.server.BaseHTTPRequestHandler):
    """Request handler answering every GET with bytes from a callable."""

    default_request_version = "HTTP/1.1"

    def __init__(self, get_data, *args, **kwargs):
        """Remember `get_data` and pass the other arguments to the base class.

        get_data is called without arguments for each GET; its result is
        sent as the response body.
        """
        # Set the attribute first: it has to exist before the base
        # initializer runs.
        self.get_data = get_data
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Send a 200 response whose body is the result of get_data()."""
        print(f"mock tracker request: {self.requestline}")
        body = self.get_data()
        self.send_response(200)
        headers = (
            ("Content-Type", "application/octet-stream"),
            ("Content-Length", str(len(body))),
        )
        for keyword, value in headers:
            self.send_header(keyword, value)
        self.end_headers()
        self.wfile.write(body)
class SSLTrackerAlertTest(unittest.TestCase):
    """Announce to a local HTTPS mock tracker and check the resulting alert."""

    def setUp(self):
        """Start the mock tracker and prepare a session and torrent params."""
        self.cert_path = os.path.realpath(os.path.join(
            os.path.dirname(__file__), "..", "..", "test", "ssl", "server.pem"
        ))
        print(f"cert_path = {self.cert_path}")
        # The handler bencodes this dict on every request, so the response
        # reflects the dict's contents at request time.
        self.tracker_response = {
            b"external ip": b"\x01\x02\x03\x04",
        }
        self.tracker = http.server.HTTPServer(
            ("127.0.0.1", 0),
            functools.partial(
                LambdaRequestHandler, lambda: lt.bencode(self.tracker_response)
            ),
        )
        self.ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        self.ctx.load_cert_chain(self.cert_path)
        self.tracker.socket = self.ctx.wrap_socket(
            self.tracker.socket, server_side=True
        )
        self.tracker_thread = threading.Thread(target=self.tracker.serve_forever)
        self.tracker_thread.start()
        # HTTPServer.server_name seems to resolve to things like
        # "localhost.localdomain"
        port = self.tracker.server_port
        self.tracker_url = f"https://127.0.0.1:{port}/announce"
        print(f"mock tracker url = {self.tracker_url}")

        self.settings = get_isolated_settings()
        self.settings["alert_mask"] = lt.alert_category.status
        # I couldn't get validation to work on all platforms. Setting
        # SSL_CERT_FILE to our self-signed cert works on linux and mac, but
        # not on Windows.
        self.settings["validate_https_trackers"] = False
        self.session = lt.session(self.settings)
        self.dir = tempfile.TemporaryDirectory()
        self.atp = lt.add_torrent_params()
        self.atp.info_hash = dummy_data.get_sha1_hash()
        self.atp.flags &= ~lt.torrent_flags.auto_managed
        self.atp.flags &= ~lt.torrent_flags.paused
        self.atp.save_path = self.dir.name

    def tearDown(self):
        """Remove all torrents, delete the temp dir and stop the tracker."""
        try:
            # we do this because sessions writing data can collide with
            # cleaning up temporary directories. session.abort() isn't bound
            handles = self.session.get_torrents()
            for handle in handles:
                self.session.remove_torrent(handle)
            for _ in loop_until_timeout(5, msg="clear all handles"):
                if not any(handle.is_valid() for handle in handles):
                    break
            cleanup_with_windows_fix(self.dir, timeout=5)
        finally:
            # Stop the mock tracker even when the cleanup above raised;
            # otherwise its serve_forever() thread would keep running.
            self.tracker.shutdown()
            # Explicitly clean up server sockets, to avoid ResourceWarning
            self.tracker.server_close()
            # wait for the server thread to finish
            self.tracker_thread.join()

    def test_external_ip_alert_via_ssl_tracker(self):
        """An announce to the mock tracker produces an external_ip_alert."""
        handle = self.session.add_torrent(self.atp)
        handle.add_tracker({"url": self.tracker_url})

        alert = wait_for(self.session, lt.external_ip_alert, timeout=60)

        self.assertEqual(alert.category(), lt.alert_category.status)
        self.assertEqual(alert.what(), "external_ip")
        self.assertIsInstance(alert.message(), str)
        self.assertNotEqual(alert.message(), "")
        self.assertEqual(str(alert), alert.message())
        # 1.2.3.4 is the b"\x01\x02\x03\x04" served as "external ip"
        self.assertEqual(alert.external_address, "1.2.3.4")
class TestTypeStubs(unittest.TestCase):
    """
    One of the shortcomings of the Python type stubs is the lack of function signature matching.

    So, next best thing: we match the Boost signature that the Python stub was made for.

    Caveat: the "C++ signature" bit of the docstring differs between builds, depending on where and how it was built.
    """

    def _get_name(self, node: ast.FunctionDef | ast.AsyncFunctionDef) -> str:
        """
        Build the dotted name of a stub function from its chain of ``parent`` links.
        """
        name = node.name
        while hasattr(node, "parent"):
            parent = node.parent
            # Only classes and functions contribute a name component. The module and
            # block statements (such as an ``if``) are skipped, they have no ``name``.
            if isinstance(parent, (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)):
                name = f"{parent.name}.{name}"
            node = parent
        return name

    def _attempt_import(self, qualname: str) -> object | None:
        """
        Resolve a dotted name against the libtorrent module.

        Returns None if a part is missing, is a property or resolves to an int.
        """
        base = lt
        for part in qualname.split("."):
            if not hasattr(base, part):
                return None
            # Look at the raw class attribute, so a property is not evaluated.
            if hasattr(base, "__dict__") and isinstance(vars(base).get(part), property):
                return None
            base = getattr(base, part)
            if isinstance(base, int):
                return None
        return base

    def _special_src_doc(self, qualname: str, src_doc: str) -> str:
        """
        Normalize the parts of a binding docstring that differ per process or platform.
        """
        if qualname == "session.__init__":
            # Special case: inserts object memory address into docstring.
            while (start_bad := src_doc.find("<libtorrent.fingerprint object at")) != -1:
                end_bad = src_doc.find(">", start_bad + 1)
                src_doc = src_doc[:start_bad] + "<libtorrent.fingerprint object" + src_doc[end_bad:]
        elif qualname in ["write_session_params_buf", "read_session_params", "write_session_params"]:
            # Special case: the typical flags=0xffffffff is not used for "all".
            # Instead flags=save_state_flags_t::all() takes on 4294967295 or 2147483647, depending on the platform.
            src_doc = src_doc.replace("2147483647", "4294967295")
        elif sys.platform != "linux":
            # The linux binding has looser typing, adapt the other platforms to it.
            if qualname == "file_storage.__iter__":
                # Linux does not have as narrow a type as other platforms.
                src_doc = src_doc.replace("(file_storage)arg1", "(object)arg1")
            elif qualname == "torrent_info.trackers":
                # Ditto.
                src_doc = src_doc.replace("(torrent_info)arg1", "(object)arg1")
            elif qualname in ["session.dht_proxy", "session.i2p_proxy", "session.peer_proxy", "session.proxy",
                              "session.set_dht_proxy", "session.set_i2p_proxy", "session.set_peer_proxy",
                              "session.set_proxy", "session.set_tracker_proxy", "session.set_web_seed_proxy",
                              "session.tracker_proxy", "session.web_seed_proxy"]:
                # Special case: other platforms refer to proxy_settings as proxy_type_t.proxy_settings.
                src_doc = src_doc.replace("proxy_type_t.proxy_settings", "proxy_settings")
        return src_doc

    @classmethod
    def setUpClass(cls) -> None:
        """
        Parse the libtorrent AST once for this class.
        """
        filename = "libtorrent/__init__.pyi"
        # Decode explicitly, so the result does not depend on the platform's locale.
        with open(filename, encoding="utf-8") as file_handle:
            root = ast.parse(file_handle.read(), "__init__.pyi", type_comments=True)
        # Give every node a link to its parent: _get_name follows these links.
        for node in ast.walk(root):
            for child in ast.iter_child_nodes(node):
                child.parent = node
        cls.libtorrent_ast_root = root

    def _yield_next_signature(self) -> Generator[ast.FunctionDef | ast.AsyncFunctionDef]:
        """
        Yield every (async) function definition found in the parsed stub file.
        """
        for node in ast.walk(self.libtorrent_ast_root):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                yield node

    def test_signatures(self):
        """
        Test if the function signature in the stubs matches that of the binding.

        If this test fails, either:
         a. You accidentally changed the cpp binding. Change it back!
         b. The binding generation changed. Double-check whatever changed and update the binding docstring in the stubs.
         c. You knowingly changed the binding. Update the binding docstring in the stubs.
        """
        for node in self._yield_next_signature():
            stub_doc = ast.get_docstring(node, True)
            qualname = self._get_name(node)
            runtime_obj = self._attempt_import(qualname)
            if runtime_obj is None:
                continue  # Skip typing-only constructions
            src_doc = runtime_obj.__doc__
            if src_doc is None:
                continue  # Skip bindings without a signature
            with self.subTest(qualname.replace(".", " | ")):
                self.assertIsNotNone(stub_doc, f"{qualname} @ line {node.lineno} missing docstring!")
                self.assertIn(stub_doc.strip(), self._special_src_doc(qualname, src_doc),
                              f"\n> Stub function docstring of {qualname} differs from implementation!")
if __name__ == '__main__':
    print(lt.__version__)
    # Copy the torrent fixtures into the current working directory.
    fixture_dir = os.path.join('..', '..', 'test', 'test_torrents')
    fixtures = ('url_seed_multi.torrent', 'base.torrent', 'unordered.torrent')
    for fixture in fixtures:
        try:
            shutil.copy(os.path.join(fixture_dir, fixture), '.')
        except shutil.SameFileError:
            # source and destination are already the same file
            pass
    unittest.main()