On .NET runtime Sockets: datagram sockets and excessive socket buffer memory

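This post addresses the problem named in the title. It affects datagram sockets, which covers ICMP, UDP, raw IP, and similar protocols. So why does it happen?

The problem does not arise in blocking mode. It is confined to the asynchronous Socket interfaces listed below.

The affected .NET APIs are: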

1. System::Net::Sockets::Socket::BeginSendTo

2. System::Net::Sockets::Socket::BeginReceiveFrom

3. System::Net::Sockets::Socket::EndSendTo

4. System::Net::Sockets::Socket::EndReceiveFrom

5. System::Net::Sockets::Socket::SendToAsync

6. System::Net::Sockets::Socket::ReceiveFromAsync

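Summary:

On Windows, the problem stems from .NET building these APIs on completion routines and I/O completion ports (IOCP).

On Linux, the problem stems from .NET building these APIs on epoll.

Arguably this is not a bug. It merely causes a large amount of unnecessary memory to be allocated, which eats into system hardware resources (something to avoid on servers). Many people who build multithreaded work queues directly on IOCP or epoll in C/C++ run into the same problem. A typical case: every completion context is posted to a single root IOCP or epoll instance, and all completion worker threads wait on that root. When the operating system signals an event, it is not delivered to one fixed thread. With several threads waiting on the same handle, any one of them may be the thread that wakes up.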
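The "root queue" arrangement described above can be imitated with the standard library alone. The following is a minimal sketch, not real IOCP or epoll code: `RootQueue` and `run_on_shared_root` are hypothetical names, and a condition variable stands in for the kernel handle that all workers wait on. Every completion is handled, but nothing determines which worker handles it, which is why a buffer cannot safely be tied to a thread in this design.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for a "root" IOCP/epoll handle: one queue that
// every worker thread waits on.
class RootQueue {
public:
    void post(std::function<void()> completion) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(completion));
        }
        ready_.notify_one(); // wakes any one waiter, not a specific thread
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        ready_.notify_all();
    }

    // Worker loop: runs completions until stop() was called and the queue is empty.
    void run() {
        for (;;) {
            std::function<void()> completion;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return stopped_ || !queue_.empty(); });
                if (queue_.empty()) {
                    return;
                }
                completion = std::move(queue_.front());
                queue_.pop();
            }
            completion();
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> queue_;
    bool stopped_ = false;
};

// Posts `completions` items to one root queue served by `workers` threads and
// returns how many completions ran. Which thread ran each one is unspecified.
int run_on_shared_root(int workers, int completions) {
    RootQueue root;
    std::mutex result_mutex;
    int handled = 0;
    std::vector<std::thread> pool;
    for (int i = 0; i < workers; ++i) {
        pool.emplace_back([&root] { root.run(); });
    }
    for (int i = 0; i < completions; ++i) {
        root.post([&] {
            std::lock_guard<std::mutex> lock(result_mutex);
            ++handled;
        });
    }
    root.stop();
    for (std::thread& t : pool) {
        t.join();
    }
    return handled;
}
```

Calling `run_on_shared_root(4, 100)` runs all 100 completions, spread over the four workers in an order that changes from run to run.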

The problem (where it comes from):

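To implement a truly efficient event-driven state machine (EDSM) while offering a similar API on every platform, .NET posts each event (IoCompletionContext) to the .NET ThreadPool (the pool of worker threads that handle completion events). On Windows this is driven by the IOCP/IOCR model, and on Linux by the epoll model. It resembles the boost::asio::io_context (io_service) I/O completion queue of C/C++ boost::asio, with one difference. In .NET, every "completion transaction" of every Socket instance may be dispatched by any thread of the completion thread pool. In boost::asio, a completion is driven only by the Executor (io_context) that was specified when the concrete asio object was constructed. So when each io_context is run by a single thread, boost::asio does not show the .NET behavior in which an operation started by thread A may be completed by thread A itself or by any of threads B, C, or D.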
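The thread affinity described above can be shown without boost::asio. This is a minimal sketch under the assumption that one context is driven by exactly one thread; `SingleThreadContext` and `distinct_handler_threads` are hypothetical names, not part of boost::asio or of the library in this post.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <set>
#include <thread>

// Hypothetical miniature of "one io_context driven by one thread".
class SingleThreadContext {
public:
    SingleThreadContext() : worker_([this] { run(); }) {}

    ~SingleThreadContext() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        ready_.notify_all();
        worker_.join(); // pending handlers are run before the thread exits
    }

    void post(std::function<void()> handler) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(handler));
        }
        ready_.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> handler;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return stopped_ || !queue_.empty(); });
                if (queue_.empty()) {
                    return;
                }
                handler = std::move(queue_.front());
                queue_.pop();
            }
            handler();
        }
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> queue_;
    bool stopped_ = false;
    std::thread worker_; // declared last so the other members exist before it starts
};

// Posts `handlers` handlers to one context and returns the number of distinct
// threads that executed them.
std::size_t distinct_handler_threads(int handlers) {
    std::set<std::thread::id> seen;
    std::mutex seen_mutex;
    {
        SingleThreadContext context;
        for (int i = 0; i < handlers; ++i) {
            context.post([&] {
                std::lock_guard<std::mutex> lock(seen_mutex);
                seen.insert(std::this_thread::get_id());
            });
        }
    } // the destructor drains the queue and joins the worker
    return seen.size();
}
```

Because every handler of a context runs on that context's own thread, state that belongs to the thread (such as a receive buffer) can be reused by all sockets bound to the context.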

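In a multithreaded (MT) datagram network application, there is no need to allocate a memory buffer (RAM buffer) for every Socket. One fixed buffer of roughly 64 KB (65535 bytes) per thread is enough. Even a buffer of this size still wastes some RAM. In C/C++ it is advisable to allocate the 64 KB in line with the allocation granularity; in .NET the allocator handles alignment by itself.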
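One way to express "one buffer per thread" in C++ is a `thread_local` array. The sketch below is an illustration under the assumption that all completions of a socket run on the thread that owns the buffer; the names `thread_receive_buffer`, `per_socket_bytes`, and `per_thread_bytes` are hypothetical.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <thread>

constexpr std::size_t kDatagramBufferSize = 65535; // largest value of a 16-bit length field

// One receive buffer per thread, shared by every datagram socket the thread
// services. Safe only if a socket's completions always run on that thread.
char* thread_receive_buffer() {
    thread_local std::array<char, kDatagramBufferSize> buffer;
    return buffer.data();
}

// Memory needed when each socket owns a buffer versus when each thread does.
constexpr std::size_t per_socket_bytes(std::size_t sockets) {
    return sockets * kDatagramBufferSize;
}
constexpr std::size_t per_thread_bytes(std::size_t threads) {
    return threads * kDatagramBufferSize;
}

// Returns true if a second thread gets a buffer distinct from the caller's.
bool buffers_differ_across_threads() {
    char* mine = thread_receive_buffer();
    char* other = nullptr;
    std::thread t([&other] { other = thread_receive_buffer(); });
    t.join();
    return mine != other;
}
```

With 10,000 sockets, per-socket buffers need `per_socket_bytes(10000)` bytes (about 655 MB), while eight worker threads need only `per_thread_bytes(8)` bytes (about 512 KB).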

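Strictly speaking, the buffer size depends on the network protocol. For UDP, for example, the maximum buffer size should be 65535 - sizeof(udp_hdr), that is 65535 - 8 = 65527 bytes (over IPv4 the 20-byte IP header lowers the usable payload further, to 65507 bytes). This cannot be achieved with the Socket that the .NET runtime provides, so we have to solve the problem ourselves. If every UDP datagram socket needs 64 KB of memory, then 1 GB of RAM holds at most about 16,000 buffers. In practice that figure is out of reach, and being able to allocate a few thousand is already lucky, because you cannot count only what you allocate explicitly: every .NET managed object instance carries a good deal of memory overhead that is not available to the user.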

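struct ip_hdr; // IPv4 header type, defined elsewhere

#pragma pack(push, 1)
struct udp_hdr {
public:
    unsigned short                  src;    // source port
    unsigned short                  dest;   // destination port
    unsigned short                  len;    // length of UDP header plus payload, in bytes
    unsigned short                  chksum; // checksum

public:
    static struct udp_hdr*          Parse(struct ip_hdr* iphdr, const void* packet, int size);
};
#pragma pack(pop)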
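The figures quoted above can be checked at compile time. This is a small sketch; `udp_header_sketch` is a hypothetical copy of the four 16-bit fields of `udp_hdr`, and the constant names are made up for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

#pragma pack(push, 1)
struct udp_header_sketch {   // same four 16-bit fields as udp_hdr
    std::uint16_t src;
    std::uint16_t dest;
    std::uint16_t len;
    std::uint16_t chksum;
};
#pragma pack(pop)

constexpr std::size_t kMaxLengthField = 65535;                 // 16-bit length field
constexpr std::size_t kMaxUdpPayload  = kMaxLengthField - sizeof(udp_header_sketch);
constexpr std::size_t kOneGiB         = 1024u * 1024u * 1024u;
constexpr std::size_t kBufferSize     = 64u * 1024u;           // one 64 KiB buffer
constexpr std::size_t kBuffersPerGiB  = kOneGiB / kBufferSize;

static_assert(sizeof(udp_header_sketch) == 8, "the UDP header is 8 bytes");
static_assert(kMaxUdpPayload == 65527, "65535 - 8");
static_assert(kBuffersPerGiB == 16384, "1 GiB / 64 KiB");
```

The 16384 result is the "about 16,000 buffers per gigabyte" upper bound, before any per-object overhead is counted.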

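For TCP/IP stream sockets, by contrast, allocating a fixed buffer per thread is not recommended. Every operating system optimizes TCP/IP throughput separately, and if several TCP sockets on one thread use a buffer at the same RAM address, the receive (RX) and send (TX) throughput of those sockets drops severely. This is not a framework problem but a consequence of how the system layer is implemented; with a large number of SYNs in flight it would be surprising if things stayed fast. The only forcible workaround is to give up the application-friendly TCP sockets that the system provides and either implement a TCP/IP protocol stack yourself or bring in a third-party one.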

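Now that the problem is understood, handling it is simple. I have wrapped boost::asio into a usable C/C++/C# library. Readers of this post can implement their own solution by other means or take excerpts from the code provided here.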

Usage:

        [MTAThread]
        private static void Main(string[] args)
        {
            IPAddress interfaceIP = IPAddress.Parse("192.168.0.24");
            AsyncScheduler scheduler = new AsyncScheduler(1);
            AsyncContext context = scheduler.GetContext();

            AsyncSocket socket = context.CreateSocket(new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp));
            socket.Socket.Bind(new IPEndPoint(interfaceIP, 0));

            byte[] buffer = context.Buffer;
            socket.SendTo(new byte[] { (byte)'1', (byte)'2', (byte)'3' }, 0, 3, new IPEndPoint(interfaceIP, 7000), null);
            socket.ReceiveFrom(buffer, 0, buffer.Length, (bytesTransferred, sourceEP) =>
            {
                string RX = Encoding.UTF8.GetString(buffer, 0, Math.Max(bytesTransferred, 0));
                Console.WriteLine($"bytesTransferred[{bytesTransferred}] RX[{RX}]");
            });
            Console.ReadKey(false);
        }

C/C++ DLL library implementation:

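// Headers required by the code below.
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#ifdef _WIN32
#include <WinSock2.h>
#include <Windows.h>

#pragma comment(lib, "Ws2_32.lib")
#else
#include <unistd.h>
#include <pthread.h>
#include <sched.h>
#include <arpa/inet.h>
#endif

#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <boost/asio.hpp>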

#ifndef LIBASIO_API
#ifdef __cplusplus 
#ifdef _WIN32
#define LIBASIO_API extern "C" __declspec(dllexport)
#else
#define LIBASIO_API extern "C" __attribute__((visibility("default")))
#endif
#else
#define LIBASIO_API
#endif
#endif

typedef struct {
    uint32_t            v4_or_v6_;
    union {
        struct {
            uint32_t    address_;
            uint32_t    port_;
        } in4_;
        struct {
            char        address_[16];
            uint32_t    port_;
        } in6_;
        char            data_[20];
    };
} libasio_endpoint;
typedef void(*libasio_post_callback)(void* context_, uint64_t key_);
typedef void(*libasio_sendto_callback)(void* socket_, uint64_t key_, int length_);
typedef void(*libasio_recvfrom_callback)(void* socket_, uint64_t key_, int length_, libasio_endpoint* remoteEP_);

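typedef std::shared_ptr<boost::asio::ip::udp::socket>                      libasio_socket;
typedef std::shared_ptr<boost::asio::io_context>                           libasio_context;
typedef std::unordered_map<boost::asio::io_context*, libasio_context>      libasio_context_hashtable;
typedef std::unordered_map<boost::asio::ip::udp::socket*, libasio_socket>  libasio_socket_hashtable;
typedef std::unordered_map<boost::asio::ip::udp::socket*, libasio_context> libasio_so2ctx;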

static libasio_so2ctx            _so2ctxs_;
static libasio_socket_hashtable  _sockets_;
static libasio_context_hashtable _contexts_;
static std::mutex                _syncobj_;

#define __lock__(obj) \
do { \
    std::lock_guard<std::mutex> scoped_(obj);
#define __unlock__ \
} while(0);

static libasio_context libasio_getcontext(boost::asio::io_context* context_) {
    libasio_context_hashtable::iterator tail = _contexts_.find(context_);
    libasio_context_hashtable::iterator endl = _contexts_.end();
    if (tail == endl) {
        return NULL;
    }
    return tail->second;
}

static libasio_context libasio_getcontext(boost::asio::ip::udp::socket* socket_) {
    libasio_so2ctx::iterator tail = _so2ctxs_.find(socket_);
    libasio_so2ctx::iterator endl = _so2ctxs_.end();
    if (tail == endl) {
        return NULL;
    }
    return tail->second;
}

static libasio_socket libasio_getsocket(boost::asio::ip::udp::socket* socket_) {
    libasio_socket_hashtable::iterator tail = _sockets_.find(socket_);
    libasio_socket_hashtable::iterator endl = _sockets_.end();
    if (tail == endl) {
        return NULL;
    }
    return tail->second;
}

LIBASIO_API
boost::asio::io_context* libasio_newcontext() {
    std::shared_ptr<boost::asio::io_context> context_ = std::make_shared<boost::asio::io_context>();
    std::thread([context_] {
        #ifdef _WIN32
        // Raise the priority of this worker thread (the call needs a thread handle).
        SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST);
        #else
        
        struct sched_param param_;
        param_.sched_priority = sched_get_priority_max(SCHED_FIFO); // SCHED_RR
        pthread_setschedparam(pthread_self(), SCHED_FIFO, &param_);
        #endif
        #endif

        boost::asio::io_context::work work_(*context_);
        boost::system::error_code ec_;
        context_->run(ec_);

        __lock__(_syncobj_){
            std::shared_ptr<boost::asio::io_context> p = libasio_getcontext(context_.get());
            if (p) {
                _contexts_.erase(context_.get());
            }
        } __unlock__;
    }).detach();
    boost::asio::io_context* p = context_.get();
    __lock__(_syncobj_) {
        _contexts_[p] = std::move(context_);
    } __unlock__;
    return p;
}

LIBASIO_API
void libasio_closecontext(boost::asio::io_context* context_) {
    if (!context_) {
        return;
    }
    __lock__(_syncobj_) {
        std::shared_ptr<boost::asio::io_context> p = libasio_getcontext(context_);
        if (!p) {
            return;
        }
        context_->stop();
        _contexts_.erase(context_);
    } __unlock__;
}

LIBASIO_API
bool libasio_postcontext(boost::asio::io_context* context_, uint64_t key_, libasio_post_callback callback_) {
    if (!context_ || !callback_) {
        return false;
    }
    __lock__(_syncobj_) {
        std::shared_ptr<boost::asio::io_context> p = libasio_getcontext(context_);
        if (!p) {
            return false;
        }
        context_->post([context_, key_, callback_] {
            callback_(context_, key_);
        });
    } __unlock__;
    return true;
}

LIBASIO_API
boost::asio::ip::udp::socket* libasio_createsocket(boost::asio::io_context* context_, int sockfd_, bool v4_or_v6_) {
    if (!context_ || sockfd_ == -1) {
        return NULL;
    }
    __lock__(_syncobj_) {
        libasio_context context = libasio_getcontext(context_);
        if (!context) {
            return NULL;
        }
        libasio_socket socket_ = std::make_shared<boost::asio::ip::udp::socket>(*context_);
        if (v4_or_v6_) {
            socket_->assign(boost::asio::ip::udp::v4(), sockfd_);
        }
        else {
            socket_->assign(boost::asio::ip::udp::v6(), sockfd_);
        }
        boost::asio::ip::udp::socket* r_ = socket_.get();
        _sockets_[r_] = std::move(socket_);
        _so2ctxs_[r_] = std::move(context);
        return r_;
    } __unlock__;
}

LIBASIO_API
void libasio_closesocket(boost::asio::ip::udp::socket* socket_) {
    if (!socket_) {
        return;
    }
    __lock__(_syncobj_) {
        libasio_context context = libasio_getcontext(socket_);
        if (!context) {
            return;
        }
        libasio_socket socket = libasio_getsocket(socket_);
        boost::asio::post(*context, [context, socket] {
            if (socket->is_open()) {
                boost::system::error_code ec;
                try {
                    socket->cancel(ec);
                }
                catch (std::exception&) {}
                try {
                    socket->shutdown(boost::asio::ip::udp::socket::shutdown_send, ec);
                }
                catch (std::exception&) {}
                try {
                    socket->close(ec);
                }
                catch (std::exception&) {}
            }
        });
        _sockets_.erase(socket_);
        _so2ctxs_.erase(socket_);
    } __unlock__;
}

LIBASIO_API
bool libasio_recvfrom(boost::asio::ip::udp::socket* socket_, uint64_t key_, char* buf_, int size_, libasio_recvfrom_callback callback_) {
    if (!socket_ || !buf_ || size_ <= 0 || !callback_) {
        return false;
    }
    std::shared_ptr<boost::asio::ip::udp::endpoint> endpoint_ = std::make_shared<boost::asio::ip::udp::endpoint>();
    __lock__(_syncobj_) {
        libasio_context context_ = libasio_getcontext(socket_);
        if (!context_) {
            return false;
        }
        libasio_socket socket = libasio_getsocket(socket_);
        socket->async_receive_from(boost::asio::buffer(buf_, size_), *endpoint_,
            [context_, socket, key_, callback_, endpoint_](const boost::system::error_code& ec, uint32_t sz) {
                int length_ = -1;
                if (!ec) {
                    length_ = sz;
                }
                libasio_endpoint stack_;
                if (endpoint_->protocol() == boost::asio::ip::udp::v4()) {
                    stack_.v4_or_v6_ = 1;
                    stack_.in4_.address_ = htonl(endpoint_->address().to_v4().to_uint());
                    stack_.in4_.port_ = endpoint_->port();

                    callback_(socket.get(), key_, length_, &stack_);
                }
                else if (endpoint_->protocol() == boost::asio::ip::udp::v6()) {
                    stack_.v4_or_v6_ = 0;
                    stack_.in6_.port_ = endpoint_->port();

                    boost::asio::ip::address_v6::bytes_type addr_bytes_ = endpoint_->address().to_v6().to_bytes();
                    memcpy(stack_.in6_.address_, addr_bytes_.data(), addr_bytes_.size());

                    callback_(socket.get(), key_, length_, &stack_);
                }
                else {
                    callback_(socket.get(), key_, length_, NULL);
                }
            });
    } __unlock__;
    return true;
}

LIBASIO_API
bool libasio_sendto(boost::asio::ip::udp::socket* socket_, uint64_t key_, char* buf_, int size_, libasio_endpoint* endpoint_, libasio_sendto_callback callback_) {
    if (!socket_ || !buf_ || size_ <= 0 || !endpoint_) {
        return false;
    }
    boost::asio::ip::udp::endpoint sendtoEP_;
    if (endpoint_->v4_or_v6_) {
        sendtoEP_ = boost::asio::ip::udp::endpoint(boost::asio::ip::address_v4(ntohl(endpoint_->in4_.address_)), endpoint_->in4_.port_);
    }
    else {
        boost::asio::ip::address_v6::bytes_type addr_bytes_;
        memcpy(addr_bytes_.data(), endpoint_->in6_.address_, addr_bytes_.size());
        sendtoEP_ = boost::asio::ip::udp::endpoint(boost::asio::ip::address_v6(addr_bytes_), endpoint_->in6_.port_);
    }
    __lock__(_syncobj_) {
        libasio_context context_ = libasio_getcontext(socket_);
        if (!context_) {
            return false;
        }
        libasio_socket socket = libasio_getsocket(socket_);
        socket->async_send_to(boost::asio::buffer(buf_, size_), sendtoEP_,
            [context_, socket, key_, callback_](const boost::system::error_code& ec, uint32_t sz) {
                if (callback_) {
                    int length_ = -1;
                    if (!ec) {
                        length_ = sz;
                    }
                    callback_(socket.get(), key_, length_);
                }
            });
    } __unlock__;
    return true;
}
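The `libasio_endpoint` structure is passed across the native boundary by layout, so its byte offsets matter. The sketch below mirrors that layout and measures the offsets. The type names `endpoint_v4`, `endpoint_v6`, and `endpoint_sketch` and the helper `offset_in` are hypothetical; the nested structs are given names only so the example compiles as strict C++.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Mirror of the libasio_endpoint layout, with the nested structs named.
struct endpoint_v4 {
    std::uint32_t address_;
    std::uint32_t port_;
};
struct endpoint_v6 {
    char          address_[16];
    std::uint32_t port_;
};
struct endpoint_sketch {
    std::uint32_t v4_or_v6_;
    union {
        endpoint_v4 in4_;
        endpoint_v6 in6_;
        char        data_[20];
    };
};

// Byte offset of `member` inside the endpoint_sketch instance `ep`.
template <typename Member>
std::size_t offset_in(const endpoint_sketch& ep, const Member& member) {
    return static_cast<std::size_t>(
        reinterpret_cast<const char*>(&member) - reinterpret_cast<const char*>(&ep));
}
```

For an instance `ep`, `offset_in(ep, ep.in4_.port_)` is 8 while `offset_in(ep, ep.in6_.port_)` is 20. The IPv4 port therefore overlaps the IPv6 address bytes, so a managed mirror of this structure has to read the port from the member that matches `v4_or_v6_`.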

C# AsyncContext.cs

namespace Ppp.Net.Auxiliary
{
    using System;
    using System.Security;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Net.Sockets;
#if NETCOREAPP
    using System.Runtime.CompilerServices;
#endif
    using System.Runtime.InteropServices;
    using System.Threading;

    public unsafe sealed class AsyncContext : IDisposable
    {
        [DllImport("libasio", EntryPoint = "libasio_newcontext", CallingConvention = CallingConvention.Cdecl)]
        [return: MarshalAs(UnmanagedType.SysInt)]
        private static extern IntPtr libasio_new_context();

        [DllImport("libasio", EntryPoint = "libasio_closecontext", CallingConvention = CallingConvention.Cdecl)]
        private static extern void libasio_closecontext([MarshalAs(UnmanagedType.SysInt)] IntPtr context_);

        [DllImport("libasio", EntryPoint = "libasio_postcontext", CallingConvention = CallingConvention.Cdecl)]
        [return: MarshalAs(UnmanagedType.I1)] // the native function returns a 1-byte C++ bool
        private static extern bool libasio_postcontext([MarshalAs(UnmanagedType.SysInt)] IntPtr context_, long key_, [MarshalAs(UnmanagedType.FunctionPtr)] libasio_post_callback callback_);

        [UnmanagedFunctionPointer(CallingConvention.Cdecl)]
        private delegate void libasio_post_callback([MarshalAs(UnmanagedType.SysInt)] IntPtr context_, long key_);

        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private IntPtr _handle = IntPtr.Zero;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private bool _disposed = false;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private long _mapkey = 0;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly object _synobj = new object();
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly byte[] _buffer = new byte[ushort.MaxValue];
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private GCHandle _buffer_gc = default(GCHandle);

        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private static readonly AsyncScheduler _scheduler = new AsyncScheduler(Environment.ProcessorCount);
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private static readonly libasio_post_callback _callback = (context_, key_) =>
        {
            _callbacks.TryGetValue(context_, out ConcurrentDictionary<long, IOCompletionCallback> callbacks);
            if (callbacks == null)
            {
                return;
            }
            callbacks.TryRemove(key_, out IOCompletionCallback callback_);
            if (callback_ == null)
            {
                return;
            }
            callback_.callback_(callback_.state_);
        };
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
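        private static readonly ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, IOCompletionCallback>> _callbacks =
            new ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, IOCompletionCallback>>();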

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        [SecurityCritical]
        [SecuritySafeCritical]
        static AsyncContext() => GCHandle.Alloc(_callback);

        private sealed class IOCompletionCallback
        {
            public object state_;
            public Action<object> callback_;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        private ConcurrentDictionary<long, IOCompletionCallback> GetAllCallback()
        {
            lock (this._synobj)
            {
                if (this._disposed)
                {
                    return null;
                }
                lock (_callbacks)
                {
                    _callbacks.TryGetValue(this._handle, out ConcurrentDictionary<long, IOCompletionCallback> d);
                    if (d == null)
                    {
                        d = new ConcurrentDictionary<long, IOCompletionCallback>();
                        if (!_callbacks.TryAdd(this._handle, d))
                        {
                            d = null;
                        }
                    }
                    return d;
                }
            }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        private long BindCallback(Action<object> callback, object state)
        {
            ConcurrentDictionary<long, IOCompletionCallback> callbacks = this.GetAllCallback();
            if (callbacks == null)
            {
                return 0;
            }
            IOCompletionCallback cb = new IOCompletionCallback()
            {
                callback_ = callback,
                state_ = state,
            };
            long key_ = 0;
            do
            {
                while (key_ == 0)
                {
                    key_ = Interlocked.Increment(ref this._mapkey);
                }
            } while (!callbacks.TryAdd(key_, cb));
            return key_;
        }

        public static AsyncScheduler Scheduler
        {
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => AsyncContext._scheduler;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public AsyncContext()
        {
            this._handle = libasio_new_context();
            this._buffer_gc = GCHandle.Alloc(this._buffer, GCHandleType.Pinned);
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        ~AsyncContext() => this.Dispose();

        public IntPtr Handle
        {
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => Interlocked.CompareExchange(ref this._handle, IntPtr.Zero, IntPtr.Zero);
        }

        public byte[] Buffer
        {
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => this._buffer;
        }

        public object Tag
        {
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get;
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            set;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public AsyncSocket CreateSocket(Socket socket)
        {
            if (socket == null)
            {
                throw new ArgumentNullException(nameof(socket));
            }
            return new AsyncSocket(this, socket);
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public bool Post(Action<object> callback, object state)
        {
            if (callback == null)
            {
                return false;
            }
            lock (this._synobj)
            {
                if (this._disposed)
                {
                    return false;
                }
                long key_ = this.BindCallback(callback, state);
                if (key_ == 0)
                {
                    return false;
                }
                return libasio_postcontext(this._handle, key_, _callback);
            }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public void Dispose()
        {
            lock (this._synobj)
            {
                if (!this._disposed)
                {
                    this.Post((state) =>
                    {
                        var gc = __makeref(this._buffer_gc);
                        if (__refvalue(gc, GCHandle).IsAllocated)
                        {
                            __refvalue(gc, GCHandle).Free();
                        }
                        libasio_closecontext(this._handle);
                        _callbacks.TryRemove(this._handle, out ConcurrentDictionary<long, IOCompletionCallback> _);
                    }, default(object));
                    this._disposed = true;
                }
            }
            GC.SuppressFinalize(this);
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public static AsyncContext GetContext() => AsyncContext._scheduler.GetContext();
    }
}
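The class above hands the native side only a 64-bit key and looks the managed callback up again when the completion arrives. The same pattern can be sketched in C++; `CallbackTable` is a hypothetical name and this is an illustration of the idea, not a port of the C# class.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <mutex>
#include <unordered_map>

// Hypothetical key-to-callback table: callers keep the key, the table keeps
// the callback until the matching completion arrives.
class CallbackTable {
public:
    // Stores the callback and returns its key; 0 is reserved to mean "no key".
    std::uint64_t bind(std::function<void()> callback) {
        std::lock_guard<std::mutex> lock(mutex_);
        std::uint64_t key = 0;
        do {
            key = ++next_key_;
        } while (key == 0 || callbacks_.count(key) != 0);
        callbacks_.emplace(key, std::move(callback));
        return key;
    }

    // Runs and forgets the callback for `key`; returns false for an unknown key.
    bool complete(std::uint64_t key) {
        std::function<void()> callback;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            auto it = callbacks_.find(key);
            if (it == callbacks_.end()) {
                return false;
            }
            callback = std::move(it->second);
            callbacks_.erase(it);
        }
        callback(); // invoked outside the lock so the callback may bind again
        return true;
    }

private:
    std::mutex mutex_;
    std::uint64_t next_key_ = 0;
    std::unordered_map<std::uint64_t, std::function<void()>> callbacks_;
};
```

Removing the entry before invoking it means each key fires at most once, which matches the one-shot nature of a completion.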
 

C# AsyncScheduler.cs

namespace Ppp.Net.Auxiliary
{
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
#if NETCOREAPP
    using System.Runtime.CompilerServices;
#endif

    public sealed class AsyncScheduler : IDisposable
    {
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly LinkedList<AsyncContext> _contexts = new LinkedList<AsyncContext>();
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly object _syncobj = new object();

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public AsyncScheduler(int concurrent)
        {
            if (concurrent < 1)
            {
                concurrent = 1;
            }
            for (int i = 0; i < concurrent; i++)
            {
                AsyncContext context = new AsyncContext();
                this._contexts.AddLast(context);
            }
        }

        public int Concurrent
        {
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => _contexts.Count;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public void Dispose()
        {
            lock (this._syncobj)
            {
                LinkedListNode<AsyncContext> node = this._contexts.First;
                while (node != null)
                {
                    node.Value.Dispose();
                    node = node.Next;
                }
                this._contexts.Clear();
            }
            GC.SuppressFinalize(this);
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public AsyncContext GetContext()
        {
            lock (this._syncobj)
            {
                LinkedListNode<AsyncContext> node = this._contexts.First;
                if (node == null)
                {
                    return null;
                }
                this._contexts.RemoveFirst();
                this._contexts.AddLast(node);
                return node.Value;
            }
        }
    }
}
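`GetContext` above distributes contexts round-robin by taking the head node of the list and moving it to the tail. A minimal C++ sketch of that rotation follows; `RoundRobin` is a hypothetical name and plain integers stand in for the contexts.

```cpp
#include <cassert>
#include <list>
#include <mutex>

// Hypothetical round-robin picker over context ids 0..count-1.
class RoundRobin {
public:
    explicit RoundRobin(int count) {
        if (count < 1) {
            count = 1; // same clamp as the scheduler constructor
        }
        for (int i = 0; i < count; ++i) {
            items_.push_back(i);
        }
    }

    // Returns the head item and rotates it to the tail.
    int next() {
        std::lock_guard<std::mutex> lock(mutex_);
        int head = items_.front();
        items_.splice(items_.end(), items_, items_.begin());
        return head;
    }

private:
    std::mutex mutex_;
    std::list<int> items_;
};
```

With three items, successive `next()` calls return 0, 1, 2 and then 0 again, so sockets created one after another are spread evenly over the worker threads.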

C# AsyncSocket.cs 

namespace Ppp.Net.Auxiliary
{
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Net;
    using System.Net.Sockets;
#if NETCOREAPP
    using System.Runtime.CompilerServices;
#endif
    using System.Runtime.InteropServices;
    using System.Security;
    using System.Threading;

    public unsafe sealed class AsyncSocket : IDisposable
    {
        [DllImport("libasio", EntryPoint = "libasio_createsocket", CallingConvention = CallingConvention.Cdecl)]
        [return: MarshalAs(UnmanagedType.SysInt)]
        private static extern IntPtr libasio_createsocket([MarshalAs(UnmanagedType.SysInt)] IntPtr context_, int sockfd_, [MarshalAs(UnmanagedType.I1)] bool v4_or_v6_);

        [DllImport("libasio", EntryPoint = "libasio_closesocket", CallingConvention = CallingConvention.Cdecl)]
        private static extern void libasio_closesocket([MarshalAs(UnmanagedType.SysInt)] IntPtr socket_);

        [DllImport("libasio", EntryPoint = "libasio_sendto", CallingConvention = CallingConvention.Cdecl)]
        [return: MarshalAs(UnmanagedType.I1)] // the native function returns a 1-byte C++ bool
        private static extern bool libasio_sendto([MarshalAs(UnmanagedType.SysInt)] IntPtr socket_, long key_, void* buf_, int size_,
            libasio_endpoint* endpoint_, [MarshalAs(UnmanagedType.FunctionPtr)] libasio_sendto_callback callback_);

        [DllImport("libasio", EntryPoint = "libasio_recvfrom", CallingConvention = CallingConvention.Cdecl)]
        [return: MarshalAs(UnmanagedType.I1)] // the native function returns a 1-byte C++ bool
        private static extern bool libasio_recvfrom([MarshalAs(UnmanagedType.SysInt)] IntPtr socket_, long key_, void* buf_, int size_,
            [MarshalAs(UnmanagedType.FunctionPtr)] libasio_recvfrom_callback callback_);

        [UnmanagedFunctionPointer(CallingConvention.Cdecl)]
        private delegate void libasio_sendto_callback([MarshalAs(UnmanagedType.SysInt)] IntPtr socket_, long key_, int length_);

        [UnmanagedFunctionPointer(CallingConvention.Cdecl)]
        private delegate void libasio_recvfrom_callback([MarshalAs(UnmanagedType.SysInt)] IntPtr socket_, long key_, int length_, libasio_endpoint* remoteEP_);

        [StructLayout(LayoutKind.Explicit)]
        private struct libasio_endpoint
        {
            [StructLayout(LayoutKind.Sequential)]
            public struct in4
            {
                public uint address_;
                public int port_;
            }
            [StructLayout(LayoutKind.Sequential)]
            public struct in6
            {
                public long address_1_;
                public long address_2_;
                public int port_;
            }

            [FieldOffset(0)]
            public uint v4_or_v6_;

            [FieldOffset(4)]
            public in4 in4_;

            [FieldOffset(4)]
            public in6 in6_;

            [FieldOffset(0)]
            public long data_1_;
            [FieldOffset(8)]
            public long data_2_;
            [FieldOffset(16)]
            public long data_3_;
        };

        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly object _synobj = new object();
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly Socket _socket = null;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly AsyncContext _context = null;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private bool _disposed = false;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private IntPtr _handle = IntPtr.Zero;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private long _mapkey = 0;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private static readonly libasio_recvfrom_callback _recvfrom_callback = (socket_, key_, length_, remoteEP_) =>
        {
            _recvfrom_callbacks.TryGetValue(socket_, out ConcurrentDictionary<long, Action<int, EndPoint>> callbacks);
            if (callbacks == null)
            {
                return;
            }
            callbacks.TryRemove(key_, out Action<int, EndPoint> callback_);
            if (callback_ == null)
            {
                return;
            }
            IPEndPoint remoteEP = null;
            if (remoteEP_ != null)
            {
                if (remoteEP_->v4_or_v6_ != 0)
                {
                    remoteEP = new IPEndPoint(new IPAddress(remoteEP_->in4_.address_), remoteEP_->in4_.port_);
                }
                else
                {
                    byte[] address_bytes = new byte[16];
                    fixed (byte* paddr_bytes = address_bytes)
                    {
                        long* paddr_i64 = (long*)paddr_bytes;
                        paddr_i64[0] = remoteEP_->in6_.address_1_;
                        paddr_i64[1] = remoteEP_->in6_.address_2_;
                    }
                    // For IPv6 the port is stored after the 16 address bytes, in in6_.port_.
                    remoteEP = new IPEndPoint(new IPAddress(address_bytes), remoteEP_->in6_.port_);
                }
            }
            callback_(length_, remoteEP);
        };
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
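        private static readonly ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, Action<int, EndPoint>>> _recvfrom_callbacks =
            new ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, Action<int, EndPoint>>>();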
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private static readonly libasio_sendto_callback _sendto_callback = (socket_, key_, length_) =>
        {
            _sendto_callbacks.TryGetValue(socket_, out ConcurrentDictionary<long, Action<int>> callbacks);
            if (callbacks == null)
            {
                return;
            }
            callbacks.TryRemove(key_, out Action<int> callback_);
            if (callback_ == null)
            {
                return;
            }
            callback_(length_);
        };
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private static readonly ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, Action<int>>> _sendto_callbacks =
            new ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, Action<int>>>();

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        [SecurityCritical]
        [SecuritySafeCritical]
        static AsyncSocket()
        {
            GCHandle.Alloc(_sendto_callback);
            GCHandle.Alloc(_recvfrom_callback);
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        private ConcurrentDictionary<long, Action<int, EndPoint>> GetAllReceiveFromCallback()
        {
            lock (this._synobj)
            {
                if (this._disposed)
                {
                    return null;
                }
                lock (_recvfrom_callbacks)
                {
                    _recvfrom_callbacks.TryGetValue(this._handle, out ConcurrentDictionary<long, Action<int, EndPoint>> d);
                    if (d == null)
                    {
                        d = new ConcurrentDictionary<long, Action<int, EndPoint>>();
                        if (!_recvfrom_callbacks.TryAdd(this._handle, d))
                        {
                            d = null;
                        }
                    }
                    return d;
                }
            }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        private long BindReceiveFromCallback(Action<int, EndPoint> callback)
        {
            ConcurrentDictionary<long, Action<int, EndPoint>> callbacks = this.GetAllReceiveFromCallback();
            if (callbacks == null)
            {
                return 0;
            }
            long key_ = 0;
            do
            {
                while (key_ == 0)
                {
                    key_ = Interlocked.Increment(ref this._mapkey);
                }
            } while (!callbacks.TryAdd(key_, callback));
            return key_;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        private ConcurrentDictionary<long, Action<int>> GetAllSendToCallback()
        {
            lock (this._synobj)
            {
                if (this._disposed)
                {
                    return null;
                }
                lock (_sendto_callbacks)
                {
                    _sendto_callbacks.TryGetValue(this._handle, out ConcurrentDictionary<long, Action<int>> d);
                    if (d == null)
                    {
                        d = new ConcurrentDictionary<long, Action<int>>();
                        if (!_sendto_callbacks.TryAdd(this._handle, d))
                        {
                            d = null;
                        }
                    }
                    return d;
                }
            }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        private long BindSendToCallback(Action<int> callback)
        {
            ConcurrentDictionary<long, Action<int>> callbacks = this.GetAllSendToCallback();
            if (callbacks == null)
            {
                return 0;
            }
            long key_ = 0;
            do
            {
                while (key_ == 0)
                {
                    key_ = Interlocked.Increment(ref this._mapkey);
                }
            } while (!callbacks.TryAdd(key_, callback));
            return key_;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        internal AsyncSocket(AsyncContext context, Socket socket)
        {
            this._handle = libasio_createsocket(context.Handle, socket.Handle.ToInt32(), socket.AddressFamily == AddressFamily.InterNetwork);
            this._socket = socket;
            this._context = context;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        ~AsyncSocket() => this.Dispose();

        public IntPtr Handle
        {
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => Interlocked.CompareExchange(ref this._handle, IntPtr.Zero, IntPtr.Zero);
        }

        public Socket Socket
        {
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => this._socket;
        }

        public AsyncContext Context
        {
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => this._context;
        }

        public object Tag
        {
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get;
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            set;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public bool SendTo(byte[] buffer, int offset, int length, EndPoint destinationEP, Action callback)
        {
            if (buffer == null ||
                destinationEP == null || // dereferenced below, so reject null up front
                offset < 0 ||
                length <= 0 ||
                (offset + length) > buffer.Length ||
                SocketExtension.CleanedUp(this._socket))
            {
                return false;
            }
            libasio_endpoint* localEP = stackalloc libasio_endpoint[1];
            if (destinationEP.AddressFamily == AddressFamily.InterNetwork)
            {
                IPEndPoint ipep = (IPEndPoint)destinationEP;
                localEP->v4_or_v6_ = 1;
                localEP->in4_.port_ = ipep.Port;
                fixed (byte* pb = ipep.Address.GetAddressBytes())
                {
                    localEP->in4_.address_ = *(uint*)pb;
                }
            }
            else
            {
                IPEndPoint ipep = (IPEndPoint)destinationEP;
                localEP->v4_or_v6_ = 0;
                localEP->in6_.port_ = ipep.Port;
                fixed (byte* pb = ipep.Address.GetAddressBytes())
                {
                    long* pl = (long*)pb;
                    localEP->in6_.address_1_ = pl[0];
                    localEP->in6_.address_2_ = pl[1];
                }
            }
            fixed (byte* p = buffer)
            {
                lock (this._synobj)
                {
                    if (this._disposed)
                    {
                        return false;
                    }
                    long key_ = this.BindSendToCallback(callback);
                    if (key_ == 0)
                    {
                        return false;
                    }
                    return libasio_sendto(this.Handle, key_, p + offset, length, localEP, _sendto_callback);
                }
            }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public bool ReceiveFrom(byte[] buffer, int offset, int length, Action callback)
        {
            if (buffer == null ||
                callback == null ||
                offset < 0 ||
                length <= 0 ||
                (offset + length) > buffer.Length ||
                SocketExtension.CleanedUp(this._socket))
            {
                return false;
            }
            fixed (byte* p = buffer)
            {
                lock (this._synobj)
                {
                    if (this._disposed)
                    {
                        return false;
                    }
                    long key_ = this.BindReceiveFromCallback(callback);
                    if (key_ == 0)
                    {
                        return false;
                    }
                    return libasio_recvfrom(this.Handle, key_, p + offset, length, _recvfrom_callback);
                }
            }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public void Close() => this.Dispose();

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public void Dispose()
        {
            lock (this._synobj)
            {
                if (!this._disposed)
                {
                    this._disposed = true;
                    // Already inside the _synobj lock; a second nested lock on the same object adds nothing.
                    SocketExtension.Closesocket(this._socket);
                    libasio_closesocket(this._handle);
                }
            }
            // Drop any callbacks still registered for this native handle.
            _sendto_callbacks.TryRemove(this._handle, out _);
            _recvfrom_callbacks.TryRemove(this._handle, out _);
            GC.SuppressFinalize(this);
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public void Post(Action callback, object state) => this._context.Post(callback, state);
    }
}
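
The key-allocation pattern used by BindReceiveFromCallback and BindSendToCallback can be shown in isolation. The following is a minimal sketch, not the author's code: `CallbackRegistry<TCallback>`, `Bind` and `TryTake` are hypothetical names introduced here for illustration. It assumes, as the listing above does, that key 0 is reserved to signal failure and that a native completion later hands the key back so the managed callback can be looked up.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper for illustration; not part of the original AsyncSocket class.
public sealed class CallbackRegistry<TCallback> where TCallback : class
{
    private readonly ConcurrentDictionary<long, TCallback> _callbacks =
        new ConcurrentDictionary<long, TCallback>();
    private long _nextKey;

    // Registers a callback and returns its non-zero key; 0 means "not registered".
    public long Bind(TCallback callback)
    {
        if (callback == null)
        {
            return 0;
        }
        long key;
        do
        {
            // A fresh key is drawn on every attempt, so a rejected key is never retried.
            key = Interlocked.Increment(ref _nextKey);
        } while (key == 0 || !_callbacks.TryAdd(key, callback));
        return key;
    }

    // Removes and returns the callback for a key, e.g. when the completion for that key fires.
    public bool TryTake(long key, out TCallback callback)
    {
        return _callbacks.TryRemove(key, out callback);
    }
}
```

A caller would pass the key returned by `Bind` to the native call, then call `TryTake` with the same key from the completion handler. Because `TryTake` removes the entry, each callback runs at most once and the dictionary does not grow with completed operations.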
 

C# SocketExtension.cs

        public static Func<Socket, bool> CleanedUp
        {
#if NETCOREAPP
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get;
#if NETCOREAPP
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            private set;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        [SecurityCritical]
        [SecuritySafeCritical]
        static SocketExtension()
        {
            SocketExtension.CleanedUp = SocketExtension.CompileCleanedUp();
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        [SecurityCritical]
        [SecuritySafeCritical]
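        private static Func<Socket, bool> CompileCleanedUp()
        {
            try
            {
                // If the runtime has no non-public "CleanedUp" property, piCleanedUp is null,
                // Expression.Property throws, and the catch block below supplies the fallback.
                PropertyInfo piCleanedUp = typeof(Socket).GetProperty("CleanedUp", BindingFlags.NonPublic | BindingFlags.Instance);
                ParameterExpression s = Expression.Parameter(typeof(Socket), "s");
                Expression<Func<Socket, bool>> e = Expression.Lambda<Func<Socket, bool>>(Expression.Property(s, piCleanedUp), s);
                Func<Socket, bool> fCleanedUp = e.Compile();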
                return (socket) =>
                {
                    if (socket == null)
                    {
                        return true;
                    }
                    if (socket is NetworkSocket NS)
                    {
                        return NS.CleanedUp;
                    }
                    return fCleanedUp(socket);
                };
            }
            catch (Exception)
            {
                return (socket) =>
                {
                    if (socket == null)
                    {
                        return true;
                    }
                    if (socket is NetworkSocket NS)
                    {
                        return NS.CleanedUp;
                    }
                    return false;
                };
            }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public static void Closesocket(Socket socket)
        {
            bool cleanup = SocketExtension.CleanedUp(socket);
            if (cleanup)
            {
                return;
            }
            lock (socket) // SocketExtension.Shutdown(socket);
            {
                try
                {
                    socket.Shutdown(SocketShutdown.Send);
                }
                catch (Exception) { }
                try
                {
                    socket.Dispose();
                }
                catch (Exception) { }
            }
        }
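
CompileCleanedUp reads a non-public property through a compiled expression tree, which avoids paying the cost of PropertyInfo.GetValue on every call. The sketch below isolates that technique under stated assumptions: `Connection` and `NonPublicAccessor` are hypothetical types written for this example, and `Connection` stands in for `Socket` so that the code does not depend on runtime internals (the original wraps its lookup in try/catch, presumably because the internal property is not guaranteed to exist under that name on every runtime).

```csharp
using System;
using System.Linq.Expressions;
using System.Reflection;

// Hypothetical stand-in for a type that keeps its state flag non-public.
public sealed class Connection
{
    private bool CleanedUp { get; set; }

    public void Close()
    {
        CleanedUp = true;
    }
}

// Hypothetical helper for illustration; not part of the original SocketExtension class.
public static class NonPublicAccessor
{
    // Builds a delegate that reads a non-public instance property of T.
    // Returns null when the property is absent or has a different type.
    public static Func<T, TValue> Compile<T, TValue>(string propertyName)
    {
        PropertyInfo pi = typeof(T).GetProperty(
            propertyName, BindingFlags.NonPublic | BindingFlags.Instance);
        if (pi == null || pi.PropertyType != typeof(TValue))
        {
            return null;
        }
        ParameterExpression p = Expression.Parameter(typeof(T), "p");
        // Equivalent to the lambda: p => p.<propertyName>
        return Expression.Lambda<Func<T, TValue>>(Expression.Property(p, pi), p).Compile();
    }
}
```

The delegate is built once, for example in a static constructor, and then reused. Returning null for a missing property makes the fallback decision explicit at the call site instead of relying on an exception.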

 

When reposting, please credit the source: this article is reposted from www.mshxw.com