参考链接:MongoDB Java API操作很全的整理 - 21cm - 博客园
MongoDB的操作分为同步操作和异步操作
一、同步操作API
使用API时,先引入maven依赖
mongo-java-driver
1、关于MongoDB Client的初始化和关闭。
从官方介绍来看,一般建议Client只需要一个建立一个长连接实例,然后使用时,都使用这个实例就可以,也就是可以用java的单例模式来创建连接实例。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | //mongoClient连接 protected static MongoClient mongoClient; public synchronized static MongodbClient getInstance(String mongodbUrl) { if (null == mongoClient) { mongoClient = MongoClients.create(mongodbUrl); if(null != mongoClient){ log.info("mongoClient init success!"); } else{ log.info("mongoClient init failed!"); } } return mongodbClient; } |
直接通过mongodb的host和port来创建client:
| 1 | MongoClient mongoClient = MongoClients.create("mongodb://host1:27017"); |
client连接到一个 Replica Set:
| 1 2 3 | MongoClient mongoClient = MongoClients.create("mongodb://host1:27017,host2:27017,host3:27017"); MongoClient mongoClient = MongoClients.create("mongodb://host1:27017,host2:27017,host3:27017/?replicaSet=myReplicaSet"); |
或者通过MongoClientSettings.builder() 来辅助生成连接字符串来创建client:
| 1 | MongoClient mongoClient = MongoClients.create( MongoClientSettings.builder() .applyToClusterSettings(builder -> builder.hosts(Arrays.asList( new ServerAddress("host1", 27017), new ServerAddress("host2", 27017), new ServerAddress("host3", 27017)))) .build()); |
连接关闭:
| 1 2 3 4 5 6 | public void close() { if(null!=mongoClient){ mongoClient.close(); mongoClient=null; } } |
2、关于MongoDB 的基本操作
//创建Collection
public void createCollection(String databaseName,String collectionName){ getDatabase(databaseName).createCollection(collectionName); }
//查询databaseName
public MongoDatabase getDatabase(String databaseName){ return mongoClient.getDatabase(databaseName); }
//查询Collection
public List
List
mongoClient.getDatabase(databaseName).listCollectionNames().forEach((Consumer super String>) t->{ stringList.add(t); });
return stringList; }
public MongoCollection
3、关于MongoDB 的查询操作
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | //通过id(objectid)精确查询 public FindIterable BasicDBObject searchDoc = new BasicDBObject().append("_id", id); return getCollectionByName(databaseName,collectionName).find(searchDoc); } //通过id(objectid)模糊查询 public FindIterable BasicDBObject searchDoc = new BasicDBObject().append("_id", new BasicDBObject("$regex",id)); return getCollectionByName(databaseName,collectionName).find(searchDoc); } //通过开始id和结束id 查询(根据objectId范围查询) public FindIterable BasicDBObject searchDoc = new BasicDBObject().append("_id", new BasicDBObject("$gte", startId).append("$lte", endId)); return getCollectionByName(databaseName,collectionName).find(searchDoc); } public FindIterable return getCollectionByName(databaseName,collectionName).find(basicDBObject); } //限制查询返回的条数 public FindIterable return findMongoDbDoc(databaseName,collectionName,basicDBObject).limit(limitNum) ; } public FindIterable return findMongoDbDocById(databaseName,collectionName,startId,endId).limit(limitNum); } public FindIterable return findMongoDbDocById(databaseName,collectionName,startId,endId).sort(new document().append(sortField, -1)); } public FindIterable return findMongoDbDocByIdDescSort(databaseName,collectionName,startId,endId,sortField).limit(limitNum); } public FindIterable return findMongoDbDocById(databaseName,collectionName,startId,endId).sort(new document().append(sortField, 1)); } public FindIterable return findMongoDbDocByIdAscSort(databaseName,collectionName,startId,endId,sortField).limit(limitNum); } |
4、关于MongoDB 的插入操作
| 1 2 3 4 5 6 7 8 9 | //插入操作,注意插入时,如果数据已经存在会报错,插入时必须数据不存在,不会自动进行覆盖 //插入单条记录 public void insertDoc(String databaseName, String collectionName, document document){ getCollectionByName(databaseName,collectionName).insertOne(document); } //插入多条记录 public void insertDoc(String databaseName, String collectionName,List extends document> listData){ getCollectionByName(databaseName,collectionName).insertMany(listData); } |
5、关于MongoDB 的更新操作
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | //更新单条 public void updateDoc(String databaseName, String collectionName, Bson var1, Bson var2){ getCollectionByName(databaseName,collectionName).updateOne(var1,var2); } public void updateDoc(String databaseName, String collectionName, Bson var1, List extends Bson> list){ getCollectionByName(databaseName,collectionName).updateOne(var1,list); } //批量更新 public void updateDocs(String databaseName, String collectionName, Bson var1, Bson var2){ getCollectionByName(databaseName,collectionName).updateMany(var1,var2); } public void updateDocs(String databaseName, String collectionName, Bson var1, List extends Bson> list){ getCollectionByName(databaseName,collectionName).updateMany(var1,list); } |
6、关于MongoDB 的删除操作
| 1 2 3 4 5 6 7 8 | //单条删除 public DeleteResult deleteDoc(String databaseName, String collectionName, Bson var1){ return getCollectionByName(databaseName,collectionName).deleteOne(var1); } //批量删除 public DeleteResult deleteDocs(String databaseName, String collectionName,Bson var1){ return getCollectionByName(databaseName,collectionName).deleteMany(var1); } |
7、关于MongoDB 的替换操作
| 1 2 3 4 | //存在就替换,不存在的话就插入 public UpdateResult replaceDoc(String databaseName, String collectionName, Bson var1, document var2){ return getCollectionByName(databaseName,collectionName).replaceOne(var1,var2); } |
8、关于MongoDB 的bulkWrite操作 (批量写入)
| 1 2 3 | public BulkWriteResult bulkWrite(String databaseName, String collectionName, List extends WriteModel extends document>> listData){ return getCollectionByName(databaseName,collectionName).bulkWrite(listData); } |
二、异步操作API
mongodb异步驱动程序提供了异步api,可以利用netty或java 7的asynchronoussocketchannel实现快速、无阻塞的i/o,maven依赖
mongodb-driver-async
官方地址:Installation
异步操作必然会涉及到回调,回调时采用ResultCallback
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | SingleResultCallback @Override public void onResult(final document document, final Throwable t) { System.out.println(document.toJson()); } }; SingleResultCallback @Override public void onResult(final Void result, final Throwable t) { System.out.println("Operation Finished!"); } }; |
异步insert操作
| 1 2 3 4 5 6 | collection.insertMany(documents, new SingleResultCallback @Override public void onResult(final Void result, final Throwable t) { System.out.println("documents inserted!"); } }); |
异步删除操作
| 1 2 3 4 5 6 | collection.deleteMany(gte("i", 100), new SingleResultCallback @Override public void onResult(final DeleteResult result, final Throwable t) { System.out.println(result.getDeletedCount()); } }); |
异步更新操作
| 1 2 3 4 5 6 7 | collection.updateMany(lt("i", 100), inc("i", 100), new SingleResultCallback @Override public void onResult(final UpdateResult result, final Throwable t) { System.out.println(result.getModifiedCount()); } }); |
异步统计操作
| 1 2 3 4 5 6 7 | collection.countdocuments( new SingleResultCallback @Override public void onResult(final Long count, final Throwable t) { System.out.println(count); } }); |
三、MongoDB Reactive Streams 操作API
官方的MongoDB reactive streams Java驱动程序,为MongoDB提供异步流处理和无阻塞处理。
完全实现reactive streams api,以提供与jvm生态系统中其他reactive streams的互操作,一般适合于大数据的处理,比如spark,flink,storm等。
| 1 2 3 4 5 6 7 | mongodb-driver-reactivestreams
|
官方地址:MongoDB Java Driver Reactive Streams
会包含如下三部分:
- Publisher:Publisher 是数据的发布者。Publisher 接口只有一个方法 subscribe,用于添加数据的订阅者,也就是 Subscriber。
- Subscriber: 是数据的订阅者。Subscriber 接口有4个方法,都是作为不同事件的处理器。在订阅者成功订阅到发布者之后,其 onSubscribe(Subscription s) 方法会被调用。
- Subscription:表示的是当前的订阅关系。
API问的地址:Generated documentation (Untitled)
代码示例:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | //建立连接 MongoClient mongoClient = MongoClients.create(mongodbUrl); //获得数据库对象 MongoDatabase database = client.getDatabase(databaseName); //获得集合 MongoCollection collection = database.getCollection(collectionName); //异步返回Publisher FindPublisher publisher = collection.find(); //订阅实现 publisher.subscribe(new Subscriber() { @Override public void onSubscribe(Subscription str) { System.out.println("start..."); //执行请求 str.request(Integer.MAX_VALUE); } @Override public void onNext(document document) { //获得文档 System.out.println("document:" + document.toJson()); } @Override public void onError(Throwable t) { System.out.println("error occurs."); } @Override public void onComplete() { System.out.println("finished."); } }); |
参考链接:MongoDB Java API操作很全的整理 - 21cm - 博客园



