Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 6 additions & 18 deletions src/app/firedancer/topology.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,6 @@ fd_topo_initialize( config_t * config ) {
fd_topob_wksp( topo, "banks" );
fd_topob_wksp( topo, "banks_locks" );
fd_topob_wksp( topo, "store" )->core_dump_level = FD_TOPO_CORE_DUMP_LEVEL_FULL;
fd_topob_wksp( topo, "executed_txn" );

fd_topob_wksp( topo, "gossip_sign" );
fd_topob_wksp( topo, "sign_gossip" );
Expand All @@ -461,8 +460,6 @@ fd_topo_initialize( config_t * config ) {
fd_topob_wksp( topo, "txsend_sign" );
fd_topob_wksp( topo, "sign_txsend" );

fd_topob_wksp( topo, "execrp_sig" );

fd_topob_wksp( topo, "execrp_replay" );

if( FD_LIKELY( snapshots_enabled ) ) {
Expand Down Expand Up @@ -586,7 +583,8 @@ fd_topo_initialize( config_t * config ) {
/**/ fd_topob_link( topo, "dedup_resolv", "dedup_resolv", 65536UL, FD_TPU_PARSED_MTU, 1UL );
FOR(resolv_tile_cnt) fd_topob_link( topo, "resolv_pack", "resolv_pack", 65536UL, FD_TPU_RESOLVED_MTU, 1UL );
/**/ fd_topob_link( topo, "replay_epoch", "replay_epoch", 128UL, FD_EPOCH_OUT_MTU, 1UL ); /* TODO: This should be 2 but requires fixing STEM_BURST */
/**/ fd_topob_link( topo, "replay_out", "replay_out", 8192UL, sizeof(fd_replay_message_t), 1UL );
/**/ fd_topob_link( topo, "replay_out", "replay_out", 65536UL, sizeof(fd_replay_message_t), 1UL );
fd_topob_link( topo, "replay_execrp", "replay_execrp", 16384UL, sizeof(fd_execrp_task_msg_t), 1UL );
/**/ fd_topob_link( topo, "pack_poh", "pack_poh", 4096UL, sizeof(fd_done_packing_t), 1UL );
/* pack_execle is shared across all execle, so if one executor stalls
due to complex transactions, the buffer needs to be large so that
Expand All @@ -599,7 +597,6 @@ fd_topo_initialize( config_t * config ) {
/**/ fd_topob_link( topo, "poh_shred", "poh_shred", 16384UL, USHORT_MAX, 1UL );
/**/ fd_topob_link( topo, "poh_replay", "poh_replay", 4096UL, sizeof(fd_poh_leader_slot_ended_t), 1UL );
FOR(resolv_tile_cnt) fd_topob_link( topo, "resolv_replay", "resolv_replay", 4096UL, sizeof(fd_resolv_slot_exchanged_t), 1UL );
/**/ fd_topob_link( topo, "executed_txn", "executed_txn", 16384UL, 64UL, 1UL ); /* TODO: Rename this ... */

FOR(shred_tile_cnt) fd_topob_link( topo, "shred_sign", "shred_sign", 128UL, 32UL, 1UL );
FOR(shred_tile_cnt) fd_topob_link( topo, "sign_shred", "sign_shred", 128UL, sizeof(fd_ed25519_sig_t), 1UL );
Expand All @@ -618,9 +615,6 @@ fd_topo_initialize( config_t * config ) {
/**/ fd_topob_link( topo, "tower_out", "tower_out", 16384UL, sizeof(fd_tower_msg_t), 2UL ); /* conf + slot_done. see explanation in fd_tower_tile.h for link_depth */
/**/ fd_topob_link( topo, "txsend_out", "txsend_out", 128UL, FD_TPU_RAW_MTU, 1UL );

fd_topob_link( topo, "replay_execrp", "replay_execrp", 16384UL, sizeof(fd_execrp_task_msg_t), 1UL );

FOR(execrp_tile_cnt) fd_topob_link( topo, "execrp_sig", "execrp_sig", 16384UL, 64UL, 1UL );
FOR(execrp_tile_cnt) fd_topob_link( topo, "execrp_replay", "execrp_replay", 16384UL, sizeof(fd_execrp_task_done_msg_t), 1UL );

ushort parsed_tile_to_cpu[ FD_TILE_MAX ];
Expand Down Expand Up @@ -891,17 +885,18 @@ fd_topo_initialize( config_t * config ) {
/**/ fd_topob_tile_in ( topo, "replay", 0UL, "metric_in", "genesi_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_out", 0UL );
/**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_epoch", 0UL );
/**/ fd_topob_tile_out( topo, "replay", 0UL, "executed_txn", 0UL );
/**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_execrp", 0UL );
FOR(execrp_tile_cnt) fd_topob_tile_in ( topo, "replay", 0UL, "metric_in", "execrp_replay", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_in ( topo, "replay", 0UL, "metric_in", "poh_replay", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_in ( topo, "replay", 0UL, "metric_in", "tower_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_in ( topo, "replay", 0UL, "metric_in", "txsend_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
FOR(resolv_tile_cnt) fd_topob_tile_in( topo, "replay", 0UL, "metric_in", "resolv_replay", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
if( FD_LIKELY( snapshots_enabled ) ) {
fd_topob_tile_in ( topo, "replay", 0UL, "metric_in", "snapin_manif", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
}

/**/ fd_topob_tile_in ( topo, "replay", 0UL, "metric_in", "poh_replay", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
FOR(execrp_tile_cnt) fd_topob_tile_in ( topo, "execrp", i, "metric_in", "replay_execrp", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
FOR(execrp_tile_cnt) fd_topob_tile_out( topo, "execrp", i, "execrp_replay", i );

/**/ fd_topob_tile_in ( topo, "tower", 0UL, "metric_in", "dedup_resolv", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_in ( topo, "tower", 0UL, "metric_in", "replay_epoch", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
Expand All @@ -927,15 +922,14 @@ fd_topo_initialize( config_t * config ) {
/**/ fd_topob_tile_in( topo, "verify", 0UL, "metric_in", "txsend_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
FOR(verify_tile_cnt) fd_topob_tile_out( topo, "verify", i, "verify_dedup", i );
FOR(verify_tile_cnt) fd_topob_tile_in( topo, "dedup", 0UL, "metric_in", "verify_dedup", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_in( topo, "dedup", 0UL, "metric_in", "executed_txn", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_in( topo, "dedup", 0UL, "metric_in", "replay_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_out( topo, "dedup", 0UL, "dedup_resolv", 0UL );
FOR(resolv_tile_cnt) fd_topob_tile_in( topo, "resolv", i, "metric_in", "dedup_resolv", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
FOR(resolv_tile_cnt) fd_topob_tile_in( topo, "resolv", i, "metric_in", "replay_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
FOR(resolv_tile_cnt) fd_topob_tile_out( topo, "resolv", i, "resolv_pack", i );
FOR(resolv_tile_cnt) fd_topob_tile_out( topo, "resolv", i, "resolv_replay", i );
/**/ fd_topob_tile_in( topo, "pack", 0UL, "metric_in", "resolv_pack", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_in( topo, "pack", 0UL, "metric_in", "replay_out", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
/**/ fd_topob_tile_in( topo, "pack", 0UL, "metric_in", "executed_txn", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
fd_topob_tile_out( topo, "pack", 0UL, "pack_execle", 0UL );
fd_topob_tile_out( topo, "pack", 0UL, "pack_poh" , 0UL );
if( FD_LIKELY( config->tiles.pack.use_consumed_cus ) ) {
Expand All @@ -959,12 +953,6 @@ fd_topo_initialize( config_t * config ) {
FOR(shred_tile_cnt) fd_topob_tile_in ( topo, "shred", i, "metric_in", "poh_shred", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
FOR(shred_tile_cnt) fd_topob_tile_out( topo, "shred", i, "shred_net", i );

FOR(execrp_tile_cnt) fd_topob_tile_in ( topo, "dedup", 0UL, "metric_in", "execrp_sig", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
FOR(execrp_tile_cnt) fd_topob_tile_in ( topo, "pack", 0UL, "metric_in", "execrp_sig", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
FOR(execrp_tile_cnt) fd_topob_tile_out( topo, "execrp", i, "execrp_sig", i );
FOR(execrp_tile_cnt) fd_topob_tile_out( topo, "execrp", i, "execrp_replay", i );
FOR(execrp_tile_cnt) fd_topob_tile_in ( topo, "replay", 0UL, "metric_in", "execrp_replay", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );

if( FD_LIKELY( telemetry_enabled ) ) {
fd_topob_wksp( topo, "event" );
fd_topob_wksp( topo, "event_sign" );
Expand Down
21 changes: 17 additions & 4 deletions src/disco/dedup/fd_dedup_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include "../topo/fd_topo.h"
#include "../metrics/fd_metrics.h"

#include "../../discof/replay/fd_replay_tile.h"

/* fd_dedup provides services to deduplicate multiple streams of input
fragments and present them to a mix of reliable and unreliable
consumers as though they were generated by a single multi-stream
Expand All @@ -13,6 +15,7 @@
#define IN_KIND_GOSSIP (0UL)
#define IN_KIND_VERIFY (1UL)
#define IN_KIND_EXECUTED_TXN (2UL)
#define IN_KIND_REPLAY (3UL)

/* fd_dedup_in_ctx_t is a context object for each in (producer) mcache
connected to the dedup tile. */
Expand Down Expand Up @@ -95,7 +98,7 @@ static inline void
during_frag( fd_dedup_ctx_t * ctx,
ulong in_idx,
ulong seq FD_PARAM_UNUSED,
ulong sig FD_PARAM_UNUSED,
ulong sig,
ulong chunk,
ulong sz,
ulong ctl FD_PARAM_UNUSED ) {
Expand All @@ -114,7 +117,16 @@ during_frag( fd_dedup_ctx_t * ctx,
if( FD_UNLIKELY( txnm->payload_sz>FD_TPU_MTU ) ) {
FD_LOG_ERR(( "vote txn payload size %hu exceeds max %lu", txnm->payload_sz, FD_TPU_MTU ));
}
} else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_EXECUTED_TXN ) ) {
} else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPLAY ) ) {
if( FD_LIKELY( sig==REPLAY_SIG_TXN_EXECUTED ) ) {
fd_replay_txn_executed_t * txn_executed = fd_type_pun( src );
if( FD_UNLIKELY( !txn_executed->is_committable ) ) return;
ulong ha_dedup_tag = fd_hash( ctx->hashmap_seed, fd_txn_get_signatures( TXN(txn_executed->txn), txn_executed->txn->payload ), FD_TXN_SIGNATURE_SZ );
int _is_dup;
FD_TCACHE_INSERT( _is_dup, *ctx->tcache_sync, ctx->tcache_ring, ctx->tcache_depth, ctx->tcache_map, ctx->tcache_map_cnt, ha_dedup_tag );
(void)_is_dup;
}
} else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_EXECUTED_TXN ) ) { /* Frankendancer-only */
if( FD_UNLIKELY( sz!=FD_TXN_SIGNATURE_SZ ) ) FD_LOG_ERR(( "received an executed transaction signature message with the wrong size %lu", sz ));
/* Executed txns just have their signature inserted into the tcache
so we can dedup them easily. */
Expand Down Expand Up @@ -149,6 +161,7 @@ after_frag( fd_dedup_ctx_t * ctx,
(void)_tspub;

if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_EXECUTED_TXN ) ) return;
if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPLAY ) ) return;

fd_txn_m_t * txnm = (fd_txn_m_t *)fd_chunk_to_laddr( ctx->out_mem, ctx->out_chunk );
FD_TEST( txnm->payload_sz<=FD_TPU_MTU );
Expand Down Expand Up @@ -262,8 +275,8 @@ unprivileged_init( fd_topo_t * topo,
ctx->in_kind[ i ] = IN_KIND_VERIFY;
} else if( !strcmp( link->name, "executed_txn" ) ) {
ctx->in_kind[ i ] = IN_KIND_EXECUTED_TXN;
} else if( !strcmp( link->name, "execrp_sig" ) ) {
ctx->in_kind[ i ] = IN_KIND_EXECUTED_TXN;
} else if( !strcmp( link->name, "replay_out" ) ) {
ctx->in_kind[ i ] = IN_KIND_REPLAY;
} else {
FD_LOG_ERR(( "unexpected link name %s", link->name ));
}
Expand Down
13 changes: 12 additions & 1 deletion src/disco/pack/fd_pack_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ typedef struct {
int is_bundle; /* is the current transaction a bundle */

uchar executed_txn_sig[ 64UL ];
uchar txn_committed;

/* One of the FD_PACK_STRATEGY_* values defined above */
int strategy;
Expand Down Expand Up @@ -855,6 +856,13 @@ during_frag( fd_pack_ctx_t * ctx,

switch( ctx->in_kind[ in_idx ] ) {
case IN_KIND_REPLAY: {
if( FD_LIKELY( sig==REPLAY_SIG_TXN_EXECUTED ) ) {
fd_replay_txn_executed_t const * txn_executed = fd_type_pun_const( dcache_entry );
ctx->txn_committed = !!txn_executed->is_committable;
if( FD_UNLIKELY( !txn_executed->is_committable ) ) return;
memcpy( ctx->executed_txn_sig, fd_txn_get_signatures( TXN(txn_executed->txn), txn_executed->txn->payload ), FD_TXN_SIGNATURE_SZ );
return;
}
if( FD_LIKELY( sig!=REPLAY_SIG_BECAME_LEADER ) ) return;

/* There was a leader transition. Handle it. */
Expand Down Expand Up @@ -1014,6 +1022,10 @@ after_frag( fd_pack_ctx_t * ctx,
ulong leader_slot = ULONG_MAX;
switch( ctx->in_kind[ in_idx ] ) {
case IN_KIND_REPLAY:
if( FD_LIKELY( sig==REPLAY_SIG_TXN_EXECUTED && ctx->txn_committed ) ) {
ulong deleted = fd_pack_delete_transaction( ctx->pack, fd_type_pun( ctx->executed_txn_sig ) );
FD_MCNT_INC( PACK, TRANSACTION_ALREADY_EXECUTED, deleted );
}
if( FD_UNLIKELY( sig!=REPLAY_SIG_BECAME_LEADER ) ) return;
leader_slot = ctx->_became_leader->slot;

Expand Down Expand Up @@ -1234,7 +1246,6 @@ unprivileged_init( fd_topo_t * topo,
else if( FD_LIKELY( !strcmp( link->name, "sign_pack" ) ) ) ctx->in_kind[ i ] = IN_KIND_SIGN;
else if( FD_LIKELY( !strcmp( link->name, "replay_out" ) ) ) ctx->in_kind[ i ] = IN_KIND_REPLAY;
else if( FD_LIKELY( !strcmp( link->name, "executed_txn" ) ) ) ctx->in_kind[ i ] = IN_KIND_EXECUTED_TXN;
else if( FD_LIKELY( !strcmp( link->name, "execrp_sig" ) ) ) ctx->in_kind[ i ] = IN_KIND_EXECUTED_TXN;
else FD_LOG_ERR(( "pack tile has unexpected input link %lu %s", i, link->name ));
}

Expand Down
26 changes: 4 additions & 22 deletions src/discof/execrp/fd_execrp_tile.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ struct fd_execrp_tile {
/* link-related data structures. */
link_ctx_t replay_in[ 1 ];
link_ctx_t execrp_replay_out[ 1 ]; /* TODO: Remove with solcap v2 */
link_ctx_t execrp_sig_out[ 1 ];

fd_sha512_t sha_mem[ FD_TXN_ACTUAL_SIG_MAX ];
fd_sha512_t * sha_lj[ FD_TXN_ACTUAL_SIG_MAX ];
Expand Down Expand Up @@ -147,11 +146,13 @@ publish_txn_finalized_msg( fd_execrp_tile_t * ctx,
fd_execrp_task_done_msg_t * msg = fd_chunk_to_laddr( ctx->execrp_replay_out->mem, ctx->execrp_replay_out->chunk );
msg->bank_idx = ctx->bank->data->idx;
msg->txn_exec->txn_idx = ctx->txn_idx;
msg->txn_exec->err = !ctx->txn_out.err.is_committable;
msg->txn_exec->is_committable = ctx->txn_out.err.is_committable;
msg->txn_exec->is_fees_only = ctx->txn_out.err.is_fees_only;
msg->txn_exec->txn_err = ctx->txn_out.err.txn_err;
msg->txn_exec->slot = ctx->slot;
msg->txn_exec->start_shred_idx = ctx->txn_in.txn->start_shred_idx;
msg->txn_exec->end_shred_idx = ctx->txn_in.txn->end_shred_idx;
if( FD_UNLIKELY( msg->txn_exec->err ) ) {
if( FD_UNLIKELY( !msg->txn_exec->is_committable ) ) {
uchar * signature = (uchar *)ctx->txn_in.txn->payload + TXN( ctx->txn_in.txn )->signature_off;
FD_BASE58_ENCODE_64_BYTES( signature, signature_b58 );
FD_LOG_WARNING(( "block marked dead (slot=%lu) because of invalid transaction (signature=%s) (txn_err=%d)", ctx->slot, signature_b58, ctx->txn_out.err.txn_err ));
Expand Down Expand Up @@ -210,16 +211,6 @@ returnable_frag( fd_execrp_tile_t * ctx,
ctx->accdb->base.ro_active, ctx->accdb->base.rw_active ));
}

if( FD_LIKELY( ctx->execrp_sig_out->idx!=ULONG_MAX ) ) {
/* Copy the txn signature to the signature out link so the
dedup/pack tiles can drop already executed transactions. */
memcpy( fd_chunk_to_laddr( ctx->execrp_sig_out->mem, ctx->execrp_sig_out->chunk ),
(uchar *)ctx->txn_in.txn->payload + TXN( ctx->txn_in.txn )->signature_off,
64UL );
fd_stem_publish( stem, ctx->execrp_sig_out->idx, 0UL, ctx->execrp_sig_out->chunk, 64UL, 0UL, 0UL, 0UL );
ctx->execrp_sig_out->chunk = fd_dcache_compact_next( ctx->execrp_sig_out->chunk, 64UL, ctx->execrp_sig_out->chunk0, ctx->execrp_sig_out->wmark );
}

/* Notify replay. */
ctx->txn_idx = msg->txn_idx;
ctx->dispatch_time_comp = tspub;
Expand Down Expand Up @@ -325,15 +316,6 @@ unprivileged_init( fd_topo_t * topo,
ctx->execrp_replay_out->chunk = ctx->execrp_replay_out->chunk0;
}

ctx->execrp_sig_out->idx = fd_topo_find_tile_out_link( topo, tile, "execrp_sig", ctx->tile_idx );
if( FD_LIKELY( ctx->execrp_sig_out->idx!=ULONG_MAX ) ) {
fd_topo_link_t * execrp_sig_link = &topo->links[ tile->out_link_id[ ctx->execrp_sig_out->idx ] ];
ctx->execrp_sig_out->mem = topo->workspaces[ topo->objs[ execrp_sig_link->dcache_obj_id ].wksp_id ].wksp;
ctx->execrp_sig_out->chunk0 = fd_dcache_compact_chunk0( ctx->execrp_sig_out->mem, execrp_sig_link->dcache );
ctx->execrp_sig_out->wmark = fd_dcache_compact_wmark( ctx->execrp_sig_out->mem, execrp_sig_link->dcache, execrp_sig_link->mtu );
ctx->execrp_sig_out->chunk = ctx->execrp_sig_out->chunk0;
}

/********************************************************************/
/* banks */
/********************************************************************/
Expand Down
23 changes: 22 additions & 1 deletion src/discof/replay/fd_execrp.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,28 @@ typedef union fd_execrp_task_msg fd_execrp_task_msg_t;

struct fd_execrp_txn_exec_done_msg {
ulong txn_idx;
int err;

/* These flags form a nested series of if statements.
if( is_committable ) {
if( is_fees_only ) {
instructions will not be executed
txn_err will be non-zero and will be one of the account loader errors
} else {
instructions will execute
if( txn_err is non-zero ) {
there's likely an instruction error
} else {
transaction executed successfully
https://github.com/anza-xyz/agave/blob/v3.1.8/svm/src/transaction_execution_result.rs#L26
}
}
} else {
either failed before account loading, or failed cost tracker
}
*/
int is_committable;
int is_fees_only;
int txn_err;

/* used by monitoring tools */
ulong slot;
Expand Down
Loading
Loading