itTorrent 是一种用于在 Internet 上下载和分发文件的协议。 与传统的客户端/服务器关系相比,下载者连接到中央服务器(例如:在 Netflix 上观看电影,或加载您正在阅读的网页),BitTorrent 网络中的参与者,称为 peers ,下载文件片段 ——这 就是它成为 点对点 协议的原因。 我们将研究它是如何工作的,并构建我们自己的客户端,它可以找到对等点并在它们之间交换数据。
该协议在过去 20 年中有机地发展,各种个人和组织为加密、私人种子和寻找对等点的新方法等功能添加了扩展。 我们将从 2001 年开始实施 原始规范 ,以保持这是一个周末大小的项目。
我将使用 Debian ISO 文件作为我的试验品,因为它很大,但不是很大,只有 350MB。 作为一个流行的 Linux 发行版,我们将连接到许多快速和合作的对等点。 我们将避免与下载盗版内容相关的法律和道德问题。
考虑向当地 社区保释基金捐款 。
你的钱将用于支付法律援助和保释,因为抗议者因反对警察的暴行、制度性种族主义以及谋杀乔治·弗洛伊德、布伦娜·泰勒、艾哈迈德·阿伯里和尼娜·波普等黑人男女而被捕。
在科技界,我们经常谈论包容性和多样性。 现在是采取具体行动的时候了。
https://www.communityjusticeexchange.org/nbfn-directory
# 寻找同伴
这里有一个问题:我们想用 BitTorrent 下载一个文件,但它是一个点对点协议,我们不知道从哪里可以找到对等点来下载它。 这很像搬到一个新城市并试图结交朋友——也许我们会去当地的酒吧或聚会小组! 像这样的中心化位置是跟踪器背后的重要理念, 跟踪器 是相互介绍对等点的中央服务器。 它们只是通过 HTTP 运行的 Web 服务器 * ,您可以在 http://bttracker.debian.org:6969/找到 Debian
当然,如果这些中央服务器为同行交换非法内容提供便利,那么它们很容易被联邦调查局突袭。 您可能还记得阅读过有关 TorrentSpy、Popcorn Time 和 KickassTorrents 等跟踪器被查封并关闭的信息。 新方法通过使 对等发现 成为分布式过程来切断中间人。 我们不会实施它们,但如果您有兴趣,可以研究的一些术语是 DHT 、 PEX 和 磁力链接 。
## 解析 .torrent 文件
.torrent 文件描述了可下载文件的内容以及连接到跟踪器的信息。 这就是我们启动种子下载过程所需要的一切。 Debian 的 .torrent 文件如下所示:
d8:announce41:http://bttracker.debian.org:6969/announce7:comment35:"Debian CD from cdimage.debian.org"13:creation datei1573903810e9:httpseedsl145:https://cdimage.debian.org/cdimage/release/10.2.0//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-10.2.0-amd64-netinst.iso145:https://cdimage.debian.org/cdimage/archive/10.2.0//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-10.2.0-amd64-netinst.isoe4:infod6:lengthi351272960e4:name31:debian-10.2.0-amd64-netinst.iso12:piece lengthi262144e6:pieces26800:�����PS�^�� (binary blob of the hashes of each piece)ee
)的格式 这种混乱以一种称为Bencode (发音为 bee-encode 编码,我们需要对其进行解码。
Bencode 可以编码与 JSON 大致相同类型的结构——字符串、整数、列表和字典。 Bencoded 数据不像 JSON 那样可读/可写,但它可以有效地处理二进制数据,并且从流中解析非常简单。 字符串带有长度前缀,看起来像 4:spam
. 整数在 开始 和 结束 标记之间,所以 7
将编码为 i7e
. 列表和字典的工作方式类似: l4:spami7ee
代表 ['spam', 7]
, 尽管 d4:spami7ee
方法 {spam: 7}
.
在更漂亮的格式中,我们的 .torrent 文件如下所示:
d 8:announce 41:http://bttracker.debian.org:6969/announce 7:comment 35:"Debian CD from cdimage.debian.org" 13:creation date i1573903810e 4:info d 6:length i351272960e 4:name 31:debian-10.2.0-amd64-netinst.iso 12:piece length i262144e 6:pieces 26800:�����PS�^�� (binary blob of the hashes of each piece) e e
在这个文件中,我们可以发现跟踪器的 URL、创建日期(作为 Unix 时间戳)、文件的名称和大小,以及包含每个 片段 的 SHA-1 哈希值的大二进制 blob ,它们同样是 -我们要下载的文件的大小部分。 一块的确切大小因种子而异,但通常在 256KB 到 1MB 之间。 这意味着一个大文件可能由 数千个 文件组成。 我们将从我们的同行那里下载这些片段,将它们与我们的 torrent 文件中的哈希值进行对比,将它们组装在一起,然后砰,我们得到了一个文件!
这种机制使我们能够在进行过程中验证每个部分的完整性。 它使 BitTorrent 能够抵抗意外损坏或故意 torrent 中毒 。 除非攻击者能够通过原像攻击破坏 SHA-1,否则我们将获得我们所要求的内容。
写一个bencode解析器会很有趣,但是解析不是我们今天的重点。 但我发现 Fredrik Lundh 的 50 行解析器 特别有启发性。 对于这个项目,我使用 了 github.com/jackpal/bencode-go :
import ( "github.com/jackpal/bencode-go" ) type bencodeInfo struct { Pieces string `bencode:"pieces"` PieceLength int `bencode:"piece length"` Length int `bencode:"length"` Name string `bencode:"name"` } type bencodeTorrent struct { Announce string `bencode:"announce"` Info bencodeInfo `bencode:"info"` } // Open parses a torrent file func Open(r io.Reader) (*bencodeTorrent, error) { bto := bencodeTorrent{} err := bencode.Unmarshal(r, &bto) if err != nil { return nil, err } return &bto, nil }
因为我喜欢让我的结构相对平坦,并且我喜欢让我的应用程序结构与我的序列化结构分开,所以我导出了一个不同的、更扁平的结构,名为 TorrentFile
并编写了一些辅助函数来在两者之间进行转换。
值得注意的是,我分裂了 pieces
(以前是一个字符串)到一个散列片(每个 [20]byte
) 以便我以后可以轻松访问单个哈希。 我还计算了整个 bencoded 的 SHA-1 哈希 info
dict(包含名称、大小和片段哈希的那个)。 我们将其称为 infohash ,它在我们与跟踪器和对等方交谈时唯一标识文件。 稍后再谈。
type TorrentFile struct { Announce string InfoHash [20]byte PieceHashes [][20]byte PieceLength int Length int Name string }
func (bto
bencodeTorrent) toTorrentFile() (TorrentFile, error) { // … }
## 从跟踪器中检索对等点
现在我们有了关于文件及其跟踪器的信息,让我们与跟踪器交谈以 宣布 我们作为对等点的存在并检索其他对等点的列表。 我们只需要向 announce
.torrent 文件中提供的 URL,带有一些查询参数:
func (t *TorrentFile) buildTrackerURL(peerID [20]byte, port uint16) (string, error) { base, err := url.Parse(t.Announce) if err != nil { return "", err } params := url.Values{ "info_hash": []string{string(t.InfoHash[:])}, "peer_id": []string{string(peerID[:])}, "port": []string{strconv.Itoa(int(Port))}, "uploaded": []string{"0"}, "downloaded": []string{"0"}, "compact": []string{"1"}, "left": []string{strconv.Itoa(t.Length)}, } base.RawQuery = params.Encode() return base.String(), nil }
重要的:
- info_hash :标识 的文件 我们正在尝试下载 。 这是我们之前从 bencoded 计算的 infohash
info
字典。 跟踪器将使用它来确定向我们展示哪些对等点。 - peer_id :一个 20 字节的名称,用于 方标识我们自己 向跟踪器和对等 。 我们将为此生成 20 个随机字节。 真正的 BitTorrent 客户端的 ID 如下
-TR2940-k8hj0wgej6ch
标识客户端软件和版本——在这种情况下,TR2940 代表传输客户端 2.94。
## 解析跟踪器响应
我们得到一个经过编码的响应:
d 8:interval i900e 5:peers 252:(another long binary blob) e
Interval
告诉我们应该多久再次连接到跟踪器以刷新我们的对等点列表。 值 900 意味着我们应该每 15 分钟(900 秒)重新连接一次。
Peers
是另一个包含每个对等点的 IP 地址的长二进制 blob。 它由 六个字节的组组成 。 每组中的前四个字节代表对等方的 IP 地址——每个字节代表 IP 中的一个数字。 最后两个字节代表端口,作为一个大端 uint16
. Big-endian 或 network order 意味着我们可以将一组字节解释为一个整数,只需将它们从左到右挤压在一起。 例如,字节 0x1A
, 0xE1
制作 0x1AE1
,或十进制的 6881。 *
// Peer encodes connection information for a peer type Peer struct { IP net.IP Port uint16 } // Unmarshal parses peer IP addresses and ports from a buffer func Unmarshal(peersBin []byte) ([]Peer, error) { const peerSize = 6 // 4 for IP, 2 for port numPeers := len(peersBin) / peerSize if len(peersBin)%peerSize != 0 { err := fmt.Errorf("Received malformed peers") return nil, err } peers := make([]Peer, numPeers) for i := 0; i < numPeers; i++ { offset := i * peerSize peers[i].IP = net.IP(peersBin[offset : offset+4]) peers[i].Port = binary.BigEndian.Uint16(peersBin[offset+4 : offset+6]) } return peers, nil }
# 从同行下载
现在我们有了一个对等点列表,是时候与他们联系并开始下载片段了! 我们可以将这个过程分解为几个步骤。 对于每个对等点,我们希望:
- 启动与对等方的 TCP 连接。 这就像开始一个电话一样。
- 完成双向 BitTorrent 握手 。 “你好?” “你好。”
- 交换 消息 以下载 片断 。 “我想要第 231 件作品。”
## 开始一个 TCP 连接
conn, err := net.DialTimeout("tcp", peer.String(), 3*time.Second) if err != nil { return nil, err }
我设置了一个超时,这样我就不会在那些不会让我连接的同伴身上浪费太多时间。 在大多数情况下,这是一个非常标准的 TCP 连接。
## 完成握手
我们刚刚建立了与对等点的连接,但我们想要握手以验证我们对对等点的假设
- 可以使用 BitTorrent 协议进行通信
- 能够理解并回复我们的信息
- 有我们想要的文件,或者至少知道我们在说什么
我父亲告诉我,良好握手的秘诀是握紧和眼神交流。 良好的 BitTorrent 握手的秘诀在于它由五个部分组成:
- 协议标识符的长度,始终为 19(十六进制的 0x13)
- 协议标识符,称为 pstr ,始终为
BitTorrent protocol
- 八个 保留字节 ,全部设置为 0。我们将其中一些翻转为 1,以表明我们支持某些 扩展 。 但我们没有,所以我们将它们保持在 0。
- 我们之前计算的 infohash 以确定我们想要哪个文件
- 的 Peer ID 我们为识别自己而制作
放在一起,握手字符串可能如下所示:
\x13BitTorrent protocol\x00\x00\x00\x00\x00\x00\x00\x00\x86\xd4\xc8\x00\x24\xa4\x69\xbe\x4c\x50\xbc\x5a\x10\x2c\xf7\x17\x80\x31\x00\x74-TR2940-k8hj0wgej6ch
在我们向我们的对等方发送握手之后,我们应该会收到以相同格式返回的握手。 我们返回的信息哈希应该与我们发送的信息相匹配,这样我们就知道我们在谈论同一个文件。 如果一切按计划进行,我们就可以出发了。 如果没有,我们可以切断连接,因为有问题。 “你好?” “这是谁?你想要什么?”
在我们的代码中,让我们创建一个结构来表示握手,并编写一些方法来序列化和读取它们:
// A Handshake is a special message that a peer uses to identify itself type Handshake struct { Pstr string InfoHash [20]byte PeerID [20]byte } // Serialize serializes the handshake to a buffer func (h *Handshake) Serialize() []byte { buf := make([]byte, len(h.Pstr)+49) buf[0] = byte(len(h.Pstr)) curr := 1 curr += copy(buf[curr:], h.Pstr) curr += copy(buf[curr:], make([]byte, 8)) // 8 reserved bytes curr += copy(buf[curr:], h.InfoHash[:]) curr += copy(buf[curr:], h.PeerID[:]) return buf } // Read parses a handshake from a stream func Read(r io.Reader) (*Handshake, error) { // Do Serialize(), but backwards // ... }
## 发送和接收消息
一旦我们完成了初始握手,我们就可以发送和接收 消息 了。 好吧,不完全是——如果另一个对等点还没有准备好接受消息,我们不能发送任何消息,直到他们告诉我们他们准备好了。 在这种状态下,我们被认为是 窒息 被其他同伴 。 他们会向我们发送一条 不受干扰 的消息,让我们知道我们可以开始向他们询问数据。 默认情况下,我们假设我们被窒息,直到证明不是这样。
一旦我们没有被阻塞,我们就可以开始发送 片段请求 ,他们可以向我们发送包含片段的消息。
### 解释消息
消息具有长度、 ID 和 有效负载 。 在电线上,它看起来像:
消息以长度指示符开头,它告诉我们消息将有多少字节长。 它是一个 32 位整数,这意味着它由四个字节按大端顺序排列而成。 下一个字节 ID 告诉我们正在接收哪种类型的消息,例如,
2
字节的意思是“感兴趣”。 最后,可选的 有效负载 填充消息的剩余长度。
type messageID uint8 const ( MsgChoke messageID = 0 MsgUnchoke messageID = 1 MsgInterested messageID = 2 MsgNotInterested messageID = 3 MsgHave messageID = 4 MsgBitfield messageID = 5 MsgRequest messageID = 6 MsgPiece messageID = 7 MsgCancel messageID = 8 ) // Message stores ID and payload of a message type Message struct { ID messageID Payload []byte } // Serialize serializes a message into a buffer of the form // <length prefix><message ID><payload> // Interprets `nil` as a keep-alive message func (m *Message) Serialize() []byte { if m == nil { return make([]byte, 4) } length := uint32(len(m.Payload) + 1) // +1 for id buf := make([]byte, 4+length) binary.BigEndian.PutUint32(buf[0:4], length) buf[4] = byte(m.ID) copy(buf[5:], m.Payload) return buf }
要从流中读取消息,我们只需遵循消息的格式。 我们读取四个字节并将它们解释为 uint32
获取消息的 长度 。 然后,我们读取该字节数以获取 ID (第一个字节)和 有效负载 (剩余字节)。
// Read parses a message from a stream. Returns `nil` on keep-alive message func Read(r io.Reader) (*Message, error) { lengthBuf := make([]byte, 4) _, err := io.ReadFull(r, lengthBuf) if err != nil { return nil, err } length := binary.BigEndian.Uint32(lengthBuf) // keep-alive message if length == 0 { return nil, nil } messageBuf := make([]byte, length) _, err = io.ReadFull(r, messageBuf) if err != nil { return nil, err } m := Message{ ID: messageID(messageBuf[0]), Payload: messageBuf[1:], } return &m, nil }
### 位域
最有趣的消息类型之一是 bitfield ,它是一种数据结构,对等方使用它来有效地编码他们能够发送给我们的片段。 位域看起来像一个字节数组,要检查它们有哪些部分,我们只需要查看设置为 1 的 位 的位置。您可以将其视为咖啡店会员卡的数字等价物。 我们从一张白卡开始 0
, 并将位翻转为 1
将他们的位置标记为“已盖章”。
通过使用 位 而不是 字节 ,这种数据结构非常紧凑。 我们可以在单个字节的空间中填充关于八块的信息——一个字节的大小
bool
. 权衡是访问值变得有点棘手。 计算机可以寻址的最小内存单位是字节,所以为了得到我们的位,我们必须做一些按位操作:
// A Bitfield represents the pieces that a peer has type Bitfield []byte // HasPiece tells if a bitfield has a particular index set func (bf Bitfield) HasPiece(index int) bool { byteIndex := index / 8 offset := index % 8 return bf[byteIndex]>>(7-offset)&1 != 0 } // SetPiece sets a bit in the bitfield func (bf Bitfield) SetPiece(index int) { byteIndex := index / 8 offset := index % 8 bf[byteIndex] |= 1 << (7 - offset) }
## 把它们放在一起
我们现在拥有下载种子所需的所有工具:我们有一个从跟踪器获得的对等点列表,我们可以通过拨打 TCP 连接、发起握手以及发送和接收消息来与它们通信。 我们的最后一个大问题是同时处理与 的并发性 多个对等点交谈所涉及 管理对等点的状态 ,以及在我们与对等点交互时 。 这些都是经典的难题。
### 管理并发:通道作为队列
在 Go 中,我们 通过通信共享内存 ,我们可以将 Go 通道视为廉价的线程安全队列。
我们将设置两个通道来同步我们的并发工作人员:一个用于在对等点之间分发工作(要下载的片段),另一个用于收集下载的片段。 当下载的片段通过结果通道进入时,我们可以将它们复制到缓冲区中以开始组装我们的完整文件。
// Init queues for workers to retrieve work and send results workQueue := make(chan *pieceWork, len(t.PieceHashes)) results := make(chan *pieceResult) for index, hash := range t.PieceHashes { length := t.calculatePieceSize(index) workQueue <- &pieceWork{index, hash, length} } // Start workers for _, peer := range t.Peers { go t.startDownloadWorker(peer, workQueue, results) } // Collect results into a buffer until full buf := make([]byte, t.Length) donePieces := 0 for donePieces < len(t.PieceHashes) { res := <-results begin, end := t.calculateBoundsForPiece(res.index) copy(buf[begin:end], res.buf) donePieces++ } close(workQueue)
我们将为我们从跟踪器收到的每个对等点生成一个工作 goroutine。 它将与对等方连接并握手,然后开始从对等方检索工作 workQueue
,尝试下载它,并通过 results
渠道。
func (t *Torrent) startDownloadWorker(peer peers.Peer, workQueue chan *pieceWork, results chan *pieceResult) { c, err := client.New(peer, t.PeerID, t.InfoHash) if err != nil { log.Printf("Could not handshake with %s. Disconnecting\n", peer.IP) return } defer c.Conn.Close() log.Printf("Completed handshake with %s\n", peer.IP) c.SendUnchoke() c.SendInterested() for pw := range workQueue { if !c.Bitfield.HasPiece(pw.index) { workQueue <- pw // Put piece back on the queue continue } // Download the piece buf, err := attemptDownloadPiece(c, pw) if err != nil { log.Println("Exiting", err) workQueue <- pw // Put piece back on the queue return } err = checkIntegrity(pw, buf) if err != nil { log.Printf("Piece #%d failed integrity check\n", pw.index) workQueue <- pw // Put piece back on the queue continue } c.SendHave(pw.index) results <- &pieceResult{pw.index, buf} } }
### 管理状态
我们将跟踪结构中的每个对等点,并在读取消息时修改该结构。 它将包括数据,例如我们从对等方下载了多少,我们向他们请求了多少,以及我们是否被卡住了。 如果我们想进一步扩展它,我们可以将它形式化为一个有限状态机。 但是现在一个结构和一个开关就足够了。
type pieceProgress struct { index int client *client.Client buf []byte downloaded int requested int backlog int } func (state *pieceProgress) readMessage() error { msg, err := state.client.Read() // this call blocks switch msg.ID { case message.MsgUnchoke: state.client.Choked = false case message.MsgChoke: state.client.Choked = true case message.MsgHave: index, err := message.ParseHave(msg) state.client.Bitfield.SetPiece(index) case message.MsgPiece: n, err := message.ParsePiece(state.index, state.buf, msg) state.downloaded += n state.backlog-- } return nil }
### 是时候提出要求了!
文件、片段和片段散列并不是全部——我们可以通过将片段分解成 块 来更进一步。 块是块的一部分,我们可以通过块的 索引 、块内的字节 偏移量 和 长度 来完全定义块。 当我们从对等点请求数据时,我们实际上是在请求 块 。 一个块通常有 16KB 大,这意味着一个 256KB 的块实际上可能需要 16 个请求。
如果对等点收到大于 16KB 的块的请求,则应该切断连接。 但是,根据我的经验,他们通常非常乐意满足高达 128KB 的请求。 我只在更大的块大小上获得了适度的整体速度提升,所以坚持规范可能会更好。
### 流水线
网络往返很昂贵,并且逐个请求每个块绝对会降低我们的下载性能。 因此,对我们的请求进行 管道 传输非常重要,这样我们就可以持续承受一定数量的未完成请求的压力。 这可以将我们连接的吞吐量提高一个数量级。
传统上,BitTorrent 客户端保留五个流水线请求的队列,这就是我将使用的值。 我发现增加它可以使下载速度提高一倍。 较新的客户端使用 自适应 队列大小来更好地适应现代网络速度和条件。 这绝对是一个值得调整的参数,对于未来的性能优化来说,它是非常容易实现的。
// MaxBlockSize is the largest number of bytes a request can ask for const MaxBlockSize = 16384 // MaxBacklog is the number of unfulfilled requests a client can have in its pipeline const MaxBacklog = 5 func attemptDownloadPiece(c *client.Client, pw *pieceWork) ([]byte, error) { state := pieceProgress{ index: pw.index, client: c, buf: make([]byte, pw.length), } // Setting a deadline helps get unresponsive peers unstuck. // 30 seconds is more than enough time to download a 262 KB piece c.Conn.SetDeadline(time.Now().Add(30 * time.Second)) defer c.Conn.SetDeadline(time.Time{}) // Disable the deadline for state.downloaded < pw.length { // If unchoked, send requests until we have enough unfulfilled requests if !state.client.Choked { for state.backlog < MaxBacklog && state.requested < pw.length { blockSize := MaxBlockSize // Last block might be shorter than the typical block if pw.length-state.requested < blockSize { blockSize = pw.length - state.requested } err := c.SendRequest(pw.index, state.requested, blockSize) if err != nil { return nil, err } state.backlog++ state.requested += blockSize } } err := state.readMessage() if err != nil { return nil, err } } return state.buf, nil }
### main.go
这是一个简短的。 我们快到了。
package main import ( "log" "os" "github.com/veggiedefender/torrent-client/torrentfile" ) func main() { inPath := os.Args[1] outPath := os.Args[2] tf, err := torrentfile.Open(inPath) if err != nil { log.Fatal(err) } err = tf.DownloadToFile(outPath) if err != nil { log.Fatal(err) } }
# 这不是完整的故事
为简洁起见,我只包含了一些重要的代码片段。 值得注意的是,我省略了所有的胶水代码、解析、单元测试和构建角色的无聊部分。 如果您有兴趣,请查看我的 完整实现。