Source File
pipe.go
Belonging Package
io
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Pipe adapter to connect code expecting an io.Reader
// with code expecting an io.Writer.
package io
import (
)
// onceError is an object that will only store an error once.
type onceError struct {
sync.Mutex // guards following
err error
}
func ( *onceError) ( error) {
.Lock()
defer .Unlock()
if .err != nil {
return
}
.err =
}
func ( *onceError) () error {
.Lock()
defer .Unlock()
return .err
}
// ErrClosedPipe is the error used for read or write operations on a closed pipe.
var ErrClosedPipe = errors.New("io: read/write on closed pipe")
// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
type pipe struct {
wrMu sync.Mutex // Serializes Write operations
wrCh chan []byte
rdCh chan int
once sync.Once // Protects closing done
done chan struct{}
rerr onceError
werr onceError
}
func ( *pipe) ( []byte) ( int, error) {
select {
case <-.done:
return 0, .readCloseError()
default:
}
select {
case := <-.wrCh:
:= copy(, )
.rdCh <-
return , nil
case <-.done:
return 0, .readCloseError()
}
}
func ( *pipe) ( error) error {
if == nil {
= ErrClosedPipe
}
.rerr.Store()
.once.Do(func() { close(.done) })
return nil
}
func ( *pipe) ( []byte) ( int, error) {
select {
case <-.done:
return 0, .writeCloseError()
default:
.wrMu.Lock()
defer .wrMu.Unlock()
}
for := true; || len() > 0; = false {
select {
case .wrCh <- :
:= <-.rdCh
= [:]
+=
case <-.done:
return , .writeCloseError()
}
}
return , nil
}
func ( *pipe) ( error) error {
if == nil {
= EOF
}
.werr.Store()
.once.Do(func() { close(.done) })
return nil
}
// readCloseError is considered internal to the pipe type.
func ( *pipe) () error {
:= .rerr.Load()
if := .werr.Load(); == nil && != nil {
return
}
return ErrClosedPipe
}
// writeCloseError is considered internal to the pipe type.
func ( *pipe) () error {
:= .werr.Load()
if := .rerr.Load(); == nil && != nil {
return
}
return ErrClosedPipe
}
// A PipeReader is the read half of a pipe.
type PipeReader struct{ pipe }
// Read implements the standard Read interface:
// it reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// If the write end is closed with an error, that error is
// returned as err; otherwise err is EOF.
func ( *PipeReader) ( []byte) ( int, error) {
return .pipe.read()
}
// Close closes the reader; subsequent writes to the
// write half of the pipe will return the error [ErrClosedPipe].
func ( *PipeReader) () error {
return .CloseWithError(nil)
}
// CloseWithError closes the reader; subsequent writes
// to the write half of the pipe will return the error err.
//
// CloseWithError never overwrites the previous error if it exists
// and always returns nil.
func ( *PipeReader) ( error) error {
return .pipe.closeRead()
}
// A PipeWriter is the write half of a pipe.
type PipeWriter struct{ r PipeReader }
// Write implements the standard Write interface:
// it writes data to the pipe, blocking until one or more readers
// have consumed all the data or the read end is closed.
// If the read end is closed with an error, that err is
// returned as err; otherwise err is [ErrClosedPipe].
func ( *PipeWriter) ( []byte) ( int, error) {
return .r.pipe.write()
}
// Close closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and EOF.
func ( *PipeWriter) () error {
return .CloseWithError(nil)
}
// CloseWithError closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and the error err,
// or EOF if err is nil.
//
// CloseWithError never overwrites the previous error if it exists
// and always returns nil.
func ( *PipeWriter) ( error) error {
return .r.pipe.closeWrite()
}
// Pipe creates a synchronous in-memory pipe.
// It can be used to connect code expecting an [io.Reader]
// with code expecting an [io.Writer].
//
// Reads and Writes on the pipe are matched one to one
// except when multiple Reads are needed to consume a single Write.
// That is, each Write to the [PipeWriter] blocks until it has satisfied
// one or more Reads from the [PipeReader] that fully consume
// the written data.
// The data is copied directly from the Write to the corresponding
// Read (or Reads); there is no internal buffering.
//
// It is safe to call Read and Write in parallel with each other or with Close.
// Parallel calls to Read and parallel calls to Write are also safe:
// the individual calls will be gated sequentially.
func () (*PipeReader, *PipeWriter) {
:= &PipeWriter{r: PipeReader{pipe: pipe{
wrCh: make(chan []byte),
rdCh: make(chan int),
done: make(chan struct{}),
}}}
return &.r,
}
The pages are generated with Golds v0.7.3. (GOOS=linux GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds. |