// Elasticsearch 7.x
[ExposeServices(isSingleton: true)]
public class ElasticSearchServer : IElasticSearchServer
{
    /// <summary>
    /// Strongly typed client used for all index, document and search operations in this class.
    /// </summary>
    public IElasticClient ElasticLinqClient { get; set; }

    /// <summary>
    /// Low-level client built from the same connection settings as <see cref="ElasticLinqClient"/>.
    /// </summary>
    public IElasticLowLevelClient ElasticJsonClient { get; set; }

    /// <summary>
    /// Builds both clients on top of a static connection pool created from the configured URLs.
    /// </summary>
    /// <param name="esConfig">Configuration supplying the node URLs and the request timeout (in seconds).</param>
    /// <exception cref="ArgumentNullException"><paramref name="esConfig"/> is <c>null</c>.</exception>
    public ElasticSearchServer(ESConfig esConfig)
    {
        if (esConfig == null)
        {
            throw new ArgumentNullException(nameof(esConfig));
        }

        var urls = esConfig.Urls.ConvertAll(x => new Uri(x));
        var connectionPool = new StaticConnectionPool(urls);
        var settings = new ConnectionSettings(connectionPool)
            .RequestTimeout(TimeSpan.FromSeconds(esConfig.Timeout));

        this.ElasticJsonClient = new ElasticLowLevelClient(settings);
        this.ElasticLinqClient = new ElasticClient(settings); // typed (LINQ-style) request client
    }

    #region Index management: create (with shard settings) and delete

    /// <summary>
    /// Creates an index mapped from <typeparamref name="T"/>; returns immediately when it already exists.
    /// </summary>
    /// <typeparam name="T">Document type whose properties are auto-mapped into the index.</typeparam>
    /// <param name="indexName">Name of the index.</param>
    /// <param name="numberOfReplicas">Number of replicas.</param>
    /// <param name="numberOfShards">Number of primary shards.</param>
    /// <param name="cancellationToken">Token used to cancel the requests.</param>
    /// <returns><c>true</c> when the index already exists or the create response is valid; otherwise <c>false</c>.</returns>
    public async Task<bool> CreateIndexAsync<T>(string indexName,
        int numberOfReplicas,
        int numberOfShards,
        CancellationToken cancellationToken = default) where T : class
    {
        var existsResponse = await this.ElasticLinqClient.Indices.ExistsAsync(indexName,
            item => item.IgnoreUnavailable(false),
            cancellationToken);

        // Exists reports whether the index is present; IsValid only describes the call itself.
        if (existsResponse.Exists)
        {
            return true;
        }

        CreateIndexResponse result = await this.ElasticLinqClient.Indices.CreateAsync(
            indexName,
            index => index
                .Map<T>(x => x.AutoMap())
                .Settings(setting => setting
                    .NumberOfShards(numberOfShards)
                    .NumberOfReplicas(numberOfReplicas)
                    .RefreshInterval(TimeSpan.FromSeconds(10))),
            cancellationToken);

        return result.IsValid;
    }

    /// <summary>
    /// Deletes the index when it exists; does nothing otherwise.
    /// </summary>
    /// <param name="indexName">Name of the index to delete.</param>
    /// <param name="cancellationToken">Token used to cancel the requests.</param>
    public async Task DeleteIndexAsync(string indexName,
        CancellationToken cancellationToken = default)
    {
        var existsResponse = await this.ElasticLinqClient.Indices.ExistsAsync(indexName,
            item => item.IgnoreUnavailable(false),
            cancellationToken);

        // Only issue the delete when the index is actually present.
        if (existsResponse.Exists)
        {
            await this.ElasticLinqClient.Indices.DeleteAsync(indexName,
                item => item.IgnoreUnavailable(false),
                cancellationToken);
        }
    }

    #endregion

    #region Documents: single insert, bulk insert, delete by query

    /// <summary>
    /// Indexes a single document.
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="indexName">Target index.</param>
    /// <param name="t">Document to index.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns><c>true</c> when the index response is valid; <c>false</c> when it is not or an exception was thrown.</returns>
    public async Task<bool> InsertdocumentAsync<T>(string indexName, T t, CancellationToken cancellationToken = default) where T : class
    {
        try
        {
            var response = await this.ElasticLinqClient.IndexAsync(t, i => i.Index(indexName), cancellationToken);

            // The settings built in the constructor do not enable ThrowExceptions(),
            // so a failed request is reported through the response rather than an exception.
            return response.IsValid;
        }
        catch (Exception)
        {
            // Best-effort contract kept from the original: any exception is reported as failure.
            return false;
        }
    }

    /// <summary>
    /// Bulk-indexes <paramref name="list"/> and blocks until the bulk run completes, fails or is cancelled.
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="indexName">Target index.</param>
    /// <param name="list">Documents to index.</param>
    /// <param name="size">Number of documents per bulk request.</param>
    /// <param name="cancellationToken">Token used to cancel the bulk run and the wait.</param>
    /// <returns><c>true</c> when the bulk run completed; <c>false</c> when it reported an error, threw, or was cancelled.</returns>
    public bool InsertBulkdocuments<T>(string indexName, IEnumerable<T> list, int size = 1000, CancellationToken cancellationToken = default) where T : class
    {
        try
        {
            var observableBulk = this.ElasticLinqClient.BulkAll(list, f => f
                    .MaxDegreeOfParallelism(8)
                    .BackOffTime(TimeSpan.FromSeconds(10))
                    .BackOffRetries(2)
                    .Size(size)
                    .RefreshOnCompleted()
                    .Index(indexName)
                    .BufferToBulk((r, buffer) => r.IndexMany(buffer)),
                cancellationToken);

            // TrySet* is idempotent and needs no disposal, so a late or repeated callback is harmless.
            var completion = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

            var bulkAllObserver = new BulkAllObserver(
                onNext: response =>
                {
                    // Progress callback intentionally left empty.
                },
                onError: ex => completion.TrySetException(ex),
                () => completion.TrySetResult(true));

            observableBulk.Subscribe(bulkAllObserver);

            // Throws when the bulk run faulted or the token was cancelled; both land in the catch below.
            completion.Task.Wait(cancellationToken);
            return true;
        }
        catch (Exception)
        {
            return false;
        }
    }

    /// <summary>
    /// Deletes every document whose <paramref name="deleteProperty"/> matches <paramref name="deletevalue"/>.
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="indexName">Index to delete from.</param>
    /// <param name="deleteProperty">Field used in the match query.</param>
    /// <param name="deletevalue">Value the field is matched against.</param>
    /// <param name="clusterName">Not used by this method; kept for signature compatibility.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns><c>true</c> when the delete-by-query response is valid.</returns>
    public async Task<bool> DeleteBulkdocument<T>(string indexName,
        string deleteProperty,
        string deletevalue,
        string clusterName = "lcn_elasticsearch",
        CancellationToken cancellationToken = default) where T : class
    {
        var esContent = await this.ElasticLinqClient.DeleteByQueryAsync<T>(
            p => p.Index(indexName)
                .Query(op => op.Match(x => x.Field(deleteProperty).Query(deletevalue))),
            cancellationToken);

        return esContent.IsValid;
    }

    #endregion

    #region Search

    /// <summary>
    /// Runs a match query on <paramref name="searchProperty"/> across the given indices.
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="indexNameList">Names of the indices to search.</param>
    /// <param name="searchProperty">Field used in the match query.</param>
    /// <param name="searchValue">Value the field is matched against.</param>
    /// <param name="clusterName">Not used by this method; kept for signature compatibility.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns>The documents of the search response.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="indexNameList"/> is <c>null</c>.</exception>
    public async Task<IReadOnlyCollection<T>> SearchAsync<T>(List<string> indexNameList,
        string searchProperty,
        string searchValue,
        string clusterName = "lcn_elasticsearch",
        CancellationToken cancellationToken = default) where T : class
    {
        Indices indices = ToIndices(indexNameList);

        var esContent = await this.ElasticLinqClient.SearchAsync<T>(
            p => p.Index(indices)
                .Query(op => op.Match(x => x.Field(searchProperty).Query(searchValue))),
            cancellationToken);

        return esContent.Documents;
    }

    /// <summary>
    /// Runs a match query on <paramref name="searchProperty"/> across the given indices,
    /// limited to <paramref name="pageSize"/> hits starting at <paramref name="pageIndex"/>.
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="indexNameList">Names of the indices to search.</param>
    /// <param name="pageSize">Maximum number of hits to return.</param>
    /// <param name="pageIndex">Value passed unchanged to <c>From</c> (see the review note in the body).</param>
    /// <param name="searchProperty">Field used in the match query.</param>
    /// <param name="searchValue">Value the field is matched against.</param>
    /// <param name="clusterName">Not used by this method; kept for signature compatibility.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns>The documents of the search response.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="indexNameList"/> is <c>null</c>.</exception>
    public async Task<IReadOnlyCollection<T>> SearchPagingAsync<T>(List<string> indexNameList,
        int pageSize,
        int pageIndex,
        string searchProperty,
        string searchValue,
        string clusterName = "lcn_elasticsearch",
        CancellationToken cancellationToken = default) where T : class
    {
        Indices indices = ToIndices(indexNameList);

        // NOTE(review): From is a document offset, not a page number. If callers pass a page
        // number here, consecutive pages overlap; the offset should then be derived from
        // pageIndex and pageSize. Left unchanged because the base (0/1) callers use is not visible.
        var esContent = await this.ElasticLinqClient.SearchAsync<T>(
            p => p.Index(indices)
                .From(pageIndex)
                .Size(pageSize)
                .Query(op => op.Match(x => x.Field(searchProperty).Query(searchValue))),
            cancellationToken);

        return esContent.Documents;
    }

    // Converts plain index names into the Indices value accepted by the search descriptor.
    private static Indices ToIndices(List<string> indexNameList)
    {
        if (indexNameList == null)
        {
            throw new ArgumentNullException(nameof(indexNameList));
        }

        IndexName[] names = indexNameList.ConvertAll<IndexName>(name => name).ToArray();
        return names;
    }

    #endregion
}