
Workflow implementations
In this Amazon SWF example, we will create four Java classes:
- Types.java: Holds the project's domain, workflow, and activity type data, which is shared with the other components. It also registers these types with Amazon SWF.
- Activity.java (activity worker): Polls for activity tasks and runs activities in response.
- Worker.java (workflow worker or decider): Polls for decision tasks and schedules new tasks.
- Starter.java (workflow starter): Starts new workflow executions.
For this example, you need to import two classes, AmazonSimpleWorkflow and AmazonSimpleWorkflowClientBuilder, from the com.amazonaws.services.simpleworkflow package, along with the com.amazonaws.services.simpleworkflow.model package.
Also use AmazonSimpleWorkflowClientBuilder to create an AmazonSimpleWorkflow client, which your application uses to call Amazon SWF. You need to add this code to all four classes:
private static final AmazonSimpleWorkflow simpleWorkflow = AmazonSimpleWorkflowClientBuilder.defaultClient();
In Types.java, add the following constants, which will be used throughout the application:
public final static String DOMAIN = "ExampleDomain";
public final static String TASKLIST = "ExampleTasklist";
public final static String WORKFLOW = "ExampleWorkflow";
public final static String WORKFLOW_VERSION = "1.0";
public final static String ACTIVITY = "ExampleActivity";
public final static String ACTIVITY_VERSION = "1.0";
SWF components can communicate with each other only if they are in the same domain. The following method registers the domain:
public static void registerDomain() {
    try {
        System.out.println("Register the domain '" + DOMAIN + "'.");
        simpleWorkflow.registerDomain(new RegisterDomainRequest()
            .withName(DOMAIN)
            .withWorkflowExecutionRetentionPeriodInDays("7"));
    } catch (DomainAlreadyExistsException e) {
        System.out.println("Exception: Domain already exists!");
    }
}
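Add the following method, which registers a new activity type for your workflow. It also sets a default task list and default timeouts, because the decider in this example schedules the activity without specifying them:
public static void registerActivityType() {
    try {
        System.out.println("Register Activity Type '" + ACTIVITY + "-" + ACTIVITY_VERSION + "'.");
        simpleWorkflow.registerActivityType(new RegisterActivityTypeRequest()
            .withDomain(DOMAIN).withName(ACTIVITY).withVersion(ACTIVITY_VERSION)
            // Illustrative defaults, so decisions need not name a task list or timeouts.
            .withDefaultTaskList(new TaskList().withName(TASKLIST))
            .withDefaultTaskScheduleToStartTimeout("30")
            .withDefaultTaskStartToCloseTimeout("600")
            .withDefaultTaskScheduleToCloseTimeout("630")
            .withDefaultTaskHeartbeatTimeout("10"));
    } catch (TypeAlreadyExistsException e) {
        System.out.println("Exception: Activity type already exists!");
    }
}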
An activity type can be uniquely identified by its name and version.
Now register a new workflow type, which contains the logic of workflow execution:
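public static void registerWorkflowType() {
    try {
        System.out.println("Register Workflow Type '" + WORKFLOW + "-" + WORKFLOW_VERSION + "'.");
        simpleWorkflow.registerWorkflowType(new RegisterWorkflowTypeRequest()
            .withDomain(DOMAIN).withName(WORKFLOW).withVersion(WORKFLOW_VERSION)
            // Illustrative defaults, so the starter need not set them for each execution.
            .withDefaultChildPolicy(ChildPolicy.TERMINATE)
            .withDefaultTaskList(new TaskList().withName(TASKLIST))
            .withDefaultTaskStartToCloseTimeout("30"));
    } catch (TypeAlreadyExistsException e) {
        System.out.println("Exception: Workflow type already exists!");
    }
}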
As with an activity type, a workflow type is also uniquely identified by its name and version.
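The name-plus-version identity rule explains why a second registration of the same type ends up in the TypeAlreadyExistsException handler. The following is a minimal in-memory sketch of that rule only; TypeRegistry and its register method are hypothetical names for illustration and are not part of the AWS SDK:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of "a type is identified by name and version".
// It makes no Amazon SWF calls.
class TypeRegistry {
    private final Set<List<String>> registered = new HashSet<>();

    // Returns true if the (name, version) pair is new, false if it was already registered.
    boolean register(String name, String version) {
        return registered.add(Arrays.asList(name, version));
    }

    public static void main(String[] args) {
        TypeRegistry registry = new TypeRegistry();
        System.out.println(registry.register("ExampleActivity", "1.0")); // true
        System.out.println(registry.register("ExampleActivity", "1.0")); // false: same name and version
        System.out.println(registry.register("ExampleActivity", "2.0")); // true: new version
    }
}
```

In this model, changing an activity's behavior would mean registering it under a new version string rather than re-registering the existing one.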
Now add a main method to make this class executable, and call the registerDomain, registerWorkflowType, and registerActivityType methods from it:
public static void main(String[] args) {
    registerDomain();
    registerWorkflowType();
    registerActivityType();
}
Activity.java polls for activity tasks, which Amazon SWF generates in response to workflow decisions.
Here we will implement a simple activity worker which drives a single activity.
Add the following method as the activity. It takes a string as input, prepends a greeting, and returns the result:
private static String greetings(String input) throws Throwable {
    return "Hi, " + input + "!";
}
Now add the activity task polling loop to the main method:
public static void main(String[] args) {
    while (true) {
        System.out.println("Polling for an activity task from the tasklist '" + Types.TASKLIST + "' in the domain '" + Types.DOMAIN + "'.");
        ActivityTask task = simpleWorkflow.pollForActivityTask(
            new PollForActivityTaskRequest().withDomain(Types.DOMAIN)
                .withTaskList(new TaskList().withName(Types.TASKLIST)));
        String task_token = task.getTaskToken();
    }
}
Now add the following code inside the while loop of the main method, after the line that gets the task token. It runs the activity and reports the outcome to SWF:
if (task_token != null) {
    String result = null;
    Throwable error = null;
    try {
        System.out.println("Executing the activity task. Input is '" +
            task.getInput() + "'.");
        result = greetings(task.getInput());
    } catch (Throwable th) {
        error = th;
    }
    if (error == null) {
        System.out.println("The activity task succeeded. Result is '"
            + result + "'.");
        simpleWorkflow.respondActivityTaskCompleted(
            new RespondActivityTaskCompletedRequest()
                .withTaskToken(task_token).withResult(result));
    } else {
        System.out.println("The activity task failed. Error is '"
            + error.getClass().getSimpleName() + "'.");
        simpleWorkflow.respondActivityTaskFailed(
            new RespondActivityTaskFailedRequest()
                .withTaskToken(task_token)
                .withReason(error.getClass().getSimpleName())
                .withDetails(error.getMessage()));
    }
}
If the task succeeds, the worker responds to SWF by calling the respondActivityTaskCompleted() method with a RespondActivityTaskCompletedRequest object, which contains the task token and the result.
If the task fails, the worker responds to SWF by calling the respondActivityTaskFailed() method with a RespondActivityTaskFailedRequest object, which contains the task token, the failure reason (the exception's class name), and the details (the exception's message).
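The succeed-or-fail branching can be separated from the SWF calls, which makes it easier to test on its own. The following is a minimal sketch under that assumption; ActivityOutcome and its run method are hypothetical names, and the sketch only captures what the worker would report, without contacting Amazon SWF:

```java
import java.util.function.Function;

// Hypothetical helper that captures an activity's outcome.
// It mirrors the reason/details convention used by the worker above.
class ActivityOutcome {
    final String result;   // set when the activity succeeded
    final String reason;   // simple class name of the exception when it failed
    final String details;  // exception message when it failed

    private ActivityOutcome(String result, String reason, String details) {
        this.result = result;
        this.reason = reason;
        this.details = details;
    }

    boolean succeeded() {
        return reason == null;
    }

    // Runs the activity and records either its result or its failure.
    static ActivityOutcome run(Function<String, String> activity, String input) {
        try {
            return new ActivityOutcome(activity.apply(input), null, null);
        } catch (RuntimeException e) {
            return new ActivityOutcome(null, e.getClass().getSimpleName(), e.getMessage());
        }
    }

    public static void main(String[] args) {
        ActivityOutcome ok = run(name -> "Hi, " + name + "!", "Amazon SWF");
        System.out.println(ok.succeeded() + " " + ok.result);

        ActivityOutcome bad = run(name -> { throw new IllegalStateException("boom"); }, "x");
        System.out.println(bad.succeeded() + " " + bad.reason + " " + bad.details);
    }
}
```

A worker built this way would call respondActivityTaskCompleted when succeeded() is true and respondActivityTaskFailed otherwise.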
In Worker.java, when the workflow worker receives a task, it decides whether to schedule a new activity and takes the corresponding action.
Now call the pollForDecisionTask method in a continuous polling loop in the main method. Once a task is received, call its getTaskToken method, which returns a string that identifies the task:
public static void main(String[] args) {
    PollForDecisionTaskRequest task_request =
        new PollForDecisionTaskRequest()
            .withDomain(Types.DOMAIN)
            .withTaskList(new TaskList().withName(Types.TASKLIST));
    while (true) {
        System.out.println("Polling for a decision task from the tasklist '" + Types.TASKLIST + "' in the domain '" + Types.DOMAIN + "'.");
        DecisionTask task = simpleWorkflow.pollForDecisionTask(task_request);
        String taskToken = task.getTaskToken();
        if (taskToken != null) {
            try {
                executeDecisionTask(taskToken, task.getEvents());
            } catch (Throwable th) {
                th.printStackTrace();
            }
        }
    }
}
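Both polling loops skip the iteration when no task token is present, which is what happens when a long poll times out with no work available. The following is a minimal sketch, assuming an empty poll may surface as either a null or an empty task token (checking both is a defensive choice, not something the code above requires). PollLoopSketch, hasTask, and handledTokens are hypothetical names, and a plain list stands in for the sequence of poll results:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a polling loop. No Amazon SWF calls are made.
class PollLoopSketch {
    // Treats both null and "" as "the poll returned no task".
    static boolean hasTask(String taskToken) {
        return taskToken != null && !taskToken.isEmpty();
    }

    // Walks a fake sequence of poll results and collects the tokens that would be handled.
    static List<String> handledTokens(List<String> pollResults) {
        List<String> handled = new ArrayList<>();
        for (String token : pollResults) {
            if (hasTask(token)) {
                handled.add(token);
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        // Two empty polls are interleaved with two real tasks.
        List<String> polls = Arrays.asList(null, "token-1", "", "token-2");
        System.out.println(handledTokens(polls)); // [token-1, token-2]
    }
}
```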
Add the executeDecisionTask method, which takes two parameters: the task token string and the list of history events. It needs java.util.List and java.util.ArrayList to be imported:
private static void executeDecisionTask(String taskToken, List<HistoryEvent> events)
        throws Throwable {
    List<Decision> decisions = new ArrayList<Decision>();
    String workflowInput = null;
    int scheduledActivity = 0;
    int openActivity = 0;
    boolean completedActivity = false;
    String result = null;
}
In the preceding method, we have set up some local variables:
- decisions: The list of decisions produced by processing the task
- workflowInput: The input provided by the WorkflowExecutionStarted event
- scheduledActivity: Count of scheduled activities
- openActivity: Count of open activities
- completedActivity: Boolean value of the activity status; whether it has completed or not
- result: String which holds the activity result
Now add the following code into the executeDecisionTask method to process HistoryEvent objects:
System.out.println("Decision task Execution for history events: [");
for (HistoryEvent historyEvent : events) {
    System.out.println(" " + historyEvent);
    switch (historyEvent.getEventType()) {
        case "WorkflowExecutionStarted":
            workflowInput = historyEvent.getWorkflowExecutionStartedEventAttributes().getInput();
            break;
        case "ActivityTaskScheduled":
            scheduledActivity++;
            break;
        case "ScheduleActivityTaskFailed":
            scheduledActivity--;
            break;
        case "ActivityTaskStarted":
            scheduledActivity--;
            openActivity++;
            break;
        case "ActivityTaskCompleted":
            openActivity--;
            completedActivity = true;
            result = historyEvent.getActivityTaskCompletedEventAttributes().getResult();
            break;
        case "ActivityTaskFailed":
            openActivity--;
            break;
        case "ActivityTaskTimedOut":
            openActivity--;
            break;
    }
}
System.out.println("]");
In the preceding code, we are most interested in the WorkflowExecutionStarted event, because it indicates that the execution has started and provides the initial input to the workflow.
The ActivityTaskCompleted event appears in the history once the scheduled activity has completed.
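Because the decider rebuilds its counters from the full event history on every decision task, the bookkeeping can be modeled as a pure function of the event types. The following is a minimal sketch that uses plain strings in place of HistoryEvent objects; HistoryReplaySketch and replay are hypothetical names, and the counting rules simply copy the switch statement above:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the decider's bookkeeping. No Amazon SWF types are used.
class HistoryReplaySketch {
    int scheduledActivity = 0;
    int openActivity = 0;
    boolean completedActivity = false;

    // Replays the event types in order and returns the resulting state.
    static HistoryReplaySketch replay(List<String> eventTypes) {
        HistoryReplaySketch state = new HistoryReplaySketch();
        for (String type : eventTypes) {
            switch (type) {
                case "ActivityTaskScheduled":
                    state.scheduledActivity++;
                    break;
                case "ScheduleActivityTaskFailed":
                    state.scheduledActivity--;
                    break;
                case "ActivityTaskStarted":
                    state.scheduledActivity--;
                    state.openActivity++;
                    break;
                case "ActivityTaskCompleted":
                    state.openActivity--;
                    state.completedActivity = true;
                    break;
                case "ActivityTaskFailed":
                case "ActivityTaskTimedOut":
                    state.openActivity--;
                    break;
                default:
                    break; // events such as DecisionTaskScheduled do not affect the counters
            }
        }
        return state;
    }

    public static void main(String[] args) {
        HistoryReplaySketch s = replay(Arrays.asList(
            "WorkflowExecutionStarted", "ActivityTaskScheduled", "ActivityTaskStarted"));
        // One activity has started and is still running.
        System.out.println(s.scheduledActivity + " " + s.openActivity + " " + s.completedActivity); // 0 1 false
    }
}
```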
Add the following code after the for loop to respond with the proper decision based on the task. It needs java.util.UUID to be imported:
if (completedActivity) {
    decisions.add(
        new Decision()
            .withDecisionType(DecisionType.CompleteWorkflowExecution)
            .withCompleteWorkflowExecutionDecisionAttributes(
                new CompleteWorkflowExecutionDecisionAttributes()
                    .withResult(result)));
} else {
    if (openActivity == 0 && scheduledActivity == 0) {
        ScheduleActivityTaskDecisionAttributes attrs =
            new ScheduleActivityTaskDecisionAttributes()
                .withActivityType(new ActivityType()
                    .withName(Types.ACTIVITY)
                    .withVersion(Types.ACTIVITY_VERSION))
                .withActivityId(UUID.randomUUID().toString())
                .withInput(workflowInput);
        decisions.add(
            new Decision()
                .withDecisionType(DecisionType.ScheduleActivityTask)
                .withScheduleActivityTaskDecisionAttributes(attrs));
    }
}
Whether the decision is ScheduleActivityTask or CompleteWorkflowExecution, we add it to the decisions list, which was declared at the start of the method. If an activity is already scheduled or running, no decision is added and the list stays empty.
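The branching can be summarized as a small pure function of the three state values. The following is a minimal sketch; DecisionSketch and decide are hypothetical names, the returned strings stand in for the SDK's DecisionType values, and "NoDecision" is an illustrative label for the case where the decisions list stays empty:

```java
// Hypothetical summary of the decider's branching. No Amazon SWF types are used.
class DecisionSketch {
    static String decide(boolean completedActivity, int openActivity, int scheduledActivity) {
        if (completedActivity) {
            // The single activity has finished, so the workflow can be closed.
            return "CompleteWorkflowExecution";
        }
        if (openActivity == 0 && scheduledActivity == 0) {
            // Nothing is running or waiting, so schedule the activity.
            return "ScheduleActivityTask";
        }
        // An activity is already scheduled or running: respond with no decisions.
        return "NoDecision";
    }

    public static void main(String[] args) {
        System.out.println(decide(false, 0, 0)); // ScheduleActivityTask
        System.out.println(decide(false, 1, 0)); // NoDecision
        System.out.println(decide(true, 0, 0));  // CompleteWorkflowExecution
    }
}
```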
Add the following code to the end of the executeDecisionTask method to send the list of decision objects back to SWF:
simpleWorkflow.respondDecisionTaskCompleted(
    new RespondDecisionTaskCompletedRequest()
        .withTaskToken(taskToken).withDecisions(decisions));
In the preceding code, the respondDecisionTaskCompleted method takes a request that carries the task token and the decision objects.
Add the WORKFLOW_EXECUTION constant and the main method to the Starter.java class. In the main method, call startWorkflowExecution, which takes a StartWorkflowExecutionRequest object as input:
public static final String WORKFLOW_EXECUTION = "ExampleWorkflowExecution";
public static void main(String[] args) {
    String workflowInput = "Amazon SWF";
    if (args.length > 0) {
        workflowInput = args[0];
    }
    System.out.println("Workflow execution starting '" + WORKFLOW_EXECUTION + "' with input '" + workflowInput + "'.");
    WorkflowType wf_type = new WorkflowType()
        .withName(Types.WORKFLOW).withVersion(Types.WORKFLOW_VERSION);
    Run run = simpleWorkflow.startWorkflowExecution(new StartWorkflowExecutionRequest()
        .withDomain(Types.DOMAIN)
        .withWorkflowType(wf_type).withWorkflowId(WORKFLOW_EXECUTION)
        .withInput(workflowInput).withExecutionStartToCloseTimeout("90"));
    System.out.println("Workflow execution started. Run id is '" + run.getRunId() + "'.");
}
The Run object returned by startWorkflowExecution provides a run ID. This ID is useful for identifying a particular workflow execution in SWF's history.