Skip to content

Commit a526198

Browse files
committed
Add real-time Geyser external deposit processing
1 parent 6d4ebce commit a526198

File tree

15 files changed

+227
-156
lines changed

15 files changed

+227
-156
lines changed

pkg/code/async/geyser/external_deposit.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ func getSyncedVmDepositCacheKey(signature string, vmDepositAta *common.Account)
398398
return fmt.Sprintf("%s:%s", signature, vmDepositAta.PublicKey().ToBase58())
399399
}
400400

401-
// todo: below code needs to be reimagined for the VM
401+
// todo: below swap code needs to be reimagined for the VM
402402

403403
/*
404404
func markRequiringSwapRetries(ctx context.Context, data code_data.Provider, accountInfoRecord *account.Record) error {

pkg/code/async/geyser/handler.go

Lines changed: 65 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package async_geyser
22

33
import (
4+
"bytes"
45
"context"
56

67
"github.com/mr-tron/base58"
78
"github.com/pkg/errors"
89

910
geyserpb "github.com/code-payments/code-server/pkg/code/async/geyser/api/gen"
11+
indexerpb "github.com/code-payments/code-vm-indexer/generated/indexer/v1"
1012

13+
"github.com/code-payments/code-server/pkg/code/common"
1114
code_data "github.com/code-payments/code-server/pkg/code/data"
1215
"github.com/code-payments/code-server/pkg/solana/token"
1316
)
@@ -25,110 +28,95 @@ type ProgramAccountUpdateHandler interface {
2528
}
2629

2730
type TokenProgramAccountHandler struct {
28-
conf *conf
29-
data code_data.Provider
31+
conf *conf
32+
data code_data.Provider
33+
vmIndexerClient indexerpb.IndexerClient
3034
}
3135

32-
func NewTokenProgramAccountHandler(conf *conf, data code_data.Provider) ProgramAccountUpdateHandler {
36+
func NewTokenProgramAccountHandler(conf *conf, data code_data.Provider, vmIndexerClient indexerpb.IndexerClient) ProgramAccountUpdateHandler {
3337
return &TokenProgramAccountHandler{
34-
conf: conf,
35-
data: data,
38+
conf: conf,
39+
data: data,
40+
vmIndexerClient: vmIndexerClient,
3641
}
3742
}
3843

39-
// todo: implement real-time external deposits for the VM
44+
// todo: This needs to handle swaps
4045
func (h *TokenProgramAccountHandler) Handle(ctx context.Context, update *geyserpb.AccountUpdate) error {
41-
return nil
46+
if !bytes.Equal(update.Owner, token.ProgramKey) {
47+
return ErrUnexpectedProgramOwner
48+
}
4249

43-
/*
44-
if !bytes.Equal(update.Owner, token.ProgramKey) {
45-
return ErrUnexpectedProgramOwner
46-
}
50+
// We need to know the amount being deposited, and that's impossible without
51+
// a transaction signature.
52+
if update.TxSignature == nil {
53+
return nil
54+
}
4755

48-
// We need to know the amount being deposited, and that's impossible without
49-
// a transaction signature.
50-
if update.TxSignature == nil {
51-
return nil
52-
}
56+
// We need to know more about the account before accessing our data stores,
57+
// so skip anything that doesn't have data. I'm assuming this means the account
58+
// is closed anyways.
59+
if len(update.Data) == 0 {
60+
return nil
61+
}
5362

54-
// We need to know more about the account before accessing our data stores,
55-
// so skip anything that doesn't have data. I'm assuming this means the account
56-
// is closed anyways.
57-
if len(update.Data) == 0 {
58-
return nil
59-
}
63+
var unmarshalled token.Account
64+
if !unmarshalled.Unmarshal(update.Data) {
65+
// Probably not a token account (eg. mint)
66+
return nil
67+
}
6068

61-
var unmarshalled token.Account
62-
if !unmarshalled.Unmarshal(update.Data) {
63-
// Probably not a token account (eg. mint)
64-
return nil
65-
}
69+
tokenAccount, err := common.NewAccountFromPublicKeyBytes(update.Pubkey)
70+
if err != nil {
71+
return errors.Wrap(err, "invalid token account")
72+
}
6673

67-
tokenAccount, err := common.NewAccountFromPublicKeyBytes(update.Pubkey)
68-
if err != nil {
69-
return errors.Wrap(err, "invalid token account")
70-
}
74+
ownerAccount, err := common.NewAccountFromPublicKeyBytes(unmarshalled.Owner)
75+
if err != nil {
76+
return errors.Wrap(err, "invalid owner account")
77+
}
7178

72-
ownerAccount, err := common.NewAccountFromPublicKeyBytes(unmarshalled.Owner)
73-
if err != nil {
74-
return errors.Wrap(err, "invalid owner account")
75-
}
79+
mintAccount, err := common.NewAccountFromPublicKeyBytes(unmarshalled.Mint)
80+
if err != nil {
81+
return errors.Wrap(err, "invalid mint account")
82+
}
7683

77-
mintAccount, err := common.NewAccountFromPublicKeyBytes(unmarshalled.Mint)
78-
if err != nil {
79-
return errors.Wrap(err, "invalid mint account")
80-
}
84+
switch mintAccount.PublicKey().ToBase58() {
8185

82-
// Account is empty, and all we care about are external deposits at this point,
83-
// so filter it out
84-
if unmarshalled.Amount == 0 {
86+
case common.CoreMintAccount.PublicKey().ToBase58():
87+
// Not an ATA, so filter it out. It cannot be a VM deposit ATA
88+
if bytes.Equal(tokenAccount.PublicKey().ToBytes(), ownerAccount.PublicKey().ToBytes()) {
8589
return nil
8690
}
8791

88-
switch mintAccount.PublicKey().ToBase58() {
89-
90-
case common.CoreMintAccount.PublicKey().ToBase58():
91-
// Not a program vault account, so filter it out. It cannot be a Timelock
92-
// account.
93-
if !bytes.Equal(tokenAccount.PublicKey().ToBytes(), ownerAccount.PublicKey().ToBytes()) {
94-
return nil
95-
}
96-
97-
// todo: Need to implement VM deposit flow
92+
exists, userAuthorityAccount, err := testForKnownUserAuthorityFromDepositPda(ctx, h.data, tokenAccount)
93+
if err != nil {
94+
return errors.Wrap(err, "error testing for user authority from deposit pda")
95+
} else if !exists {
9896
return nil
97+
}
9998

100-
case common.UsdcMintAccount.PublicKey().ToBase58():
101-
ata, err := ownerAccount.ToAssociatedTokenAccount(common.UsdcMintAccount)
102-
if err != nil {
103-
return errors.Wrap(err, "error deriving usdc ata")
104-
}
105-
106-
// Not an ATA, so filter it out
107-
if !bytes.Equal(tokenAccount.PublicKey().ToBytes(), ata.PublicKey().ToBytes()) {
108-
return nil
109-
}
99+
err = processPotentialExternalDepositIntoVm(ctx, h.data, *update.TxSignature, userAuthorityAccount)
100+
if err != nil {
101+
return errors.Wrap(err, "error processing signature for external deposit into vm")
102+
}
110103

111-
isCodeSwapAccount, err := testForKnownCodeSwapAccount(ctx, h.data, tokenAccount)
104+
if unmarshalled.Amount > 0 {
105+
err = maybeInitiateExternalDepositIntoVm(ctx, h.data, h.vmIndexerClient, userAuthorityAccount)
112106
if err != nil {
113-
return errors.Wrap(err, "error testing for known account")
114-
} else if !isCodeSwapAccount {
115-
// Not an account we track, so skip the update
116-
return nil
107+
return errors.Wrap(err, "error depositing into the vm")
117108
}
118-
119-
default:
120-
// Not a Core Mint or USDC account, so filter it out
121-
return nil
122109
}
123110

124-
// We've determined this token account is one that we care about. Process
125-
// the update as an external deposit.
126-
return processPotentialExternalDeposit(ctx, h.conf, h.data, *update.TxSignature, tokenAccount)
127-
*/
111+
return nil
112+
default:
113+
// Not a Core Mint account, so filter it out
114+
return nil
115+
}
128116
}
129117

130-
func initializeProgramAccountUpdateHandlers(conf *conf, data code_data.Provider) map[string]ProgramAccountUpdateHandler {
118+
func initializeProgramAccountUpdateHandlers(conf *conf, data code_data.Provider, vmIndexerClient indexerpb.IndexerClient) map[string]ProgramAccountUpdateHandler {
131119
return map[string]ProgramAccountUpdateHandler{
132-
base58.Encode(token.ProgramKey): NewTokenProgramAccountHandler(conf, data),
120+
base58.Encode(token.ProgramKey): NewTokenProgramAccountHandler(conf, data, vmIndexerClient),
133121
}
134122
}

pkg/code/async/geyser/handler_test.go

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,9 @@ package async_geyser
22

33
import (
44
"testing"
5-
6-
code_data "github.com/code-payments/code-server/pkg/code/data"
75
)
86

97
// todo: implement me
108
func TestTokenProgramAccountHandler(t *testing.T) {
119

1210
}
13-
14-
// todo: implement me
15-
func TestTimelockV1ProgramAccountHandler(t *testing.T) {
16-
17-
}
18-
19-
type testEnv struct {
20-
data code_data.Provider
21-
handlers map[string]ProgramAccountUpdateHandler
22-
}
23-
24-
func setup(t *testing.T) *testEnv {
25-
data := code_data.NewTestDataProvider()
26-
return &testEnv{
27-
data: data,
28-
handlers: initializeProgramAccountUpdateHandlers(&conf{}, data),
29-
}
30-
}

pkg/code/async/geyser/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func New(data code_data.Provider, vmIndexerClient indexerpb.IndexerClient, confi
5151
vmIndexerClient: vmIndexerClient,
5252
conf: configProvider(),
5353
programUpdatesChan: make(chan *geyserpb.AccountUpdate, conf.programUpdateQueueSize.Get(context.Background())),
54-
programUpdateHandlers: initializeProgramAccountUpdateHandlers(conf, data),
54+
programUpdateHandlers: initializeProgramAccountUpdateHandlers(conf, data, vmIndexerClient),
5555
programUpdateWorkerMetrics: make(map[int]*eventWorkerMetrics),
5656
}
5757
}

pkg/code/async/geyser/util.go

Lines changed: 19 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,57 +5,41 @@ import (
55

66
"github.com/pkg/errors"
77

8-
commonpb "github.com/code-payments/code-protobuf-api/generated/go/common/v1"
9-
108
"github.com/code-payments/code-server/pkg/cache"
119
"github.com/code-payments/code-server/pkg/code/common"
1210
code_data "github.com/code-payments/code-server/pkg/code/data"
13-
"github.com/code-payments/code-server/pkg/code/data/account"
1411
"github.com/code-payments/code-server/pkg/code/data/timelock"
1512
)
1613

1714
var (
18-
codeTimelockAccountStatusCache = cache.NewCache(1_000_000)
19-
codeSwapAccontStatusCache = cache.NewCache(1_000_000)
15+
depositPdaToUserAuthorityCache = cache.NewCache(1_000_000)
2016
)
2117

2218
// todo: use a bloom filter, but a caching strategy might be ok for now
23-
func testForKnownCodeTimelockAccount(ctx context.Context, data code_data.Provider, tokenAccount *common.Account) (bool, error) {
24-
status, ok := codeTimelockAccountStatusCache.Retrieve(tokenAccount.PublicKey().ToBase58())
19+
func testForKnownUserAuthorityFromDepositPda(ctx context.Context, data code_data.Provider, depositPdaAccount *common.Account) (bool, *common.Account, error) {
20+
cached, ok := depositPdaToUserAuthorityCache.Retrieve(depositPdaAccount.PublicKey().ToBase58())
2521
if ok {
26-
return status.(bool), nil
22+
userAuthorityAccountPublicKeyString := cached.(string)
23+
if len(userAuthorityAccountPublicKeyString) > 0 {
24+
userAuthorityAccount, _ := common.NewAccountFromPublicKeyString(userAuthorityAccountPublicKeyString)
25+
return true, userAuthorityAccount, nil
26+
}
27+
return false, nil, nil
2728
}
2829

29-
_, err := data.GetTimelockByVault(ctx, tokenAccount.PublicKey().ToBase58())
30+
timelockRecord, err := data.GetTimelockByDepositPda(ctx, depositPdaAccount.PublicKey().ToBase58())
3031
switch err {
3132
case timelock.ErrTimelockNotFound:
32-
codeTimelockAccountStatusCache.Insert(tokenAccount.PublicKey().ToBase58(), false, 1)
33-
return false, nil
34-
case nil:
35-
codeTimelockAccountStatusCache.Insert(tokenAccount.PublicKey().ToBase58(), true, 1)
36-
return true, nil
37-
default:
38-
return false, errors.Wrap(err, "error getting timelock record")
39-
}
40-
}
41-
42-
// todo: use a bloom filter, but a caching strategy might be ok for now
43-
func testForKnownCodeSwapAccount(ctx context.Context, data code_data.Provider, tokenAccount *common.Account) (bool, error) {
44-
status, ok := codeSwapAccontStatusCache.Retrieve(tokenAccount.PublicKey().ToBase58())
45-
if ok {
46-
return status.(bool), nil
47-
}
48-
49-
accountInfoRecord, err := data.GetAccountInfoByTokenAddress(ctx, tokenAccount.PublicKey().ToBase58())
50-
switch err {
51-
case account.ErrAccountInfoNotFound:
52-
codeSwapAccontStatusCache.Insert(tokenAccount.PublicKey().ToBase58(), false, 1)
53-
return false, nil
33+
depositPdaToUserAuthorityCache.Insert(depositPdaAccount.PublicKey().ToBase58(), "", 1)
34+
return false, nil, nil
5435
case nil:
55-
isSwapAccount := accountInfoRecord.AccountType == commonpb.AccountType_SWAP
56-
codeSwapAccontStatusCache.Insert(tokenAccount.PublicKey().ToBase58(), isSwapAccount, 1)
57-
return isSwapAccount, nil
36+
userAuthorityAccount, err := common.NewAccountFromPublicKeyString(timelockRecord.VaultOwner)
37+
if err != nil {
38+
return false, nil, errors.New("invalid vault owner account")
39+
}
40+
depositPdaToUserAuthorityCache.Insert(depositPdaAccount.PublicKey().ToBase58(), userAuthorityAccount.PublicKey().ToBase58(), 1)
41+
return true, userAuthorityAccount, nil
5842
default:
59-
return false, errors.Wrap(err, "error getting account info record")
43+
return false, nil, errors.Wrap(err, "error getting timelock record")
6044
}
6145
}

pkg/code/common/account.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ type Account struct {
2525
}
2626

2727
type TimelockAccounts struct {
28-
Vm *Account
29-
3028
VaultOwner *Account
3129

3230
State *Account
@@ -38,19 +36,21 @@ type TimelockAccounts struct {
3836
Unlock *Account
3937
UnlockBump uint8
4038

39+
VmDepositAccounts *VmDepositAccounts
40+
41+
Vm *Account
4142
Mint *Account
4243
}
4344

4445
type VmDepositAccounts struct {
45-
Vm *Account
46-
4746
VaultOwner *Account
4847

4948
Pda *Account
5049
PdaBump uint8
5150

5251
Ata *Account
5352

53+
Vm *Account
5454
Mint *Account
5555
}
5656

@@ -253,9 +253,12 @@ func (a *Account) GetTimelockAccounts(vm, mint *Account) (*TimelockAccounts, err
253253
return nil, errors.Wrap(err, "invalid unlock address")
254254
}
255255

256-
return &TimelockAccounts{
257-
Vm: vm,
256+
vmDepositAccounts, err := a.GetVmDepositAccounts(vm, mint)
257+
if err != nil {
258+
return nil, errors.Wrap(err, "error getting vm deposit accounts")
259+
}
258260

261+
return &TimelockAccounts{
259262
VaultOwner: a,
260263

261264
State: stateAccount,
@@ -267,6 +270,9 @@ func (a *Account) GetTimelockAccounts(vm, mint *Account) (*TimelockAccounts, err
267270
Unlock: unlockAccount,
268271
UnlockBump: unlockBump,
269272

273+
VmDepositAccounts: vmDepositAccounts,
274+
275+
Vm: vm,
270276
Mint: mint,
271277
}, nil
272278
}
@@ -296,15 +302,14 @@ func (a *Account) GetVmDepositAccounts(vm, mint *Account) (*VmDepositAccounts, e
296302
}
297303

298304
return &VmDepositAccounts{
299-
Vm: vm,
300-
301305
Pda: depositPdaAccount,
302306
PdaBump: depositPdaBump,
303307

304308
Ata: depositAtaAccount,
305309

306310
VaultOwner: a,
307311

312+
Vm: vm,
308313
Mint: mint,
309314
}, nil
310315
}
@@ -389,6 +394,9 @@ func (a *TimelockAccounts) ToDBRecord() *timelock.Record {
389394
VaultOwner: a.VaultOwner.publicKey.ToBase58(),
390395
VaultState: timelock_token_v1.StateUnknown,
391396

397+
DepositPdaAddress: a.VmDepositAccounts.Pda.publicKey.ToBase58(),
398+
DepositPdaBump: a.VmDepositAccounts.PdaBump,
399+
392400
UnlockAt: nil,
393401

394402
Block: 0,

0 commit comments

Comments
 (0)