Written for veilid_core version 0.5.3.
This page walks through a unit test I wrote to check my own understanding of how to use the Veilid DHT. This is just about the simplest possible usage in demonstrating a put and a get, none of the fancy watch stuff. Walking through section by section:
#[tokio::test]
async fn basic_dht_write_and_get() {
let done = CancellationToken::new();
let alice_done = done.child_token();
let (dht_key_in, dht_key_out) = tokio::sync::oneshot::channel();
let alice_task = tokio::spawn(async move {
The CancellationToken and associated child_token are from tokio_util
and is coincidental; it could be replaced with another oneshot channel if so
desired. The oneshot channel is used to communicate the DHT record key from
Alice to Bob. Each Alice and Bob are assigned individual Tokio tasks to ensure
that they are otherwise isolated and allow them to connect to the Veilid network
concurrently.
let alice = create_and_attach_veilid_node(
"alice".to_string(),
Arc::new(|_update| ())
)
.await
.expect("failed to start alice node");
println!("alice ready");
Each Alice and Bob get their own Veilid node in their own namespace. This is a fairly easy way to do integration tests as far as I can tell, without having to actually spin up separate processes to create multiple nodes. It is fairly expensive though, so if you wanna do this in tests for an actual application you should probably share those nodes among tests.
I've omitted the definition of create_and_attach_node for
brevity; it calls veilid_core::api_startup and spin-waits until
attachment.public_internet_ready is true. A real applicaton would definitely
want to be more sophisticated here, but it works for tests.
let crypto_step = alice.crypto()
.expect("failed to get alice's cryptosystem");
let cryptosystem = crypto_step
.get(CRYPTO_KIND_VLD0)
.expect("failed to get alice's vld0 cryptosystem");
let alice_record_keypair = cryptosystem.generate_keypair();
let alice_rc = alice
.routing_context()
.expect("failed to get alice's routing context");
let dht_record = alice_rc
.create_dht_record(
CRYPTO_KIND_VLD0,
veilid_core::DHTSchema::DFLT(
DHTSchemaDFLT::new(6)
.expect("failed to make alice's schema"),
),
Some(alice_record_keypair),
)
.await;
assert!(dht_record.is_ok());
println!("dht record created");
Here I create the owning KeyPair manually because I wanted to make sure I knew
how that pathway works. It's a huge pain in the butt to get a CryptoSystem
reference (and even then it's some weird guard type) and the compiler complains
for reasons I haven't taken the time to grok without crypto_step being an
intermediate variable. Anyway, once we get ahold of the cryptosystem it's quite
straightforward to make a keypair and turn it into a DHT record.
This gets you a RecordKey which includes a SharedSecret, which
is transparently used to encrypt & decrypt the value data. You
can get a RecordKey without one of those, if you like, by using
VeilidAPI.get_dht_record_key, which deterministically generates a RecordKey
from the public key of the owner KeyPair, the schema, and (optionally) a
SharedSecret, but does not open or create an actual DHT record, just the key
used to identify the record.
I used a DFLT schema (meaning only the owner can write to it, as opposed to SMPL which means subkeys can be delegated to other writer keypairs) with an arbitrarily chosen 6 subkeys. As far as I can tell, with a DFLT schema, subkeys are just a way to organize the data stored in the DHT record, though each has a maximum size considerably smaller than the overall DHT record size limit, so multiple subkeys must be used in order to store the maximum amount of data in each DHT record.
let record_key = dht_record.unwrap().key();
println!("record enc key");
let value_set_result = alice_rc
.set_dht_value(
record_key.clone(),
0,
vec![42, 42, 42],
None
)
.await;
assert!(value_set_result.is_ok());
println!("dht value set");
And once we have the record, we can set a DHT value, in this case an array of
42 three times in a row to subkey 0. Easy enough.
let send_result = dht_key_in.send(record_key);
assert!(send_result.is_ok());
tokio::select! {
_ = alice_done.cancelled() => {
alice.shutdown().await;
}
}
});
Here we send the record key to bob and commence waiting for notice that we've concluded our business and should terminate Alice's node. I don't know how long the DHT record would be available if Alice's node just went offline immediately after publishing the value; possibly some amount of time but I have heard the devs say the DHT isn't intended for storage even though some caching may happen.
let bob_task = tokio::spawn(async move {
let bob = create_and_attach_veilid_node(
"bob".to_string(),
Arc::new(|_update| ())
)
.await
.expect("failed to start bob node");
let bob_rc = bob
.routing_context()
.expect("failed to get bob's routing context");
println!("awaiting from record key from alice");
let record_key = dht_key_out
.await
.expect("failed to get record from from alice");
Over in Bob's task, start up Bob's node and wait for the DHT record key, just like it says.
let opened_dht_rec = bob_rc
.open_dht_record(record_key.clone(), None)
.await;
println!("opened dht value");
assert!(opened_dht_rec.is_ok());
let value_result = bob_rc
.get_dht_value(record_key, 0, true)
.await;
println!("got dht value");
assert!(value_result.is_ok());
assert_eq!(value_result.unwrap().unwrap().data(), vec![42, 42, 42]);
Note that we must open the value before we try to get any of its subkeys. Trying to get the value before opening it will result in an error.
The boolean (true, in this case) passed to get_dht_value is used to indicate
whether the value must be refreshed from the network - it may be fetched from
the network even if false is given here, but the network will be checked if
true is given.
done.cancel();
bob.shutdown().await;
});
assert!(bob_task.await.is_ok());
assert!(alice_task.await.is_ok());
}
And then send the signal that everything is done, shut down bob's node, and (back in the main task) wait for both Alice and Bob's task to finish before finishing out the test.