Browse Source

Caching works:

- Insert new bucket for current access time before removing
    previous access time record.
- Add instrument to more methods for more precise tracing
main
Aode (Lion) 3 months ago
parent
commit
865f4ee09c
  1. 8
      defaults.toml
  2. 9
      dev.toml
  3. 1
      src/ingest.rs
  4. 5
      src/repo.rs
  5. 104
      src/repo/sled.rs

8
defaults.toml

@ -21,13 +21,7 @@ max_height = 10000
max_area = 40000000
max_file_size = 40
enable_silent_video = true
filters = [
'blur',
'crop',
'identity',
'resize',
'thumbnail',
]
filters = ['blur', 'crop', 'identity', 'resize', 'thumbnail']
skip_validate_imports = false
cache_duration = 168

9
dev.toml

@ -21,14 +21,9 @@ max_height = 10000
max_area = 40000000
max_file_size = 40
enable_silent_video = true
filters = [
'blur',
'crop',
'identity',
'resize',
'thumbnail',
]
filters = ['blur', 'crop', 'identity', 'resize', 'thumbnail']
skip_validate_imports = false
cache_duration = 168
[repo]
type = 'sled'

1
src/ingest.rs

@ -115,6 +115,7 @@ where
S: Store,
{
pub(crate) fn disarm(&mut self) {
let _ = self.hash.take();
let _ = self.alias.take();
let _ = self.identifier.take();
}

5
src/repo.rs

@ -56,6 +56,7 @@ pub(crate) trait FullRepo:
+ Clone
+ Debug
{
#[tracing::instrument]
async fn identifier_from_alias<I: Identifier + 'static>(
&self,
alias: &Alias,
@ -64,11 +65,13 @@ pub(crate) trait FullRepo:
self.identifier(hash).await
}
#[tracing::instrument]
async fn aliases_from_alias(&self, alias: &Alias) -> Result<Vec<Alias>, Error> {
let hash = self.hash(alias).await?;
self.aliases(hash).await
}
#[tracing::instrument]
async fn still_identifier_from_alias<I: Identifier + 'static>(
&self,
alias: &Alias,
@ -83,11 +86,13 @@ pub(crate) trait FullRepo:
}
}
#[tracing::instrument]
async fn mark_cached(&self, alias: &Alias) -> Result<(), Error> {
let hash = self.hash(alias).await?;
CachedRepo::create(self, hash).await
}
#[tracing::instrument]
async fn check_cached(&self, alias: &Alias) -> Result<(), Error> {
let hash = self.hash(alias).await?;
let hashes = CachedRepo::update(self, hash).await?;

104
src/repo/sled.rs

@ -132,13 +132,15 @@ impl From<InnerUploadResult> for UploadResult {
}
}
#[derive(serde::Serialize, serde::Deserialize)]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct Bucket {
// each Vec<u8> represents a unique image hash
inner: HashSet<Vec<u8>>,
}
#[async_trait::async_trait(?Send)]
impl CachedRepo for SledRepo {
#[tracing::instrument(skip(hash))]
async fn create(&self, hash: Self::Bytes) -> Result<(), Error> {
let now = DateTime::now();
let bytes = serde_json::to_vec(&now)?;
@ -149,20 +151,26 @@ impl CachedRepo for SledRepo {
let mut old = cache_inverse.get(bytes.clone())?;
loop {
let new: Option<Vec<u8>> = if let Some(old) = old.as_ref() {
let mut bucket = serde_cbor::from_slice::<Bucket>(old)?;
bucket.inner.insert(hash.as_ref().to_vec());
let vec = serde_cbor::to_vec(&bucket)?;
Some(vec)
let mut bucket = if let Some(old) = old.as_ref() {
// unsure of whether to bail on deserialize error or fail with empty bucket
if let Ok(bucket) = serde_cbor::from_slice::<Bucket>(old) {
bucket
} else {
Bucket {
inner: HashSet::new(),
}
}
} else {
let mut bucket = Bucket {
Bucket {
inner: HashSet::new(),
};
bucket.inner.insert(hash.as_ref().to_vec());
let vec = serde_cbor::to_vec(&bucket)?;
Some(vec)
}
};
bucket.inner.insert(hash.as_ref().to_vec());
tracing::info!("Inserting new {:?}", bucket);
let bucket_bytes = serde_cbor::to_vec(&bucket)?;
let new = Some(bucket_bytes);
let res = cache_inverse.compare_and_swap(bytes.clone(), old, new)?;
if let Err(CompareAndSwapError { current, .. }) = res {
@ -177,26 +185,67 @@ impl CachedRepo for SledRepo {
Ok(())
}
#[tracing::instrument(skip(hash))]
async fn update(&self, hash: Self::Bytes) -> Result<Vec<Self::Bytes>, Error> {
let now = DateTime::now();
let bytes = serde_json::to_vec(&now)?;
let now_bytes = serde_json::to_vec(&now)?;
let to_clean = now.min_cache_date();
let to_clean_bytes = serde_json::to_vec(&to_clean)?;
let cache_inverse = self.cache_inverse.clone();
let hashes = b!(self.cache, {
let prev_value = cache
.fetch_and_update(hash.clone(), |prev_value| prev_value.map(|_| bytes.clone()))?;
let previous_datetime_opt = cache
.fetch_and_update(hash.clone(), |previous_datetime_opt| {
previous_datetime_opt.map(|_| now_bytes.clone())
})?;
if let Some(previous_datetime_bytes) = previous_datetime_opt {
// Insert cached media into new date bucket
let mut old = cache_inverse.get(now_bytes.clone())?;
loop {
let mut bucket = if let Some(bucket_bytes) = old.as_ref() {
if let Ok(bucket) = serde_cbor::from_slice::<Bucket>(bucket_bytes) {
bucket
} else {
Bucket {
inner: HashSet::new(),
}
}
} else {
Bucket {
inner: HashSet::new(),
}
};
bucket.inner.insert(hash.as_ref().to_vec());
tracing::info!("Inserting new {:?}", bucket);
let bucket_bytes = serde_cbor::to_vec(&bucket)?;
let new = Some(bucket_bytes);
if let Err(CompareAndSwapError { current, .. }) =
cache_inverse.compare_and_swap(now_bytes.clone(), old, new)?
{
old = current;
} else {
break;
}
}
if let Some(prev_value) = prev_value {
let mut old = cache_inverse.get(prev_value.clone())?;
// Remove cached media from old date bucket
let mut old = cache_inverse.get(previous_datetime_bytes.clone())?;
loop {
let new = if let Some(bucket_bytes) = old.as_ref() {
if let Ok(mut bucket) = serde_cbor::from_slice::<Bucket>(bucket_bytes) {
bucket.inner.remove(hash.as_ref());
let bucket_bytes = serde_cbor::to_vec(&bucket)?;
Some(bucket_bytes)
if bucket.inner.is_empty() {
tracing::info!("Removed old {:?}", bucket);
None
} else {
tracing::info!("Inserting old {:?}", bucket);
let bucket_bytes = serde_cbor::to_vec(&bucket)?;
Some(bucket_bytes)
}
} else {
None
}
@ -205,7 +254,7 @@ impl CachedRepo for SledRepo {
};
if let Err(CompareAndSwapError { current, .. }) =
cache_inverse.compare_and_swap(prev_value.clone(), old, new)?
cache_inverse.compare_and_swap(previous_datetime_bytes.clone(), old, new)?
{
old = current;
} else {
@ -219,17 +268,34 @@ impl CachedRepo for SledRepo {
for (date_bytes, bucket_bytes) in
cache_inverse.range(..to_clean_bytes).filter_map(Result::ok)
{
if let Ok(datetime) = serde_json::from_slice::<DateTime>(&date_bytes) {
tracing::info!("Checking {}", datetime);
} else {
tracing::warn!("Invalid date bytes");
}
if let Ok(bucket) = serde_cbor::from_slice::<Bucket>(&bucket_bytes) {
tracing::info!("Read for deletion: {:?}", bucket);
for hash in bucket.inner {
// Best effort cleanup
let _ = cache.remove(&hash);
hashes.push(hash.into());
}
} else {
tracing::warn!("Invalid bucket");
}
cache_inverse.remove(date_bytes)?;
}
#[cfg(debug)]
for date_bytes in cache_inverse.range(to_clean_bytes..).filter_map(Result::ok) {
if let Ok(datetime) = serde_json::from_slice::<DateTime>(&date_bytes) {
tracing::info!("Not cleaning for {}", datetime);
} else {
tracing::warn!("Invalid date bytes");
}
}
Ok(hashes) as Result<_, Error>
});

Loading…
Cancel
Save