cncml手绘网
标题: 用于实例化Worker后执行监听 [打印本页]
作者: admin 时间: 2018-12-17 21:22
标题: 用于实例化Worker后执行监听
- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
- use Workerman\Worker;' W9 V7 V3 ]* _. ?* g! L
- require_once __DIR__ . '/Workerman/Autoloader.php';
' q) |1 _" c) V7 ?" c
5 ^- \% \7 d L4 u- $worker = new Worker();
w& |* ?+ J7 ~9 e. Q - // 4个进程1 ~* N+ C; o4 J' x1 t, [+ P
- $worker->count = 4;
1 R" p, M- f2 B$ k' A, l - // 每个进程启动后在当前进程新增一个Worker监听2 K9 Q" d! Q1 d1 N; ?! L/ ~
- $worker->onWorkerStart = function($worker)
' y* L* ~ @2 Z - {) u& A6 x; J& l5 e; N; I" U
- /**) A c% Z& e8 J" p3 Y: h' }: T
- * 4个进程启动的时候都创建2016端口的Worker5 s. B/ Y2 `- l- V* a* g6 a
- * 当执行到worker->listen()时会报Address already in use错误- j+ j! U( j" _) x$ T+ Q( g3 Y2 B
- * 如果worker->count=1则不会报错
5 w. \( h6 F7 K( i. y9 m n; ^! z& B* i# F - */
0 T# U0 U* z4 R% e' h. Q$ ~: l - $inner_worker = new Worker('http://0.0.0.0:2016');0 f& [- ^7 r7 P5 O- o4 O7 O
- $inner_worker->onMessage = 'on_message';
, D) U; {/ k! r/ z) I - // 执行监听。这里会报Address already in use错误; L; Z: J6 x [ r* l _
- $inner_worker->listen();
7 L5 m' M7 U: {. | - };
" H& H8 e' z$ I) C$ t1 ^- _ - $ a1 Z! O. Q% Y
- $worker->onMessage = 'on_message';9 V; F( F! n9 x! F+ w$ P
" E" `- n7 y1 G7 N M: o, S- function on_message($connection, $data)* X; K9 [& c$ f* `! g. V
- {) p% r9 K" k. u# K& I
- $connection->send("hello\n");
* k* E! _ M* T4 g - }) h* N+ k$ ^. U4 y' I
5 B. x3 c) q7 J: ]: x$ j- // 运行worker
+ ^8 T' [; N* `; i2 ~ - Worker::runAll();- K+ g. F, K: f* e1 e* k
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
1 e: Z" A, r$ v
. L9 Z. J1 V/ P- use Workerman\Worker;+ q( U3 v' ^" {* N2 E$ h( b4 p3 o
- require_once './Workerman/Autoloader.php';# ^7 K2 q3 u1 h% j+ e; Z# i2 x
- 7 k& V$ t% m( B& m, a+ L
- $worker = new Worker('text://0.0.0.0:2015');
; T: E9 Q" d% F- s. u( t" q6 T - // 4个进程1 n6 |0 C6 n. X+ m, q4 H% {& S
- $worker->count = 4;( V4 k# x r7 i- g
- // 每个进程启动后在当前进程新增一个Worker监听. e, }, v3 i# K% P4 ?
- $worker->onWorkerStart = function($worker)& u; W2 M! u% W4 v6 l8 C+ u# a8 }
- {' m; b4 d! A4 m2 n$ }/ d7 g
- $inner_worker = new Worker('http://0.0.0.0:2016');7 i6 a( {8 m& w
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
5 F7 I0 W1 W5 T - $inner_worker->reusePort = true;
- |, a: R' k" }2 }7 m - $inner_worker->onMessage = 'on_message';- Z! u/ ^& _ v1 B
- // 执行监听。正常监听不会报错 H& \+ T$ D8 h4 @
- $inner_worker->listen();" u) w8 X4 J; g: Y$ e
- };
& O" ?) m( L( U5 L b - + o4 b* u: p: L) Y& I
- $worker->onMessage = 'on_message';- T% I* y: b: a. o/ `, E
- ; {& @6 L' h6 j% F. u2 Y& B) I
- function on_message($connection, $data)2 ]5 d& p/ p2 i, p& a C% u7 R
- {% @( _+ g+ v* c
- $connection->send("hello\n");( e0 y9 X: t; _, F
- }7 }3 X( e1 n( q4 i) g) x/ Y% E6 M
- 3 g) V3 O) q9 \2 p! R7 _
- // 运行worker1 c/ O! b A5 h8 e% T" r
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
- <?php6 ~& o* u0 W I7 | ~
- use Workerman\Worker;
/ \! h" Y& |: M1 m" e - require_once './Workerman/Autoloader.php';& n: l& v; @8 v
- // 初始化一个worker容器,监听1234端口
; q4 t% F8 {, t/ j) I - $worker = new Worker('websocket://0.0.0.0:1234');
% @* g ~/ \1 \2 c' M
) x. [' p/ I$ F' X! u- /*3 z/ `3 K* W: v
- * 注意这里进程数必须设置为1,否则会报端口占用错误
' s) ~5 Q" C4 l - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)- ^$ w& z2 p8 j0 j/ F9 y
- */* m% D; D4 T$ Q) r3 ]
- $worker->count = 1;
4 f9 p L6 i" Q( Q: i, {% M - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
, C7 x0 W; Q) ^: C- j7 x# e - $worker->onWorkerStart = function($worker)* `+ F+ G& W6 r
- {
) K1 J5 L- i7 \5 M* o - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符$ a5 Y- ^7 B& p1 a# k
- $inner_text_worker = new Worker('text://0.0.0.0:5678');* B7 w( t+ a; ~0 I
- $inner_text_worker->onMessage = function($connection, $buffer), e/ U$ K5 x7 z9 P! L Q
- {
8 S% G( U+ A2 p2 V+ a - // $data数组格式,里面有uid,表示向那个uid的页面推送数据
. l& E% J6 H! j* i+ H8 t0 } - $data = json_decode($buffer, true);
) d. n2 D3 i: J, B7 { - $uid = $data['uid'];
+ ~1 t/ L' I6 s5 h) j - // 通过workerman,向uid的页面推送数据
) A7 f8 g) z8 P8 K. }" S7 u7 Y - $ret = sendMessageByUid($uid, $buffer);0 ?' j9 k1 u& H; x2 a6 r; x
- // 返回推送结果
' M; n. P! e' Q& b5 s# S - $connection->send($ret ? 'ok' : 'fail');: L i" j+ O8 E J; H. S/ f
- };9 F0 u1 k5 R% W1 A
- // ## 执行监听 ##
; u. ]: J$ x% z+ X2 X* O - $inner_text_worker->listen();# w' F: D0 {) B9 A6 h; }
- };2 i+ j3 A1 z/ `& a
- // 新增加一个属性,用来保存uid到connection的映射) ^5 j, Q* ?! u
- $worker->uidConnections = array();
: B) P0 C) s) l& \4 L* I) k) c. B - // 当有客户端发来消息时执行的回调函数2 a$ @. z1 Z* }+ j! u% S
- $worker->onMessage = function($connection, $data)
" s1 Y/ @# m- }: d7 k' } - {$ b( }0 X2 D4 f) N$ \
- global $worker;8 E# T3 r* Z) X
- // 判断当前客户端是否已经验证,既是否设置了uid) ~: I" e- d' X5 h. Q/ k! z
- if(!isset($connection->uid))
! O1 x; c" g/ r" y" J& v, S! E) w: g7 k - {9 M+ M# ^" \! H a3 J# \" b5 q/ a8 T
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
T6 m' g4 f# ? - $connection->uid = $data;5 i. w, }* s4 O
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,; t% c; m% d, @# x$ S& ]2 o9 i
- * 实现针对特定uid推送数据
+ S0 I* `& O+ n7 Q { - */. a5 ]! x0 A* U- h1 `. d( T1 q" V$ l3 @
- $worker->uidConnections[$connection->uid] = $connection;
2 }8 x, U$ v4 U# x/ V. H. B - return;
- s6 B$ p# d7 I4 |$ ^4 Q! M' W - }. b, _, W$ @/ E' q0 V1 c0 L1 n& M
- };
, S a, R7 c) j% e ~! L0 ]; r - 1 |. [: ]! W4 g# W$ E0 j
- // 当有客户端连接断开时
- [$ u" l! P4 K% ~" N - $worker->onClose = function($connection)% q4 W5 r9 I& Y& G
- {
2 }) o* a' r. [7 b% \ - global $worker;6 U& d# z: Q. g4 z8 T1 ]
- if(isset($connection->uid))
7 a4 a' z4 J2 W$ z7 M) B - {
# S: ~% x. d) z - // 连接断开时删除映射
" r1 M% J; _- y H& B4 I" H- T' w - unset($worker->uidConnections[$connection->uid]);
- [, j- X% u. u% ^1 ? - }) G* A9 P: L% i; k, s
- };
. c" f s- O1 y
) l: X+ }) u; A8 l; o5 e+ ]- // 向所有验证的用户推送数据- S' c5 a! Y7 m* p: Z: X7 |
- function broadcast($message)
; ^7 `0 R7 M7 Q. ^6 ^ - {
/ z9 I5 U x1 G - global $worker;
# Y+ b7 g3 E: G- A5 G - foreach($worker->uidConnections as $connection)/ M5 k- r j3 z" W% i/ K& q5 F
- {
# H8 Y+ s% b; V# p0 _ - $connection->send($message);5 y3 ^ x8 o' a
- }
+ k8 U* f6 ?) ~7 o+ O" D3 E1 p - }1 j9 n, A0 E; f( Z1 F& p2 g5 m* g% E1 I
. ?" e+ K* ~9 W- y- // 针对uid推送数据. g! Q: U1 N5 m8 }" |
- function sendMessageByUid($uid, $message)
- s a& T# d1 T0 p9 K3 N - {
: ?/ ?/ I8 \; p - global $worker;! @" n+ F# P9 O, i$ z
- if(isset($worker->uidConnections[$uid]))' X- @" ]0 A" C' C3 ~2 O
- {
& k+ h9 G* p' j ^$ B4 \ - $connection = $worker->uidConnections[$uid];+ K7 H- a& y% X3 S% ^( x( u1 g
- $connection->send($message);. y4 _7 g! p& x. I
- return true;
2 T4 t0 D7 D8 U8 r - }
( |9 E5 }* Y* D9 V - return false; ^( c/ n% v m/ f
- }
! ^, `4 Q# Z, p9 Q - 0 X. k! N" y; x! G
- // 运行所有的worker
2 a$ B( Z+ ^, a9 a, [ ] - Worker::runAll();
复制代码启动后端服务 php push.php start -d
前端接收推送的js代码
- var ws = new WebSocket('ws://127.0.0.1:1234');
" v u$ }: x7 ` - ws.onopen = function(){8 r4 X5 s* D, e: K
- var uid = 'uid1';
6 l' B, ?2 Q" r, g5 i' X - ws.send(uid);7 }& R" ]8 w H! F7 w; G- U6 A
- };
1 e! c( x9 G0 v# ?- M, J - ws.onmessage = function(e){7 o' \4 z" P* G* m' w- X, a( l
- alert(e.data);
& S6 Q8 \5 t( s+ u$ q* F - };
复制代码后端推送消息的代码
- // 建立socket连接到内部推送端口
+ h6 x- {" U8 A3 h2 K9 i9 n - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
7 T( E, S% m- V" S6 \" D* L _ - // 推送的数据,包含uid字段,表示是给这个uid推送
" [% O8 d- _4 A, C8 C - $data = array('uid'=>'uid1', 'percent'=>'88%');
6 Q) N. j& p& N& m - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符1 y9 G b" F1 I; V. d) ^& D1 l6 z
- fwrite($client, json_encode($data)."\n");! U% p% r+ P7 D0 U; m) G% I
- // 读取推送结果
0 j8 Q) f* e" V4 M& Y" W' Z - echo fread($client, 8192);
复制代码
/ u9 Z3 |" E% r" g8 J5 W
. H8 p' Y7 Q3 U% b; Q
欢迎光临 cncml手绘网 (http://bbs.cncml.com/) |
Powered by Discuz! X3.2 |