diff --git a/core/client/set.go b/core/client/set.go index 113909e90..954fe4b7f 100644 --- a/core/client/set.go +++ b/core/client/set.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strings" + "sync" "github.com/0chain/gosdk/core/conf" @@ -18,12 +19,15 @@ var ( client Client sdkInitialized bool - Sign SignFunc - sigC = make(chan struct{}, 1) + Sign SignFunc + SignByMultiWallet SignByMultiWalletFunc + sigC = make(chan struct{}, 1) ) type SignFunc func(hash string, clients ...string) (string, error) +type SignByMultiWalletFunc func(hash string, pubkey string, clients ...string) (string, error) + // maintains client's information type Client struct { wallet *zcncrypto.Wallet @@ -34,6 +38,8 @@ type Client struct { nonce int64 txnFee uint64 sign SignFunc + wg map[string]*sync.WaitGroup + walletCount map[string]int // maintains count of wallets in the WaitGroup by Client ID } type InitSdkOptions struct { @@ -63,18 +69,80 @@ func init() { Sign = func(hash string, clients ...string) (string, error) { wallet := client.wallet - if len(clients) > 0 && clients[0] != "" && client.wallets[clients[0]] != nil { - wallet = client.wallets[clients[0]] + if len(clients) > 0 && clients[0] != "" { + if client.wallets[clients[0]] != nil { + wallet = client.wallets[clients[0]] + } else { + for _, w := range client.wallets { + if w.ClientID == clients[0] { + wallet = w + break + } + } + } } if !wallet.IsSplit { return sys.Sign(hash, client.signatureScheme, GetClientSysKeys(clients...)) } - + fmt.Printf("Sign: wallet details: %+v\n", *wallet) // get sign lock <-sigC fmt.Println("Sign: with sys.SignWithAuth:", sys.SignWithAuth, "sysKeys:", GetClientSysKeys(clients...)) - sig, err := sys.SignWithAuth(hash, client.signatureScheme, GetClientSysKeys(clients...)) + sig, err := sys.SignWithAuth(hash, client.signatureScheme, GetClientSysKeys(clients...), wallet.ClientID) + sigC <- struct{}{} + return sig, err + } + + SignByMultiWallet = func(hash string, pubkey string, clients ...string) (string, error) { + var wallet *zcncrypto.Wallet + + // First try to find wallet by public key + if pubkey != "" { + if client.wallets[pubkey] != nil { + wallet = client.wallets[pubkey] + } else { + // Fallback to searching by client ID if pubkey not found + if len(clients) > 0 && clients[0] != "" { + if client.wallets[clients[0]] != nil { + wallet = client.wallets[clients[0]] + } else { + for _, w := range client.wallets { + if w.ClientID == clients[0] { + wallet = w + break + } + } + } + } + } + } else if len(clients) > 0 && clients[0] != "" { + // If no pubkey provided, fallback to client ID lookup + if client.wallets[clients[0]] != nil { + wallet = client.wallets[clients[0]] + } else { + for _, w := range client.wallets { + if w.ClientID == clients[0] { + wallet = w + break + } + } + } + } + + // If no wallet found, use default wallet + if wallet == nil { + wallet = client.wallet + } + + if !wallet.IsSplit { + return sys.Sign(hash, client.signatureScheme, GetClientSysKeysByWallet(wallet)) + } + fmt.Printf("SignByMultiWallet: wallet details: %+v\n", *wallet) + // get sign lock + <-sigC + fmt.Println("SignByMultiWallet: with sys.SignWithAuth:", sys.SignWithAuth, "sysKeys:", GetClientSysKeysByWallet(wallet)) + sig, err := sys.SignWithAuth(hash, client.signatureScheme, GetClientSysKeysByWallet(wallet), wallet.ClientID) sigC <- struct{}{} return sig, err } @@ -82,6 +150,9 @@ func init() { sys.Verify = verifySignature sys.VerifyWith = verifySignatureWith sys.VerifyEd25519With = verifyEd25519With + + client.wg = make(map[string]*sync.WaitGroup) + client.walletCount = make(map[string]int) } var SignFn = func(hash string) (string, error) { @@ -95,16 +166,24 @@ var SignFn = func(hash string) (string, error) { return ss.Sign(hash) } -func signHashWithAuth(hash, signatureScheme string, keys []sys.KeyPair) (string, error) { +func signHashWithAuth(hash, signatureScheme string, keys []sys.KeyPair, clientIds ...string) (string, error) { sig, err := sys.Sign(hash, signatureScheme, keys) if err != nil { return "", fmt.Errorf("failed to sign with split key: %v", err) } + // Get the first clientID from variadic arguments, or use default wallet clientID + var clientID string + if len(clientIds) > 0 && clientIds[0] != "" { + clientID = clientIds[0] + } else { + clientID = client.wallet.ClientID + } + data, err := json.Marshal(AuthMessage{ Hash: hash, Signature: sig, - ClientID: client.wallet.ClientID, + ClientID: clientID, }) if err != nil { return "", err @@ -114,7 +193,7 @@ func signHashWithAuth(hash, signatureScheme string, keys []sys.KeyPair) (string, return "", errors.New("authCommon is not set") } - rsp, err := sys.AuthCommon(string(data)) + rsp, err := sys.AuthCommon(string(data), clientID) if err != nil { return "", err } @@ -183,9 +262,20 @@ func verifyEd25519With(pubKey, signature, hash string) (bool, error) { func GetClientSysKeys(clients ...string) []sys.KeyPair { var wallet *zcncrypto.Wallet - if len(clients) > 0 && clients[0] != "" && client.wallets[clients[0]] != nil { - wallet = client.wallets[clients[0]] - } else { + if len(clients) > 0 && clients[0] != "" { + if client.wallets[clients[0]] != nil { + wallet = client.wallets[clients[0]] + } else { + for _, w := range client.wallets { + if w.ClientID == clients[0] { + wallet = w + break + } + } + } + } + + if wallet == nil { wallet = client.wallet } @@ -200,13 +290,86 @@ func GetClientSysKeys(clients ...string) []sys.KeyPair { return keys } +func GetClientSysKeysByWallet(wallet *zcncrypto.Wallet) []sys.KeyPair { + if wallet == nil { + return GetClientSysKeys() + } + + var keys []sys.KeyPair + for _, kv := range wallet.Keys { + keys = append(keys, sys.KeyPair{ + PrivateKey: kv.PrivateKey, + PublicKey: kv.PublicKey, + }) + } + + return keys +} + // SetWallet should be set before any transaction or client specific APIs func SetWallet(w zcncrypto.Wallet) { client.wallet = &w if client.wallets == nil { client.wallets = make(map[string]*zcncrypto.Wallet) } - client.wallets[w.ClientID] = &w + client.wallets[w.ClientKey] = &w +} + +func GetWalletByClientKey(clientKey string) *zcncrypto.Wallet { + if client.wallets == nil { + return nil + } + return client.wallets[clientKey] +} + +// GetWalletByClientID gets a wallet by client id. +func GetWalletByClientID(clientID string) *zcncrypto.Wallet { + if client.wallets == nil { + return nil + } + if _, exists := client.wallets[clientID]; !exists { + return nil + } + + for _, wallet := range client.wallets { + if wallet.ClientID == clientID { + return wallet + } + } + + return nil +} + +// AddWallet adds a new wallet to the sdk. +func AddWallet(wallet zcncrypto.Wallet) { + clientKey := wallet.ClientKey + if client.wallets == nil { + client.wallets = make(map[string]*zcncrypto.Wallet) + } + if _, exists := client.wg[wallet.ClientID]; !exists { + client.wg[wallet.ClientID] = &sync.WaitGroup{} + } + client.wg[clientKey].Add(1) + client.walletCount[clientKey]++ + client.wallets[clientKey] = &wallet +} + +// RemoveWallet removes a wallet from the sdk. +func RemoveWallet(clientKey string) { + client.wg[clientKey].Done() + client.walletCount[clientKey]-- + if client.walletCount[clientKey] == 0 { + delete(client.wallets, clientKey) + } +} + +func RemoveWalletByClientID(clientID string) { + for clientKey, wallet := range client.wallets { + if wallet.ClientID == clientID { + RemoveWallet(clientKey) + return + } + } } // SetWalletMode sets current wallet split key mode. @@ -280,12 +443,16 @@ func IsWalletSet() bool { } func PublicKey(clients ...string) string { - if len(clients) > 0 && clients[0] != "" && client.wallets[clients[0]] != nil { - if client.wallets[clients[0]] == nil { - fmt.Println("Public key is empty") - return "" + if len(clients) > 0 && clients[0] != "" { + if client.wallets[clients[0]] != nil { + return client.wallets[clients[0]].ClientKey + } else { + for _, w := range client.wallets { + if w.ClientID == clients[0] { + return w.ClientKey + } + } } - return client.wallets[clients[0]].ClientKey } return client.wallet.ClientKey @@ -303,12 +470,16 @@ func PrivateKey() string { } func Id(clients ...string) string { - if len(clients) > 0 && clients[0] != "" && client.wallets[clients[0]] != nil { - if client.wallets[clients[0]] == nil { - fmt.Println("Id is empty : ", clients[0]) - return "" + if len(clients) > 0 && clients[0] != "" { + if client.wallets[clients[0]] != nil { + return client.wallets[clients[0]].ClientID + } else { + for _, w := range client.wallets { + if w.ClientID == clients[0] { + return w.ClientID + } + } } - return client.wallets[clients[0]].ClientID } return client.wallet.ClientID } diff --git a/core/client/zauth.go b/core/client/zauth.go index 9e31f3457..a38179653 100644 --- a/core/client/zauth.go +++ b/core/client/zauth.go @@ -535,7 +535,7 @@ func CallZvaultRetrieveSharedWallets(serverAddr, token string) (string, error) { // ZauthSignTxn returns a function that sends a txn signing request to the zauth server func ZauthSignTxn(serverAddr string) sys.AuthorizeFunc { - return func(msg string) (string, error) { + return func(msg string, clientIds ...string) (string, error) { req, err := http.NewRequest("POST", serverAddr+"/sign/txn", bytes.NewBuffer([]byte(msg))) if err != nil { return "", errors.Wrap(err, "failed to create HTTP request") @@ -543,6 +543,14 @@ func ZauthSignTxn(serverAddr string) sys.AuthorizeFunc { req.Header.Set("Content-Type", "application/json") c := GetClient() pubkey := c.Keys[0].PublicKey + if len(clientIds) > 0 { + c = GetWalletByClientID(clientIds[0]) + if c == nil { + return "", errors.Errorf("wallet not found for client ID: %s", clientIds[0]) + } + pubkey = c.Keys[0].PublicKey + } + req.Header.Set("X-Peer-Public-Key", pubkey) client := &http.Client{} @@ -572,7 +580,7 @@ func ZauthSignTxn(serverAddr string) sys.AuthorizeFunc { } func ZauthAuthCommon(serverAddr string) sys.AuthorizeFunc { - return func(msg string) (string, error) { + return func(msg string, clientIds ...string) (string, error) { req, err := http.NewRequest("POST", serverAddr+"/sign/msg", bytes.NewBuffer([]byte(msg))) if err != nil { return "", errors.Wrap(err, "failed to create HTTP request") @@ -580,6 +588,14 @@ func ZauthAuthCommon(serverAddr string) sys.AuthorizeFunc { c := GetClient() pubkey := c.Keys[0].PublicKey + if len(clientIds) > 0 { + c = GetWalletByClientID(clientIds[0]) + if c == nil { + return "", errors.Errorf("wallet not found for client ID: %s", clientIds[0]) + } + pubkey = c.Keys[0].PublicKey + } + req.Header.Set("Content-Type", "application/json") req.Header.Set("X-Peer-Public-Key", pubkey) diff --git a/core/sys/sign.go b/core/sys/sign.go index 37ac18479..186729726 100644 --- a/core/sys/sign.go +++ b/core/sys/sign.go @@ -9,8 +9,11 @@ type KeyPair struct { // SignFunc sign method for request verification type SignFunc func(hash string, signatureScheme string, keys []KeyPair) (string, error) +// SignFunc sign method for request verification +type SignWithAuthFunc func(hash string, signatureScheme string, keys []KeyPair, clientIds ...string) (string, error) + type VerifyFunc func(signature string, msg string) (bool, error) type VerifyWithFunc func(pk, signature string, msg string) (bool, error) -type AuthorizeFunc func(msg string) (string, error) +type AuthorizeFunc func(msg string, clientIds... string) (string, error) diff --git a/core/sys/vars.go b/core/sys/vars.go index f1f0fde5c..866c3980d 100644 --- a/core/sys/vars.go +++ b/core/sys/vars.go @@ -15,7 +15,7 @@ var ( // Sign sign method. it should be initialized on different platform. Sign SignFunc - SignWithAuth SignFunc + SignWithAuth SignWithAuthFunc // Verify verify method. it should be initialized on different platform. Verify VerifyFunc diff --git a/core/transaction/entity.go b/core/transaction/entity.go index 995a45954..b79b5b965 100644 --- a/core/transaction/entity.go +++ b/core/transaction/entity.go @@ -270,7 +270,7 @@ func (t *Transaction) VerifySigWith(pubkey string, verifyHandler VerifyFunc) (bo } func SendTransactionSync(txn *Transaction, miners []string) error { - const requestTimeout = 3 * time.Second // Timeout for each request + const requestTimeout = 12 * time.Second // Timeout for each request fails := make(chan error, len(miners)) var wg sync.WaitGroup @@ -282,7 +282,7 @@ func SendTransactionSync(txn *Transaction, miners []string) error { go func(url string) { defer wg.Done() - // Create a context with a 30-second timeout for each request + // Create a context with a 12-second timeout for each request ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) defer cancel() @@ -320,6 +320,13 @@ func SendTransactionSync(txn *Transaction, miners []string) error { } } + // Reset stable miners list if any miner failed + if failureCount > 0 { + if nodeClient, err := client.GetNode(); err == nil { + nodeClient.ResetStableMiners() + } + } + if failureCount == len(miners) { return fmt.Errorf(dominantErr) } diff --git a/core/util/httpnet.go b/core/util/httpnet.go index bcc5f2b77..7a9e9703f 100644 --- a/core/util/httpnet.go +++ b/core/util/httpnet.go @@ -85,18 +85,27 @@ func init() { func httpDo(req *http.Request, ctx context.Context, cncl context.CancelFunc, f func(*http.Response, error) error) error { c := make(chan error, 1) + done := make(chan struct{}) - go func() { c <- f(Client.Do(req.WithContext(ctx))) }() + go func() { + select { + case c <- f(Client.Do(req.WithContext(ctx))): + // normal completion + case <-done: + // context cancelled, do not call f + } + }() select { - case <-ctx.Done(): - // Use the cancel function only after trying to get the result. - <-c // Wait for f to return. - return ctx.Err() - case err := <-c: - // Ensure that we call cncl after we are done with the response - defer cncl() // Move this here to ensure we cancel after processing - return err + case <-ctx.Done(): + // Use the cancel function only after trying to get the result. + close(done) + cncl() + return ctx.Err() + case err := <-c: + // Ensure that we call cncl after we are done with the response + defer cncl() // Move this here to ensure we cancel after processing + return err } } diff --git a/core/version/version.go b/core/version/version.go index fa0af25cc..4f54f23ab 100644 --- a/core/version/version.go +++ b/core/version/version.go @@ -1,5 +1,6 @@ + //====== THIS IS AUTOGENERATED FILE. DO NOT MODIFY ======== package version +const VERSIONSTR = "v1.20.6-1-gafade4fb" -const VERSIONSTR = "v1.17.11-269-g7fd90660" diff --git a/mobilesdk/sdk/sdk.go b/mobilesdk/sdk/sdk.go index 8ce6558c9..d9b78bcfd 100644 --- a/mobilesdk/sdk/sdk.go +++ b/mobilesdk/sdk/sdk.go @@ -33,7 +33,7 @@ var nonce = int64(0) var allocationIDRequired = errors.Errorf("Allocation ID is required") type Autorizer interface { - Auth(msg string) (string, error) + Auth(msg string, clientIDs ...string) (string, error) } // ChainConfig - blockchain config diff --git a/wasmsdk/auth_txn.go b/wasmsdk/auth_txn.go index e917512e5..aa8354446 100644 --- a/wasmsdk/auth_txn.go +++ b/wasmsdk/auth_txn.go @@ -29,7 +29,7 @@ func registerAuthorizer(this js.Value, args []js.Value) interface{} { authCallback = parseAuthorizerCallback(args[0]) authResponseC = make(chan string, 1) - sys.Authorize = func(msg string) (string, error) { + sys.Authorize = func(msg string, clientIDs ...string) (string, error) { authCallback(msg) return <-authResponseC, nil } @@ -93,7 +93,7 @@ func registerAuthCommon(this js.Value, args []js.Value) interface{} { authMsgCallback = parseAuthorizerCallback(args[0]) authMsgResponseC = make(chan string, 1) - sys.AuthCommon = func(msg string) (string, error) { + sys.AuthCommon = func(msg string, clientIDs ...string) (string, error) { authMsgLock <- struct{}{} defer func() { <-authMsgLock diff --git a/wasmsdk/proxy.go b/wasmsdk/proxy.go index f70e7e471..ec56f3ec0 100644 --- a/wasmsdk/proxy.go +++ b/wasmsdk/proxy.go @@ -78,16 +78,24 @@ func main() { return signFunc(hash) } - sys.SignWithAuth = func(hash, signatureScheme string, keys []sys.KeyPair) (string, error) { + sys.SignWithAuth = func(hash, signatureScheme string, keys []sys.KeyPair, clientIds ...string) (string, error) { sig, err := sys.Sign(hash, signatureScheme, keys) if err != nil { return "", fmt.Errorf("failed to sign with split key: %v", err) } + // Get the first clientID from variadic arguments, or use default wallet clientID + var clientID string + if len(clientIds) > 0 && clientIds[0] != "" { + clientID = clientIds[0] + } else { + clientID = client.Wallet().ClientID + } + data, err := json.Marshal(client.AuthMessage{ Hash: hash, Signature: sig, - ClientID: client.Wallet().ClientID, + ClientID: clientID, }) if err != nil { return "", err @@ -378,17 +386,25 @@ func main() { return signFunc(hash) } - sys.SignWithAuth = func(hash, signatureScheme string, keys []sys.KeyPair) (string, error) { + sys.SignWithAuth = func(hash, signatureScheme string, keys []sys.KeyPair, clientIds ...string) (string, error) { fmt.Println("[worker] SignWithAuth pubkey:", keys[0]) sig, err := sys.Sign(hash, signatureScheme, keys) if err != nil { return "", fmt.Errorf("failed to sign with split key: %v", err) } + // Get the first clientID from variadic arguments, or use default wallet clientID + var clientID string + if len(clientIds) > 0 && clientIds[0] != "" { + clientID = clientIds[0] + } else { + clientID = client.Wallet().ClientID + } + data, err := json.Marshal(client.AuthMessage{ Hash: hash, Signature: sig, - ClientID: client.GetClient().ClientID, + ClientID: clientID, }) if err != nil { return "", err @@ -462,7 +478,7 @@ func main() { gInitProxyKeys(publicKey, privateKey) if isSplit { - sys.AuthCommon = func(msg string) (string, error) { + sys.AuthCommon = func(msg string, clientIDs ...string) (string, error) { // send message to main thread sendMessageToMainThread(msg) // wait for response from main thread diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index 2bb806bb5..83c75cab8 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -1139,13 +1139,31 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest, opts ...Mul switch op.OperationType { case constants.FileOperationRename: - operation = NewRenameOperation(op.RemotePath, op.DestName, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx) + var clientId string + if mo.Wallet != nil { + clientId = mo.Wallet.ClientID + } else { + clientId = mo.allocationObj.Owner + } + operation = NewRenameOperation(op.RemotePath, op.DestName, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx, clientId) case constants.FileOperationCopy: - operation = NewCopyOperation(mo.ctx, op.RemotePath, op.DestPath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, op.CopyDirOnly) + var clientId string + if mo.Wallet != nil { + clientId = mo.Wallet.ClientID + } else { + clientId = mo.allocationObj.Owner + } + operation = NewCopyOperation(mo.ctx, op.RemotePath, op.DestPath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, op.CopyDirOnly, clientId) case constants.FileOperationMove: - operation = NewMoveOperation(op.RemotePath, op.DestPath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx) + var clientId string + if mo.Wallet != nil { + clientId = mo.Wallet.ClientID + } else { + clientId = mo.allocationObj.Owner + } + operation = NewMoveOperation(op.RemotePath, op.DestPath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx, clientId) case constants.FileOperationInsert: cancelLock.Lock() @@ -1154,10 +1172,17 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest, opts ...Mul operation, newConnectionID, err = NewUploadOperation(mo.ctx, op.Workdir, mo.allocationObj, mo.connectionID, op.FileMeta, op.FileReader, false, op.IsWebstreaming, op.IsRepair, op.DownloadFile, op.StreamUpload, op.Opts...) case constants.FileOperationDelete: + var clientId string + if mo.Wallet != nil { + clientId = mo.Wallet.ClientID + } else { + clientId = mo.allocationObj.Owner + } + if op.Mask != nil { - operation = NewDeleteOperation(mo.ctx, op.RemotePath, *op.Mask, mo.maskMU, mo.consensusThresh, mo.fullconsensus) + operation = NewDeleteOperation(mo.ctx, op.RemotePath, *op.Mask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, clientId) } else { - operation = NewDeleteOperation(mo.ctx, op.RemotePath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus) + operation = NewDeleteOperation(mo.ctx, op.RemotePath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, clientId) } case constants.FileOperationUpdate: @@ -1167,7 +1192,7 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest, opts ...Mul operation, newConnectionID, err = NewUploadOperation(mo.ctx, op.Workdir, mo.allocationObj, mo.connectionID, op.FileMeta, op.FileReader, true, op.IsWebstreaming, op.IsRepair, op.DownloadFile, op.StreamUpload, op.Opts...) case constants.FileOperationCreateDir: - operation = NewDirOperation(op.RemotePath, op.FileMeta.CustomMeta, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx) + operation = NewDirOperation(op.RemotePath, op.FileMeta.CustomMeta, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx, mo.Wallet) default: return errors.New("invalid_operation", "Operation is not valid") diff --git a/zboxcore/sdk/blobber_operations.go b/zboxcore/sdk/blobber_operations.go index cdcd3b9c2..b24d645a5 100644 --- a/zboxcore/sdk/blobber_operations.go +++ b/zboxcore/sdk/blobber_operations.go @@ -196,7 +196,8 @@ func UpdateAllocation( func GetUpdateAllocTicket(allocationID, userID, operationType string, roundExpiry int64) (string, error) { payload := fmt.Sprintf("%s:%d:%s:%s", allocationID, roundExpiry, userID, operationType) - signature, err := client.Sign(hex.EncodeToString([]byte(payload))) + pubkey := client.PublicKey() + signature, err := client.SignByMultiWallet(hex.EncodeToString([]byte(payload)), pubkey) if err != nil { return "", err } @@ -366,7 +367,8 @@ func GenerateOwnerSigningKey(ownerPublicKey, ownerID string) (ed25519.PrivateKey return nil, errors.New("owner_public_key_required", "owner public key is required") } hashData := fmt.Sprintf("%s:%s", ownerPublicKey, "owner_signing_public_key") - sig, err := client.Sign(encryption.Hash(hashData), ownerID) + pubkey := client.PublicKey() + sig, err := client.SignByMultiWallet(encryption.Hash(hashData), pubkey, ownerID) if err != nil { logger.Logger.Error("error during sign", zap.Error(err)) return nil, err diff --git a/zboxcore/sdk/chunked_upload.go b/zboxcore/sdk/chunked_upload.go index 1408fa539..7eab1b015 100644 --- a/zboxcore/sdk/chunked_upload.go +++ b/zboxcore/sdk/chunked_upload.go @@ -430,10 +430,10 @@ func (su *ChunkedUpload) process() error { defer su.chunkReader.Release() defer su.chunkReader.Close() defer su.ctxCncl(nil) + for { chunks, err := su.readChunks(su.chunkNumber) - // chunk, err := su.chunkReader.Next() if err != nil { if su.statusCallback != nil { @@ -706,7 +706,6 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error { go func(pos uint64) { defer wg.Done() err := su.blobbers[pos].sendUploadRequest(ctx, su, uploadData.isFinal, su.encryptedKey, uploadData.uploadBody[pos].dataBuffers, uploadData.uploadBody[pos].formData, uploadData.uploadBody[pos].contentSlice, pos, &consensus) - if err != nil { if strings.Contains(err.Error(), "duplicate") { su.consensus.Done() diff --git a/zboxcore/sdk/chunked_upload_blobber.go b/zboxcore/sdk/chunked_upload_blobber.go index 2bbe2fe7a..005898a41 100644 --- a/zboxcore/sdk/chunked_upload_blobber.go +++ b/zboxcore/sdk/chunked_upload_blobber.go @@ -71,6 +71,10 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( eg, _ := errgroup.WithContext(ctx) + clientID := su.allocationObj.Owner + if su.wallet != nil { + clientID = su.wallet.ClientID + } for dataInd := 0; dataInd < len(dataBuffers); dataInd++ { ind := dataInd eg.Go(func() error { @@ -80,7 +84,7 @@ func (sb *ChunkedUploadBlobber) sendUploadRequest( var req *fasthttp.Request for i := 0; i < 6; i++ { req, err = zboxutil.NewFastUploadRequest( - sb.blobber.Baseurl, su.allocationObj.ID, su.allocationObj.Tx, dataBuffers[ind].Bytes(), su.httpMethod, su.allocationObj.Owner) + sb.blobber.Baseurl, su.allocationObj.ID, su.allocationObj.Tx, dataBuffers[ind].Bytes(), su.httpMethod, clientID) if err != nil { return err } diff --git a/zboxcore/sdk/chunked_upload_form_builder.go b/zboxcore/sdk/chunked_upload_form_builder.go index 6781d311b..cc103cd6e 100644 --- a/zboxcore/sdk/chunked_upload_form_builder.go +++ b/zboxcore/sdk/chunked_upload_form_builder.go @@ -24,7 +24,7 @@ type ChunkedUploadFormBuilder interface { fileMeta *FileMeta, hasher Hasher, connectionID, blobberID string, chunkSize int64, chunkStartIndex, chunkEndIndex int, isFinal bool, encryptedKey, encryptedKeyPoint string, fileChunksData [][]byte, - thumbnailChunkData []byte, shardSize int64, + thumbnailChunkData []byte, shardSize int64, clients ...string, ) (blobberData, error) } @@ -59,7 +59,7 @@ func (b *chunkedUploadFormBuilder) Build( fileMeta *FileMeta, hasher Hasher, connectionID, blobberID string, chunkSize int64, chunkStartIndex, chunkEndIndex int, isFinal bool, encryptedKey, encryptedKeyPoint string, fileChunksData [][]byte, - thumbnailChunkData []byte, shardSize int64, + thumbnailChunkData []byte, shardSize int64, clients ...string, ) (blobberData, error) { metadata := ChunkedUploadFormMetadata{ @@ -187,7 +187,12 @@ func (b *chunkedUploadFormBuilder) Build( } formData.ActualFileHashSignature = hex.EncodeToString(sig) } else { - sig, err := client.Sign(fileMeta.ActualHash) + var pubkey string + if len(clients) > 0 && clients[0] != "" { + pubkey = client.PublicKey(clients...) + } + + sig, err := client.SignByMultiWallet(fileMeta.ActualHash, pubkey, clients...) if err != nil { return res, err } @@ -206,7 +211,11 @@ func (b *chunkedUploadFormBuilder) Build( } formData.ValidationRootSignature = hex.EncodeToString(sig) } else { - rootSig, err := client.Sign(hash) + var pubkey string + if len(clients) > 0 && clients[0] != "" { + pubkey = client.PublicKey(clients...) + } + rootSig, err := client.SignByMultiWallet(hash, pubkey, clients...) if err != nil { return res, err } diff --git a/zboxcore/sdk/chunked_upload_model.go b/zboxcore/sdk/chunked_upload_model.go index d5fb3d42f..db26f0407 100644 --- a/zboxcore/sdk/chunked_upload_model.go +++ b/zboxcore/sdk/chunked_upload_model.go @@ -11,6 +11,7 @@ import ( "time" "github.com/0chain/gosdk/core/common" + "github.com/0chain/gosdk/core/zcncrypto" "github.com/0chain/gosdk/zboxcore/allocationchange" "github.com/0chain/gosdk/zboxcore/encryption" "github.com/0chain/gosdk/zboxcore/fileref" @@ -97,6 +98,7 @@ type ChunkedUpload struct { processMap map[int]zboxutil.Uint128 //nolint:unused //used in wasm check chunked_upload_process_js.go processMapLock sync.Mutex //nolint:unused + wallet *zcncrypto.Wallet } // FileMeta metadata of stream input/local diff --git a/zboxcore/sdk/chunked_upload_option.go b/zboxcore/sdk/chunked_upload_option.go index 09bc26e56..89b7d5835 100644 --- a/zboxcore/sdk/chunked_upload_option.go +++ b/zboxcore/sdk/chunked_upload_option.go @@ -7,6 +7,7 @@ import ( "os" "time" + "github.com/0chain/gosdk/core/zcncrypto" "github.com/0chain/gosdk/zboxcore/zboxutil" "github.com/klauspost/reedsolomon" ) @@ -37,6 +38,12 @@ func WithThumbnail(buf []byte) ChunkedUploadOption { } } +func WithWallet(w *zcncrypto.Wallet) ChunkedUploadOption { + return func(su *ChunkedUpload) { + su.wallet = w + } +} + // WithThumbnailFile add thumbnail from file. stream mode is unnecessary for thumbnail. // - fileName: file name of the thumbnail, which will be read and uploaded func WithThumbnailFile(fileName string) ChunkedUploadOption { diff --git a/zboxcore/sdk/chunked_upload_process.go b/zboxcore/sdk/chunked_upload_process.go index 72a7560b7..5f2680e8b 100644 --- a/zboxcore/sdk/chunked_upload_process.go +++ b/zboxcore/sdk/chunked_upload_process.go @@ -105,10 +105,19 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int, wg.Add(1) go func(b *ChunkedUploadBlobber, thumbnailChunkData []byte, pos uint64) { defer wg.Done() - uploadData, err := su.formBuilder.Build( - &su.fileMeta, blobber.progress.Hasher, su.progress.ConnectionID, blobber.blobber.ID, - su.chunkSize, chunkStartIndex, chunkEndIndex, isFinal, su.encryptedKey, su.progress.EncryptedKeyPoint, - fileShards[pos], thumbnailChunkData, su.shardSize) + var uploadData blobberData + var err error + if su.wallet != nil { + uploadData, err = su.formBuilder.Build( + &su.fileMeta, blobber.progress.Hasher, su.progress.ConnectionID, blobber.blobber.ID, + su.chunkSize, chunkStartIndex, chunkEndIndex, isFinal, su.encryptedKey, su.progress.EncryptedKeyPoint, + fileShards[pos], thumbnailChunkData, su.shardSize, su.wallet.ClientID) + } else { + uploadData, err = su.formBuilder.Build( + &su.fileMeta, blobber.progress.Hasher, su.progress.ConnectionID, blobber.blobber.ID, + su.chunkSize, chunkStartIndex, chunkEndIndex, isFinal, su.encryptedKey, su.progress.EncryptedKeyPoint, + fileShards[pos], thumbnailChunkData, su.shardSize) + } if err != nil { errC := atomic.AddInt32(&errCount, 1) if errC > int32(su.allocationObj.ParityShards-1) { // If atleast data shards + 1 number of blobbers can process the upload, it can be repaired later diff --git a/zboxcore/sdk/commitworker.go b/zboxcore/sdk/commitworker.go index aa4b864c0..df958116c 100644 --- a/zboxcore/sdk/commitworker.go +++ b/zboxcore/sdk/commitworker.go @@ -20,6 +20,7 @@ import ( thrown "github.com/0chain/errors" "github.com/0chain/gosdk/core/client" "github.com/0chain/gosdk/core/encryption" + "github.com/0chain/gosdk/core/zcncrypto" "github.com/0chain/gosdk/zboxcore/allocationchange" "github.com/0chain/gosdk/zboxcore/blockchain" "github.com/0chain/gosdk/zboxcore/fileref" @@ -90,6 +91,7 @@ type CommitRequestV2 struct { commitMask zboxutil.Uint128 changeIndex uint64 isRepair bool + wallet *zcncrypto.Wallet } var ( @@ -433,7 +435,18 @@ func (commitReq *CommitRequestV2) processCommit() { pos = uint64(i.TrailingZeros()) go func(ind uint64) { blobber := commitReq.allocationObj.Blobbers[ind] - trie, err := getReferencePathV2(blobber, commitReq.allocationObj.ID, commitReq.allocationObj.Tx, commitReq.sig, paths, &success, mu) + // trie, err := getReferencePathV2(blobber, commitReq.allocationObj.ID, commitReq.allocationObj.Tx, commitReq.sig, paths, &success, mu) + + var ( + trie *wmpt.WeightedMerkleTrie + err error + ) + if commitReq.wallet != nil { + trie, err = getReferencePathV2(blobber, commitReq.allocationObj.ID, commitReq.allocationObj.Tx, commitReq.sig, paths, &success, mu, commitReq.wallet.ClientID) + } else { + trie, err = getReferencePathV2(blobber, commitReq.allocationObj.ID, commitReq.allocationObj.Tx, commitReq.sig, paths, &success, mu) + } + resp := refPathResp{ trie: trie, err: err, @@ -577,6 +590,9 @@ func (req *CommitRequestV2) commitBlobber(rootHash []byte, rootWeight, prevWeigh wm.AllocationID = req.allocationObj.ID wm.FileMetaRoot = fileMetaRoot wm.ClientID = client.Id() + if req.wallet != nil { + wm.ClientID = req.wallet.ClientID + } err = wm.Sign() if err != nil { l.Logger.Error("Error signing writemarker", err) @@ -588,8 +604,12 @@ func (req *CommitRequestV2) commitBlobber(rootHash []byte, rootWeight, prevWeigh return err } - err = submitWriteMarker(wmData, nil, blobber, req.connectionID, req.allocationObj.ID, req.allocationObj.Tx, req.allocationObj.StorageVersion) - if err != nil { + if req.wallet != nil { + err = submitWriteMarker(wmData, nil, blobber, req.connectionID, req.allocationObj.ID, req.allocationObj.Tx, req.allocationObj.StorageVersion, req.wallet.ClientID) + } else { + err = submitWriteMarker(wmData, nil, blobber, req.connectionID, req.allocationObj.ID, req.allocationObj.Tx, req.allocationObj.StorageVersion) + } + if err != nil { l.Logger.Error("Error submitting writemarker ", err) return err } @@ -619,7 +639,7 @@ func getFormWritter(connectionID string, wmData, fileIDMetaData []byte, body *by return formWriter, nil } -func getReferencePathV2(blobber *blockchain.StorageNode, allocationID, allocationTx, sig string, paths []string, success *bool, mu *sync.Mutex) (*wmpt.WeightedMerkleTrie, error) { +func getReferencePathV2(blobber *blockchain.StorageNode, allocationID, allocationTx, sig string, paths []string, success *bool, mu *sync.Mutex, clientIds... string) (*wmpt.WeightedMerkleTrie, error) { if len(paths) == 0 || blobber.LatestWM == nil || blobber.LatestWM.ChainSize == 0 { var node wmpt.Node if blobber.LatestWM != nil && len(blobber.LatestWM.FileMetaRoot) > 0 && blobber.LatestWM.ChainSize > 0 { @@ -638,7 +658,7 @@ func getReferencePathV2(blobber *blockchain.StorageNode, allocationID, allocatio for retries := 0; retries < 3; retries++ { err, shouldContinue = func() (err error, shouldContinue bool) { var req *http.Request - req, err = zboxutil.NewReferencePathRequestV2(blobber.Baseurl, allocationID, allocationTx, sig, paths, false) + req, err = zboxutil.NewReferencePathRequestV2(blobber.Baseurl, allocationID, allocationTx, sig, paths, false, clientIds...) if err != nil { l.Logger.Error("Creating ref path req", err) return @@ -709,8 +729,18 @@ func getReferencePathV2(blobber *blockchain.StorageNode, allocationID, allocatio return nil, errAlreadySuccessful } trie := wmpt.New(nil, nil) - if lR.LatestWM != nil { - err = lR.LatestWM.VerifySignature(client.PublicKey()) + if lR.LatestWM != nil { + var useClientID string + if len(clientIds) > 0 && clientIds[0] != "" { + useClientID = clientIds[0] + } else { + useClientID = client.Id() + } + wallet := client.GetWalletByClientID(useClientID) + if wallet == nil { + return nil, errors.New("wallet not found", useClientID) + } + err = lR.LatestWM.VerifySignature(wallet.ClientKey) if err != nil { return nil, errors.New("signature_verification_failed", err.Error()) } @@ -732,7 +762,7 @@ func getReferencePathV2(blobber *blockchain.StorageNode, allocationID, allocatio return trie, nil } -func submitWriteMarker(wmData, metaData []byte, blobber *blockchain.StorageNode, connectionID, allocationID, allocationTx string, apiVersion int) (err error) { +func submitWriteMarker(wmData, metaData []byte, blobber *blockchain.StorageNode, connectionID, allocationID, allocationTx string, apiVersion int, clientIds... string) (err error) { var ( resp *http.Response shouldContinue bool @@ -745,7 +775,7 @@ func submitWriteMarker(wmData, metaData []byte, blobber *blockchain.StorageNode, l.Logger.Error("Creating form writer failed: ", err) return } - httpreq, err := zboxutil.NewCommitRequest(blobber.Baseurl, allocationID, allocationTx, body, apiVersion) + httpreq, err := zboxutil.NewCommitRequest(blobber.Baseurl, allocationID, allocationTx, body, apiVersion, clientIds...) if err != nil { l.Logger.Error("Error creating commit req: ", err) return diff --git a/zboxcore/sdk/copyworker.go b/zboxcore/sdk/copyworker.go index 9d77f2425..5e7d1bbd9 100644 --- a/zboxcore/sdk/copyworker.go +++ b/zboxcore/sdk/copyworker.go @@ -46,6 +46,7 @@ type CopyRequest struct { timestamp int64 dirOnly bool destLookupHash string + clientId string Consensus } @@ -130,7 +131,11 @@ func (req *CopyRequest) copyBlobberObject( cncl context.CancelFunc ) - httpreq, err = zboxutil.NewCopyRequest(blobber.Baseurl, req.allocationID, req.allocationTx, req.sig, body, req.allocationObj.Owner) + clientId := req.clientId + if clientId == "" { + clientId = req.allocationObj.Owner + } + httpreq, err = zboxutil.NewCopyRequest(blobber.Baseurl, req.allocationID, req.allocationTx, req.sig, body, req.allocationObj.Owner, clientId) if err != nil { l.Logger.Error(blobber.Baseurl, "Error creating rename request", err) return @@ -417,6 +422,7 @@ type CopyOperation struct { copyMask zboxutil.Uint128 maskMU *sync.Mutex objectTreeRefs []fileref.RefEntity + clientId string Consensus } @@ -438,6 +444,7 @@ func (co *CopyOperation) Process(allocObj *Allocation, connectionID string) ([]f maskMU: co.maskMU, dirOnly: co.dirOnly, Consensus: Consensus{RWMutex: &sync.RWMutex{}}, + clientId: co.clientId, } cR.consensusThresh = co.consensusThresh @@ -517,7 +524,7 @@ func (co *CopyOperation) Error(allocObj *Allocation, consensus int, err error) { } -func NewCopyOperation(ctx context.Context, remotePath string, destPath string, copyMask zboxutil.Uint128, maskMU *sync.Mutex, consensusTh, fullConsensus int, copyDirOnly bool) *CopyOperation { +func NewCopyOperation(ctx context.Context, remotePath string, destPath string, copyMask zboxutil.Uint128, maskMU *sync.Mutex, consensusTh, fullConsensus int, copyDirOnly bool, clientId string) *CopyOperation { co := &CopyOperation{} co.remotefilepath = zboxutil.RemoteClean(remotePath) co.copyMask = copyMask @@ -530,6 +537,7 @@ func NewCopyOperation(ctx context.Context, remotePath string, destPath string, c co.destPath = destPath co.ctx, co.ctxCncl = context.WithCancel(ctx) co.dirOnly = copyDirOnly + co.clientId = clientId return co } diff --git a/zboxcore/sdk/copyworker_test.go b/zboxcore/sdk/copyworker_test.go index f795ce51b..fdc7a3496 100644 --- a/zboxcore/sdk/copyworker_test.go +++ b/zboxcore/sdk/copyworker_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "github.com/0chain/gosdk/zboxcore/mocks" "io" "mime" "mime/multipart" @@ -14,6 +13,8 @@ import ( "sync" "testing" + "github.com/0chain/gosdk/zboxcore/mocks" + "github.com/0chain/errors" "github.com/0chain/gosdk/core/client" "github.com/0chain/gosdk/core/zcncrypto" @@ -495,7 +496,8 @@ func TestCopyRequest_ProcessCopy(t *testing.T) { maskMU: &sync.Mutex{}, connectionID: mockConnectionId, } - sig, err := client.Sign(mockAllocationTxId) + pubkey := client.PublicKey() + sig, err := client.SignByMultiWallet(mockAllocationTxId, pubkey) require.NoError(err) req.sig = sig req.ctx, req.ctxCncl = context.WithCancel(context.TODO()) diff --git a/zboxcore/sdk/deleteworker.go b/zboxcore/sdk/deleteworker.go index 243e88c5b..c3f3e3eef 100644 --- a/zboxcore/sdk/deleteworker.go +++ b/zboxcore/sdk/deleteworker.go @@ -19,6 +19,7 @@ import ( "github.com/google/uuid" "github.com/0chain/gosdk/constants" + "github.com/0chain/gosdk/core/client" "github.com/0chain/gosdk/core/common" "github.com/0chain/gosdk/zboxcore/allocationchange" "github.com/0chain/gosdk/zboxcore/blockchain" @@ -43,6 +44,7 @@ type DeleteRequest struct { connectionID string consensus Consensus timestamp int64 + clientId string } var errFileDeleted = errors.New("file_deleted", "file is already deleted") @@ -66,7 +68,7 @@ func (req *DeleteRequest) deleteBlobberFile( query.Add("connection_id", req.connectionID) query.Add("path", req.remotefilepath) - httpreq, err := zboxutil.NewDeleteRequest(blobber.Baseurl, req.allocationID, req.allocationTx, req.sig, query, req.allocationObj.Owner) + httpreq, err := zboxutil.NewDeleteRequest(blobber.Baseurl, req.allocationID, req.allocationTx, req.sig, query, req.clientId) if err != nil { l.Logger.Error(blobber.Baseurl, "Error creating delete request", err) return err @@ -356,6 +358,7 @@ type DeleteOperation struct { consensus Consensus lookupHash string refs []fileref.RefEntity + clientId string } func (dop *DeleteOperation) Process(allocObj *Allocation, connectionID string) ([]fileref.RefEntity, zboxutil.Uint128, error) { @@ -374,6 +377,7 @@ func (dop *DeleteOperation) Process(allocObj *Allocation, connectionID string) ( maskMu: dop.maskMu, wg: &sync.WaitGroup{}, consensus: Consensus{RWMutex: &sync.RWMutex{}}, + clientId: dop.clientId, } deleteReq.consensus.fullconsensus = dop.consensus.fullconsensus deleteReq.consensus.consensusThresh = dop.consensus.consensusThresh @@ -582,7 +586,7 @@ func (dop *DeleteOperation) Error(allocObj *Allocation, consensus int, err error } -func NewDeleteOperation(ctx context.Context, remotePath string, deleteMask zboxutil.Uint128, maskMu *sync.Mutex, consensusTh, fullConsensus int) *DeleteOperation { +func NewDeleteOperation(ctx context.Context, remotePath string, deleteMask zboxutil.Uint128, maskMu *sync.Mutex, consensusTh, fullConsensus int, clientIds... string) *DeleteOperation { dop := &DeleteOperation{} dop.remotefilepath = zboxutil.RemoteClean(remotePath) dop.deleteMask = deleteMask @@ -590,6 +594,7 @@ func NewDeleteOperation(ctx context.Context, remotePath string, deleteMask zboxu dop.consensus.consensusThresh = consensusTh dop.consensus.fullconsensus = fullConsensus dop.ctx, dop.ctxCncl = context.WithCancel(ctx) + dop.clientId = clientIds[0] return dop } @@ -623,7 +628,18 @@ func (req *DeleteRequest) deleteSubDirectories() error { } ops = append(ops, op) } - err = req.allocationObj.DoMultiOperation(ops) + if req.clientId != "" { + clientId := req.clientId + wallet := client.GetWalletByClientID(clientId) + if wallet == nil { + return errors.New("client_not_found", clientId) + } + err = req.allocationObj.DoMultiOperation(ops, func(mo *MultiOperation) { + mo.Wallet = wallet + }) + } else { + err = req.allocationObj.DoMultiOperation(ops) + } if err != nil { return err } @@ -657,7 +673,18 @@ func (req *DeleteRequest) deleteSubDirectories() error { } ops = append(ops, op) } - err = req.allocationObj.DoMultiOperation(ops) + if req.clientId != "" { + clientId := req.clientId + wallet := client.GetWalletByClientID(clientId) + if wallet == nil { + return errors.New("client_not_found", clientId) + } + err = req.allocationObj.DoMultiOperation(ops, func(mo *MultiOperation) { + mo.Wallet = wallet + }) + } else { + err = req.allocationObj.DoMultiOperation(ops) + } if err != nil { return err } diff --git a/zboxcore/sdk/dirworker.go b/zboxcore/sdk/dirworker.go index a1b4ef26f..bdca91bdb 100644 --- a/zboxcore/sdk/dirworker.go +++ b/zboxcore/sdk/dirworker.go @@ -16,6 +16,7 @@ import ( "github.com/0chain/errors" "github.com/0chain/gosdk/core/common" "github.com/0chain/gosdk/core/util" + "github.com/0chain/gosdk/core/zcncrypto" "github.com/0chain/gosdk/zboxcore/allocationchange" "github.com/0chain/gosdk/zboxcore/blockchain" "github.com/0chain/gosdk/zboxcore/fileref" @@ -45,6 +46,7 @@ type DirRequest struct { timestamp int64 alreadyExists map[uint64]bool customMeta string + wallet *zcncrypto.Wallet Consensus } @@ -187,7 +189,14 @@ func (req *DirRequest) createDirInBlobber(blobber *blockchain.StorageNode, pos u } formWriter.Close() - httpreq, err := zboxutil.NewCreateDirRequest(blobber.Baseurl, req.allocationID, req.allocationTx, req.sig, body, req.allocationObj.Owner) + var ( + httpreq *http.Request + ) + if req.wallet != nil { + httpreq, err = zboxutil.NewCreateDirRequest(blobber.Baseurl, req.allocationID, req.allocationTx, req.sig, body, req.wallet.ClientID) + } else { + httpreq, err = zboxutil.NewCreateDirRequest(blobber.Baseurl, req.allocationID, req.allocationTx, req.sig, body, req.allocationObj.Owner) + } if err != nil { l.Logger.Error(blobber.Baseurl, "Error creating dir request", err) return err, false @@ -282,6 +291,7 @@ type DirOperation struct { maskMU *sync.Mutex customMeta string alreadyExists map[uint64]bool + wallet *zcncrypto.Wallet Consensus } @@ -303,6 +313,7 @@ func (dirOp *DirOperation) Process(allocObj *Allocation, connectionID string) ([ wg: &sync.WaitGroup{}, alreadyExists: make(map[uint64]bool), customMeta: dirOp.customMeta, + wallet: dirOp.wallet, } dR.Consensus = Consensus{ RWMutex: &sync.RWMutex{}, @@ -364,7 +375,7 @@ func (dirOp *DirOperation) Error(allocObj *Allocation, consensus int, err error) } -func NewDirOperation(remotePath, customMeta string, dirMask zboxutil.Uint128, maskMU *sync.Mutex, consensusTh int, fullConsensus int, ctx context.Context) *DirOperation { +func NewDirOperation(remotePath, customMeta string, dirMask zboxutil.Uint128, maskMU *sync.Mutex, consensusTh int, fullConsensus int, ctx context.Context, wallet *zcncrypto.Wallet) *DirOperation { dirOp := &DirOperation{} dirOp.remotePath = zboxutil.RemoteClean(remotePath) dirOp.dirMask = dirMask @@ -372,6 +383,7 @@ func NewDirOperation(remotePath, customMeta string, dirMask zboxutil.Uint128, ma dirOp.consensusThresh = consensusTh dirOp.fullconsensus = fullConsensus dirOp.customMeta = customMeta + dirOp.wallet = wallet dirOp.ctx, dirOp.ctxCncl = context.WithCancel(ctx) dirOp.alreadyExists = make(map[uint64]bool) return dirOp diff --git a/zboxcore/sdk/downloadworker.go b/zboxcore/sdk/downloadworker.go index a28479f9e..fa8af5008 100644 --- a/zboxcore/sdk/downloadworker.go +++ b/zboxcore/sdk/downloadworker.go @@ -1235,6 +1235,7 @@ func (req *DownloadRequest) getFileMetaConsensus(fMetaResp []*fileMetaResponse) } actualHash := fmr.fileref.ActualFileHash actualFileHashSignature := fmr.fileref.ActualFileHashSignature + var ( isValid bool err error @@ -1301,6 +1302,7 @@ func (req *DownloadRequest) getFileMetaConsensus(fMetaResp []*fileMetaResponse) hashData := fmt.Sprintf("%s:%s:%s:%s", fRef.ActualFileHash, fRef.ValidationRoot, fRef.FixedMerkleRoot, req.blobbers[i].ID) hash = encrypt.Hash(hashData) } + var ( isValid bool err error diff --git a/zboxcore/sdk/filestatsworker_test.go b/zboxcore/sdk/filestatsworker_test.go index d9c7e6ba4..a1a50f97a 100644 --- a/zboxcore/sdk/filestatsworker_test.go +++ b/zboxcore/sdk/filestatsworker_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "github.com/0chain/gosdk/zboxcore/mocks" "io" @@ -126,7 +127,8 @@ func TestListRequest_getFileStatsInfoFromBlobber(t *testing.T) { require.NoError(t, err) require.EqualValues(t, expected, string(actual)) - sign, _ := client.Sign(encryption.Hash(mockAllocationTxId)) + pubkey := client.PublicKey() + sign, _ := client.SignByMultiWallet(encryption.Hash(mockAllocationTxId), pubkey) return req.URL.Path == "Test_Success"+zboxutil.FILE_STATS_ENDPOINT+mockAllocationTxId && req.Method == "POST" && req.Header.Get("X-App-Client-ID") == mockClientId && diff --git a/zboxcore/sdk/moveworker.go b/zboxcore/sdk/moveworker.go index 2bceac689..2294e09c0 100644 --- a/zboxcore/sdk/moveworker.go +++ b/zboxcore/sdk/moveworker.go @@ -45,11 +45,16 @@ type MoveRequest struct { connectionID string timestamp int64 destLookupHash string + clientId string Consensus } func (req *MoveRequest) getObjectTreeFromBlobber(blobber *blockchain.StorageNode) (fileref.RefEntity, error) { - return getObjectTreeFromBlobber(req.ctx, req.allocationID, req.allocationTx, req.sig, req.remotefilepath, blobber, req.allocationObj.Owner) + clientId := req.clientId + if clientId == "" { + clientId = req.allocationObj.Owner + } + return getObjectTreeFromBlobber(req.ctx, req.allocationID, req.allocationTx, req.sig, req.remotefilepath, blobber, req.allocationObj.Owner, clientId) } func (req *MoveRequest) getFileMetaFromBlobber(pos int) (fileRef *fileref.FileRef, err error) { @@ -122,7 +127,11 @@ func (req *MoveRequest) moveBlobberObject( cncl context.CancelFunc ) - httpreq, err = zboxutil.NewMoveRequest(blobber.Baseurl, req.allocationID, req.allocationTx, req.sig, body, req.allocationObj.Owner) + clientId := req.clientId + if clientId == "" { + clientId = req.allocationObj.Owner + } + httpreq, err = zboxutil.NewMoveRequest(blobber.Baseurl, req.allocationID, req.allocationTx, req.sig, body, req.allocationObj.Owner, clientId) if err != nil { l.Logger.Error(blobber.Baseurl, "Error creating rename request", err) return @@ -422,6 +431,7 @@ type MoveOperation struct { maskMU *sync.Mutex consensus Consensus objectTreeRefs []fileref.RefEntity + clientId string } func (mo *MoveOperation) Process(allocObj *Allocation, connectionID string) ([]fileref.RefEntity, zboxutil.Uint128, error) { @@ -439,6 +449,7 @@ func (mo *MoveOperation) Process(allocObj *Allocation, connectionID string) ([]f maskMU: mo.maskMU, destPath: mo.destPath, Consensus: Consensus{RWMutex: &sync.RWMutex{}}, + clientId: mo.clientId, } mR.Consensus.fullconsensus = mo.consensus.fullconsensus mR.Consensus.consensusThresh = mo.consensus.consensusThresh @@ -518,7 +529,7 @@ func (mo *MoveOperation) Error(allocObj *Allocation, consensus int, err error) { } -func NewMoveOperation(remotePath string, destPath string, moveMask zboxutil.Uint128, maskMU *sync.Mutex, consensusTh int, fullConsensus int, ctx context.Context) *MoveOperation { +func NewMoveOperation(remotePath string, destPath string, moveMask zboxutil.Uint128, maskMU *sync.Mutex, consensusTh int, fullConsensus int, ctx context.Context, clientId string) *MoveOperation { mo := &MoveOperation{} mo.remotefilepath = zboxutil.RemoteClean(remotePath) if destPath != "/" { @@ -530,6 +541,7 @@ func NewMoveOperation(remotePath string, destPath string, moveMask zboxutil.Uint mo.consensus.consensusThresh = consensusTh mo.consensus.fullconsensus = fullConsensus mo.ctx, mo.ctxCncl = context.WithCancel(ctx) + mo.clientId = clientId return mo } diff --git a/zboxcore/sdk/multi_operation_worker.go b/zboxcore/sdk/multi_operation_worker.go index 38a6a6f07..b70883049 100644 --- a/zboxcore/sdk/multi_operation_worker.go +++ b/zboxcore/sdk/multi_operation_worker.go @@ -16,6 +16,7 @@ import ( "github.com/0chain/gosdk/core/common" "github.com/0chain/gosdk/core/util" + "github.com/0chain/gosdk/core/zcncrypto" "github.com/0chain/gosdk/zboxcore/allocationchange" "github.com/0chain/gosdk/zboxcore/fileref" "github.com/0chain/gosdk/zboxcore/logger" @@ -63,10 +64,10 @@ type MultiOperation struct { changes [][]allocationchange.AllocationChange changesV2 []allocationchange.AllocationChangeV2 isRepair bool + Wallet *zcncrypto.Wallet } func (mo *MultiOperation) createConnectionObj(blobberIdx int) (err error) { - defer func() { if err == nil { mo.maskMU.Lock() @@ -96,10 +97,18 @@ func (mo *MultiOperation) createConnectionObj(blobberIdx int) (err error) { formWriter.Close() var httpreq *http.Request - httpreq, err = zboxutil.NewConnectionRequest(blobber.Baseurl, mo.allocationObj.ID, mo.allocationObj.Tx, mo.allocationObj.sig, body, mo.allocationObj.Owner) - if err != nil { - l.Logger.Error(blobber.Baseurl, "Error creating new connection request", err) - return + if mo.Wallet != nil { + httpreq, err = zboxutil.NewConnectionRequest(blobber.Baseurl, mo.allocationObj.ID, mo.allocationObj.Tx, mo.allocationObj.sig, body, mo.Wallet.ClientID) + if err != nil { + l.Logger.Error(blobber.Baseurl, "Error creating new connection request by wallet", err) + return err, false + } + } else { + httpreq, err = zboxutil.NewConnectionRequest(blobber.Baseurl, mo.allocationObj.ID, mo.allocationObj.Tx, mo.allocationObj.sig, body, mo.allocationObj.Owner) + if err != nil { + l.Logger.Error(blobber.Baseurl, "Error creating new connection request", err) + return + } } httpreq.Header.Add("Content-Type", formWriter.FormDataContentType()) @@ -147,7 +156,6 @@ func (mo *MultiOperation) createConnectionObj(blobberIdx int) (err error) { err = errors.New("response_error", string(respBody)) return }() - if err != nil { return } @@ -270,7 +278,11 @@ func (mo *MultiOperation) Process() error { start = time.Now() status := Commit if !mo.isRepair && !mo.allocationObj.checkStatus { - status, _, err = mo.allocationObj.CheckAllocStatus() + if mo.Wallet != nil { + status, _, err = mo.allocationObj.CheckAllocStatus(mo.Wallet.ClientID) + } else { + status, _, err = mo.allocationObj.CheckAllocStatus() + } if err != nil { logger.Logger.Error("Error checking allocation status", err) if singleClientMode { @@ -393,7 +405,6 @@ func (mo *MultiOperation) Process() error { } func (mo *MultiOperation) commitV2() error { - rootMap := make(map[string]zboxutil.Uint128) var pos uint64 for i := mo.operationMask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) { @@ -427,6 +438,7 @@ func (mo *MultiOperation) commitV2() error { consensusThresh: threshold, changes: changes, isRepair: mo.isRepair, + wallet: mo.Wallet, } commitReqs[counter] = commitReq counter++ diff --git a/zboxcore/sdk/renameworker.go b/zboxcore/sdk/renameworker.go index 644bfcc75..f83dc75ac 100644 --- a/zboxcore/sdk/renameworker.go +++ b/zboxcore/sdk/renameworker.go @@ -45,10 +45,15 @@ type RenameRequest struct { connectionID string consensus Consensus timestamp int64 + clientId string } func (req *RenameRequest) getObjectTreeFromBlobber(blobber *blockchain.StorageNode) (fileref.RefEntity, error) { - return getObjectTreeFromBlobber(req.ctx, req.allocationID, req.allocationTx, req.sig, req.remotefilepath, blobber, req.allocationObj.Owner) + clientId := req.clientId + if clientId == "" { + clientId = req.allocationObj.Owner + } + return getObjectTreeFromBlobber(req.ctx, req.allocationID, req.allocationTx, req.sig, req.remotefilepath, blobber, req.allocationObj.Owner, clientId) } func (req *RenameRequest) getFileMetaFromBlobber(pos int) (fileRef *fileref.FileRef, err error) { @@ -117,7 +122,11 @@ func (req *RenameRequest) renameBlobberObject( formWriter.Close() var httpreq *http.Request - httpreq, err = zboxutil.NewRenameRequest(blobber.Baseurl, req.allocationID, req.allocationTx, req.sig, body, req.allocationObj.Owner) + clientId := req.clientId + if clientId == "" { + clientId = req.allocationObj.Owner + } + httpreq, err = zboxutil.NewRenameRequest(blobber.Baseurl, req.allocationID, req.allocationTx, req.sig, body, req.allocationObj.Owner, clientId) if err != nil { l.Logger.Error(blobber.Baseurl, "Error creating rename request", err) return @@ -418,6 +427,7 @@ type RenameOperation struct { newName string maskMU *sync.Mutex objectTreeRefs []fileref.RefEntity + clientId string consensus Consensus } @@ -439,6 +449,7 @@ func (ro *RenameOperation) Process(allocObj *Allocation, connectionID string) ([ maskMU: ro.maskMU, wg: &sync.WaitGroup{}, consensus: Consensus{RWMutex: &sync.RWMutex{}}, + clientId: ro.clientId, } if filepath.Base(ro.remotefilepath) == ro.newName { return nil, ro.renameMask, errors.New("invalid_operation", "Cannot rename to same name") @@ -526,7 +537,7 @@ func (ro *RenameOperation) Error(allocObj *Allocation, consensus int, err error) } -func NewRenameOperation(remotePath string, destName string, renameMask zboxutil.Uint128, maskMU *sync.Mutex, consensusTh int, fullConsensus int, ctx context.Context) *RenameOperation { +func NewRenameOperation(remotePath string, destName string, renameMask zboxutil.Uint128, maskMU *sync.Mutex, consensusTh int, fullConsensus int, ctx context.Context, clientId string) *RenameOperation { ro := &RenameOperation{} ro.remotefilepath = zboxutil.RemoteClean(remotePath) ro.newName = path.Base(destName) @@ -535,6 +546,7 @@ func NewRenameOperation(remotePath string, destName string, renameMask zboxutil. ro.consensus.consensusThresh = consensusTh ro.consensus.fullconsensus = fullConsensus ro.ctx, ro.ctxCncl = context.WithCancel(ctx) + ro.clientId = clientId return ro } diff --git a/zboxcore/sdk/rollback.go b/zboxcore/sdk/rollback.go index 6d1607c44..2044c033b 100644 --- a/zboxcore/sdk/rollback.go +++ b/zboxcore/sdk/rollback.go @@ -65,6 +65,16 @@ type BlobberStatus struct { func GetWritemarker(allocID, allocTx, sig, id, baseUrl string, clientId ...string) (*LatestPrevWriteMarker, error) { var lpm LatestPrevWriteMarker + var useClientID string + if len(clientId) > 0 && clientId[0] != "" { + useClientID = clientId[0] + } else { + useClientID = client.Id() + } + wallet := client.GetWalletByClientID(useClientID) + if wallet == nil { + return nil, errors.New("wallet not found : " + useClientID) + } req, err := zboxutil.NewWritemarkerRequest(baseUrl, allocID, allocTx, sig, clientId...) if err != nil { @@ -103,12 +113,12 @@ func GetWritemarker(allocID, allocTx, sig, id, baseUrl string, clientId ...strin return nil, err } if lpm.LatestWM != nil { - err = lpm.LatestWM.VerifySignature(client.PublicKey()) + err = lpm.LatestWM.VerifySignature(wallet.ClientKey) if err != nil { return nil, fmt.Errorf("signature verification failed for latest writemarker: %s", err.Error()) } if lpm.PrevWM != nil { - err = lpm.PrevWM.VerifySignature(client.PublicKey()) + err = lpm.PrevWM.VerifySignature(wallet.ClientKey) if err != nil { return nil, fmt.Errorf("signature verification failed for latest writemarker: %s", err.Error()) } @@ -269,7 +279,14 @@ func (rb *RollbackBlobber) processRollback(ctx context.Context, tx string) error // CheckAllocStatus checks the status of the allocation // and returns the status of the allocation and its blobbers. -func (a *Allocation) CheckAllocStatus() (AllocStatus, []BlobberStatus, error) { +func (a *Allocation) CheckAllocStatus(clientId... string) (AllocStatus, []BlobberStatus, error) { + + var useClientID string + if len(clientId) > 0 && clientId[0] != "" { + useClientID = clientId[0] + } else { + useClientID = a.Owner + } wg := &sync.WaitGroup{} markerChan := make(chan *RollbackBlobber, len(a.Blobbers)) @@ -286,7 +303,7 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, []BlobberStatus, error) { ID: blobber.ID, Status: "available", } - wr, err := GetWritemarker(a.ID, a.Tx, a.sig, blobber.ID, blobber.Baseurl, a.Owner) + wr, err := GetWritemarker(a.ID, a.Tx, a.sig, blobber.ID, blobber.Baseurl, useClientID) if err != nil { atomic.AddInt32(&errCnt, 1) markerError = err diff --git a/zboxcore/sdk/sdk.go b/zboxcore/sdk/sdk.go index 91b8ee639..0b018bca4 100644 --- a/zboxcore/sdk/sdk.go +++ b/zboxcore/sdk/sdk.go @@ -7,12 +7,22 @@ import ( "io" "math" "net/http" + "os" + "path" "strconv" + "strings" "github.com/0chain/common/core/currency" "github.com/0chain/errors" + thrown "github.com/0chain/errors" + "github.com/0chain/gosdk/constants" "github.com/0chain/gosdk/core/logger" + "github.com/0chain/gosdk/core/pathutil" "github.com/0chain/gosdk/core/screstapi" + "github.com/0chain/gosdk/core/sys" + "github.com/0chain/gosdk/core/zcncrypto" + + // "github.com/0chain/gosdk/zcncore" "gopkg.in/natefinch/lumberjack.v2" "github.com/0chain/gosdk/core/client" @@ -1473,3 +1483,109 @@ func updateMaskBit(mask uint16, index uint8, value bool) uint16 { return mask & ^uint16(1< 0 && clientIds[0] != "" { + wallet := client.GetWalletByClientID(clientIds[0]) + if wallet == nil { + panic("wallet not found : " + clientIds[0]) + } + req.Header.Set("X-App-Client-ID", wallet.ClientID) + req.Header.Set("X-App-Client-Key", wallet.ClientKey) + return + } req.Header.Set("X-App-Client-ID", client.Id()) req.Header.Set("X-App-Client-Key", client.PublicKey()) } @@ -208,7 +219,13 @@ func setClientInfoWithSign(req *http.Request, sig, allocation, baseURL string, c } else { clientID = client.Id() } - setClientInfo(req) + wallet := client.GetWalletByClientID(clientID) + if wallet == nil { + return errors.New("wallet not found", clientID) + } + fmt.Printf("setClientInfoWithSign: wallet details: %+v\n", *wallet) + req.Header.Set("X-App-Client-ID", wallet.ClientID) + req.Header.Set("X-App-Client-Key", wallet.ClientKey) req.Header.Set(CLIENT_SIGNATURE_HEADER, sig) hashData := allocation + baseURL @@ -243,7 +260,7 @@ func NewCommitRequest(baseUrl, allocationID string, allocationTx string, body io if err != nil { return nil, err } - setClientInfo(req) + setClientInfo(req, clients...) req.Header.Set(ALLOCATION_ID_HEADER, allocationID) @@ -651,11 +668,22 @@ func NewFastUploadRequest(baseURL, allocationID string, allocationTx string, bod } func setFastClientInfoWithSign(req *fasthttp.Request, allocation, baseURL string, clients ...string) error { - req.Header.Set("X-App-Client-ID", client.Id()) - req.Header.Set("X-App-Client-Key", client.PublicKey()) + var clientID string + if len(clients) > 0 && clients[0] != "" { + clientID = clients[0] + } else { + clientID = client.Id() + } + wallet := client.GetWalletByClientID(clientID) + if wallet == nil { + return errors.New("wallet not found", clientID) + } + req.Header.Set("X-App-Client-ID", wallet.ClientID) + req.Header.Set("X-App-Client-Key", wallet.ClientKey) + hashData := allocation + baseURL - clientID := client.Id() + // clientID := client.Id() sig2, ok := SignCache.Get(hashData + ":" + clientID) if !ok { var err error @@ -694,7 +722,33 @@ func NewUploadRequest(baseUrl, allocationID, allocationTx, sig string, body io.R return req, nil } +// NewConnectionRequestByWallet creates a new connection request using the given wallet. +// func NewConnectionRequestByWallet(baseUrl, allocationID, allocationTx, sig string, body io.Reader, wallet *zcncrypto.Wallet) (*http.Request, error) { +// l.Logger.Info(fmt.Sprintf("NewConnectionRequestByWallet: baseUrl: %s, allocationID: %s, allocationTx: %s, sig: %s", baseUrl, allocationID, allocationTx, sig)) +// u, err := joinUrl(baseUrl, CREATE_CONNECTION_ENDPOINT, allocationTx) +// if err != nil { +// return nil, err +// } +// req, err := http.NewRequest(http.MethodPost, u.String(), body) +// if err != nil { +// return nil, err +// } +// req.Header.Set("X-App-Client-ID", wallet.ClientID) +// req.Header.Set("X-App-Client-Key", wallet.ClientKey) +// req.Header.Set(CLIENT_SIGNATURE_HEADER, sig) +// hashData := allocationTx + baseUrl +// sig2, err := wallet.Sign(encryption.Hash(hashData), constants.BLS0CHAIN.String()) +// if err != nil { +// return nil, err +// } +// req.Header.Set(CLIENT_SIGNATURE_HEADER, sig) +// req.Header.Set(CLIENT_SIGNATURE_HEADER_V2, sig2) +// req.Header.Set(ALLOCATION_ID_HEADER, allocationID) +// return req, nil +// } + func NewConnectionRequest(baseUrl, allocationID, allocationTx, sig string, body io.Reader, clients ...string) (*http.Request, error) { + l.Logger.Info(fmt.Sprintf("NewConnectionRequest: baseUrl: %s, allocationID: %s, allocationTx: %s, sig: %s", baseUrl, allocationID, allocationTx, sig)) u, err := joinUrl(baseUrl, CREATE_CONNECTION_ENDPOINT, allocationTx) if err != nil { return nil, err @@ -831,7 +885,7 @@ func NewRedeemRequest(baseUrl, allocationID, allocationTx string, clients ...str return req, nil } -func NewDeleteRequest(baseUrl, allocationID, allocationTx, sig string, query *url.Values, clients ...string) (*http.Request, error) { +func NewDeleteRequest(baseUrl, allocationID, allocationTx, sig string, query *url.Values, clients... string) (*http.Request, error) { u, err := joinUrl(baseUrl, UPLOAD_ENDPOINT, allocationTx) if err != nil { return nil, err