本文是nginx访问第三方服务之upstream使用_上善若水-CSDN博客 实例的详细讲解
当需要访问第三方服务时,Nginx提供了2种异步方式来与第三方服务器通信:upstream与subrequest.
本文将介绍upstream的使用方式,Nginx的HTTP反向代理模块就是基于upstream方式实现,当我们希望把第三方服务的内容几乎原封不懂地返回给用户时,一般使用upstream方式,它可以非常高效地透传HTTP。
Nginx访问上游服务器的流程大致分以下几个阶段:启动upstream、连接上游服务器、向上游发送请求、接收上游响应(包头/包体)、结束请求。
一 upstream的使用方式upstream的使用方式并不复杂,它提供了8个回调方法,用户需要实现的其中的几个回调方法,
本例中实现了3个回调方法:
struct ngx_http_upstream_s{
//...
ngx_int_t (*create_request)(ngx_http_request_t *r);//构造发往上游服务器的请求
//收到上游服务器的TCP流时回调,直到不返回NGX_AGAIN,处理http响应
ngx_int_t (*process_header)(ngx_http_request_t *r);
void (*finalize_request)(ngx_http_request_t *r,ngx_int_t rc);//销毁upstream时回调
//...
};
0 那么upstream是如何嵌入到一个请求中的?
模块在处理任何一个请求时都有ngx_http_request_t结构对象r,该结构中有一个ngx_http_upstream_t类型的成员upstream(模块对象创建)
struct ngx_http_request_s
{
//...
ngx_http_upstream_t *upstream;
//...
}
如果没有使用upstream机制,则需要将相应的成员置为NULL,否则需要对r->upstream进行设置.
1 HTTP模块启用upstream机制的步骤如下:
- 调用ngx_http_upstream_create方法为请求创建upstream
- 设置第三方服务器地址
- 设置upstream的回调方法
- 调用ngx_http_upstream_init方法启动upstream
2 HTTP如何运行upstream框架
使用upstream模块提供的ngx_http_upstream_init后,HTTP如何运行upstream框架,大致流程如下
- 创建发往上游服务器的请求
- 与上游服务器建立无阻塞TCP连接
- 发送请求到第三方服务
- 接收第三方服务响应并处理包头和包体
- 回调finalize_request销毁请求
整个处理过程中会调用 upstream 的回调方法进行相应的处理,其中最常用的回调方法为 create_request、process_header、finalize_request;
注,upstream 模块处理上游包体的方式
1.当请求结构体ngx_http_request_t中的成员subrequest_in_memory标志位为1时,upstream不转发响应包体到下游,并由HTTP模块实现的input_filter()方法处理包体;
2.当请求结构体ngx_http_request_t中的成员subrequest_in_memory标志位为0时,且ngx_http_upstream_conf_t配置结构体中的成员buffering标志位为1时,upstream将开启更多的内存和磁盘文件用于缓存上游的响应包体(此时,上游网速更快),并转发响应包体;
3.当请求结构体ngx_http_request_t中的成员subrequest_in_memory标志位为0时,且ngx_http_upstream_conf_t配置结构体中的成员buffering标志位为0时,upstream将使用固定大小的缓冲区来转发响应包体;
0 主要结构体说明
- ngx_http_upstream_t
struct ngx_http_upstream_s {
//...
ngx_chain_t *request_bufs;//发什么样的请求给上游服务器,create_request中实现
ngx_http_upstream_resolved_t *resolved;//可直接指定上游服务器的地址
ngx_buf_t buffer;//buffer存储接收上游服务器的响应内容。复用
//向客户端转发上游服务器的包体时才有用。
//1-多个缓冲区以及磁盘文件转发(Nginx与上游间的网速远大于Nginx与下游客户端之间的网速)
//0-只使用该结构体中的buffer缓冲区向下游转发响应包体
unsigned buffering:1;
//...
};
关于buffering参数,upstream提供了3种处理上游服务器包体的方式:
1.交由HTTP模块使用input filter回调方法直接处理包体。
2.以固定缓冲区(buffer成员)转发包体(本文使用的方式 buffering设置为0)
3.以多个缓冲加磁盘文件的方式转发包体等(buffering设置为1)。
- ngx_http_upstream_conf_t
该结构是ngx_http_upstream_t中的成员之一,通过nginx.conf和设置commands使用预设的方法可以对该结构体赋值。
typedef struct{
//...
ngx_msec_t connect_timeout;//连接上游服务器的超时时间
ngx_msec_t send_timeout;//发送tcp包到上游服务器的超时时间
ngx_msec_t read_timeout;//接收Tcp包的超时时间
//...
}ngx_http_upstream_conf_t;
- 设置需要访问的第三方服务地址resolved成员
这部分就是熟悉套接字编程了
typedef struct{
//...
ngx_uint_t naddrs;
struct sockaddr *sockaddr;//上游服务器地址
socklen_t socklen;
//...
}ngx_http_upstream_resolved_t;
1 设置回调方法
ngx_http_upstream_t结构体中有8个回调方法,不过必须实现的有3个回调方法:
create_request:创建请求
process_header:处理响应
finalize_request:销毁请求
书中给出google的URL搜索示例,由于墙的原因,我们使用baidu,其URL搜索方法是www.baidu.com/s?wd=haha在nginx.conf配置location,例如:
location /test{
mytest;
}
如果访问URL是/test?lumia则可以通过upstream机制向www.baidu.com发送搜索请求。
完整代码源码来源自深入理解Nginx,注释包含自己的理解。
0 定义结构体和声明函数
//ngx_http_mytest_module.c #include#include #include typedef struct { ngx_http_status_t status; ngx_str_t backendServer; }ngx_http_mytest_ctx_t; typedef struct { ngx_http_upstream_conf_t upstream; } ngx_http_mytest_conf_t; static char * ngx_http_mytest(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); static ngx_int_t ngx_http_mytest_handler(ngx_http_request_t *r); static void* ngx_http_mytest_create_loc_conf(ngx_conf_t *cf); static char *ngx_http_mytest_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child); static ngx_int_t mytest_upstream_process_header(ngx_http_request_t *r); static ngx_int_t mytest_process_status_line(ngx_http_request_t *r); static ngx_str_t ngx_http_proxy_hide_headers[] = { ngx_string("Date"), ngx_string("Server"), ngx_string("X-Pad"), ngx_string("X-Accel-Expires"), ngx_string("X-Accel-Redirect"), ngx_string("X-Accel-Limit-Rate"), ngx_string("X-Accel-Buffering"), ngx_string("X-Accel-Charset"), ngx_null_string };
1 创建模块
1.1 配置项,上下文,指令
static ngx_command_t ngx_http_mytest_commands[] =
{
{
ngx_string("mytest"),
NGX_HTTP_MAIN_CONF | NGX_HTTP_SRV_CONF | NGX_HTTP_LOC_CONF | NGX_HTTP_LMT_CONF | NGX_CONF_NOARGS,
ngx_http_mytest,
NGX_HTTP_LOC_CONF_OFFSET,
0,
NULL
},
ngx_null_command
};
static ngx_http_module_t ngx_http_mytest_module_ctx =
{
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
ngx_http_mytest_create_loc_conf,
ngx_http_mytest_merge_loc_conf
};
ngx_module_t ngx_http_mytest_module =
{
NGX_MODULE_V1,
&ngx_http_mytest_module_ctx,
ngx_http_mytest_commands,
NGX_HTTP_MODULE,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NGX_MODULE_V1_PADDING
};
1.2 执行指令
static char *
ngx_http_mytest(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_http_core_loc_conf_t *clcf;
clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
clcf->handler = ngx_http_mytest_handler;
return NGX_CONF_OK;
}
1.3 实现自定义handler
static ngx_int_t
ngx_http_mytest_handler(ngx_http_request_t *r)
{
ngx_http_mytest_ctx_t* myctx = ngx_http_get_module_ctx(r, ngx_http_mytest_module);
if (myctx == NULL)
{
myctx = ngx_palloc(r->pool, sizeof(ngx_http_mytest_ctx_t));
ngx_http_set_ctx(r, myctx, ngx_http_mytest_module);
}
//1 创建upstream 模块对象
ngx_http_upstream_create(r);
ngx_http_mytest_conf_t *mycf = (ngx_http_mytest_conf_t*)
ngx_http_get_module_loc_conf(r, ngx_http_mytest_module);
ngx_http_upstream_t *u = r->upstream;
u->conf = &mycf->upstream;
u->buffering = mycf->upstream.buffering;
static struct sockaddr_in backendSockAddr;
struct hostent *pHost = gethostbyname((char*) "www.baidu.com");
backendSockAddr.sin_family = AF_INET;
backendSockAddr.sin_port = htons((in_port_t) 80);
memcpy(&backendSockAddr.sin_addr,pHost->h_addr_list[0],sizeof(struct in_addr));
char* pDmsIP = inet_ntoa(*(struct in_addr*) (pHost->h_addr_list[0]));
//backendSockAddr.sin_addr.s_addr = inet_addr(pDmsIP);
myctx->backendServer.data = (u_char*)pDmsIP;
myctx->backendServer.len = strlen(pDmsIP);
u->resolved->sockaddr = (struct sockaddr *)&backendSockAddr;
u->resolved->socklen = sizeof(struct sockaddr_in);
u->resolved->naddrs = 1;
u->create_request = mytest_upstream_create_request;
u->process_header = mytest_process_status_line;
u->finalize_request = mytest_upstream_finalize_request;
r->main->count++;
ngx_http_upstream_init(r);
return NGX_DONE;
}
- 创建upstream 模块对象 ngx_http_upstream_create(r);
对每一个要使用upstream的请求,必须调用且只能调用1次 - 创建之后,配置upstream
ngx_http_upstream_t *u = r->upstream; u->conf = &mycf->upstream .....
- 决定转发包体时使用的缓冲区
u->buffering = mycf->upstream.buffering;
- 设置上游服务器
u->resolved = (ngx_http_upstream_resolved_t*) ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_resolved_t)); static struct sockaddr_in backendSockAddr; struct hostent *pHost = gethostbyname((char*) "www.baidu.com"); backendSockAddr.sin_family = AF_INET; backendSockAddr.sin_port = htons((in_port_t) 80); .......
- 设置三个必须实现的回调方法:创建请求、处理头部、请求销毁
u->create_request = mytest_upstream_create_request; u->process_header = mytest_process_status_line; u->finalize_request = mytest_upstream_finalize_request;
- 启动upstream
ngx_http_upstream_init(r);
2 create config & merge config
static void*
ngx_http_mytest_create_loc_conf(ngx_conf_t *cf)
{
ngx_http_mytest_conf_t *mycf;
mycf = (ngx_http_mytest_conf_t*)ngx_pcalloc(cf->pool,sizeof(ngx_http_mytest_conf_t));
mycf->upstream.connect_timeout = 60000;
mycf->upstream.send_timeout = 60000;
mycf->upstream.read_timeout = 60000;
mycf->upstream.store_access = 0600;
mycf->upstream.buffering = 0;
mycf->upstream.bufs.num = 8;
mycf->upstream.bufs.size = ngx_pagesize;
mycf->upstream.buffer_size = ngx_pagesize;
mycf->upstream.busy_buffers_size = 2 * ngx_pagesize;
mycf->upstream.temp_file_write_size = 2 * ngx_pagesize;
mycf->upstream.max_temp_file_size = 1024 * 1024 * 1024;
mycf->upstream.hide_headers = NGX_CONF_UNSET_PTR;
mycf->upstream.pass_headers = NGX_CONF_UNSET_PTR;
return mycf;
}
static char *
ngx_http_mytest_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
ngx_http_mytest_conf_t *prev = (ngx_http_mytest_conf_t *)parent;
ngx_http_mytest_conf_t *conf = (ngx_http_mytest_conf_t *)child;
ngx_hash_init_t hash;
hash.max_size = 100;
hash.bucket_size = 1024;
hash.name = "proxy_headers_hash";
ngx_http_upstream_hide_headers_hash(cf, &conf->upstream,
&prev->upstream, ngx_http_proxy_hide_headers, &hash)
return NGX_CONF_OK;
}
3 创建发给上游服务器的请求 create_request
在ngx_http_mytest_handler 中设置,在ngx_http_upstream_init中调用.
create_request 是必须实现的业务逻辑函数
static ngx_int_t
mytest_upstream_create_request(ngx_http_request_t *r)
{
//=====1 构造请求string=====
static ngx_str_t backendQueryLine =
ngx_string("GET /s?wd=%V HTTP/1.1rnHost: www.baidu.comrnConnection: closernrn");
ngx_int_t queryLineLen = backendQueryLine.len + r->args.len - 2;
ngx_buf_t* b = ngx_create_temp_buf(r->pool, queryLineLen);
b->last = b->pos + queryLineLen;
ngx_snprintf(b->pos, queryLineLen ,(char*)backendQueryLine.data, &r->args);
r->upstream->request_bufs = ngx_alloc_chain_link(r->pool);
r->upstream->request_bufs->buf = b;
r->upstream->request_bufs->next = NULL;
r->upstream->request_sent = 0;
r->upstream->header_sent = 0;
r->header_hash = 1;
return NGX_OK;
}
4 process header
框架调用.此函数表示已经从第三方获取了respond.
mytest_process_status_line is process_header.
- 解析http响应行
rc = ngx_http_parse_status_line(r, &u->buffer, &ctx->status); - 赋值操作
- 之后收到的新字符流将有新的回调函数解析
u->process_header = mytest_upstream_process_header; - 如果本次收到的字符流除了http响应行外,还有多余的字符,
将由mytest_upstream_process_header方法解析
mytest_upstream_process_header(r);
static ngx_int_t
mytest_process_status_line(ngx_http_request_t *r)
{
size_t len;
ngx_int_t rc;
ngx_http_upstream_t *u;
ngx_http_mytest_ctx_t* ctx = ngx_http_get_module_ctx(r, ngx_http_mytest_module);
u = r->upstream;
rc = ngx_http_parse_status_line(r, &u->buffer, &ctx->status);
if (rc == NGX_AGAIN)
{
return rc;
}
if (u->state)
{
u->state->status = ctx->status.code;
}
u->headers_in.status_n = ctx->status.code;
len = ctx->status.end - ctx->status.start;
u->headers_in.status_line.len = len;
u->headers_in.status_line.data = ngx_pnalloc(r->pool, len);
ngx_memcpy(u->headers_in.status_line.data, ctx->status.start, len);
u->process_header = mytest_upstream_process_header;
return mytest_upstream_process_header(r);
}
mytest_upstream_process_header
static ngx_int_t
mytest_upstream_process_header(ngx_http_request_t *r)
{
ngx_int_t rc;
ngx_table_elt_t *h;
ngx_http_upstream_header_t *hh;
ngx_http_upstream_main_conf_t *umcf;
umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);
for ( ;; )
{
rc = ngx_http_parse_header_line(r, &r->upstream->buffer, 1);
if (rc == NGX_OK)
{
h = ngx_list_push(&r->upstream->headers_in.headers);
h->hash = r->header_hash;
h->key.len = r->header_name_end - r->header_name_start;
h->value.len = r->header_end - r->header_start;
//1.key
//2.value
//3.lowcase_case
h->key.data = ngx_pnalloc(r->pool,
h->key.len + 1 + h->value.len + 1 + h->key.len);
h->value.data = h->key.data + h->key.len + 1;
h->lowcase_key = h->key.data + h->key.len + 1 + h->value.len + 1;
ngx_memcpy(h->key.data, r->header_name_start, h->key.len);
h->key.data[h->key.len] = ' ';
ngx_memcpy(h->value.data, r->header_start, h->value.len);
h->value.data[h->value.len] = ' ';
if (h->key.len == r->lowcase_index)
{
ngx_memcpy(h->lowcase_key, r->lowcase_header, h->key.len);
}
else
{
ngx_strlow(h->lowcase_key, h->key.data, h->key.len);
}
hh = ngx_hash_find(&umcf->headers_in_hash, h->hash,
h->lowcase_key, h->key.len);
if (hh && hh->handler(r, h, hh->offset) != NGX_OK)
{
return NGX_ERROR;
}
continue;
}
if (rc == NGX_HTTP_PARSE_HEADER_DONE)
{
if (r->upstream->headers_in.server == NULL)
{
h = ngx_list_push(&r->upstream->headers_in.headers);
ngx_str_t str=ngx_string("server");
h->hash=ngx_hash_key_lc(str.data,str.len);
#if 0
h->hash = ngx_hash(ngx_hash(ngx_hash(ngx_hash(
ngx_hash('s', 'e'), 'r'), 'v'),'e'), 'r');
#endif
ngx_str_set(&h->key, "Server");
ngx_str_null(&h->value);
h->lowcase_key = (u_char *) "server";
}
if (r->upstream->headers_in.date == NULL)
{
h = ngx_list_push(&r->upstream->headers_in.headers);
ngx_str_t str=ngx_string("date");
h->hash=ngx_hash_key_lc(str.data,str.len);
#if 0
h->hash = ngx_hash(ngx_hash(ngx_hash('d', 'a'), 't'), 'e');
#endif
ngx_str_set(&h->key, "Date");
ngx_str_null(&h->value);
h->lowcase_key = (u_char *) "date";
}
return NGX_OK;
}
if (rc == NGX_AGAIN)
{
return NGX_AGAIN;
}
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"upstream sent invalid header");
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
}
}
5 finalize_request
static void
mytest_upstream_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
{
ngx_log_error(NGX_LOG_DEBUG, r->connection->log, 0,
"mytest_upstream_finalize_request");
}
三 实验
将模块编入nginx,启动nginx服务,curl一下,百度就出来了。
curl -v localhost:1024/test?lumia
如图:



