Skip to content

cp: Only use session when -c is passed #2989

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/accounting-reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ func (a *accounter) Get() int64 {
return atomic.LoadInt64(&a.current)
}

func (a *accounter) SetTotal(int64) {
}

// Add add to current value atomically.
func (a *accounter) Add(n int64) int64 {
return atomic.AddInt64(&a.current, n)
Expand Down
214 changes: 126 additions & 88 deletions cmd/cp-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ EXAMPLES:
{{.Prompt}} {{.HelpName}} --storage-class REDUCED_REDUNDANCY myobject.txt play/mybucket

14. Copy a text file to an object storage and create or resume copy session.
{{.Prompt}} {{.HelpName}} --recursive --continue myobject.txt play/mybucket
{{.Prompt}} {{.HelpName}} --recursive --continue dir/ play/mybucket

15. Copy a text file to an object storage and preserve the file system attribute as metadata.
{{.Prompt}} {{.HelpName}} -a myobject.txt play/mybucket
`,
Expand Down Expand Up @@ -172,6 +172,7 @@ func (c copyMessage) JSON() string {
// of data written.
type Progress interface {
Get() int64
SetTotal(int64)
}

// ProgressReader can be used to update the progress of
Expand Down Expand Up @@ -219,15 +220,12 @@ func doCopyFake(cpURLs URLs, pg Progress) URLs {
}

// doPrepareCopyURLs scans the source URL and prepares a list of objects for copying.
func doPrepareCopyURLs(session *sessionV8, trapCh <-chan bool, cancelCopy context.CancelFunc) {
func doPrepareCopyURLs(session *sessionV8, trapCh <-chan bool, cancelCopy context.CancelFunc) (totalBytes, totalObjects int64) {
// Separate source and target. 'cp' can take only one target,
// but any number of sources.
sourceURLs := session.Header.CommandArgs[:len(session.Header.CommandArgs)-1]
targetURL := session.Header.CommandArgs[len(session.Header.CommandArgs)-1] // Last one is target

var totalBytes int64
var totalObjects int64

// Access recursive flag inside the session header.
isRecursive := session.Header.CommandBoolFlags["recursive"]

Expand All @@ -245,7 +243,8 @@ func doPrepareCopyURLs(session *sessionV8, trapCh <-chan bool, cancelCopy contex
if !globalQuiet && !globalJSON { // set up progress bar
scanBar = scanBarFactory()
}
URLsCh := prepareCopyURLs(sourceURLs, targetURL, isRecursive, encKeyDB)

URLsCh := prepareCopyURLs(sourceURLs, targetURL, isRecursive, encKeyDB, olderThan, newerThan)
done := false
for !done {
select {
Expand Down Expand Up @@ -273,16 +272,6 @@ func doPrepareCopyURLs(session *sessionV8, trapCh <-chan bool, cancelCopy contex
fatalIf(probe.NewError(e), "Unable to prepare URL for copying. Error in JSON marshaling.")
}

// Skip objects older than --older-than parameter if specified
if olderThan != "" && isOlder(cpURLs.SourceContent.Time, olderThan) {
continue
}

// Skip objects newer than --newer-than parameter if specified
if newerThan != "" && isNewer(cpURLs.SourceContent.Time, newerThan) {
continue
}

fmt.Fprintln(dataFP, string(jsonData))
if !globalQuiet && !globalJSON {
scanBar(cpURLs.SourceContent.URL.String())
Expand All @@ -300,34 +289,84 @@ func doPrepareCopyURLs(session *sessionV8, trapCh <-chan bool, cancelCopy contex
os.Exit(0)
}
}

session.Header.TotalBytes = totalBytes
session.Header.TotalObjects = totalObjects
session.Save()
return
}

func doCopySession(session *sessionV8, encKeyDB map[string][]prefixSSEPair) error {
func doCopySession(cli *cli.Context, session *sessionV8, encKeyDB map[string][]prefixSSEPair) error {
trapCh := signalTrap(os.Interrupt, syscall.SIGTERM, syscall.SIGKILL)

ctx, cancelCopy := context.WithCancel(context.Background())
defer cancelCopy()
if !session.HasData() {
doPrepareCopyURLs(session, trapCh, cancelCopy)
}

// Prepare URL scanner from session data file.
urlScanner := bufio.NewScanner(session.NewDataReader())
// isCopied returns true if an object has been already copied
// or not. This is useful when we resume from a session.
isCopied := isLastFactory(session.Header.LastCopied)
var isCopied func(string) bool
var totalObjects, totalBytes int64

var cpURLsCh = make(chan URLs, 10000)

// Store a progress bar or an accounter
var pg ProgressReader

// Enable progress bar reader only during default mode.
if !globalQuiet && !globalJSON { // set up progress bar
pg = newProgressBar(session.Header.TotalBytes)
pg = newProgressBar(totalBytes)
} else {
pg = newAccounter(totalBytes)
}

if session != nil {
// isCopied returns true if an object has been already copied
// or not. This is useful when we resume from a session.
isCopied = isLastFactory(session.Header.LastCopied)

if !session.HasData() {
totalBytes, totalObjects = doPrepareCopyURLs(session, trapCh, cancelCopy)
} else {
totalBytes, totalObjects = session.Header.TotalBytes, session.Header.TotalObjects
}

pg.SetTotal(totalBytes)

go func() {
// Prepare URL scanner from session data file.
urlScanner := bufio.NewScanner(session.NewDataReader())
for {
if !urlScanner.Scan() || urlScanner.Err() != nil {
close(cpURLsCh)
break
}
var cpURLs URLs
if e := json.Unmarshal([]byte(urlScanner.Text()), &cpURLs); e != nil {
errorIf(probe.NewError(e), "Unable to unmarshal %s", urlScanner.Text())
continue
}

cpURLsCh <- cpURLs
}

}()
} else {
pg = newAccounter(session.Header.TotalBytes)
sourceURLs := cli.Args()[:len(cli.Args())-1]
targetURL := cli.Args()[len(cli.Args())-1] // Last one is target

// Access recursive flag inside the session header.
isRecursive := cli.Bool("recursive")
olderThan := cli.String("older-than")
newerThan := cli.String("newer-than")

go func() {
totalBytes := int64(0)
for cpURLs := range prepareCopyURLs(sourceURLs, targetURL, isRecursive, encKeyDB, olderThan, newerThan) {
totalBytes += cpURLs.SourceContent.Size
pg.SetTotal(totalBytes)

cpURLsCh <- cpURLs
}
close(cpURLsCh)
}()
}

var quitCh = make(chan struct{})
Expand All @@ -347,32 +386,17 @@ func doCopySession(session *sessionV8, encKeyDB map[string][]prefixSSEPair) erro
case <-quitCh:
gracefulStop()
return
default:
if !urlScanner.Scan() {
// No more entries, quit immediately
gracefulStop()
return
}

if e := urlScanner.Err(); e != nil {
// Error while reading. quit immediately
case cpURLs, ok := <-cpURLsCh:
if !ok {
gracefulStop()
return
}

var cpURLs URLs
// Unmarshal copyURLs from each line. This expects each line to be
// an entire JSON object.
if e := json.Unmarshal([]byte(urlScanner.Text()), &cpURLs); e != nil {
errorIf(probe.NewError(e), "Unable to unmarshal %s", urlScanner.Text())
continue
}

// Save total count.
cpURLs.TotalCount = session.Header.TotalObjects
cpURLs.TotalCount = totalObjects

// Save totalSize.
cpURLs.TotalSize = session.Header.TotalBytes
cpURLs.TotalSize = totalBytes

// Initialize target metadata.
cpURLs.TargetContent.Metadata = make(map[string]string)
Expand All @@ -381,22 +405,22 @@ func doCopySession(session *sessionV8, encKeyDB map[string][]prefixSSEPair) erro
cpURLs.TargetContent.UserMetadata = make(map[string]string)

// Check and handle storage class if passed in command line args
if _, ok := session.Header.CommandStringFlags["storage-class"]; ok {
cpURLs.TargetContent.Metadata["X-Amz-Storage-Class"] = session.Header.CommandStringFlags["storage-class"]
if storageClass := cli.String("storage-class"); storageClass != "" {
cpURLs.TargetContent.Metadata["X-Amz-Storage-Class"] = storageClass
}

// Check and handle metadata if passed in command line args
if len(session.Header.UserMetaData) != 0 {
for metaDataKey, metaDataVal := range session.Header.UserMetaData {
if cli.String("attr") != "" {
userMetaMap, _ := getMetaDataEntry(cli.String("attr"))
for metaDataKey, metaDataVal := range userMetaMap {
cpURLs.TargetContent.UserMetadata[metaDataKey] = metaDataVal
}
}

// If one needs to store the file system information by passing -a flag
if preserve := (session.Header.CommandBoolFlags["preserve"]); preserve {
if preserve := cli.Bool("preserve"); preserve {
attrValue, pErr := getFileAttrMeta(cpURLs, encKeyDB)
if pErr != nil {
errorIf(pErr, "Unable to fetch file meta info for %s", urlScanner.Text())
errorIf(pErr, "Unable to fetch file meta info for %s", cpURLs.SourceAlias)
continue
}

Expand All @@ -405,7 +429,7 @@ func doCopySession(session *sessionV8, encKeyDB map[string][]prefixSSEPair) erro
}
}
// Verify if previously copied, notify progress bar.
if isCopied(cpURLs.SourceContent.URL.String()) {
if isCopied != nil && isCopied(cpURLs.SourceContent.URL.String()) {
queueCh <- func() URLs {
return doCopyFake(cpURLs, pg)
}
Expand All @@ -430,15 +454,19 @@ loop:
if !globalQuiet && !globalJSON {
console.Eraseline()
}
session.CloseAndDie()
if session != nil {
session.CloseAndDie()
}
case cpURLs, ok := <-statusCh:
// Status channel is closed, we should return.
if !ok {
break loop
}
if cpURLs.Error == nil {
session.Header.LastCopied = cpURLs.SourceContent.URL.String()
session.Save()
if session != nil {
session.Header.LastCopied = cpURLs.SourceContent.URL.String()
session.Save()
}
} else {

// Set exit status for any copy error
Expand All @@ -454,10 +482,13 @@ loop:
if isErrIgnored(cpURLs.Error) {
continue loop
}
// For critical errors we should exit. Session
// can be resumed after the user figures out
// the problem.
session.copyCloseAndDie(session.Header.CommandBoolFlags["session"])

if session != nil {
// For critical errors we should exit. Session
// can be resumed after the user figures out
// the problem.
session.copyCloseAndDie(session.Header.CommandBoolFlags["session"])
}
}
}
}
Expand Down Expand Up @@ -523,37 +554,44 @@ func mainCopy(ctx *cli.Context) error {
}
sse := ctx.String("encrypt")

sessionID := getHash("cp", ctx.Args())
if ctx.Bool("continue") && isSessionExists(sessionID) {
resumeSession(sessionID)
return nil
}
var session *sessionV8

session := newSessionV8(sessionID)
session.Header.CommandType = "cp"
session.Header.CommandBoolFlags["recursive"] = recursive
session.Header.CommandStringFlags["older-than"] = olderThan
session.Header.CommandStringFlags["newer-than"] = newerThan
session.Header.CommandStringFlags["storage-class"] = storageClass
session.Header.CommandStringFlags["encrypt-key"] = sseKeys
session.Header.CommandStringFlags["encrypt"] = sse
session.Header.CommandBoolFlags["session"] = ctx.Bool("continue")

if ctx.Bool("preserve") {
session.Header.CommandBoolFlags["preserve"] = ctx.Bool("preserve")
if ctx.Bool("continue") {
sessionID := getHash("cp", ctx.Args())
if isSessionExists(sessionID) {
session, err = loadSessionV8(sessionID)
fatalIf(err.Trace(sessionID), "Unable to load session.")
} else {
session = newSessionV8(sessionID)
session.Header.CommandType = "cp"
session.Header.CommandBoolFlags["recursive"] = recursive
session.Header.CommandStringFlags["older-than"] = olderThan
session.Header.CommandStringFlags["newer-than"] = newerThan
session.Header.CommandStringFlags["storage-class"] = storageClass
session.Header.CommandStringFlags["encrypt-key"] = sseKeys
session.Header.CommandStringFlags["encrypt"] = sse
session.Header.CommandBoolFlags["session"] = ctx.Bool("continue")

if ctx.Bool("preserve") {
session.Header.CommandBoolFlags["preserve"] = ctx.Bool("preserve")
}
session.Header.UserMetaData = userMetaMap

var e error
if session.Header.RootPath, e = os.Getwd(); e != nil {
session.Delete()
fatalIf(probe.NewError(e), "Unable to get current working folder.")
}

// extract URLs.
session.Header.CommandArgs = ctx.Args()
}
}
session.Header.UserMetaData = userMetaMap

var e error
if session.Header.RootPath, e = os.Getwd(); e != nil {
e := doCopySession(ctx, session, encKeyDB)
if session != nil {
session.Delete()
fatalIf(probe.NewError(e), "Unable to get current working folder.")
}

// extract URLs.
session.Header.CommandArgs = ctx.Args()
e = doCopySession(session, encKeyDB)
session.Delete()

return e
}
22 changes: 20 additions & 2 deletions cmd/cp-url.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func prepareCopyURLsTypeD(sourceURLs []string, targetURL string, isRecursive boo
}

// prepareCopyURLs - prepares target and source clientURLs for copying.
func prepareCopyURLs(sourceURLs []string, targetURL string, isRecursive bool, encKeyDB map[string][]prefixSSEPair) <-chan URLs {
func prepareCopyURLs(sourceURLs []string, targetURL string, isRecursive bool, encKeyDB map[string][]prefixSSEPair, olderThan, newerThan string) chan URLs {
copyURLsCh := make(chan URLs)
go func(sourceURLs []string, targetURL string, copyURLsCh chan URLs, encKeyDB map[string][]prefixSSEPair) {
defer close(copyURLsCh)
Expand All @@ -238,5 +238,23 @@ func prepareCopyURLs(sourceURLs []string, targetURL string, isRecursive bool, en
}
}(sourceURLs, targetURL, copyURLsCh, encKeyDB)

return copyURLsCh
finalCopyURLsCh := make(chan URLs)
go func() {
defer close(finalCopyURLsCh)
for cpURLs := range copyURLsCh {
// Skip objects older than --older-than parameter if specified
if olderThan != "" && isOlder(cpURLs.SourceContent.Time, olderThan) {
continue
}

// Skip objects newer than --newer-than parameter if specified
if newerThan != "" && isNewer(cpURLs.SourceContent.Time, newerThan) {
continue
}

finalCopyURLsCh <- cpURLs
}
}()

return finalCopyURLsCh
}
1 change: 0 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,6 @@ var appCmds = []cli.Command{
watchCmd,
policyCmd,
adminCmd,
sessionCmd,
configCmd,
updateCmd,
versionCmd,
Expand Down
Loading