栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

线程锁内的多线程

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

线程锁内的多线程

您可能要在

BulkAll
这里使用,它实现了可观察的模式,以向Elasticsearch发出并发批量请求。这是一个例子

void Main(){       var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));    var connectionSettings = new ConnectionSettings(pool);    var client = new ElasticClient(connectionSettings);    var indexName = "bulk-index";    if (client.IndexExists(indexName).Exists)        client.DeleteIndex(indexName);    client.CreateIndex(indexName, c => c        .Settings(s => s .NumberOfShards(3) .NumberOfReplicas(0)        )        .Mappings(m => m .Map<DeviceStatus>(p => p.AutoMap())        )    );    var size = 500;    // set up the observable    var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b        .Index(indexName)        .MaxDegreeOfParallelism(4)        .RefreshonCompleted()        .Size(size)    );    var countdownEvent = new CountdownEvent(1);    Exception exception = null;    // set up an observer. Delegates passed are:    // 1. onNext    // 2. onError    // 3. onCompleted    var bulkAllObserver = new BulkAllObserver(        response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries"),        ex =>         { // capture exception for throwing outside Observer. // You may decide to do something different here exception = ex; countdownEvent.Signal();        },        () =>         { Console.WriteLine("Finished"); countdownEvent.Signal();        });    // subscribe to the observable   bulkAllObservable.Subscribe(bulkAllObserver);    // wait indefinitely for it to finish. May want to put a    // max timeout on this      countdownEvent.Wait();    if (exception != null)     {        throw exception;    }}// lazily enumerated collectionprivate static IEnumerable<DeviceStatus> GetDeviceStatus(){    for (var i = 0; i < documentCount; i++)        yield return new DeviceStatus(i); }private const int documentCount = 20000;public class DeviceStatus{    public DeviceStatus(int id) => Id = id;    public int Id {get;set;}}

如果您不需要在观察者中做任何特别的事情,可以

.Wait()
在可观察对象上使用

var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b    .Index(indexName)    .MaxDegreeOfParallelism(4)    .RefreshonCompleted()    .Size(size)).Wait(    TimeSpan.FromHours(1),     response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries"));

有可观察的方法

BulkAll
ScrollAll
Reindex
(虽然有
ReindexOnServer
内Elasticsearch其重新索引和映射到所述重新索引API -的
Reindex
方法早此)



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/409520.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号