- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
" a/ A5 h. ]/ u0 ~( ] - require_once __DIR__ . '/Workerman/Autoloader.php';! P; Q9 n5 Y8 H5 w, z0 {- d
- 4 x- _! S6 i2 K* W, ]: n+ R
- $worker = new Worker();! h- B3 I& l# _) f# c" Z
- // 4个进程
* e( }/ G0 {% D( w/ q) T - $worker->count = 4;. K, D1 \% v; \5 r Z5 E
- // 每个进程启动后在当前进程新增一个Worker监听% h$ J. C9 D) f" G4 L2 c
- $worker->onWorkerStart = function($worker)
- V' W Q c- w; T% D - {
/ W8 a% n4 J3 {+ D" }3 y9 d - /**
( z/ f4 b& w) ]9 u$ Y, i- H - * 4个进程启动的时候都创建2016端口的Worker
1 [7 R( x* O3 ~& F- `" m; @ - * 当执行到worker->listen()时会报Address already in use错误
4 l& K/ X2 C2 o+ o/ a: C1 E1 U - * 如果worker->count=1则不会报错! r+ a1 [/ ~4 U8 c
- */
s) U1 Z% u; {+ s" L - $inner_worker = new Worker('http://0.0.0.0:2016');
; K! F, V9 h0 G, ]$ n - $inner_worker->onMessage = 'on_message';2 R4 W" a5 ]5 z9 A; X
- // 执行监听。这里会报Address already in use错误- X! O1 o; X4 E; n* l
- $inner_worker->listen();
: g/ l6 M" L( I6 G; M - };
# ^& g# f+ ^+ |3 W8 r - ( ^; ]2 G! ]. W; T4 W: ~% b" I
- $worker->onMessage = 'on_message';
$ ^: e- @; y0 k" V( k! `. D, q) Z - ! `9 g+ w& p/ k9 |" v
- function on_message($connection, $data)
7 w7 \: c: r7 q# f. L3 Q: B2 l - {
/ R+ F( }& ~! s, r4 g1 q7 S; H1 W+ ? - $connection->send("hello\n");
; ?* E* t% p) k4 h( S: ^! \& x - }
" V9 S, g, F& G0 f6 ? - # Z' n6 j; b% { s1 ~" g6 c4 M
- // 运行worker
$ m G! p( U$ t$ O' t7 R - Worker::runAll();% |* R7 ?' f- i" a0 V
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
9 A8 P, X( E: S c - . J5 U9 ]. @1 d8 k1 e
- use Workerman\Worker;
! L1 i' k* `: f$ ^ - require_once './Workerman/Autoloader.php';. J+ ^/ r- v2 F; @# S
' N7 F% K2 E" k/ k8 l- $worker = new Worker('text://0.0.0.0:2015');5 p% I" m1 A( v+ V% W3 _5 E
- // 4个进程
5 h. X$ q8 a% A - $worker->count = 4;
0 n: R2 ?# L, _( i Z% O% j( ~) x; d, q7 @ - // 每个进程启动后在当前进程新增一个Worker监听
" P! A0 ~- S5 \ s' b$ s6 a - $worker->onWorkerStart = function($worker)
( \% h1 N1 v! X' D) h - {3 L4 @, W: ?& k& l+ E
- $inner_worker = new Worker('http://0.0.0.0:2016');" `9 b8 }1 \9 K( Z% y
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)& {" w3 Q* s6 h$ j, p! Q; `' k
- $inner_worker->reusePort = true;
0 s7 G. a* D1 T' e' ^+ ~+ z - $inner_worker->onMessage = 'on_message';
% }6 k3 d7 o5 A - // 执行监听。正常监听不会报错
) r9 V- J* K7 Z/ _9 Z/ b9 \ - $inner_worker->listen();& V! ^( d" m2 r% p
- };
- J9 Z0 [3 a4 o& B" n L - * y( o* w% F7 C+ [1 m2 U
- $worker->onMessage = 'on_message';! o4 D' ]6 D7 f4 |* p7 p
* @* L; @# e" L2 C- function on_message($connection, $data)3 N, J, \$ ^2 r6 Z
- {
/ x. e, D2 y( y2 v |/ n3 s( l1 F - $connection->send("hello\n");
, S4 C7 L Q: p6 y8 I& b - }$ o( q, E5 x2 s" s
- # ~5 H$ B) ]7 k" q- F0 y
- // 运行worker
+ G& v7 G' T5 l! B - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
7 N# U- q) g9 f) c8 O( ~7 P - use Workerman\Worker;
. v) I4 q+ v$ d$ ^: f% C t - require_once './Workerman/Autoloader.php';# @0 }+ k: r7 S; Y
- // 初始化一个worker容器,监听1234端口
/ u! Z3 @. e: E( ?: ?. V( u - $worker = new Worker('websocket://0.0.0.0:1234');3 T0 a7 j* n( i- [4 S) s8 G* g5 Q
- ' } m6 F$ S) F
- /*
0 ?, P* T1 a: y* T) J, k6 }1 l - * 注意这里进程数必须设置为1,否则会报端口占用错误1 n# [2 o- k& g: Y3 Z4 Z3 }( c: `
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)( q; q! Z) l) X
- */
7 J/ Q6 v: K, e3 h - $worker->count = 1;. A( d8 ?' k( `$ C/ k" f
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口6 O6 p+ l5 _* I u3 X! U k& S% X( Z
- $worker->onWorkerStart = function($worker); c! i! S+ D! b; X( j
- {" T: n3 E2 l8 @0 e
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符, d+ {! t( v" d! b
- $inner_text_worker = new Worker('text://0.0.0.0:5678');
1 Y. S( ?& h. s- I' J A1 W. t - $inner_text_worker->onMessage = function($connection, $buffer)
' X. S, d- F7 d - {
+ i9 U" \5 m! ~+ K2 D - // $data数组格式,里面有uid,表示向那个uid的页面推送数据
/ o, G1 r3 k+ v3 o* t - $data = json_decode($buffer, true);3 E' }: {0 B" P8 @; d
- $uid = $data['uid'];
* b6 n6 n' A7 w$ r! E6 o - // 通过workerman,向uid的页面推送数据/ K. e4 x* z9 U8 @- c( z& K
- $ret = sendMessageByUid($uid, $buffer);7 `2 u/ M b. b$ K3 o7 j
- // 返回推送结果
|' s& N$ s7 ?' } - $connection->send($ret ? 'ok' : 'fail');8 Q" H) l( l2 q' y- \: I. o
- };
& v t2 T+ n! T - // ## 执行监听 ##% ~1 c$ J7 i- [4 ?4 u" e1 c2 S
- $inner_text_worker->listen();% N# ~5 ]" |$ B' ~
- };
5 s9 |- Z- I: G$ K7 v% J - // 新增加一个属性,用来保存uid到connection的映射
5 v/ w6 s* V* P8 Z R# w# |0 {0 o - $worker->uidConnections = array();. U1 ^, W; W# h% D# F& W; S
- // 当有客户端发来消息时执行的回调函数4 n3 I: E8 n$ r5 g: n
- $worker->onMessage = function($connection, $data)
0 I+ J% ]# k! A7 }6 H - {
1 \! ~8 ^6 H0 d9 b+ a0 V0 z7 h - global $worker;; O# r+ U4 o6 @% R: L [4 n
- // 判断当前客户端是否已经验证,既是否设置了uid
8 R2 j# w' A* h' c3 i/ H0 X - if(!isset($connection->uid))% T/ E2 g2 H! \
- {
% B) d; p6 v! H0 c - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)/ m+ N9 q; s7 L' P! M& ]
- $connection->uid = $data;$ H4 R. p) b7 |, G
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,! \3 b$ L; A; J, t: T/ T( o
- * 实现针对特定uid推送数据
+ |* ^& c. Z% q2 h' d* c( \ - */, j$ H1 c% r9 H1 q
- $worker->uidConnections[$connection->uid] = $connection;
+ Z( E% m8 ]! F4 |8 H0 B" e9 I - return;
( s% q0 q) b% P: y0 |% G' w2 C - }
0 E8 S0 h* N. S# ]0 }% X1 k - };
( _4 t/ u6 r+ I6 k9 f! _: Y8 f0 g, Z - ! g* {+ d: P. W: p. {0 f
- // 当有客户端连接断开时! Z; D5 J( q0 p9 ^9 R% f7 R
- $worker->onClose = function($connection)0 S3 L% T* v) S: D3 Q& G5 ~- c$ x/ E
- {
+ a+ L) n8 g. ]: h3 q' `' z, K - global $worker;
" Y, ?9 l" c* |7 r6 \ - if(isset($connection->uid))
' C* T7 L! h" _2 G - {; ?' r# {" A" B$ N- m& r5 T$ k% w
- // 连接断开时删除映射9 T0 ~4 K; w* C# b( P1 L, M
- unset($worker->uidConnections[$connection->uid]);+ l# y" K$ V7 q$ c; G+ F
- }
$ c( ^5 P+ q* G- p; ^6 Q - };
- Q6 l# D8 E- L0 ?2 k - . {; N7 V) y$ r# E
- // 向所有验证的用户推送数据
+ Q/ d: X3 d6 s5 P& p" g - function broadcast($message)# Q% U- Y$ _( n: e4 I: b
- {
) A+ ?- j* j3 f0 h: Q- X" Q - global $worker;5 w' S1 n8 F6 T$ O) w* o2 e: w! z
- foreach($worker->uidConnections as $connection)2 _4 d, j0 q- b( M; x
- {5 e; L- t% w1 i! G' ] T
- $connection->send($message);
: }' z8 e/ _2 i" G% f1 q - }
5 O* E3 z2 m. G1 @2 p" D - }
, Q* K9 Y1 J/ f4 E
^( r' Z& C8 ^3 V$ e) [- // 针对uid推送数据
. O6 W, K7 ^6 J9 R( g y - function sendMessageByUid($uid, $message); r$ y; g+ L1 y5 p/ t! M" r0 D0 n
- {* ~( y# T/ }' s1 H8 p, N
- global $worker;
# P5 Z' U& F4 Z. b% U - if(isset($worker->uidConnections[$uid]))
9 N. I. z; g' a1 ~5 f# X - {
" R5 T( B% U5 V2 q5 _7 h& ?& k- U$ d - $connection = $worker->uidConnections[$uid];
1 w7 N% Z# `- h3 i& S7 r - $connection->send($message);
. t6 [5 x$ v; z2 j - return true;' k w0 I' p+ V6 k# m! F% }- m
- }$ @9 d& L. b$ b! p0 R9 Y& {
- return false;9 x- i6 p' o7 @' k) k& K; X
- }
$ S v! X4 v# v6 V `+ W w5 P0 |1 m
P) p$ u d, T. ^7 q/ y- // 运行所有的worker. Z* V7 C1 d L" i% h( k
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');) [$ t. m. }3 J4 q+ @
- ws.onopen = function(){: X6 y w, A% K, {; H" \3 D+ K
- var uid = 'uid1';1 [/ I! x2 |6 H- ?, W
- ws.send(uid);6 @8 w. {& c0 u! _4 F$ P% F
- };
+ T7 }, ^: j$ G( W7 ^ - ws.onmessage = function(e){
6 i3 K0 x0 x/ j) Y - alert(e.data);
; m* l: Z: j, z; D - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
, A1 r5 j: n. b3 ~' s7 o - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);& S- Q1 z+ L0 n% ^. `1 M* T
- // 推送的数据,包含uid字段,表示是给这个uid推送! g# ^* \% `7 X, H9 k2 `
- $data = array('uid'=>'uid1', 'percent'=>'88%');& ?) a4 Y. a" g3 y( G6 i; G
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符, r/ M7 F, \5 k
- fwrite($client, json_encode($data)."\n");
( D) `: f6 {3 y8 l @: p - // 读取推送结果
( A$ n5 h: M' h% f# U5 C5 ?/ W% P - echo fread($client, 8192);
复制代码
$ H1 d& M0 J4 V; q* Q
" E+ T1 G4 ]# f5 t8 C |