栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

upstream demo

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

upstream demo

本文是nginx访问第三方服务之upstream使用_上善若水-CSDN博客 实例的详细讲解

当需要访问第三方服务时,Nginx提供了2种异步方式来与第三方服务器通信:upstream与subrequest.
本文将介绍upstream的使用方式,Nginx的HTTP反向代理模块就是基于upstream方式实现,当我们希望把第三方服务的内容几乎原封不懂地返回给用户时,一般使用upstream方式,它可以非常高效地透传HTTP。

Nginx访问上游服务器的流程大致分以下几个阶段:启动upstream、连接上游服务器、向上游发送请求、接收上游响应(包头/包体)、结束请求。

一 upstream的使用方式

upstream的使用方式并不复杂,它提供了8个回调方法,用户需要实现的其中的几个回调方法,
本例中实现了3个回调方法:

struct ngx_http_upstream_s{
//...
    ngx_int_t      (*create_request)(ngx_http_request_t *r);//构造发往上游服务器的请求
//收到上游服务器的TCP流时回调,直到不返回NGX_AGAIN,处理http响应
    ngx_int_t     (*process_header)(ngx_http_request_t *r);
    void       (*finalize_request)(ngx_http_request_t *r,ngx_int_t rc);//销毁upstream时回调
//...
};

0 那么upstream是如何嵌入到一个请求中的?
模块在处理任何一个请求时都有ngx_http_request_t结构对象r,该结构中有一个ngx_http_upstream_t类型的成员upstream(模块对象创建)

struct ngx_http_request_s
{
    //...
    ngx_http_upstream_t    *upstream;
    //...
}

如果没有使用upstream机制,则需要将相应的成员置为NULL,否则需要对r->upstream进行设置.

1 HTTP模块启用upstream机制的步骤如下:

  • 调用ngx_http_upstream_create方法为请求创建upstream
  • 设置第三方服务器地址
  • 设置upstream的回调方法
  • 调用ngx_http_upstream_init方法启动upstream

2 HTTP如何运行upstream框架

使用upstream模块提供的ngx_http_upstream_init后,HTTP如何运行upstream框架,大致流程如下

  • 创建发往上游服务器的请求
  • 与上游服务器建立无阻塞TCP连接
  • 发送请求到第三方服务
  • 接收第三方服务响应并处理包头和包体
  • 回调finalize_request销毁请求

整个处理过程中会调用 upstream 的回调方法进行相应的处理,其中最常用的回调方法为 create_request、process_header、finalize_request;

注,upstream 模块处理上游包体的方式

1.当请求结构体ngx_http_request_t中的成员subrequest_in_memory标志位为1时,upstream不转发响应包体到下游,并由HTTP模块实现的input_filter()方法处理包体;
2.当请求结构体ngx_http_request_t中的成员subrequest_in_memory标志位为0时,且ngx_http_upstream_conf_t配置结构体中的成员buffering标志位为1时,upstream将开启更多的内存和磁盘文件用于缓存上游的响应包体(此时,上游网速更快),并转发响应包体;
3.当请求结构体ngx_http_request_t中的成员subrequest_in_memory标志位为0时,且ngx_http_upstream_conf_t配置结构体中的成员buffering标志位为0时,upstream将使用固定大小的缓冲区来转发响应包体;

二.upstream的配置

0 主要结构体说明

  • ngx_http_upstream_t
struct ngx_http_upstream_s {
    //...
    ngx_chain_t            *request_bufs;//发什么样的请求给上游服务器,create_request中实现
    ngx_http_upstream_resolved_t    *resolved;//可直接指定上游服务器的地址
    ngx_buf_t                        buffer;//buffer存储接收上游服务器的响应内容。复用
    //向客户端转发上游服务器的包体时才有用。
    //1-多个缓冲区以及磁盘文件转发(Nginx与上游间的网速远大于Nginx与下游客户端之间的网速)
    //0-只使用该结构体中的buffer缓冲区向下游转发响应包体
    unsigned                         buffering:1;
    //...
};

关于buffering参数,upstream提供了3种处理上游服务器包体的方式:

1.交由HTTP模块使用input filter回调方法直接处理包体。
2.以固定缓冲区(buffer成员)转发包体(本文使用的方式 buffering设置为0)
3.以多个缓冲加磁盘文件的方式转发包体等(buffering设置为1)。

  • ngx_http_upstream_conf_t

该结构是ngx_http_upstream_t中的成员之一,通过nginx.conf和设置commands使用预设的方法可以对该结构体赋值。

typedef struct{
//...
    ngx_msec_t                       connect_timeout;//连接上游服务器的超时时间
    ngx_msec_t                       send_timeout;//发送tcp包到上游服务器的超时时间
    ngx_msec_t                       read_timeout;//接收Tcp包的超时时间
//...
}ngx_http_upstream_conf_t;
  • 设置需要访问的第三方服务地址resolved成员

这部分就是熟悉套接字编程了

typedef struct{
    //...
    ngx_uint_t naddrs;
    struct sockaddr *sockaddr;//上游服务器地址
    socklen_t socklen;
    //...
}ngx_http_upstream_resolved_t;

1 设置回调方法
ngx_http_upstream_t结构体中有8个回调方法,不过必须实现的有3个回调方法:

create_request:创建请求
process_header:处理响应
finalize_request:销毁请求

三 upstream的使用示例

书中给出google的URL搜索示例,由于墙的原因,我们使用baidu,其URL搜索方法是www.baidu.com/s?wd=haha在nginx.conf配置location,例如:

location /test{
    mytest;
}

如果访问URL是/test?lumia则可以通过upstream机制向www.baidu.com发送搜索请求。

完整代码

源码来源自深入理解Nginx,注释包含自己的理解。

0 定义结构体和声明函数

//ngx_http_mytest_module.c

#include 
#include 
#include 

typedef struct
{
    ngx_http_status_t status;
    ngx_str_t backendServer;
}ngx_http_mytest_ctx_t;

typedef struct
{
    ngx_http_upstream_conf_t upstream;
} ngx_http_mytest_conf_t;

static char *
ngx_http_mytest(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);


static ngx_int_t ngx_http_mytest_handler(ngx_http_request_t *r);

static void* ngx_http_mytest_create_loc_conf(ngx_conf_t *cf);

static char *ngx_http_mytest_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child);

static ngx_int_t
mytest_upstream_process_header(ngx_http_request_t *r);

static ngx_int_t
mytest_process_status_line(ngx_http_request_t *r);

static ngx_str_t  ngx_http_proxy_hide_headers[] =
{
    ngx_string("Date"),
    ngx_string("Server"),
    ngx_string("X-Pad"),
    ngx_string("X-Accel-Expires"),
    ngx_string("X-Accel-Redirect"),
    ngx_string("X-Accel-Limit-Rate"),
    ngx_string("X-Accel-Buffering"),
    ngx_string("X-Accel-Charset"),
    ngx_null_string
};

1 创建模块

1.1 配置项,上下文,指令

static ngx_command_t  ngx_http_mytest_commands[] =
{
    {
        ngx_string("mytest"),
        NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_HTTP_LMT_CONF | NGX_CONF_NOARGS,
        ngx_http_mytest,
        NGX_HTTP_LOC_CONF_OFFSET,
        0,
        NULL
    },
    ngx_null_command
};

static ngx_http_module_t  ngx_http_mytest_module_ctx =
{
    NULL, 
    NULL, 
    NULL, 
    NULL, 
    NULL, 
    NULL, 
    ngx_http_mytest_create_loc_conf,
    ngx_http_mytest_merge_loc_conf  
};

ngx_module_t  ngx_http_mytest_module =
{
    NGX_MODULE_V1,
    &ngx_http_mytest_module_ctx,           
    ngx_http_mytest_commands,              
    NGX_HTTP_MODULE,                       
    NULL,                                  
    NULL,                                  
    NULL,                                  
    NULL,                                  
    NULL,                                  
    NULL,                                  
    NULL,                                  
    NGX_MODULE_V1_PADDING
};

1.2 执行指令

static char *
ngx_http_mytest(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_core_loc_conf_t  *clcf;
    
    clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
    
    
    clcf->handler = ngx_http_mytest_handler;

    return NGX_CONF_OK;
}

 1.3 实现自定义handler

static ngx_int_t
ngx_http_mytest_handler(ngx_http_request_t *r)
{
    
    ngx_http_mytest_ctx_t* myctx = ngx_http_get_module_ctx(r, ngx_http_mytest_module);
    if (myctx == NULL)
    {
        myctx = ngx_palloc(r->pool, sizeof(ngx_http_mytest_ctx_t));
        
        ngx_http_set_ctx(r, myctx, ngx_http_mytest_module);
    }
    
    
    //1 创建upstream 模块对象
    ngx_http_upstream_create(r);
    
    ngx_http_mytest_conf_t  *mycf = (ngx_http_mytest_conf_t*)             
    ngx_http_get_module_loc_conf(r, ngx_http_mytest_module);
    
    ngx_http_upstream_t *u = r->upstream;
    
    
    u->conf = &mycf->upstream;
    
    u->buffering = mycf->upstream.buffering;

    
    static struct sockaddr_in backendSockAddr;
    struct hostent *pHost = gethostbyname((char*) "www.baidu.com");
    
    backendSockAddr.sin_family = AF_INET;
    backendSockAddr.sin_port = htons((in_port_t) 80);

    memcpy(&backendSockAddr.sin_addr,pHost->h_addr_list[0],sizeof(struct in_addr));
    char* pDmsIP = inet_ntoa(*(struct in_addr*) (pHost->h_addr_list[0]));
    
    //backendSockAddr.sin_addr.s_addr = inet_addr(pDmsIP);
    myctx->backendServer.data = (u_char*)pDmsIP;
    myctx->backendServer.len = strlen(pDmsIP);

    
    u->resolved->sockaddr = (struct sockaddr *)&backendSockAddr;
    u->resolved->socklen = sizeof(struct sockaddr_in);
    u->resolved->naddrs = 1;

    
    u->create_request = mytest_upstream_create_request;
    u->process_header = mytest_process_status_line;
    u->finalize_request = mytest_upstream_finalize_request;

    
    r->main->count++;
    
    ngx_http_upstream_init(r);
    
    return NGX_DONE;
}
  •     创建upstream 模块对象 ngx_http_upstream_create(r);   
    对每一个要使用upstream的请求,必须调用且只能调用1次   
  •     创建之后,配置upstream
     
    ngx_http_upstream_t *u = r->upstream;
    u->conf = &mycf->upstream 
    .....
  •     决定转发包体时使用的缓冲区
u->buffering = mycf->upstream.buffering;
  •    设置上游服务器
 u->resolved = (ngx_http_upstream_resolved_t*) ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_resolved_t));
    
    static struct sockaddr_in backendSockAddr;
    struct hostent *pHost = gethostbyname((char*) "www.baidu.com");
    
    backendSockAddr.sin_family = AF_INET;
    backendSockAddr.sin_port = htons((in_port_t) 80);
    .......
  •     设置三个必须实现的回调方法:创建请求、处理头部、请求销毁
    u->create_request = mytest_upstream_create_request;
    u->process_header = mytest_process_status_line;
    u->finalize_request = mytest_upstream_finalize_request;
  •      启动upstream
 ngx_http_upstream_init(r);

2  create config & merge config

static void*
ngx_http_mytest_create_loc_conf(ngx_conf_t *cf)
{
    ngx_http_mytest_conf_t *mycf;
    mycf = (ngx_http_mytest_conf_t*)ngx_pcalloc(cf->pool,sizeof(ngx_http_mytest_conf_t));
    
    mycf->upstream.connect_timeout = 60000;
    mycf->upstream.send_timeout = 60000;
    mycf->upstream.read_timeout = 60000;
    mycf->upstream.store_access = 0600;
    
    mycf->upstream.buffering = 0;
    mycf->upstream.bufs.num = 8;
    mycf->upstream.bufs.size = ngx_pagesize;
    mycf->upstream.buffer_size = ngx_pagesize;
    mycf->upstream.busy_buffers_size = 2 * ngx_pagesize;
    mycf->upstream.temp_file_write_size = 2 * ngx_pagesize;
    mycf->upstream.max_temp_file_size = 1024 * 1024 * 1024;
    
    mycf->upstream.hide_headers = NGX_CONF_UNSET_PTR;
    mycf->upstream.pass_headers = NGX_CONF_UNSET_PTR;

    return mycf;
}
static char *
ngx_http_mytest_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
    ngx_http_mytest_conf_t *prev = (ngx_http_mytest_conf_t *)parent;
    ngx_http_mytest_conf_t *conf = (ngx_http_mytest_conf_t *)child;

    ngx_hash_init_t hash;
    hash.max_size = 100;
    hash.bucket_size = 1024;
    hash.name = "proxy_headers_hash";
    
    ngx_http_upstream_hide_headers_hash(cf, &conf->upstream,
     &prev->upstream, ngx_http_proxy_hide_headers, &hash)

    return NGX_CONF_OK;
}

3  创建发给上游服务器的请求 create_request

在ngx_http_mytest_handler 中设置,在ngx_http_upstream_init中调用.
create_request 是必须实现的业务逻辑函数

static ngx_int_t
mytest_upstream_create_request(ngx_http_request_t *r)
{
    //=====1 构造请求string=====
    
    static ngx_str_t backendQueryLine =
        ngx_string("GET /s?wd=%V HTTP/1.1rnHost: www.baidu.comrnConnection: closernrn");
    
    ngx_int_t queryLineLen = backendQueryLine.len + r->args.len - 2;
    
    ngx_buf_t* b = ngx_create_temp_buf(r->pool, queryLineLen);
    
    b->last = b->pos + queryLineLen;
    
    ngx_snprintf(b->pos, queryLineLen ,(char*)backendQueryLine.data, &r->args);
    
    r->upstream->request_bufs = ngx_alloc_chain_link(r->pool);
    
    r->upstream->request_bufs->buf = b;
    r->upstream->request_bufs->next = NULL;
    r->upstream->request_sent = 0;
    r->upstream->header_sent = 0;
    
    r->header_hash = 1;

    return NGX_OK;
}

 4 process header

框架调用.此函数表示已经从第三方获取了respond.
mytest_process_status_line is process_header.

  •  解析http响应行
     rc = ngx_http_parse_status_line(r, &u->buffer, &ctx->status);
  •  赋值操作
  • 之后收到的新字符流将有新的回调函数解析
    u->process_header = mytest_upstream_process_header;
  • 如果本次收到的字符流除了http响应行外,还有多余的字符,
    将由mytest_upstream_process_header方法解析
    mytest_upstream_process_header(r);
static ngx_int_t
mytest_process_status_line(ngx_http_request_t *r)
{
    size_t len;
    ngx_int_t rc;
    ngx_http_upstream_t *u;

    
    ngx_http_mytest_ctx_t* ctx = ngx_http_get_module_ctx(r, ngx_http_mytest_module);

    u = r->upstream;
    
    rc = ngx_http_parse_status_line(r, &u->buffer, &ctx->status);
    
    if (rc == NGX_AGAIN)
    {
        return rc;
    }
    
    if (u->state)
    {
        u->state->status = ctx->status.code;
    }
    
    u->headers_in.status_n = ctx->status.code;
    len = ctx->status.end - ctx->status.start;
    u->headers_in.status_line.len = len;
    u->headers_in.status_line.data = ngx_pnalloc(r->pool, len);
    ngx_memcpy(u->headers_in.status_line.data, ctx->status.start, len); 

    
    
    u->process_header = mytest_upstream_process_header;

    
    
    return mytest_upstream_process_header(r);
}

mytest_upstream_process_header

static ngx_int_t
mytest_upstream_process_header(ngx_http_request_t *r)
{
    ngx_int_t rc;

    
    
    
    ngx_table_elt_t *h;
    ngx_http_upstream_header_t     *hh;
    ngx_http_upstream_main_conf_t  *umcf;

    
    
    umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);

    
    for ( ;; )
    {
    
        rc = ngx_http_parse_header_line(r, &r->upstream->buffer, 1);

        
        if (rc == NGX_OK)
        {
            
        
            h = ngx_list_push(&r->upstream->headers_in.headers);
            
            h->hash = r->header_hash;
        
            h->key.len = r->header_name_end - r->header_name_start;
            h->value.len = r->header_end - r->header_start;
        
        //1.key
        //2.value
        //3.lowcase_case
        
        
            h->key.data = ngx_pnalloc(r->pool,
                                      h->key.len + 1 + h->value.len + 1 + h->key.len);
        
            h->value.data = h->key.data + h->key.len + 1;
            h->lowcase_key = h->key.data + h->key.len + 1 + h->value.len + 1;

            ngx_memcpy(h->key.data, r->header_name_start, h->key.len);
            h->key.data[h->key.len] = '';
            ngx_memcpy(h->value.data, r->header_start, h->value.len);
            h->value.data[h->value.len] = '';

            if (h->key.len == r->lowcase_index)
            {
                ngx_memcpy(h->lowcase_key, r->lowcase_header, h->key.len);
            }
            else
            {
                ngx_strlow(h->lowcase_key, h->key.data, h->key.len);
            }

            
            hh = ngx_hash_find(&umcf->headers_in_hash, h->hash,
                               h->lowcase_key, h->key.len);

            if (hh && hh->handler(r, h, hh->offset) != NGX_OK)
            {
                return NGX_ERROR;
            }
            continue;
        }
        
        if (rc == NGX_HTTP_PARSE_HEADER_DONE)
        {
            
        
            if (r->upstream->headers_in.server == NULL)
            {
                h = ngx_list_push(&r->upstream->headers_in.headers);
                
                ngx_str_t str=ngx_string("server");
                h->hash=ngx_hash_key_lc(str.data,str.len);
                #if 0
                h->hash = ngx_hash(ngx_hash(ngx_hash(ngx_hash(
                                          ngx_hash('s', 'e'), 'r'), 'v'),'e'), 'r');
                #endif
                ngx_str_set(&h->key, "Server");
                ngx_str_null(&h->value);
                h->lowcase_key = (u_char *) "server";
            }

            if (r->upstream->headers_in.date == NULL)
            {
                h = ngx_list_push(&r->upstream->headers_in.headers);
                ngx_str_t str=ngx_string("date");
                
                h->hash=ngx_hash_key_lc(str.data,str.len);
                #if 0
                h->hash = ngx_hash(ngx_hash(ngx_hash('d', 'a'), 't'), 'e');
                #endif

                ngx_str_set(&h->key, "Date");
                ngx_str_null(&h->value);
                h->lowcase_key = (u_char *) "date";
            }

            return NGX_OK;
        }

    
        if (rc == NGX_AGAIN)
        {
            return NGX_AGAIN;
        }

        
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                      "upstream sent invalid header");
        return NGX_HTTP_UPSTREAM_INVALID_HEADER;
    }
}

 5 finalize_request

static void
mytest_upstream_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
{
    ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0,
                  "mytest_upstream_finalize_request");
}
三  实验

将模块编入nginx,启动nginx服务,curl一下,百度就出来了。

curl -v localhost:1024/test?lumia

如图:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/512911.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号