poprox_recommender.lkpipeline#

A vendored copy of LensKit’s pipeline abstraction, without trainability support.

Classes

Pipeline([name, version])

LensKit recommendation pipeline.

Exceptions

PipelineError

Pipeline configuration errors.

PipelineWarning

Pipeline configuration and setup warnings.

class poprox_recommender.lkpipeline.Pipeline(name=None, version=None)#

Bases: object

LensKit recommendation pipeline. This is the core abstraction for using LensKit models and other components to produce recommendations in a useful way. It allows you to wire together components in (mostly) arbitrary graphs, train them on data, and serialize pipelines to disk for use elsewhere.

If you have a scoring model and just want to generate recommendations with a default setup and minimal configuration, see topn_pipeline().

Parameters:
  • name (str | None) – A name for the pipeline.

  • version (str | None) – A numeric version for the pipeline.
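
For example, a minimal sketch of building and running a pipeline; the component functions retrieve_candidates and rank_articles are hypothetical stand-ins, not part of this module:

from poprox_recommender.lkpipeline import Pipeline

def retrieve_candidates(query: str) -> list[str]:
    # hypothetical component: look up candidate items for a query
    return ["a", "b", "c"] if query else []

def rank_articles(candidates: list[str], n: int) -> list[str]:
    # hypothetical component: keep the top-n candidates
    return candidates[:n]

pipe = Pipeline(name="example", version="1.0")
query = pipe.create_input("query", str)
n = pipe.create_input("n", int)
cands = pipe.add_component("retrieve", retrieve_candidates, query=query)
pipe.add_component("rank", rank_articles, candidates=cands, n=n)

print(pipe.run("rank", query="news", n=2))  # ['a', 'b']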

meta(*, include_hash=True)#

Get the metadata (name, version, hash, etc.) for this pipeline without returning the whole config.

Parameters:

include_hash (bool) – Whether to include a configuration hash in the metadata.

Return type:

PipelineMeta

property nodes: list[Node[object]]#

Get the nodes in the pipeline graph.

node(node, *, missing='error')#

Get the pipeline node with the specified name. If passed a node, it returns the node or fails if the node is not a member of the pipeline.

Parameters:
  • node (str | Node[Any]) – The name of the pipeline node to look up, or a node to check for membership.

  • missing (Literal['error', 'none'] | None)

Returns:

The pipeline node, if it exists.

Raises:

KeyError – The specified node does not exist.

Return type:

Node[object] | None

create_input(name, *types)#

Create an input node for the pipeline. Pipelines expect their inputs to be provided when they are run.

Parameters:
  • name (str) – The name of the input. The name must be unique in the pipeline (among both components and inputs).

  • types (type[T] | None) – The allowable types of the input; input data can be of any specified type. If None is among the allowed types, the input can be omitted.

Returns:

A pipeline node representing this input.

Raises:

ValueError – a node with the specified name already exists.

Return type:

Node[T]
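
For example (a sketch): the first input below is required, while including None among the types of the second allows it to be omitted at run time:

from poprox_recommender.lkpipeline import Pipeline

pipe = Pipeline()
user = pipe.create_input("user", str)
locale = pipe.create_input("locale", str, None)  # optional: None among the types permits omission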

literal(value, *, name=None)#

Create a literal node (a node with a fixed value).

Note

Literal nodes cannot be serialized with get_config() or save_config().

Parameters:
  • value (T)

  • name (str | None)

Return type:

LiteralNode[T]
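
For example (sketch):

from poprox_recommender.lkpipeline import Pipeline

pipe = Pipeline()
top_k = pipe.literal(10, name="top_k")
# top_k can be wired to component inputs like any other node, but a pipeline
# containing it can no longer be serialized with get_config()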

set_default(name, node)#

Set the default wiring for a component input. Components that declare an input parameter with the specified name but no configured input will be wired to this node.

This is intended to be used for things like wiring up user parameters to semi-automatically receive the target user’s identity and history.

Parameters:
  • name (str) – The name of the parameter to set a default for.

  • node (Node[Any] | object) – The node or literal value to wire to this parameter.

Return type:

None
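
For example, a sketch in which the hypothetical component score_articles receives the "user" input through the default wiring rather than an explicit connection:

from poprox_recommender.lkpipeline import Pipeline

def score_articles(items: list[str], user: str) -> list[str]:
    # hypothetical component; its "user" parameter is not wired explicitly
    return items

pipe = Pipeline()
user = pipe.create_input("user", str)
items = pipe.create_input("items", list)
pipe.set_default("user", user)          # any unwired "user" parameter gets this node
pipe.add_component("score", score_articles, items=items)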

get_default(name)#

Get the default wiring for an input name.

Parameters:

name (str)

Return type:

Node[Any] | None

alias(alias, node)#

Create an alias for a node. After aliasing, the node can be retrieved from node() using either its original name or its alias.

Parameters:
  • alias (str) – The alias to add to the node.

  • node (Node[Any] | str) – The node (or node name) to alias.

Raises:

ValueError – if the alias is already used as an alias or node name.

Return type:

None
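
For example (sketch):

from poprox_recommender.lkpipeline import Pipeline

pipe = Pipeline()
pipe.create_input("user", str)
pipe.alias("account", "user")
pipe.node("account")  # returns the same node as pipe.node("user")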

add_component(name, obj, **inputs)#

Add a component and connect it into the graph.

Parameters:
  • name (str) – The name of the component in the pipeline. The name must be unique in the pipeline (among both components and inputs).

  • obj (Component[ND] | Callable[..., ND]) – The component itself.

  • inputs (Node[Any] | object) – The component’s input wiring. See pipeline-connections for details.

Returns:

The node representing this component in the pipeline.

Return type:

Node[ND]

replace_component(name, obj, **inputs)#

Replace a component in the graph. The new component must have a type that is compatible with the old component. The old component’s input connections will be replaced (as the new component may have different inputs), but any connections that use the old component to supply an input will use the new component instead.

Parameters:
  • name (str) – The name of the component to replace.

  • obj (Component[ND] | Callable[..., ND]) – The new component.

  • inputs (Node[Any] | object) – The new component’s input wiring.

Return type:

Node[ND]

connect(obj, **inputs)#

Provide additional input connections for a component that has already been added. See pipeline-connections for details.

Parameters:
  • obj (str | Node[Any]) – The name or node of the component to wire.

  • inputs (Node[Any] | str | object) – The component’s input wiring. For each keyword argument in the component’s function signature, that argument can be provided here with an input that the pipeline will provide to that argument of the component when the pipeline is run.
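
For example, a sketch that adds a hypothetical component without wiring and connects it afterwards:

from poprox_recommender.lkpipeline import Pipeline

def truncate(items: list[str], n: int) -> list[str]:
    # hypothetical component
    return items[:n]

pipe = Pipeline()
items = pipe.create_input("items", list)
trunc = pipe.add_component("truncate", truncate)  # added with no input wiring
pipe.connect(trunc, items=items, n=5)             # wired later; 5 is treated as a literal value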

component_configs()#

Get the configurations for the components. This includes only the component configurations; it does not include pipeline inputs or wiring.

Return type:

dict[str, dict[str, Any]]

clone(how='config')#

Clone the pipeline, optionally including trained parameters.

The how parameter controls how the pipeline is cloned, and what is available in the cloned pipeline. It can be one of the following values:

"config"

Create fresh component instances using the configurations of the components in this pipeline. When applied to a trained pipeline, the clone does not have the original’s learned parameters. This is the default clone method.

"pipeline-config"

Round-trip the entire pipeline through get_config() and from_config().

Parameters:

how (Literal['config', 'pipeline-config']) – The mechanism to use for cloning the pipeline.

Returns:

A new pipeline with the same components and wiring, but fresh instances created by round-tripping the configuration.

Return type:

Pipeline
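
For example (sketch, with a hypothetical identity component):

from poprox_recommender.lkpipeline import Pipeline

def identity(x: str) -> str:
    # hypothetical component
    return x

pipe = Pipeline(name="clone-demo")
x = pipe.create_input("x", str)
pipe.add_component("identity", identity, x=x)

fresh = pipe.clone()                    # "config": fresh instances from component configurations
rt = pipe.clone(how="pipeline-config")  # round-trip through get_config()/from_config()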

get_config(*, include_hash=True)#

Get this pipeline’s configuration for serialization. The configuration consists of all inputs and components along with their configurations and input connections. It can be serialized to disk (in JSON, YAML, or a similar format) to save a pipeline.

The configuration does not include any trained parameter values, although the configuration may include things such as paths to checkpoints to load such parameters, depending on the design of the components in the pipeline.

Note

Literal nodes (from literal(), or literal values wired to inputs) cannot be serialized, and this method will fail if they are present in the pipeline.

Parameters:

include_hash (bool)

Return type:

PipelineConfig
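
For example, a sketch of saving a configuration to disk; since PipelineConfig is a Pydantic model, its standard model_dump_json() method can serialize it:

from pathlib import Path

from poprox_recommender.lkpipeline import Pipeline

pipe = Pipeline(name="save-demo")
pipe.create_input("query", str)

cfg = pipe.get_config()  # PipelineConfig (fails if the pipeline has literal nodes)
Path("pipeline.json").write_text(cfg.model_dump_json(indent=2))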

config_hash()#

Get a hash of the pipeline’s configuration to uniquely identify it for logging, version control, or other purposes.

The hash format and algorithm are not guaranteed, but they are stable within a LensKit version. For the same version of LensKit and component code, the same configuration will produce the same hash, so long as there are no literal nodes. Literal nodes will usually hash consistently, but since literals other than basic JSON values are hashed by pickling, hash stability depends on the stability of the pickle bytestream.

In LensKit 2024.1, the configuration hash is computed by taking the JSON serialization of the pipeline configuration (without a hash) and returning the hex-encoded SHA256 hash of that serialization.

Return type:

str

run(*nodes, **kwargs)#

Run the pipeline and obtain the return value(s) of one or more of its components. See pipeline-execution for details of the pipeline execution model.

Parameters:
  • nodes (str | Node[Any]) – The nodes whose results to return, as positional arguments.

  • kwargs (object) – The pipeline inputs.

Returns:

The pipeline result. If zero or one nodes are specified, the result is returned as-is. If multiple nodes are specified, their results are returned in a tuple.

Raises:
  • PipelineError – when there is a pipeline configuration error (e.g. a cycle).

  • ValueError – when one or more required inputs are missing.

  • TypeError – when one or more required inputs has an incompatible type.

  • other – exceptions thrown by components are passed through.

Return type:

object
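
For example, a sketch with two hypothetical components showing single- and multi-node results:

from poprox_recommender.lkpipeline import Pipeline

def double(x: int) -> int:
    # hypothetical component
    return x * 2

def add(x: int, y: int) -> int:
    # hypothetical component
    return x + y

pipe = Pipeline()
x = pipe.create_input("x", int)
d = pipe.add_component("double", double, x=x)
pipe.add_component("add", add, x=x, y=d)

print(pipe.run("add", x=3))            # 9 — a single node returns its value as-is
print(pipe.run("double", "add", x=3))  # (6, 9) — multiple nodes return a tuple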

run_all(*nodes, **kwargs)#

Run all nodes in the pipeline, or all nodes required to fulfill the requested node, and return a mapping with the full pipeline state (the data attached to each node). This is useful in cases where client code needs to be able to inspect the data at arbitrary steps of the pipeline. It differs from run() in two ways:

  1. It returns the data from all nodes as a mapping (dictionary-like object), not just the specified nodes as a tuple.

  2. If no nodes are specified, it runs all nodes instead of only the last node. This has the consequence of running nodes that are not required to fulfill the last node (such scenarios typically result from using use_first_of()).

Parameters:
  • nodes (str | Node[Any]) – The nodes to run, as positional arguments (if no nodes are specified, this method runs all nodes).

  • kwargs (object) – The inputs.

Returns:

The full pipeline state, with default set to the last node specified (either the last node in nodes, or the last node added to the pipeline).

Return type:

PipelineState
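
For example, a sketch reusing the "double"/"add" pipeline from the run() example above; this assumes the returned state can be indexed by node name, as suggested by its dictionary-like interface:

state = pipe.run_all(x=3)             # run every node in the pipeline
print(state["double"], state["add"])  # 6 9 — inspect any node's data by name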

exception poprox_recommender.lkpipeline.PipelineError#

Bases: Exception

Pipeline configuration errors.

Note

This exception is only to note problems with the pipeline configuration and structure (e.g. circular dependencies). Errors running the pipeline are raised as-is.

exception poprox_recommender.lkpipeline.PipelineWarning#

Bases: Warning

Pipeline configuration and setup warnings. We also emit warnings to the logger in many cases, but this allows critical ones to be visible even if the client code has not enabled logging.

Note

This warning is only to note problems with the pipeline configuration and structure (e.g. circular dependencies). Errors running the pipeline are raised as-is.

class poprox_recommender.lkpipeline.Node(name, *, types=None)#

Bases: Generic[ND]

Representation of a single node in a Pipeline.

Parameters:
  • name (str)

  • types (set[type] | None)

name: str#

The name of this node.

types: set[type] | None#

The set of valid data types of this node, or None for no typechecking.

class poprox_recommender.lkpipeline.Configurable(*args, **kwargs)#

Bases: Protocol

Interface for configurable objects such as pipeline components with settings or hyperparameters. A configurable object supports two operations:

  • saving its configuration with get_config().

  • creating a new instance from a saved configuration with the class method from_config().

An object must implement both of these methods to be considered configurable. Components extending the Component class automatically have working versions of these methods if they define their constructor parameters and fields appropriately.

Note

Configuration data should be JSON-compatible (strings, numbers, etc.).

classmethod from_config(cfg)#

Reinstantiate this component from configuration values.

Parameters:

cfg (dict[str, Any])

Return type:

Self

get_config()#

Get this component’s configured hyperparameters.

Return type:

dict[str, object]

class poprox_recommender.lkpipeline.PipelineConfig(**data)#

Bases: BaseModel

Root type for serialized pipeline configuration. A pipeline config contains the full configuration, components, and wiring for the pipeline, but does not contain the components’ trained parameter values.

Parameters:

data (Any)

meta: PipelineMeta#

Pipeline metadata.

inputs: list[PipelineInput]#

Pipeline inputs.

defaults: dict[str, str]#

Default pipeline wirings.

components: OrderedDict[str, PipelineComponent]#

Pipeline components, with their configurations and wiring.

aliases: dict[str, str]#

Pipeline node aliases.

literals: dict[str, PipelineLiteral]#

Literals

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'aliases': FieldInfo(annotation=dict[str, str], required=False, default_factory=dict), 'components': FieldInfo(annotation=OrderedDict[str, PipelineComponent], required=False, default_factory=OrderedDict), 'defaults': FieldInfo(annotation=dict[str, str], required=False, default_factory=dict), 'inputs': FieldInfo(annotation=list[PipelineInput], required=False, default_factory=list), 'literals': FieldInfo(annotation=dict[str, PipelineLiteral], required=False, default_factory=dict), 'meta': FieldInfo(annotation=PipelineMeta, required=True)}#

Metadata about the fields defined on the model, a mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.

class poprox_recommender.lkpipeline.Lazy(*args, **kwargs)#

Bases: Protocol, Generic[T]

Type for accepting lazy inputs from the pipeline runner. If your function may or may not need one of its inputs, declare the type with this to only run it as needed:

from poprox_recommender.lkpipeline import Lazy

def my_component(input: str, backup: Lazy[str]) -> str:
    if input == 'invalid':
        return backup.get()
    else:
        return input
get()#

Get the value behind this lazy instance.

Return type:

T

class poprox_recommender.lkpipeline.Component(*args, **kwargs)#

Bases: Configurable, Generic[COut]

Base class for pipeline component objects. Any component that is not just a function should extend this class.

Components are Configurable. The base class provides default implementations of get_config() and from_config() that inspect the constructor arguments and instance variables to automatically provide configuration support. By default, all constructor parameters will be considered configuration parameters, and their values will be read from instance variables of the same name. Components can also define EXTRA_CONFIG_FIELDS and IGNORED_CONFIG_FIELDS class variables to modify this behavior. Missing attributes are silently ignored.

To work as components, derived classes also need to implement a __call__ method to perform their operations.
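
For example, a sketch of a hypothetical component class whose configuration is derived automatically from its constructor parameter:

from poprox_recommender.lkpipeline import Component

class TopK(Component):
    # hypothetical component: keep the first k items
    def __init__(self, k: int = 10):
        self.k = k  # stored under the constructor parameter's name so get_config() can find it

    def __call__(self, items: list[str]) -> list[str]:
        return items[: self.k]

top = TopK(k=5)
print(top.get_config())                    # {'k': 5}
copy = TopK.from_config(top.get_config())  # reconstructs an equivalent instance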

EXTRA_CONFIG_FIELDS: ClassVar[list[str]] = []#

Names of instance variables that should be included in the configuration dictionary even though they do not correspond to named constructor arguments.

Note

This is rarely needed, and usually needs to be coupled with **kwargs in the constructor to make the resulting objects constructible.

IGNORED_CONFIG_FIELDS: ClassVar[list[str]] = []#

Names of constructor parameters that should be excluded from the configuration dictionary.

get_config()#

Get the configuration by inspecting the constructor and instance variables.

Return type:

dict[str, object]

classmethod from_config(cfg)#

Create an instance of the class from the specified configuration. Configuration elements are passed to the constructor as keyword arguments.

Parameters:

cfg (dict[str, Any])

Return type:

Self

Modules

components

Definition of the component interfaces.

config

Pydantic models for pipeline configuration and serialization support.

nodes

runner

Pipeline runner logic.

state

types