原文&思路参见(本例代码调整较多,也做了比较多的改进):基于Ocelot的gRpcHttp网关_dotNET跨平台的博客-CSDN博客
网关架设后,请求即为如下:
思路解析:
1、定时监控某个存放.proto的文件夹。(参见:DirectoryMonitorBackgroundService)
2、当文件有变动时 调用protoc工具生成C#代码。(本例增加一次性编译多个文件,但未解决引用其他proto的问题&相同类名的问题)
3、生成代码后调用CSharpCompilation实例来生成对应的DLL。
4、反编译DLL后取得MethodDescriptor对象,并缓存起来。
5、重新注入Ocelot中的IHttpRequester接口。该接口的作用是:根据获取到的DownstreamRequest,通过HTTP向下游主机发起请求,得到数据后再转发给请求方。
6、IHttpRequester在处理请求时判断是否包含grpc的请求值,如果是则解析出服务名与方法名,并匹配MethodDescriptor对象缓存。
7、自行实现ClientBase
8、取得数据后,转发给请求方
以下列出部分具体的代码。
//作用:根据取得的变动文件,生成c#代码,而后生成DLL,然后反编译获取MethodDescriptor
/// <summary>
/// Event subscriber that turns changed .proto files into gRPC descriptors:
/// it asks <see cref="IProtoGenerateCode"/> to generate C# sources, compiles the sources of each
/// proto file into its own DLL with Roslyn, and passes every DLL to
/// <see cref="IGrpcServiceDescriptor.CreateGrpcDescriptorAsync"/>.
/// </summary>
[Serializable]
public class GrpcCodeGeneraterSubscriber : IEventSubscriber
{
    private static readonly string BaseDirectory = AppDomain.CurrentDomain.BaseDirectory;

    // Folder that receives the generated *.cs files and the per-proto ".<name>" output folders.
    private static readonly string PluginsDirectory = Path.Combine(BaseDirectory, "plugins");

    // NOTE(review): the published listing lost its generic arguments; this may originally have been
    // ILogger<GrpcCodeGeneraterSubscriber> - confirm against the DI registration.
    private readonly ILogger logger = null;
    private readonly IGrpcServiceDescriptor serviceDescriptor = null;
    private readonly IProtoGenerateCode protoGenerateCode = null;

    public GrpcCodeGeneraterSubscriber(ILogger logger, IGrpcServiceDescriptor serviceDescriptor, IProtoGenerateCode protoGenerateCode)
    {
        this.logger = logger;
        this.serviceDescriptor = serviceDescriptor;
        this.protoGenerateCode = protoGenerateCode;
    }

    /// <summary>
    /// Handles the "GrpcCodeGenerater" event. The payload is expected to be a <c>string[]</c> of
    /// .proto file paths; any other payload (or an empty array) is ignored.
    /// </summary>
    /// <param name="context">Event context whose <c>Source.Payload</c> carries the changed proto paths.</param>
    [EventSubscribe("GrpcCodeGenerater")]
    public async Task GrpcCodeGenerater(EventHandlerExecutingContext context)
    {
        var protefileList = context.Source.Payload as string[];
        if (protefileList == null || protefileList.Length == 0)
        {
            return;
        }

        try
        {
            this.protoGenerateCode.GenerateCsharpFromProto(protefileList);

            foreach (var protoFilePath in protefileList)
            {
                var protoName = Path.GetFileNameWithoutExtension(protoFilePath);

                // A failed compilation aborts the whole batch (diagnostics are logged by GenerateDll).
                if (!GenerateDll(protoName))
                {
                    return;
                }

                var csharp_out = Path.Combine(BaseDirectory, $"plugins/.{protoName}");

                // plugin.txt records the proto's last-write time next to the DLL built from it.
                File.WriteAllText(Path.Combine(csharp_out, $"plugin.txt"), File.GetLastWriteTime(protoFilePath).ToString("yyyy-MM-dd HH:mm:ss"));
                await this.serviceDescriptor.CreateGrpcDescriptorAsync(Path.Combine(csharp_out, $"{protoName}.dll"));
            }

            // The text is passed as an argument so that braces in file names cannot break the template.
            this.logger.LogInformation(" generater dll compeleted:{Files} ", string.Join(",", protefileList.Select(y => Path.GetFileName(y))));
        }
        finally
        {
            // Always remove the intermediate sources, including when generation or compilation failed;
            // previously an early return or an exception left them behind.
            DeleteGeneratedSources();
        }
    }

    /// <summary>Deletes the generated *.cs files from the plugins folder (best effort).</summary>
    private void DeleteGeneratedSources()
    {
        if (!Directory.Exists(PluginsDirectory))
        {
            return;
        }

        foreach (var sourceFile in Directory.GetFiles(PluginsDirectory, "*.cs"))
        {
            try
            {
                File.Delete(sourceFile);
            }
            catch (IOException ex)
            {
                // Runs inside a finally block: do not let a locked file hide the original error.
                this.logger.LogWarning(ex, "Could not delete generated source {File}", sourceFile);
            }
            catch (UnauthorizedAccessException ex)
            {
                this.logger.LogWarning(ex, "Could not delete generated source {File}", sourceFile);
            }
        }
    }

    /// <summary>
    /// Compiles <c>{assemblyName}.cs</c> and <c>{assemblyName}Grpc.cs</c> from the plugins folder into
    /// <c>plugins/.{assemblyName}/{assemblyName}.dll</c>.
    /// </summary>
    /// <param name="assemblyName">Proto file name without extension; also used as the assembly name.</param>
    /// <returns><c>true</c> when the DLL was emitted; <c>false</c> when there was nothing to compile or compilation failed.</returns>
    private bool GenerateDll(string assemblyName)
    {
        // The folder does not exist when code generation bailed out before creating it.
        if (!Directory.Exists(PluginsDirectory))
        {
            return false;
        }

        var sourceFiles = Directory.GetFiles(PluginsDirectory, "*.cs");
        if (sourceFiles.Length == 0)
        {
            return false;
        }

        var grpcSourceName = string.Concat(assemblyName, "Grpc");
        var trees = new List<SyntaxTree>();
        foreach (var file in sourceFiles)
        {
            // Only the two files generated for this proto belong in this assembly.
            var fileName = Path.GetFileNameWithoutExtension(file);
            var belongsToAssembly =
                string.Equals(fileName, assemblyName, StringComparison.OrdinalIgnoreCase) ||
                string.Equals(fileName, grpcSourceName, StringComparison.OrdinalIgnoreCase);
            if (!belongsToAssembly)
            {
                continue;
            }

            trees.Add(CSharpSyntaxTree.ParseText(File.ReadAllText(file), encoding: Encoding.UTF8));
        }

        if (trees.Count == 0)
        {
            // Without this check an empty (useless) assembly would be emitted and registered.
            this.logger.LogError("No generated source found for {AssemblyName}", assemblyName);
            return false;
        }

        var references2 = new[]{
            MetadataReference.CreateFromFile(Assembly.Load("netstandard, Version=2.0.0.0").Location),
            MetadataReference.CreateFromFile(Assembly.Load("System.Runtime, Version=0.0.0.0").Location),
            MetadataReference.CreateFromFile(Assembly.Load("System.IO, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a").Location),
            MetadataReference.CreateFromFile(Assembly.Load("System.Memory, Version=5.0.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51").Location),
            MetadataReference.CreateFromFile(Assembly.Load("System.Threading.Tasks, Version=4.0.10.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a").Location),
            MetadataReference.CreateFromFile(typeof(object).Assembly.Location),
            MetadataReference.CreateFromFile(typeof(Google.Protobuf.ProtoPreconditions).Assembly.Location),
            MetadataReference.CreateFromFile(typeof(SerializationContext).Assembly.Location),
            MetadataReference.CreateFromFile(typeof(Channel).Assembly.Location)
        };
        var options = new CSharpCompilationOptions(outputKind: OutputKind.DynamicallyLinkedLibrary, optimizationLevel: OptimizationLevel.Debug, generalDiagnosticOption: ReportDiagnostic.Error);

        var dlldir = Path.Combine(PluginsDirectory, $".{assemblyName}");
        if (!Directory.Exists(dlldir))
        {
            Directory.CreateDirectory(dlldir);
        }

        var result2 = CSharpCompilation.Create(assemblyName, trees, references2, options).Emit(Path.Combine(dlldir, $"{assemblyName}.dll"));

        // Diagnostics may contain '{' / '}', so they are passed as an argument, not as the template.
        var diagnostics = string.Join(",", result2.Diagnostics.Select(d => string.Format("[{0}]:{1}({2})", d.Id, d.GetMessage(), d.Location.GetLineSpan().StartLinePosition)));
        this.logger.Log(result2.Success ? LogLevel.Debug : LogLevel.Error, "{Diagnostics}", diagnostics);
        return result2.Success;
    }
}
//作用:调用protoc工具生成C#代码,本例生成xxx.cs & xxxGrpc.cs
/// <summary>
/// Runs the protoc executable found under <c>tools/{os}_{architecture}</c> (relative to the
/// application base directory) together with <c>grpc_csharp_plugin</c>, writing the generated
/// C# files into the <c>plugins</c> folder.
/// </summary>
public class ProtoGenerateCode : IProtoGenerateCode
{
    // How long protoc may run before it is killed.
    private const int ProtocTimeoutMilliseconds = 5000;

    // How long to wait for each redirected stream to finish after the process has ended.
    private const int StreamDrainTimeoutMilliseconds = 1000;

    private static readonly string BaseDirectory = AppDomain.CurrentDomain.BaseDirectory;

    // NOTE(review): the published listing lost its generic arguments; this may originally have been
    // ILogger<ProtoGenerateCode> - confirm against the DI registration.
    private readonly ILogger Logger = null;

    public ProtoGenerateCode(ILogger logger)
    {
        this.Logger = logger;
    }

    /// <summary>
    /// Generates C# message and gRPC service code for the given proto files in a single protoc run.
    /// </summary>
    /// <param name="protoPath">Paths of the .proto files to compile; their folders are used as --proto_path.</param>
    /// <exception cref="InvalidOperationException">protoc exited with a non-zero code (or was killed after timing out).</exception>
    public void GenerateCsharpFromProto(params string[] protoPath)
    {
        if (protoPath == null || protoPath.Length == 0)
        {
            Logger.LogWarning("No proto files supplied; skipping code generation.");
            return;
        }

        // OS architecture (x86, x64, ...) in lower case, as used in the tools folder name.
        var architecture = RuntimeInformation.OSArchitecture.ToString().ToLowerInvariant();
        var bin = string.Empty;
        string os;
        if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
        {
            os = "windows";
            bin = ".exe";
        }
        else if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
        {
            os = "linux";
        }
        else if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
        {
            os = "macosx";
        }
        else
        {
            Logger.LogError("该平台不支持grpctools.");
            return;
        }

        var protocPath = Path.Combine(BaseDirectory, $"tools/{os}_{architecture}/protoc{bin}");
        var grpcPath = Path.Combine(BaseDirectory, $"tools/{os}_{architecture}/grpc_csharp_plugin{bin}");
        string outdir = Path.Combine(BaseDirectory, "plugins");
        if (!Directory.Exists(outdir))
        {
            Directory.CreateDirectory(outdir);
        }

        // Same argument order as before: proto_path(s), csharp_out, plugin, grpc_out, input files.
        var arguments = new List<string>();
        var protoDirectories = protoPath
            .Select(t => Path.GetDirectoryName(t))
            // A bare file name has no directory part; "." keeps the option well-formed.
            .Select(d => string.IsNullOrEmpty(d) ? "." : d)
            .Distinct(StringComparer.OrdinalIgnoreCase);
        foreach (var directory in protoDirectories)
        {
            arguments.Add("--proto_path=" + directory);
        }

        arguments.Add("--csharp_out=" + outdir);

        // The plugin arguments are always added: the former IsNullOrEmpty(grpcPath) test could
        // never be false because Path.Combine does not return an empty string here.
        arguments.Add("--plugin=protoc-gen-grpc=" + grpcPath);
        arguments.Add("--grpc_out=" + outdir);
        arguments.AddRange(protoPath);

        // Each argument is quoted so that paths containing spaces reach protoc intact.
        var argsValue = string.Join(" ", arguments.Select(QuoteArgument));

        Logger.LogInformation("Running: {ProtocPath} {Arguments}", protocPath, argsValue);
        var exitCode = RunProtoc(protocPath, argsValue, string.Empty, out string stdout, out string stderr);

        if (exitCode != 0)
        {
            throw new InvalidOperationException(
                string.IsNullOrEmpty(stderr) ? $"protoc failed with exit code {exitCode}." : stderr);
        }

        // With exit code 0 anything on stderr is only a warning and must not abort generation.
        if (!string.IsNullOrEmpty(stderr))
        {
            Logger.LogWarning("{ProtocStderr}", stderr);
        }

        if (!string.IsNullOrEmpty(stdout))
        {
            Logger.LogDebug("{ProtocStdout}", stdout);
        }
    }

    /// <summary>
    /// Wraps one command-line argument in double quotes. Trailing backslashes are doubled so they
    /// do not escape the closing quote.
    /// </summary>
    private static string QuoteArgument(string argument)
    {
        var trailingBackslashes = 0;
        for (var i = argument.Length - 1; i >= 0 && argument[i] == '\\'; i--)
        {
            trailingBackslashes++;
        }

        return "\"" + argument + new string('\\', trailingBackslashes) + "\"";
    }

    /// <summary>
    /// Starts the given executable with redirected output and waits for it to finish.
    /// </summary>
    /// <param name="path">Executable to start.</param>
    /// <param name="arguments">Complete command-line argument string.</param>
    /// <param name="workingDir">Working directory; ignored when null or empty.</param>
    /// <param name="stdout">Captured standard output, or "" if it could not be read in time.</param>
    /// <param name="stderr">Captured standard error, or "" if it could not be read in time.</param>
    /// <returns>The process exit code, or -1 when the process could not be stopped after timing out.</returns>
    private static int RunProtoc(string path, string arguments, string workingDir, out string stdout, out string stderr)
    {
        using (var proc = new Process())
        {
            var psi = proc.StartInfo;
            psi.FileName = path;
            psi.Arguments = arguments;
            if (!string.IsNullOrEmpty(workingDir))
            {
                psi.WorkingDirectory = workingDir;
            }

            psi.RedirectStandardError = psi.RedirectStandardOutput = true;
            psi.UseShellExecute = false;
            psi.CreateNoWindow = true;
            proc.Start();

            // Both pipes are read concurrently so a full pipe buffer cannot block the child.
            var stdoutTask = proc.StandardOutput.ReadToEndAsync();
            var stderrTask = proc.StandardError.ReadToEndAsync();

            if (!proc.WaitForExit(ProtocTimeoutMilliseconds))
            {
                try
                {
                    proc.Kill();
                }
                catch (InvalidOperationException)
                {
                    // The process exited between the timeout and the Kill call.
                }
                catch (System.ComponentModel.Win32Exception)
                {
                    // The process could not be terminated; reported below as exit code -1.
                }

                // Kill is asynchronous: give the process a moment to actually go away.
                proc.WaitForExit(StreamDrainTimeoutMilliseconds);
            }

            // ExitCode throws while the process is still running, so check HasExited first.
            var exitCode = proc.HasExited ? proc.ExitCode : -1;

            stderr = stdout = "";
            if (stdoutTask.Wait(StreamDrainTimeoutMilliseconds))
            {
                stdout = stdoutTask.Result;
            }

            if (stderrTask.Wait(StreamDrainTimeoutMilliseconds))
            {
                stderr = stderrTask.Result;
            }

            return exitCode;
        }
    }
}
//重新实现该接口以完成RPC请求的转发。
//indexof('grpc')的实现,是因为有可能采用https的请求
//端口号加1000的原因:下游服务同时支持http,因此约定grpc请求的端口为http端口再加上1000
/// <summary>
/// Replacement for Ocelot's <see cref="IHttpRequester"/>: requests whose downstream scheme contains
/// "grpc" are sent to the downstream host as gRPC calls; everything else is forwarded over HTTP.
/// </summary>
[Serializable]
public class GrpcHttpRequester : IHttpRequester
{
    // The gRPC endpoint of a downstream service is addressed at its HTTP port plus this offset.
    private const int GrpcPortOffset = 1000;

    private readonly IHttpClientCache _cacheHandlers;
    private readonly IOcelotLogger _logger;
    private readonly IDelegatingHandlerHandlerFactory _factory;
    private readonly IExceptionToErrorMapper _mapper;

    public GrpcHttpRequester(IOcelotLoggerFactory loggerFactory,
        IHttpClientCache cacheHandlers,
        IDelegatingHandlerHandlerFactory factory,
        IExceptionToErrorMapper mapper)
    {
        this._logger = loggerFactory.CreateLogger<GrpcHttpRequester>();
        this._cacheHandlers = cacheHandlers;
        this._factory = factory;
        this._mapper = mapper;
    }

    /// <summary>
    /// Sends the downstream request stored in <paramref name="httpContext"/> and returns the response.
    /// </summary>
    /// <param name="httpContext">Current request context; its Items hold the downstream request and route.</param>
    /// <returns>An OK response wrapping the downstream answer, or an error response built by the exception mapper.</returns>
    public async Task<Response<HttpResponseMessage>> GetResponse(HttpContext httpContext)
    {
        var downstreamRequest = httpContext.Items.DownstreamRequest();

        // A substring test (not equality) is used on the scheme; ordinal comparison keeps it culture-independent.
        if (downstreamRequest.Scheme.IndexOf("grpc", StringComparison.Ordinal) < 0)
        {
            return await ProcessHttpResponse(httpContext);
        }

        return await ProcessGrpcResponse(httpContext);
    }

    /// <summary>
    /// Performs the request as a gRPC call. Falls back to plain HTTP when no gRPC method could be
    /// resolved for the request.
    /// </summary>
    private async Task<Response<HttpResponseMessage>> ProcessGrpcResponse(HttpContext httpContext)
    {
        Channel channel = null;
        try
        {
            GrpcRequestMessage grpcRequestMessage = await GrpcRequestMessage.FromReuqest(httpContext);
            if (grpcRequestMessage == null || grpcRequestMessage.GrpcRequestMethod == null)
            {
                return await this.ProcessHttpResponse(httpContext);
            }

            var downStreamRqeust = httpContext.Items.DownstreamRequest();
            var options = new List<ChannelOption> { new ChannelOption("keepalive_time_ms", 60000) };
            if (!string.IsNullOrEmpty(grpcRequestMessage.RequestVersion))
            {
                options.Add(new ChannelOption("requestVersion", grpcRequestMessage.RequestVersion));
            }

            // One channel per request (no pooling); it is shut down in the finally block below.
            channel = new Channel(downStreamRqeust.Host, Convert.ToInt32(downStreamRqeust.Port) + GrpcPortOffset, ChannelCredentials.Insecure, options);
            var client = new MethodDescriptorClient(channel);
            var httpResponseMessage = await client.InvokeAsync(grpcRequestMessage);
            return new OkResponse<HttpResponseMessage>(httpResponseMessage);
        }
        catch (RpcException exception)
        {
            var error = _mapper.Map(exception);
            // NOTE(review): OKButFailResponse is declared outside this listing; the generic argument
            // is assumed to have been stripped like the others - confirm against its declaration.
            return new OKButFailResponse<HttpResponseMessage>(error);
        }
        catch (Exception exception)
        {
            var error = _mapper.Map(exception);
            return new ErrorResponse<HttpResponseMessage>(error);
        }
        finally
        {
            if (channel != null)
            {
                try
                {
                    // Without this every request leaked a channel and its connection.
                    await channel.ShutdownAsync();
                }
                catch (Exception shutdownException)
                {
                    // Must not replace the response (or error) produced above.
                    _logger.LogWarning($"Failed to shut down gRPC channel: {shutdownException.Message}");
                }
            }
        }
    }

    /// <summary>Forwards the downstream request over HTTP using Ocelot's cached HttpClient builder.</summary>
    private async Task<Response<HttpResponseMessage>> ProcessHttpResponse(HttpContext httpContext)
    {
        var builder = new HttpClientBuilder(_factory, _cacheHandlers, _logger);
        var downstreamRoute = httpContext.Items.DownstreamRoute();
        var downstreamRequest = httpContext.Items.DownstreamRequest();
        var httpClient = builder.Create(downstreamRoute);
        try
        {
            var response = await httpClient.SendAsync(downstreamRequest.ToHttpRequestMessage(), httpContext.RequestAborted);
            return new OkResponse<HttpResponseMessage>(response);
        }
        catch (Exception exception)
        {
            var error = _mapper.Map(exception);
            return new ErrorResponse<HttpResponseMessage>(error);
        }
        finally
        {
            // Save() is called on every path, success or failure, after the request is done.
            builder.Save();
        }
    }
}
// Purpose: performs the actual gRPC request; see InvokeAsync.
// NOTE(review): the published listing had this class collapsed onto a few lines with all generic
// type arguments stripped. They are reconstructed here from usage - confirm against the repository.
public class MethodDescriptorClient : ClientBase<MethodDescriptorClient>
{
    public MethodDescriptorClient(Channel channel) : base(channel) { }

    public MethodDescriptorClient(CallInvoker callInvoker) : base(callInvoker) { }

    public MethodDescriptorClient() : base() { }

    protected MethodDescriptorClient(ClientBaseConfiguration configuration) : base(configuration) { }

    protected override MethodDescriptorClient NewInstance(ClientBaseConfiguration configuration)
    {
        return new MethodDescriptorClient(configuration);
    }

    /// <summary>
    /// Invokes the gRPC method described by <c>grpcRequestMessage.GrpcRequestMethod</c> and returns
    /// the result as an HTTP response with a JSON body.
    /// </summary>
    /// <param name="grpcRequestMessage">Resolved method descriptor, headers and the original HTTP request.</param>
    /// <returns>A 200 response whose content is the JSON-serialized gRPC response(s).</returns>
    public Task<HttpResponseMessage> InvokeAsync(GrpcRequestMessage grpcRequestMessage)
    {
        var methodDescriptor = grpcRequestMessage.GrpcRequestMethod;

        // The request/response CLR types are only known at run time, so the generic core method is
        // closed over them through reflection.
        System.Reflection.MethodInfo m = typeof(MethodDescriptorClient).GetMethod("CallGrpcAsyncCore", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic);
        return (Task<HttpResponseMessage>)m
            .MakeGenericMethod(new Type[] { methodDescriptor.InputType.ClrType, methodDescriptor.OutputType.ClrType })
            .Invoke(this, new object[] { grpcRequestMessage });
    }

    /// <summary>
    /// Reads the HTTP request body as a string, deserializes it as JSON into a single
    /// <typeparamref name="TRequest"/> and dispatches on the gRPC method type.
    /// </summary>
    /// <exception cref="NotSupportedException">The method type is none of the four handled kinds.</exception>
    private async Task<HttpResponseMessage> CallGrpcAsyncCore<TRequest, TResponse>(GrpcRequestMessage grpcRequestMessage)
        where TRequest : class, IMessage<TRequest>
        where TResponse : class, IMessage<TResponse>
    {
        CallOptions option = CreateCallOptions(grpcRequestMessage.Headers);
        var rpc = GrpcMethodBuilder<TRequest, TResponse>.GetMethod(grpcRequestMessage.GrpcRequestMethod);
        var requestMessage = await grpcRequestMessage.RequestMessage.Content.ReadAsStringAsync();
        TRequest request = JsonConvert.DeserializeObject<TRequest>(requestMessage);

        // The HTTP body carries exactly one message, so streaming calls send a one-element stream.
        List<TRequest> requests = new List<TRequest>() { request };

        switch (rpc.Type)
        {
            case MethodType.Unary:
                var taskUnary = await AsyncUnaryCall(CallInvoker, rpc, option, requests.FirstOrDefault());
                return await ProcessHttpResponseMessage(taskUnary.Item2, taskUnary.Item1);

            case MethodType.ClientStreaming:
                var taskClientStreaming = await AsyncClientStreamingCall(CallInvoker, rpc, option, requests);
                return await ProcessHttpResponseMessage(taskClientStreaming.Item2, taskClientStreaming.Item1);

            case MethodType.ServerStreaming:
                // NOTE(review): Item1 is an IList<TResponse>, which binds as ONE params element, so
                // the JSON body becomes an array containing a single array. Kept as-is for existing
                // clients - confirm whether a flat array was intended.
                var taskServerStreaming = await AsyncServerStreamingCall(CallInvoker, rpc, option, requests.FirstOrDefault());
                return await ProcessHttpResponseMessage(taskServerStreaming.Item2, taskServerStreaming.Item1);

            case MethodType.DuplexStreaming:
                var taskDuplexStreaming = await AsyncDuplexStreamingCall(CallInvoker, rpc, option, requests);
                return await ProcessHttpResponseMessage(taskDuplexStreaming.Item2, taskDuplexStreaming.Item1);

            default:
                throw new NotSupportedException($"MethodType '{rpc.Type}' is not supported.");
        }
    }

    /// <summary>
    /// Builds a 200 response whose body is the JSON array of <paramref name="responses"/> and whose
    /// headers are copied from the gRPC response metadata.
    /// </summary>
    private Task<HttpResponseMessage> ProcessHttpResponseMessage<TResponse>(Metadata headers, params TResponse[] responses)
    {
        HttpResponseMessage httpResponseMessage = new HttpResponseMessage(HttpStatusCode.OK);
        httpResponseMessage.Content = new StringContent(JsonConvert.SerializeObject(responses));

        if (headers != null)
        {
            foreach (var entry in headers)
            {
                // Reading Value of a binary entry throws, so binary metadata is not copied.
                if (entry.IsBinary)
                {
                    continue;
                }

                // TryAddWithoutValidation: one header name that is not valid on an HTTP response
                // should not turn a successful gRPC call into an exception.
                httpResponseMessage.Headers.TryAddWithoutValidation(entry.Key, entry.Value);
            }
        }

        return Task.FromResult(httpResponseMessage);
    }

    /// <summary>
    /// Copies the HTTP request headers into gRPC call metadata, removing "grpc." from header names
    /// and using the first value of each header.
    /// </summary>
    private CallOptions CreateCallOptions(HttpRequestHeaders headers)
    {
        Metadata meta = new Metadata();
        foreach (var entry in headers)
        {
            // A header without values would otherwise pass null to Metadata.Add.
            meta.Add(entry.Key.Replace("grpc.", ""), entry.Value.FirstOrDefault() ?? string.Empty);
        }

        CallOptions option = new CallOptions(meta);
        return option;
    }

    /// <summary>Runs a unary call and returns the response together with the response headers.</summary>
    private async Task<Tuple<TResponse, Metadata>> AsyncUnaryCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, TRequest request)
        where TRequest : class
        where TResponse : class
    {
        using (AsyncUnaryCall<TResponse> call = invoker.AsyncUnaryCall(method, null, option, request))
        {
            return Tuple.Create(await call.ResponseAsync, await call.ResponseHeadersAsync);
        }
    }

    /// <summary>Writes all requests to the request stream, completes it and returns the single response with its headers.</summary>
    private async Task<Tuple<TResponse, Metadata>> AsyncClientStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, IEnumerable<TRequest> requests)
        where TRequest : class
        where TResponse : class
    {
        using (AsyncClientStreamingCall<TRequest, TResponse> call = invoker.AsyncClientStreamingCall(method, null, option))
        {
            if (requests != null)
            {
                foreach (TRequest request in requests)
                {
                    await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
                }
            }

            await call.RequestStream.CompleteAsync().ConfigureAwait(false);
            return Tuple.Create(await call.ResponseAsync, await call.ResponseHeadersAsync);
        }
    }

    /// <summary>Sends one request and collects every streamed response into a list, returned with the response headers.</summary>
    private async Task<Tuple<IList<TResponse>, Metadata>> AsyncServerStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, TRequest request)
        where TRequest : class
        where TResponse : class
    {
        using (AsyncServerStreamingCall<TResponse> call = invoker.AsyncServerStreamingCall(method, null, option, request))
        {
            IList<TResponse> responses = new List<TResponse>();
            while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
            {
                responses.Add(call.ResponseStream.Current);
            }

            return Tuple.Create(responses, await call.ResponseHeadersAsync);
        }
    }

    /// <summary>Writes all requests, completes the request stream, then collects every streamed response, returned with the response headers.</summary>
    private async Task<Tuple<IList<TResponse>, Metadata>> AsyncDuplexStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, IEnumerable<TRequest> requests)
        where TRequest : class
        where TResponse : class
    {
        using (AsyncDuplexStreamingCall<TRequest, TResponse> call = invoker.AsyncDuplexStreamingCall(method, null, option))
        {
            if (requests != null)
            {
                foreach (TRequest request in requests)
                {
                    await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
                }
            }

            await call.RequestStream.CompleteAsync().ConfigureAwait(false);

            IList<TResponse> responses = new List<TResponse>();
            while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
            {
                responses.Add(call.ResponseStream.Current);
            }

            return Tuple.Create(responses, await call.ResponseHeadersAsync);
        }
    }
}
MethodDescriptorClient 类在读取Request的请求参数时,目前只支持StringContent(通过httpClient以POST方式请求到后端时,取得的都是ByteArrayContent)。
所以客户端的调用方式如下:
// Client-side example: POST a JSON body through the gateway and show the answer in textBox1.
// The body is sent as a JSON StringContent rather than multipart form data.
using (var httpClient = new HttpClient())
{
    // Make sure the base URL ends with exactly one separator before route segments are appended.
    var baseUrl = queryUrl();
    if (!baseUrl.EndsWith("/"))
    {
        baseUrl += "/";
    }

    // The first route segment is "grpc" or "api", chosen by the usegrpc flag.
    var routePrefix = usegrpc ? "grpc" : "api";
    var requestUrl = string.Concat(baseUrl, routePrefix, "/", "Supplier", "/", "QuerySupplier");

    var postObj = new { WhereString = " no like %'1.402.03.01674'%" };
    var requestJson = JsonConvert.SerializeObject(postObj);
    var content = new StringContent(requestJson, Encoding.UTF8, "application/json");

    // Synchronous wait on the request and on reading the response body.
    var responseMessage = httpClient.PostAsync(requestUrl, content).GetAwaiter().GetResult();
    var responseBody = responseMessage.Content.ReadAsStringAsync().GetAwaiter().GetResult();

    // The body string is serialized once more before display, i.e. shown as a quoted, escaped JSON string.
    this.textBox1.Text = JsonConvert.SerializeObject(responseBody);
}
网关注册如下
/// <summary>
/// Registers the gateway services: calls AddOcelot() on the service collection and then
/// AddOcelotGrpc() on the builder it returns.
/// </summary>
/// <param name="services">Service collection of the gateway application.</param>
public void ConfigureServices(IServiceCollection services)
{
    var ocelotBuilder = services.AddOcelot();
    ocelotBuilder.AddOcelotGrpc();
}
网关服务需要放置protoc程序。第二个目录用来放置proto文件
Git地址:Sam/Ocelot.Provider.RPC



