这个问题已经很久了,细节我记不太清了,但如果有人需要,代码如下(BeanShell Sampler):
import org.MyKinesisClient;//Create a controller object every time Jmeter starts MyKinesisClient controller=new MyKinesisClient(vars.get("accessKey"),vars.get("secretKey"),vars.get("endpoint"),vars.get("serviceName"),vars.get("regionId"));bsh.shared.controller=controller;这是最后的代码:
import com.amazonaws.util.json.JSONArray;import com.amazonaws.util.json.JSONObject;import org.MyKinesisClient;//Variablesint timestampValue=(${i});String idValue=${__threadNum}+"_"+"1";JSonObject part = new JSonObject();//Inserimento campi Jsonpart.put("updated",timestampValue);part.put("parent","${__threadNum}");part.put("id",idValue);part.put("thingClass","CosyInverter");part.put("mac_address_w","${mac_address_w_1}");//Other put....//Send Json to kinesisMyKinesisClient controller=bsh.shared.controller;controller.sendJson(part, ${__Random(0,${__threadNum})},vars.get("streamName"));我的代码比上面的代码(数据库查询等)还要复杂,但是我希望这是您所需要的。
这是关于MyKinesisClient的Java代码
import java.io.UnsupportedEncodingException;import java.nio.ByteBuffer;import java.util.ArrayList;import java.util.List;import com.amazonaws.auth.AWSCredentials;import com.amazonaws.auth.BasicAWSCredentials;import com.amazonaws.services.kinesis.AmazonKinesisClient;import com.amazonaws.services.kinesis.model.PutRecordRequest;import com.amazonaws.services.kinesis.model.PutRecordsRequest;import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;import com.amazonaws.util.json.JSONArray;import com.amazonaws.util.json.JSONException;import com.amazonaws.util.json.JSONObject;public class MyKinesisClient { private AmazonKinesisClient kinesisClient; public MyKinesisClient(String accessKey, String secretKey, String endpoint, String serviceName, String regionId ) { AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey); kinesisClient = new AmazonKinesisClient(credentials); kinesisClient.setEndpoint(endpoint,serviceName,regionId); } public void sendJson(JSonObject json, int partitionKey, String streamName) throws UnsupportedEncodingException, JSonException { try{ PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(streamName); putRecordRequest.setData(ByteBuffer.wrap(json.toString().getBytes("utf-8"))); putRecordRequest.setPartitionKey(String.format("partitionKey-%d", partitionKey)); kinesisClient.putRecord(putRecordRequest); }catch(Exception e){ System.out.println(e.getMessage()); } } public void sendJsonArray(JSonArray json,int partitionKey, String streamName) { try{ PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<PutRecordsRequestEntry>(); for(int i=0;i<json.length();i++){ PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(json.getJSonObject(i).toString().getBytes("utf-8"))); 
putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", partitionKey)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); kinesisClient.putRecords(putRecordsRequest); }catch(Exception e){ System.out.println(e.getMessage()); } }}


