public class GRpcAgentFileStreamServiceImpl extends com.netflix.genie.proto.FileStreamServiceGrpc.FileStreamServiceImplBase implements AgentFileStreamService
AgentFileStreamService gRPC implementation.
Receives and caches manifests from connected agents.
Allows requesting a file, which is returned in the form of an AgentFileResource.
Implementation overview: Each agent maintains a single "control" bidirectional stream (through the 'sync' RPC method). The agent uses this stream to push manifests regularly, and the server uses it to request files.
When a file is requested, the agent opens a separate "transfer" bidirectional stream (through the 'transmit' RPC method) for that file transfer and starts sending chunks (currently one at a time). The server sends acknowledgements on the same stream.
This service returns a resource immediately, but maintains a handle on a buffer where data is written as it is received.
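The buffer handoff described above can be illustrated with a minimal sketch. It uses only the JDK and is not the service's actual code: `TransferBufferSketch`, `onChunk`, `onCompleted`, and `readAll` are hypothetical names, an in-memory queue stands in for the real buffer, and acknowledgements are recorded in a list rather than sent over gRPC.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for the server side of one "transfer" stream. */
class TransferBufferSketch {
    // Sentinel marking the end of the transfer; compared by identity, not content.
    private static final byte[] END = new byte[0];

    private final BlockingQueue<byte[]> buffer = new LinkedBlockingQueue<>();
    private final List<Integer> acknowledgedSizes = new ArrayList<>();

    /** Called once per chunk received from the agent: buffers it and records an acknowledgement. */
    void onChunk(byte[] chunk) {
        buffer.add(chunk.clone());
        acknowledgedSizes.add(chunk.length);
    }

    /** Called when the agent has sent its last chunk. */
    void onCompleted() {
        buffer.add(END);
    }

    /** Consumer side: blocks until data arrives and returns everything up to the end marker. */
    byte[] readAll() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            for (byte[] chunk = buffer.take(); chunk != END; chunk = buffer.take()) {
                out.write(chunk, 0, chunk.length);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for data", e);
        }
        return out.toByteArray();
    }

    List<Integer> acknowledgedSizes() {
        return acknowledgedSizes;
    }

    /** Starts the reader before any data exists, then feeds chunks one at a time. */
    static String demo() {
        TransferBufferSketch transfer = new TransferBufferSketch();
        CompletableFuture<byte[]> reader = CompletableFuture.supplyAsync(transfer::readAll);
        for (String chunk : new String[] {"hello ", "world"}) {
            transfer.onChunk(chunk.getBytes(StandardCharsets.UTF_8));
        }
        transfer.onCompleted();
        return new String(reader.join(), StandardCharsets.UTF_8);
    }
}
```

The point of the design is that the reader can be handed out before the first chunk arrives. It simply blocks until the writer side catches up.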
AgentFileStreamService.AgentFileResource
Constructor and Description |
---|
GRpcAgentFileStreamServiceImpl(JobDirectoryManifestProtoConverter converter, org.springframework.scheduling.TaskScheduler taskScheduler, AgentFileStreamProperties properties, io.micrometer.core.instrument.MeterRegistry registry) Constructor. |
Modifier and Type | Method and Description |
---|---|
java.util.Optional<DirectoryManifest> | getManifest(java.lang.String jobId) Returns the manifest for a given job, boxed in an Optional. |
java.util.Optional<AgentFileStreamService.AgentFileResource> | getResource(java.lang.String jobId, java.nio.file.Path relativePath, java.net.URI uri, org.springframework.http.HttpRange range) Returns a Resource for the given job file boxed in an Optional. |
io.grpc.stub.StreamObserver<com.netflix.genie.proto.AgentManifestMessage> | sync(io.grpc.stub.StreamObserver<com.netflix.genie.proto.ServerControlMessage> responseObserver) |
io.grpc.stub.StreamObserver<com.netflix.genie.proto.AgentFileMessage> | transmit(io.grpc.stub.StreamObserver<com.netflix.genie.proto.ServerAckMessage> responseObserver) |
public GRpcAgentFileStreamServiceImpl(JobDirectoryManifestProtoConverter converter, org.springframework.scheduling.TaskScheduler taskScheduler, AgentFileStreamProperties properties, io.micrometer.core.instrument.MeterRegistry registry)
Parameters:
converter - The JobDirectoryManifestProtoConverter instance to use
taskScheduler - A TaskScheduler instance to use
properties - The service properties
registry - The meter registry
public java.util.Optional<AgentFileStreamService.AgentFileResource> getResource(java.lang.String jobId, java.nio.file.Path relativePath, java.net.URI uri, @Nullable org.springframework.http.HttpRange range)
Returns a Resource for the given job file boxed in an Optional.
If the service is unable to determine whether the file exists, the optional is empty.
In all other cases, the optional is not empty. However, the resource may return false from exists() (if the file is not believed to exist on the agent) or false from isReadable() if the file cannot be streamed for other reasons.
Specified by:
getResource in interface AgentFileStreamService
Parameters:
jobId - the job id
relativePath - the relative path in the job directory
uri - the file uri //TODO redundant
range - the list of ranges requested (RFC 7233) or null if no range is specified
Returns:
Resource
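A caller has to distinguish the outcomes described for getResource: an empty optional, a resource that does not exist, a resource that is not readable, and a streamable resource. The sketch below shows one way to branch on them. `FileResource`, `Outcome`, and `classify` are hypothetical names introduced for illustration; `FileResource` only mimics the two checks mentioned in the description and is not the real AgentFileResource type.

```java
import java.util.Optional;

class ResourceOutcomeSketch {
    /** Hypothetical stand-in exposing only the two checks the description mentions. */
    interface FileResource {
        boolean exists();
        boolean isReadable();
    }

    /** Illustrative labels for the cases a caller may want to tell apart. */
    enum Outcome { UNKNOWN, NOT_FOUND, NOT_READABLE, STREAMABLE }

    static Outcome classify(Optional<FileResource> maybeResource) {
        if (!maybeResource.isPresent()) {
            // The service could not determine whether the file exists.
            return Outcome.UNKNOWN;
        }
        FileResource resource = maybeResource.get();
        if (!resource.exists()) {
            // The file is not believed to exist on the agent.
            return Outcome.NOT_FOUND;
        }
        if (!resource.isReadable()) {
            // The file exists but cannot be streamed for some other reason.
            return Outcome.NOT_READABLE;
        }
        return Outcome.STREAMABLE;
    }

    /** Small factory for building test resources. */
    static FileResource resource(boolean exists, boolean readable) {
        return new FileResource() {
            @Override public boolean exists() { return exists; }
            @Override public boolean isReadable() { return readable; }
        };
    }
}
```

How each outcome maps to an HTTP response is left to the caller. The description does not specify it.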
public java.util.Optional<DirectoryManifest> getManifest(java.lang.String jobId)
Returns the manifest for a given job, boxed in an Optional.
The manifest may not be present if the agent is not connected to this node (for example because execution has completed, or because the agent is connected to a different node).
Specified by:
getManifest in interface AgentFileStreamService
Parameters:
jobId - the job id
Returns:
DirectoryManifest
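As a rough illustration of why the manifest can be absent, the sketch below models a per-node cache keyed by job id. It makes several assumptions that the description does not confirm: `ManifestCacheSketch`, `onManifest`, and `onStreamClosed` are hypothetical names, the manifest is represented as a plain String, and entries are assumed to be dropped when the agent's control stream closes.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-node manifest cache; not the service's actual data structure. */
class ManifestCacheSketch {
    private final Map<String, String> manifests = new ConcurrentHashMap<>();

    /** Called each time a connected agent pushes a manifest; the latest one wins. */
    void onManifest(String jobId, String manifest) {
        manifests.put(jobId, manifest);
    }

    /** Assumed behaviour: forget the manifest once the agent's control stream ends. */
    void onStreamClosed(String jobId) {
        manifests.remove(jobId);
    }

    /** Empty when no agent for this job is currently connected to this node. */
    Optional<String> getManifest(String jobId) {
        return Optional.ofNullable(manifests.get(jobId));
    }
}
```

Under these assumptions, a lookup for a job whose agent is connected to a different node returns an empty optional, because that agent never pushed a manifest to this cache.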
public io.grpc.stub.StreamObserver<com.netflix.genie.proto.AgentManifestMessage> sync(io.grpc.stub.StreamObserver<com.netflix.genie.proto.ServerControlMessage> responseObserver)
Overrides:
sync in class com.netflix.genie.proto.FileStreamServiceGrpc.FileStreamServiceImplBase
public io.grpc.stub.StreamObserver<com.netflix.genie.proto.AgentFileMessage> transmit(io.grpc.stub.StreamObserver<com.netflix.genie.proto.ServerAckMessage> responseObserver)
Overrides:
transmit in class com.netflix.genie.proto.FileStreamServiceGrpc.FileStreamServiceImplBase