- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;& o- v, T8 ?& `
- require_once __DIR__ . '/Workerman/Autoloader.php';
6 e# G( h9 G. C6 p4 C) v( T: c' p: n
/ q) G& ?4 l9 O: k2 |- $worker = new Worker();) E7 U& y5 x) x9 ^/ l
- // 4个进程
2 h' d* y7 Y. v. _ - $worker->count = 4;
4 w- k, f- e0 X1 ` - // 每个进程启动后在当前进程新增一个Worker监听* y) X/ m9 v2 y% T3 J8 Y6 Q. W3 |
- $worker->onWorkerStart = function($worker)
7 H- j, a' b: O: O% X - {
- o3 @( c) R4 S0 O - /**
0 V- o! e2 B( m5 U - * 4个进程启动的时候都创建2016端口的Worker
7 V! G, h- g! d# D* {7 I; B. _ - * 当执行到worker->listen()时会报Address already in use错误& k5 O. w3 ~! i$ P, r( ?8 t- L, }
- * 如果worker->count=1则不会报错: }- G2 E+ \6 x7 s, k4 x2 b, N9 ]
- */: H1 J& t2 n5 Q+ C5 E
- $inner_worker = new Worker('http://0.0.0.0:2016');
\. R6 I, ]0 ]: d - $inner_worker->onMessage = 'on_message';2 P+ d; d. h" U& h& \9 a3 x x
- // 执行监听。这里会报Address already in use错误
$ }7 j% e2 T1 m* b0 X - $inner_worker->listen();
, z& q$ t7 ^6 e, Q8 Z. A - };' \& b+ O+ V( m: B, I
- 7 }: l8 K2 Q- r1 Y" S/ K+ a
- $worker->onMessage = 'on_message';
! Q3 x5 B& O+ i: F
5 p& Z" A7 w8 i! ]0 h- function on_message($connection, $data)
& L: z( h- d7 a7 R' L3 h5 z- u ? - {* k W$ Y; g! P" @$ r" Y
- $connection->send("hello\n");
* i: b+ G0 X; C$ s& w - }8 A7 q- z, o( N7 f
- , ^: y9 b7 I, ~! {* h4 @* R) D
- // 运行worker. b' ?' g' a6 i8 t& r7 }
- Worker::runAll();
" m- _3 n. N! p" Y2 z, L1 _9 a - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:5 |, {7 `7 X# B8 L% Q& I z
9 k0 p9 B9 v9 v# b- use Workerman\Worker;8 u6 Q+ j2 r2 }3 Y
- require_once './Workerman/Autoloader.php';
" g% B8 T% d) b8 S( H p. G
. [) o- r3 _' |- $worker = new Worker('text://0.0.0.0:2015');
: O' W+ P; T: c$ R3 E5 e - // 4个进程) t. G2 l' E$ e1 t) b8 i
- $worker->count = 4;
! t* m1 w3 S/ |3 f - // 每个进程启动后在当前进程新增一个Worker监听
: W9 s: _+ z1 V% l1 b! c. _# L - $worker->onWorkerStart = function($worker)
+ P% G2 F5 I" L; u7 R+ \ - {; s% T5 k* B/ [8 _9 C" e9 O9 j
- $inner_worker = new Worker('http://0.0.0.0:2016');
% S/ p- h3 q7 r - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
. {. C, Q! n7 ?4 g$ Z: ]4 l - $inner_worker->reusePort = true;; ?. q6 D, j2 c) ]6 G
- $inner_worker->onMessage = 'on_message';
3 z8 Y1 P* w: ~/ b$ c, \ k+ h- } - // 执行监听。正常监听不会报错4 N# k- p+ E D5 ]6 ]
- $inner_worker->listen();! F8 |& C; u8 V/ e5 U$ H0 P4 f
- };
& k9 f4 d$ `- r6 A( z# Z - , q A5 Y n* }. k4 l
- $worker->onMessage = 'on_message'; |' Q! y. M ]! l- `
- ) h% g8 j( C5 V
- function on_message($connection, $data)& v' F3 ?/ O g, ]3 ]8 W
- {
& u' A+ y0 _1 H1 t+ a - $connection->send("hello\n");8 T& V% O1 Y% R. u% M
- }
, S; v3 K( f: M - $ t9 w2 @" N. W& E" u! c" r( J
- // 运行worker' C& F' x+ O9 \( }2 H: j
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php. m6 a7 u" k5 i+ D* {1 y
- use Workerman\Worker;
2 g4 l! X8 {* h, X& z - require_once './Workerman/Autoloader.php';6 p; i. f9 _! O$ [" D' r7 S
- // 初始化一个worker容器,监听1234端口: N/ I7 D$ N; G, [; d$ v7 C$ k
- $worker = new Worker('websocket://0.0.0.0:1234');
( x+ C7 n3 ~2 Q7 q - & [3 N4 ?( ?+ K2 q
- /*% d$ \; Y) b7 X5 d1 ` J/ H" S
- * 注意这里进程数必须设置为1,否则会报端口占用错误
9 f- @0 G$ M! h - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
; S# e" W2 |* W9 {1 r4 w - */
F/ _ R7 w8 R: i8 n - $worker->count = 1;
) a1 W- v* m' V, j - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
" N" e' o3 b, w5 q - $worker->onWorkerStart = function($worker)* \: A( m" L' X. [8 o! d! `; W
- {
% I d% e5 `% E4 Z6 _ - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符8 A p% v- l6 w Z g! ]
- $inner_text_worker = new Worker('text://0.0.0.0:5678');5 U5 f- \9 I& g+ M! C
- $inner_text_worker->onMessage = function($connection, $buffer)
& V1 B) q& O/ S" c* _1 H P. R - {0 r+ C$ H# ~8 g+ B2 w z
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据3 C) } e/ [. k& K
- $data = json_decode($buffer, true);; K. x! u9 ?" f8 j3 u6 m# w1 ~, h
- $uid = $data['uid'];+ O5 f; g4 m8 Q7 m+ P
- // 通过workerman,向uid的页面推送数据1 d! S+ c9 R6 k! f& L/ e
- $ret = sendMessageByUid($uid, $buffer);( }1 O; z) V1 _0 @& C. Y: {
- // 返回推送结果2 n0 |8 D% t/ X9 i$ Y
- $connection->send($ret ? 'ok' : 'fail');
6 ^, I. l) A' T2 ^) a3 \ - };
' k4 ~- }# s( I4 i m - // ## 执行监听 ##
. f1 T6 R0 g0 N+ R - $inner_text_worker->listen();
* t. ^' ~' D# R+ y' y6 ^ - };, g3 p2 O0 g4 J) Y6 v( B: h
- // 新增加一个属性,用来保存uid到connection的映射- A& \3 c r: w" _$ |
- $worker->uidConnections = array();8 ^5 o8 W7 B; z# u5 m
- // 当有客户端发来消息时执行的回调函数2 q& S9 J* b5 R' z n( E
- $worker->onMessage = function($connection, $data)
- W7 g: [! T A1 Y; g* L0 l' Q$ D$ U - {
% K3 u8 z9 r$ V$ d0 C - global $worker;. B( i7 x2 A: ]$ {0 [1 n- V8 i
- // 判断当前客户端是否已经验证,既是否设置了uid
/ p( V6 j8 K4 I7 z% c' t - if(!isset($connection->uid))+ [$ ?" x+ z& w
- {
0 q4 `: E! b' k" H6 t4 o; Q - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)& M; G6 J- p1 O: w3 U5 o
- $connection->uid = $data;* C5 f! \9 N# c5 Q7 U- P3 u
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
, R% B4 o! g) P4 i/ e0 W9 N# x - * 实现针对特定uid推送数据1 p0 J6 v4 @4 c( j8 j9 j1 U
- */ ~+ m. v$ w* h% B o3 d j5 X2 [
- $worker->uidConnections[$connection->uid] = $connection;
. @7 Q5 G( R2 w6 D, B+ x3 o5 R - return;# T# U8 Y1 [5 {/ Y
- }4 |7 B- H p' D4 L4 O! N
- };
5 i- u$ L) V. s
+ y9 b8 V# p- F+ `9 Y* X! i0 K- // 当有客户端连接断开时( u5 o2 r/ g/ q0 m
- $worker->onClose = function($connection)
6 k m/ ^8 \1 L1 j7 T4 B - {
2 [" Q* R- W# q$ m - global $worker; s! }* y+ P! I& R6 j0 q- y
- if(isset($connection->uid))! {" o* Z8 O, ]3 }- _) y1 b- N
- {5 n( {0 q3 e. T& t, `. ^; G
- // 连接断开时删除映射+ I* }# N1 y! q- m2 h5 f
- unset($worker->uidConnections[$connection->uid]);
# K0 `1 Q3 R, B8 R8 R, f. w - }
! c% \! U$ ^ O - };
( k2 V6 s. Z* \5 j - 9 x/ [2 s t( r& G0 n
- // 向所有验证的用户推送数据
% ? m0 `2 m' F1 v - function broadcast($message)
1 v8 `% `# V4 i& G- V8 d - {! i/ z3 A0 r# {6 d% S$ [+ u& u$ k
- global $worker;' U, S3 d+ F" H% Z/ A. Y
- foreach($worker->uidConnections as $connection)& h; A# l: D% T- ?1 U1 q
- {
/ R# _0 ^5 f {0 T/ u - $connection->send($message);$ }+ S7 }+ e6 v, b! o. r
- }3 C; x" z6 ~7 X3 B
- }; R' e p# _9 J: c2 f" ~
- ; N/ z5 ?" p; l+ G+ I1 ~# l' i
- // 针对uid推送数据
' E' H7 ^* D* P; [, E - function sendMessageByUid($uid, $message)
: y, p9 ^, K# Y+ L& m - {6 o* {1 S5 m4 ]/ U* ]
- global $worker;
$ t2 ^* @: U+ H( {8 B, K# t' _ - if(isset($worker->uidConnections[$uid]))8 W0 }+ t& E! G" f. g$ l( f$ A5 l
- {
/ Z" v S1 C7 ^% R - $connection = $worker->uidConnections[$uid];+ |% b" p5 p0 f
- $connection->send($message);
# w# z3 J& T1 N7 I - return true;1 K" b; E4 M6 Z: k+ M
- }
4 ]/ `( D# D7 O0 B | - return false;
' y/ B; y; _! n# e6 r4 B - }
% a1 X) _) [$ V9 k& x
) B7 I3 g9 \0 e9 T( Q- // 运行所有的worker
; f d2 t0 ]6 c/ P! s# ~: Z - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');: C' e- x1 j/ }% u# W4 H* K" I! e0 k
- ws.onopen = function(){
* M# R" {: k% z; s) ?- [) G - var uid = 'uid1';
* m, i% q. N( F) e+ N" q - ws.send(uid);
; u6 ]5 i8 H9 @( s5 B& ^( R. W - };$ x) [/ \* q8 h1 B: q( Z! ^
- ws.onmessage = function(e){2 Z1 i$ O1 s2 d) X
- alert(e.data);
2 x1 V. t" ]- l7 b( | - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口$ t. }9 P1 X2 m" L3 ~, o: i
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);0 {% p) V( y' h
- // 推送的数据,包含uid字段,表示是给这个uid推送( G. h6 S' R3 F5 y
- $data = array('uid'=>'uid1', 'percent'=>'88%');
, {. ] V; A$ N- ~# A3 i V0 e* w/ l/ z - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
* H9 V" \9 u! M - fwrite($client, json_encode($data)."\n");6 w+ Z f5 @; i1 u) _
- // 读取推送结果. L3 Y8 K _9 Q& C+ b9 |
- echo fread($client, 8192);
复制代码 ; ?6 i/ Z, w- B+ l, |4 ^
; `5 V0 g- e, k9 r
|