- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;8 C% N1 L; e7 u# K# \
- require_once __DIR__ . '/Workerman/Autoloader.php';* `* G2 R& R' h5 W4 W5 i, b8 D
- ; L! a i' c* V
- $worker = new Worker();/ a' C; j- t! C8 G x( ?
- // 4个进程
, _0 [5 d4 t; g. D- C - $worker->count = 4;
) D( \$ \, w) F: B# p - // 每个进程启动后在当前进程新增一个Worker监听
) w) a! s G5 E; @" D# N - $worker->onWorkerStart = function($worker)% X! y1 S- d* ]* C5 n3 ?+ ^5 s; C3 P
- {
/ e6 N8 m/ E0 W$ W9 x; v: M - /**
* H, y+ \" w) e" L* h+ T, q. t - * 4个进程启动的时候都创建2016端口的Worker, p9 \5 [2 o( [5 D* i6 x' g
- * 当执行到worker->listen()时会报Address already in use错误8 s9 o0 ]% a) D
- * 如果worker->count=1则不会报错
s- z9 E# q5 } - */$ q4 q+ Z3 O" Q+ n: a
- $inner_worker = new Worker('http://0.0.0.0:2016');
4 E( d/ z( U7 B - $inner_worker->onMessage = 'on_message';
; Z* ^ I3 b2 ~9 ^ - // 执行监听。这里会报Address already in use错误1 d7 k4 S$ V- t$ W% ~3 O( i
- $inner_worker->listen();
/ o6 F8 ] a' q6 y! d3 q - };0 j; {% F! r2 ?* U) F5 q. J; D
- 1 N3 q9 @) ]7 i
- $worker->onMessage = 'on_message';
5 ^4 [8 t/ e" A/ d- m9 n( y5 ^ - , ]; R; n4 J, M1 `. X, s* o
- function on_message($connection, $data)
7 f; J( I! a; [7 a# ` - {
7 v0 m, U$ ^* m w* j - $connection->send("hello\n");
# O4 S8 D; A- a! A - }
+ d+ C- P. o& f0 f - 7 s3 Q- n) w3 L @/ P! k
- // 运行worker
- T# P$ u- E4 h; |* s y - Worker::runAll();
( m, J( l3 d. G" d$ V - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:5 r8 F2 ], e! \9 F/ T( q
& V4 r3 G1 F5 V# \6 e- use Workerman\Worker;
+ t/ C" u2 D. r# R7 k$ I" Q - require_once './Workerman/Autoloader.php';
+ X' y( e$ Q- U
3 Z% ]" ]4 ]+ U1 i+ S/ u6 M- $worker = new Worker('text://0.0.0.0:2015');4 z5 c. c( P0 G% A h& O
- // 4个进程# N0 e( Y) U/ j
- $worker->count = 4;
/ \0 e( G) [; W. }. [ - // 每个进程启动后在当前进程新增一个Worker监听
# m% p" r. k& }0 }6 o - $worker->onWorkerStart = function($worker)% T: M: I8 W% b9 P" u, s6 D- n, x
- {
7 O. e3 _% r, t1 Z& j' T0 } - $inner_worker = new Worker('http://0.0.0.0:2016');
7 n- j, w" g9 `3 t# q' E - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
/ v8 X! e$ l/ ~: H- C7 L6 y* O - $inner_worker->reusePort = true;
d: t2 ^8 w4 J7 S; N) K - $inner_worker->onMessage = 'on_message';5 p8 D4 V6 l/ D c1 p0 _
- // 执行监听。正常监听不会报错$ Q; x1 d: x6 `; m
- $inner_worker->listen();; O0 o# Z+ U6 ]) a
- };
3 l: \# @5 ?/ N* e0 ?" n0 U _& I - }* U% a; W0 O# \
- $worker->onMessage = 'on_message';
6 s9 d" m6 I8 m2 T8 [' j# B - " ?6 p' l" u" N/ N* n$ F% U
- function on_message($connection, $data)5 N1 p2 ] z+ N3 v, \1 O
- {
# c: K, _) x1 E# ? - $connection->send("hello\n");
7 d8 {/ t/ `- i! ~, b. j& _$ N - }+ ]/ @! O$ c. X7 g+ G2 ?
- 5 f- Y& v9 A" X" E/ U6 _. b/ m1 Z
- // 运行worker' \6 J9 K4 T5 y
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php6 q& L' w1 i9 A/ c% K i
- use Workerman\Worker;
4 b! a7 E* l/ e - require_once './Workerman/Autoloader.php';
: s/ }% G% b/ A2 L" X/ [( E - // 初始化一个worker容器,监听1234端口* {$ e8 {5 i% s, R3 V. ^
- $worker = new Worker('websocket://0.0.0.0:1234');: P7 r% S$ x; a* R0 j* |* h
- ' m- r s0 f1 ?0 \
- /*" Y3 X& R+ T, u; B4 `8 c
- * 注意这里进程数必须设置为1,否则会报端口占用错误, U) K7 `# u4 m4 j0 t4 ~" `* D6 P
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)! V) k3 d: J# Z6 b$ r+ k7 w
- */
- H6 R9 o5 B e @8 y% v* s g - $worker->count = 1;
6 A+ @. ?) l6 G" m- q0 b' }& h" P - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
8 J$ e$ |* Q& H. I) a* N. b1 V - $worker->onWorkerStart = function($worker)
# B* p) g/ P! e, {5 o# R" ]9 p# N - {# ?; M" _" J9 j3 b7 S
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
" [0 y6 \. q# f9 f - $inner_text_worker = new Worker('text://0.0.0.0:5678');5 q! W. {4 |7 R. `/ x$ A/ @
- $inner_text_worker->onMessage = function($connection, $buffer)6 O U; N8 R& f& w
- {
' j- _2 Z. |. x6 G }4 m, I9 e - // $data数组格式,里面有uid,表示向那个uid的页面推送数据
$ _4 y4 K( u% \ - $data = json_decode($buffer, true);9 u" a8 T7 a2 u& O2 [) m( c
- $uid = $data['uid'];
" b7 ` k v7 m4 s - // 通过workerman,向uid的页面推送数据- R4 Z+ X& |# {
- $ret = sendMessageByUid($uid, $buffer);9 M) d* o% r: O- b
- // 返回推送结果8 }% K; N5 O: i! g
- $connection->send($ret ? 'ok' : 'fail');1 V" A; A/ R& ~
- };# e2 \1 `2 H/ z( S5 n% l6 s
- // ## 执行监听 ##3 n0 p( _+ y1 N3 a& v7 S4 {" v
- $inner_text_worker->listen();
) B( C( w7 Y; y/ Z% H6 c6 e - };
. {+ w: P4 V% w* J- E$ F% V+ ? - // 新增加一个属性,用来保存uid到connection的映射
u- t! z% p" n' H' t8 _% s$ y - $worker->uidConnections = array();; Q! ]1 |" b6 G# X: W4 k
- // 当有客户端发来消息时执行的回调函数; |1 o: V3 Q* w# o( T
- $worker->onMessage = function($connection, $data)7 ?3 R+ l% N: V5 v- h; Y& q. n( p3 Y9 B/ B
- {4 n( D' |# x6 N! C
- global $worker;
$ @; R \9 A7 K- m& N - // 判断当前客户端是否已经验证,既是否设置了uid
: l S! \; i" j: Q* z - if(!isset($connection->uid))' Q0 A/ ]( M: e! Q7 m5 z `
- {/ o5 K6 {* Y: S5 q! g
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)% R% C% w6 y$ _ `' W2 y3 s4 ^
- $connection->uid = $data;; i0 ~' G$ U* f9 u
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection, T& M0 B$ D% C2 k" C- j* [- \
- * 实现针对特定uid推送数据
* W* w: J2 S' d+ Y, {0 j - *// y; g: [/ i. A1 S! }4 z) H6 j& y) q; d
- $worker->uidConnections[$connection->uid] = $connection;
M3 ^1 b4 T, x a7 q* }, y! y* h* F - return;
$ u) d' C6 O' w+ G3 G - }: n& H# V( V0 J0 g8 o2 Q
- };5 |* L j0 [5 e* A/ b' f5 L5 L! T5 D
- ; N; W% ]; _) _6 n
- // 当有客户端连接断开时/ R8 M8 _% ?# y$ G
- $worker->onClose = function($connection)0 D0 y7 Z7 C4 W. J7 q9 Z7 o
- {# x; I' J4 B9 x M% O0 W3 Y+ d
- global $worker;
4 b9 D# a9 k% b$ x8 ^$ ?) h - if(isset($connection->uid))$ }) ?$ G& v" N+ E" _! ?9 z$ T
- {
4 Q; |5 `: M0 Q - // 连接断开时删除映射# u" c" E0 b4 ]- K5 K4 L; V# R- z
- unset($worker->uidConnections[$connection->uid]);4 K7 \" q& G$ @9 v
- }
5 n: J( i& @2 o) o8 c0 U& `9 t' X9 ` - };1 q+ Z3 G* N& q9 G7 ^$ _. U0 I4 D
- " `9 k$ r; \# W7 G& p
- // 向所有验证的用户推送数据4 ?6 H% `! Z/ o6 C4 F5 t- L' U
- function broadcast($message)* `+ S$ B4 w9 V+ ~
- {$ {0 {1 g+ c- l2 m$ e' [- d3 n
- global $worker;# T* f* `$ W& @/ `5 `6 u2 w V6 d
- foreach($worker->uidConnections as $connection). b3 h5 P3 G, P
- {7 y& }- _4 D, r- O5 v2 }$ w
- $connection->send($message);5 R3 ` x8 Q( Y& r& V
- }" ^ ~* g$ I/ p1 V) v1 I+ ~
- }
) G. a+ j1 V; a$ ^9 U! F2 N
1 n$ |! g) V: Q' w: K- // 针对uid推送数据
6 J& T' p7 j/ j - function sendMessageByUid($uid, $message)0 z6 H# ]# g! a8 L8 y4 ?
- {1 Z0 O7 c, n: H# B0 N
- global $worker;( z, r2 }# G: H
- if(isset($worker->uidConnections[$uid]))
: S! \& v0 w; v- K$ G$ W8 o - {
) Z% ^& T- W( Y - $connection = $worker->uidConnections[$uid];
% l. {1 P' y: Q7 Z$ O! f, h - $connection->send($message);
$ k! j9 r i# C- ]" } - return true;& g. r) R" K9 I4 h* K; k& `) o
- }$ a2 _( U" n; \5 u6 s+ i5 \! i
- return false;* A4 o, V% P: v7 U* W. F: L& D
- }
" C1 L4 |7 N& \" m" A
/ e! ^2 r% h6 b. K- // 运行所有的worker
, \5 i, e- ` _2 {, O' f - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');8 T. }3 A: o/ [7 z- Z9 G- }
- ws.onopen = function(){8 F* v9 l* C* C1 Q9 f
- var uid = 'uid1';
2 Q4 U- G8 y+ N! A, L6 j/ c - ws.send(uid);
" s. n* U/ a# j& g2 U! N& a - };# }+ F& S# `0 o. W
- ws.onmessage = function(e){
" p% f# S- l8 n0 j# ^& [( A$ G - alert(e.data);
0 Y0 F6 p" s% S1 G3 R+ q2 t - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
# r( q: {' o f7 Q. s1 Z+ P+ [ - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);$ u$ x# J- a) u5 S2 L
- // 推送的数据,包含uid字段,表示是给这个uid推送' G% h9 u- K g1 n+ u/ t; W
- $data = array('uid'=>'uid1', 'percent'=>'88%');2 t0 b, b5 ^" O$ {- H) K
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
+ `$ E% u* e z1 J- e# z8 ^3 v6 w - fwrite($client, json_encode($data)."\n");9 N) j P% l0 I' e* e* R' t
- // 读取推送结果& Q6 Z2 v4 f# S: W$ `
- echo fread($client, 8192);
复制代码
3 y4 l( |) _- k! A( V
4 g5 ^9 \8 @. t |