- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
% G* r: I# y- H8 d6 u - require_once __DIR__ . '/Workerman/Autoloader.php';% e' Q) i+ |7 K1 t- ~* t6 j
! {0 F: ~% o' n% o! m7 U- $worker = new Worker();1 B2 w! G$ K: [* _* T7 M8 ~: F0 _
- // 4个进程
) A% }9 F$ W5 Z4 O3 p - $worker->count = 4;
! D+ Z& J5 U: [4 @ - // 每个进程启动后在当前进程新增一个Worker监听, ^! ]/ |$ ~0 _: p; J
- $worker->onWorkerStart = function($worker)
i$ B+ Q0 d0 [! C5 \. ~6 o; ] - {
5 l4 M9 r2 P5 e6 q2 s - /**
8 j- ?/ @2 q0 w9 N. r [. q - * 4个进程启动的时候都创建2016端口的Worker1 D6 h! N2 T1 y, r) |5 g! Y
- * 当执行到worker->listen()时会报Address already in use错误5 I/ |0 Z( {, \: f& Z
- * 如果worker->count=1则不会报错
; Y0 I+ y' O, Y2 a - */" i3 t) r& l& X y
- $inner_worker = new Worker('http://0.0.0.0:2016');6 [* A' y0 I! Q0 `" b
- $inner_worker->onMessage = 'on_message';: u& D7 l" S, R( n0 O
- // 执行监听。这里会报Address already in use错误# ?4 B# C# X/ E
- $inner_worker->listen();8 f$ F( r5 F/ m% E- B# f4 x
- };
9 E" P, s3 s7 n6 s) x# K* i: h - 8 U8 Y6 j- \: a6 i& I
- $worker->onMessage = 'on_message';3 T# p" H# ]6 C# z4 [
- 6 d0 X/ U9 c: ?; r
- function on_message($connection, $data). I' C6 u$ f) K
- {& G, _5 E9 ~3 L" b c
- $connection->send("hello\n");, T3 O" J' x4 ?" s; F
- }
. ~: o1 V5 y) L V. ^ - 5 D1 I5 ~( D6 g2 {; ]0 I" P
- // 运行worker
/ Y8 [2 U1 B6 z$ u - Worker::runAll();* Q. s+ `. H7 G
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
0 u* j4 V K* g, z) ?- t1 B: G
: y* c; j% n7 t1 c! j; m- use Workerman\Worker;
1 E) \( _& c; G% h# c5 g1 J7 O - require_once './Workerman/Autoloader.php';7 K# {; u! ]0 V& `5 @, D; P
$ O9 L- E* E0 R" j' z- $worker = new Worker('text://0.0.0.0:2015');; g" _( M i# D
- // 4个进程
8 @. z, s! @1 l2 N - $worker->count = 4;
: y2 M f1 c( {6 N0 Z' C - // 每个进程启动后在当前进程新增一个Worker监听9 v; y E) l8 Z& z, f
- $worker->onWorkerStart = function($worker)
6 z6 \7 \, ]+ r D3 Q) D' Y7 { - {
. b" R* I9 Z! ?9 F- P - $inner_worker = new Worker('http://0.0.0.0:2016');
# s- G+ \: m3 T+ j% r7 m$ b9 X - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)' A' o1 ]( H# e
- $inner_worker->reusePort = true;
# o0 Q- L* J, ?+ l# p* u' a+ G: s - $inner_worker->onMessage = 'on_message';
8 t- Y$ t! Q- T: J/ s" z3 G - // 执行监听。正常监听不会报错
6 H/ q t# U: E& t8 ~ - $inner_worker->listen();
# ]' [" e3 `( c, [& L - };8 _3 W# y* h+ K0 o, p" t- O
3 l' j# r; [) s3 i- $worker->onMessage = 'on_message';
! t/ {0 n# B8 s1 z$ | - . I+ j$ Y+ v, E# W, }
- function on_message($connection, $data)+ Z5 c; X- H& _7 B3 S
- {/ M' x" V8 N4 h/ W
- $connection->send("hello\n");" O6 i. M: X# ]6 q3 }
- }1 Z5 U2 g/ e3 b6 F' y; ^
- 4 r; W; ^( ~; C, E
- // 运行worker
: W" G' N# D' s - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php: [: q! Q3 d: q5 Q
- use Workerman\Worker;
! R* ]2 t7 O* u2 B0 r - require_once './Workerman/Autoloader.php';8 z) }" q0 ?+ @( D& a
- // 初始化一个worker容器,监听1234端口& |) m4 O" \, R* n! I
- $worker = new Worker('websocket://0.0.0.0:1234');
5 e+ M6 q/ z0 u+ \! Q. ^# d
/ j. r3 r' Q: R- /*+ \2 ]& m' ~4 g+ ~) V) x3 i
- * 注意这里进程数必须设置为1,否则会报端口占用错误7 @6 Z$ e! h- H; }/ Y
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)6 v& k) e$ h, g
- */$ ]7 E i/ M# f* E4 Q
- $worker->count = 1;
4 d4 D* D! g( Q& Y q; G# h# t - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
1 K/ G9 Y+ ?/ c0 l - $worker->onWorkerStart = function($worker)
9 U3 d0 Q, N0 S1 |8 E% K - {$ M3 ]) q# z$ j/ S: s2 |
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
. [" d- d+ e9 e5 J3 @ - $inner_text_worker = new Worker('text://0.0.0.0:5678');
' {/ z% b' }4 S" y - $inner_text_worker->onMessage = function($connection, $buffer)
' X! X2 y8 \, u4 O. z7 \ - {
* ?+ `" p: x3 Y - // $data数组格式,里面有uid,表示向那个uid的页面推送数据9 Q) x2 k! ]4 n: _" U# u: s
- $data = json_decode($buffer, true); N. @; `+ }7 R7 T7 d
- $uid = $data['uid'];2 u% v- u8 `- @# f/ o- L
- // 通过workerman,向uid的页面推送数据3 _1 C$ S/ x. r- |
- $ret = sendMessageByUid($uid, $buffer);
/ z3 R7 |* k6 f) @ - // 返回推送结果
. i! `: Z8 A: t# t - $connection->send($ret ? 'ok' : 'fail');
5 U6 M$ Z# r: q* N6 p - };
9 B; q* K1 }- `& b* T - // ## 执行监听 ##
7 Z4 v& l5 x4 u: F7 X% i - $inner_text_worker->listen();
3 T3 h. T- i! l, Y! [ - };
3 e1 o4 Q# b# Z+ p \. ]+ [7 R - // 新增加一个属性,用来保存uid到connection的映射
$ _+ k' d: k; s - $worker->uidConnections = array();
5 U' C) M% ]1 g - // 当有客户端发来消息时执行的回调函数
# k z( g3 c9 M1 i - $worker->onMessage = function($connection, $data)' Z7 ~2 c) R" Z& }& }% ~
- {* R y, a: r l& D
- global $worker;7 v& S- M. s- A& ^0 ^ `: F
- // 判断当前客户端是否已经验证,既是否设置了uid
3 k7 d0 z# d- o ?( S% i - if(!isset($connection->uid))
. o0 C2 k6 s5 u! _6 Y& f8 r: \ - {' s) l; a( V: M7 |- ^7 {# M9 m
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
% _" g c q S3 C* W" s9 \ - $connection->uid = $data;
- H6 k) l# t$ [* P; y1 c - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
' R' C/ _9 v+ A; p/ X) E$ B - * 实现针对特定uid推送数据 S# N) S3 \, e. f% v
- */
& G+ M0 {' [& l; W - $worker->uidConnections[$connection->uid] = $connection;
: O2 y' ~* n+ J( f% k& R7 U* t% @" @ - return;6 w9 ~, a$ n3 t9 a$ D: i
- }
7 u, Z9 }& }8 r - };
1 `$ w: h+ C' O L/ Y. g9 c9 K - # ~% k9 e& |+ p
- // 当有客户端连接断开时
- R6 Z" h( R+ x L$ X/ t - $worker->onClose = function($connection)
" F) p( C: L# F& X - {
; |; M( g+ g+ j$ R/ v/ p - global $worker;0 m6 u) ^" W" f5 l) w- d; p4 Z+ ~
- if(isset($connection->uid))8 {& {$ K* q( h1 J& W
- {
0 s2 _( W2 e; \) X( V8 @% d' H - // 连接断开时删除映射9 y) O# V; {6 q
- unset($worker->uidConnections[$connection->uid]);
4 n& A& }; Q+ ^9 @ - }
. {% C# n: q) C( v9 Y - };
) [% [. f( U% Z$ F$ l$ s
& q2 Y" i- x+ T5 I( i7 U# \- // 向所有验证的用户推送数据9 J3 d+ V, ~- N: d q) }& S" b
- function broadcast($message) P; |4 q& T; y" [6 x' y5 ?0 v
- {
7 A) {+ M5 X: {3 W* X7 i- s - global $worker;
# }7 z% M3 B/ Q, t5 I - foreach($worker->uidConnections as $connection)* R, D; m3 {* R; t Z' g& b
- {$ k$ p1 s+ l- t
- $connection->send($message);
4 r7 z, i3 V% C% | - }" }' K" I2 J4 Q
- }
- [6 g, _+ D6 q( F% k
* J u6 Q4 q1 Y- // 针对uid推送数据
n+ P, d c0 ^' j+ V' R) {% S9 I - function sendMessageByUid($uid, $message)
4 \% I3 @/ q6 _: ~ - {
; [& p/ e7 v) y* T - global $worker;
$ {3 b8 X A& Y$ X2 q1 b6 J - if(isset($worker->uidConnections[$uid]))
) o5 Y8 u0 c* ~6 k - {# ]# Y& {# J% F! N/ T f
- $connection = $worker->uidConnections[$uid];8 I; X) m0 F3 j8 z" u- ^/ K# ?# [ b5 ]
- $connection->send($message);
$ y( q+ B u* K/ u( U% l8 Y - return true;" \+ i9 E. i8 F; H
- }1 A' a+ ?& O. o0 ]- |9 j
- return false;3 F- j6 n+ l5 w& |- C0 a
- }
8 M: C6 |2 f: j/ S% N1 V. b! Y
0 ]2 U0 V0 [& B- \- // 运行所有的worker/ m" p- k( B P# ~/ @: ~
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
, o2 Q' w% r2 d4 U( S1 z - ws.onopen = function(){0 Q, V+ S; H4 ?
- var uid = 'uid1';; w3 A4 k+ f4 H4 d) r+ r
- ws.send(uid);
( ]' ?. o9 f1 F: ~- c - };
, S# l: \; N" L# D* T& T - ws.onmessage = function(e){& }* A3 |3 Y( }' B8 B0 o
- alert(e.data);9 e/ I3 r* o" u0 d
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
" t6 Z; h( Z, ]! S2 [ - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);0 a, y9 O# r$ g. n3 Y& O6 l* u
- // 推送的数据,包含uid字段,表示是给这个uid推送
/ U0 W! B% l/ u# w# M$ u# ]$ ~ - $data = array('uid'=>'uid1', 'percent'=>'88%');7 e* S$ y( }& V2 a: \3 O. \
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
1 _+ k- d- Z8 m6 A& B - fwrite($client, json_encode($data)."\n");# r$ L4 C8 C' V) P R: y
- // 读取推送结果# L# k5 X& m7 E, l$ a! I1 F E$ @
- echo fread($client, 8192);
复制代码
# B9 Q! R: H; j9 O( }9 P/ G7 J/ x* d Q* _
|