package runners

import (
	"context"
	"fmt"
	"os"
	"runtime"
	"sync"

	"codeberg.org/gruf/go-errors/v2"
)

// WorkerFunc represents a function processable by a worker in WorkerPool. Note
// that implementations absolutely MUST check whether passed context is <-ctx.Done()
// otherwise stopping the pool may block indefinitely.
type WorkerFunc func(context.Context)

// WorkerPool provides a means of enqueuing asynchronous work.
type WorkerPool struct {
	fns chan WorkerFunc
	svc Service
}

// Start will start the main WorkerPool management loop in a new goroutine, along
// with requested number of child worker goroutines. Returns false if already running.
func (pool *WorkerPool) Start(workers int, queue int) bool {
	// Attempt to start the svc
	ctx, ok := pool.svc.doStart()
	if !ok {
		return false
	}

	if workers <= 0 {
		// Use $GOMAXPROCS as default.
		workers = runtime.GOMAXPROCS(0)
	}

	if queue < 0 {
		// Use reasonable queue default.
		queue = workers * 10
	}

	// Allocate pool queue of given size.
	//
	// This MUST be set BEFORE we return and NOT in
	// the launched goroutine, or there is a risk that
	// the pool may appear as closed for a short time
	// until the main goroutine has been entered.
	fns := make(chan WorkerFunc, queue)
	pool.fns = fns

	go func() {
		defer func() {
			// unlock single wait
			pool.svc.wait.Unlock()

			// ensure stopped
			pool.svc.Stop()
		}()

		var wait sync.WaitGroup

		// Start goroutine worker functions
		for i := 0; i < workers; i++ {
			wait.Add(1)

			go func() {
				defer wait.Done()

				// Run worker function (retry on panic)
				for !worker_run(CancelCtx(ctx), fns) {
				}
			}()
		}

		// Wait on ctx
		<-ctx

		// Drain function queue.
		//
		// All functions in the queue MUST be
		// run, so we pass them a closed context.
		//
		// This mainly allows us to block until
		// the function queue is empty, as worker
		// functions will also continue draining in
		// the background with the (now) closed ctx.
		for !drain_queue(fns) {
			// retry on panic
		}

		// Now the queue is empty, we can
		// safely close the channel signalling
		// all of the workers to return.
		close(fns)
		wait.Wait()
	}()

	return true
}

// Stop will stop the WorkerPool management loop, blocking until stopped.
func (pool *WorkerPool) Stop() bool {
	return pool.svc.Stop()
}

// Running returns if WorkerPool management loop is running (i.e. NOT stopped / stopping).
func (pool *WorkerPool) Running() bool {
	return pool.svc.Running()
}

// Done returns a channel that's closed when WorkerPool.Stop() is called. It is the same channel provided to the currently running worker functions.
func (pool *WorkerPool) Done() <-chan struct{} {
	return pool.svc.Done()
}

// Enqueue will add provided WorkerFunc to the queue to be performed when there is a free worker.
// This will block until function is queued or pool is stopped. In all cases, the WorkerFunc will be
// executed, with the state of the pool being indicated by <-ctx.Done() of the passed ctx.
// WorkerFuncs MUST respect the passed context.
func (pool *WorkerPool) Enqueue(fn WorkerFunc) {
	// Check valid fn
	if fn == nil {
		return
	}

	select {
	// Pool ctx cancelled
	case <-pool.svc.Done():
		fn(closedctx)

	// Placed fn in queue
	case pool.fns <- fn:
	}
}

// EnqueueCtx is functionally identical to WorkerPool.Enqueue() but returns early in the
// case that caller provided <-ctx.Done() is closed, WITHOUT running the WorkerFunc.
func (pool *WorkerPool) EnqueueCtx(ctx context.Context, fn WorkerFunc) bool {
	// Check valid fn
	if fn == nil {
		return false
	}

	select {
	// Caller ctx cancelled
	case <-ctx.Done():
		return false

	// Pool ctx cancelled
	case <-pool.svc.Done():
		return false

	// Placed fn in queue
	case pool.fns <- fn:
		return true
	}
}

// MustEnqueueCtx functionally performs similarly to WorkerPool.EnqueueCtx(), but in the case
// that the provided <-ctx.Done() is closed, it is passed asynchronously to WorkerPool.Enqueue().
// Return boolean indicates whether function was executed in time before <-ctx.Done() is closed.
func (pool *WorkerPool) MustEnqueueCtx(ctx context.Context, fn WorkerFunc) (ok bool) {
	// Check valid fn
	if fn == nil {
		return false
	}

	select {
	case <-ctx.Done():
		// We failed to add this entry to the worker queue before the
		// incoming context was cancelled. So to ensure processing
		// we simply queue it asynchronously and return early to caller.
		go pool.Enqueue(fn)
		return false

	case <-pool.svc.Done():
		// Pool ctx cancelled
		fn(closedctx)
		return false

	case pool.fns <- fn:
		// Placed fn in queue
		return true
	}
}

// EnqueueNow attempts Enqueue but returns false if not executed.
func (pool *WorkerPool) EnqueueNow(fn WorkerFunc) bool {
	// Check valid fn
	if fn == nil {
		return false
	}

	select {
	// Pool ctx cancelled
	case <-pool.svc.Done():
		return false

	// Placed fn in queue
	case pool.fns <- fn:
		return true

	// Queue is full
	default:
		return false
	}
}

// Queue returns the number of currently queued WorkerFuncs.
func (pool *WorkerPool) Queue() int {
	var l int
	pool.svc.While(func() {
		l = len(pool.fns)
	})
	return l
}

// worker_run is the main worker routine, accepting functions from 'fns' until it is closed.
func worker_run(ctx context.Context, fns <-chan WorkerFunc) bool {
	defer func() {
		// Recover and drop any panic
		if r := recover(); r != nil {

			// Gather calling func frames.
			pcs := make([]uintptr, 10)
			n := runtime.Callers(3, pcs)
			i := runtime.CallersFrames(pcs[:n])
			c := gatherFrames(i, n)

			const msg = "worker_run: recovered panic: %v\n\n%s\n"
			fmt.Fprintf(os.Stderr, msg, r, c.String())
		}
	}()

	for {
		// Wait on next func
		fn, ok := <-fns
		if !ok {
			return true
		}

		// Run with ctx
		fn(ctx)
	}
}

// drain_queue will drain and run all functions in worker queue, passing in a closed context.
func drain_queue(fns <-chan WorkerFunc) bool {
	defer func() {
		// Recover and drop any panic
		if r := recover(); r != nil {

			// Gather calling func frames.
			pcs := make([]uintptr, 10)
			n := runtime.Callers(3, pcs)
			i := runtime.CallersFrames(pcs[:n])
			c := gatherFrames(i, n)

			const msg = "worker_run: recovered panic: %v\n\n%s\n"
			fmt.Fprintf(os.Stderr, msg, r, c.String())
		}
	}()

	for {
		select {
		// Run with closed ctx
		case fn := <-fns:
			fn(closedctx)

		// Queue is empty
		default:
			return true
		}
	}
}

// gatherFrames collates runtime frames from a frame iterator.
func gatherFrames(iter *runtime.Frames, n int) errors.Callers {
	if iter == nil {
		return nil
	}
	frames := make([]runtime.Frame, 0, n)
	for {
		f, ok := iter.Next()
		if !ok {
			break
		}
		frames = append(frames, f)
	}
	return frames
}