栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

.NET Core 使用 Elasticsearch

.NET Core 使用 Elasticsearch

Elasticsearch 7.x
/// <summary>
/// Singleton wrapper around the NEST / Elasticsearch.Net clients that offers
/// index management, document insert/delete and simple match-query searches.
/// </summary>
[ExposeServices(isSingleton: true)]
public class ElasticSearchServer : IElasticSearchServer
{
    /// <summary>High-level, strongly typed client used for all operations in this class.</summary>
    public IElasticClient ElasticLinqClient { get; set; }

    /// <summary>Low-level client for callers that want to send raw JSON requests.</summary>
    public IElasticLowLevelClient ElasticJsonClient { get; set; }

    /// <summary>
    /// Builds both clients from the configured node URLs and request timeout.
    /// </summary>
    /// <param name="esConfig">Configuration supplying <c>Urls</c> and <c>Timeout</c> (in seconds).</param>
    /// <exception cref="ArgumentNullException"><paramref name="esConfig"/> is null.</exception>
    public ElasticSearchServer(ESConfig esConfig)
    {
        if (esConfig == null)
        {
            throw new ArgumentNullException(nameof(esConfig));
        }

        var urls = esConfig.Urls.ConvertAll(x => new Uri(x));
        var connectionPool = new StaticConnectionPool(urls);
        var settings = new ConnectionSettings(connectionPool)
            .RequestTimeout(TimeSpan.FromSeconds(esConfig.Timeout));

        // Both clients are created from the same settings instance.
        this.ElasticJsonClient = new ElasticLowLevelClient(settings);
        this.ElasticLinqClient = new ElasticClient(settings);
    }

    #region Index management (create with explicit shard/replica counts, delete)

    /// <summary>
    /// Creates an index mapped from <typeparamref name="T"/>; returns immediately if it already exists.
    /// </summary>
    /// <typeparam name="T">Document type whose properties are auto-mapped into the index.</typeparam>
    /// <param name="indexName">Name of the index.</param>
    /// <param name="numberOfReplicas">Number of replicas.</param>
    /// <param name="numberOfShards">Number of primary shards.</param>
    /// <param name="cancellationToken">Token used to cancel the requests.</param>
    /// <returns>
    /// <c>true</c> when the index already exists or the create response is valid; otherwise <c>false</c>.
    /// </returns>
    public async Task<bool> CreateIndexAsync<T>(string indexName,
                                                int numberOfReplicas,
                                                int numberOfShards,
                                                CancellationToken cancellationToken = default) where T : class
    {
        if (await this.IndexExistsAsync(indexName, cancellationToken))
        {
            return true;
        }

        CreateIndexResponse result = await this.ElasticLinqClient.Indices.CreateAsync(
            indexName,
            descriptor => descriptor
                .Map<T>(mapping => mapping.AutoMap())
                .Settings(setting => setting
                    .NumberOfShards(numberOfShards)
                    .NumberOfReplicas(numberOfReplicas)
                    .RefreshInterval(TimeSpan.FromSeconds(10))),
            cancellationToken);

        return result.IsValid;
    }

    /// <summary>
    /// Deletes the index if it exists; does nothing otherwise.
    /// </summary>
    /// <param name="indexName">Name of the index to delete.</param>
    /// <param name="cancellationToken">Token used to cancel the requests.</param>
    public async Task DeleteIndexAsync(string indexName,
                                       CancellationToken cancellationToken = default)
    {
        if (await this.IndexExistsAsync(indexName, cancellationToken))
        {
            await this.ElasticLinqClient.Indices.DeleteAsync(indexName,
                                                             item => item.IgnoreUnavailable(false),
                                                             cancellationToken);
        }
    }

    /// <summary>
    /// Asks the cluster whether <paramref name="indexName"/> exists.
    /// </summary>
    private async Task<bool> IndexExistsAsync(string indexName, CancellationToken cancellationToken)
    {
        var response = await this.ElasticLinqClient.Indices.ExistsAsync(indexName,
                                                                        item => item.IgnoreUnavailable(false),
                                                                        cancellationToken);

        // Exists reflects the actual answer; IsValid only says the call itself
        // was well-formed and can be true for a "not found" reply.
        return response.Exists;
    }

    #endregion

    #region Documents (single insert, bulk insert, delete by query)

    /// <summary>
    /// Indexes a single document.
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="indexName">Target index.</param>
    /// <param name="t">Document to index.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns>
    /// <c>true</c> when the index response is valid; <c>false</c> when it is invalid
    /// or any exception (including cancellation) is raised.
    /// </returns>
    public async Task<bool> InsertdocumentAsync<T>(string indexName, T t, CancellationToken cancellationToken = default) where T : class
    {
        try
        {
            var response = await this.ElasticLinqClient.IndexAsync(t, i => i.Index(indexName), cancellationToken);
            return response.IsValid;
        }
        catch (Exception)
        {
            // Best-effort contract kept from the original: failures surface as false.
            return false;
        }
    }

    /// <summary>
    /// Bulk-indexes <paramref name="list"/> in batches and blocks until the bulk run finishes.
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="indexName">Target index.</param>
    /// <param name="list">Documents to index.</param>
    /// <param name="size">Number of documents per bulk request.</param>
    /// <param name="cancellationToken">Token used to cancel the bulk run and the wait.</param>
    /// <returns>
    /// <c>true</c> when the bulk run completes; <c>false</c> when it reports an error,
    /// is cancelled, or any exception is raised.
    /// </returns>
    public bool InsertBulkdocuments<T>(string indexName, IEnumerable<T> list, int size = 1000, CancellationToken cancellationToken = default) where T : class
    {
        try
        {
            var observableBulk = this.ElasticLinqClient.BulkAll(list, f => f
                    .MaxDegreeOfParallelism(8)
                    .BackOffTime(TimeSpan.FromSeconds(10))
                    .BackOffRetries(2)
                    .Size(size)
                    .RefreshOnCompleted()
                    .Index(indexName)
                    .BufferToBulk((r, buffer) => r.IndexMany(buffer)),
                cancellationToken);

            // TrySetResult is idempotent and the source needs no disposal, so a late
            // or duplicate callback cannot throw on the observer's thread.
            var completion = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

            var bulkAllObserver = new BulkAllObserver(
                onNext: response => { },
                onError: ex => completion.TrySetResult(false),
                onCompleted: () => completion.TrySetResult(true));

            // Disposing the subscription on exit (including on cancellation) releases the bulk run.
            using (observableBulk.Subscribe(bulkAllObserver))
            {
                completion.Task.Wait(cancellationToken);
                return completion.Task.Result;
            }
        }
        catch (Exception)
        {
            // Best-effort contract kept from the original: failures surface as false.
            return false;
        }
    }

    /// <summary>
    /// Deletes every document in the index whose field matches the given value (match query).
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="indexName">Index to delete from.</param>
    /// <param name="deleteProperty">Field name to match on.</param>
    /// <param name="deletevalue">Value the field must match.</param>
    /// <param name="clusterName">Not used by this method; kept for signature compatibility.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns><c>true</c> when the delete-by-query response is valid.</returns>
    public async Task<bool> DeleteBulkdocument<T>(string indexName,
                                                  string deleteProperty,
                                                  string deletevalue,
                                                  string clusterName = "lcn_elasticsearch",
                                                  CancellationToken cancellationToken = default) where T : class
    {
        var esContent = await this.ElasticLinqClient.DeleteByQueryAsync<T>(
            p => p.Index(indexName)
                  .Query(op => op.Match(x => x.Field(deleteProperty).Query(deletevalue))),
            cancellationToken);

        return esContent.IsValid;
    }

    #endregion

    #region Search

    /// <summary>
    /// Runs a match query on one field across the given indices.
    /// </summary>
    /// <typeparam name="T">Document type to deserialize hits into.</typeparam>
    /// <param name="indexNameList">Names of the indices to search.</param>
    /// <param name="searchProperty">Field name to match on.</param>
    /// <param name="searchValue">Value to search for.</param>
    /// <param name="clusterName">Not used by this method; kept for signature compatibility.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns>The documents of the search response.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="indexNameList"/> is null.</exception>
    public async Task<IReadOnlyCollection<T>> SearchAsync<T>(List<string> indexNameList,
                                                             string searchProperty,
                                                             string searchValue,
                                                             string clusterName = "lcn_elasticsearch",
                                                             CancellationToken cancellationToken = default) where T : class
    {
        Indices indices = ToIndices(indexNameList);

        var esContent = await this.ElasticLinqClient.SearchAsync<T>(
            p => p.Index(indices)
                  .Query(op => op.Match(x => x.Field(searchProperty).Query(searchValue))),
            cancellationToken);

        return esContent.Documents;
    }

    /// <summary>
    /// Runs a match query on one field across the given indices and returns a single page.
    /// </summary>
    /// <typeparam name="T">Document type to deserialize hits into.</typeparam>
    /// <param name="indexNameList">Names of the indices to search.</param>
    /// <param name="pageSize">Number of documents per page.</param>
    /// <param name="pageIndex">Zero-based page number; page 0 starts at the first hit.</param>
    /// <param name="searchProperty">Field name to match on.</param>
    /// <param name="searchValue">Value to search for.</param>
    /// <param name="clusterName">Not used by this method; kept for signature compatibility.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns>The documents of the requested page.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="indexNameList"/> is null.</exception>
    public async Task<IReadOnlyCollection<T>> SearchPagingAsync<T>(List<string> indexNameList,
                                                                   int pageSize,
                                                                   int pageIndex,
                                                                   string searchProperty,
                                                                   string searchValue,
                                                                   string clusterName = "lcn_elasticsearch",
                                                                   CancellationToken cancellationToken = default) where T : class
    {
        Indices indices = ToIndices(indexNameList);

        // From is a document offset, not a page number, so the page index
        // has to be scaled by the page size to avoid overlapping pages.
        int offset = pageIndex * pageSize;

        var esContent = await this.ElasticLinqClient.SearchAsync<T>(
            p => p.Index(indices)
                  .From(offset)
                  .Size(pageSize)
                  .Query(op => op.Match(x => x.Field(searchProperty).Query(searchValue))),
            cancellationToken);

        return esContent.Documents;
    }

    /// <summary>
    /// Converts plain index-name strings into the <see cref="Indices"/> value the search API expects.
    /// </summary>
    private static Indices ToIndices(List<string> indexNameList)
    {
        if (indexNameList == null)
        {
            throw new ArgumentNullException(nameof(indexNameList));
        }

        IndexName[] names = indexNameList.ConvertAll(name => (IndexName)name).ToArray();
        return names;
    }

    #endregion
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/741840.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号