cncml手绘网
标题: 用于实例化Worker后执行监听 [打印本页]
作者: admin 时间: 2018-12-17 21:22
标题: 用于实例化Worker后执行监听
- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
- use Workerman\Worker;$ b5 m, i* U: D% Z% v
- require_once __DIR__ . '/Workerman/Autoloader.php';
; A: K1 d% f0 A/ l
% h2 P E. m( k! p1 X% {4 U- $worker = new Worker();7 S0 W& N# T9 {" Q; i
- // 4个进程
0 T' l0 U3 s6 {8 ~; |9 h2 V7 W9 i U - $worker->count = 4;- K% V6 J! m4 N
- // 每个进程启动后在当前进程新增一个Worker监听
# l& F3 f. r* V+ F$ x* V7 F& U) ?. N - $worker->onWorkerStart = function($worker) t9 H0 E: W+ a; c& I) _
- {9 I1 }! w, M! H. g
- /**
4 Y. X, ~8 ~2 I5 D/ Q2 D Z - * 4个进程启动的时候都创建2016端口的Worker0 {* z: t9 _$ A7 b# p* v3 `
- * 当执行到worker->listen()时会报Address already in use错误. i& ~7 b. F) N9 h4 g2 X1 b
- * 如果worker->count=1则不会报错
# H' `( j8 b9 P - */% L" |3 q5 l8 G5 c- c
- $inner_worker = new Worker('http://0.0.0.0:2016');- _- Y* K" u# Z# |8 ?. H
- $inner_worker->onMessage = 'on_message';6 U& J2 ]: B' b
- // 执行监听。这里会报Address already in use错误5 o5 g4 z* B/ E7 E) b
- $inner_worker->listen();
$ m4 p2 C4 g% I; `+ k - };; }3 b Z! l% H2 G6 S: m3 g7 L
- . e9 x3 [; T% ~, M; k1 K5 T2 j
- $worker->onMessage = 'on_message';5 y* X6 f& K6 u L5 _
9 Y- p7 ?) z# z- function on_message($connection, $data)2 |' K$ f# ?, S9 N7 b# P
- {
7 ^0 p) _5 D( b+ ]2 m! m - $connection->send("hello\n");
. ?" W6 I6 \9 j9 b: E9 |* Q - }, {2 K3 w% J v) f
6 O; ~, W% G% j; p- // 运行worker- l( X7 y3 h8 {4 p6 v3 V
- Worker::runAll();4 a: I K6 J; N4 o
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
3 F4 F, p8 _. z8 N' Q0 a
2 m+ R. G, q, n: x& I. r- use Workerman\Worker;
6 B9 V, R9 ]3 b r( L - require_once './Workerman/Autoloader.php';
0 q+ B8 e; ~$ z9 k4 v - $ e- {! b. M) x: V0 e3 S; b
- $worker = new Worker('text://0.0.0.0:2015');& r" I- d5 c3 T3 d
- // 4个进程
+ M6 w; ~$ q) x- F' [6 E$ g' {. c - $worker->count = 4;
# d1 Q, n) x* A; i& [6 u9 M - // 每个进程启动后在当前进程新增一个Worker监听
( Z9 m9 q$ c5 ]0 d' r( a$ \. X - $worker->onWorkerStart = function($worker)3 c- r% x7 A) i1 T- n7 u' Y) R
- {
- |: b: `6 ?+ `7 O - $inner_worker = new Worker('http://0.0.0.0:2016');
" h) W; a h9 { - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)* g9 h3 G) k9 N ]
- $inner_worker->reusePort = true;
: G# I4 |$ R: C% q2 a - $inner_worker->onMessage = 'on_message';
# ~/ d# W5 l4 d, ~; u4 X6 p( _ - // 执行监听。正常监听不会报错
$ n$ x! }& [( k5 b - $inner_worker->listen();& h0 r& H2 Y y: u: N q" \' N
- };
) }9 w- ?9 h( F# d
2 `% |- s/ \. ]3 R4 x7 w( X- $worker->onMessage = 'on_message';
) O$ z4 c; P0 n) f& o' W3 k. ^: k - ' r8 m% t9 b* G' o: |- W# j
- function on_message($connection, $data)
7 c# } T$ n4 j - {
1 s6 q: k! u) a& y [. ` - $connection->send("hello\n");! h& [0 `( L- W$ e
- }
5 X# p& S+ T+ t- y" Z
: t8 o5 ~3 M: Q- // 运行worker# U @$ P1 P# z5 a
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
- <?php# m1 @ }, C! x& p/ F/ q" k# ]2 F
- use Workerman\Worker;
" Q9 `0 j* d; w8 ]. q - require_once './Workerman/Autoloader.php';
7 M' s4 L8 K0 L" G2 Z9 L - // 初始化一个worker容器,监听1234端口
% t5 n0 e3 @) L b - $worker = new Worker('websocket://0.0.0.0:1234');
7 E6 F% K2 Z# } - 1 ]/ F* |9 m! m1 V
- /*" a. a' N5 w- E9 |+ _1 f J
- * 注意这里进程数必须设置为1,否则会报端口占用错误% ]0 Y( U5 S; Y t/ J$ p9 W( @0 e
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
2 a" c8 J. F& a0 [# a' g1 r; w - */' U1 A. Y8 ~( L/ s+ f9 b0 w) o4 Y$ S/ B. ?
- $worker->count = 1;
' k- }4 l3 m. @& k* r# B - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口 i) w* }, V5 v3 j7 X& E
- $worker->onWorkerStart = function($worker)
( E* u& ^. e8 [ - {
( b, Q: N: F; D' I/ T - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符- \5 u; K& z1 N: n3 U" ~
- $inner_text_worker = new Worker('text://0.0.0.0:5678');0 q( A5 V2 V" h# Q z/ P
- $inner_text_worker->onMessage = function($connection, $buffer)8 p7 g/ U: @+ B0 B% o( h
- {
% @' F' D0 @* p2 t) y3 | - // $data数组格式,里面有uid,表示向那个uid的页面推送数据
3 A# ?8 i& m& {9 T - $data = json_decode($buffer, true);
: ^. [3 ]! Y. a1 Z; d; q* h4 g - $uid = $data['uid'];
* R! h* G) m! ~, q$ n - // 通过workerman,向uid的页面推送数据
7 `1 L- `7 F% E/ q2 \& A g6 r8 W - $ret = sendMessageByUid($uid, $buffer);+ c n1 m3 ]' y+ N9 k! L
- // 返回推送结果
( Y1 v3 z7 @7 Q* h: ]6 u - $connection->send($ret ? 'ok' : 'fail');
: ?$ g; X+ G3 U1 F0 Z, V5 s - };6 [3 I0 n: q& n; k) r5 V
- // ## 执行监听 ##4 y8 t- F3 H( b. U! C; {9 [4 V
- $inner_text_worker->listen();
% u1 p0 y j- ^1 e s. G, O* H0 a - };$ L7 ?- K$ a5 d5 S" }
- // 新增加一个属性,用来保存uid到connection的映射
* \) W: A9 z6 }% L$ p* w1 K - $worker->uidConnections = array();
. C) p5 c: q# o" { - // 当有客户端发来消息时执行的回调函数
4 S/ O# v. S6 L! c* d' C - $worker->onMessage = function($connection, $data)
/ H `& e1 Q; ~- f. b$ n; E" h' g4 @ - {1 ]7 C/ B7 x9 h' ~8 @ G
- global $worker;0 _9 ?: [0 X4 ?1 k$ y' M& m0 a5 K
- // 判断当前客户端是否已经验证,既是否设置了uid# [5 Q& ~6 }6 Y8 r" j
- if(!isset($connection->uid)): Q' ~7 T* i1 n$ }0 U0 k) B0 z8 g# a
- {
0 M* u1 R3 D3 c, Y9 v3 Q - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
+ ` W$ x, K' ]1 `1 P( Q - $connection->uid = $data;
. b# }+ R; ]3 O/ m6 v8 s* R! X - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
/ f. D2 _7 a q - * 实现针对特定uid推送数据$ z$ |* `! P: e; W Z8 z
- */* p8 `- f- b e) `
- $worker->uidConnections[$connection->uid] = $connection;) |' s' H; b4 a5 x
- return;/ a8 h2 T0 g3 i- j- W- b4 n7 y7 S
- }
9 I: t' ?, n6 v2 s+ C2 O - };
6 A+ _- l& y( o
3 v; N0 S! W6 n; Q7 ?9 v3 \) n- // 当有客户端连接断开时
& t6 [2 u% r H. c% H - $worker->onClose = function($connection)
1 T7 z0 V+ `" N* E: o0 }0 ~ - {
) K% @- K' D. S& } - global $worker;8 v7 |1 s6 o, \- p, m+ U
- if(isset($connection->uid))
, v, W+ r6 y8 U - {
- }& g2 p/ z7 \- `% u* |" N; {, P - // 连接断开时删除映射
2 x, V M+ w% O. c( k - unset($worker->uidConnections[$connection->uid]);) B) l+ `! j X* E/ ?% S
- }
. \# v" e4 Z# _* f3 @1 E3 R - };
# c# r1 g0 p0 k8 D: ]& O# F4 z. e - 1 _+ O* T3 O9 j" K3 g
- // 向所有验证的用户推送数据0 r( w: e$ y, | T* ~
- function broadcast($message)
6 v/ E# ~: N H1 Y' o - {* t/ H2 d. _2 {( ?2 P
- global $worker;
" I2 B- J: v5 V, a" e - foreach($worker->uidConnections as $connection)
; U5 \, C6 W d8 {: |/ f9 Q - {! C- j% A- k' l/ N/ Y
- $connection->send($message);" ~9 {4 l; i) f0 C) S2 |' C
- }1 U' A- z" ]% d. G. d) `
- }
$ s/ j5 s: J- U0 z: Z - 6 t* [- E6 S" R \7 K3 `4 U! a
- // 针对uid推送数据! Q4 `! @+ W7 M( Z& B- T" _: A
- function sendMessageByUid($uid, $message)" j( k3 j; B1 E6 ]9 ~
- {
4 Z' U" C& o1 e/ ?, ]& W9 c- x! c - global $worker;
! K/ }8 h+ y& }! Q! K4 N - if(isset($worker->uidConnections[$uid]))4 V4 H; t6 O" c' r
- {: A& l/ g7 Y3 r# n8 a: d N
- $connection = $worker->uidConnections[$uid]; L# R) W. W2 Q. B+ ~
- $connection->send($message);
9 Q0 G, _% x5 D: b& M" ^, o - return true;
2 }4 u. ^" s$ L' c - }
* `# D( K! s# i# W5 O! ^! K: t - return false;* T, R( z+ A; b/ c7 y
- }7 Q5 J) `6 i' x
9 f2 C( }2 l! e5 R- // 运行所有的worker: M2 N- U+ x4 \5 S
- Worker::runAll();
复制代码启动后端服务 php push.php start -d
前端接收推送的js代码
- var ws = new WebSocket('ws://127.0.0.1:1234');5 B/ W$ L3 k: o" `6 u! r6 m' V
- ws.onopen = function(){* s" F% L- ~0 s; S/ c, V7 n/ t) {
- var uid = 'uid1';3 y2 s+ ]7 e9 o+ K( w/ b
- ws.send(uid);8 e1 M5 b' ?8 p+ p
- };
2 x( D0 O2 M5 U# C; q( B$ C - ws.onmessage = function(e){
7 u' x, R+ [* A! Q, _" a( c' {& ] - alert(e.data);
* H+ u5 H% T% ~5 q, t - };
复制代码后端推送消息的代码
- // 建立socket连接到内部推送端口
9 c. m* l6 l+ M% Y; z1 d( i - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
/ r2 o8 { S/ ~ - // 推送的数据,包含uid字段,表示是给这个uid推送; g5 J8 z5 h6 S1 v
- $data = array('uid'=>'uid1', 'percent'=>'88%');' g% r! v9 T8 |9 b) L* M) l5 w
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符$ Q9 Z4 V9 \" q4 r* v" s
- fwrite($client, json_encode($data)."\n");, F7 Y6 f# ?9 P* x# v" ~' y
- // 读取推送结果 w: ?% R! o6 F+ r, z
- echo fread($client, 8192);
复制代码 0 I3 r2 `6 g6 S' `8 g: G
. |$ w0 W }7 p5 D
| 欢迎光临 cncml手绘网 (http://bbs.cncml.com/) |
Powered by Discuz! X3.2 |