io.Pipe
Understanding the Pipe
If you have worked on Linux, you're probably familiar with the pipe operator. The pipe operator, |, connects two programs so that the output of one becomes the input of the other.
Running the command below in my home directory outputs Music:
$ ls | grep Mus
Music
The output of ls, which lists all my directories, is filtered by grep for Mus, so only the Music directory is shown.
With | you can chain as many programs as you want, which makes the pipe a very useful operator.
To come back to Go, you can think of ls as code expecting a Writer (a place to write its data to) and of grep as code expecting a Reader (a source to read data from).
io.Pipe
We can achieve this piping behaviour in Go by using the io.Pipe function:
func Pipe() (*PipeReader, *PipeWriter)
Pipe creates a synchronous in-memory pipe. It can be used to connect code expecting an io.Reader with code expecting an io.Writer.
Synchronizing the Read & Write
Let's try to create our first pipe workflow. I will have two functions: one expecting a Writer and another expecting a Reader.
In Linux terms, what we are trying to do is something like this:
$ wantToWrite | wantToRead
We will connect wantToWrite and wantToRead through the io.Pipe() function and run the program:
package main

import (
	"fmt"
	"io"
)

func main() {
	r, w := io.Pipe()
	wantToWrite(w)
	wantToRead(r)
}

func wantToWrite(w *io.PipeWriter) {
	io.WriteString(w, "Hello Writer!")
}

func wantToRead(r *io.PipeReader) {
	data, _ := io.ReadAll(r)
	fmt.Println("I just read", string(data))
}
Running the program fails with:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select]:
io.(*pipe).write(0xc000102120, {0xc00010c000, 0xd, 0x10})
/usr/local/go-faketime/src/io/pipe.go:86 +0x212
io.(*PipeWriter).Write(0x7f95795e9a68?, {0xc00010c000?, 0xd?, 0xc000100020?})
/usr/local/go-faketime/src/io/pipe.go:165 +0x25
io.WriteString({0x4bbc68, 0xc000100020}, {0x49d589, 0xd})
/usr/local/go-faketime/src/io/io.go:315 +0x91
main.wantToWrite(0xc000100020)
/tmp/sandbox861914013/prog.go:16 +0x34
main.main()
/tmp/sandbox861914013/prog.go:11 +0x116
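Going back to the docs, remember that io.Pipe() creates a "synchronous in-memory pipe". There is no internal buffer where the data can wait, like the one we had with io.Copy: each Write blocks until a reader has consumed all of the written data.
The copying happens at the same time as the writing, so when we start writing there must already be a reader reading. In our program, wantToWrite blocks on its write and wantToRead is never reached, so no goroutine can make progress.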
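A small sketch can make this synchronous behaviour visible. The helper readInChunks is hypothetical: it writes a message with a single Write call from a separate goroutine and reads it back through a deliberately small buffer. As the io.Pipe documentation describes, one Write can be consumed by several Reads, and it only returns once all of its bytes have been read:

```go
package main

import (
	"fmt"
	"io"
)

// readInChunks (hypothetical helper) writes msg into a pipe with a single
// Write call and reads it back using a buffer of chunkSize bytes. It returns
// the chunks seen by the reader and the byte count returned by Write.
func readInChunks(msg string, chunkSize int) ([]string, int) {
	r, w := io.Pipe()
	// Buffered so the writer goroutine never blocks on reporting its result.
	written := make(chan int, 1)

	go func() {
		// This Write does not return until the reader has consumed every
		// byte of msg: the pipe has no internal buffer to park the data in.
		n, _ := w.Write([]byte(msg))
		written <- n
		w.Close() // lets the read loop below finish with io.EOF
	}()

	var chunks []string
	buf := make([]byte, chunkSize)
	for {
		n, err := r.Read(buf)
		if n > 0 {
			chunks = append(chunks, string(buf[:n]))
		}
		if err != nil { // io.EOF once the writer has closed
			break
		}
	}
	return chunks, <-written
}

func main() {
	chunks, n := readInChunks("Hello Writer!", 5)
	fmt.Printf("%q %d\n", chunks, n) // prints ["Hello" " Writ" "er!"] 13
}
```

The writer reports all 13 bytes from one call, even though the reader needed three reads to take them.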
To fix this, let's start wantToWrite in a separate goroutine. This way wantToWrite runs concurrently with wantToRead, so a Reader is ready by the time the Writer starts writing:
package main

import (
	"fmt"
	"io"
)

func main() {
	r, w := io.Pipe()
	// start separate goroutine to synchronize `wantToWrite` with `wantToRead`
	go wantToWrite(w)
	wantToRead(r)
}

func wantToWrite(w *io.PipeWriter) {
	io.WriteString(w, "Hello Writer!")
}

func wantToRead(r *io.PipeReader) {
	data, _ := io.ReadAll(r)
	fmt.Println("I just read", string(data))
}
Running this program gives us the same fatal error, although this time the main goroutine is stuck reading rather than writing:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select]:
io.(*pipe).read(0xc0000ae120, {0xc0000b200d, 0x1f3, 0x203001?})
/usr/local/go-faketime/src/io/pipe.go:57 +0xb1
io.(*PipeReader).Read(0x400000?, {0xc0000b200d?, 0x41e765?, 0x40bc5f?})
/usr/local/go-faketime/src/io/pipe.go:136 +0x25
io.ReadAll({0x4bbcd8, 0xc0000ac018})
/usr/local/go-faketime/src/io/io.go:661 +0xfe
main.wantToRead(0xc00009af70?)
/tmp/sandbox2894506000/prog.go:21 +0x27
main.main()
/tmp/sandbox2894506000/prog.go:13 +0x165
Closing the Write part
We can see that the types we are working with here are io.PipeReader and io.PipeWriter.
Let's see what the docs say about the Reader part:
func (r *PipeReader) Read(data []byte) (n int, err error)
Read implements the standard Read interface: it reads data from the pipe, blocking until a writer arrives or the write end is closed. If the write end is closed with an error, that error is returned as err; otherwise err is EOF.
So we already have a hint about why our program deadlocked: the reader keeps waiting for either more data or EOF. Once "Hello Writer!" has been written there is nothing left to read, so the reader needs an EOF. We can produce one by closing the writer:
func (w *PipeWriter) Close() error
Close closes the writer; subsequent reads from the read half of the pipe will return no bytes and EOF.
Now we should have a running program:
package main

import (
	"fmt"
	"io"
)

func main() {
	r, w := io.Pipe()
	// start separate goroutine to synchronize `wantToWrite` with `wantToRead`
	go wantToWrite(w)
	wantToRead(r)
}

func wantToWrite(w *io.PipeWriter) {
	defer w.Close() // close the writer when it's done writing
	io.WriteString(w, "Hello Writer!")
}

func wantToRead(r *io.PipeReader) {
	data, _ := io.ReadAll(r)
	fmt.Println("I just read", string(data))
}
That's it! You just learned how to connect two pieces of code using io.Pipe. You saw that the pipe is synchronous, and that avoiding a deadlock takes two things: running the writing and reading parts concurrently, and closing the writing part once it is done.
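To close the loop with the ls | grep example from the beginning, here is a sketch with two hypothetical functions: listDirs plays ls and only knows how to write to an io.Writer, while grep only knows how to read from an io.Reader. The directory names are made-up sample data; the two ingredients from this article are the separate goroutine and the deferred Close:

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// listDirs (hypothetical) plays the role of `ls`: it writes one made-up
// directory name per line.
func listDirs(w io.Writer) {
	for _, name := range []string{"Documents", "Downloads", "Music", "Pictures"} {
		fmt.Fprintln(w, name)
	}
}

// grep (hypothetical) plays the role of `grep`: it returns the lines read
// from r that contain pattern.
func grep(r io.Reader, pattern string) []string {
	var matches []string
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		if strings.Contains(scanner.Text(), pattern) {
			matches = append(matches, scanner.Text())
		}
	}
	return matches
}

// pipeGrep is a Go version of `ls | grep pattern`.
func pipeGrep(pattern string) []string {
	r, w := io.Pipe()
	go func() {
		defer w.Close() // without it, grep never sees EOF and the program deadlocks
		listDirs(w)
	}()
	return grep(r, pattern)
}

func main() {
	fmt.Println(pipeGrep("Mus")) // prints [Music]
}
```

Each line written by listDirs is handed straight to the scanner inside grep, so no complete copy of the listing is ever kept in between.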