Code Monkey home page Code Monkey logo

Comments (7)

maorfr avatar maorfr commented on August 25, 2024

what is the output of git rev-parse HEAD?
should be 2a81cc4316ffbe6ef3d24b5dcc197091cdb86bec

from skbn.

alexbarta avatar alexbarta commented on August 25, 2024

yes same hash

[root@master-node0 skbn]# pwd
/home/cloud-user/gopath/src/github.com/alexbarta/skbn
[root@master-node0 skbn]# git rev-parse HEAD
2a81cc4316ffbe6ef3d24b5dcc197091cdb86bec

from skbn.

maorfr avatar maorfr commented on August 25, 2024

can you show me cmd.test, cmd/skbn.go and pkg/skbn/skbn.go?

from skbn.

alexbarta avatar alexbarta commented on August 25, 2024

cmd.test not found

cmd/skbn.go

[root@master-node0 skbn]# cat cmd/skbn.go
package main

import (
        "fmt"
        "io"
        "log"
        "os"

        "github.com/nuvo/skbn/pkg/skbn"

        "github.com/spf13/cobra"
)

func main() {
        cmd := NewRootCmd(os.Args[1:])
        if err := cmd.Execute(); err != nil {
                log.Fatal("Failed to execute command")
        }
}

// NewRootCmd represents the base command when called without any subcommands
func NewRootCmd(args []string) *cobra.Command {
        cmd := &cobra.Command{
                Use:   "skbn",
                Short: "",
                Long:  ``,
        }

        out := cmd.OutOrStdout()

        cmd.AddCommand(NewCpCmd(out))
        cmd.AddCommand(NewVersionCmd(out))

        return cmd
}

type cpCmd struct {
        src        string
        dst        string
        parallel   int
        bufferSize float64

        out io.Writer
}

// NewCpCmd represents the copy command
func NewCpCmd(out io.Writer) *cobra.Command {
        c := &cpCmd{out: out}

        cmd := &cobra.Command{
                Use:   "cp",
                Short: "Copy files or directories Kubernetes and Cloud storage",
                Long:  ``,
                Run: func(cmd *cobra.Command, args []string) {
                        if err := skbn.Copy(c.src, c.dst, c.parallel, c.bufferSize); err != nil {
                                log.Fatal(err)
                        }
                },
        }
        f := cmd.Flags()

        f.StringVar(&c.src, "src", "", "path to copy from. Example: k8s://<namespace>/<podName>/<containerName>/path/to/copyfrom")
        f.StringVar(&c.dst, "dst", "", "path to copy to. Example: s3://<bucketName>/path/to/copyto")
        f.IntVarP(&c.parallel, "parallel", "p", 1, "number of files to copy in parallel. set this flag to 0 for full parallelism")
        f.Float64VarP(&c.bufferSize, "buffer-size", "b", 6.75, "in memory buffer size (MB) to use for files copy (buffer per file)")

        cmd.MarkFlagRequired("src")
        cmd.MarkFlagRequired("dst")

        return cmd
}

var (
        // GitTag stands for a git tag
        GitTag string
        // GitCommit stands for a git commit hash
        GitCommit string
)

// NewVersionCmd prints version information
func NewVersionCmd(out io.Writer) *cobra.Command {
        cmd := &cobra.Command{
                Use:   "version",
                Short: "Print version information",
                Long:  ``,
                Run: func(cmd *cobra.Command, args []string) {
                        fmt.Printf("Version %s (git-%s)\n", GitTag, GitCommit)
                },
        }

        return cmd
}

pkg/skbn/skbn.go

[root@master-node0 skbn]# cat pkg/skbn/skbn.go
package skbn

import (
        "context"
        "fmt"
        "io"
        "log"
        "math"
        "path/filepath"

        "github.com/nuvo/skbn/pkg/utils"

        "github.com/djherbis/buffer"
        "gopkg.in/djherbis/nio.v2"
)

// FromToPair is a pair of FromPath and ToPath
type FromToPair struct {
        FromPath string
        ToPath   string
}

// Copy copies files from src to dst
func Copy(src, dst string, parallel int, bufferSize float64) error {
        srcPrefix, srcPath := utils.SplitInTwo(src, "://")
        dstPrefix, dstPath := utils.SplitInTwo(dst, "://")

        err := TestImplementationsExist(srcPrefix, dstPrefix)
        if err != nil {
                return err
        }
        srcClient, dstClient, err := GetClients(srcPrefix, dstPrefix, srcPath, dstPath)
        if err != nil {
                return err
        }
        fromToPaths, err := GetFromToPaths(srcClient, srcPrefix, srcPath, dstPath)
        if err != nil {
                return err
        }
        err = PerformCopy(srcClient, dstClient, srcPrefix, dstPrefix, fromToPaths, parallel, bufferSize)
        if err != nil {
                return err
        }

        return nil
}

// TestImplementationsExist checks that implementations exist for the desired action
func TestImplementationsExist(srcPrefix, dstPrefix string) error {
        switch srcPrefix {
        case "k8s":
        case "s3":
        case "abs":
        default:
                return fmt.Errorf(srcPrefix + " not implemented")
        }

        switch dstPrefix {
        case "k8s":
        case "s3":
        case "abs":
        default:
                return fmt.Errorf(dstPrefix + " not implemented")
        }

        return nil
}

// GetClients gets the clients for the source and destination
func GetClients(srcPrefix, dstPrefix, srcPath, dstPath string) (interface{}, interface{}, error) {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        srcClient, tested, err := initClient(ctx, nil, srcPrefix, srcPath, "")
        if err != nil {
                return nil, nil, err
        }
        dstClient, _, err := initClient(ctx, srcClient, dstPrefix, dstPath, tested)
        if err != nil {
                return nil, nil, err
        }

        return srcClient, dstClient, nil
}

// GetFromToPaths gets from and to paths to perform the copy on
func GetFromToPaths(srcClient interface{}, srcPrefix, srcPath, dstPath string) ([]FromToPair, error) {
        relativePaths, err := GetListOfFiles(srcClient, srcPrefix, srcPath)
        if err != nil {
                return nil, err
        }

        var fromToPaths []FromToPair
        for _, relativePath := range relativePaths {
                fromPath := filepath.Join(srcPath, relativePath)
                toPath := filepath.Join(dstPath, relativePath)
                fromToPaths = append(fromToPaths, FromToPair{FromPath: fromPath, ToPath: toPath})
        }

        return fromToPaths, nil
}

// PerformCopy performs the actual copy action
func PerformCopy(srcClient, dstClient interface{}, srcPrefix, dstPrefix string, fromToPaths []FromToPair, parallel int, bufferSize float64) error {

        // Execute in parallel
        totalFiles := len(fromToPaths)
        if parallel == 0 {
                parallel = totalFiles
        }
        bwgSize := int(math.Min(float64(parallel), float64(totalFiles))) // Very stingy :)
        bwg := utils.NewBoundedWaitGroup(bwgSize)
        errc := make(chan error, 1)
        currentLine := 0
        for _, ftp := range fromToPaths {

                if len(errc) != 0 {
                        break
                }

                bwg.Add(1)
                currentLine++

                totalDigits := utils.CountDigits(totalFiles)
                currentLinePadded := utils.LeftPad2Len(currentLine, 0, totalDigits)

                go func(srcClient, dstClient interface{}, srcPrefix, fromPath, dstPrefix, toPath, currentLinePadded string, totalFiles int) {

                        if len(errc) != 0 {
                                return
                        }

                        newBufferSize := (int64)(bufferSize * 1024 * 1024) // may not be super accurate
                        buf := buffer.New(newBufferSize)
                        pr, pw := nio.Pipe(buf)

                        log.Printf("[%s/%d] copy: %s://%s -> %s://%s", currentLinePadded, totalFiles, srcPrefix, fromPath, dstPrefix, toPath)

                        go func() {
                                defer pw.Close()
                                if len(errc) != 0 {
                                        return
                                }
                                err := Download(srcClient, srcPrefix, fromPath, pw)
                                if err != nil {
                                        log.Println(err, fmt.Sprintf(" src: file: %s", fromPath))
                                        errc <- err
                                }
                        }()

                        go func() {
                                defer pr.Close()
                                defer bwg.Done()
                                if len(errc) != 0 {
                                        return
                                }
                                defer log.Printf("[%s/%d] done: %s -> %s", currentLinePadded, totalFiles, fromPath, toPath)
                                err := Upload(dstClient, dstPrefix, toPath, fromPath, pr)
                                if err != nil {
                                        log.Println(err, fmt.Sprintf(" dst: file: %s", toPath))
                                        errc <- err
                                }
                        }()
                }(srcClient, dstClient, srcPrefix, ftp.FromPath, dstPrefix, ftp.ToPath, currentLinePadded, totalFiles)
        }
        bwg.Wait()
        if len(errc) != 0 {
                // This is not exactly the correct behavior
                // There may be more than 1 error in the channel
                // But first let's make it work
                err := <-errc
                close(errc)
                if err != nil {
                        return err
                }
        }
        return nil
}

// GetListOfFiles gets relative paths from the provided path
func GetListOfFiles(client interface{}, prefix, path string) ([]string, error) {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        var relativePaths []string

        switch prefix {
        case "k8s":
                paths, err := GetListOfFilesFromK8s(client, path, "f", "*")
                if err != nil {
                        return nil, err
                }
                relativePaths = paths
        case "s3":
                paths, err := GetListOfFilesFromS3(client, path)
                if err != nil {
                        return nil, err
                }
                relativePaths = paths
        case "abs":
                paths, err := GetListOfFilesFromAbs(ctx, client, path)
                if err != nil {
                        return nil, err
                }
                relativePaths = paths
        default:
                return nil, fmt.Errorf(prefix + " not implemented")
        }

        return relativePaths, nil
}

// Download downloads a single file from path into an io.Writer
func Download(srcClient interface{}, srcPrefix, srcPath string, writer io.Writer) error {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        switch srcPrefix {
        case "k8s":
                err := DownloadFromK8s(srcClient, srcPath, writer)
                if err != nil {
                        return err
                }
        case "s3":
                err := DownloadFromS3(srcClient, srcPath, writer)
                if err != nil {
                        return err
                }
        case "abs":
                err := DownloadFromAbs(ctx, srcClient, srcPath, writer)
                if err != nil {
                        return err
                }
        default:
                return fmt.Errorf(srcPrefix + " not implemented")
        }

        return nil
}

// Upload uploads a single file provided as an io.Reader array to path
func Upload(dstClient interface{}, dstPrefix, dstPath, srcPath string, reader io.Reader) error {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        switch dstPrefix {
        case "k8s":
                err := UploadToK8s(dstClient, dstPath, srcPath, reader)
                if err != nil {
                        return err
                }
        case "s3":
                err := UploadToS3(dstClient, dstPath, srcPath, reader)
                if err != nil {
                        return err
                }
        case "abs":
                err := UploadToAbs(ctx, dstClient, dstPath, srcPath, reader)
                if err != nil {
                        return err
                }
        default:
                return fmt.Errorf(dstPrefix + " not implemented")
        }
        return nil
}

func initClient(ctx context.Context, existingClient interface{}, prefix, path, tested string) (interface{}, string, error) {
        var newClient interface{}
        switch prefix {
        case "k8s":
                if isTestedAndClientExists(prefix, tested, existingClient) {
                        newClient = existingClient
                        break
                }
                client, err := GetClientToK8s()
                if err != nil {
                        return nil, "", err
                }
                newClient = client

        case "s3":
                if isTestedAndClientExists(prefix, tested, existingClient) {
                        newClient = existingClient
                        break
                }
                client, err := GetClientToS3(path)
                if err != nil {
                        return nil, "", err
                }
                newClient = client

        case "abs":
                if isTestedAndClientExists(prefix, tested, existingClient) {
                        newClient = existingClient
                        break
                }
                client, err := GetClientToAbs(ctx, path)
                if err != nil {
                        return nil, "", err
                }
                newClient = client

        default:
                return nil, "", fmt.Errorf(prefix + " not implemented")
        }

        return newClient, prefix, nil
}

func isTestedAndClientExists(prefix, tested string, client interface{}) bool {
        return prefix == tested && client != nil
}

from skbn.

maorfr avatar maorfr commented on August 25, 2024

You probably have an older version of nuvo/skbn on your machine.

I suggest that you do the following:

mkdir -p $GOPATH/src/github.com/nuvo && cd $_
git clone https://github.com/alexbarta/skbn.git && cd skbn
make

That would probably fix the issue you are seeing.

from skbn.

alexbarta avatar alexbarta commented on August 25, 2024

In fact that worked ! Thank you! However is really strange I have forked the project like 2 hours ago.. it seems that $GOPATH/src/github.com/nuvo path is playing some role in the compilation ?

from skbn.

maorfr avatar maorfr commented on August 25, 2024

it does. go is sensitive about paths ;)
i'll close this issue.

from skbn.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.