// Source file: splice_linux.go
// Package: internal/poll
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package poll
import (
	"internal/syscall/unix"
	"runtime"
	"sync"
	"syscall"
	"unsafe"
)
const (
	// maxSpliceSize caps how many bytes Splice asks the kernel to move in a
	// single splice(2) call. Data always travels through an intermediate
	// pipe, and 1MB matches the default maximum pipe buffer size, which is
	// governed by /proc/sys/fs/pipe-max-size.
	maxSpliceSize = 1 << 20

	// spliceNonblock is the SPLICE_F_NONBLOCK flag passed to splice(2). It
	// only makes the operations on the intermediate pipe nonblocking; the
	// descriptors spliced from/to may still block unless they themselves
	// were opened with O_NONBLOCK.
	spliceNonblock = 0x2
)
// Splice transfers at most remain bytes of data from src to dst, using the
// splice system call to minimize copies of data from and to userspace.
//
// Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer.
// src and dst must both be stream-oriented sockets.
func (, *FD, int64) ( int64, bool, error) {
, := getPipe()
if != nil {
return 0, false,
}
defer putPipe()
var , int
for == nil && > 0 {
:= maxSpliceSize
if int64() > {
= int()
}
, = spliceDrain(.wfd, , )
// The operation is considered handled if splice returns no
// error, or an error other than EINVAL. An EINVAL means the
// kernel does not support splice for the socket type of src.
// The failed syscall does not consume any data so it is safe
// to fall back to a generic copy.
//
// spliceDrain should never return EAGAIN, so if err != nil,
// Splice cannot continue.
//
// If inPipe == 0 && err == nil, src is at EOF, and the
// transfer is complete.
= || ( != syscall.EINVAL)
if != nil || == 0 {
break
}
.data +=
, = splicePump(, .rfd, )
if > 0 {
+= int64()
-= int64()
.data -=
}
}
if != nil {
return , ,
}
return , true, nil
}
// spliceDrain moves data from a socket to a pipe.
//
// Invariant: when entering spliceDrain, the pipe is empty. It is either in its
// initial state, or splicePump has emptied it previously.
//
// Given this, spliceDrain can reasonably assume that the pipe is ready for
// writing, so if splice returns EAGAIN, it must be because the socket is not
// ready for reading.
//
// If spliceDrain returns (0, nil), src is at EOF.
func spliceDrain( int, *FD, int) (int, error) {
if := .readLock(); != nil {
return 0,
}
defer .readUnlock()
if := .pd.prepareRead(.isFile); != nil {
return 0,
}
for {
// In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here,
// because it could return EAGAIN ceaselessly when the write end of the pipe is full,
// but this shouldn't be a concern here, since the pipe buffer must be sufficient for
// this data transmission on the basis of the workflow in Splice.
, := splice(, .Sysfd, , spliceNonblock)
if == syscall.EINTR {
continue
}
if != syscall.EAGAIN {
return ,
}
if .pd.pollable() {
if := .pd.waitRead(.isFile); != nil {
return ,
}
}
}
}
// splicePump moves all the buffered data from a pipe to a socket.
//
// Invariant: when entering splicePump, there are exactly inPipe
// bytes of data in the pipe, from a previous call to spliceDrain.
//
// By analogy to the condition from spliceDrain, splicePump
// only needs to poll the socket for readiness, if splice returns
// EAGAIN.
//
// If splicePump cannot move all the data in a single call to
// splice(2), it loops over the buffered data until it has written
// all of it to the socket. This behavior is similar to the Write
// step of an io.Copy in userspace.
func splicePump( *FD, int, int) (int, error) {
if := .writeLock(); != nil {
return 0,
}
defer .writeUnlock()
if := .pd.prepareWrite(.isFile); != nil {
return 0,
}
:= 0
for > 0 {
// In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here,
// because it could return EAGAIN ceaselessly when the read end of the pipe is empty,
// but this shouldn't be a concern here, since the pipe buffer must contain inPipe size of
// data on the basis of the workflow in Splice.
, := splice(.Sysfd, , , spliceNonblock)
if == syscall.EINTR {
continue
}
// Here, the condition n == 0 && err == nil should never be
// observed, since Splice controls the write side of the pipe.
if > 0 {
-=
+=
continue
}
if != syscall.EAGAIN {
return ,
}
if .pd.pollable() {
if := .pd.waitWrite(.isFile); != nil {
return ,
}
}
}
return , nil
}
// splice wraps the splice system call. Since the current implementation
// only uses splice on sockets and pipes, the offset arguments are unused.
// splice returns int instead of int64, because callers never ask it to
// move more data in a single call than can fit in an int32.
//
// It moves up to size bytes from descriptor in to descriptor out, passing
// flags straight through to splice(2).
func splice(out int, in int, size int, flags int) (int, error) {
	n, err := syscall.Splice(in, nil, out, nil, size, flags)
	return int(n), err
}
// splicePipeFields is the state of a splice pipe: both of its descriptors and
// the number of bytes currently buffered inside it.
type splicePipeFields struct {
	rfd  int // read end of the pipe
	wfd  int // write end of the pipe
	data int // bytes drained into the pipe but not yet pumped out
}
// splicePipe is a pooled pipe used as the in-kernel buffer for Splice.
type splicePipe struct {
	splicePipeFields

	// Pad the struct so it is never small enough to be served by the tiny
	// allocator, because a finalizer is attached to every splicePipe.
	_ [24 - unsafe.Sizeof(splicePipeFields{})%24]byte
}
// splicePipePool keeps pipes around between Splice calls, so that pipe buffers
// are not created and destroyed at a high rate. Because the garbage collector
// periodically empties a sync.Pool, each pipe it hands out carries a finalizer
// that closes its file descriptors before the pipe is actually collected.
var splicePipePool = sync.Pool{
	New: newPoolPipe,
}
func newPoolPipe() any {
// Discard the error which occurred during the creation of pipe buffer,
// redirecting the data transmission to the conventional way utilizing read() + write() as a fallback.
:= newPipe()
if == nil {
return nil
}
runtime.SetFinalizer(, destroyPipe)
return
}
// getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from the cache.
func getPipe() (*splicePipe, error) {
:= splicePipePool.Get()
if == nil {
return nil, syscall.EINVAL
}
return .(*splicePipe), nil
}
func putPipe( *splicePipe) {
// If there is still data left in the pipe,
// then close and discard it instead of putting it back into the pool.
if .data != 0 {
runtime.SetFinalizer(, nil)
destroyPipe()
return
}
splicePipePool.Put()
}
// newPipe sets up a pipe for a splice operation.
func newPipe() *splicePipe {
var [2]int
if := syscall.Pipe2([:], syscall.O_CLOEXEC|syscall.O_NONBLOCK); != nil {
return nil
}
// Splice will loop writing maxSpliceSize bytes from the source to the pipe,
// and then write those bytes from the pipe to the destination.
// Set the pipe buffer size to maxSpliceSize to optimize that.
// Ignore errors here, as a smaller buffer size will work,
// although it will require more system calls.
unix.Fcntl([0], syscall.F_SETPIPE_SZ, maxSpliceSize)
return &splicePipe{splicePipeFields: splicePipeFields{rfd: [0], wfd: [1]}}
}
// destroyPipe destroys a pipe.
func destroyPipe( *splicePipe) {
CloseFunc(.rfd)
CloseFunc(.wfd)
}
// This page was generated with Golds v0.7.3-preview (GOOS=linux GOARCH=amd64).
// Golds is a Go 101 project developed by Tapir Liu. PRs and bug reports are
// welcome and can be submitted to the issue list. Please follow @zigo_101
// (reachable from the left QR code) to get the latest news of Golds.