Skip to content
Open
53 changes: 36 additions & 17 deletions magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ where
if let Some(in_bank) =
this.accounts_bank.get_account(&pubkey)
{
if in_bank.delegated() && !in_bank.undelegating() {
this.unsubscribe_from_delegated_account(pubkey)
.await;
return;
}

if in_bank.undelegating() {
// We expect the account to still be delegated, but with the delegation
// program owner
Expand Down Expand Up @@ -258,17 +264,8 @@ where
// The subscription will be turned back on once the committor service schedules
// a commit for it that includes undelegation
if account.delegated() {
if let Err(err) = this
.remote_account_provider
.unsubscribe(&pubkey)
.await
{
error!(
pubkey = %pubkey,
error = %err,
"Failed to unsubscribe from delegated account"
);
}
this.unsubscribe_from_delegated_account(pubkey)
.await;
}

if account.executable() {
Expand Down Expand Up @@ -320,6 +317,18 @@ where
.await;
}

async fn unsubscribe_from_delegated_account(&self, pubkey: Pubkey) {
if let Err(err) =
self.remote_account_provider.unsubscribe(&pubkey).await
{
warn!(
pubkey = %pubkey,
error = %err,
"Failed to unsubscribe from delegated account"
);
}
}

async fn resolve_account_to_clone_from_forwarded_sub_with_unsubscribe(
&self,
update: ForwardedSubscriptionUpdate,
Expand Down Expand Up @@ -527,6 +536,9 @@ where
)?;

if let Some(in_bank_ata) = self.accounts_bank.get_account(&ata_pubkey) {
if in_bank_ata.delegated() && !in_bank_ata.undelegating() {
return None;
}
if in_bank_ata.remote_slot() >= projected_ata.remote_slot() {
return None;
}
Expand Down Expand Up @@ -1329,17 +1341,24 @@ where
if lamports == 0 {
return Ok(());
}
if let Some(acc) = self.accounts_bank.get_account(&pubkey) {
if acc.lamports() > 0 {
return Ok(());
}
}
let remote_slot =
if let Some(acc) = self.accounts_bank.get_account(&pubkey) {
if acc.lamports() > 0 {
return Ok(());
}
acc.remote_slot()
.max(self.remote_account_provider.chain_slot())
} else {
self.remote_account_provider.chain_slot()
};
// Build a plain system account with the requested balance
let account =
let mut account =
AccountSharedData::new(lamports, 0, &system_program::id());
account.set_remote_slot(remote_slot);
debug!(
pubkey = %pubkey,
lamports,
remote_slot,
"Auto-airdropping account"
);
let _sig = self
Expand Down
271 changes: 270 additions & 1 deletion magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use crate::{
cloner_stub::ClonerStub,
deleg::{add_delegation_record_for, add_invalid_delegation_record_for},
eatas::{
create_eata_account, derive_ata, derive_eata, EATA_PROGRAM_ID,
create_ata_account, create_eata_account, derive_ata, derive_eata,
EATA_PROGRAM_ID,
},
init_logger,
rpc_client_mock::{ChainRpcClientMock, ChainRpcClientMockBuilder},
Expand Down Expand Up @@ -1097,6 +1098,117 @@ async fn test_undelegation_requested_subscription_behavior() {
assert_subscribed!(remote_account_provider, &[&account_pubkey]);
}

#[tokio::test]
async fn test_delegated_authoritative_skip_unsubscribes_subscription() {
init_logger();
let validator_pubkey = random_pubkey();
let account_owner = random_pubkey();
const CURRENT_SLOT: u64 = 100;

let account_pubkey = random_pubkey();
let delegated_account = Account {
lamports: 1_000_000,
data: vec![1, 2, 3, 4],
owner: dlp::id(),
executable: false,
rent_epoch: 0,
};

let FetcherTestCtx {
remote_account_provider,
accounts_bank,
rpc_client,
fetch_cloner,
subscription_tx,
..
} = setup(
[(account_pubkey, delegated_account.clone())],
CURRENT_SLOT,
validator_pubkey,
)
.await;

add_delegation_record_for(
&rpc_client,
account_pubkey,
validator_pubkey,
account_owner,
);

// Clone delegated account into bank (authoritative local delegated state).
fetch_cloner
.fetch_and_clone_accounts(
&[account_pubkey],
None,
None,
AccountFetchOrigin::GetAccount,
None,
)
.await
.expect("delegated account fetch should succeed");
assert_cloned_delegated_account!(
accounts_bank,
account_pubkey,
delegated_account.clone(),
CURRENT_SLOT,
account_owner
);

// Simulate undelegation-tracking subscription being active.
fetch_cloner
.subscribe_to_account(&account_pubkey)
.await
.expect("failed to subscribe delegated account");
assert_subscribed!(remote_account_provider, &[&account_pubkey]);

// Send a newer plain update; delegated authoritative-skip path should still unsubscribe.
use crate::remote_account_provider::{
RemoteAccount, RemoteAccountUpdateSource,
};
let chain_update = Account {
lamports: 900_000,
data: vec![9, 9, 9, 9],
owner: account_owner,
executable: false,
rent_epoch: 0,
};
subscription_tx
.send(ForwardedSubscriptionUpdate {
pubkey: account_pubkey,
account: RemoteAccount::from_fresh_account(
chain_update,
CURRENT_SLOT + 1,
RemoteAccountUpdateSource::Subscription,
),
})
.await
.unwrap();

const POLL_INTERVAL: std::time::Duration = Duration::from_millis(10);
const TIMEOUT: std::time::Duration = Duration::from_millis(500);
tokio::time::timeout(TIMEOUT, async {
loop {
if !remote_account_provider.is_watching(&account_pubkey) {
break;
}
tokio::time::sleep(POLL_INTERVAL).await;
}
})
.await
.expect("timed out waiting for delegated account unsubscribe");

assert_not_subscribed!(remote_account_provider, &[&account_pubkey]);

// Ensure we did not overwrite the local delegated account state.
assert_cloned_delegated_account!(
accounts_bank,
account_pubkey,
delegated_account,
CURRENT_SLOT,
account_owner
);
}

#[tokio::test]
async fn test_parallel_fetch_prevention_multiple_accounts() {
init_logger();
Expand Down Expand Up @@ -1448,6 +1560,76 @@ async fn test_fetch_and_clone_undelegating_account_that_is_closed_on_chain() {
);
}

#[tokio::test]
async fn test_auto_airdrop_uses_non_stale_remote_slot_from_bank_account() {
init_logger();
let validator_pubkey = random_pubkey();
let payer_pubkey = random_pubkey();
const CURRENT_SLOT: u64 = 100;
const LOCAL_SLOT: u64 = 250;
const AIRDROP_LAMPORTS: u64 = 1_000_000_000;

let FetcherTestCtx {
accounts_bank,
fetch_cloner,
..
} = setup(
std::iter::empty::<(Pubkey, Account)>(),
CURRENT_SLOT,
validator_pubkey,
)
.await;

let mut empty_local_account =
AccountSharedData::new(0, 0, &system_program::id());
empty_local_account.set_remote_slot(LOCAL_SLOT);
accounts_bank.insert(payer_pubkey, empty_local_account);

fetch_cloner
.airdrop_account_if_empty(payer_pubkey, AIRDROP_LAMPORTS)
.await
.expect("airdrop should succeed");

let payer_after = accounts_bank
.get_account(&payer_pubkey)
.expect("payer should exist in bank");
assert_eq!(payer_after.lamports(), AIRDROP_LAMPORTS);
assert_eq!(payer_after.remote_slot(), LOCAL_SLOT);
assert_eq!(*payer_after.owner(), system_program::id());
}

#[tokio::test]
async fn test_auto_airdrop_uses_chain_slot_when_account_not_in_bank() {
init_logger();
let validator_pubkey = random_pubkey();
let payer_pubkey = random_pubkey();
const CURRENT_SLOT: u64 = 100;
const AIRDROP_LAMPORTS: u64 = 1_000_000_000;

let FetcherTestCtx {
accounts_bank,
fetch_cloner,
..
} = setup(
std::iter::empty::<(Pubkey, Account)>(),
CURRENT_SLOT,
validator_pubkey,
)
.await;

fetch_cloner
.airdrop_account_if_empty(payer_pubkey, AIRDROP_LAMPORTS)
.await
.expect("airdrop should succeed");

let payer_after = accounts_bank
.get_account(&payer_pubkey)
.expect("payer should exist in bank");
assert_eq!(payer_after.lamports(), AIRDROP_LAMPORTS);
assert_eq!(payer_after.remote_slot(), CURRENT_SLOT);
assert_eq!(*payer_after.owner(), system_program::id());
}

// -----------------
// Allowed Programs Tests
// -----------------
Expand Down Expand Up @@ -2080,3 +2262,90 @@ async fn test_delegated_eata_subscription_update_clones_raw_eata_and_projects_at
assert_eq!(projected_owner, wallet_owner);
assert_eq!(projected_amount, AMOUNT);
}

#[tokio::test]
async fn test_delegated_eata_update_does_not_override_delegated_ata_in_bank() {
init_logger();
let validator_pubkey = random_pubkey();
let wallet_owner = random_pubkey();
let mint = random_pubkey();
const CURRENT_SLOT: u64 = 100;
const CHAIN_EATA_AMOUNT: u64 = 777;
const LOCAL_ATA_AMOUNT: u64 = 999;

let eata_pubkey = derive_eata(&wallet_owner, &mint);
let ata_pubkey = derive_ata(&wallet_owner, &mint);
let eata_account =
create_eata_account(&wallet_owner, &mint, CHAIN_EATA_AMOUNT, true);

let FetcherTestCtx {
accounts_bank,
rpc_client,
subscription_tx,
..
} = setup(
[(eata_pubkey, eata_account.clone())],
CURRENT_SLOT,
validator_pubkey,
)
.await;

add_delegation_record_for(
&rpc_client,
eata_pubkey,
validator_pubkey,
EATA_PROGRAM_ID,
);

// Simulate local delegated ATA state that was already mutated in the validator.
let mut local_ata = create_ata_account(&wallet_owner, &mint);
local_ata.data[64..72].copy_from_slice(&LOCAL_ATA_AMOUNT.to_le_bytes());
let mut local_ata_shared = AccountSharedData::from(local_ata);
local_ata_shared.set_remote_slot(CURRENT_SLOT - 1);
local_ata_shared.set_delegated(true);
accounts_bank.insert(ata_pubkey, local_ata_shared);

use crate::remote_account_provider::{
RemoteAccount, RemoteAccountUpdateSource,
};

// A newer chain update for delegated eATA must not override delegated ATA in bank.
subscription_tx
.send(ForwardedSubscriptionUpdate {
pubkey: eata_pubkey,
account: RemoteAccount::from_fresh_account(
eata_account,
CURRENT_SLOT,
RemoteAccountUpdateSource::Subscription,
),
})
.await
.unwrap();

const POLL_INTERVAL: std::time::Duration = Duration::from_millis(10);
const TIMEOUT: std::time::Duration = Duration::from_millis(500);
tokio::time::timeout(TIMEOUT, async {
while accounts_bank.get_account(&eata_pubkey).is_none() {
tokio::time::sleep(POLL_INTERVAL).await;
}
})
.await
.expect("timed out waiting for delegated eATA subscription update");

let ata_after = accounts_bank
.get_account(&ata_pubkey)
.expect("ATA should still exist in bank");
assert!(ata_after.delegated(), "ATA must remain delegated");
assert_eq!(
ata_after.remote_slot(),
CURRENT_SLOT - 1,
"Delegated ATA should not be overwritten by chain update",
);

Comment on lines +2327 to +2344
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Minor: poll condition waits for eATA only — ATA-projection decision may not be settled yet.

The test polls until eata_pubkey appears in the bank and then immediately reads the ATA. If the subscription handler processes the eATA clone and the ATA-projection-skip in separate async steps (with a yield between them), there is a brief window where the eATA is visible in the bank but the handler hasn't yet decided whether to overwrite the ATA. This means the test could produce a false-positive if a bug causes the ATA overwrite to happen after this check.

Existing test test_delegated_eata_subscription_update_clones_raw_eata_and_projects_ata (line 2224) polls for has_eata && has_ata precisely to bridge this gap. Consider adding a small stable-state guard, or simply adding a tokio::task::yield_now().await after the timeout block to let any pending projection task run before asserting the ATA state.

💡 Proposed guard (minimal change)
 })
 .await
 .expect("timed out waiting for delegated eATA subscription update");
+// Yield to let any pending ATA-projection step complete before asserting.
+tokio::task::yield_now().await;

 let ata_after = accounts_bank
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs` around lines 2327 -
2344, The test currently polls until eata_pubkey appears and then immediately
inspects ata_pubkey, which can race with the projection decision; after the
tokio::time::timeout(...).await.expect(...) block add a short stable-state guard
(e.g., await tokio::task::yield_now() or an additional small poll that confirms
ATA projection work has run) before reading ata_pubkey to ensure any pending
async projection/handler tasks complete; update the test function around the
timeout block referencing eata_pubkey and ata_pubkey (the timeout/poll loop) to
include this yield/poll to avoid false positives.

let ata_data = ata_after.data();
let ata_amount = u64::from_le_bytes(ata_data[64..72].try_into().unwrap());
assert_eq!(
ata_amount, LOCAL_ATA_AMOUNT,
"Delegated ATA amount should keep local state",
);
}
Loading