proposal: iter: add Push function #72083

Closed as not planned
@ncruces

Proposal Details

This proposal stems from a thread in golang-nuts.

The idea is to add a Push function to package iter (and likely a similar Push2):

package iter

// Push takes a consumer function, and returns a yield and a stop function.
// It arranges for the consumer to be called with a Seq iterator.
// The iterator will return all the values passed to the yield function.
// The iterator will stop when the stop function is called.
func Push[V any](consumer func(seq Seq[V])) (yield func(V) bool, stop func())

This is similar in concept to the Pull and Pull2 functions, except that yield and stop are used to push values into seq.

It is based on a similar proposal by @ianlancetaylor in the thread, which unfortunately would not hide goroutine creation/management (a goal of mine).

Their worry with my counter proposal (this one) is, quoting:

My concern with your alternative is that it may be fragile. It should be possible for the function to pass the iterator to another goroutine, but it's not clear to me what will happen to the coroutines in that case. With for/range the language inherently constrains what can happen with the coroutines - and the code is still really complicated with all kinds of special cases around panics and passing the yield function around.

However, it seems to me that with my proposal it is possible (although potentially racy) to pass the iterator to another goroutine; you just need to ensure that goroutine terminates before consumer returns (perhaps with a sync.WaitGroup). The potential races don't seem worse than those caused by passing next/stop from Pull to other goroutines.

Purpose

The purpose of this function is basically the same as Pull: adapt/plug existing iteration APIs to the "standard" iter.Seq interface.

It came up when trying to use a function with this signature: func processor(seq iter.Seq[float64]) float64 (a function that receives a sequence of floats and outputs a single float, e.g. the sum or average) to implement this interface, which allows you to implement custom aggregate functions in SQLite:

type AggregateFunction interface {
	// Step is invoked to add a row to the current window.
	// The function arguments, if any, corresponding to the row being added, are passed to Step.
	// Implementations must not retain arg.
	Step(ctx Context, arg ...Value)

	// Value is invoked to return the current (or final) value of the aggregate.
	Value(ctx Context)
}

In the simple case SQLite calls AggregateFunction.Step once for each row (arg holds the columns of the row; a single float column is read with arg[0].Float()), and then calls Value once to get the aggregate result (you call ctx.ResultFloat(x) to return the result).

It turns out that it's impossible to use a processor function like the above to implement the interface, without:

  • collecting all values (potentially millions) in a slice
  • using goroutines and channels to simulate concurrency without parallelism

Proposed semantics and implementation based on channels

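import (
	"iter"
	"runtime"
)

// goexitPanicValue is a sentinel indicating that consumer exited via runtime.Goexit.
var goexitPanicValue any = new(int)

func Push[V any](consumer func(seq iter.Seq[V])) (yield func(V) bool, stop func()) {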
	var (
		v          V
		done       bool
		panicValue any
		seqDone    bool // to detect Goexit
		swtch      = make(chan struct{})
	)

	go func() {
		// Recover and propagate panics from consumer.
		defer func() {
			if p := recover(); p != nil {
				panicValue = p
			} else if !seqDone {
				panicValue = goexitPanicValue
			}
			done = true
			close(swtch)
		}()

		<-swtch
		consumer(func(yield func(V) bool) {
			for !done {
				if !yield(v) {
					break
				}
				swtch <- struct{}{}
				<-swtch
			}
		})
		seqDone = true
	}()

	yield = func(v1 V) bool {
		v = v1
		// Yield the next value.
		// Panics if stop has been called.
		swtch <- struct{}{}
		<-swtch

		// Propagate panics and goexits from consumer.
		if panicValue != nil {
			if panicValue == goexitPanicValue {
				// Propagate runtime.Goexit from consumer.
				runtime.Goexit()
			} else {
				panic(panicValue)
			}
		}
		return !done
	}

	stop = func() {
		done = true
		// Finish the iteration.
		// Panics if stop has been called.
		swtch <- struct{}{}
		<-swtch

		// Propagate panics and goexits from consumer.
		if panicValue != nil {
			if panicValue == goexitPanicValue {
				// Propagate runtime.Goexit from consumer.
				runtime.Goexit()
			} else {
				panic(panicValue)
			}
		}
	}

	return yield, stop
}

There's a race (and potential panic: send on closed channel) between the close(swtch) and the swtch <- struct{}{} in consumer if consumer starts a goroutine and the seq outlives consumer. Otherwise, there is no parallelism, only concurrency.

The implementation borrows liberally from iter.Pull and can be translated to runtime.newcoro/coroswitch by replacing the swtch channel. In that case, the panic becomes fatal error: coroswitch on exited coro.
