Skip to main content

io.Pipe

Understanding the Pipe

If you worked on Linux, you're probably familiar with the pipe operator. The pipe operator, |, connects two programs where the output of one becomes the input of the other.

Running the bellow command on my home directory outputs Music. The output of ls which outputs all my directories was grepped with Mus, and it shows only the Music directory.

With | you can connect as many programs as you want making the pipe a very useful operator.

$ ls | grep Mus 
Music

To come back to Go, you can see the ls as expecting a Writer - a place where to write its data and grep expecting a Reader - a source where it can read data from.

io.Pipe

We can achieve this piping behaviour in Go bo using io.Pipe function:

func Pipe() (*PipeReader, *PipeWriter)

Pipe creates a synchronous in-memory pipe. It can be used to connect code expecting an io.Reader with code expecting an io.Writer.

Synchronizing the Read&Write

Let's try to create our first pipe workflow. I will have 2 functions: one expecting a Writer and another expecting a Reader.

What we try to do is something like this in Linux terms:

$ wantToWrite | wantToRead

We will connect wantToWrite and wantToRead via io.Pipe() function and try to run the program:

package main

import (
"fmt"
"io"
)

func main() {
r, w := io.Pipe()

wantToWrite(w)
wantToRead(r)
}

func wantToWrite(w *io.PipeWriter) {
io.WriteString(w, "Hello Writer!")
}

func wantToRead(r *io.PipeReader) {
data, _ := io.ReadAll(r)
fmt.Println("I just read", string(data))
}

Running the program fails with:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select]:
io.(*pipe).write(0xc000102120, {0xc00010c000, 0xd, 0x10})
/usr/local/go-faketime/src/io/pipe.go:86 +0x212
io.(*PipeWriter).Write(0x7f95795e9a68?, {0xc00010c000?, 0xd?, 0xc000100020?})
/usr/local/go-faketime/src/io/pipe.go:165 +0x25
io.WriteString({0x4bbc68, 0xc000100020}, {0x49d589, 0xd})
/usr/local/go-faketime/src/io/io.go:315 +0x91
main.wantToWrite(0xc000100020)
/tmp/sandbox861914013/prog.go:16 +0x34
main.main()
/tmp/sandbox861914013/prog.go:11 +0x116

Going back to docs remember that io.Pipe() creates "synchronous in-memory pipe". There is no internal buffer where the data is being copied before being moved into the Writer like we had with io.Copy.

The copying is happening at the same time. When we start writing there must be a reader that starts reading.

To fix this, let's try to start wantToWrite on a separate Goroutine. This means that wantToWrite will run concurrently with wantToRead so a Reader will be ready for a Writer when it starts writing:

package main

import (
"fmt"
"io"
)

func main() {
r, w := io.Pipe()

// start separate goroutine to synchronize `wantToWrite` with `wantToRead`
go wantToWrite(w)
wantToRead(r)
}

func wantToWrite(w *io.PipeWriter) {
io.WriteString(w, "Hello Writer!")
}

func wantToRead(r *io.PipeReader) {
data, _ := io.ReadAll(r)
fmt.Println("I just read", string(data))
}

run in Playground

Running this program give us the same error:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select]:
io.(*pipe).read(0xc0000ae120, {0xc0000b200d, 0x1f3, 0x203001?})
/usr/local/go-faketime/src/io/pipe.go:57 +0xb1
io.(*PipeReader).Read(0x400000?, {0xc0000b200d?, 0x41e765?, 0x40bc5f?})
/usr/local/go-faketime/src/io/pipe.go:136 +0x25
io.ReadAll({0x4bbcd8, 0xc0000ac018})
/usr/local/go-faketime/src/io/io.go:661 +0xfe
main.wantToRead(0xc00009af70?)
/tmp/sandbox2894506000/prog.go:21 +0x27
main.main()
/tmp/sandbox2894506000/prog.go:13 +0x165

Closing the Write part

We can see that the types we are working here are: io.PipeReader and io.PipeWriter.

Let's read on the Reader part from the docs:

func (r *PipeReader) Read(data []byte) (n int, err error)

Read implements the standard Read interface: it reads data from the pipe, blocking until a writer arrives or the write end is closed. If the write end is closed with an error, that error is returned as err; otherwise err is EOF.

So we already got a hint why our program reached a deadlock: the reader is waiting for something to read from or EOF. We no longer have something to read from after the "Hello Writer!" is written, so we need an EOF. We can do that by closing the writer:

func (w *PipeWriter) Close() error

Close closes the writer; subsequent reads from the read half of the pipe will return no bytes and EOF.

Now we should have a running program:

package main

import (
"fmt"
"io"
)

func main() {
r, w := io.Pipe()

// start separate goroutine to synchronize `wantToWrite` with `wantToRead`
go wantToWrite(w)
wantToRead(r)
}

func wantToWrite(w *io.PipeWriter) {
defer w.Close() // close the writer when it's done writing
io.WriteString(w, "Hello Writer!")
}

func wantToRead(r *io.PipeReader) {
data, _ := io.ReadAll(r)
fmt.Println("I just read", string(data))
}

run in Playground

That's it! You just learn how to connect two pieces of code using an io.Pipe. You learned about the synchronous nature of the pipe and how to avoid reaching a deadlock by running both parts concurrently and making sure you are closing the writing part.