302 lines
9.7 KiB
Go
302 lines
9.7 KiB
Go
// Copyright 2023 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package actions
|
|
|
|
import (
|
|
"crypto/md5"
|
|
"crypto/sha256"
|
|
"encoding/base64"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"hash"
|
|
"io"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"code.gitea.io/gitea/models/actions"
|
|
"code.gitea.io/gitea/models/db"
|
|
"code.gitea.io/gitea/modules/log"
|
|
"code.gitea.io/gitea/modules/storage"
|
|
)
|
|
|
|
func saveUploadChunkBase(st storage.ObjectStorage, ctx *ArtifactContext,
|
|
artifact *actions.ActionArtifact,
|
|
contentSize, runID, start, end, length int64, checkMd5 bool,
|
|
) (int64, error) {
|
|
// build chunk store path
|
|
storagePath := fmt.Sprintf("tmp%d/%d-%d-%d-%d.chunk", runID, runID, artifact.ID, start, end)
|
|
var r io.Reader = ctx.Req.Body
|
|
var hasher hash.Hash
|
|
if checkMd5 {
|
|
// use io.TeeReader to avoid reading all body to md5 sum.
|
|
// it writes data to hasher after reading end
|
|
// if hash is not matched, delete the read-end result
|
|
hasher = md5.New()
|
|
r = io.TeeReader(r, hasher)
|
|
}
|
|
// save chunk to storage
|
|
writtenSize, err := st.Save(storagePath, r, contentSize)
|
|
if err != nil {
|
|
return -1, fmt.Errorf("save chunk to storage error: %v", err)
|
|
}
|
|
var checkErr error
|
|
if checkMd5 {
|
|
// check md5
|
|
reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header)
|
|
chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
|
|
log.Info("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String)
|
|
// if md5 not match, delete the chunk
|
|
if reqMd5String != chunkMd5String {
|
|
checkErr = fmt.Errorf("md5 not match")
|
|
}
|
|
}
|
|
if writtenSize != contentSize {
|
|
checkErr = errors.Join(checkErr, fmt.Errorf("contentSize not match body size"))
|
|
}
|
|
if checkErr != nil {
|
|
if err := st.Delete(storagePath); err != nil {
|
|
log.Error("Error deleting chunk: %s, %v", storagePath, err)
|
|
}
|
|
return -1, checkErr
|
|
}
|
|
log.Info("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, end: %d",
|
|
storagePath, contentSize, artifact.ID, start, end)
|
|
// return chunk total size
|
|
return length, nil
|
|
}
|
|
|
|
func saveUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
|
|
artifact *actions.ActionArtifact,
|
|
contentSize, runID int64,
|
|
) (int64, error) {
|
|
// parse content-range header, format: bytes 0-1023/146515
|
|
contentRange := ctx.Req.Header.Get("Content-Range")
|
|
start, end, length := int64(0), int64(0), int64(0)
|
|
if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &length); err != nil {
|
|
log.Warn("parse content range error: %v, content-range: %s", err, contentRange)
|
|
return -1, fmt.Errorf("parse content range error: %v", err)
|
|
}
|
|
return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, length, true)
|
|
}
|
|
|
|
func appendUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
|
|
artifact *actions.ActionArtifact,
|
|
start, contentSize, runID int64,
|
|
) (int64, error) {
|
|
end := start + contentSize - 1
|
|
return saveUploadChunkBase(st, ctx, artifact, contentSize, runID, start, end, contentSize, false)
|
|
}
|
|
|
|
type chunkFileItem struct {
|
|
RunID int64
|
|
ArtifactID int64
|
|
Start int64
|
|
End int64
|
|
Path string
|
|
}
|
|
|
|
func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) {
|
|
storageDir := fmt.Sprintf("tmp%d", runID)
|
|
var chunks []*chunkFileItem
|
|
if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
|
|
baseName := filepath.Base(fpath)
|
|
// when read chunks from storage, it only contains storage dir and basename,
|
|
// no matter the subdirectory setting in storage config
|
|
item := chunkFileItem{Path: storageDir + "/" + baseName}
|
|
if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &item.RunID, &item.ArtifactID, &item.Start, &item.End); err != nil {
|
|
return fmt.Errorf("parse content range error: %v", err)
|
|
}
|
|
chunks = append(chunks, &item)
|
|
return nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
// chunks group by artifact id
|
|
chunksMap := make(map[int64][]*chunkFileItem)
|
|
for _, c := range chunks {
|
|
chunksMap[c.ArtifactID] = append(chunksMap[c.ArtifactID], c)
|
|
}
|
|
return chunksMap, nil
|
|
}
|
|
|
|
func listChunksByRunIDV4(st storage.ObjectStorage, runID, artifactID int64, blist *BlockList) ([]*chunkFileItem, error) {
|
|
storageDir := fmt.Sprintf("tmpv4%d", runID)
|
|
var chunks []*chunkFileItem
|
|
chunkMap := map[string]*chunkFileItem{}
|
|
dummy := &chunkFileItem{}
|
|
for _, name := range blist.Latest {
|
|
chunkMap[name] = dummy
|
|
}
|
|
if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
|
|
baseName := filepath.Base(fpath)
|
|
if !strings.HasPrefix(baseName, "block-") {
|
|
return nil
|
|
}
|
|
// when read chunks from storage, it only contains storage dir and basename,
|
|
// no matter the subdirectory setting in storage config
|
|
item := chunkFileItem{Path: storageDir + "/" + baseName, ArtifactID: artifactID}
|
|
var size int64
|
|
var b64chunkName string
|
|
if _, err := fmt.Sscanf(baseName, "block-%d-%d-%s", &item.RunID, &size, &b64chunkName); err != nil {
|
|
return fmt.Errorf("parse content range error: %v", err)
|
|
}
|
|
rchunkName, err := base64.URLEncoding.DecodeString(b64chunkName)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to parse chunkName: %v", err)
|
|
}
|
|
chunkName := string(rchunkName)
|
|
item.End = item.Start + size - 1
|
|
if _, ok := chunkMap[chunkName]; ok {
|
|
chunkMap[chunkName] = &item
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
for i, name := range blist.Latest {
|
|
chunk, ok := chunkMap[name]
|
|
if !ok || chunk.Path == "" {
|
|
return nil, fmt.Errorf("missing Chunk (%d/%d): %s", i, len(blist.Latest), name)
|
|
}
|
|
chunks = append(chunks, chunk)
|
|
if i > 0 {
|
|
chunk.Start = chunkMap[blist.Latest[i-1]].End + 1
|
|
chunk.End += chunk.Start
|
|
}
|
|
}
|
|
return chunks, nil
|
|
}
|
|
|
|
func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
|
|
// read all db artifacts by name
|
|
artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{
|
|
RunID: runID,
|
|
ArtifactName: artifactName,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// read all uploading chunks from storage
|
|
chunksMap, err := listChunksByRunID(st, runID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// range db artifacts to merge chunks
|
|
for _, art := range artifacts {
|
|
chunks, ok := chunksMap[art.ID]
|
|
if !ok {
|
|
log.Debug("artifact %d chunks not found", art.ID)
|
|
continue
|
|
}
|
|
if err := mergeChunksForArtifact(ctx, chunks, st, art, ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st storage.ObjectStorage, artifact *actions.ActionArtifact, checksum string) error {
|
|
sort.Slice(chunks, func(i, j int) bool {
|
|
return chunks[i].Start < chunks[j].Start
|
|
})
|
|
allChunks := make([]*chunkFileItem, 0)
|
|
startAt := int64(-1)
|
|
// check if all chunks are uploaded and in order and clean repeated chunks
|
|
for _, c := range chunks {
|
|
// startAt is -1 means this is the first chunk
|
|
// previous c.ChunkEnd + 1 == c.ChunkStart means this chunk is in order
|
|
// StartAt is not -1 and c.ChunkStart is not startAt + 1 means there is a chunk missing
|
|
if c.Start == (startAt + 1) {
|
|
allChunks = append(allChunks, c)
|
|
startAt = c.End
|
|
}
|
|
}
|
|
// if the last chunk.End + 1 is not equal to chunk.ChunkLength, means chunks are not uploaded completely
|
|
if startAt+1 != artifact.FileCompressedSize {
|
|
log.Debug("[artifact] chunks are not uploaded completely, artifact_id: %d", artifact.ID)
|
|
return nil
|
|
}
|
|
// use multiReader
|
|
readers := make([]io.Reader, 0, len(allChunks))
|
|
closeReaders := func() {
|
|
for _, r := range readers {
|
|
_ = r.(io.Closer).Close() // it guarantees to be io.Closer by the following loop's Open function
|
|
}
|
|
readers = nil
|
|
}
|
|
defer closeReaders()
|
|
for _, c := range allChunks {
|
|
var readCloser io.ReadCloser
|
|
var err error
|
|
if readCloser, err = st.Open(c.Path); err != nil {
|
|
return fmt.Errorf("open chunk error: %v, %s", err, c.Path)
|
|
}
|
|
readers = append(readers, readCloser)
|
|
}
|
|
mergedReader := io.MultiReader(readers...)
|
|
shaPrefix := "sha256:"
|
|
var hash hash.Hash
|
|
if strings.HasPrefix(checksum, shaPrefix) {
|
|
hash = sha256.New()
|
|
}
|
|
if hash != nil {
|
|
mergedReader = io.TeeReader(mergedReader, hash)
|
|
}
|
|
|
|
// if chunk is gzip, use gz as extension
|
|
// download-artifact action will use content-encoding header to decide if it should decompress the file
|
|
extension := "chunk"
|
|
if artifact.ContentEncoding == "gzip" {
|
|
extension = "chunk.gz"
|
|
}
|
|
|
|
// save merged file
|
|
storagePath := fmt.Sprintf("%d/%d/%d.%s", artifact.RunID%255, artifact.ID%255, time.Now().UnixNano(), extension)
|
|
written, err := st.Save(storagePath, mergedReader, artifact.FileCompressedSize)
|
|
if err != nil {
|
|
return fmt.Errorf("save merged file error: %v", err)
|
|
}
|
|
if written != artifact.FileCompressedSize {
|
|
return fmt.Errorf("merged file size is not equal to chunk length")
|
|
}
|
|
|
|
defer func() {
|
|
closeReaders() // close before delete
|
|
// drop chunks
|
|
for _, c := range chunks {
|
|
if err := st.Delete(c.Path); err != nil {
|
|
log.Warn("Error deleting chunk: %s, %v", c.Path, err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
if hash != nil {
|
|
rawChecksum := hash.Sum(nil)
|
|
actualChecksum := hex.EncodeToString(rawChecksum)
|
|
if !strings.HasSuffix(checksum, actualChecksum) {
|
|
return fmt.Errorf("update artifact error checksum is invalid %v vs %v", checksum, actualChecksum)
|
|
}
|
|
}
|
|
|
|
// save storage path to artifact
|
|
log.Debug("[artifact] merge chunks to artifact: %d, %s, old:%s", artifact.ID, storagePath, artifact.StoragePath)
|
|
// if artifact is already uploaded, delete the old file
|
|
if artifact.StoragePath != "" {
|
|
if err := st.Delete(artifact.StoragePath); err != nil {
|
|
log.Warn("Error deleting old artifact: %s, %v", artifact.StoragePath, err)
|
|
}
|
|
}
|
|
|
|
artifact.StoragePath = storagePath
|
|
artifact.Status = int64(actions.ArtifactStatusUploadConfirmed)
|
|
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
|
|
return fmt.Errorf("update artifact error: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|