- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
$ u& J# g0 b* E& B2 ]$ P - require_once __DIR__ . '/Workerman/Autoloader.php';
6 N D: n7 H4 [/ ]
' L. B/ v: H5 X- $worker = new Worker();" o( }7 f4 I3 P
- // 4个进程" a1 B+ h5 c! K- d/ }. o @
- $worker->count = 4;& c8 F" k+ U! i \& C
- // 每个进程启动后在当前进程新增一个Worker监听/ a9 L& y' b* T+ k x0 I
- $worker->onWorkerStart = function($worker)
4 N( _$ w/ `: d M - {
' q0 N# {* ]: V$ N( v8 \ - /**4 ^$ m% B |! K2 S
- * 4个进程启动的时候都创建2016端口的Worker: J7 o" P5 L5 S0 A* _
- * 当执行到worker->listen()时会报Address already in use错误
, P* ^8 a& s% e. N1 ^" x" i0 g6 L - * 如果worker->count=1则不会报错
9 W( h5 Q, B, F+ w$ R' } - */ Y( M' ^9 k7 Q* l9 c$ P0 R( G$ p
- $inner_worker = new Worker('http://0.0.0.0:2016');; E3 N5 a8 B1 U: ]
- $inner_worker->onMessage = 'on_message';5 Q4 q/ o4 }% j" A8 N( P
- // 执行监听。这里会报Address already in use错误
% q" ~# l4 g3 R+ |, Y* K' H - $inner_worker->listen();
W/ e6 P% }& j5 o - };
& F; O. n5 t; D! a7 c
3 l; O7 d/ w4 K* w& A1 [ M: [- $worker->onMessage = 'on_message';2 C/ V4 J* n, s/ r! }
- 0 o3 y7 R9 F% ` H& i E6 @* O3 \5 z
- function on_message($connection, $data)
! e2 G& L1 O& B# ]; h$ \+ m( e7 w - {
7 T3 [" ]# m* t0 W# C/ G7 i7 a! ^3 w - $connection->send("hello\n");/ u3 K3 p- Y, [6 v7 O2 Y- l
- }
3 v" ^" }; ^- @( _, D5 G - 4 N$ |+ I& F5 s; v _
- // 运行worker
S' E- p8 a/ q" r2 z8 | [' w - Worker::runAll();
" B L0 l& ]% N; S8 M - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
% r: v4 @/ L; }' L - . W1 o+ B" R2 W0 K
- use Workerman\Worker;
. ]9 \# O, H! n! @/ j$ y6 z \( ~( ^ - require_once './Workerman/Autoloader.php';
' @3 E1 q8 [: S7 s9 V0 k
" l* A) t. S+ B+ L# z u- $worker = new Worker('text://0.0.0.0:2015');* H& D) x/ h- y
- // 4个进程! p; V- o& X, n/ F4 V* m
- $worker->count = 4;# r- m! I& i) ?8 F/ p: L, J5 p: `
- // 每个进程启动后在当前进程新增一个Worker监听
7 ~9 G/ c1 q" C% y1 ] - $worker->onWorkerStart = function($worker) E! X/ ^2 N# j6 v
- {4 W! N2 t' f2 Z5 V" R
- $inner_worker = new Worker('http://0.0.0.0:2016');4 x+ Z+ w# }5 a
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0), j- ]; d) f, l* j, z z; R
- $inner_worker->reusePort = true;4 Z, v% f2 @' E( I. {" V! v0 Q( e
- $inner_worker->onMessage = 'on_message';8 J" L( @3 J7 M3 y3 U0 G
- // 执行监听。正常监听不会报错
$ ~) `: x3 R0 Q+ u8 @ r; C. W - $inner_worker->listen();
* H$ G! D+ u2 h% {0 B7 B8 ^ - };
. h+ ^' E I% o3 t( c( G$ O$ {3 y - . }9 t0 e$ D* ~) |- F
- $worker->onMessage = 'on_message';4 _: a+ C( m3 k" B( z
- ) ~* Z, E/ n# k, C/ b: H2 V; B
- function on_message($connection, $data)
- x8 N$ [- [2 g% v* t - {: H) P: ^( O! p4 H
- $connection->send("hello\n");
4 U" _0 s3 ]9 ]) `* a% h4 ^ - }
' `# `% U" @6 I5 a/ n - , @+ O6 T7 V% Q! p5 T" p
- // 运行worker; U' c. m* r, p
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
7 k1 h3 ]" `6 L* A - use Workerman\Worker;, ~& [7 F: M) C9 Y. g& F1 T
- require_once './Workerman/Autoloader.php';5 G- x. v" Z% u6 W( y& ~4 w1 Y
- // 初始化一个worker容器,监听1234端口
" {( `# ^2 z; S4 y- i - $worker = new Worker('websocket://0.0.0.0:1234');
1 O. {& ?3 [; u3 ` - 6 i9 f$ B$ n0 x- S3 j
- /*, ^8 B* n3 y) }' g. q& R
- * 注意这里进程数必须设置为1,否则会报端口占用错误
6 ?" _. g O9 @ J9 \( s - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true) \" L, t8 ~5 R7 r8 O5 v, z7 |0 ^
- */" C/ Z- i5 x7 c/ R( N
- $worker->count = 1;0 B& C8 x6 |7 G( V/ J
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口8 i4 s# Q8 K! J6 ]* {& A
- $worker->onWorkerStart = function($worker)' G6 ~9 q( u9 X/ k2 L- x8 l
- {
* U- A: e, e: ^0 a1 Y - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符- K, K: W0 S" b5 Q/ _
- $inner_text_worker = new Worker('text://0.0.0.0:5678');
2 I, \/ i) U5 z9 O, R+ S+ J8 J( Z M - $inner_text_worker->onMessage = function($connection, $buffer)
" A$ w( m7 o, J v - {
t$ }/ W6 ]) F: m q- C; Q - // $data数组格式,里面有uid,表示向那个uid的页面推送数据+ ]( W' q! r$ f
- $data = json_decode($buffer, true);, ]: W* h* p, z$ f3 c
- $uid = $data['uid'];7 Q( @; Z4 [. q
- // 通过workerman,向uid的页面推送数据
) p# n0 Z% q# F( y$ Z% w6 P2 R4 l. \: h - $ret = sendMessageByUid($uid, $buffer);5 \$ L5 c9 c9 f7 [, T
- // 返回推送结果
1 m/ ~9 m+ `$ e2 R7 V, S% ^9 A/ A+ C; M - $connection->send($ret ? 'ok' : 'fail');
4 p" [8 \; G( t5 O8 Y - };: K* X6 ~3 p7 Y( c! ]
- // ## 执行监听 ##
. x" C) K' d6 {& j) x4 r5 v - $inner_text_worker->listen();
! B) W* T+ f3 M4 X9 [ - };) u- d$ v9 p; ^5 L/ b+ J# D' M
- // 新增加一个属性,用来保存uid到connection的映射+ u3 Y$ `/ s3 ^6 H
- $worker->uidConnections = array();/ Y# L. \$ `6 _1 P) {
- // 当有客户端发来消息时执行的回调函数4 I& l& F+ \3 \1 b
- $worker->onMessage = function($connection, $data)
- n- F1 L5 U: G9 U% ` - {
8 {% m S$ c4 T0 f$ o4 s - global $worker;+ F! i2 {4 ~5 J5 V/ o
- // 判断当前客户端是否已经验证,既是否设置了uid
: G! D2 O( m, G+ B' u9 V - if(!isset($connection->uid))
5 V+ s1 c% l* b( c. a0 O! z - {( a4 f& m( k0 f5 ]$ g3 `9 N
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
# {8 {. i r0 B - $connection->uid = $data;
1 W8 q2 v! }, o - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,! H8 r/ Q) | l# m4 Q
- * 实现针对特定uid推送数据
$ C5 _4 k0 V; ?1 ]# c" t! \ - */
& x5 `) F! B6 G4 b m% W- I - $worker->uidConnections[$connection->uid] = $connection;7 z2 q% x* Y( e0 D' V7 U) I
- return;
/ Y1 H5 \% v. K" ^* f - }- O6 b* B* {7 n% a8 [& }
- };
! q. L2 ?0 O' h9 c ` - - V% n& d7 [; p* A& F; A. }6 i# L0 g
- // 当有客户端连接断开时
0 P9 P7 P4 P- ] - $worker->onClose = function($connection)
9 p+ ?7 r. r4 ^) ?/ m4 I - {0 T; ^; t0 T% Z) }( Z
- global $worker;$ J+ {8 [, N' q- V2 }$ l9 c+ n
- if(isset($connection->uid))- }2 |5 M7 l& G" }* ^
- {" Q9 V8 y* i$ I' U6 j
- // 连接断开时删除映射
5 V! Q! T6 X; b4 P( G- ] - unset($worker->uidConnections[$connection->uid]);
9 o( B. L3 r1 V7 R# \- _ - }% o. j* g/ C- B) @' l
- };8 @9 L# n" |. J Z9 _3 ^, c
) A, d0 u1 R8 }- E. B/ C- // 向所有验证的用户推送数据$ d9 R( u5 C) @8 H) W
- function broadcast($message)
9 Q6 p# Y, N" a+ H4 k+ W; g" G - {
3 ]; d. ^: @% V4 \0 |' r5 c$ n" x* j1 E - global $worker;
& N) j4 H) y$ s2 F, \3 { - foreach($worker->uidConnections as $connection)& F1 z# Q! c; Y9 K, `
- {
+ c2 P8 l5 j H/ D - $connection->send($message);+ w3 |. N, a* \( V
- }
9 b" g; J' d- _8 D! A" \$ ~5 d - }+ k6 h: _9 E% r+ J
- : Y' V* z* _! c9 N$ A( P" e2 e
- // 针对uid推送数据
/ t7 [3 `/ O5 T- d9 g* h - function sendMessageByUid($uid, $message)- @; K! O; c: X# N" m( [
- {
1 }3 s! E' s+ E$ { - global $worker;
# P7 [2 p* e( } - if(isset($worker->uidConnections[$uid]))
) r# W# V; w) ?4 b! C/ } - {
5 Z L6 C. y' P/ R4 [6 Z - $connection = $worker->uidConnections[$uid];
$ i N6 Q) X9 H/ z8 P* L7 \( k - $connection->send($message);6 R" }1 k4 L& @7 H; g/ t
- return true;* n3 ~5 x! i. I% i6 O
- }
0 X* [7 I- i( Q* [, X, P - return false;! ]: }2 V1 Z" o, H: B* I
- }
. V8 f) y7 s9 }) c5 q& y - T' w3 K1 Q' R2 Y2 L( ?. e
- // 运行所有的worker1 {8 U! n* [- ^$ s6 W% |
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
% z, L1 h9 P/ b2 a5 S" a0 C - ws.onopen = function(){- n: \. p' z$ q2 H: O( J6 \
- var uid = 'uid1';
" n3 ?2 ?/ z4 ?0 C* k$ T2 G - ws.send(uid);* o0 g% x2 h/ K* T# H- E4 e( ^
- };8 A- G T7 z6 N5 E
- ws.onmessage = function(e){. f9 i- V9 ?4 L1 B- K: m% L
- alert(e.data);
2 O% Y5 D& o( t; h - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
" L; ~. N: Q4 j L( ^1 {; L Z4 G - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1); I2 U; f" |/ r+ s! X* V
- // 推送的数据,包含uid字段,表示是给这个uid推送* f# }$ [* A7 ]; r$ G+ Y
- $data = array('uid'=>'uid1', 'percent'=>'88%');% T; b- Y8 e& |# O+ d1 I' d* G# `7 F
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符- m: Z3 [2 Y4 V8 m8 `2 F7 G
- fwrite($client, json_encode($data)."\n");
# @+ u* O0 i0 Y5 }- h) l - // 读取推送结果
. v8 z$ ^1 r2 M7 Y2 S9 E - echo fread($client, 8192);
复制代码 * G. x5 `8 X: |3 T
2 h2 B3 v* z; f7 \. t |