Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ on:

env:
CARGO_TERM_COLOR: always
CARGO_TOKEN: ${{ secrets.CRATES_IO_TOKEN }}
PROTOC_VERSION: 24.4
RUSTFLAGS: "-D warnings"

Expand Down Expand Up @@ -145,4 +144,6 @@ jobs:
version: ${{ env.PROTOC_VERSION }}
- uses: actions/checkout@v4
- name: cargo publish - ${{ matrix.crate }}
env:
CARGO_TOKEN: ${{ secrets.CARGO_TOKEN }}
run: cargo publish --manifest-path ${{ matrix.crate }}/Cargo.toml --token ${{ env.CARGO_TOKEN }}
2 changes: 1 addition & 1 deletion .github/workflows/validate-examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ jobs:
fail-fast: false
matrix:
examples:
[ "actors", "bindings", "client", "configuration", "conversation", "crypto", "invoke/grpc", "invoke/grpc-proxying", "jobs", "pubsub", "query_state", "secrets-bulk" ]
[ "actors", "bindings", "client", "configuration", "conversation", "crypto", "invoke/grpc", "invoke/grpc-proxying", "jobs", "jobs-failurepolicy", "pubsub", "query_state", "secrets-bulk" ]
steps:
- name: Check out code
uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion dapr-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub fn actor(_attr: TokenStream, item: TokenStream) -> TokenStream {
Ok(actor_struct) => actor_struct.ident.clone(),
Err(_) => match syn::parse::<syn::ItemType>(item.clone()) {
Ok(ty) => ty.ident.clone(),
Err(e) => panic!("Error parsing actor struct: {}", e),
Err(e) => panic!("Error parsing actor struct: {e}"),
},
};

Expand Down
6 changes: 5 additions & 1 deletion dapr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ axum-test = "=16.4.0" # TODO: Remove problematic dep
litemap = "=0.7.4" # TODO: Remove pinned - linked to axum_test
zerofrom = "=0.1.5" # TODO: Remove pinned - linked to axum_test
reserve-port = "=2.1.0" # TODO: Remove pinned - linked to axum_test
idna_adapter = "=1.2.0"
idna_adapter = "=1.2.0" # TODO: Remove pinned - linked to axum_test
deranged = "=0.4.0" # TODO: Remove pinned - linked to axum_test
time = "=0.3.41" # TODO: Remove pinned - linked to axum_test
time-core = "=0.1.4" # TODO: Remove pinned - linked to axum_test
time-macros = "=0.2.22" # TODO: Remove pinned - linked to axum_test

once_cell = "1.19"
dapr = { path = "./" }
Expand Down
84 changes: 76 additions & 8 deletions dapr/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use std::collections::HashMap;

use crate::dapr::proto::common::v1::job_failure_policy::Policy;
use crate::dapr::proto::common::v1::JobFailurePolicyConstant;
use crate::dapr::proto::{common::v1 as common_v1, runtime::v1 as dapr_v1};
use crate::error::Error;
use async_trait::async_trait;
use futures::StreamExt;
use prost_types::Any;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::time::Duration;
use tokio::io::AsyncRead;
use tonic::codegen::tokio_stream;
use tonic::{transport::Channel as TonicChannel, Request};
use tonic::{Status, Streaming};

use crate::dapr::proto::{common::v1 as common_v1, runtime::v1 as dapr_v1};
use crate::error::Error;

#[derive(Clone)]
pub struct Client<T>(T);

Expand All @@ -25,7 +26,7 @@ impl<T: DaprInterface> Client<T> {
pub async fn connect(addr: String) -> Result<Self, Error> {
// Get the Dapr port to create a connection
let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;
let address = format!("{}:{}", addr, port);
let address = format!("{addr}:{port}");

Ok(Client(T::connect(address).await?))
}
Expand All @@ -45,7 +46,7 @@ impl<T: DaprInterface> Client<T> {
}
};

let address = format!("{}:{}", addr, port);
let address = format!("{addr}:{port}");

Ok(Client(T::connect(address).await?))
}
Expand Down Expand Up @@ -559,9 +560,15 @@ impl<T: DaprInterface> Client<T> {
/// # Arguments
///
/// * job - The job to schedule
pub async fn schedule_job_alpha1(&mut self, job: Job) -> Result<ScheduleJobResponse, Error> {
/// * overwrite - Optional flag to overwrite an existing job with the same name
pub async fn schedule_job_alpha1(
&mut self,
job: Job,
overwrite: Option<bool>,
) -> Result<ScheduleJobResponse, Error> {
let request = ScheduleJobRequest {
job: Some(job.clone()),
overwrite: overwrite.unwrap_or(false),
};
self.0.schedule_job_alpha1(request).await
}
Expand Down Expand Up @@ -981,6 +988,9 @@ pub type DecryptRequestOptions = crate::dapr::proto::runtime::v1::DecryptRequest
/// The basic job structure
pub type Job = crate::dapr::proto::runtime::v1::Job;

/// A failure policy for a job
pub type JobFailurePolicy = crate::dapr::proto::common::v1::JobFailurePolicy;

/// A request to schedule a job
pub type ScheduleJobRequest = crate::dapr::proto::runtime::v1::ScheduleJobRequest;

Expand Down Expand Up @@ -1040,6 +1050,7 @@ pub struct JobBuilder {
ttl: Option<String>,
repeats: Option<u32>,
due_time: Option<String>,
failure_policy: Option<JobFailurePolicy>,
}

impl JobBuilder {
Expand All @@ -1052,6 +1063,7 @@ impl JobBuilder {
ttl: None,
repeats: None,
due_time: None,
failure_policy: None,
}
}

Expand Down Expand Up @@ -1080,6 +1092,11 @@ impl JobBuilder {
self
}

pub fn with_failure_policy(mut self, policy: JobFailurePolicy) -> Self {
self.failure_policy = Some(policy);
self
}

pub fn build(self) -> Job {
Job {
schedule: self.schedule,
Expand All @@ -1088,6 +1105,57 @@ impl JobBuilder {
ttl: self.ttl,
repeats: self.repeats,
due_time: self.due_time,
failure_policy: self.failure_policy,
}
}
}

// Enum for a job failure policy
pub enum JobFailurePolicyType {
Drop {},
Constant {},
}

pub struct JobFailurePolicyBuilder {
policy: JobFailurePolicyType,
pub retry_interval: Option<Duration>,
pub max_retries: Option<u32>,
}

impl JobFailurePolicyBuilder {
pub fn new(policy: JobFailurePolicyType) -> Self {
JobFailurePolicyBuilder {
policy,
retry_interval: None,
max_retries: None,
}
}

pub fn with_retry_interval(mut self, interval: Duration) -> Self {
// Convert interval string (e.g., "5s") to ProstDuration
self.retry_interval = Some(interval);
self
}

pub fn with_max_retries(mut self, max_retries: u32) -> Self {
self.max_retries = Some(max_retries);
self
}

pub fn build(self) -> common_v1::JobFailurePolicy {
match self.policy {
JobFailurePolicyType::Drop {} => common_v1::JobFailurePolicy {
policy: Some(Policy::Drop(Default::default())),
},
JobFailurePolicyType::Constant {} => JobFailurePolicy {
policy: Some(Policy::Constant(JobFailurePolicyConstant {
interval: self.retry_interval.map(|interval| {
prost_types::Duration::try_from(interval)
.expect("Failed to convert Duration")
}),
max_retries: self.max_retries,
})),
},
}
}
}
Expand Down
80 changes: 39 additions & 41 deletions dapr/src/dapr/dapr.proto.common.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,7 @@ pub mod http_extension {
/// Type of HTTP 1.1 Methods
/// RFC 7231: <https://tools.ietf.org/html/rfc7231#page-24>
/// RFC 5789: <https://datatracker.ietf.org/doc/html/rfc5789>
#[derive(
Clone,
Copy,
Debug,
PartialEq,
Eq,
Hash,
PartialOrd,
Ord,
::prost::Enumeration
)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum Verb {
None = 0,
Expand Down Expand Up @@ -148,10 +138,8 @@ pub struct StateItem {
pub etag: ::core::option::Option<Etag>,
/// The metadata which will be passed to state store component.
#[prost(map = "string, string", tag = "4")]
pub metadata: ::std::collections::HashMap<
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
pub metadata:
::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>,
/// Options for concurrency and consistency to save the state.
#[prost(message, optional, tag = "5")]
pub options: ::core::option::Option<StateOptions>,
Expand All @@ -174,17 +162,7 @@ pub struct StateOptions {
/// Nested message and enum types in `StateOptions`.
pub mod state_options {
/// Enum describing the supported concurrency for state.
#[derive(
Clone,
Copy,
Debug,
PartialEq,
Eq,
Hash,
PartialOrd,
Ord,
::prost::Enumeration
)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum StateConcurrency {
ConcurrencyUnspecified = 0,
Expand Down Expand Up @@ -214,17 +192,7 @@ pub mod state_options {
}
}
/// Enum describing the supported consistency for state.
#[derive(
Clone,
Copy,
Debug,
PartialEq,
Eq,
Hash,
PartialOrd,
Ord,
::prost::Enumeration
)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum StateConsistency {
ConsistencyUnspecified = 0,
Expand Down Expand Up @@ -265,8 +233,38 @@ pub struct ConfigurationItem {
pub version: ::prost::alloc::string::String,
/// the metadata which will be passed to/from configuration store component.
#[prost(map = "string, string", tag = "3")]
pub metadata: ::std::collections::HashMap<
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
pub metadata:
::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>,
}
/// JobFailurePolicy defines the policy to apply when a job fails to trigger.
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct JobFailurePolicy {
/// policy is the policy to apply when a job fails to trigger.
#[prost(oneof = "job_failure_policy::Policy", tags = "1, 2")]
pub policy: ::core::option::Option<job_failure_policy::Policy>,
}
/// Nested message and enum types in `JobFailurePolicy`.
pub mod job_failure_policy {
/// policy is the policy to apply when a job fails to trigger.
#[derive(Clone, Copy, PartialEq, ::prost::Oneof)]
pub enum Policy {
#[prost(message, tag = "1")]
Drop(super::JobFailurePolicyDrop),
#[prost(message, tag = "2")]
Constant(super::JobFailurePolicyConstant),
}
}
/// JobFailurePolicyDrop is a policy which drops the job tick when the job fails to trigger.
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct JobFailurePolicyDrop {}
/// JobFailurePolicyConstant is a policy which retries the job at a consistent interval when the job fails to trigger.
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct JobFailurePolicyConstant {
/// interval is the constant delay to wait before retrying the job.
#[prost(message, optional, tag = "1")]
pub interval: ::core::option::Option<::prost_types::Duration>,
/// max_retries is the optional maximum number of retries to attempt before giving up.
/// If unset, the Job will be retried indefinitely.
#[prost(uint32, optional, tag = "2")]
pub max_retries: ::core::option::Option<u32>,
}
Loading