Orchestrating Tasks from the Cloud

With Groovy and AWS SWF

Created by Clay McCoy


The latest version of the slides is on GitHub:

git clone -b groovySwf https://github.com/claymccoy/reveal.js.git
open reveal.js/index.html

About Me

  • Sr. Software Engineer @ Netflix
  • Engineering Tools Team
  • Contributes to Asgard

Asgard

  • web app written in Grails
  • used for deployment to AWS
  • open sourced on GitHub

Why does Netflix open source?

  • share with community
  • recruiting and retaining
  • we want you to use AWS like we do
  • fix AWS gaps, expose issues

Asgard needed a workflow service

  • Task coordination
  • Long lived tasks
  • Distributed tasks

Overview

  • AWS SWF
  • AWS Flow
  • Groovy and SWF
  • distributed workflow examples

Amazon SWF

"Amazon Simple Workflow (Amazon SWF) is a task coordination and state management service for cloud applications. With Amazon SWF, you can stop writing complex glue-code and state machinery and invest more in the business logic that makes your applications unique."

SWF compared to _?

  • write my own workflow framework
    • it is harder than you think
  • use an existing workflow framework
    • not a distributed service

SWF compared to SQS?

  • task based API vs message based API
  • archiving of tasks and querying execution details
  • distributed error handling
  • retries vs taking different action based on message receive count
  • routing and versioning task handlers vs managing multiple queues
  • timers and timeouts

SWF guarantees

  • tasks will be performed once and only once
  • ordered tasks
  • tasks will either succeed with a result or notify of failure
  • stores each activity's result or failure

Amazon SWF concepts

  • long polling for tasks
  • decision tasks quickly decide what to do next
  • activity tasks do real work

Workflow Execution

  • Workflow IDs
    • Run ID
    • Workflow ID
    • unfortunately both are needed for uniqueness
  • tags
    • up to 5 per execution, each a String of up to 256 characters
    • indexed for searching and filtering executions
    • tempting to put your own metadata in them

Workflow history

  • history is kept in AWS
  • work is executed on your machines
  • deciders get the history so far with decision tasks
  • many more types of history events
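Because the history lives in AWS and every decision task carries the history so far, a decider can be written as a pure function from history to the next decision. The plain-Java sketch below assumes a hypothetical two-step workflow and a simplified `HistoryEvent` class. The event and decision type names match SWF's, but `NoDecision`, `HistoryEvent` and `ReplayDecider` are illustrative only; this is not how the Flow framework is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for an SWF history event (not the AWS SDK class).
final class HistoryEvent {
    final String type;     // e.g. "ActivityTaskScheduled", "ActivityTaskCompleted"
    final String activity; // activity the event refers to, or null

    HistoryEvent(String type, String activity) {
        this.type = type;
        this.activity = activity;
    }
}

final class ReplayDecider {
    // Hypothetical two-step workflow: doSomething, then consumeSomething.
    static final List<String> STEPS = Arrays.asList("doSomething", "consumeSomething");

    // Called with the full history on every decision task. The decider keeps no
    // state of its own: everything it knows is rebuilt by replaying the events.
    static String decide(List<HistoryEvent> history) {
        List<String> scheduled = new ArrayList<>();
        List<String> completed = new ArrayList<>();
        for (HistoryEvent e : history) {
            switch (e.type) {
                case "ActivityTaskScheduled": scheduled.add(e.activity); break;
                case "ActivityTaskCompleted": completed.add(e.activity); break;
                case "ActivityTaskFailed": return "FailWorkflowExecution";
                default: break;
            }
        }
        for (String step : STEPS) {
            if (completed.contains(step)) {
                continue;
            }
            // Scheduled but not finished yet: nothing new to decide.
            return scheduled.contains(step) ? "NoDecision" : "ScheduleActivityTask:" + step;
        }
        return "CompleteWorkflowExecution";
    }
}
```

Replaying the growing history after each event walks the workflow forward: schedule `doSomething`, wait, schedule `consumeSomething`, then complete.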

Routing tasks

keep distributed tasks from executing on your dev machine

  • domains
    • namespace
    • must be registered
    • cannot be deleted
  • tasklists
    • just Strings for routing within a domain
    • no need to register
  • versioning
    • can be set per workflow and per activity
    • allows different code within tasklists

Things to consider when writing a distributed workflow

  • serialization of inputs and results to Strings (4k bytes)
  • heartbeats for long running activities
    • better timeout behavior
    • display message of last heartbeat on error
    • more responsive cancel
  • idempotence where possible
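Inputs and results travel as Strings, so it can pay to check their size before handing them to SWF. A minimal sketch, assuming UTF-8 and the 4k-byte limit quoted on this slide (check current SWF quotas before relying on that number); `PayloadGuard` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;

final class PayloadGuard {
    // Limit taken from the slide ("4k bytes"); verify against current SWF quotas.
    static final int MAX_PAYLOAD_BYTES = 4 * 1024;

    // Fails fast on the caller's machine instead of letting SWF reject an oversized String.
    static String requireFits(String serialized) {
        int size = serialized.getBytes(StandardCharsets.UTF_8).length;
        if (size > MAX_PAYLOAD_BYTES) {
            throw new IllegalArgumentException(
                "Payload is " + size + " bytes; limit is " + MAX_PAYLOAD_BYTES);
        }
        return serialized;
    }
}
```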

Workflow timeouts

  • Workflow Start to Close
  • Decision Task Start to Close

Activity timeouts

  • Activity Task Start to Close
  • Activity Task Heartbeat
  • Activity Task Schedule to Start
  • Activity Task Schedule to Close
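The four activity timeouts measure different intervals. The sketch below is a hypothetical `ActivityTimeouts` helper, not anything in the SWF SDK; it shows which clock each timeout watches, using the slides' convention that `-1L` means no limit. The timeout names match SWF's activity timeout types, but the order in which they are checked here is a simplification.

```java
// Hypothetical helper that reports which activity timeout (if any) has fired.
// Times are in seconds; NONE mirrors the slides' use of -1L for "no limit".
final class ActivityTimeouts {
    static final long NONE = -1L;

    final long scheduleToStart;
    final long startToClose;
    final long scheduleToClose;
    final long heartbeat;

    ActivityTimeouts(long scheduleToStart, long startToClose,
                     long scheduleToClose, long heartbeat) {
        this.scheduleToStart = scheduleToStart;
        this.startToClose = startToClose;
        this.scheduleToClose = scheduleToClose;
        this.heartbeat = heartbeat;
    }

    private static boolean exceeded(long limit, long elapsed) {
        return limit != NONE && elapsed > limit;
    }

    // started and lastHeartbeat are null if the task never started or never heartbeated.
    // Returns null while no timeout has been exceeded.
    String check(long scheduled, Long started, Long lastHeartbeat, long now) {
        if (exceeded(scheduleToClose, now - scheduled)) {
            return "SCHEDULE_TO_CLOSE";
        }
        if (started == null) {
            return exceeded(scheduleToStart, now - scheduled) ? "SCHEDULE_TO_START" : null;
        }
        if (exceeded(startToClose, now - started)) {
            return "START_TO_CLOSE";
        }
        // The heartbeat clock runs from the last heartbeat, or from the start if none yet.
        long lastPulse = (lastHeartbeat != null) ? lastHeartbeat : started;
        return exceeded(heartbeat, now - lastPulse) ? "HEARTBEAT" : null;
    }
}
```

With the hike settings used later in the deck (no schedule-to-start limit, 300 s start-to-close, 30 s heartbeat), an activity that goes 40 s without a heartbeat times out even though it is well inside its 300 s budget.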

SWF API

You are responsible for doing these things and more:

SWF API

You must really know about:

SWF API

  • good for managing AWS SWF objects
    • Domain
    • Workflow Type
    • Activity Type
    • Workflow Execution
  • bad for writing workflows
    • consume history events and provide decisions
    • workflow becomes a low level state machine

AWS Flow Framework

  • higher-level API removes boilerplate
  • decider logic looks like typical Java
  • provides pollers, deciders, history analyzers
  • registers workflows and activities by version

Flow based code structure

  • workflow impl is decider logic used to orchestrate
  • workflows should be stateless and quick (replayed)
  • activities do the real work

Flow based code structure

interfaces

@Workflow
@WorkflowRegistrationOptions(
    defaultExecutionStartToCloseTimeoutSeconds = 60L)
interface TestWorkflow {
    @Execute(version = '1.0')
    void doIt()
}
@Activities(version = "1.0")
@ActivityRegistrationOptions(
    defaultTaskScheduleToStartTimeoutSeconds = -1L,
    defaultTaskStartToCloseTimeoutSeconds = 300L)
interface TestActivities {
    String doSomething()
    void consumeSomething(String thing)
}
  • Flow annotations
  • registers workflows and activities by version
  • configure timeouts

code generation

interface TestActivities {
    String doSomething()
    void consumeSomething(String thing)
}

becomes

interface TestActivitiesClient {
    Promise<String> doSomething()
    void consumeSomething(Promise<String> thing)
}
  • promises must be made

Promise VS Future

AWS Java SDK

Promise is a future like object that is used as a placeholder for a result of an asynchronous API. Java Future is a synchronization construct that is used to block a thread that called get() method if result is not available yet. Promise differs from it as it cannot be used for blocking. A call to its get() method throws IllegalStateException if result is not available yet. The correct way to ensure that Promise is ready is to access it from a method that is annotated as @Asynchronous and have the given Promise as one its arguments or from Task.doExecute() method assuming that promise was passed to the Task as a constructor parameter.
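To make the distinction concrete, here is a hypothetical `MiniPromise` in plain Java. It is not the Flow `Promise` class, only a sketch of the contract described above: `get()` throws `IllegalStateException` instead of blocking, and `whenReady` stands in for what `@Asynchronous` methods provide, running code only once the value exists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal promise illustrating the contract: get() never blocks.
final class MiniPromise<T> {
    private T value;
    private boolean ready;
    private final List<Consumer<T>> callbacks = new ArrayList<>();

    boolean isReady() {
        return ready;
    }

    // Unlike Future.get(), this throws instead of blocking the calling thread.
    T get() {
        if (!ready) {
            throw new IllegalStateException("Promise is not ready");
        }
        return value;
    }

    void set(T v) {
        if (ready) {
            throw new IllegalStateException("Promise already set");
        }
        value = v;
        ready = true;
        for (Consumer<T> cb : callbacks) {
            cb.accept(v);
        }
        callbacks.clear();
    }

    // Stand-in for @Asynchronous gating: run the callback only once the value exists.
    void whenReady(Consumer<T> cb) {
        if (ready) {
            cb.accept(value);
        } else {
            callbacks.add(cb);
        }
    }
}
```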

Flow Workflow Impl

class TestWorkflowImpl implements TestWorkflow {
    private final TestActivitiesClient client = new TestActivitiesClientImpl();

    void doIt() {
        Promise<String> result = client.doSomething()
        waitForSomething(result)
    }

    @Asynchronous
    void waitForSomething(Promise<String> something) {
        client.consumeSomething(something)
    }
}
  • client is just the generated version of TestActivities
  • @Asynchronous methods gate promises

Difficult Setup

Groovy is good at hiding boilerplate code...

Glisten

  • Flow - (AspectJ + generated code) + Groovy
  • ease of use SWF library
  • still uses core Flow objects
  • was part of Asgard, now a separate project on GitHub

Flow based code structure with Glisten

Abstract away activity concerns

interface ActivityOperations {
    void recordHeartbeat(String message)

    WorkflowExecution getWorkflowExecution()

    String getTaskToken()
}
  • recordHeartbeat sends status to workflow
  • WorkflowExecution is used for id
  • task token is id of activity
  • explain activities first, workflow is more interesting

Abstract away workflow concerns

interface WorkflowOperations<A> {
    A getActivities()

    <T> Promise<T> waitFor(Promise promise, Closure<Promise<T>> work)
    <T> Promise<T> promiseFor(T value)

    AndPromise allPromises(Promise... promises)
    OrPromise anyPromises(Promise... promises)

    Promise<Void> status(String message)

    <T> DoTry<T> doTry(Closure<Promise<T>> work)
    <T> Promise<T> retry(RetryPolicy retryPolicy,
            Closure<Promise<T>> work)
    Promise<Void> timer(long delaySeconds)
}
<T> Promise<T> waitFor(Promise promise, Closure<Promise<T>> work)
Promise<String> promise = promiseFor(activities.doSomething())
waitFor(promise) {
    status promise.get()
    Promise.Void()
}
  • promiseFor wraps activity result in a Promise
  • waitFor only executes work when Promise is ready
  • Promise.Void() because work must return a Promise

or

waitFor(activities.doSomething()) {
    status it
}
  • pass the activity call to waitFor directly
  • work closure is passed the result when it is ready
  • status returns Promise<Void>
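For readers who know `java.util.concurrent`, Glisten's `waitFor` behaves much like `CompletableFuture.thenCompose`: the work runs only once the promise is ready, and the work's own promise becomes the result. A rough analogue, with `WaitForSketch` as a hypothetical name. Note that `CompletableFuture.get()` blocks, unlike a Flow `Promise`, so the sketch sticks to non-blocking composition.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class WaitForSketch {
    // Analogue of waitFor(promise) { ... }: the work closure receives the result
    // and must itself return a promise, which becomes the overall result.
    static <T, R> CompletableFuture<R> waitFor(CompletableFuture<T> promise,
                                               Function<T, CompletableFuture<R>> work) {
        return promise.thenCompose(work);
    }

    // Analogue of promiseFor(value): an already-ready promise.
    static <T> CompletableFuture<T> promiseFor(T value) {
        return CompletableFuture.completedFuture(value);
    }
}
```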

Flow vs Glisten Workflow Impl

class TestWorkflowImpl implements TestWorkflow {
    private final TestActivitiesClient client = new TestActivitiesClientImpl();

    void doIt() {
        Promise<String> result = client.doSomething()
        waitForSomething(result)
    }

    @Asynchronous
    void waitForSomething(Promise<String> something) {
        client.consumeSomething(something)
    }
}

class TestWorkflowImpl implements TestWorkflow {
    @Delegate
    WorkflowOperations<TestActivities> workflowOperations = SwfWorkflowOperations.of(TestActivities)

    void doIt() {
        waitFor(activities.doSomething()) {
            activities.consumeSomething(it)
        }
    }
}

Glisten Workflow Impl

class TestWorkflowImpl implements TestWorkflow {
    @Delegate
    WorkflowOperations<TestActivities> workflowOperations = SwfWorkflowOperations.of(TestActivities)

    void doIt() {
        waitFor(activities.doSomething()) {
            activities.consumeSomething(it)
        }
    }
}
  • no generated clients (promiseFor in the worst case)
  • no @Asynchronous methods (waitFor)

Flow vs Glisten Try Catch Finally

new TryCatchFinally() {
    @Override
    protected void doTry() throws Throwable {
        // try something
    }

    @Override
    protected void doCatch(Throwable e) throws Throwable {
        // handle error
    }

    @Override
    protected void doFinally() throws Throwable {
        // mandatory clean up
    }
}

doTry {
    // try something
} withCatch { Throwable t ->
    // handle error
} withFinally {
   // mandatory clean up
}.result

Glisten Try Catch Finally

doTry {
    // try something
} withCatch { Throwable t ->
    // handle error
} withFinally {
   // mandatory clean up
}.result
  • you don't have to manage the result yourself
  • creates a Flow TryCatchFinally
  • local version for use in unit tests
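A synchronous analogue of `doTry`/`withCatch`/`withFinally`, in the spirit of the local version used for unit tests, might look like this. `LocalDoTry` is hypothetical; unlike Flow's `TryCatchFinally` it is neither asynchronous nor distributed, and it catches only runtime exceptions.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical synchronous analogue of doTry { } withCatch { } withFinally { }.
final class LocalDoTry<T> {
    private final Supplier<T> work;
    private Function<Throwable, T> handler; // optional

    private LocalDoTry(Supplier<T> work) {
        this.work = work;
    }

    static <T> LocalDoTry<T> doTry(Supplier<T> work) {
        return new LocalDoTry<>(work);
    }

    LocalDoTry<T> withCatch(Function<Throwable, T> handler) {
        this.handler = handler;
        return this;
    }

    // Runs everything. The finally step receives the result of the try or,
    // if the try failed, the result of the catch.
    T withFinally(Consumer<T> cleanup) {
        T result = null;
        try {
            result = work.get();
        } catch (RuntimeException e) {
            if (handler == null) {
                throw e;
            }
            result = handler.apply(e);
        } finally {
            cleanup.accept(result);
        }
        return result;
    }
}
```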

Glisten handles basic needs not met by SWF

(by abusing tags and history)

  • workflow execution description
    • shove data into a tag
    • hooks to put your own metadata into the 4 remaining tags
  • log
    • status shoves data into history
    • execution context of DecisionTaskCompletedEvent
    • use a history analyzer to pull it out
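Pulling the log back out is then a matter of filtering history. The sketch assumes a hypothetical encoding in which each `DecisionTaskCompleted` event's execution context holds the status lines recorded during that decision, one per line. Glisten's actual format may differ, and `StatusLogAnalyzer.Event` is a simplified stand-in for the SDK's history event.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StatusLogAnalyzer {
    // Simplified stand-in for an SWF history event; not the real SDK class.
    static final class Event {
        final String type;
        final String executionContext; // only meaningful on DecisionTaskCompleted

        Event(String type, String executionContext) {
            this.type = type;
            this.executionContext = executionContext;
        }
    }

    // Collects status lines, in history order, from decision execution contexts.
    static List<String> logFrom(List<Event> history) {
        List<String> log = new ArrayList<>();
        for (Event e : history) {
            if ("DecisionTaskCompleted".equals(e.type)
                    && e.executionContext != null && !e.executionContext.isEmpty()) {
                log.addAll(Arrays.asList(e.executionContext.split("\n")));
            }
        }
        return log;
    }
}
```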

Now for an Example

Workflow Interface

@Workflow
@WorkflowRegistrationOptions(
        defaultExecutionStartToCloseTimeoutSeconds = 60L)
interface BayAreaTripWorkflow {

    @Execute(version = '1.0')
    void start(String name,
            Collection<BayAreaLocation> previouslyVisited)

    @GetState
    List<String> getLogHistory()
}
  • flow annotations
  • timeouts
  • versioning
  • workflow entry point
  • workflow inputs
  • state put on DecisionTaskCompletedEvent

Workflow Implementation Overview

class BayAreaTripWorkflowImpl implements BayAreaTripWorkflow {
    @Delegate
    WorkflowOperations<BayAreaTripActivities> workflowOperations =
            SwfWorkflowOperations.of(BayAreaTripActivities)
    @Override
    void start(String name,
            Collection<BayAreaLocation> previouslyVisited) {
        Promise<BayAreaLocation> destinationPromise =
                determineDestination(previouslyVisited)
        waitFor(destinationPromise) { BayAreaLocation destination ->
            waitFor(activities.goTo(name, destination)) {
                ...
            }
        }
    }
}
  • implementation is decider logic
  • workflowOperations handles workflow concerns
  • @Delegate makes operations available (waitFor, activities)

Activities Interface

@Activities(version = "1.0")
@ActivityRegistrationOptions(
        defaultTaskScheduleToStartTimeoutSeconds = -1L,
        defaultTaskStartToCloseTimeoutSeconds = 300L)
interface BayAreaTripActivities {
  • more flow annotations
  • more timeouts
  • more versioning
  • this time for all activities

More Activities Interface

interface BayAreaTripActivities {
    @ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = -1L,
            defaultTaskStartToCloseTimeoutSeconds = 86400L)
    boolean askYesNoQuestion(String question)

    String goTo(String name, BayAreaLocation location)

    @ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = -1L,
            defaultTaskStartToCloseTimeoutSeconds = 300L,
            defaultTaskHeartbeatTimeoutSeconds = 30L)
    String hike(String somewhere)

    String enjoy(String something)

    String win(String game)
}
  • activity timeout overrides
  • activities correspond to blue ovals
  • a per-method activity annotation also exists (name and version)

Activities Implementation Overview

class BayAreaTripActivitiesImpl implements BayAreaTripActivities {

    @Delegate ActivityOperations activityOperations =
        new SwfActivityOperations()

    @Override
    String goTo(String name, BayAreaLocation location) {
        "${name} went to ${location}."
    }
    ...
}
  • activities are POGOs (even POJOs)
  • dependencies can be injected
  • activityOperations handles activity concerns

Choosing a location

Workflow Implementation

determine location

Promise<BayAreaLocation> determineDestination(previouslyVisited) {
  if (!previouslyVisited.contains(BayAreaLocation.GoldenGateBridge)) {
    return promiseFor(BayAreaLocation.GoldenGateBridge)
  }
  if (!previouslyVisited.contains(BayAreaLocation.Redwoods)) {
    return promiseFor(BayAreaLocation.Redwoods)
  } else {
    waitFor(activities.askYesNoQuestion('Like roller coasters?')) {
            boolean isThrillSeeker ->
        if (isThrillSeeker) {
            return promiseFor(BayAreaLocation.Boardwalk)
        }
        promiseFor(BayAreaLocation.Monterey)
    }
  }
}
  • askYesNoQuestion is an activity
  • returns promise since some results depend on activity

Activities Implementation

determine location

@ManualActivityCompletion
boolean askYesNoQuestion(String question) {
    sendManualActivityCompletionInfo(taskToken, workflowExecution)
    true
}
  • activityOperations supplies taskToken and workflowExecution
  • @ManualActivityCompletion - result supplied manually
  • does not matter what is returned here

Golden Gate Bridge

Workflow History

Workflow Implementation

at bridge

private Promise<Void> doAtBridge() {
    waitFor(activities.hike('across the bridge')) { status it }
}

Activities Implementation

at bridge

String hike(String somewhere) {
    int stepsTaken = 0
    int totalStepsForHike = getHikeLengthInSteps(somewhere)
    while (stepsTaken < totalStepsForHike) {
        recordHeartbeat("Took ${++stepsTaken} steps.")
    }
    "And hiked ${somewhere}."
}
  • recordHeartbeat is a pulse for the workflow
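Heartbeats are also the natural place to notice a cancel request. A plain-Java sketch of the hike activity, assuming a hypothetical `Heartbeat` callback that records a message and returns `true` when the workflow wants the activity to stop, loosely mirroring the `cancelRequested` flag SWF returns from a heartbeat. `HikeActivity` and `Heartbeat` are illustrative names, not Glisten or Flow API.

```java
// Hypothetical hike activity with an injected heartbeat hook.
final class HikeActivity {
    // Records a status message; returns true if cancellation was requested.
    interface Heartbeat {
        boolean record(String message);
    }

    private final Heartbeat heartbeat;

    HikeActivity(Heartbeat heartbeat) {
        this.heartbeat = heartbeat;
    }

    String hike(String somewhere, int totalSteps) {
        int stepsTaken = 0;
        while (stepsTaken < totalSteps) {
            stepsTaken++;
            // Each heartbeat resets the heartbeat timeout and is a chance to stop early.
            if (heartbeat.record("Took " + stepsTaken + " steps.")) {
                return "Stopped hiking " + somewhere + " after " + stepsTaken + " steps.";
            }
        }
        return "And hiked " + somewhere + ".";
    }
}
```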

Redwoods

  • timers

Workflow History

Workflow Implementation

at redwoods

private Promise<Void> doAtRedwoods() {
    status 'And stretched for 10 seconds before hiking.'
    waitFor(timer(10)) {
        DoTry<String> hiking = doTry {
            promiseFor(activities.hike('through redwoods'))
        }
        DoTry<Void> countDown = cancellableTimer(30)
        waitFor(anyPromises(countDown.getResult(), hiking.getResult())) {
            if (hiking.getResult().isReady()) {
                countDown.cancel(null)
                status "${hiking.getResult().get()}"
            } else {
                hiking.cancel(null)
                status 'And ran out of time when hiking.'
            }
        }
    }
}
  • timer fires, then do something
  • do things and stop when ANY one finishes
  • DoTry allows us to cancel other activity
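The same race can be sketched locally with `CompletableFuture.anyOf`. `RedwoodsRace` is hypothetical. Cancelling a `CompletableFuture` only marks it cancelled and does not stop any running work, whereas the workflow's `DoTry.cancel` goes through Flow's cancellation.

```java
import java.util.concurrent.CompletableFuture;

final class RedwoodsRace {
    // Analogue of waitFor(anyPromises(countDown, hiking)): whichever finishes first
    // decides the status, and the other one is cancelled.
    static CompletableFuture<String> race(CompletableFuture<String> hiking,
                                          CompletableFuture<Void> countDown) {
        return CompletableFuture.anyOf(hiking, countDown).thenApply(ignored -> {
            if (hiking.isDone() && !hiking.isCancelled()) {
                countDown.cancel(false);
                return hiking.join();
            }
            hiking.cancel(false);
            return "And ran out of time when hiking.";
        });
    }
}
```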

Santa Cruz Boardwalk

  • retry
  • retry

Workflow Implementation

at boardwalk

private Promise<Void> doAtBoardwalk() {
    int numberOfTokensGiven = 3
    int numberOfTokens = numberOfTokensGiven
    RetryPolicy retryPolicy = new ExponentialRetryPolicy(60).
            withMaximumAttempts(numberOfTokens).
            withExceptionsToRetry([IllegalStateException])
    DoTry<String> tryToWin = doTry {
        return retry(retryPolicy) {
            numberOfTokens--
            promiseFor(activities.win('a carnival game'))
        }
    } withCatch { Throwable e ->
        return promiseFor("${e.message} ${numberOfTokensGiven} times.")
    }
    ...
  • Flow RetryPolicy
  • withCatch catches last exception

Workflow Implementation

at boardwalk

    ...
    waitFor(tryToWin.result) {
        status it
        if (numberOfTokens > 0) {
            waitFor(activities.enjoy('a roller coaster')) { status it }
        }
        Promise.Void()
    }
}

Activities Implementation

at boardwalk

String win(String game) {
    if (isWinner()) {
        return "And won ${game}."
    } else {
        throw new IllegalStateException("And lost ${game}.")
    }
}
  • win throws exception for retry
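The boardwalk retry can be approximated locally. `LocalRetry` is hypothetical and only loosely modelled on `ExponentialRetryPolicy`: it assumes a backoff coefficient of 2 and records the delays instead of sleeping, so it returns immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical local retry loop, loosely modelled on
// ExponentialRetryPolicy(60).withMaximumAttempts(n).withExceptionsToRetry([...]).
final class LocalRetry {
    final long initialDelaySeconds;
    final int maximumAttempts;
    final Class<? extends RuntimeException> retryOn;
    final List<Long> delaysSeconds = new ArrayList<>(); // recorded, never slept

    LocalRetry(long initialDelaySeconds, int maximumAttempts,
               Class<? extends RuntimeException> retryOn) {
        this.initialDelaySeconds = initialDelaySeconds;
        this.maximumAttempts = maximumAttempts;
        this.retryOn = retryOn;
    }

    <T> T run(Supplier<T> attempt) {
        for (int n = 1; ; n++) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                // Non-retryable error, or attempts used up: the last exception
                // propagates, as it reaches withCatch in the boardwalk example.
                if (!retryOn.isInstance(e) || n >= maximumAttempts) {
                    throw e;
                }
                // Assumed backoff coefficient of 2: 60, 120, 240, ... seconds.
                delaysSeconds.add(initialDelaySeconds * (1L << (n - 1)));
            }
        }
    }
}
```

With three tokens and a player who always loses, `win` is attempted three times, the retries are spaced 60 s and 120 s apart, and the final `IllegalStateException` is what `withCatch` receives.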

Monterey Bay

Workflow Implementation

at Monterey

private Promise<Void> doAtMonterey() {
  Promise<String> eating = promiseFor(activities.enjoy('eating seafood'))
  Promise<String> watching = promiseFor(activities.enjoy('watching sea lions'))
  waitFor(allPromises(eating, watching)) {
    status "${eating.get()} ${watching.get()}"
    doTry {
      promiseFor(activities.enjoy('looking for sea glass on the beach'))
    } withCatch { Throwable t ->
      status t.message
      promiseFor(activities.enjoy('the aquarium'))
    } withFinally { String result ->
      status result
      waitFor(activities.enjoy('the 17-Mile Drive')) { status it }
    }
    Promise.Void()
  }
}
  • allPromises
  • try, catch, and finally (distributed)
  • finally closure gets result (from try or catch)
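`allPromises` corresponds roughly to `CompletableFuture.allOf`. A local sketch of the first step of `doAtMonterey`, with `MontereyAll` as a hypothetical name: the combined status is built only after both activities finish, in whichever order they complete.

```java
import java.util.concurrent.CompletableFuture;

final class MontereyAll {
    // Analogue of waitFor(allPromises(eating, watching)) { status "..." }:
    // the combined status is produced only after both activities have finished.
    static CompletableFuture<String> both(CompletableFuture<String> eating,
                                          CompletableFuture<String> watching) {
        return CompletableFuture.allOf(eating, watching)
            .thenApply(ignored -> eating.join() + " " + watching.join());
    }
}
```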

Implementation of Glisten

Glisten SWF Implementation

class SwfWorkflowOperations<A> implements WorkflowOperations<A> {
    A getActivities() {
        AsyncCaller.of(activitiesType, activitySchedulingOptions)
    }

    <T> Promise<T> waitFor(Promise promise, Closure<Promise<T>> work) {
        new Functor([promise] as Promise[]) {
            protected Promise doExecute() {
                work(promise.get())
            }
        }
    }
    ...
  • AsyncCaller
  • Functor is a Flow class

Glisten SWF Implementation

class AsyncCaller<T> {
    final Class<T> type
    final DynamicActivitiesClient dynamicActivitiesClient

    def methodMissing(String name, args) {
        Activities activities = reflectionHelper.
            findAnnotationOnClass(Activities)
        Method method = reflectionHelper.
            findMethodForNameAndArgsOrFail(name, args as List)
        ActivityType activityType = new ActivityType(
            name: "${type.simpleName}.${method.name}",
            version: activities?.version())
        List<Promise> promises = args.collect { Promise.asPromise(it) }
        return dynamicActivitiesClient.scheduleActivity(activityType,
            promises as Promise[], method.returnType)
    }
}
  • DynamicActivitiesClient is a Flow class
  • use reflection to get activity name and version from annotations
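Java has no `methodMissing`, but a dynamic proxy gives a similar hook. The sketch below swaps Groovy's `methodMissing` for `java.lang.reflect.Proxy`. `ActivitiesVersion`, `TripActivities` and `ActivityCaller` are hypothetical, and instead of calling Flow's `DynamicActivitiesClient` the proxy just records the activity type name (`Type.method`) and version it would schedule.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Flow's @Activities annotation (version only).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ActivitiesVersion {
    String value();
}

@ActivitiesVersion("1.0")
interface TripActivities {
    String goTo(String name, String location);
}

final class ActivityCaller {
    // Every interface call is intercepted and recorded as "Type.method@version[args]".
    static <T> T of(Class<T> type, List<String> scheduled) {
        ActivitiesVersion v = type.getAnnotation(ActivitiesVersion.class);
        Object proxy = Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type},
            (p, method, args) -> {
                String activityType = type.getSimpleName() + "." + method.getName();
                scheduled.add(activityType + "@" + (v == null ? "?" : v.value())
                    + Arrays.toString(args));
                return null; // a real client would return a Promise for the result
            });
        return type.cast(proxy);
    }
}
```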

Glisten Local Implementation

class LocalWorkflowOperations<A> implements WorkflowOperations<A> {
    final A activities

    <T> Promise<T> waitFor(Promise promise, Closure<Promise<T>> work) {
        if (promise.isReady()) {
            return work(promise.get())
        }
        new Settable()
    }
    ...
  • regular activities reference
  • promise.isReady()

Activities Unit Tests

class BayAreaTripActivitiesSpec extends Specification {
    ActivityOperations mockActivity = Mock(ActivityOperations)
    BayAreaTripActivitiesImpl bayAreaTripActivities =
        new BayAreaTripActivitiesImpl(
            activityOperations: mockActivity,
            hikeNameToLengthInSteps: ['there': 3])

    def 'should hike'() {
        when: String result = bayAreaTripActivities.hike('there')

        then: result == 'And hiked there.'
        then: 1 * mockActivity.recordHeartbeat('Took 1 steps.')
        then: 1 * mockActivity.recordHeartbeat('Took 2 steps.')
        then: 1 * mockActivity.recordHeartbeat('Took 3 steps.')
    }
}

Workflow Unit Tests

class BayAreaTripWorkflowSpec extends Specification {
    BayAreaTripActivities mockActivities = Mock(BayAreaTripActivities)
    BayAreaTripWorkflow workflow = new BayAreaTripWorkflowImpl(
        workflowOperations: LocalWorkflowOperations.of(mockActivities))

    def 'should go to Golden Gate Bridge'() {
        when: workflow.start('Clay', [])

        then:
        workflow.logHistory == ['Clay went to the Golden Gate Bridge.',
                                'And hiked across the bridge.']
        then: 1 * mockActivities.goTo('Clay', BayAreaLocation.GoldenGateBridge) >> 'Clay went to the Golden Gate Bridge.'
        then: 1 * mockActivities.hike('across the bridge') >> 'And hiked across the bridge.'
    }
}

Show Asgard's use of SWF

Asgard Show Episode 2 (30:45 - 38:30)

Questions