- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;2 ~- j$ e& i s/ J" e; P- [
- require_once __DIR__ . '/Workerman/Autoloader.php';4 f, |: b% r( v# K3 ^! X+ j
- 5 U- J, `4 h |3 y
- $worker = new Worker();2 Y O# @4 b u% B+ f1 r
- // 4个进程
( J# P, H, V1 t - $worker->count = 4;
* e& `/ K M- H2 S0 T: D - // 每个进程启动后在当前进程新增一个Worker监听
]9 m( K; h/ @, H4 n2 D6 p) t - $worker->onWorkerStart = function($worker)
8 g, s4 X/ u1 w( g - {
8 o7 ^1 \4 z6 |: x1 e" L8 w, o - /**( N9 ?( M; m. t! {1 c3 N5 ]
- * 4个进程启动的时候都创建2016端口的Worker
9 b% l. X/ K: B6 h% `6 A - * 当执行到worker->listen()时会报Address already in use错误
3 X* T" }; h% s: W, D0 I) u - * 如果worker->count=1则不会报错+ _4 i0 ]" F( |4 G
- */% f2 {$ O( X$ H V O7 _
- $inner_worker = new Worker('http://0.0.0.0:2016');
4 k# `# r" j$ M' p) A a - $inner_worker->onMessage = 'on_message';: ?9 h# T6 E4 n# ?7 S& I
- // 执行监听。这里会报Address already in use错误, m% d r; C; l2 C1 K
- $inner_worker->listen();. \0 ] d4 q6 }/ W7 S" H# \
- };
n" s4 I" {7 M' E7 O: w
! f4 d" e2 g" G* h1 A- $worker->onMessage = 'on_message';
& ?+ T4 j, s1 {: Q4 }! ?7 b8 \% n - & o4 K. d1 X% D8 D/ G r& m
- function on_message($connection, $data)6 ^9 s% p; ]6 {$ @5 _! E, t8 p
- {8 T( s: m7 S2 ~3 S5 I0 ~9 x% a
- $connection->send("hello\n");
$ X, @! |8 B4 F9 w) d: k4 R, I - }0 E; K5 b) o' T4 H
$ n6 J6 {/ @+ ^+ J/ {- // 运行worker) @4 i/ x6 g( f9 r
- Worker::runAll();7 w& d1 G) K1 ]3 Q# C' E, l
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子: ]6 A& e( O% r& U) ]" T6 b6 C2 U
- ! h5 e4 V/ R3 M5 ?' T: o, Z# g
- use Workerman\Worker;
6 i% }9 d7 |( Y& R6 E - require_once './Workerman/Autoloader.php';
6 u- l% k4 W* X s0 ?
* J. a3 ~8 u$ I$ P: w$ Y4 N- $worker = new Worker('text://0.0.0.0:2015');
2 g( z" u. [7 D. ^ Q3 a - // 4个进程0 l# @8 x, z" F" g; z0 x' U
- $worker->count = 4;
4 M1 B+ Z) J9 z7 A9 N/ N% V2 z - // 每个进程启动后在当前进程新增一个Worker监听
8 [; X% O/ @: n2 t - $worker->onWorkerStart = function($worker), n. i ^4 G1 [3 M" t7 K
- {
+ p" L3 L4 ^% f N! Y& e - $inner_worker = new Worker('http://0.0.0.0:2016');# k- G+ G0 E: ^3 @2 M+ ?! m$ v6 t
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
& h7 \0 X. D4 t0 |5 b - $inner_worker->reusePort = true;
& _+ D( ]* c# V2 i: G6 R8 V, k - $inner_worker->onMessage = 'on_message';- Y3 a0 E! ]0 y. P
- // 执行监听。正常监听不会报错- x$ Z$ \3 _. w
- $inner_worker->listen();
( _0 Y* G9 X6 r4 ~$ `% p - };
. s! U" ^; U1 G8 h6 G- ]% V - 5 O0 d: P0 r5 ^
- $worker->onMessage = 'on_message';6 Q9 D4 J. U* U! S/ B9 v: b [
5 I P' R, z$ y3 K4 u: l3 Z' X- function on_message($connection, $data)
+ T6 n v" @" @ - {3 O' M z0 E9 C% }$ l* v" ^5 q6 W
- $connection->send("hello\n");
4 u( G* r$ ]/ w4 I7 q7 D - }
$ I1 G' p7 A- a$ M - # L: b# M) f& p2 k0 y
- // 运行worker
. d' h6 D \9 |1 y - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php) V) {4 y: _. Q% D
- use Workerman\Worker;# j h: ^' l8 [: t0 U9 |
- require_once './Workerman/Autoloader.php';
5 ?* \6 G' o* _5 P2 b - // 初始化一个worker容器,监听1234端口
) J7 i; a9 |8 Y - $worker = new Worker('websocket://0.0.0.0:1234');
- j9 t7 X( ?6 c6 l* }+ S* N. t - & ?8 t3 |1 r6 i( @; f2 x: Z4 E
- /*1 @) x! q. }% q1 R2 l
- * 注意这里进程数必须设置为1,否则会报端口占用错误( Q! }' W" Z# G- }. S' q) v3 q' I% s
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
2 Z4 T) }# r% W/ v - */
7 G8 O) P1 Q. R2 P" c - $worker->count = 1;; W0 p! G" f( l. B5 ]6 F- o
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
/ q' t5 E i. w N( z" b# o - $worker->onWorkerStart = function($worker)) V, m+ b6 [. h. h- g0 k1 v
- {/ h& A) b% E- s4 U
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
4 r! ]/ U4 y7 Y' Q' b+ S - $inner_text_worker = new Worker('text://0.0.0.0:5678');6 h( I2 S8 S+ \; z& J: {7 I* n T' p
- $inner_text_worker->onMessage = function($connection, $buffer)2 W0 A2 _; ]7 X
- {. ?* R l) ^3 b& v% s% U' \
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据' k- e: o9 o; \) t/ A
- $data = json_decode($buffer, true);
8 n9 Y; B- a& ?( x+ [4 k& l - $uid = $data['uid'];
! F- z6 L$ R' ?3 \9 N9 Z - // 通过workerman,向uid的页面推送数据
3 p E5 B. w# t$ y8 {$ \0 ?" C - $ret = sendMessageByUid($uid, $buffer);) X2 z: S3 C, }6 x* [1 n0 B- ^
- // 返回推送结果% A. C6 q9 E6 {
- $connection->send($ret ? 'ok' : 'fail');( x9 h5 B$ f- M; v3 @
- };2 D: l8 J0 x9 m g5 H$ i
- // ## 执行监听 ##
, R# x. x7 V' \( @ - $inner_text_worker->listen();
; E( I" |; t+ ^9 k3 L - };
( P3 _! s% O2 Z- S8 l3 C+ m3 ~8 T. q9 G3 d - // 新增加一个属性,用来保存uid到connection的映射
% n, q9 O# e: L. a0 N. Y3 Y - $worker->uidConnections = array();
$ q+ H2 ?; I" s - // 当有客户端发来消息时执行的回调函数! n& L0 g9 Q# H- N" c) [
- $worker->onMessage = function($connection, $data)2 [) K8 a4 O) a* ?
- {
1 f. p5 r9 k0 y2 ^! V - global $worker;
6 g* c4 g' T% \, ^: x8 p- S - // 判断当前客户端是否已经验证,既是否设置了uid# g5 S) C" C: h" Y
- if(!isset($connection->uid))
9 W3 v3 a$ ]3 R: x: |" V - {
$ n# X! v- P5 w, @ A" M: M - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)* C7 D, r3 r0 f q+ M
- $connection->uid = $data;% O3 U9 T" e j* t5 e
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,# P4 t I! U& M# X* \
- * 实现针对特定uid推送数据7 M: d: l, N, j; ?# U$ b
- */
I, `$ X( z, E7 E: o# L* C$ i# E, q - $worker->uidConnections[$connection->uid] = $connection;8 h7 U. F* S# f
- return;/ V: G. {6 z# ^: J4 s! m- _( G
- }
! f9 E, m' f$ e+ K2 x, f( ] - };
2 l& f) {3 M5 y0 o% Y+ {
" H9 u' X3 O; W- // 当有客户端连接断开时
7 Q R3 _" @7 Z. J) O+ m - $worker->onClose = function($connection)1 k" t: X( u% J/ b- E, |
- {0 P# w" X, w1 n7 q$ A
- global $worker;
* j7 G9 a, C/ s$ u! R - if(isset($connection->uid))
4 Z" O9 |% Y2 n3 q& g# `" ~ - {& r1 W: |4 F9 g, Q: Q. q
- // 连接断开时删除映射
/ \+ R* ^- c0 }) ^4 D$ f L - unset($worker->uidConnections[$connection->uid]);
& ~/ t) c- w$ g3 s - } W5 k6 J8 y5 {# t
- };" N; T* I$ M+ X& R
- 8 r+ l: Q. L; s* y0 `
- // 向所有验证的用户推送数据
4 j/ v% @0 R+ ]5 ~5 R - function broadcast($message)& w! j$ k/ N" I: a2 v
- {) H" o9 X3 m H+ w! B
- global $worker;
+ X4 @ i5 u# [2 x; q/ d - foreach($worker->uidConnections as $connection)
' [% F, T9 `3 F- q/ ? - {
' E4 B X( ]* C9 U+ R& q% s: Z - $connection->send($message);) f! a2 V; z2 g9 c
- }
5 S* A6 o' q0 W" o& D9 M3 {. ?6 r - }6 a8 Z9 y* N1 X
- : Q$ v9 ^% ^2 s* o' ^, \! g" x% l
- // 针对uid推送数据
% p0 s- l6 a. L. |& x - function sendMessageByUid($uid, $message)( L( l/ \9 [3 P5 l
- {
6 s/ ~1 ~. z* n. P, m6 B/ Q. x6 A - global $worker;* M! l5 M9 g2 ~ |- n! W
- if(isset($worker->uidConnections[$uid]))' B1 y0 B2 Z( j
- {( ]0 c' H6 U! a$ D6 u3 P
- $connection = $worker->uidConnections[$uid];
/ D- o6 k" o5 U3 r2 Q( r; ]. ~8 V - $connection->send($message);8 D4 Z6 { m$ D2 a2 Q. F- R2 Y
- return true;8 X) h5 l+ \. c6 x$ s
- }! J _% o4 a" R% z
- return false;
, |, A0 A8 t/ z! T( W! c8 I, S - }5 z( Q2 r, Y/ Z( ^2 z9 g# a
- $ H( k& E" D. ]& {) ^
- // 运行所有的worker0 F3 n" L: R' m! g5 f; K
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');, L' i3 N' q$ L+ j0 ~) h
- ws.onopen = function(){
- g, |! P1 p$ ~- h" R - var uid = 'uid1';
3 D$ Q/ h& [ B& C - ws.send(uid);% [9 q4 t( X% d3 C0 V; E
- };! |% \+ y3 F, \4 {% w7 U* y
- ws.onmessage = function(e){
. {5 ]) b# @6 C- g0 } - alert(e.data);
. y- {# `$ A" x4 T) a - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口3 o) l8 U w7 x( e9 Z( h& k( |
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);0 D# b+ T- o; @- g: q
- // 推送的数据,包含uid字段,表示是给这个uid推送
- e3 F$ ~6 |0 x( c - $data = array('uid'=>'uid1', 'percent'=>'88%');' |* G$ m0 f: \ r& d5 I
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符. E$ M/ _+ { W# L4 C* ^
- fwrite($client, json_encode($data)."\n");) d) }% J; E- C: \8 J
- // 读取推送结果
/ q* R& s0 u" L% }5 { - echo fread($client, 8192);
复制代码 3 Z5 X1 `/ M+ _9 G: r. @
% K+ g- C5 s$ z" [ |