Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the agent #684

Merged
merged 8 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
321 changes: 291 additions & 30 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ blake2 = "0.9.0"
env_logger = "0.10.0"
futures = { version = "0.3.1", package = "futures" }
hyper = "0.14.2"
k8s-openapi = { version = "0.17.0", default-features = false, features = ["schemars", "v1_23"] }
kube = { version = "0.80.0", features = ["derive"] }
kube-runtime = "0.80.0"
itertools = "0.12.0"
k8s-openapi = { version = "0.20.0", default-features = false, features = ["schemars", "v1_23"] }
kube = { version = "0.87.1", features = ["derive"] }
kube-runtime = { version = "0.87.1", features = ["unstable-runtime-reconcile-on"] }
lazy_static = "1.4"
log = "0.4"
mockall_double = "0.3.1"
Expand All @@ -34,8 +35,9 @@ serde = "1.0.104"
serde_derive = "1.0.104"
serde_json = "1.0.45"
serde_yaml = { version = "0.8.11", optional = true }
thiserror = "1.0.50"
tokio = { version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net"] }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-stream = { version = "0.1", features = ["net", "sync"] }
tonic = "0.10"
tower = "0.4.8"

Expand Down
7 changes: 5 additions & 2 deletions agent/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
fn main() {
tonic_build::configure()
.build_client(true)
.out_dir("./src/util")
.compile(&["./proto/pluginapi.proto"], &["./proto"])
.out_dir("./src/plugin_manager")
.compile(
&["./proto/pluginapi.proto", "./proto/podresources.proto"],
&["./proto"],
)
.expect("failed to compile protos");
}
352 changes: 200 additions & 152 deletions agent/proto/pluginapi.proto
Original file line number Diff line number Diff line change
@@ -1,152 +1,200 @@
syntax = 'proto3';

package v1beta1;


// Registration is the service advertised by the Kubelet
// Only when Kubelet answers with a success code to a Register Request
// may Device Plugins start their service
// Registration may fail when device plugin version is not supported by
// Kubelet or the registered resourceName is already taken by another
// active device plugin. Device plugin is expected to terminate upon registration failure
service Registration {
rpc Register(RegisterRequest) returns (Empty) {}
}

message DevicePluginOptions {
// Indicates if PreStartContainer call is required before each container start
bool pre_start_required = 1;
}

message RegisterRequest {
// Version of the API the Device Plugin was built against
string version = 1;
// Name of the unix socket the device plugin is listening on
// PATH = path.Join(DevicePluginPath, endpoint)
string endpoint = 2;
// Schedulable resource name. As of now it's expected to be a DNS Label
string resource_name = 3;
// Options to be communicated with Device Manager
DevicePluginOptions options = 4;
}

message Empty {
}

// DevicePlugin is the service advertised by Device Plugins
service DevicePlugin {
// GetDevicePluginOptions returns options to be communicated with Device
// Manager
rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {}

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disapears, ListAndWatch
// returns the new list
rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}

// Allocate is called during container creation so that the Device
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
rpc Allocate(AllocateRequest) returns (AllocateResponse) {}

// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
// before each container start. Device plugin can run device specific operations
// such as reseting the device before making devices available to the container
rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {}
}

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disapears, ListAndWatch
// returns the new list
message ListAndWatchResponse {
repeated Device devices = 1;
}

/* E.g:
* struct Device {
* ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
* State: "Healthy",
*} */
message Device {
// A unique ID assigned by the device plugin used
// to identify devices during the communication
// Max length of this field is 63 characters
string ID = 1;
// Health of the device, can be healthy or unhealthy, see constants.go
string health = 2;
}

// - PreStartContainer is expected to be called before each container start if indicated by plugin during registration phase.
// - PreStartContainer allows kubelet to pass reinitialized devices to containers.
// - PreStartContainer allows Device Plugin to run device specific operations on
// the Devices requested
message PreStartContainerRequest {
repeated string devicesIDs = 1;
}

// PreStartContainerResponse will be send by plugin in response to PreStartContainerRequest
message PreStartContainerResponse {
}

// - Allocate is expected to be called during pod creation since allocation
// failures for any container would result in pod startup failure.
// - Allocate allows kubelet to exposes additional artifacts in a pod's
// environment as directed by the plugin.
// - Allocate allows Device Plugin to run device specific operations on
// the Devices requested
message AllocateRequest {
repeated ContainerAllocateRequest container_requests = 1;
}

message ContainerAllocateRequest {
repeated string devicesIDs = 1;
}

// AllocateResponse includes the artifacts that needs to be injected into
// a container for accessing 'deviceIDs' that were mentioned as part of
// 'AllocateRequest'.
// Failure Handling:
// if Kubelet sends an allocation request for dev1 and dev2.
// Allocation on dev1 succeeds but allocation on dev2 fails.
// The Device plugin should send a ListAndWatch update and fail the
// Allocation request
message AllocateResponse {
repeated ContainerAllocateResponse container_responses = 1;
}

message ContainerAllocateResponse {
// List of environment variable to be set in the container to access one of more devices.
map<string, string> envs = 1;
// Mounts for the container.
repeated Mount mounts = 2;
// Devices for the container.
repeated DeviceSpec devices = 3;
// Container annotations to pass to the container runtime
map<string, string> annotations = 4;
}

// Mount specifies a host volume to mount into a container.
// where device library or tools are installed on host and container
message Mount {
// Path of the mount within the container.
string container_path = 1;
// Path of the mount on the host.
string host_path = 2;
// If set, the mount is read-only.
bool read_only = 3;
}

// DeviceSpec specifies a host device to mount into a container.
message DeviceSpec {
// Path of the device within the container.
string container_path = 1;
// Path of the device on the host.
string host_path = 2;
// Cgroups permissions of the device, candidates are one or more of
// * r - allows container to read from the specified device.
// * w - allows container to write to the specified device.
// * m - allows container to create device files that do not yet exist.
string permissions = 3;
}


syntax = "proto3";

package v1beta1;
diconico07 marked this conversation as resolved.
Show resolved Hide resolved


// Registration is the service advertised by the Kubelet
// Only when Kubelet answers with a success code to a Register Request
// may Device Plugins start their service
// Registration may fail when device plugin version is not supported by
// Kubelet or the registered resourceName is already taken by another
// active device plugin. Device plugin is expected to terminate upon registration failure
service Registration {
rpc Register(RegisterRequest) returns (Empty) {}
}

message DevicePluginOptions {
// Indicates if PreStartContainer call is required before each container start
bool pre_start_required = 1;
// Indicates if GetPreferredAllocation is implemented and available for calling
bool get_preferred_allocation_available = 2;
}

message RegisterRequest {
// Version of the API the Device Plugin was built against
string version = 1;
// Name of the unix socket the device plugin is listening on
// PATH = path.Join(DevicePluginPath, endpoint)
string endpoint = 2;
// Schedulable resource name. As of now it's expected to be a DNS Label
string resource_name = 3;
// Options to be communicated with Device Manager
DevicePluginOptions options = 4;
}

message Empty {
}

// DevicePlugin is the service advertised by Device Plugins
service DevicePlugin {
// GetDevicePluginOptions returns options to be communicated with Device
// Manager
rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {}

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disappears, ListAndWatch
// returns the new list
rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}

// GetPreferredAllocation returns a preferred set of devices to allocate
// from a list of available ones. The resulting preferred allocation is not
// guaranteed to be the allocation ultimately performed by the
// devicemanager. It is only designed to help the devicemanager make a more
// informed allocation decision when possible.
rpc GetPreferredAllocation(PreferredAllocationRequest) returns (PreferredAllocationResponse) {}

// Allocate is called during container creation so that the Device
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
rpc Allocate(AllocateRequest) returns (AllocateResponse) {}

// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
// before each container start. Device plugin can run device specific operations
// such as resetting the device before making devices available to the container
rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {}
}

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disappears, ListAndWatch
// returns the new list
message ListAndWatchResponse {
repeated Device devices = 1;
}

message TopologyInfo {
repeated NUMANode nodes = 1;
}

message NUMANode {
int64 ID = 1;
}

/* E.g:
* struct Device {
* ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
* Health: "Healthy",
* Topology:
* Node:
* ID: 1
*} */
message Device {
// A unique ID assigned by the device plugin used
// to identify devices during the communication
// Max length of this field is 63 characters
string ID = 1;
// Health of the device, can be healthy or unhealthy, see constants.go
string health = 2;
// Topology for device
TopologyInfo topology = 3;
}

// - PreStartContainer is expected to be called before each container start if indicated by plugin during registration phase.
// - PreStartContainer allows kubelet to pass reinitialized devices to containers.
// - PreStartContainer allows Device Plugin to run device specific operations on
// the Devices requested
message PreStartContainerRequest {
repeated string devicesIDs = 1;
}

// PreStartContainerResponse will be send by plugin in response to PreStartContainerRequest
message PreStartContainerResponse {
}

// PreferredAllocationRequest is passed via a call to GetPreferredAllocation()
// at pod admission time. The device plugin should take the list of
// `available_deviceIDs` and calculate a preferred allocation of size
// 'allocation_size' from them, making sure to include the set of devices
// listed in 'must_include_deviceIDs'.
message PreferredAllocationRequest {
repeated ContainerPreferredAllocationRequest container_requests = 1;
}

message ContainerPreferredAllocationRequest {
// List of available deviceIDs from which to choose a preferred allocation
repeated string available_deviceIDs = 1;
// List of deviceIDs that must be included in the preferred allocation
repeated string must_include_deviceIDs = 2;
// Number of devices to include in the preferred allocation
int32 allocation_size = 3;
}

// PreferredAllocationResponse returns a preferred allocation,
// resulting from a PreferredAllocationRequest.
message PreferredAllocationResponse {
repeated ContainerPreferredAllocationResponse container_responses = 1;
}

message ContainerPreferredAllocationResponse {
repeated string deviceIDs = 1;
}

// - Allocate is expected to be called during pod creation since allocation
// failures for any container would result in pod startup failure.
// - Allocate allows kubelet to exposes additional artifacts in a pod's
// environment as directed by the plugin.
// - Allocate allows Device Plugin to run device specific operations on
// the Devices requested
message AllocateRequest {
repeated ContainerAllocateRequest container_requests = 1;
}

message ContainerAllocateRequest {
repeated string devicesIDs = 1;
}

// AllocateResponse includes the artifacts that needs to be injected into
// a container for accessing 'deviceIDs' that were mentioned as part of
// 'AllocateRequest'.
// Failure Handling:
// if Kubelet sends an allocation request for dev1 and dev2.
// Allocation on dev1 succeeds but allocation on dev2 fails.
// The Device plugin should send a ListAndWatch update and fail the
// Allocation request
message AllocateResponse {
repeated ContainerAllocateResponse container_responses = 1;
}

message ContainerAllocateResponse {
// List of environment variable to be set in the container to access one of more devices.
map<string, string> envs = 1;
// Mounts for the container.
repeated Mount mounts = 2;
// Devices for the container.
repeated DeviceSpec devices = 3;
// Container annotations to pass to the container runtime
map<string, string> annotations = 4;
}

// Mount specifies a host volume to mount into a container.
// where device library or tools are installed on host and container
message Mount {
// Path of the mount within the container.
string container_path = 1;
// Path of the mount on the host.
string host_path = 2;
// If set, the mount is read-only.
bool read_only = 3;
}

// DeviceSpec specifies a host device to mount into a container.
message DeviceSpec {
// Path of the device within the container.
string container_path = 1;
// Path of the device on the host.
string host_path = 2;
// Cgroups permissions of the device, candidates are one or more of
// * r - allows container to read from the specified device.
// * w - allows container to write to the specified device.
// * m - allows container to create device files that do not yet exist.
string permissions = 3;
}
Loading
Loading