- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;% ^3 h4 u, Q+ W4 v5 V
- require_once __DIR__ . '/Workerman/Autoloader.php';
4 Y. C7 E: _) J0 \; T4 u - 1 w- f1 w. ]! e
- $worker = new Worker();
# T+ \) s# Z( t - // 4个进程3 r7 K) x/ f) O' u o6 F& F
- $worker->count = 4;# a2 U8 ?$ `# {) J1 f0 T
- // 每个进程启动后在当前进程新增一个Worker监听6 m ^- p% K0 o6 }, [ _* r
- $worker->onWorkerStart = function($worker)' z) i4 ~5 L* w3 r* I% g6 H
- {! E: h- x( C. |/ B. g- Y
- /**
2 x/ N7 p/ h) t/ J - * 4个进程启动的时候都创建2016端口的Worker
5 F, d& _5 h2 C8 v, D) V - * 当执行到worker->listen()时会报Address already in use错误5 T6 l4 S$ H$ | k% w d- Y) j
- * 如果worker->count=1则不会报错+ I. `! O$ l9 Y
- */
0 O& q x8 M1 |; o* K6 W - $inner_worker = new Worker('http://0.0.0.0:2016');- ?7 [7 F" H p2 R: B0 }
- $inner_worker->onMessage = 'on_message';& D/ J# h8 q6 g% `& t: `1 g
- // 执行监听。这里会报Address already in use错误
: u' _3 Y& o, e- z# K - $inner_worker->listen();2 z; }9 {' q0 p% A/ k
- };$ _6 L7 @" ~4 y5 S& |+ i
- * M& H, H* O' ]& r7 O9 W+ Z- r
- $worker->onMessage = 'on_message';
2 V; V9 E3 U& c% m! C$ x
# G, z' J+ N8 M/ E" l: {% G- function on_message($connection, $data). h+ [" ~" B' p) O
- {9 ?& R4 X6 a" |! n; @8 K' [
- $connection->send("hello\n");
6 Q% T1 Q6 b$ D - }
8 c2 \, z0 ?: J' X3 p; g$ s! y
+ U. [3 W- ^+ i; d6 i3 [' d& `- // 运行worker9 u0 Q# k+ z: T3 W# R, d$ o9 r
- Worker::runAll();
; a! M2 Z- ]6 n7 i - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子: b* V" [' Q1 L+ [: _" R+ @* d
- 1 Z+ J( H6 z, N0 u1 n
- use Workerman\Worker;
. T, R& J) ]4 x - require_once './Workerman/Autoloader.php';' X P3 h! q* J w& v
% D1 k3 m4 E: a* s* ^. c/ p& Q- $worker = new Worker('text://0.0.0.0:2015');0 f6 f) s3 w/ [% s, y; B5 o& T
- // 4个进程' g7 {" Y, u) W8 Q2 g$ l
- $worker->count = 4;# ~$ p/ P9 R# w# W
- // 每个进程启动后在当前进程新增一个Worker监听
3 k/ f1 N+ h& w( N1 \( B J - $worker->onWorkerStart = function($worker)$ G( U, D f" \0 A
- {
, l% O6 ?3 `3 I* l, ^. o7 ?7 B" b - $inner_worker = new Worker('http://0.0.0.0:2016');; ]6 L; s+ h7 D9 ^+ v. i
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)9 V; ^+ a, d* o* Z! k# W0 Y$ z
- $inner_worker->reusePort = true;
8 d% V1 j6 V/ J' }2 \ - $inner_worker->onMessage = 'on_message';/ e# s4 f5 U6 ?" P6 T8 x
- // 执行监听。正常监听不会报错
% y% K: p6 R: T1 f - $inner_worker->listen();& @2 J9 _! S3 B. ~' q: c, Y
- };
2 y0 _5 k* B; C4 f
) u' }/ u0 W: ]1 a% {1 ~" P- $worker->onMessage = 'on_message';. f* G. v) |" M! d* R6 A8 N7 b, E
& [1 ]- f8 Q7 n {- T6 y- function on_message($connection, $data)
0 N2 N" a$ S: X0 j5 ?- V9 ~0 h - {5 m* H( n, [8 A/ `
- $connection->send("hello\n");
) t. P( C/ o( C. N! ^ - }
E; v- n i5 L* a, ]
: c% A3 p. b: b- // 运行worker* I- F G; N6 P
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
1 t4 h3 ]2 [0 H/ q - use Workerman\Worker;" B W4 D8 d1 G9 c
- require_once './Workerman/Autoloader.php';
! t8 k& i( J k% D6 f$ }# @. m" a - // 初始化一个worker容器,监听1234端口' J* u! h# o, _ \
- $worker = new Worker('websocket://0.0.0.0:1234');, F6 i: V8 f# i- X: p4 t4 F7 {
- 6 S% F5 b F0 O! f
- /*
" G: @( r4 I! h - * 注意这里进程数必须设置为1,否则会报端口占用错误
8 S& f- c8 `2 k- P/ `6 x! P5 W% K: C - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)6 p5 |. _3 d* p' f9 ]4 Y
- */
$ c" R* V( o; ~ - $worker->count = 1;
Z0 H6 I) }9 I$ r9 u - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
3 ^3 Z% h* \! z3 I9 G - $worker->onWorkerStart = function($worker)2 R# K" L2 }! ?7 X: Y7 }
- {/ I( R. P9 v7 `4 Q: ^
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符$ z* S( ]1 v3 Q1 ^: P
- $inner_text_worker = new Worker('text://0.0.0.0:5678');
9 c: y0 q/ Y$ H7 a; ? - $inner_text_worker->onMessage = function($connection, $buffer), `0 j% y7 d2 \4 o
- {# }* E; d5 r8 l" W3 I2 i
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据
8 s/ ]# k3 Y, v r/ y+ T - $data = json_decode($buffer, true);7 Z. R6 {8 a7 h' V; F
- $uid = $data['uid'];
2 {0 D/ W- i8 n8 L2 L& t( m' a$ I2 D - // 通过workerman,向uid的页面推送数据- @2 y9 V f6 z$ r v
- $ret = sendMessageByUid($uid, $buffer);
& e# s' g+ G8 _( h: Y) W4 o - // 返回推送结果
, r' h$ C- I7 d: v! g" Q h - $connection->send($ret ? 'ok' : 'fail');
?7 Z, }7 u; f! k2 j" r: N - };" W( f" N3 E1 W* N+ L3 i& V
- // ## 执行监听 ##/ c I, \- \8 a: W8 t) y3 B' i% G
- $inner_text_worker->listen();
: o7 n3 O7 ^4 t8 ~1 e - };
, W$ `' d3 [7 i9 _3 T8 K$ y - // 新增加一个属性,用来保存uid到connection的映射* f! Z0 f7 `' x; N
- $worker->uidConnections = array();
4 t6 H8 ^0 c6 ]- S* E4 u - // 当有客户端发来消息时执行的回调函数
& \. v i; D% K' F4 C - $worker->onMessage = function($connection, $data)
- p9 f9 g2 g$ O/ \* N: y" o8 U - {& f Y `6 e# B; a# M, e" j2 w0 z
- global $worker;5 Q6 [/ T+ X1 O: D
- // 判断当前客户端是否已经验证,既是否设置了uid$ |8 d$ ?" y+ r9 Y
- if(!isset($connection->uid))
- M! }( V" N" v; n9 p - {- Y2 p; w) W/ H3 |- ?; n4 y
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
, l# z# C2 B. i1 D' m3 Z4 k - $connection->uid = $data;
7 ]7 e) s" ]. f1 { P3 ~% K* { - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,3 b1 F4 [( D' Y! A/ o- d
- * 实现针对特定uid推送数据
$ ]9 C# z% j( `' p" h2 R - */
- s; q, v( q& t& x. i9 o3 Y - $worker->uidConnections[$connection->uid] = $connection;
5 M: m: L# S' s/ \2 i - return;
) k7 m7 \+ L9 _ - }4 v9 u! G0 S# l. H9 U6 D2 t
- };: E0 S' c) U2 Z3 o
5 x4 s/ a9 ^5 Y G- a8 g. ]- // 当有客户端连接断开时
* Q1 J2 V7 i9 S$ Q F - $worker->onClose = function($connection)
5 [1 u5 o ~- A8 c$ ]& p. f9 h: W - {( L% i( e: c. x! d" m9 a w
- global $worker;) {2 S5 n. x1 Y# k2 J3 f, x \2 e
- if(isset($connection->uid))$ A4 E! O1 i1 f2 e& `* S( S6 n; O$ v
- {
! w# s) O( B1 E6 o; ]% H+ x: E3 l- d6 Q - // 连接断开时删除映射
, a& c% [7 p8 f% j+ y9 f7 O+ \8 } - unset($worker->uidConnections[$connection->uid]);
8 e& O: y+ Z) \9 A9 x* D+ x8 j - }
8 g7 p3 H) ]' |8 n* Q - };- `( z& S y% l$ q* R# S. N+ J
7 m" S0 p9 p8 b7 V- // 向所有验证的用户推送数据, u& m* W* n& y/ R% X
- function broadcast($message)3 ]/ [, r% F0 T( J
- {8 z! b4 z3 g9 N3 i$ H; `: x8 }
- global $worker;
' I0 R' ~" E0 W( T3 s - foreach($worker->uidConnections as $connection)8 R# q9 w# _: p; O
- {2 V6 n& ]0 l! ?6 ?, l8 k5 M
- $connection->send($message);, _5 T- z0 H& J/ i* N0 e5 [0 \5 b
- }
% R/ M4 T0 b+ w- z+ @4 z0 h$ L - }7 ]* F8 |7 |$ Y- K, k: k4 b8 P" v
- : {% |3 A+ a3 x, a6 `: f' \! Z
- // 针对uid推送数据, D$ a8 Z7 d! m7 Z2 k
- function sendMessageByUid($uid, $message)8 |! Y7 J9 @2 @3 R9 d
- {
) c- J9 W# H' J7 ^, }7 K% ~ - global $worker;
2 m V4 @+ N7 i+ r, R, V - if(isset($worker->uidConnections[$uid]))$ ~7 c- J2 t8 h
- {
3 O( l+ E7 T. d, Y! F - $connection = $worker->uidConnections[$uid];1 f3 @; w; h Y# m) {( s
- $connection->send($message);) x) L8 ?+ q2 ^+ V9 j! G& |
- return true;3 J L9 ^+ {* ~ c' [
- }
7 z8 r& [ ~9 L$ x$ B6 X' U1 b/ Z( p - return false;( i* a& b3 t- ]' Y
- }8 x7 P7 M" ~) C+ H* u0 I/ l4 L
+ c# Y* f% _. r- // 运行所有的worker
1 R" ? P; z0 R4 c2 e - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');: E3 Q/ }+ }: {* k
- ws.onopen = function(){, [$ w8 f, o1 _" P! k
- var uid = 'uid1';
. |1 N( o' [; o5 i - ws.send(uid);* I1 j, U9 |* C9 H6 {
- };/ a! [) q+ D( n" o! w
- ws.onmessage = function(e){7 Q) \2 @5 S4 o( P1 q9 B
- alert(e.data);
5 l8 m" t9 U `6 M7 U% Z$ L - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
3 m$ F5 u* `4 W, x7 [, h - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
: G/ b( w/ @# J% l - // 推送的数据,包含uid字段,表示是给这个uid推送
# {; ]' y- |3 r7 o8 b - $data = array('uid'=>'uid1', 'percent'=>'88%');2 ]" u2 c6 n; W0 w; ?5 J7 _
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
}8 `( T" {0 R( m. [" x - fwrite($client, json_encode($data)."\n");/ R6 R1 e& f; ]5 ~& v' Z5 ?% x0 s
- // 读取推送结果
7 o! c% `# [5 c/ O3 I - echo fread($client, 8192);
复制代码 - \% }, ?: w, T
; g0 z: i# C' a# I. X |