// Copyright 2009 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.package rpcimport ()// ServerError represents an error that has been returned from// the remote side of the RPC connection.typeServerErrorstringfunc ( ServerError) () string {returnstring()}varErrShutdown = errors.New("connection is shut down")// Call represents an active RPC.typeCallstruct { ServiceMethod string// The name of the service and method to call. Args any// The argument to the function (*struct). Reply any// The reply from the function (*struct). Error error// After completion, the error status. Done chan *Call// Receives *Call when Go is complete.}// Client represents an RPC Client.// There may be multiple outstanding Calls associated// with a single Client, and a Client may be used by// multiple goroutines simultaneously.typeClientstruct { codec ClientCodec reqMutex sync.Mutex// protects following request Request mutex sync.Mutex// protects following seq uint64 pending map[uint64]*Call closing bool// user has called Close shutdown bool// server has told us to stop}// A ClientCodec implements writing of RPC requests and// reading of RPC responses for the client side of an RPC session.// The client calls [ClientCodec.WriteRequest] to write a request to the connection// and calls [ClientCodec.ReadResponseHeader] and [ClientCodec.ReadResponseBody] in pairs// to read responses. The client calls [ClientCodec.Close] when finished with the// connection. ReadResponseBody may be called with a nil// argument to force the body of the response to be read and then// discarded.// See [NewClient]'s comment for information about concurrent access.typeClientCodecinterface {WriteRequest(*Request, any) errorReadResponseHeader(*Response) errorReadResponseBody(any) errorClose() error}func ( *Client) ( *Call) { .reqMutex.Lock()defer .reqMutex.Unlock()// Register this call. 
.mutex.Lock()if .shutdown || .closing { .mutex.Unlock() .Error = ErrShutdown .done()return } := .seq .seq++ .pending[] = .mutex.Unlock()// Encode and send the request. .request.Seq = .request.ServiceMethod = .ServiceMethod := .codec.WriteRequest(&.request, .Args)if != nil { .mutex.Lock() = .pending[]delete(.pending, ) .mutex.Unlock()if != nil { .Error = .done() } }}func ( *Client) () {varerrorvarResponsefor == nil { = Response{} = .codec.ReadResponseHeader(&)if != nil {break } := .Seq .mutex.Lock() := .pending[]delete(.pending, ) .mutex.Unlock()switch {case == nil:// We've got no pending call. That usually means that // WriteRequest partially failed, and call was already // removed; response is a server telling us about an // error reading request body. We should still attempt // to read error body, but there's no one to give it to. = .codec.ReadResponseBody(nil)if != nil { = errors.New("reading error body: " + .Error()) }case .Error != "":// We've got an error response. Give this to the request; // any subsequent requests will get the ReadResponseBody // error if there is one. .Error = ServerError(.Error) = .codec.ReadResponseBody(nil)if != nil { = errors.New("reading error body: " + .Error()) } .done()default: = .codec.ReadResponseBody(.Reply)if != nil { .Error = errors.New("reading body " + .Error()) } .done() } }// Terminate pending calls. .reqMutex.Lock() .mutex.Lock() .shutdown = true := .closingif == io.EOF {if { = ErrShutdown } else { = io.ErrUnexpectedEOF } }for , := range .pending { .Error = .done() } .mutex.Unlock() .reqMutex.Unlock()ifdebugLog && != io.EOF && ! {log.Println("rpc: client protocol error:", ) }}func ( *Call) () {select {case .Done<- :// okdefault:// We don't want to block here. It is the caller's responsibility to make // sure the channel has enough buffer space. 
See comment in Go().ifdebugLog {log.Println("rpc: discarding Call reply due to insufficient Done chan capacity") } }}// NewClient returns a new [Client] to handle requests to the// set of services at the other end of the connection.// It adds a buffer to the write side of the connection so// the header and payload are sent as a unit.//// The read and write halves of the connection are serialized independently,// so no interlocking is required. However each half may be accessed// concurrently so the implementation of conn should protect against// concurrent reads or concurrent writes.func ( io.ReadWriteCloser) *Client { := bufio.NewWriter() := &gobClientCodec{, gob.NewDecoder(), gob.NewEncoder(), }returnNewClientWithCodec()}// NewClientWithCodec is like [NewClient] but uses the specified// codec to encode requests and decode responses.func ( ClientCodec) *Client { := &Client{codec: ,pending: make(map[uint64]*Call), }go .input()return}type gobClientCodec struct { rwc io.ReadWriteCloser dec *gob.Decoder enc *gob.Encoder encBuf *bufio.Writer}func ( *gobClientCodec) ( *Request, any) ( error) {if = .enc.Encode(); != nil {return }if = .enc.Encode(); != nil {return }return .encBuf.Flush()}func ( *gobClientCodec) ( *Response) error {return .dec.Decode()}func ( *gobClientCodec) ( any) error {return .dec.Decode()}func ( *gobClientCodec) () error {return .rwc.Close()}// DialHTTP connects to an HTTP RPC server at the specified network address// listening on the default HTTP RPC path.func (, string) (*Client, error) {returnDialHTTPPath(, , DefaultRPCPath)}// DialHTTPPath connects to an HTTP RPC server// at the specified network address and path.func (, , string) (*Client, error) { , := net.Dial(, )if != nil {returnnil, }io.WriteString(, "CONNECT "++" HTTP/1.0\n\n")// Require successful HTTP response // before switching to RPC protocol. 
, := http.ReadResponse(bufio.NewReader(), &http.Request{Method: "CONNECT"})if == nil && .Status == connected {returnNewClient(), nil }if == nil { = errors.New("unexpected HTTP response: " + .Status) } .Close()returnnil, &net.OpError{Op: "dial-http",Net: + " " + ,Addr: nil,Err: , }}// Dial connects to an RPC server at the specified network address.func (, string) (*Client, error) { , := net.Dial(, )if != nil {returnnil, }returnNewClient(), nil}// Close calls the underlying codec's Close method. If the connection is already// shutting down, [ErrShutdown] is returned.func ( *Client) () error { .mutex.Lock()if .closing { .mutex.Unlock()returnErrShutdown } .closing = true .mutex.Unlock()return .codec.Close()}// Go invokes the function asynchronously. It returns the [Call] structure representing// the invocation. The done channel will signal when the call is complete by returning// the same Call object. If done is nil, Go will allocate a new channel.// If non-nil, done must be buffered or Go will deliberately crash.func ( *Client) ( string, any, any, chan *Call) *Call { := new(Call) .ServiceMethod = .Args = .Reply = if == nil { = make(chan *Call, 10) // buffered. } else {// If caller passes done != nil, it must arrange that // done has enough buffer for the number of simultaneous // RPCs that will be using that channel. If the channel // is totally unbuffered, it's best not to run at all.ifcap() == 0 {log.Panic("rpc: done channel is unbuffered") } } .Done = .send()return}// Call invokes the named function, waits for it to complete, and returns its error status.func ( *Client) ( string, any, any) error { := <-.Go(, , , make(chan *Call, 1)).Donereturn .Error}
// The pages are generated with Golds v0.7.3. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.