Created by Clay McCoy
latest version of slides is on Github
git clone -b groovySwf https://github.com/claymccoy/reveal.js.git
open reveal.js/index.html
"Amazon Simple Workflow (Amazon SWF) is a task coordination and state management service for cloud applications. With Amazon SWF, you can stop writing complex glue-code and state machinery and invest more in the business logic that makes your applications unique."
keep distributed tasks from executing on your dev machine
You are responsible for doing these things and more:
You must really know about:
SWF API
interfaces
// Workflow contract: one @Execute entry point registered as version 1.0, with a
// default execution start-to-close timeout of 60 seconds.
@Workflow
@WorkflowRegistrationOptions(
defaultExecutionStartToCloseTimeoutSeconds = 60L)
interface TestWorkflow {
// Entry point invoked when a workflow execution starts.
@Execute(version = '1.0')
void doIt()
}
// Activities contract, version 1.0. Default start-to-close timeout is 300 seconds;
// schedule-to-start is -1 (presumably "no timeout" -- confirm against the SWF docs).
@Activities(version = "1.0")
@ActivityRegistrationOptions(
defaultTaskScheduleToStartTimeoutSeconds = -1L,
defaultTaskStartToCloseTimeoutSeconds = 300L)
interface TestActivities {
// Produces a String result.
String doSomething()
// Consumes a String, typically the result of doSomething().
void consumeSomething(String thing)
}
// The activities interface as written by the developer: plain, synchronous signatures.
interface TestActivities {
String doSomething()
void consumeSomething(String thing)
}
becomes
// Client-side counterpart of TestActivities: the same methods, but results are
// returned as Promise<String> and arguments are accepted as Promise<String>.
interface TestActivitiesClient {
Promise<String> doSomething()
void consumeSomething(Promise<String> thing)
}
AWS Java SDK
Promise is a future-like object that is used as a placeholder for the result of an asynchronous API. A Java Future is a synchronization construct that blocks the thread that calls its get() method if the result is not available yet. Promise differs from it in that it cannot be used for blocking: a call to its get() method throws IllegalStateException if the result is not available yet. The correct way to ensure that a Promise is ready is to access it from a method that is annotated as @Asynchronous and has the given Promise as one of its arguments, or from the Task.doExecute() method, assuming that the promise was passed to the Task as a constructor parameter.
// Flow-style workflow implementation. The activity result is a Promise, so the
// code that consumes it lives in a separate @Asynchronous method that receives
// the Promise as an argument.
class TestWorkflowImpl implements TestWorkflow {
private final TestActivitiesClient client = new TestActivitiesClientImpl();
// Schedules doSomething() and hands its Promise to the asynchronous continuation.
void doIt() {
Promise<String> result = client.doSomething()
waitForSomething(result)
}
// Continuation: passes the Promise on to the consumeSomething activity.
@Asynchronous
void waitForSomething(Promise<String> something) {
client.consumeSomething(something)
}
}
Abstract away activity concerns
// Operations an activity implementation may call. Elsewhere in this deck the
// activities implementation mixes them in via @Delegate.
interface ActivityOperations {
// Reports a heartbeat carrying a progress message (called from hike()).
void recordHeartbeat(String message)
// The workflow execution associated with the running activity task
// (passed to sendManualActivityCompletionInfo in askYesNoQuestion()).
WorkflowExecution getWorkflowExecution()
// Token of the running activity task (also used in askYesNoQuestion()).
String getTaskToken()
}
Abstract away workflow concerns
// Operations available to a workflow implementation, parameterized by the
// activities type A. Workflow classes in this deck mix them in via @Delegate.
interface WorkflowOperations<A> {
// Handle on which the workflow invokes its activity methods.
A getActivities()
// Invokes work with the value of promise; the implementations shown in this
// deck call work(promise.get()). Returns the promise produced by work.
<T> Promise<T> waitFor(Promise promise, Closure<Promise<T>> work)
// Wraps a value in a Promise.
<T> Promise<T> promiseFor(T value)
// Combines several promises into a single AndPromise.
AndPromise allPromises(Promise... promises)
// Combines several promises into a single OrPromise.
OrPromise anyPromises(Promise... promises)
// Reports a status message (presumably what getLogHistory() later returns -- confirm).
Promise<Void> status(String message)
// Starts a try block; the returned DoTry is chained with withCatch/withFinally
// in the examples of this deck.
<T> DoTry<T> doTry(Closure<Promise<T>> work)
// Executes work under the given retry policy.
<T> Promise<T> retry(RetryPolicy retryPolicy,
Closure<Promise<T>> work)
// Promise associated with a delay given in seconds.
Promise<Void> timer(long delaySeconds)
}
<T> Promise<T> waitFor(Promise promise, Closure<Promise<T>> work)
// waitFor with an explicit promise: the closure reports the promise's value via
// status and then returns an empty promise as its own result.
Promise<String> promise = promiseFor(activities.doSomething())
waitFor(promise) {
    status promise.get()
    // Promise.Void() is a static factory method and must be called with
    // parentheses; a bare "Promise.Void" would be read as a property access.
    Promise.Void()
}
// Shorthand form: the closure's implicit parameter "it" is the activity result,
// and the Promise<Void> returned by status becomes the closure's result.
waitFor(activities.doSomething()) {
status it
}
// Flow-style workflow implementation (shown again for comparison with Glisten).
// Consuming the activity result requires a separate @Asynchronous method that
// takes the Promise as an argument.
class TestWorkflowImpl implements TestWorkflow {
private final TestActivitiesClient client = new TestActivitiesClientImpl();
// Schedules doSomething() and hands its Promise to the asynchronous continuation.
void doIt() {
Promise<String> result = client.doSomething()
waitForSomething(result)
}
// Continuation: passes the Promise on to the consumeSomething activity.
@Asynchronous
void waitForSomething(Promise<String> something) {
client.consumeSomething(something)
}
}
Flow vs Glisten Workflow Impl
// Glisten-style workflow implementation: the WorkflowOperations members
// (activities, waitFor, ...) are mixed into the class through @Delegate.
class TestWorkflowImpl implements TestWorkflow {
@Delegate
WorkflowOperations<TestActivities> workflowOperations = SwfWorkflowOperations.of(TestActivities)
// Runs doSomething(), then feeds its result ("it") to consumeSomething().
void doIt() {
waitFor(activities.doSomething()) {
activities.consumeSomething(it)
}
}
}
Glisten Workflow Impl
// Glisten-style workflow implementation: the WorkflowOperations members
// (activities, waitFor, ...) are mixed into the class through @Delegate.
class TestWorkflowImpl implements TestWorkflow {
@Delegate
WorkflowOperations<TestActivities> workflowOperations = SwfWorkflowOperations.of(TestActivities)
// Runs doSomething(), then feeds its result ("it") to consumeSomething().
void doIt() {
waitFor(activities.doSomething()) {
activities.consumeSomething(it)
}
}
}
// Flow-style error handling: an anonymous TryCatchFinally subclass overriding
// one hook per phase.
new TryCatchFinally() {
@Override
protected void doTry() throws Throwable {
// try something
}
@Override
protected void doCatch(Throwable e) throws Throwable {
// handle error
}
@Override
protected void doFinally() throws Throwable {
// mandatory clean up
}
}
Flow vs Glisten Try Catch Finally
// Glisten-style error handling: doTry returns an object on which withCatch and
// withFinally are chained; .result reads the outcome of the whole chain.
doTry {
// try something
} withCatch { Throwable t ->
// handle error
} withFinally {
// mandatory clean up
}.result
Glisten Try Catch Finally
// Glisten-style error handling: doTry returns an object on which withCatch and
// withFinally are chained; .result reads the outcome of the whole chain.
doTry {
// try something
} withCatch { Throwable t ->
// handle error
} withFinally {
// mandatory clean up
}.result
(by abusing tags and history)
// Workflow contract for the Bay Area trip example: version 1.0, default
// execution start-to-close timeout of 60 seconds.
@Workflow
@WorkflowRegistrationOptions(
defaultExecutionStartToCloseTimeoutSeconds = 60L)
interface BayAreaTripWorkflow {
// Entry point: starts a trip for the named person, given the locations
// that were already visited.
@Execute(version = '1.0')
void start(String name,
Collection<BayAreaLocation> previouslyVisited)
// Exposes the workflow's log history as its queryable state.
@GetState
List<String> getLogHistory()
}
// Glisten implementation of the trip workflow; WorkflowOperations members are
// mixed in through the @Delegate property workflowOperations.
class BayAreaTripWorkflowImpl implements BayAreaTripWorkflow {
@Delegate
WorkflowOperations<BayAreaTripActivities> workflowOperations =
SwfWorkflowOperations.of(BayAreaTripActivities)
// Picks a destination, then runs the goTo activity for it; what follows
// the goTo result is elided on this slide.
@Override
void start(String name,
Collection<BayAreaLocation> previouslyVisited) {
Promise<BayAreaLocation> destinationPromise =
determineDestination(previouslyVisited)
waitFor(destinationPromise) { BayAreaLocation destination ->
waitFor(activities.goTo(name, destination)) {
...
}
}
}
}
@Activities(version = "1.0")
@ActivityRegistrationOptions(
defaultTaskScheduleToStartTimeoutSeconds = -1L,
defaultTaskStartToCloseTimeoutSeconds = 300L)
interface BayAreaTripActivities {
// Activities of the trip workflow. Two methods carry their own
// @ActivityRegistrationOptions; the others declare none on the method.
interface BayAreaTripActivities {
// Start-to-close timeout of 86400 seconds (24 hours) for this activity.
@ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = -1L,
defaultTaskStartToCloseTimeoutSeconds = 86400L)
boolean askYesNoQuestion(String question)
String goTo(String name, BayAreaLocation location)
// Adds a 30 second heartbeat timeout on top of the 300 second start-to-close timeout.
@ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = -1L,
defaultTaskStartToCloseTimeoutSeconds = 300L,
defaultTaskHeartbeatTimeoutSeconds = 30L)
String hike(String somewhere)
String enjoy(String something)
String win(String game)
}
// Activities implementation; ActivityOperations members are mixed in through
// the @Delegate property activityOperations. Remaining methods are elided here.
class BayAreaTripActivitiesImpl implements BayAreaTripActivities {
@Delegate ActivityOperations activityOperations =
new SwfActivityOperations()
// Returns a sentence describing who went where.
@Override
String goTo(String name, BayAreaLocation location) {
"${name} went to ${location}."
}
...
}
determine location
/**
 * Chooses the next destination of the trip.
 *
 * The Golden Gate Bridge and then the Redwoods are chosen first if they have
 * not been visited yet. Otherwise the traveller is asked a yes/no question:
 * "yes" leads to the Boardwalk, "no" to Monterey.
 *
 * @param previouslyVisited locations that were already visited
 * @return a promise for the chosen location
 */
Promise<BayAreaLocation> determineDestination(previouslyVisited) {
    // Unvisited landmarks win, checked in a fixed order.
    for (BayAreaLocation landmark in [BayAreaLocation.GoldenGateBridge, BayAreaLocation.Redwoods]) {
        if (!previouslyVisited.contains(landmark)) {
            return promiseFor(landmark)
        }
    }
    // Both landmarks were seen already: let the answer decide.
    waitFor(activities.askYesNoQuestion('Like roller coasters?')) { boolean likesThrills ->
        promiseFor(likesThrills ? BayAreaLocation.Boardwalk : BayAreaLocation.Monterey)
    }
}
determine location
// Manually completed activity: sends out the task token and workflow execution
// via sendManualActivityCompletionInfo.
@ManualActivityCompletion
boolean askYesNoQuestion(String question) {
sendManualActivityCompletionInfo(taskToken, workflowExecution)
// NOTE(review): presumably a placeholder, with the real answer supplied when
// the task is completed manually -- confirm.
true
}
at bridge
// At the bridge: run the hike activity and report its result as a status message.
private Promise<Void> doAtBridge() {
waitFor(activities.hike('across the bridge')) { status it }
}
at bridge
/**
 * Hikes the given route one step at a time, recording a heartbeat after each
 * step, and returns a sentence describing the completed hike.
 *
 * @param somewhere description of the route, used to look up its length in steps
 * @return "And hiked <somewhere>."
 */
String hike(String somewhere) {
    int totalStepsForHike = getHikeLengthInSteps(somewhere)
    // Counted loop: heartbeat messages run from step 1 up to the total.
    for (int step = 1; step <= totalStepsForHike; step++) {
        recordHeartbeat("Took ${step} steps.")
    }
    "And hiked ${somewhere}."
}
at redwoods
// At the redwoods: wait on a 10 second timer, then race a hike against a 30
// second cancellable timer and cancel whichever one did not finish first.
private Promise<Void> doAtRedwoods() {
status 'And stretched for 10 seconds before hiking.'
waitFor(timer(10)) {
DoTry<String> hiking = doTry {
promiseFor(activities.hike('through redwoods'))
}
DoTry<Void> countDown = cancellableTimer(30)
// Continue as soon as either the hike or the countdown is done.
waitFor(anyPromises(countDown.getResult(), hiking.getResult())) {
if (hiking.getResult().isReady()) {
// Hike finished first: stop the countdown and report the hike.
countDown.cancel(null)
status "${hiking.getResult().get()}"
} else {
// Countdown finished first: abandon the hike.
hiking.cancel(null)
status 'And ran out of time when hiking.'
}
}
}
}
at boardwalk
// At the boardwalk: try to win a carnival game with a limited number of tokens,
// then ride the roller coaster if any tokens are left. The method is split over
// two slides, hence the "..." separators.
private Promise<Void> doAtBoardwalk() {
int numberOfTokensGiven = 3
int numberOfTokens = numberOfTokensGiven
// Retry only on IllegalStateException (what win() throws on a loss), at most
// once per token. 60 is presumably the initial retry interval in seconds -- confirm.
RetryPolicy retryPolicy = new ExponentialRetryPolicy(60).
withMaximumAttempts(numberOfTokens).
withExceptionsToRetry([IllegalStateException])
DoTry<String> tryToWin = doTry {
return retry(retryPolicy) {
// Each attempt costs one token.
numberOfTokens--
promiseFor(activities.win('a carnival game'))
}
} withCatch { Throwable e ->
// Out of attempts: turn the failure into a result message.
return promiseFor("${e.message} ${numberOfTokensGiven} times.")
}
...
at boardwalk
...
waitFor(tryToWin.result) {
status it
// Leftover tokens pay for the roller coaster.
if (numberOfTokens > 0) {
waitFor(activities.enjoy('a roller coaster')) { status it }
}
Promise.Void()
}
}
at boardwalk
/**
 * Plays the given game once.
 *
 * @param game description of the game
 * @return "And won <game>." on a win
 * @throws IllegalStateException with message "And lost <game>." on a loss
 */
String win(String game) {
    // Guard clause: a loss is reported as an exception.
    if (!isWinner()) {
        throw new IllegalStateException("And lost ${game}.")
    }
    "And won ${game}."
}
at Monterey
// At Monterey: start two activities, wait for both, report them together, then
// look for sea glass with the aquarium as the fallback when that fails.
private Promise<Void> doAtMonterey() {
Promise<String> eating = promiseFor(activities.enjoy('eating seafood'))
Promise<String> watching = promiseFor(activities.enjoy('watching sea lions'))
// Both promises must be ready before their values are read with get().
waitFor(allPromises(eating, watching)) {
status "${eating.get()} ${watching.get()}"
doTry {
promiseFor(activities.enjoy('looking for sea glass on the beach'))
} withCatch { Throwable t ->
// Report the failure and fall back to the aquarium.
status t.message
promiseFor(activities.enjoy('the aquarium'))
} withFinally { String result ->
// Report the result, then finish with the drive.
status result
waitFor(activities.enjoy('the 17-Mile Drive')) { status it }
}
Promise.Void()
}
}
class SwfWorkflowOperations<A> extends WorkflowOperations<A> {
// Returns an AsyncCaller built from the activities type and scheduling options;
// activity calls made on it are handled by AsyncCaller.
A getActivities() {
AsyncCaller.of(activitiesType, activitySchedulingOptions)
}
// SWF-backed waitFor: wraps the work in a Functor constructed with the promise,
// whose doExecute() calls work with the promise's value. The Functor itself is
// the returned promise.
<T> Promise<T> waitFor(Promise promise, Closure<Promise<T>> work) {
new Functor([promise] as Promise[]) {
protected Promise doExecute() {
work(promise.get())
}
}
}
...
// Dynamic stand-in for an activities interface of type T: any method called on
// it is routed through methodMissing and scheduled as an SWF activity.
class AsyncCaller<T> {
final Class<T> type
final DynamicActivitiesClient dynamicActivitiesClient
// Turns a call to an undefined method into a scheduled activity.
def methodMissing(String name, args) {
// The @Activities annotation supplies the version of the activity type.
Activities activities = reflectionHelper.
findAnnotationOnClass(Activities)
Method method = reflectionHelper.
findMethodForNameAndArgsOrFail(name, args as List)
// Activity type name is "<InterfaceSimpleName>.<methodName>".
ActivityType activityType = new ActivityType(
name: "${type.simpleName}.${method.name}",
version: activities?.version())
// Every argument is passed on as a Promise.
List<Promise> promises = args.collect { Promise.asPromise(it) }
return dynamicActivitiesClient.scheduleActivity(activityType,
promises as Promise[], method.returnType)
}
}
class LocalWorkflowOperations<A> extends WorkflowOperations<A> {
final A activities
// Local waitFor: runs the work immediately when the promise is already ready;
// otherwise the work is not run and a fresh, unset Settable is returned.
<T> Promise<T> waitFor(Promise promise, Closure<Promise<T>> work) {
if (promise.isReady()) {
return work(promise.get())
}
new Settable()
}
...
/**
 * Spock specification for BayAreaTripActivitiesImpl, run against a mocked
 * ActivityOperations so that no SWF connection is needed.
 */
class BayAreaTripActivitiesSpec extends Specification {

    ActivityOperations mockActivity = Mock(ActivityOperations)

    // The named argument must match the delegate property declared on the
    // implementation (activityOperations); Groovy's map constructor rejects
    // property names that do not exist.
    BayAreaTripActivitiesImpl bayAreaTripActivities =
            new BayAreaTripActivitiesImpl(
                    activityOperations: mockActivity,
                    hikeNameToLengthInSteps: ['there': 3])

    def 'should hike'() {
        when: String result = bayAreaTripActivities.hike('there')
        then: result == 'And hiked there.'
        // "1 *" turns each line into a verified interaction; without a
        // cardinality the line would merely call the mock and check nothing.
        // Separate then: blocks additionally enforce the order of the heartbeats.
        then: 1 * mockActivity.recordHeartbeat('Took 1 steps.')
        then: 1 * mockActivity.recordHeartbeat('Took 2 steps.')
        then: 1 * mockActivity.recordHeartbeat('Took 3 steps.')
    }
}
/**
 * Spock specification for BayAreaTripWorkflowImpl. The workflow runs against
 * LocalWorkflowOperations and mocked activities, so no SWF connection is needed.
 */
class BayAreaTripWorkflowSpec extends Specification {

    BayAreaTripActivities mockActivities = Mock(BayAreaTripActivities)

    // The named argument must match the delegate property declared on the
    // implementation (workflowOperations); Groovy's map constructor rejects
    // property names that do not exist.
    BayAreaTripWorkflow workflow = new BayAreaTripWorkflowImpl(
            workflowOperations: LocalWorkflowOperations.of(mockActivities))

    def 'should go to Golden Gate Bridge'() {
        when: workflow.start('Clay', [])
        then:
        workflow.logHistory == ['Clay went to the Golden Gate Bridge.',
                                'And hiked across the bridge.']
        // Each activity is expected exactly once and returns the sentence that
        // should end up in the log history.
        then: 1 * mockActivities.goTo('Clay', BayAreaLocation.GoldenGateBridge) >> 'Clay went to the Golden Gate Bridge.'
        then: 1 * mockActivities.hike('across the bridge') >> 'And hiked across the bridge.'
    }
}