Processing incoming messages
Processing of incoming messages happens in different phases:
Deserializing messages
Incoming messages can be deserialized from byte slices into an MlsMessageIn:
let mls_message =
    MlsMessageIn::tls_deserialize_exact(bytes).expect("Could not deserialize message.");
If the message is malformed, the function will fail with an error.
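To illustrate what "exact" deserialization implies, here is a minimal, self-contained sketch. It assumes that the `_exact` suffix means the whole input must be consumed, so that truncated input and trailing bytes are both rejected. `ToyMessage`, `DecodeError`, and the 3-byte header layout are hypothetical stand-ins invented for this example; they are not the OpenMLS types or the MLS wire format.

```rust
// Toy stand-ins for illustration only; NOT the OpenMLS or TLS codec implementation.
#[derive(Debug, PartialEq)]
enum DecodeError {
    /// The input ended before the announced payload was complete.
    EndOfStream,
    /// The input contained bytes after the message.
    TrailingData,
}

#[derive(Debug, PartialEq)]
struct ToyMessage {
    content_type: u8,
    payload: Vec<u8>,
}

impl ToyMessage {
    /// Decode one message and return it together with the unread remainder.
    fn deserialize(bytes: &[u8]) -> Result<(Self, &[u8]), DecodeError> {
        // Hypothetical header: 1 byte content type + 2 bytes big-endian payload length.
        if bytes.len() < 3 {
            return Err(DecodeError::EndOfStream);
        }
        let content_type = bytes[0];
        let len = u16::from_be_bytes([bytes[1], bytes[2]]) as usize;
        let rest = &bytes[3..];
        if rest.len() < len {
            return Err(DecodeError::EndOfStream);
        }
        let (payload, remainder) = rest.split_at(len);
        let message = ToyMessage {
            content_type,
            payload: payload.to_vec(),
        };
        Ok((message, remainder))
    }

    /// Like `deserialize`, but reject input that has bytes left over.
    fn deserialize_exact(bytes: &[u8]) -> Result<Self, DecodeError> {
        let (message, remainder) = Self::deserialize(bytes)?;
        if remainder.is_empty() {
            Ok(message)
        } else {
            Err(DecodeError::TrailingData)
        }
    }
}

fn main() {
    let bytes = [1u8, 0, 2, 0xAA, 0xBB];
    // Handle the error instead of panicking, as an application receiving
    // untrusted input from the network typically would.
    match ToyMessage::deserialize_exact(&bytes) {
        Ok(message) => println!("decoded: {:?}", message),
        Err(error) => println!("rejecting malformed message: {:?}", error),
    }
}
```

In an application, matching on the result as in `main` is usually preferable to `.expect()`, since incoming bytes are untrusted.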
Processing messages in groups
In the next step, the message needs to be processed in the context of the corresponding group. MlsMessageIn can carry all MLS messages, but only PrivateMessageIn and PublicMessageIn are processed in the context of a group. In OpenMLS these two message types are combined into a ProtocolMessage enum. There are 3 ways to extract the messages from an MlsMessageIn:
- MlsMessageIn.try_into_protocol_message() returns a Result<ProtocolMessage, ProtocolMessageError>
- ProtocolMessage::try_from(m: MlsMessageIn) returns a Result<ProtocolMessage, ProtocolMessageError>
- MlsMessageIn.extract() returns an MlsMessageBodyIn enum that has two variants for PrivateMessageIn and PublicMessageIn
MlsGroup.process_message() accepts either a ProtocolMessage, a PrivateMessageIn, or a PublicMessageIn and processes the message. ProtocolMessage.group_id() exposes the group ID that can help the application find the right group.
If the message was encrypted (i.e. if it was a PrivateMessageIn), it will be
decrypted automatically. The processing performs all syntactic and semantic
validation checks and verifies the message's signature. The function finally
returns a ProcessedMessage object if all checks are successful.
let protocol_message: ProtocolMessage = mls_message
    .try_into_protocol_message()
    .expect("Expected a PublicMessage or a PrivateMessage");
let processed_message = bob_group
    .process_message(provider, protocol_message)
    .expect("Could not process message.");
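The example above assumes the application already knows which group the message belongs to. When a client is a member of several groups, the group ID can serve as a lookup key. The following is a self-contained sketch of that routing pattern. All types (`ToyMessageBody`, `ToyProtocolMessage`, `ToyGroup`, `RouteError`) and the `route` function are hypothetical stand-ins invented for this example; they only mimic the shape of the conversion and lookup and are not the OpenMLS API.

```rust
use std::collections::HashMap;
use std::convert::TryFrom;

// Hypothetical stand-ins for illustration; these are NOT the OpenMLS types.
#[derive(Debug)]
enum ToyMessageBody {
    Private { group_id: Vec<u8> },
    Public { group_id: Vec<u8> },
    /// A message kind that is not processed in the context of a group.
    Welcome,
}

#[derive(Debug, PartialEq)]
struct ToyProtocolMessage {
    group_id: Vec<u8>,
}

impl ToyProtocolMessage {
    fn group_id(&self) -> &[u8] {
        &self.group_id
    }
}

#[derive(Debug, PartialEq)]
enum RouteError {
    NotAProtocolMessage,
    UnknownGroup,
}

impl TryFrom<ToyMessageBody> for ToyProtocolMessage {
    type Error = RouteError;

    // Only private and public messages convert; everything else is an error.
    fn try_from(body: ToyMessageBody) -> Result<Self, Self::Error> {
        match body {
            ToyMessageBody::Private { group_id } | ToyMessageBody::Public { group_id } => {
                Ok(ToyProtocolMessage { group_id })
            }
            ToyMessageBody::Welcome => Err(RouteError::NotAProtocolMessage),
        }
    }
}

/// Toy group that only counts how many messages it has processed.
#[derive(Default)]
struct ToyGroup {
    processed: usize,
}

/// Find the group for a message by its group ID and "process" the message there.
fn route(
    groups: &mut HashMap<Vec<u8>, ToyGroup>,
    body: ToyMessageBody,
) -> Result<usize, RouteError> {
    let message = ToyProtocolMessage::try_from(body)?;
    let group = groups
        .get_mut(message.group_id())
        .ok_or(RouteError::UnknownGroup)?;
    group.processed += 1;
    Ok(group.processed)
}

fn main() {
    let mut groups = HashMap::new();
    groups.insert(b"group-a".to_vec(), ToyGroup::default());

    let incoming = vec![
        ToyMessageBody::Private { group_id: b"group-a".to_vec() },
        ToyMessageBody::Public { group_id: b"group-b".to_vec() },
        ToyMessageBody::Welcome,
    ];
    for body in incoming {
        println!("{:?}", route(&mut groups, body));
    }
}
```

Keeping the conversion and the lookup as separate fallible steps lets the application distinguish "not a group message" from "group not found" and react to each differently.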
Interpreting the processed message
In the last step, the message is ready for inspection. The ProcessedMessage
obtained in the previous step exposes header fields such as group ID, epoch,
sender, and authenticated data. It also exposes the message's content. There are
3 different content types:
Application messages
Application messages simply return the original byte slice:
if let ProcessedMessageContent::ApplicationMessage(application_message) =
    processed_message.into_content()
{
    // Check the message
    assert_eq!(application_message.into_bytes(), b"Hi, I'm Alice!");
}
Proposals
Standalone proposals are returned as a QueuedProposal, indicating that they are pending proposals. The proposal can be inspected through the .proposal() function. After inspection, applications should store the pending proposal in the proposal store of the group:
if let ProcessedMessageContent::ProposalMessage(staged_proposal) =
    charlie_processed_message.into_content()
{
    // In the case we received an Add Proposal
    if let Proposal::Add(add_proposal) = staged_proposal.proposal() {
        // Check that Bob was added
        assert_eq!(
            add_proposal.key_package().leaf_node().credential(),
            &bob_credential.credential
        );
    } else {
        panic!("Expected an AddProposal.");
    }
    // Check that Alice added Bob
    assert!(matches!(
        staged_proposal.sender(),
        Sender::Member(member) if *member == alice_group.own_leaf_index()
    ));
    // Store proposal
    charlie_group
        .store_pending_proposal(provider.storage(), *staged_proposal)
        .unwrap();
}
Rolling back proposals
Operations that add a proposal to the proposal store return a reference to it. This reference can be used to remove the proposal from the proposal store again, which is useful, for example, to roll back in case of errors:
let (_mls_message_out, proposal_ref) = alice_group
    .propose_add_member(
        provider,
        &alice_signature_keys,
        bob_key_package.key_package(),
    )
    .expect("Could not create proposal to add Bob");
alice_group
    .remove_pending_proposal(provider.storage(), &proposal_ref)
    .expect("The proposal was not found");
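The rollback idea can be sketched independently of OpenMLS. The following self-contained example queues a proposal, attempts to deliver it, and removes it again by reference if delivery fails. `ToyProposalStore`, `ToyProposalRef`, and `propose_and_send` are hypothetical names invented for this sketch. The counter-based reference is also an assumption made for brevity and says nothing about how OpenMLS derives its proposal references.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for illustration; NOT the OpenMLS proposal store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ToyProposalRef(u64);

#[derive(Default)]
struct ToyProposalStore {
    next_ref: u64,
    pending: HashMap<ToyProposalRef, String>,
}

impl ToyProposalStore {
    /// Store a proposal and hand back the reference that identifies it.
    fn add(&mut self, proposal: &str) -> ToyProposalRef {
        let reference = ToyProposalRef(self.next_ref);
        self.next_ref += 1;
        self.pending.insert(reference, proposal.to_string());
        reference
    }

    /// Remove a pending proposal; fails if the reference is unknown.
    fn remove(&mut self, reference: &ToyProposalRef) -> Result<(), &'static str> {
        self.pending
            .remove(reference)
            .map(|_| ())
            .ok_or("proposal not found")
    }
}

/// Queue a proposal, try to deliver it, and roll back if delivery fails.
fn propose_and_send(
    store: &mut ToyProposalStore,
    proposal: &str,
    send: impl Fn(&str) -> Result<(), &'static str>,
) -> Result<ToyProposalRef, &'static str> {
    let reference = store.add(proposal);
    if let Err(error) = send(proposal) {
        // Delivery failed, so the proposal must not stay pending locally.
        store.remove(&reference)?;
        return Err(error);
    }
    Ok(reference)
}

fn main() {
    let mut store = ToyProposalStore::default();
    let sent = propose_and_send(&mut store, "add bob", |_| Ok(()));
    let failed = propose_and_send(&mut store, "add carol", |_| Err("network down"));
    println!("{:?} {:?} pending={}", sent, failed, store.pending.len());
}
```

Returning the reference from the operation that creates the proposal means the caller never has to search the store to undo its own change.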
Commit messages
Commit messages are returned as StagedCommit objects. The proposals they cover can be inspected through different functions, depending on the proposal type. After the application has inspected the StagedCommit and approved all the proposals it covers, the StagedCommit can be merged into the current group state by calling the .merge_staged_commit() function. For more details, see the StagedCommit documentation.
if let ProcessedMessageContent::StagedCommitMessage(staged_commit) =
    alice_processed_message.into_content()
{
    // We expect a remove proposal
    let remove = staged_commit
        .remove_proposals()
        .next()
        .expect("Expected a proposal.");
    // Check that Bob was removed
    assert_eq!(
        remove.remove_proposal().removed(),
        bob_group.own_leaf_index()
    );
    // Check that Charlie removed Bob
    assert!(matches!(
        remove.sender(),
        Sender::Member(member) if *member == charlies_leaf_index
    ));
    // Merge staged commit
    alice_group
        .merge_staged_commit(provider, *staged_commit)
        .expect("Error merging staged commit.");
}
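The separation between staging and merging can be illustrated with a small self-contained model. In this sketch the group state only changes when the staged commit is merged, so the application can inspect the commit first and apply its own policy. `ToyGroupState`, `ToyStagedCommit`, and the `approve` policy are hypothetical and exist only for this example; they are not the OpenMLS types, and the policy shown is an arbitrary assumption.

```rust
// Hypothetical stand-ins for illustration; NOT the OpenMLS types.
#[derive(Debug, Clone, PartialEq)]
struct ToyGroupState {
    epoch: u64,
    members: Vec<String>,
}

/// A commit that has been validated but not yet applied to the group state.
struct ToyStagedCommit {
    removed: Vec<String>,
}

impl ToyGroupState {
    /// Apply a staged commit: drop the removed members and advance the epoch.
    fn merge_staged_commit(&mut self, staged: ToyStagedCommit) {
        self.members
            .retain(|member| !staged.removed.contains(member));
        self.epoch += 1;
    }
}

/// Example application policy (an assumption of this sketch):
/// never approve a commit that removes the `protected` member.
fn approve(staged: &ToyStagedCommit, protected: &str) -> bool {
    !staged.removed.iter().any(|member| member == protected)
}

fn main() {
    let mut group = ToyGroupState {
        epoch: 3,
        members: vec![
            "alice".to_string(),
            "bob".to_string(),
            "charlie".to_string(),
        ],
    };
    let staged = ToyStagedCommit {
        removed: vec!["bob".to_string()],
    };

    // Inspect first; the group state is untouched until we merge.
    if approve(&staged, "alice") {
        group.merge_staged_commit(staged);
    }
    println!("epoch {} members {:?}", group.epoch, group.members);
}
```

Because `merge_staged_commit` takes the staged commit by value, a commit cannot be merged twice by accident in this model.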
Interpreting remove operations
Remove operations can have different meanings, such as:
- We left the group (by our own wish)
- We were removed from the group (by another member or a pre-configured sender)
- We removed another member from the group
- Another member left the group (by their own wish)
- Another member was removed from the group (by a member or a pre-configured sender, but not by us)
Since all remove operations only appear as a QueuedRemoveProposal, the RemoveOperation enum can be constructed from the remove proposal and the current group state to reflect the scenarios listed above.
if let ProcessedMessageContent::StagedCommitMessage(staged_commit) =
    bob_processed_message.into_content()
{
    let remove_proposal = staged_commit
        .remove_proposals()
        .next()
        .expect("An unexpected error occurred.");
    // We construct a RemoveOperation enum to help us interpret the remove operation
    let remove_operation = RemoveOperation::new(remove_proposal, &bob_group)
        .expect("An unexpected Error occurred.");
    match remove_operation {
        RemoveOperation::WeLeft => unreachable!(),
        // We expect this variant, since Bob was removed by Charlie
        RemoveOperation::WeWereRemovedBy(member) => {
            assert!(matches!(member, Sender::Member(member) if member == charlies_leaf_index));
        }
        RemoveOperation::TheyLeft(_) => unreachable!(),
        RemoveOperation::TheyWereRemovedBy(_) => unreachable!(),
        RemoveOperation::WeRemovedThem(_) => unreachable!(),
    }
    // Merge staged Commit
    bob_group
        .merge_staged_commit(provider, *staged_commit)
        .expect("Error merging staged commit.");
} else {
    unreachable!("Expected a StagedCommit.");
}
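To make the mapping from a remove proposal to the five scenarios explicit, here is a self-contained sketch of one way such a classification could work. It assumes the decision depends only on three inputs: our own leaf index, the removed leaf index, and the sender of the proposal. `ToySender`, `ToyRemoveOperation`, and `classify` are hypothetical and written for this example only; the actual logic inside RemoveOperation::new may differ.

```rust
// Hypothetical stand-ins that mirror the five scenarios; NOT the OpenMLS types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ToySender {
    /// A group member, identified by leaf index.
    Member(u32),
    /// A pre-configured sender that is not a group member.
    External,
}

#[derive(Debug, PartialEq)]
enum ToyRemoveOperation {
    WeLeft,
    WeWereRemovedBy(ToySender),
    TheyLeft(u32),
    TheyWereRemovedBy(u32, ToySender),
    WeRemovedThem(u32),
}

/// Classify a remove proposal from the point of view of the member at leaf `own`.
fn classify(own: u32, removed: u32, sender: ToySender) -> ToyRemoveOperation {
    // A member proposing its own removal is leaving voluntarily.
    let self_removal = sender == ToySender::Member(removed);
    let sent_by_us = sender == ToySender::Member(own);

    if removed == own {
        if self_removal {
            ToyRemoveOperation::WeLeft
        } else {
            ToyRemoveOperation::WeWereRemovedBy(sender)
        }
    } else if self_removal {
        ToyRemoveOperation::TheyLeft(removed)
    } else if sent_by_us {
        ToyRemoveOperation::WeRemovedThem(removed)
    } else {
        ToyRemoveOperation::TheyWereRemovedBy(removed, sender)
    }
}

fn main() {
    // Bob (leaf 1) sees a proposal from Charlie (leaf 2) that removes Bob.
    let operation = classify(1, 1, ToySender::Member(2));
    println!("{:?}", operation);
}
```

Encoding the scenarios as an enum lets the compiler enforce that the application handles every case in a `match`, as the example above does with `unreachable!()` arms.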