Schedules and sensors
Dagster offers several ways to run data pipelines without manual intervention, including traditional scheduling and event-based triggers. Automating your Dagster pipelines can boost efficiency and ensure that data is produced consistently and reliably.
Run requests
- class dagster.SkipReason
Represents a skipped evaluation, where no runs are requested. May contain a message to indicate why no runs were requested.
- skip_message
A message displayed in the Dagster UI for why this evaluation resulted in no requested runs.
Type: Optional[str]
Schedules
Schedules are Dagster’s way of supporting traditional automation, such as specifying that a job should run on Mondays at 9:00 AM. Jobs triggered by schedules can contain a subset of assets or ops.
- @dagster.schedule
Creates a schedule following the provided cron schedule and requests runs for the provided job.
The decorated function takes in a ScheduleEvaluationContext as its only argument, and does one of the following:
- Return a RunRequest object.
- Return a list of RunRequest objects.
- Return a SkipReason object, providing a descriptive message of why no runs were requested.
- Return nothing (skipping without providing a reason)
- Return a run config dictionary.
- Yield a SkipReason or yield one or more RunRequest objects.
Returns a ScheduleDefinition.
Parameters:
- cron_schedule (Union[str, Sequence[str]]) – A valid cron string or sequence of cron strings
- name (Optional[str]) – The name of the schedule.
- tags (Optional[Mapping[str, str]]) – A set of key-value tags that annotate the schedule and can
- tags_fn (Optional[Callable[[ScheduleEvaluationContext], Optional[Dict[str, str]]]]) – A function
- metadata (Optional[Mapping[str, Any]]) – A set of metadata entries that annotate the
- should_execute (Optional[Callable[[ScheduleEvaluationContext], bool]]) – A function that runs at
- execution_timezone (Optional[str]) – Timezone in which the schedule should run.
- description (Optional[str]) – A human-readable description of the schedule.
- job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – The job
- default_status (DefaultScheduleStatus) – If set to RUNNING, the schedule will immediately be active when starting Dagster. The default status can be overridden from the Dagster UI or via the GraphQL API.
- required_resource_keys (Optional[Set[str]]) – The set of resource keys required by the schedule.
- target (Optional[Union[CoercibleToAssetSelection, AssetsDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – (experimental) AssetSelection objects and anything coercible to it (e.g. str, Sequence[str], AssetKey, AssetsDefinition).
- class dagster.ScheduleDefinition
Defines a schedule that targets a job.
Parameters:
- name (Optional[str]) – The name of the schedule to create. Defaults to the job name plus
- cron_schedule (Union[str, Sequence[str]]) – A valid cron string or sequence of cron strings
- execution_fn (Callable[ScheduleEvaluationContext]) – The core evaluation function for the schedule, which is run at an interval to determine whether a run should be launched or not. Takes a ScheduleEvaluationContext.
- run_config (Optional[Union[RunConfig, Mapping]]) – The config that parameterizes this execution,
- run_config_fn (Optional[Callable[[ScheduleEvaluationContext], [Mapping]]]) – A function that
- tags (Optional[Mapping[str, str]]) – A set of key-value tags that annotate the schedule
- tags_fn (Optional[Callable[[ScheduleEvaluationContext], Optional[Mapping[str, str]]]]) – A
- should_execute (Optional[Callable[[ScheduleEvaluationContext], bool]]) – A function that runs
- execution_timezone (Optional[str]) –
- description (Optional[str]) – A human-readable description of the schedule.
- job (Optional[Union[GraphDefinition, JobDefinition]]) – The job that should execute when this
- default_status (DefaultScheduleStatus) – If set to RUNNING, the schedule will start as running. The default status can be overridden from the Dagster UI or via the GraphQL API.
- required_resource_keys (Optional[Set[str]]) – The set of resource keys required by the schedule.
- target (Optional[Union[CoercibleToAssetSelection, AssetsDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – (experimental) AssetSelection objects and anything coercible to it (e.g. str, Sequence[str], AssetKey, AssetsDefinition).
- metadata (Optional[Mapping[str, Any]]) – A set of metadata entries that annotate the
- property cron_schedule
The cron schedule representing when this schedule will be evaluated.
Type: Union[str, Sequence[str]]
- property default_status
The default status for this schedule when it is first loaded in a code location.
Type: DefaultScheduleStatus
- property description
A description for this schedule.
Type: Optional[str]
- property environment_vars
- deprecated
This API will be removed in version 2.0. Setting this property no longer has any effect.
Environment variables to export to the cron schedule.
Type: Mapping[str, str]
- property execution_timezone
The timezone in which this schedule will be evaluated.
Type: Optional[str]
- property job
The job that is targeted by this schedule.
Type: Union[JobDefinition, UnresolvedAssetJobDefinition]
- property job_name
The name of the job targeted by this schedule.
Type: str
- property metadata
The metadata for this schedule.
Type: Mapping[str, str]
- property name
The name of the schedule.
Type: str
- property required_resource_keys
The set of keys for resources that must be provided to this schedule.
Type: Set[str]
- property tags
The tags for this schedule.
Type: Mapping[str, str]
- class dagster.ScheduleEvaluationContext
The context object available as the first argument to various functions defined on a dagster.ScheduleDefinition.
A ScheduleEvaluationContext object is passed as the first argument to run_config_fn, tags_fn, and should_execute.
Users should not instantiate this object directly. To construct a ScheduleEvaluationContext for testing purposes, use dagster.build_schedule_context().
Example:
from dagster import schedule, ScheduleEvaluationContext
@schedule
def the_schedule(context: ScheduleEvaluationContext):
    ...
- property instance
The current DagsterInstance.
Type: DagsterInstance
- property resources
Mapping of resource key to resource definition to be made available during schedule execution.
- property scheduled_execution_time
The time in which the execution was scheduled to happen. May differ slightly from both the actual execution time and the time at which the run config is computed.
- dagster.build_schedule_context
Builds schedule execution context using the provided parameters.
The instance provided to build_schedule_context must be persistent; DagsterInstance.ephemeral() will result in an error.
Parameters:
- instance (Optional[DagsterInstance]) – The Dagster instance configured to run the schedule.
- scheduled_execution_time (datetime) – The time in which the execution was scheduled to
Examples:
context = build_schedule_context(instance)
- dagster.build_schedule_from_partitioned_job
Creates a schedule from a job that targets time window-partitioned or statically-partitioned assets. The job can also be multi-partitioned, as long as one of the partition dimensions is time-partitioned.
The schedule executes at the cadence specified by the time partitioning of the job or assets.
Example:
######################################
# Job that targets partitioned assets
######################################
from dagster import (
    DailyPartitionsDefinition,
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
    Definitions,
)
@asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"))
def asset1():
    ...
asset1_job = define_asset_job("asset1_job", selection=[asset1])
# The created schedule will fire daily
asset1_job_schedule = build_schedule_from_partitioned_job(asset1_job)
defs = Definitions(assets=[asset1], schedules=[asset1_job_schedule])
################
# Non-asset job
################
from dagster import DailyPartitionsDefinition, Definitions, build_schedule_from_partitioned_job, job
@job(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"))
def do_stuff_partitioned():
    ...
# The created schedule will fire daily
do_stuff_partitioned_schedule = build_schedule_from_partitioned_job(
    do_stuff_partitioned,
)
defs = Definitions(schedules=[do_stuff_partitioned_schedule])
- dagster._core.scheduler.DagsterDaemonScheduler
Default scheduler implementation that submits runs from the long-lived dagster-daemon process. Periodically checks each running schedule for execution times that don’t yet have runs and launches them.
Sensors
Sensors are typically used to poll, listen, and respond to external events. For example, you could configure a sensor to run a job or materialize an asset in response to specific events.
- @dagster.sensor
Creates a sensor where the decorated function is used as the sensor’s evaluation function.
The decorated function may:
- Return a RunRequest object.
- Return a list of RunRequest objects.
- Return a SkipReason object, providing a descriptive message of why no runs were requested.
- Return nothing (skipping without providing a reason)
- Yield a SkipReason or yield one or more RunRequest objects.
Takes a SensorEvaluationContext.
Parameters:
- name (Optional[str]) – The name of the sensor. Defaults to the name of the decorated
- minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse
- description (Optional[str]) – A human-readable description of the sensor.
- job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – The job to be executed when the sensor fires.
- jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]) – (experimental) A list of jobs to be executed when the sensor fires.
- default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default
- asset_selection (Optional[Union[str, Sequence[str], Sequence[AssetKey], Sequence[Union[AssetsDefinition, SourceAsset]], AssetSelection]]) – (Experimental) an asset selection to launch a run for if the sensor condition is met.
- tags (Optional[Mapping[str, str]]) – A set of key-value tags that annotate the sensor and can
- metadata (Optional[Mapping[str, object]]) – A set of metadata entries that annotate the
- target (Optional[Union[CoercibleToAssetSelection, AssetsDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – (experimental) AssetSelection objects and anything coercible to it (e.g. str, Sequence[str], AssetKey, AssetsDefinition).
- class dagster.SensorDefinition
Define a sensor that initiates a set of runs based on some external state.
Parameters:
- evaluation_fn (Callable[[SensorEvaluationContext]]) – The core evaluation function for the sensor, which is run at an interval to determine whether a run should be launched or not. Takes a SensorEvaluationContext.
- name (Optional[str]) – The name of the sensor to create. Defaults to name of evaluation_fn
- minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse
- description (Optional[str]) – A human-readable description of the sensor.
- job (Optional[GraphDefinition, JobDefinition, UnresolvedAssetJob]) – The job to execute when this sensor fires.
- jobs (Optional[Sequence[GraphDefinition, JobDefinition, UnresolvedAssetJob]]) – (experimental) A list of jobs to execute when this sensor fires.
- default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default
- asset_selection (Optional[Union[str, Sequence[str], Sequence[AssetKey], Sequence[Union[AssetsDefinition, SourceAsset]], AssetSelection]]) – (Experimental) an asset selection to launch a run for if the sensor condition is met.
- tags (Optional[Mapping[str, str]]) – A set of key-value tags that annotate the sensor and can
- metadata (Optional[Mapping[str, object]]) – A set of metadata entries that annotate the
- target (Optional[Union[CoercibleToAssetSelection, AssetsDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – (experimental) AssetSelection objects and anything coercible to it (e.g. str, Sequence[str], AssetKey, AssetsDefinition).
- property default_status
The default status for this sensor when it is first loaded in a code location.
Type: DefaultSensorStatus
- property description
A description for this sensor.
Type: Optional[str]
- property job
The job that is targeted by this sensor.
Type: Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]
- property job_name
The name of the job that is targeted by this sensor.
Type: Optional[str]
- property jobs
A list of jobs that are targeted by this sensor.
Type: List[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]
- property minimum_interval_seconds
The minimum number of seconds between sequential evaluations of this sensor.
Type: Optional[int]
- property name
The name of this sensor.
Type: str
- property required_resource_keys
The set of keys for resources that must be provided to this sensor.
Type: Set[str]
- class dagster.SensorEvaluationContext
The context object available as the argument to the evaluation function of a dagster.SensorDefinition.
Users should not instantiate this object directly. To construct a SensorEvaluationContext for testing purposes, use dagster.build_sensor_context().
- instance_ref
The serialized instance configured to run the sensor
Type: Optional[InstanceRef]
- cursor
The cursor, passed back from the last sensor evaluation via the cursor attribute of SkipReason and RunRequest
Type: Optional[str]
- last_tick_completion_time
The last time that the sensor was evaluated (UTC).
Type: float
- last_run_key
DEPRECATED The run key of the RunRequest most recently created by this sensor. Use the preferred cursor attribute instead.
Type: str
- log_key
The log key to use for this sensor tick.
Type: Optional[List[str]]
- repository_name
The name of the repository that the sensor belongs to.
Type: Optional[str]
- repository_def
The repository that the sensor belongs to. If needed by the sensor, top-level resource definitions will be pulled from this repository. You can provide either this or definitions.
Type: Optional[RepositoryDefinition]
- instance
The deserialized instance can also be passed in directly (primarily useful in testing contexts).
Type: Optional[DagsterInstance]
- definitions
Definitions object that the sensor is defined in. If needed by the sensor, top-level resource definitions will be pulled from these definitions. You can provide either this or repository_def.
Type: Optional[Definitions]
- resources
A dict of resource keys to resource definitions to be made available during sensor execution.
Type: Optional[Dict[str, Any]]
- last_sensor_start_time
The last time that the sensor was started (UTC).
Type: float
- code_location_origin
The code location that the sensor is in.
Type: Optional[CodeLocationOrigin]
Example:
from dagster import sensor, SensorEvaluationContext
@sensor
def the_sensor(context: SensorEvaluationContext):
    ...
- update_cursor
Updates the cursor value for this sensor, which will be provided on the context for the next sensor evaluation.
This can be used to keep track of progress and avoid duplicate work across sensor evaluations.
Parameters: cursor (Optional[str])
- property cursor
The cursor value for this sensor, which was set in an earlier sensor evaluation.
- property instance
The current DagsterInstance.
Type: DagsterInstance
- property is_first_tick_since_sensor_start
Flag representing if this is the first tick since the sensor was started.
- property last_run_key
The run key supplied to the most recent RunRequest produced by this sensor.
Type: Optional[str]
- property last_sensor_start_time
Timestamp representing the last time this sensor was started. Can be used in concert with last_tick_completion_time to determine if this is the first tick since the sensor was started.
Type: Optional[float]
- property last_tick_completion_time
Timestamp representing the last time this sensor completed an evaluation.
Type: Optional[float]
- property repository_def
The RepositoryDefinition that this sensor resides in.
Type: Optional[RepositoryDefinition]
- property repository_name
The name of the repository that this sensor resides in.
Type: Optional[str]
- property resources
A mapping from resource key to instantiated resources for this sensor.
Type: Resources
- dagster.build_sensor_context
Builds sensor execution context using the provided parameters.
This function can be used to provide a context to the invocation of a sensor definition. If provided, the dagster instance must be persistent; DagsterInstance.ephemeral() will result in an error.
Parameters:
- instance (Optional[DagsterInstance]) – The dagster instance configured to run the sensor.
- cursor (Optional[str]) – A cursor value to provide to the evaluation of the sensor.
- repository_name (Optional[str]) – The name of the repository that the sensor belongs to.
- repository_def (Optional[RepositoryDefinition]) – The repository that the sensor belongs to.
- resources (Optional[Mapping[str, ResourceDefinition]]) – A set of resource definitions
- definitions (Optional[Definitions]) – Definitions object that the sensor is defined in.
- last_sensor_start_time (Optional[float]) – The last time the sensor was started.
Examples:
context = build_sensor_context()
my_sensor(context)
- @dagster.asset_sensor
Creates an asset sensor where the decorated function is used as the asset sensor’s evaluation function.
If the asset has been materialized multiple times since the last sensor tick, the evaluation function will only be invoked once, with the latest materialization.
The decorated function may:
- Return a RunRequest object.
- Return a list of RunRequest objects.
- Return a SkipReason object, providing a descriptive message of why no runs were requested.
- Return nothing (skipping without providing a reason)
- Yield a SkipReason or yield one or more RunRequest objects.
Takes a SensorEvaluationContext and an EventLogEntry corresponding to an AssetMaterialization event.
Parameters:
- asset_key (AssetKey) – The asset_key this sensor monitors.
- name (Optional[str]) – The name of the sensor. Defaults to the name of the decorated
- minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse
- description (Optional[str]) – A human-readable description of the sensor.
- job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – The
- jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]) – (experimental) A list of jobs to be executed when the sensor fires.
- default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default
- tags (Optional[Mapping[str, str]]) – A set of key-value tags that annotate the sensor and can
- metadata (Optional[Mapping[str, object]]) – A set of metadata entries that annotate the
Example:
from dagster import AssetKey, EventLogEntry, RunRequest, SensorEvaluationContext, asset_sensor
# my_job is assumed to be a job defined elsewhere.
@asset_sensor(asset_key=AssetKey("my_table"), job=my_job)
def my_asset_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    return RunRequest(
        run_key=context.cursor,
        run_config={
            "ops": {
                "read_materialization": {
                    "config": {
                        "asset_key": asset_event.dagster_event.asset_key.path,
                    }
                }
            }
        },
    )
- class dagster.AssetSensorDefinition
Define an asset sensor that initiates a set of runs based on the materialization of a given asset.
If the asset has been materialized multiple times since the last sensor tick, the evaluation function will only be invoked once, with the latest materialization.
Parameters:
- name (str) – The name of the sensor to create.
- asset_key (AssetKey) – The asset_key this sensor monitors.
- asset_materialization_fn (Callable[[SensorEvaluationContext, EventLogEntry], Union[Iterator[Union[RunRequest, SkipReason]], RunRequest, SkipReason]]) – The core evaluation function for the sensor, which is run at an interval to determine whether a run should be launched or not. Takes a SensorEvaluationContext and an EventLogEntry corresponding to an AssetMaterialization event.
- minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse
- description (Optional[str]) – A human-readable description of the sensor.
- job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – The job
- jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]) – (experimental) A list of jobs to be executed when the sensor fires.
- tags (Optional[Mapping[str, str]]) – A set of key-value tags that annotate the sensor and can
- metadata (Optional[Mapping[str, object]]) – A set of metadata entries that annotate the
- default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default
- property asset_key
The key of the asset targeted by this sensor.
Type: AssetKey
- @dagster.freshness_policy_sensor
- experimental
This API may break in future versions, even between dot releases.
Define a sensor that reacts to the status of a given set of asset freshness policies, where the decorated function will be evaluated on every tick for each asset in the selection that has a FreshnessPolicy defined.
Note: returning or yielding a value from the annotated function will result in an error.
Takes a FreshnessPolicySensorContext.
Parameters:
- asset_selection (AssetSelection) – The asset selection monitored by the sensor.
- name (Optional[str]) – The name of the sensor. Defaults to the name of the decorated function.
- freshness_policy_sensor_fn (Callable[[FreshnessPolicySensorContext], None]) – The core
- minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse
- description (Optional[str]) – A human-readable description of the sensor.
- default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default
- class dagster.FreshnessPolicySensorDefinition
Define a sensor that reacts to the status of a given set of asset freshness policies, where the decorated function will be evaluated on every sensor tick.
Parameters:
- name (str) – The name of the sensor. Defaults to the name of the decorated function.
- freshness_policy_sensor_fn (Callable[[FreshnessPolicySensorContext], None]) – The core
- asset_selection (AssetSelection) – The asset selection monitored by the sensor.
- minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse
- description (Optional[str]) – A human-readable description of the sensor.
- default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default
- class dagster.FreshnessPolicySensorContext
The context object available to a decorated function of freshness_policy_sensor.
- sensor_name
the name of the sensor.
Type: str
- asset_key
the key of the asset being monitored
Type: AssetKey
- freshness_policy
the freshness policy of the asset being monitored
Type: FreshnessPolicy
- minutes_overdue
Type: Optional[float]
- previous_minutes_overdue
the minutes_overdue value for this asset on the previous sensor tick.
Type: Optional[float]
- instance
the current instance.
Type: DagsterInstance
- dagster.build_freshness_policy_sensor_context
- experimental
This API may break in future versions, even between dot releases.
Builds freshness policy sensor context from provided parameters.
This function can be used to provide the context argument when directly invoking a function decorated with @freshness_policy_sensor, such as when writing unit tests.
Parameters:
- sensor_name (str) – The name of the sensor the context is being constructed for.
- asset_key (AssetKey) – The AssetKey for the monitored asset
- freshness_policy (FreshnessPolicy) – The FreshnessPolicy for the monitored asset
- minutes_overdue (Optional[float]) – How overdue the monitored asset currently is
- previous_minutes_overdue (Optional[float]) – How overdue the monitored asset was on the
- instance (DagsterInstance) – The dagster instance configured for the context.
Examples:
context = build_freshness_policy_sensor_context(
    sensor_name="freshness_policy_sensor_to_invoke",
    asset_key=AssetKey("some_asset"),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=30),
    minutes_overdue=10.0,
)
freshness_policy_sensor_to_invoke(context)
- @dagster.multi_asset_sensor
- experimental
This API may break in future versions, even between dot releases.
Creates an asset sensor that can monitor multiple assets.
The decorated function is used as the asset sensor’s evaluation function. The decorated function may:
- Return a RunRequest object.
- Return a list of RunRequest objects.
- Return a SkipReason object, providing a descriptive message of why no runs were requested.
- Return nothing (skipping without providing a reason)
- Yield a SkipReason or yield one or more RunRequest objects.
Takes a MultiAssetSensorEvaluationContext.
Parameters:
- monitored_assets (Union[Sequence[AssetKey], AssetSelection]) – The assets this
- name (Optional[str]) – The name of the sensor. Defaults to the name of the decorated
- minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse
- description (Optional[str]) – A human-readable description of the sensor.
- job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – The
- jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]) – (experimental) A list of jobs to be executed when the sensor fires.
- default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default
- request_assets (Optional[AssetSelection]) – (Experimental) an asset selection to launch a run
- tags (Optional[Mapping[str, str]]) – A set of key-value tags that annotate the sensor and can
- metadata (Optional[Mapping[str, object]]) – A set of metadata entries that annotate the
- class dagster.MultiAssetSensorDefinition
- experimental
This API may break in future versions, even between dot releases.
Define an asset sensor that initiates a set of runs based on the materialization of a list of assets.
Users should not instantiate this object directly. To construct a MultiAssetSensorDefinition, use dagster.multi_asset_sensor().
Parameters:
- name (str) – The name of the sensor to create.
- asset_keys (Sequence[AssetKey]) – The asset_keys this sensor monitors.
- asset_materialization_fn (Callable[[MultiAssetSensorEvaluationContext], Union[Iterator[Union[RunRequest, SkipReason]], RunRequest, SkipReason]]) – The core evaluation function for the sensor, which is run at an interval to determine whether a run should be launched or not. Takes a MultiAssetSensorEvaluationContext.
- minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse
- description (Optional[str]) – A human-readable description of the sensor.
- job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – The job
- jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]) – (experimental) A list of jobs to be executed when the sensor fires.
- default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default
- request_assets (Optional[AssetSelection]) – (Experimental) an asset selection to launch a run
- tags (Optional[Mapping[str, str]]) – A set of key-value tags that annotate the sensor and can
- metadata (Optional[Mapping[str, object]]) – A set of metadata entries that annotate the
- class dagster.MultiAssetSensorEvaluationContext
- experimental
This API may break in future versions, even between dot releases.
The context object available as the argument to the evaluation function of a dagster.MultiAssetSensorDefinition.
Users should not instantiate this object directly. To construct a MultiAssetSensorEvaluationContext for testing purposes, use dagster.build_multi_asset_sensor_context().
The MultiAssetSensorEvaluationContext contains a cursor object that tracks the state of consumed event logs for each monitored asset. For each asset, the cursor stores the storage ID of the latest materialization that has been marked as “consumed” (via a call to advance_cursor) in a latest_consumed_event_id field.
For each monitored asset, the cursor will store the latest unconsumed event ID for up to 25 partitions. Each event ID must be before the latest_consumed_event_id field for the asset.
Events not marked as consumed via advance_cursor will be returned in future ticks until they are marked as consumed.
To update the cursor to the latest materialization and clear the unconsumed events, call advance_all_cursors.
- monitored_assets
The assets monitored by the sensor. If an AssetSelection object is provided, it will only apply to assets within the Definitions that this sensor is part of.
Type: Union[Sequence[AssetKey], AssetSelection]
- repository_def
The repository that the sensor belongs to. If needed by the sensor, top-level resource definitions will be pulled from this repository. You can provide either this or definitions.
Type: Optional[RepositoryDefinition]
- instance_ref
The serialized instance configured to run the sensor
Type: Optional[InstanceRef]
- cursor
The cursor, passed back from the last sensor evaluation via the cursor attribute of SkipReason and RunRequest. Must be a dictionary of asset key strings to a stringified tuple of (latest_event_partition, latest_event_storage_id, trailing_unconsumed_partitioned_event_ids).
Type: Optional[str]
- last_tick_completion_time
The last time that the sensor was evaluated for a tick (UTC).
Type: Optional[float]
- last_run_key
DEPRECATED The run key of the RunRequest most recently created by this sensor. Use the preferred cursor attribute instead.
Type: str
- repository_name
The name of the repository that the sensor belongs to.
Type: Optional[str]
- instance
The deserialized instance can also be passed in directly (primarily useful in testing contexts).
Type: Optional[DagsterInstance]
- definitions
Definitions object that the sensor is defined in. If needed by the sensor, top-level resource definitions will be pulled from these definitions. You can provide either this or repository_def.
Type: Optional[Definitions]
- last_sensor_start_time
The last time the sensor was started.
Type: Optional[float]
- log_key
The log key to use for this sensor tick.
Type: Optional[List[str]]
- sensor_name
The name of the sensor, used for logging and error messages.
Type: Optional[str]
- code_location_origin
The code location that the sensor is in.
Type: Optional[CodeLocationOrigin]
Example:
from dagster import AssetKey, MultiAssetSensorEvaluationContext, multi_asset_sensor

@multi_asset_sensor(monitored_assets=[AssetKey("asset_1"), AssetKey("asset_2")])
def the_sensor(context: MultiAssetSensorEvaluationContext):
    ...
- advance_all_cursors
Updates the cursor to the most recent materialization event for all assets monitored by the multi_asset_sensor.
Marks all materialization events as consumed by the sensor, including unconsumed events.
- advance_cursor
Marks the provided materialization records as having been consumed by the sensor.
At the end of the tick, the cursor will be updated to advance past all materialization records provided via advance_cursor. In the next tick, records that have been consumed will no longer be returned.
Passing a partitioned materialization record into this function will mark prior materializations with the same asset key and partition as having been consumed.
Parameters: materialization_records_by_key (Mapping[AssetKey, Optional[EventLogRecord]]) – Mapping of AssetKeys to EventLogRecord or None. If an EventLogRecord is provided, the cursor for the AssetKey will be updated and future calls to fetch asset materialization events will not fetch this event again. If None is provided, the cursor for the AssetKey will not be updated.
- all_partitions_materialized
A utility method to check if a provided list of partitions have been materialized for a particular asset. This method ignores the cursor and checks all materializations for the asset.
Parameters:
- asset_key (AssetKey) – The asset to check partitions for.
- partitions (Optional[Sequence[str]]) – A list of partitions to check. If not provided,
Returns: True if all selected partitions have been materialized, False otherwise.
Return type: bool
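The check described above amounts to a membership test over the asset's full materialization history. The sketch below is a toy model only: all_partitions_materialized_toy is a hypothetical helper, and events are represented as (storage ID, partition key) pairs rather than real event log records. It illustrates that the cursor plays no part in the answer.

```python
from typing import Iterable, Sequence, Tuple


def all_partitions_materialized_toy(
    materializations: Iterable[Tuple[int, str]],
    partitions: Sequence[str],
) -> bool:
    """Return True if every requested partition appears in the full history.

    No cursor is consulted: every (storage_id, partition) pair counts,
    whether or not a sensor has already consumed it.
    """
    materialized = {partition for _storage_id, partition in materializations}
    return all(partition in materialized for partition in partitions)


# Hypothetical history: 2022-07-01 was materialized twice, 2022-07-02 once.
history = [(1, "2022-07-01"), (2, "2022-07-02"), (3, "2022-07-01")]

both_done = all_partitions_materialized_toy(history, ["2022-07-01", "2022-07-02"])
third_done = all_partitions_materialized_toy(history, ["2022-07-01", "2022-07-03"])
```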
- get_cursor_partition
A utility method to get the current partition the cursor is on.
- get_downstream_partition_keys
Converts a partition key from one asset to the corresponding partition key in a downstream asset. Uses the existing partition mapping between the upstream asset and the downstream asset if it exists, otherwise, uses the default partition mapping.
Parameters:
- partition_key (str) – The partition key to convert.
- from_asset_key (AssetKey) – The asset key of the upstream asset, which the provided
- to_asset_key (AssetKey) – The asset key of the downstream asset. The provided partition
Returns: A list of the corresponding downstream partitions in to_asset_key that partition_key maps to.
Return type: Sequence[str]
- get_trailing_unconsumed_events
Fetches the unconsumed events for a given asset key. Returns only events before the latest consumed event ID for the given asset. To mark an event as consumed, pass the event to advance_cursor. Returns events in ascending order by storage ID.
Parameters: asset_key (AssetKey) – The asset key to get unconsumed events for.
Returns: The unconsumed events for the given asset key.
Return type: Sequence[EventLogRecord]
- latest_materialization_records_by_key
Fetches the most recent materialization event record for each asset in asset_keys. Only fetches events after the latest consumed event ID for the given asset key.
Parameters: asset_keys (Optional[Sequence[AssetKey]]) – List of asset keys to fetch events for. If not specified, the latest materialization will be fetched for all assets the multi_asset_sensor monitors.
Returns: Mapping of AssetKey to EventLogRecord where the EventLogRecord is the latest materialization event for the asset. If there is no materialization event for the asset, the value in the mapping will be None.
- latest_materialization_records_by_partition
Given an asset, returns a mapping of partition key to the latest materialization event for that partition. Fetches only materializations that have not been marked as “consumed” via a call to advance_cursor.
Parameters:
- asset_key (AssetKey) – The asset to fetch events for.
- after_cursor_partition (Optional[bool]) – If True, only materializations with partitions
Returns: Mapping of partition key to EventLogRecord, where the EventLogRecord is the most recent materialization event for the partition. The mapping preserves the order in which the materializations occurred.
Return type: Mapping[str, EventLogRecord]
Example:
from dagster import DailyPartitionsDefinition, asset, multi_asset_sensor

@asset(partitions_def=DailyPartitionsDefinition("2022-07-01"))
def july_asset():
    return 1

@multi_asset_sensor(monitored_assets=[july_asset.key])
def my_sensor(context):
    context.latest_materialization_records_by_partition(july_asset.key)
    # After materializing july_asset for 2022-07-05,
    # latest_materialization_records_by_partition
    # returns {"2022-07-05": EventLogRecord(...)}
- latest_materialization_records_by_partition_and_asset
Finds the most recent unconsumed materialization for each partition for each asset monitored by the sensor. Aggregates all materializations into a mapping of partition key to a mapping of asset key to the materialization event for that partition.
For example, if the sensor monitors two partitioned assets A and B that are materialized for partition_x after the cursor, this function returns:
{
    "partition_x": {asset_a.key: EventLogRecord(...), asset_b.key: EventLogRecord(...)}
}
This method can only be called when all monitored assets are partitioned and share the same partition definition.
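The shape of that result can be illustrated with a short, self-contained sketch. It assumes the per-asset results are already available as plain dictionaries. The helper name group_by_partition_then_asset, the string asset keys, and the string stand-ins for records are all hypothetical. It mirrors the aggregation described above and is not Dagster's implementation.

```python
from collections import defaultdict
from typing import Dict, Hashable, Mapping


def group_by_partition_then_asset(
    records_by_asset: Mapping[Hashable, Mapping[str, object]],
) -> Dict[str, Dict[Hashable, object]]:
    """Invert {asset: {partition: record}} into {partition: {asset: record}}."""
    grouped: Dict[str, Dict[Hashable, object]] = defaultdict(dict)
    for asset_key, records_by_partition in records_by_asset.items():
        for partition_key, record in records_by_partition.items():
            grouped[partition_key][asset_key] = record
    return dict(grouped)


# Stand-in data: strings take the place of asset keys and event log records.
per_asset = {
    "asset_a": {"partition_x": "record_a_x"},
    "asset_b": {"partition_x": "record_b_x", "partition_y": "record_b_y"},
}
by_partition = group_by_partition_then_asset(per_asset)
```

A sensor body could then, for example, request a downstream run only for partitions whose inner mapping contains every monitored asset.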
- materialization_records_for_key
Fetches asset materialization event records for asset_key, with the earliest event first.
Only fetches events after the latest consumed event ID for the given asset key.
Parameters:
- asset_key (AssetKeyAssetKey) – The asset to fetch materialization events for
- limit (Optional[int]) – The number of events to fetch
- property asset_keys
The asset keys which are monitored by this sensor.
Type: Sequence[AssetKey]
- property assets_defs_by_key
A mapping from AssetKey to the AssetsDefinition object which produces it. If a given asset is monitored by this sensor, but is not produced within the same code location as this sensor, then the value will be None.
Type: Mapping[AssetKey, Optional[AssetsDefinition]]
- dagster.build_multi_asset_sensor_context
- experimental
This API may break in future versions, even between dot releases.
Builds multi asset sensor execution context for testing purposes using the provided parameters.
This function can be used to provide a context to the invocation of a multi asset sensor definition. If provided, the dagster instance must be persistent; DagsterInstance.ephemeral() will result in an error.
Parameters:
- monitored_assets (Union[Sequence[AssetKey], AssetSelection]) – The assets monitored
- repository_def (RepositoryDefinition) – RepositoryDefinition object that
- instance (Optional[DagsterInstance]) – The dagster instance configured to run the sensor.
- cursor (Optional[str]) – A string cursor to provide to the evaluation of the sensor. Must be
- repository_name (Optional[str]) – The name of the repository that the sensor belongs to.
- cursor_from_latest_materializations (bool) – If True, the cursor will be set to the latest
- resources (Optional[Mapping[str, object]]) – The resource definitions
- definitions (Optional[Definitions]) – Definitions object that the sensor is defined in.
Examples:
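from dagster import AssetKey, build_multi_asset_sensor_context, instance_for_test

# my_asset_sensor is assumed to be a multi asset sensor defined elsewhere.
with instance_for_test() as instance:
    context = build_multi_asset_sensor_context(
        monitored_assets=[AssetKey("asset_1"), AssetKey("asset_2")],
        instance=instance,
    )
    my_asset_sensor(context)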
- class dagster.RunStatusSensorDefinition
Define a sensor that reacts to a given status of job execution, where the decorated function will be evaluated when a run is at the given status.
Parameters:
- name (str) – The name of the sensor. Defaults to the name of the decorated function.
- run_status (DagsterRunStatus) – The status of a run which will be
- run_status_sensor_fn (Callable[[RunStatusSensorContext], Union[SkipReason, DagsterRunReaction]]) – The core
- minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse
- description (Optional[str]) – A human-readable description of the sensor.
- monitored_jobs (Optional[List[Union[JobDefinition, GraphDefinition, UnresolvedAssetJobDefinition, JobSelector, RepositorySelector, CodeLocationSelector]]]) – The jobs in the current repository that will be monitored by this sensor. Defaults to
- monitor_all_code_locations (bool) – If set to True, the sensor will monitor all runs in the
- default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default
- request_job (Optional[Union[GraphDefinition, JobDefinition]]) – The job a RunRequest should
- tags (Optional[Mapping[str, str]]) – A set of key-value tags that annotate the sensor and can
- metadata (Optional[Mapping[str, object]]) – A set of metadata entries that annotate the
- request_jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition]]]) – (experimental)
- class dagster.RunStatusSensorContext
The context object available to a decorated function of run_status_sensor.
- property dagster_event
The event associated with the job run status.
- property dagster_run
The run of the job.
- property instance
The current instance.
- property log
The logger for the current sensor evaluation.
- property partition_key
The partition key of the relevant run.
Type: Optional[str]
- property sensor_name
The name of the sensor.
- class dagster.RunFailureSensorContext
The context object available to a decorated function of run_failure_sensor.
- sensor_name
The name of the sensor.
Type: str
- dagster_run
The failed run.
Type: DagsterRun
- get_step_failure_events
The step failure event for each step in the run that failed.
Examples:
error_strings_by_step_key = {
    # includes the stack trace
    event.step_key: event.event_specific_data.error.to_string()
    for event in context.get_step_failure_events()
}
- property failure_event
The run failure event.
If the run failed because of an error inside a step, get_step_failure_events will have more details on the step failure.
- dagster.build_run_status_sensor_context
Builds run status sensor context from provided parameters.
This function can be used to provide the context argument when directly invoking a function decorated with @run_status_sensor or @run_failure_sensor, such as when writing unit tests.
Parameters:
- sensor_name (str) – The name of the sensor the context is being constructed for.
- dagster_event (DagsterEvent) – A DagsterEvent with the same event type as the one that
- dagster_instance (DagsterInstance) – The dagster instance configured for the context.
- dagster_run (DagsterRun) – DagsterRun object from running a job
- resources (Optional[Mapping[str, object]]) – A dictionary of resources to be made available
- repository_def (Optional[RepositoryDefinition]) – experimental
Examples:
from dagster import DagsterInstance, build_run_status_sensor_context

# my_job and run_status_sensor_to_invoke are assumed to be defined elsewhere.
instance = DagsterInstance.ephemeral()
result = my_job.execute_in_process(instance=instance)
dagster_run = result.dagster_run
dagster_event = result.get_job_success_event()  # or get_job_failure_event()
context = build_run_status_sensor_context(
    sensor_name="run_status_sensor_to_invoke",
    dagster_instance=instance,
    dagster_run=dagster_run,
    dagster_event=dagster_event,
)
run_status_sensor_to_invoke(context)
- @dagster.run_status_sensor
Creates a sensor that reacts to a given status of job execution, where the decorated function will be run when a job is at the given status.
Takes a RunStatusSensorContext.
Parameters:
- run_status (DagsterRunStatus) – The status of run execution which will be
- name (Optional[str]) – The name of the sensor. Defaults to the name of the decorated function.
- minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse
- description (Optional[str]) – A human-readable description of the sensor.
- monitored_jobs (Optional[List[Union[JobDefinition, GraphDefinition, UnresolvedAssetJobDefinition, RepositorySelector, JobSelector, CodeLocationSelector]]]) – Jobs in the current code location that will be monitored by this sensor. Defaults to None, which means the alert will
- monitor_all_code_locations (Optional[bool]) – If set to True, the sensor will monitor all runs in the Dagster deployment.
- job_selection (Optional[List[Union[JobDefinition, GraphDefinition, RepositorySelector, JobSelector, CodeLocationSelector]]]) – (deprecated in favor of monitored_jobs) Jobs in the current code location that will be
- default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default
- request_job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – The job that should be
- request_jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]) – (experimental)
- monitor_all_repositories (Optional[bool]) – (deprecated in favor of monitor_all_code_locations) If set to True, the sensor will monitor all runs in the Dagster instance.
- tags (Optional[Mapping[str, str]]) – A set of key-value tags that annotate the sensor and can
- metadata (Optional[Mapping[str, object]]) – A set of metadata entries that annotate the
- @dagster.run_failure_sensor
Creates a sensor that reacts to job failure events, where the decorated function will be run when a run fails.
Takes a RunFailureSensorContext.
Parameters:
- name (Optional[str]) – The name of the job failure sensor. Defaults to the name of the
- minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse
- description (Optional[str]) – A human-readable description of the sensor.
- monitored_jobs (Optional[List[Union[JobDefinition, GraphDefinition, UnresolvedAssetJobDefinition, RepositorySelector, JobSelector, CodeLocationSelector]]]) – The jobs in the current repository that will be monitored by this failure sensor.
- monitor_all_code_locations (bool) – If set to True, the sensor will monitor all runs in the
- job_selection (Optional[List[Union[JobDefinition, GraphDefinition, RepositorySelector, JobSelector, CodeLocationSelector]]]) – (deprecated in favor of monitored_jobs) The jobs in the current repository that will be
- default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default
- request_job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJob]]) – The job a RunRequest should
- request_jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJob]]]) – (experimental)
- monitor_all_repositories (bool) – (deprecated in favor of monitor_all_code_locations) If set to True,
- tags (Optional[Mapping[str, str]]) – A set of key-value tags that annotate the sensor and can
- metadata (Optional[Mapping[str, object]]) – A set of metadata entries that annotate the
- class dagster.SensorResult
The result of a sensor evaluation.
- run_requests
A list of run requests to be executed.
Type: Optional[Sequence[RunRequest]]
- skip_reason
A skip message indicating why sensor evaluation was skipped.
Type: Optional[Union[str, SkipReason]]
- cursor
The cursor value for this sensor, which will be provided on the context for the next sensor evaluation.
Type: Optional[str]
- dynamic_partitions_requests
A list of requests to add or delete dynamic partitions. Run requests will be evaluated using the state of the partitions with these changes applied. We recommend limiting partition additions and deletions to a maximum of 25K partitions per sensor evaluation, as this is the maximum recommended partition limit per asset.
Type: Optional[Sequence[Union[DeleteDynamicPartitionsRequest, AddDynamicPartitionsRequest]]]
- asset_events
(Experimental) A list of materializations, observations, and asset check evaluations that the system will persist on your behalf at the end of sensor evaluation. These events will not be associated with any particular run, but will be queryable and viewable in the asset catalog.
Type: Optional[Sequence[Union[AssetObservation, AssetMaterialization, AssetCheckEvaluation]]]
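One way to stay within the recommended 25K partition changes per sensor evaluation is to split a large set of partition keys into batches and submit one batch per evaluation. The helper below is a hypothetical, library-independent sketch: batch_partition_keys and MAX_PARTITION_CHANGES_PER_EVALUATION are illustrative names and are not part of Dagster.

```python
from typing import Iterator, List, Sequence

# Recommended ceiling quoted in the dynamic_partitions_requests description.
MAX_PARTITION_CHANGES_PER_EVALUATION = 25_000


def batch_partition_keys(
    partition_keys: Sequence[str],
    batch_size: int = MAX_PARTITION_CHANGES_PER_EVALUATION,
) -> Iterator[List[str]]:
    """Yield consecutive slices of partition_keys, each at most batch_size long."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(partition_keys), batch_size):
        yield list(partition_keys[start : start + batch_size])


# 60,000 hypothetical partition keys split into per-evaluation batches.
keys = [f"partition_{i}" for i in range(60_000)]
batch_sizes = [len(batch) for batch in batch_partition_keys(keys)]
```

Each batch could back the partition requests of a single evaluation, with the sensor cursor recording how far through the key list the sensor has progressed.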
- class dagster.AddDynamicPartitionsRequest
A request to add partitions to a dynamic partitions definition, to be evaluated by a sensor or schedule.
- class dagster.DeleteDynamicPartitionsRequest
A request to delete partitions from a dynamic partitions definition, to be evaluated by a sensor or schedule.