Expert AWS Development

Amazon Kinesis streams

In this example, we create the stream if it does not already exist and then put a record into it. You can use the Eclipse IDE to run the example.

You need to import a few classes. AmazonKinesis and AmazonKinesisClientBuilder are used to create the Kinesis client. CreateStreamRequest holds the parameters for creating a stream. DescribeStreamRequest holds the parameters for describing a stream, and StreamDescription holds the description that comes back. PutRecordRequest holds the record to put into the stream, and PutRecordResult holds the result of that call. ResourceNotFoundException is thrown when the requested stream does not exist. The example also uses java.nio.ByteBuffer for the record payload. The client is kept in a static field:

static AmazonKinesis kinesisClient;

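kinesisClient is the AmazonKinesis instance. It must be built with your credentials and region (for example, through AmazonKinesisClientBuilder) before it is used. Next, define the stream name and shard count, and build a DescribeStreamRequest: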

final String streamName = "MyExampleStream";
final Integer streamSize = 1;
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest().withStreamName(streamName);

Here you create a DescribeStreamRequest and pass streamName as the parameter to its withStreamName() method. The request is then passed to describeStream():

StreamDescription streamDescription = kinesisClient.describeStream(describeStreamRequest).getStreamDescription();

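This returns a StreamDescription, from which you can read information such as the stream name, stream status, and shards. If the stream does not exist, describeStream() throws ResourceNotFoundException instead, and in that case you create the stream: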

CreateStreamRequest createStreamRequest = new CreateStreamRequest();
createStreamRequest.setStreamName(streamName);
createStreamRequest.setShardCount(streamSize);
kinesisClient.createStream(createStreamRequest);

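The createStreamRequest instance carries the settings for the new stream. You can set the stream name, shard count, and SDK request timeout, and you then pass createStreamRequest to the createStream() method. Stream creation is asynchronous: the stream starts in the CREATING status and accepts records only after it becomes ACTIVE.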
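The describe-then-create flow described above can be sketched without the AWS SDK. This is a minimal illustration, not the book's code: StreamAdmin, FakeStreamAdmin, StreamNotFoundException, and ensureActive are hypothetical names that stand in for the AmazonKinesis client and ResourceNotFoundException, and the fake simply reports CREATING for two polls before ACTIVE.

```java
import java.util.HashMap;
import java.util.Map;

public class EnsureStreamSketch {

    /** Hypothetical stand-in for the SDK's ResourceNotFoundException. */
    static class StreamNotFoundException extends RuntimeException {
        StreamNotFoundException(String name) { super("Stream not found: " + name); }
    }

    /** Hypothetical, minimal view of the two client calls this flow needs. */
    interface StreamAdmin {
        String describeStatus(String streamName); // throws StreamNotFoundException if missing
        void createStream(String streamName, int shardCount);
    }

    /** In-memory fake: a new stream reports CREATING for two polls, then ACTIVE. */
    static class FakeStreamAdmin implements StreamAdmin {
        private final Map<String, Integer> pollsUntilActive = new HashMap<>();
        int createCalls = 0;

        public String describeStatus(String streamName) {
            Integer remaining = pollsUntilActive.get(streamName);
            if (remaining == null) {
                throw new StreamNotFoundException(streamName);
            }
            if (remaining > 0) {
                pollsUntilActive.put(streamName, remaining - 1);
                return "CREATING";
            }
            return "ACTIVE";
        }

        public void createStream(String streamName, int shardCount) {
            createCalls++;
            pollsUntilActive.put(streamName, 2);
        }
    }

    /** Creates the stream if it is missing, then polls until it is ACTIVE or maxPolls is used up. */
    static boolean ensureActive(StreamAdmin admin, String streamName, int shardCount,
                                int maxPolls, long sleepMillis) {
        try {
            admin.describeStatus(streamName);           // existence check
        } catch (StreamNotFoundException e) {
            admin.createStream(streamName, shardCount); // only create when missing
        }
        for (int i = 0; i < maxPolls; i++) {
            if ("ACTIVE".equals(admin.describeStatus(streamName))) {
                return true;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();     // preserve the interrupt and give up
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        FakeStreamAdmin admin = new FakeStreamAdmin();
        boolean ready = ensureActive(admin, "MyExampleStream", 1, 10, 10L);
        System.out.println("ready=" + ready + ", createCalls=" + admin.createCalls);
        // prints "ready=true, createCalls=1"
    }
}
```

With the real client, the existence check would presumably be the describeStream() call shown earlier and the status would come from the returned StreamDescription. The poll count and sleep interval here are arbitrary illustrative values.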

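long createTime = System.currentTimeMillis();
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(streamName);
// Encode the payload explicitly as UTF-8 (needs java.nio.charset.StandardCharsets)
putRecordRequest.setData(ByteBuffer.wrap(String.format("testData-%d", createTime).getBytes(StandardCharsets.UTF_8)));
// The partition key determines which shard receives the record
putRecordRequest.setPartitionKey(String.format("partitionKey-%d", createTime));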

Here we build a PutRecordRequest for the record we want to put into the stream. We set the stream name, the data (wrapped in a ByteBuffer), and the partition key on the instance. Nothing is sent to Kinesis until the request is passed to the client.
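As a small illustration of the data field, the sketch below shows a payload being wrapped in a ByteBuffer and read back. It uses only the JDK; the class name RecordPayloadSketch and the helpers encode and decode are hypothetical, and the timestamp is a fixed illustrative value. It passes the charset explicitly because String.getBytes() with no argument uses the JVM's default charset.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RecordPayloadSketch {

    /** Encodes text as UTF-8 and wraps it in a ByteBuffer, the type setData() expects. */
    static ByteBuffer encode(String text) {
        return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes a payload back to text; duplicate() leaves the caller's buffer position untouched. */
    static String decode(ByteBuffer data) {
        return StandardCharsets.UTF_8.decode(data.duplicate()).toString();
    }

    public static void main(String[] args) {
        long createTime = 1700000000000L; // fixed illustrative timestamp
        ByteBuffer data = encode(String.format("testData-%d", createTime));
        System.out.println(decode(data));     // prints "testData-1700000000000"
        System.out.println(data.remaining()); // prints "22": the buffer is still fully readable
    }
}
```

Decoding through duplicate() matters when the same buffer is later handed to a client: reading the original buffer directly would advance its position and leave nothing to send.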

PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);

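The putRecord() method takes putRecordRequest as a parameter, sends the record to the stream, and returns a PutRecordResult holding the shard ID and sequence number assigned to the record. The final statement prints these values.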
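The shard ID in the result follows from the partition key: Kinesis maps each partition key to a 128-bit integer with MD5 and places the record in the shard whose hash key range contains that value. The sketch below reproduces that mapping locally. It is an approximation under stated assumptions: the key is hashed as UTF-8 bytes, the shards are assumed to split the hash key space into equal-width ranges, and the names PartitionKeyHashSketch, hashKey, and shardIndex are hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class PartitionKeyHashSketch {

    /** Size of the 128-bit hash key space: 2^128. */
    static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    /** MD5 of the partition key (assumed UTF-8 encoded), read as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 keeps the value non-negative
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /** Zero-based shard index, ASSUMING shardCount equal-width hash key ranges. */
    static int shardIndex(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(KEY_SPACE)
                .intValue();
    }

    public static void main(String[] args) {
        String key = "partitionKey-1700000000000"; // illustrative key in the example's format
        System.out.println("hash key:    " + hashKey(key).toString(16));
        System.out.println("of 1 shard:  " + shardIndex(key, 1)); // a single shard owns every key
        System.out.println("of 4 shards: " + shardIndex(key, 4));
    }
}
```

With the single shard used in this example, every record lands in the same shard regardless of its key. The timestamp-based keys only start to spread records out once the stream has more than one shard.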

System.out.printf("Success : Partition key \"%s\", ShardID \"%s\" and SequenceNumber \"%s\".\n",
putRecordRequest.getPartitionKey(), putRecordResult.getShardId(), putRecordResult.getSequenceNumber());

It will print the output on the console as follows: