您可能需要在这里使用 BulkAll,它实现了可观察者(Observable)模式,用于向 Elasticsearch 发出并发批量请求。下面是一个例子:
/// <summary>
/// Example entry point: recreates the "bulk-index" index and bulk indexes the
/// documents produced by <see cref="GetDeviceStatus"/> using the observable
/// BulkAll helper, blocking until the observer reports completion or an error.
/// </summary>
/// <exception cref="Exception">
/// Rethrows whatever exception the observer's onError callback received.
/// </exception>
void Main()
{
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
    var connectionSettings = new ConnectionSettings(pool);
    var client = new ElasticClient(connectionSettings);
    var indexName = "bulk-index";

    // start from a fresh index so the example can be re-run
    if (client.IndexExists(indexName).Exists)
    {
        client.DeleteIndex(indexName);
    }

    client.CreateIndex(indexName, c => c
        .Settings(s => s
            .NumberOfShards(3)
            .NumberOfReplicas(0)
        )
        .Mappings(m => m
            .Map<DeviceStatus>(p => p.AutoMap())
        )
    );

    var size = 500;

    // set up the observable
    var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
        .Index(indexName)
        .MaxDegreeOfParallelism(4)
        .RefreshOnCompleted()
        .Size(size)
    );

    var countdownEvent = new CountdownEvent(1);
    Exception exception = null;

    // set up an observer. Delegates passed are:
    // 1. onNext
    // 2. onError
    // 3. onCompleted
    var bulkAllObserver = new BulkAllObserver(
        response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries"),
        ex =>
        {
            // capture exception for throwing outside Observer.
            // You may decide to do something different here
            exception = ex;
            countdownEvent.Signal();
        },
        () =>
        {
            Console.WriteLine("Finished");
            countdownEvent.Signal();
        });

    // subscribe to the observable
    bulkAllObservable.Subscribe(bulkAllObserver);

    // wait indefinitely for it to finish. May want to put a
    // max timeout on this
    countdownEvent.Wait();

    if (exception != null)
    {
        // rethrow the captured exception without resetting its original stack trace
        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(exception).Throw();
    }
}

/// <summary>
/// Lazily enumerated collection of <see cref="documentCount"/> documents
/// with ids 0 .. documentCount - 1.
/// </summary>
private static IEnumerable<DeviceStatus> GetDeviceStatus()
{
    for (var i = 0; i < documentCount; i++)
    {
        yield return new DeviceStatus(i);
    }
}

// number of documents yielded by GetDeviceStatus
private const int documentCount = 20000;

/// <summary>Minimal document type indexed by the example.</summary>
public class DeviceStatus
{
    public DeviceStatus(int id) => Id = id;

    public int Id { get; set; }
}

如果您不需要在观察者中做任何特别的事情,可以
在可观察对象上使用 .Wait():
// Same bulk indexing as the observer-based example, but blocking via Wait():
// the first argument is the maximum time to wait, the second is the onNext
// callback invoked for each bulk response.
// NOTE(review): despite the variable name, Wait() presumably returns the
// observer rather than the observable; confirm against the NEST version in use.
var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
        .Index(indexName)
        .MaxDegreeOfParallelism(4)
        .RefreshOnCompleted()
        .Size(size)
    )
    .Wait(
        TimeSpan.FromHours(1),
        response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries")
    );

有可观察的方法
BulkAll、ScrollAll 和 Reindex(不过还有 ReindexOnServer,它在 Elasticsearch 内部执行重新索引,并映射到 Reindex API;Reindex 方法的出现早于该 API)。



