/*
 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
 * Copyright 2021 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package minio

import (
	"archive/tar"
	"bufio"
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/klauspost/compress/s2"
)

// SnowballOptions contains options for PutObjectsSnowball calls.
type SnowballOptions struct {
	// Opts is options applied to all objects.
	Opts PutObjectOptions

	// Processing options:

	// InMemory specifies that all objects should be collected in memory
	// before they are uploaded.
	// If false a temporary file will be created.
	InMemory bool

	// Compress enabled content compression before upload.
	// Compression will typically reduce memory and network usage,
	// Compression can safely be enabled with MinIO hosts.
	Compress bool

	// SkipErrs if enabled will skip any errors while reading the
	// object content while creating the snowball archive
	SkipErrs bool
}

// SnowballObject contains information about a single object to be added to the snowball.
type SnowballObject struct {
	// Key is the destination key, including prefix.
	Key string

	// Size is the content size of this object.
	Size int64

	// Modtime to apply to the object.
	// If Modtime is the zero value current time will be used.
	ModTime time.Time

	// Content of the object.
	// Exactly 'Size' number of bytes must be provided.
	Content io.Reader

	// VersionID of the object; if empty, a new versionID will be generated
	VersionID string

	// Headers contains more options for this object upload, the same as you
	// would include in a regular PutObject operation, such as user metadata
	// and content-disposition, expires, ..
	Headers http.Header

	// Close will be called when an object has finished processing.
	// Note that if PutObjectsSnowball returns because of an error,
	// objects not consumed from the input will NOT have been closed.
	// Leave as nil for no callback.
	Close func()
}

type nopReadSeekCloser struct {
	io.ReadSeeker
}

func (n nopReadSeekCloser) Close() error {
	return nil
}

// This is available as io.ReadSeekCloser from go1.16
type readSeekCloser interface {
	io.Reader
	io.Closer
	io.Seeker
}

// PutObjectsSnowball will put multiple objects with a single put call.
// A (compressed) TAR file will be created which will contain multiple objects.
// The key for each object will be used for the destination in the specified bucket.
// Total size should be < 5TB.
// This function blocks until 'objs' is closed and the content has been uploaded.
func (c Client) PutObjectsSnowball(ctx context.Context, bucketName string, opts SnowballOptions, objs <-chan SnowballObject) (err error) {
	err = opts.Opts.validate(&c)
	if err != nil {
		return err
	}
	var tmpWriter io.Writer
	var getTmpReader func() (rc readSeekCloser, sz int64, err error)
	if opts.InMemory {
		b := bytes.NewBuffer(nil)
		tmpWriter = b
		getTmpReader = func() (readSeekCloser, int64, error) {
			return nopReadSeekCloser{bytes.NewReader(b.Bytes())}, int64(b.Len()), nil
		}
	} else {
		f, err := os.CreateTemp("", "s3-putsnowballobjects-*")
		if err != nil {
			return err
		}
		name := f.Name()
		tmpWriter = f
		var once sync.Once
		defer once.Do(func() {
			f.Close()
		})
		defer os.Remove(name)
		getTmpReader = func() (readSeekCloser, int64, error) {
			once.Do(func() {
				f.Close()
			})
			f, err := os.Open(name)
			if err != nil {
				return nil, 0, err
			}
			st, err := f.Stat()
			if err != nil {
				return nil, 0, err
			}
			return f, st.Size(), nil
		}
	}
	flush := func() error { return nil }
	if !opts.Compress {
		if !opts.InMemory {
			// Insert buffer for writes.
			buf := bufio.NewWriterSize(tmpWriter, 1<<20)
			flush = buf.Flush
			tmpWriter = buf
		}
	} else {
		s2c := s2.NewWriter(tmpWriter, s2.WriterBetterCompression())
		flush = s2c.Close
		defer s2c.Close()
		tmpWriter = s2c
	}
	t := tar.NewWriter(tmpWriter)

objectLoop:
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case obj, ok := <-objs:
			if !ok {
				break objectLoop
			}

			closeObj := func() {}
			if obj.Close != nil {
				closeObj = obj.Close
			}

			// Trim accidental slash prefix.
			obj.Key = strings.TrimPrefix(obj.Key, "/")
			header := tar.Header{
				Typeflag: tar.TypeReg,
				Name:     obj.Key,
				Size:     obj.Size,
				ModTime:  obj.ModTime,
				Format:   tar.FormatPAX,
			}
			if header.ModTime.IsZero() {
				header.ModTime = time.Now().UTC()
			}

			header.PAXRecords = make(map[string]string)
			if obj.VersionID != "" {
				header.PAXRecords["minio.versionId"] = obj.VersionID
			}
			for k, vals := range obj.Headers {
				header.PAXRecords["minio.metadata."+k] = strings.Join(vals, ",")
			}

			if err := t.WriteHeader(&header); err != nil {
				closeObj()
				return err
			}
			n, err := io.Copy(t, obj.Content)
			if err != nil {
				closeObj()
				if opts.SkipErrs {
					continue
				}
				return err
			}
			if n != obj.Size {
				closeObj()
				if opts.SkipErrs {
					continue
				}
				return io.ErrUnexpectedEOF
			}
			closeObj()
		}
	}
	// Flush tar
	err = t.Flush()
	if err != nil {
		return err
	}
	// Flush compression
	err = flush()
	if err != nil {
		return err
	}
	if opts.Opts.UserMetadata == nil {
		opts.Opts.UserMetadata = map[string]string{}
	}
	opts.Opts.UserMetadata["X-Amz-Meta-Snowball-Auto-Extract"] = "true"
	opts.Opts.DisableMultipart = true
	rc, sz, err := getTmpReader()
	if err != nil {
		return err
	}
	defer rc.Close()
	rand := c.random.Uint64()
	_, err = c.PutObject(ctx, bucketName, fmt.Sprintf("snowball-upload-%x.tar", rand), rc, sz, opts.Opts)
	return err
}